Coverage for configlayer/_io.py: 96%
228 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-08 11:36 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-08 11:36 +0000
1"""Internal config layer IO support structure"""
2from ast import literal_eval
3from typing import Any, Mapping, Callable
4from pathlib import Path
6from .exceptions import CheckValueError, InputError, FieldError, IOExportError, IOImportError
7from .types import mb_holder_t, fields_t, Field
8from .utils import (Locker, GetName, as_holder, check_input, check_extra, check_items, check_type,
9 with_type, fmt_exc, as_dict)
12_UNIQUE = object()
15# Error templates
16_TEMPL_FIELD_DESC = "Field {}={} by {}: {!r}"
17_TEMPL_CONFIG = "Cannot {} {!r} config"
19# Exceptions holder
20_EXC_LIST = {'import': IOImportError, 'export': IOExportError}
22# Hooks
23_EXPORT_HOOKS: dict[type, Callable] = {Path: str}
24_IMPORT_HOOKS: dict[type, Callable] = {Path: Path}
27class IO(Locker):
28 """IO optional structure
29 Used in config support structure if enabled, for any IO operations"""
30 __slots__ = ('_cfg', '_data')
31 _key_section = '_CONFIG_LAYER' # Class constant
32 _key_version = 'version' # Class constant
33 _key_profile = 'profile' # Class constant
34 _key_fields = 'fields' # Class constant
36 def __init__(self, cfg, data, fields):
37 self._cfg = cfg
38 self._data = data
40 # Config IO check (rewrite to export/import section with all fields)
41 errors = []
42 for name, field in fields.items():
43 exported = self.export_field(name, field.default)
44 imported = self.import_field(name, exported)
45 if field.default != imported:
46 export_func = GetName(field.export_func)
47 import_func = GetName(field.import_func)
48 errors.append(f'Field {name}={with_type(field.default)} must be equal '
49 f'imported={with_type(imported)}: '
50 f'{export_func = }, {exported = }, {import_func = }')
51 if errors:
52 raise CheckValueError('\n\t'.join((f'{cfg.name!r} config IO check failed:', *errors)))
54 # Locks structure for changes with disabling attribute deletion
55 super().__init__(del_attr=False, name=str(self))
57 def __repr__(self):
58 return f'{self._cfg!r}.io'
60 def __str__(self):
61 return f'{self._cfg.name!r} {self._cfg.type_name} I/O support structure'
63 def _exc(self, op, exc, section=_UNIQUE):
64 section = '' if section == _UNIQUE else f' section {section!r}' if section else ' section'
65 return _EXC_LIST[op](f"{_TEMPL_CONFIG.format(op, self._cfg.name)}{section}. {exc}")
67 @staticmethod
68 def _export_field(field: Field, value, typecast: bool) -> str:
69 hook = _EXPORT_HOOKS.get(field.type, field.export_func)
70 return check_type(hook(value), str, typecast, 'field', False)
72 def export_field(self, name: str, value: Any = None, typecast=True) -> str:
73 """Export single field to raw str type
74 :arg name: Field name
75 :arg value: Field value (if override needed)
76 :arg typecast: Force str type if field export_func result is not str
77 :return: Field raw value
78 :raise FieldError: Any error"""
79 field = None
80 try:
81 field = self._cfg.get_fields[name]
82 value = getattr(self._data, name) if value is None else value
83 return self._export_field(field, value, typecast)
84 except Exception as e:
85 kw = {} if field is None else {'by_func': GetName(field.export_func, code=True)}
86 kw |= {} if value is None else {'from_value': value}
87 raise FieldError('Export', self._cfg.name, name, **kw, type_name=self._cfg.type_name,
88 reason=repr(e)) from e
90 def export_section(self, section: str | fields_t | None = None, strict=False, typecast=True
91 ) -> fields_t[str]:
92 """Export single section to dict with raw str type values
93 :arg section: Section name, fields dict, or active section (if not provided)
94 :arg strict: Export all fields (not skip equal to default)
95 :arg typecast: Force str type if field export_func result is not str
96 :return: Fields raw values
97 :raise InputError: If wrong arguments provided
98 :raise IOExportError: Any other error"""
99 cfg = self._cfg
100 profiles = cfg.profiles
101 fields = cfg.get_fields
102 active_fields = tuple(fields)
103 name, section = (section, None) if isinstance(section, str) else (None, section)
104 ie = (f'{self!r}.export_section()', 'section')
106 try:
107 # Provided section select
108 if section is not None:
109 check_type(section, Mapping, input_exc=ie)
110 check_extra(section, fields, 'field', input_exc=ie)
111 items, defaults = section, cfg.get_defaults
113 # Internal section direct export
114 elif name == self._key_section:
115 support = {}
116 if cfg.version: 116 ↛ 117line 116 didn't jump to line 117, because the condition on line 116 was never true
117 support[self._key_version] = repr(cfg.version)
118 if profiles:
119 support[self._key_profile] = repr(profiles.active)
120 if fields := {k: tuple(v) for k, v in profiles.get.items()
121 if isinstance(v, dict)}:
122 support[self._key_fields] = repr(fields)
123 return support
125 # Default section select
126 elif (name == cfg.def_sect or
127 name is None and profiles and profiles.active == cfg.def_sect):
128 name, items, defaults = cfg.def_sect, cfg.get_defaults, cfg.get_factory_defaults
130 # Profile section select
131 elif profiles:
132 if name is None:
133 name = profiles.active
134 elif name not in profiles:
135 p = ", ".join(map(repr, profiles.get))
136 details = f'available profiles: {p}' if p else 'there is no profiles'
137 raise fmt_exc(ie, f'Profile is not exists, {details}')
138 items, defaults = as_dict(profiles[name], fields), cfg.get_defaults
139 active_fields = tuple(items)
141 # Config section select
142 else:
143 if name is None or name == cfg.name:
144 name, items, defaults = cfg.name, cfg.get_data, cfg.get_defaults
145 else:
146 raise fmt_exc(ie, must_be=repr(cfg.name), received=repr(name))
148 # Export section
149 _export = self._export_field
150 result, errors = {}, []
151 for key, field in fields.items():
152 default = defaults[key]
153 value = items.get(key, default)
154 if key in active_fields and strict or value != default:
155 try:
156 result[key] = _export(field, value, typecast)
157 except Exception as e:
158 fn = GetName(field.export_func, code=True)
159 errors.append(_TEMPL_FIELD_DESC.format(key, with_type(value), fn, e))
160 if errors:
161 raise CheckValueError('\n\t'.join(('Errors:', *errors)))
162 return result
164 except InputError:
165 raise
166 except CheckValueError as e: 166 ↛ 168line 166 didn't jump to line 168
167 raise self._exc('export', str(e), name)
168 except Exception as e:
169 raise self._exc('export', repr(e), name) from e # not tested extreme case exception
171 def export_config(self, sections: mb_holder_t[str] | None = None, *, strict_defaults=False,
172 strict_data=False, typecast=True) -> dict[str, fields_t[str]]:
173 """Export whole config or specified profile(s) (if profiles enabled).
174 Also, by defaults, export only changed by user default and data fields
175 :arg sections: Selected section name(s) or all (if not provided)
176 :arg strict_defaults: Export all fields from default section (not skip equal to factory)
177 :arg strict_data: Export all fields from data sections (not skip equal to default)
178 :arg typecast: Force str type if field export_func result is not str
179 :return: Sections with fields raw values
180 :raise InputError: If wrong arguments provided
181 :raise IOExportError: Any other error"""
182 cfg = self._cfg
183 ie = (f'{self!r}.export_config()', 'profiles')
184 try:
185 result = {}
186 if sections is not None and cfg.profiles is None:
187 raise fmt_exc(ie, f'Profiles disabled, but provided: {sections!r}')
189 # Export config support fields
190 if support := self.export_section(self._key_section):
191 result[self._key_section] = support
193 # Export config defaults
194 cds = cfg.def_sect
195 result[cds] = self.export_section(cds, strict=strict_defaults, typecast=typecast)
197 # Export config data
198 if cfg.profiles:
199 exists = cfg.profiles.get
200 # bug mypy: profiles cannot be None here
201 if (selected := as_holder(sections, exists)) != exists: # type: ignore[arg-type]
202 check_extra(selected, exists, 'profile', input_exc=ie)
203 # bug mypy: k cannot be None here
204 result |= {k: self.export_section(k, strict=strict_data, typecast=typecast) # type: ignore[misc]
205 for k in selected}
206 else:
207 result[cfg.name] = self.export_section(strict=strict_data, typecast=typecast)
208 return result
210 except (InputError, IOExportError): 210 ↛ 212line 210 didn't jump to line 212
211 raise
212 except Exception as e:
213 raise self._exc('export', repr(e)) from e # not tested extreme case exception
215 @staticmethod
216 def _import_field(field: Field, raw_value: str, typecast: bool) -> Any:
217 hook = _IMPORT_HOOKS.get(field.type, field.import_func)
218 return check_type(hook(raw_value), field.type, typecast, 'field', False)
220 def import_field(self, name: str, raw_value: str, typecast=True) -> Any:
221 """Import single field to field type
222 :arg name: Field name
223 :arg raw_value: Field raw value
224 :arg typecast: Force field type if field import_func result has any other type
225 :return: Field value
226 :raise FieldError: Any error"""
227 try:
228 return self._import_field(self._cfg.get_fields[name], raw_value, typecast)
229 except Exception as e:
230 field = self._cfg.get_fields.get(name, None)
231 kwargs = {} if field is None else {'by_func': GetName(field.import_func, code=True)}
232 raise FieldError('Import', self._cfg.name, name, from_value=raw_value, **kwargs,
233 type_name=self._cfg.type_name, reason=repr(e)) from e
235 def import_section(self, raw_section: fields_t[str], name: str | None = None, typecast=True
236 ) -> fields_t:
237 """Import single section to dict with fields values
238 note: Import section directly into the config is not available and may not be!
239 :arg raw_section: Fields raw values
240 :arg name: Section name (only for error message)
241 :arg typecast: Force field type if field import_func result has any other type
242 :return: Fields values
243 :raise InputError: If wrong arguments provided
244 :raise IOImportError: Any other error"""
245 cfg = self._cfg
246 fields = cfg.get_fields
247 ie = (f'{self!r}.import_section()', 'raw_section')
248 check_type(raw_section, Mapping, input_exc=ie)
249 check_extra(raw_section, fields, 'field', input_exc=ie)
250 try:
251 result, errors = {}, []
252 _import = self._import_field
253 for key, raw_value in raw_section.items():
254 field = fields[key]
255 try:
256 result[key] = _import(field, raw_value, typecast)
257 except Exception as e:
258 fn = GetName(field.import_func, code=True)
259 errors.append(_TEMPL_FIELD_DESC.format(key, with_type(raw_value), fn, e))
260 if errors:
261 raise CheckValueError("\n\t".join(('Errors:', *errors)))
262 return result
264 except CheckValueError as e: 264 ↛ 266line 264 didn't jump to line 266
265 raise self._exc('import', str(e), name)
266 except Exception as e:
267 raise self._exc('import', repr(e), name) from e # not tested extreme case exception
269 def import_config(self, raw_config: Mapping[str, fields_t[str]],
270 sections: mb_holder_t[str] | None = None, typecast=True):
271 """Import whole config, or specified section(s) from it
272 :arg raw_config: Sections with fields raw values
273 :arg sections: Selected section name(s) to import or all (if not provided)
274 :arg typecast: Force field type if field import_func result has any other type
275 :return: Sections with fields values
276 :raise InputError: If wrong arguments provided
277 :raise IOImportError: Any other error"""
278 cfg = self._cfg
279 def_sect = cfg.def_sect
280 profiles = cfg.profiles
281 fields = tuple(cfg.get_fields)
282 ie_func = f'{self!r}.import_config()'
283 ie_cfg = (ie_func, 'raw_config')
284 ie_sect = (ie_func, 'sections')
285 try:
286 raw_config = dict(raw_config)
288 # Import config support structure fields
289 active = None
290 active_fields = {}
291 support = raw_config.pop(self._key_section, None)
292 check_input(support, cfg.version or profiles, f'{self._key_section!r} section',
293 input_exc=ie_cfg)
294 if support is not None:
295 version = support.get(self._key_version)
296 if version: 296 ↛ 297line 296 didn't jump to line 297, because the condition on line 296 was never true
297 version = str(literal_eval(version))
298 if check_input(version, cfg.version, 'version', input_exc=ie_cfg):
299 raise NotImplementedError('Version import is not available yet')
301 active = support.get(self._key_profile)
302 if active:
303 active = str(literal_eval(active))
304 if check_input(active, profiles, 'profile', input_exc=ie_cfg): 304 ↛ 321line 304 didn't jump to line 321, because the condition on line 304 was never false
305 if active not in raw_config and active != def_sect:
306 raise fmt_exc(ie_cfg, f'Active profile is not provided: {active!r}')
308 if self._key_fields in support:
309 active_fields_raw = support[self._key_fields]
310 try:
311 active_fields = dict(literal_eval(active_fields_raw))
312 except Exception as e:
313 raise fmt_exc(ie_cfg, 'Active fields dict is not parsed: '
314 f'{active_fields_raw!r}') from e
315 check_extra(active_fields, raw_config, 'active fields profile',
316 input_exc=ie_cfg)
317 [check_extra(v, fields, f'{k!r} profile active field', input_exc=ie_cfg)
318 for k, v in active_fields.items()]
320 # Check sections
321 if sections:
322 sections = as_holder(sections)
323 if profiles:
324 selected = tuple(raw_config)
325 else:
326 selected = ((def_sect,) if def_sect in raw_config else ()) + (cfg.name,)
327 check_extra(sections, selected, 'section', input_exc=ie_sect)
328 raw_config = {k: raw_config[k] for k in sections}
330 # Import defaults
331 if defaults := raw_config.pop(def_sect, {}):
332 check_extra(defaults, fields, 'default field', input_exc=ie_cfg)
333 defaults = self.import_section(defaults, def_sect, typecast)
334 defaults = cfg.get_factory_defaults | defaults
336 # Import data
337 profiles_data = {}
338 if profiles:
339 for k, v in raw_config.items():
340 p_data = self.import_section(v, k, typecast)
341 if af := active_fields.get(k):
342 check_items(p_data, af, f'{k!r} profile field', input_exc=ie_cfg)
343 profiles_data[k] = {k: v for k, v in p_data.items() if k in af}
344 else:
345 profiles_data[k] = defaults | p_data
346 else:
347 data = defaults | self.import_section(raw_config[cfg.name], cfg.name, typecast)
349 # Apply successfully imported data
350 cfg.set_defaults(defaults, typecheck=False)
351 if profiles:
352 profiles.clear()
353 [profiles.set(name, profile, defaults=False, typecheck=False)
354 for name, profile in profiles_data.items()]
355 profiles.switch(active)
356 else:
357 cfg._set_fields(data) # noqa
359 except (InputError, IOImportError):
360 raise
361 except Exception as e:
362 raise self._exc('import', repr(e)) from e