Coverage for configlayer/_profiles.py: 95%

198 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-08 11:36 +0000

1"""Internal config layer profiles support structure""" 

2from __future__ import annotations 

3 

4from copy import deepcopy 

5from types import MappingProxyType 

6from typing import Any, Optional, Callable, Iterable 

7 

8from .types import fields_t 

9from .utils import Locker, check_items, check_types, as_dict_stated, check_lengths, fmt_exc 

10from .exceptions import InputError, ProfilesError 

11 

12 

13class Profiles(Locker): 

14 """Profiles optional structure 

15 Used in config support structure if enabled, for config profiles operations""" 

16 __slots__ = ('_cfg', '_data', '_profiles', '_group', 'active', 'active_fields', 

17 'before_switch', 'after_switch') 

18 _groups: dict[str, list[Profiles]] = dict() # Common fixed class variable (dict methods only) 

19 _cfg: Any 

20 _data: Any 

21 _profiles: dict[str, tuple | dict] 

22 _group: str | None 

23 active: str 

24 active_fields: tuple 

25 before_switch: Optional[Callable] 

26 after_switch: Optional[Callable] 

27 

28 def __init__(self, cfg, data, group: str | None = None): 

29 self._cfg = cfg 

30 self._data = data 

31 self._profiles = {} 

32 self._group = group 

33 self.before_switch = self.after_switch = None 

34 self.active = cfg.def_sect 

35 self.active_fields = tuple(cfg.get_fields) 

36 if group is not None: 

37 if group in self._groups: 

38 curr_group = self._groups[group] 

39 if (curr_active := curr_group[0].active) != self.active: 

40 next(self._switch(curr_active, True, False)) 

41 curr_group.append(self) 

42 else: 

43 self._groups[group] = [self] 

44 

45 # Locks structure for changes with disabling attribute deletion and unlocked switch funcs 

46 super().__init__('before_switch', 'after_switch', del_attr=False, name=str(self)) 

47 

48 def __repr__(self): 

49 return f'{self._cfg!r}.profiles' 

50 

51 def __str__(self): 

52 return f'{self._cfg.name!r} {self._cfg.type_name} profiles support structure' 

53 

54 def __contains__(self, key): 

55 return self._profiles.__contains__(key) 

56 

57 def __getitem__(self, key): 

58 cfg = self._cfg 

59 if key == cfg.def_sect: 

60 return tuple(cfg.get_defaults.values()) 

61 if key == self.active: 

62 self.update() 

63 return self._profiles[key] 

64 

65 def __delitem__(self, key): 

66 if key == self.active: 

67 if len(self._profiles) > 1: 

68 indexed = tuple(self._profiles) 

69 new_key = indexed[1 if (new := indexed.index(key)) == 0 else (new - 1)] # -> or <- 

70 else: 

71 new_key = self._cfg.def_sect 

72 self.switch(new_key) 

73 del self._profiles[key] 

74 

75 def clear(self): 

76 """Delete all profiles""" 

77 if self.active != (default := self._cfg.def_sect): 

78 self.switch(default) 

79 self._profiles.clear() 

80 

81 @property 

82 def get(self) -> MappingProxyType[str, Any]: 

83 """Get profiles dict view""" 

84 self.update() 

85 return MappingProxyType(self._profiles) 

86 

87 def set(self, name: str, data: fields_t | Iterable = (), *, 

88 defaults=True, typecheck=True, typecast=False): 

89 """Set profile data by name, with optional defaults filling, type checking and casting 

90 :arg name: Target profile name 

91 :arg data: Target profile data 

92 :arg defaults: Fill missing fields by defaults and store strictly as tuple 

93 :arg typecheck: Data types check 

94 :arg typecast: Data types cast (if check failed) 

95 :raise InputError: If wrong arguments provided 

96 :raise ProfilesError: Any other error""" 

97 cfg = self._cfg 

98 fields = cfg.get_fields 

99 default_profile = name == cfg.def_sect 

100 func_name = f'{self!r}.set()' 

101 try: 

102 if data: 

103 mapping, items = as_dict_stated(data, fields) 

