Coverage for configlayer/_profiles.py: 95%
198 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-08 11:36 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-08 11:36 +0000
1"""Internal config layer profiles support structure"""
2from __future__ import annotations
4from copy import deepcopy
5from types import MappingProxyType
6from typing import Any, Optional, Callable, Iterable
8from .types import fields_t
9from .utils import Locker, check_items, check_types, as_dict_stated, check_lengths, fmt_exc
10from .exceptions import InputError, ProfilesError
13class Profiles(Locker):
14 """Profiles optional structure
15 Used in config support structure if enabled, for config profiles operations"""
16 __slots__ = ('_cfg', '_data', '_profiles', '_group', 'active', 'active_fields',
17 'before_switch', 'after_switch')
18 _groups: dict[str, list[Profiles]] = dict() # Common fixed class variable (dict methods only)
19 _cfg: Any
20 _data: Any
21 _profiles: dict[str, tuple | dict]
22 _group: str | None
23 active: str
24 active_fields: tuple
25 before_switch: Optional[Callable]
26 after_switch: Optional[Callable]
28 def __init__(self, cfg, data, group: str | None = None):
29 self._cfg = cfg
30 self._data = data
31 self._profiles = {}
32 self._group = group
33 self.before_switch = self.after_switch = None
34 self.active = cfg.def_sect
35 self.active_fields = tuple(cfg.get_fields)
36 if group is not None:
37 if group in self._groups:
38 curr_group = self._groups[group]
39 if (curr_active := curr_group[0].active) != self.active:
40 next(self._switch(curr_active, True, False))
41 curr_group.append(self)
42 else:
43 self._groups[group] = [self]
45 # Locks structure for changes with disabling attribute deletion and unlocked switch funcs
46 super().__init__('before_switch', 'after_switch', del_attr=False, name=str(self))
48 def __repr__(self):
49 return f'{self._cfg!r}.profiles'
51 def __str__(self):
52 return f'{self._cfg.name!r} {self._cfg.type_name} profiles support structure'
54 def __contains__(self, key):
55 return self._profiles.__contains__(key)
57 def __getitem__(self, key):
58 cfg = self._cfg
59 if key == cfg.def_sect:
60 return tuple(cfg.get_defaults.values())
61 if key == self.active:
62 self.update()
63 return self._profiles[key]
65 def __delitem__(self, key):
66 if key == self.active:
67 if len(self._profiles) > 1:
68 indexed = tuple(self._profiles)
69 new_key = indexed[1 if (new := indexed.index(key)) == 0 else (new - 1)] # -> or <-
70 else:
71 new_key = self._cfg.def_sect
72 self.switch(new_key)
73 del self._profiles[key]
75 def clear(self):
76 """Delete all profiles"""
77 if self.active != (default := self._cfg.def_sect):
78 self.switch(default)
79 self._profiles.clear()
81 @property
82 def get(self) -> MappingProxyType[str, Any]:
83 """Get profiles dict view"""
84 self.update()
85 return MappingProxyType(self._profiles)
87 def set(self, name: str, data: fields_t | Iterable = (), *,
88 defaults=True, typecheck=True, typecast=False):
89 """Set profile data by name, with optional defaults filling, type checking and casting
90 :arg name: Target profile name
91 :arg data: Target profile data
92 :arg defaults: Fill missing fields by defaults and store strictly as tuple
93 :arg typecheck: Data types check
94 :arg typecast: Data types cast (if check failed)
95 :raise InputError: If wrong arguments provided
96 :raise ProfilesError: Any other error"""
97 cfg = self._cfg
98 fields = cfg.get_fields
99 default_profile = name == cfg.def_sect
100 func_name = f'{self!r}.set()'
101 try:
102 if data:
103 mapping, items = as_dict_stated(data, fields)
104 if not mapping:
105 check_lengths(tuple(data), fields, absent=False, input_exc=(func_name, 'data'))
106 check_items(items, fields, 'field', str, absent=not (defaults or mapping),
107 input_exc=(func_name, 'data'))
108 if typecheck:
109 items = check_types(items, {k: fields[k].type for k in items}, typecast,
110 input_exc=(func_name, 'data'))
111 if defaults:
112 items = cfg.get_defaults | items
113 elif default_profile:
114 raise InputError('data', func_name=func_name,
115 msg='Cannot overwrite profile defaults with empty data')
116 elif not defaults:
117 raise InputError('data', func_name=func_name,
118 msg='Cannot set profile with empty data and disabled defaults')
119 else:
120 items = cfg.get_defaults
122 if default_profile:
123 cfg.set_defaults(items)
124 else:
125 result = tuple(items.values()) if len(fields) == len(items) else items
126 # bug mypy: no Sequence type, tuple or fields_t in previous line
127 self._profiles[name] = deepcopy(result) # type: ignore[assignment]
128 if name == self.active:
129 with self:
130 self.active_fields = tuple(items)
131 cfg.set_fields(items)
133 except InputError:
134 raise
135 except Exception as e:
136 raise ProfilesError(f'Cannot set {name!r} profile to {cfg.name!r} config') from e
138 def update(self):
139 """Copy current config values to active profile. Used in self.switch() or manually"""
140 cfg = self._cfg
141 if self.active == cfg.def_sect:
142 return # Realized faster and automatically in ConfigBase.__set_field__
144 data = cfg.get_data
145 if len(profile := self._profiles[self.active]) != len(cfg.get_fields):
146 self._profiles[self.active] = {k: data[k] for k in profile}
147 else:
148 self._profiles[self.active] = tuple(data.values())
150 def _name_error(self, input_exc, name):
151 return fmt_exc(input_exc, f'{name!r} profile in {self._cfg.name!r} config is not exists',
152 available=tuple(self._profiles))
154 def _group_call(self, func_name: str, *args, revert=True):
155 processed, errors = [], []
156 # note mypy: self._group is not None if _group_call called
157 for profile in self._groups[self._group]: # type: ignore[index]
158 try:
159 next(gen := getattr(profile, func_name)(*args))
160 processed.append((profile._cfg.name, gen))
161 except StopIteration: 161 ↛ 162line 161 didn't jump to line 162, because the exception caught by line 161 didn't happen
162 continue
163 except Exception as e:
164 errors.append(f'{profile._cfg.name}: {e!r}')
165 if not errors:
166 return
168 # Prepare error message
169 msg = '\n\t'.join((f'Some profiles in group {self._group!r} failed ' + '{}:', *errors))
170 if not revert: 170 ↛ 171line 170 didn't jump to line 171, because the condition on line 170 was never true
171 return msg + '\nRevert disabled'
173 # Revert processed profiles
174 errors = []
175 for profile_name, yielded in processed:
176 try:
177 next(yielded)
178 except StopIteration: 178 ↛ 180line 178 didn't jump to line 180
179 continue
180 except Exception as e:
181 errors.append(f'{profile_name}: {e!r}')
182 return msg + '\nRevert ' + ('\n\t'.join(('failed:', *errors)) if errors else 'successful')
184 def _rename_raw(self, new_name, old_name):
185 if old_name == self.active: 185 ↛ 187line 185 didn't jump to line 187, because the condition on line 185 was never false
186 self.active = new_name
187 self._profiles = {new_name if k == old_name else k: v for k, v in self._profiles.items()}
189 def _rename(self, new_name, old_name):
190 if old_name not in self._profiles:
191 raise self._name_error((f'{self!r}.rename()', 'old_name'), old_name)
192 with self: 192 ↛ exitline 192 didn't return from function '_rename', because the return on line 199 wasn't executed
193 try:
194 self._rename_raw(new_name, old_name)
195 except Exception:
196 self._rename_raw(old_name, new_name)
197 raise
198 yield
199 return self._rename_raw(old_name, new_name)
201 def rename(self, new_name: str, old_name: str | None = None):
202 """Rename active or selected profile. Profiles in groups will also be renamed
203 :arg new_name: New profile name
204 :arg old_name: Target, or active (if not provided) profile name
205 :raise InputError: If wrong arguments provided
206 :raise ProfilesError: If any error at group operations"""
207 # Check input
208 if old_name is None:
209 old_name = self.active
210 if old_name == self._cfg.def_sect:
211 raise InputError(msg=f'Cannot rename default profile to {new_name!r}',
212 func_name=f'{self!r}.rename()')
214 # Single config profile rename
215 if self._group is None:
216 return next(self._rename(new_name, old_name))
218 # Group configs profile rename
219 if error := self._group_call('_rename', new_name, old_name):
220 raise ProfilesError(error.format(f'rename from {old_name!r} to {new_name!r}'))
222 def _switch_raw(self, name, cfg, data):
223 if self.before_switch is not None:
224 self.before_switch()
226 fields = cfg.get_fields
227 mapping, data_dict = as_dict_stated(self[name], fields, strict=True)
228 with self:
229 self.active = name
230 self.active_fields = tuple(data_dict if mapping else fields)
231 [setattr(data, *kv) for kv in data_dict.items()]
233 if self.after_switch is not None:
234 self.after_switch()
236 def _switch_check(self, name, add, cfg):
237 if (absent := name not in self and name != cfg.def_sect) and not add:
238 raise self._name_error((f'{self!r}.switch()', 'name'), name)
239 return absent
241 def _switch(self, name, add, add_current, absent=None):
242 cfg = self._cfg
243 if (prev_name := self.active) == name:
244 yield
245 return
247 # Check input and add data from default or current profile if enabled and profile is absent
248 if absent is None:
249 absent = self._switch_check(name, add, cfg)
250 if absent and add:
251 self.set(name, self.get[prev_name] if add_current else None, typecheck=False)
253 # Save current profile values and switch with revert at fail
254 self.update()
255 try:
256 self._switch_raw(name, cfg, self._data)
257 except Exception:
258 self._switch_raw(prev_name, cfg, self._data)
259 raise
260 yield
261 return self._switch_raw(prev_name, cfg, self._data)
263 def switch(self, name: str, add=False, add_current=False):
264 """Switch active profile. Profiles in groups will also be switched
265 :arg name: Exists or new profile name (if :arg add: enabled)
266 :arg add: Adds profile with provided :arg name: if it not exists
267 :arg add_current: Replaces defaults with the active profile values, when it is added
268 :raise InputError: If wrong arguments provided
269 :raise ProfilesError: If any error at group operations"""
270 # Check input
271 if add_current and not add:
272 raise InputError('add', 'add_current',
273 msg="Param 'add' must be True, when 'add_current' is True")
274 absent = self._switch_check(name, add, self._cfg)
276 # Single config profile switch
277 if self._group is None:
278 return next(self._switch(name, add, add_current, absent))
280 # Group configs profile switch
281 if error := self._group_call('_switch', name, add, add_current):
282 raise ProfilesError(error.format(f'switch to {name!r}'))
284 @property
285 def get_groups(self) -> MappingProxyType[str, Any]:
286 """Get groups dict view"""
287 return MappingProxyType(self._groups)
289 def del_group(self, name: str) -> list | None:
290 """Delete provided group name
291 :arg name: Deleting group name
292 :return: List of deleted group config profiles
293 :raise KeyError: If :arg name: is not exists"""
294 return self._groups.pop(name)