104 if not mapping: 

105 check_lengths(tuple(data), fields, absent=False, input_exc=(func_name, 'data')) 

106 check_items(items, fields, 'field', str, absent=not (defaults or mapping), 

107 input_exc=(func_name, 'data')) 

108 if typecheck: 

109 items = check_types(items, {k: fields[k].type for k in items}, typecast, 

110 input_exc=(func_name, 'data')) 

111 if defaults: 

112 items = cfg.get_defaults | items 

113 elif default_profile: 

114 raise InputError('data', func_name=func_name, 

115 msg='Cannot overwrite profile defaults with empty data') 

116 elif not defaults: 

117 raise InputError('data', func_name=func_name, 

118 msg='Cannot set profile with empty data and disabled defaults') 

119 else: 

120 items = cfg.get_defaults 

121 

122 if default_profile: 

123 cfg.set_defaults(items) 

124 else: 

125 result = tuple(items.values()) if len(fields) == len(items) else items 

126 # bug mypy: no Sequence type, tuple or fields_t in previous line 

127 self._profiles[name] = deepcopy(result) # type: ignore[assignment] 

128 if name == self.active: 

129 with self: 

130 self.active_fields = tuple(items) 

131 cfg.set_fields(items) 

132 

133 except InputError: 

134 raise 

135 except Exception as e: 

136 raise ProfilesError(f'Cannot set {name!r} profile to {cfg.name!r} config') from e 

137 

138 def update(self): 

139 """Copy current config values to active profile. Used in self.switch() or manually""" 

140 cfg = self._cfg 

141 if self.active == cfg.def_sect: 

142 return # Realized faster and automatically in ConfigBase.__set_field__ 

143 

144 data = cfg.get_data 

145 if len(profile := self._profiles[self.active]) != len(cfg.get_fields): 

146 self._profiles[self.active] = {k: data[k] for k in profile} 

147 else: 

148 self._profiles[self.active] = tuple(data.values()) 

149 

150 def _name_error(self, input_exc, name): 

151 return fmt_exc(input_exc, f'{name!r} profile in {self._cfg.name!r} config is not exists', 

152 available=tuple(self._profiles)) 

153 

154 def _group_call(self, func_name: str, *args, revert=True): 

155 processed, errors = [], [] 

156 # note mypy: self._group is not None if _group_call called 

157 for profile in self._groups[self._group]: # type: ignore[index] 

158 try: 

159 next(gen := getattr(profile, func_name)(*args)) 

160 processed.append((profile._cfg.name, gen)) 

161 except StopIteration: 161 ↛ 162line 161 didn't jump to line 162, because the exception caught by line 161 didn't happen

162 continue 

163 except Exception as e: 

164 errors.append(f'{profile._cfg.name}: {e!r}') 

165 if not errors: 

166 return 

167 

168 # Prepare error message 

169 msg = '\n\t'.join((f'Some profiles in group {self._group!r} failed ' + '{}:', *errors)) 

170 if not revert: 170 ↛ 171line 170 didn't jump to line 171, because the condition on line 170 was never true

171 return msg + '\nRevert disabled' 

172 

173 # Revert processed profiles 

174 errors = [] 

175 for profile_name, yielded in processed: 

176 try: 

177 next(yielded) 

178 except StopIteration: 178 ↛ 180line 178 didn't jump to line 180

179 continue 

180 except Exception as e: 

181 errors.append(f'{profile_name}: {e!r}') 

182 return msg + '\nRevert ' + ('\n\t'.join(('failed:', *errors)) if errors else 'successful') 

183 

184 def _rename_raw(self, new_name, old_name): 

185 if old_name == self.active: 185 ↛ 187line 185 didn't jump to line 187, because the condition on line 185 was never false

186 self.active = new_name 

187 self._profiles = {new_name if k == old_name else k: v for k, v in self._profiles.items()} 

188 

189 def _rename(self, new_name, old_name): 

190 if old_name not in self._profiles: 

191 raise self._name_error((f'{self!r}.rename()', 'old_name'), old_name) 

192 with self: 192 ↛ exitline 192 didn't return from function '_rename', because the return on line 199 wasn't executed

193 try: 

194 self._rename_raw(new_name, old_name) 

195 except Exception: 

196 self._rename_raw(old_name, new_name) 

197 raise 

198 yield 

199 return self._rename_raw(old_name, new_name) 

200 

201 def rename(self, new_name: str, old_name: str | None = None): 

202 """Rename active or selected profile. Profiles in groups will also be renamed 

203 :arg new_name: New profile name 

204 :arg old_name: Target, or active (if not provided) profile name 

205 :raise InputError: If wrong arguments provided 

206 :raise ProfilesError: If any error at group operations""" 

207 # Check input 

208 if old_name is None: 

209 old_name = self.active 

210 if old_name == self._cfg.def_sect: 

211 raise InputError(msg=f'Cannot rename default profile to {new_name!r}', 

212 func_name=f'{self!r}.rename()') 

213 

214 # Single config profile rename 

215 if self._group is None: 

216 return next(self._rename(new_name, old_name)) 

217 

218 # Group configs profile rename 

219 if error := self._group_call('_rename', new_name, old_name): 

220 raise ProfilesError(error.format(f'rename from {old_name!r} to {new_name!r}')) 

221 

222 def _switch_raw(self, name, cfg, data): 

223 if self.before_switch is not None: 

224 self.before_switch() 

225 

226 fields = cfg.get_fields 

227 mapping, data_dict = as_dict_stated(self[name], fields, strict=True) 

228 with self: 

229 self.active = name 

230 self.active_fields = tuple(data_dict if mapping else fields) 

231 [setattr(data, *kv) for kv in data_dict.items()] 

232 

233 if self.after_switch is not None: 

234 self.after_switch() 

235 

236 def _switch_check(self, name, add, cfg): 

237 if (absent := name not in self and name != cfg.def_sect) and not add: 

238 raise self._name_error((f'{self!r}.switch()', 'name'), name) 

239 return absent 

240 

241 def _switch(self, name, add, add_current, absent=None): 

242 cfg = self._cfg 

243 if (prev_name := self.active) == name: 

244 yield 

245 return 

246 

247 # Check input and add data from default or current profile if enabled and profile is absent 

248 if absent is None: 

249 absent = self._switch_check(name, add, cfg) 

250 if absent and add: 

251 self.set(name, self.get[prev_name] if add_current else None, typecheck=False) 

252 

253 # Save current profile values and switch with revert at fail 

254 self.update() 

255 try: 

256 self._switch_raw(name, cfg, self._data) 

257 except Exception: 

258 self._switch_raw(prev_name, cfg, self._data) 

259 raise 

260 yield 

261 return self._switch_raw(prev_name, cfg, self._data) 

262 

263 def switch(self, name: str, add=False, add_current=False): 

264 """Switch active profile. Profiles in groups will also be switched 

265 :arg name: Exists or new profile name (if :arg add: enabled) 

266 :arg add: Adds profile with provided :arg name: if it not exists 

267 :arg add_current: Replaces defaults with the active profile values, when it is added 

268 :raise InputError: If wrong arguments provided 

269 :raise ProfilesError: If any error at group operations""" 

270 # Check input 

271 if add_current and not add: 

272 raise InputError('add', 'add_current', 

273 msg="Param 'add' must be True, when 'add_current' is True") 

274 absent = self._switch_check(name, add, self._cfg) 

275 

276 # Single config profile switch 

277 if self._group is None: 

278 return next(self._switch(name, add, add_current, absent)) 

279 

280 # Group configs profile switch 

281 if error := self._group_call('_switch', name, add, add_current): 

282 raise ProfilesError(error.format(f'switch to {name!r}')) 

283 

284 @property 

285 def get_groups(self) -> MappingProxyType[str, Any]: 

286 """Get groups dict view""" 

287 return MappingProxyType(self._groups) 

288 

289 def del_group(self, name: str) -> list | None: 

290 """Delete provided group name 

291 :arg name: Deleting group name 

292 :return: List of deleted group config profiles 

293 :raise KeyError: If :arg name: is not exists""" 

294 return self._groups.pop(name)