Coverage for configlayer/_io.py: 96%

228 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-08 11:36 +0000

1"""Internal config layer IO support structure""" 

2from ast import literal_eval 

3from typing import Any, Mapping, Callable 

4from pathlib import Path 

5 

6from .exceptions import CheckValueError, InputError, FieldError, IOExportError, IOImportError 

7from .types import mb_holder_t, fields_t, Field 

8from .utils import (Locker, GetName, as_holder, check_input, check_extra, check_items, check_type, 

9 with_type, fmt_exc, as_dict) 

10 

11 

# Sentinel for "argument not provided": IO._exc uses it as the default of `section`,
# so that None and '' stay distinguishable from an omitted argument
_UNIQUE = object()


# Error templates
# Filled with (field name, value with type, function name, exception) for per-field failures
_TEMPL_FIELD_DESC = "Field {}={} by {}: {!r}"
# Filled with (operation, config name) as the prefix of IO import/export errors
_TEMPL_CONFIG = "Cannot {} {!r} config"

# Exceptions holder: operation name -> exception class raised for that operation
_EXC_LIST = {'import': IOImportError, 'export': IOExportError}

# Hooks: looked up by field type, a match replaces the field's own export/import function
_EXPORT_HOOKS: dict[type, Callable] = {Path: str}
_IMPORT_HOOKS: dict[type, Callable] = {Path: Path}

25 

26 

class IO(Locker):
    """IO optional structure
    Used in config support structure if enabled, for any IO operations"""
    __slots__ = ('_cfg', '_data')
    # Keys of the internal support section that travels with an exported config
    _key_section = '_CONFIG_LAYER'  # Class constant: name of the support section itself
    _key_version = 'version'        # Class constant: entry holding repr() of the config version
    _key_profile = 'profile'        # Class constant: entry holding repr() of the active profile
    _key_fields = 'fields'          # Class constant: entry holding repr() of per-profile fields

35 

def __init__(self, cfg, data, fields):
    """Bind the structure to a config and check that every field default survives export + import
    :arg cfg: Owning config (its name, type_name and get_fields are read by this structure)
    :arg data: Object holding current field values as attributes
    :arg fields: Mapping of field name to field description
    :raise CheckValueError: If any default is not equal to itself after export and import
    :raise FieldError: If any default cannot be exported or imported at all"""
    self._cfg = cfg
    self._data = data

    # Config IO check (rewrite to export/import section with all fields)
    errors = []
    for name, field in fields.items():
        # NOTE(review): export_field() treats value=None as "not provided", so a field whose
        # default is None is checked with the current value from data instead - confirm intended
        exported = self.export_field(name, field.default)
        imported = self.import_field(name, exported)
        if field.default != imported:
            # These local names are part of the message: the `{x = }` specifiers print them
            export_func = GetName(field.export_func)
            import_func = GetName(field.import_func)
            errors.append(f'Field {name}={with_type(field.default)} must be equal '
                          f'imported={with_type(imported)}: '
                          f'{export_func = }, {exported = }, {import_func = }')
    if errors:
        raise CheckValueError('\n\t'.join((f'{cfg.name!r} config IO check failed:', *errors)))

    # Locks structure for changes with disabling attribute deletion
    super().__init__(del_attr=False, name=str(self))

56 

def __repr__(self):
    """Return repr of the owning config followed by '.io'"""
    return f'{self._cfg!r}.io'

59 

def __str__(self):
    """Return human-readable name: quoted config name, config type name and a fixed suffix"""
    cfg = self._cfg
    return f'{cfg.name!r} {cfg.type_name} I/O support structure'

62 

def _exc(self, op, exc, section=_UNIQUE):
    """Build (not raise) the IO exception for failed operation
    :arg op: Operation name, key of _EXC_LIST ('import' or 'export')
    :arg exc: Error details, appended to the message after the config description
    :arg section: Section name; if not provided, the section is not mentioned at all,
        if falsy (None or ''), an unnamed section is mentioned
    :return: Exception instance of the class registered for the operation"""
    # Sentinel must be checked by identity: `==` would call __eq__ of whatever was passed
    if section is _UNIQUE:
        where = ''
    elif section:
        where = f' section {section!r}'
    else:
        where = ' section'
    return _EXC_LIST[op](f"{_TEMPL_CONFIG.format(op, self._cfg.name)}{where}. {exc}")

66 

@staticmethod
def _export_field(field: Field, value, typecast: bool) -> str:
    """Convert field value to raw str without any error wrapping
    :arg field: Field description (its type and export_func are used)
    :arg value: Field value to export
    :arg typecast: Passed to check_type() together with the expected str type
    :return: Result of check_type() for the converted value"""
    # Hook registered for the field type has priority over the field own export function
    hook = _EXPORT_HOOKS.get(field.type, field.export_func)
    return check_type(hook(value), str, typecast, 'field', False)

71 

def export_field(self, name: str, value: Any = None, typecast=True) -> str:
    """Export single field to raw str type
    :arg name: Field name
    :arg value: Field value (if override needed); None means "take current value from data"
    :arg typecast: Force str type if field export_func result is not str
    :return: Field raw value
    :raise FieldError: Any error"""
    field = None
    try:
        field = self._cfg.get_fields[name]
        if value is None:
            value = getattr(self._data, name)
        return self._export_field(field, value, typecast)
    except Exception as e:
        # Only the context known at the moment of failure gets into the error:
        # field stays None if the name lookup failed, value stays None if it was not resolved
        kw = {}
        if field is not None:
            kw['by_func'] = GetName(field.export_func, code=True)
        if value is not None:
            kw['from_value'] = value
        raise FieldError('Export', self._cfg.name, name, **kw, type_name=self._cfg.type_name,
                         reason=repr(e)) from e

89 

def export_section(self, section: str | fields_t | None = None, strict=False, typecast=True
                   ) -> fields_t[str]:
    """Export single section to dict with raw str type values
    :arg section: Section name, fields dict, or active section (if not provided)
    :arg strict: Export all fields (not skip equal to default)
    :arg typecast: Force str type if field export_func result is not str
    :return: Fields raw values
    :raise InputError: If wrong arguments provided
    :raise IOExportError: Any other error"""
    cfg = self._cfg
    profiles = cfg.profiles
    fields = cfg.get_fields
    active_fields = tuple(fields)
    # A str is a section name, anything else (except None) is treated as ready fields dict
    name, section = (section, None) if isinstance(section, str) else (None, section)
    ie = (f'{self!r}.export_section()', 'section')

    try:
        # Provided section select
        if section is not None:
            check_type(section, Mapping, input_exc=ie)
            check_extra(section, fields, 'field', input_exc=ie)
            items, defaults = section, cfg.get_defaults

        # Internal section direct export
        elif name == self._key_section:
            support = {}
            if cfg.version:
                support[self._key_version] = repr(cfg.version)
            if profiles:
                support[self._key_profile] = repr(profiles.active)
                # Only profiles stored as dict are listed, with names of the fields they hold
                if profile_fields := {k: tuple(v) for k, v in profiles.get.items()
                                      if isinstance(v, dict)}:
                    support[self._key_fields] = repr(profile_fields)
            return support

        # Default section select
        elif (name == cfg.def_sect or
              name is None and profiles and profiles.active == cfg.def_sect):
            name, items, defaults = cfg.def_sect, cfg.get_defaults, cfg.get_factory_defaults

        # Profile section select
        elif profiles:
            if name is None:
                name = profiles.active
            elif name not in profiles:
                p = ", ".join(map(repr, profiles.get))
                details = f'available profiles: {p}' if p else 'there is no profiles'
                raise fmt_exc(ie, f'Profile is not exists, {details}')
            items, defaults = as_dict(profiles[name], fields), cfg.get_defaults
            active_fields = tuple(items)

        # Config section select
        else:
            if name is None or name == cfg.name:
                name, items, defaults = cfg.name, cfg.get_data, cfg.get_defaults
            else:
                raise fmt_exc(ie, must_be=repr(cfg.name), received=repr(name))

        # Export section: all fields are tried, errors are collected and reported together
        _export = self._export_field
        result, errors = {}, []
        for key, field in fields.items():
            default = defaults[key]
            value = items.get(key, default)
            # strict forces export of section own fields, changed values are exported always
            if (key in active_fields and strict) or value != default:
                try:
                    result[key] = _export(field, value, typecast)
                except Exception as e:
                    fn = GetName(field.export_func, code=True)
                    errors.append(_TEMPL_FIELD_DESC.format(key, with_type(value), fn, e))
        if errors:
            raise CheckValueError('\n\t'.join(('Errors:', *errors)))
        return result

    except InputError:
        raise
    except CheckValueError as e:
        raise self._exc('export', str(e), name)
    except Exception as e:
        raise self._exc('export', repr(e), name) from e  # not tested extreme case exception

170 

def export_config(self, sections: mb_holder_t[str] | None = None, *, strict_defaults=False,
                  strict_data=False, typecast=True) -> dict[str, fields_t[str]]:
    """Export whole config or specified profile(s) (if profiles enabled).
    Also, by defaults, export only changed by user default and data fields
    :arg sections: Selected section name(s) or all (if not provided)
    :arg strict_defaults: Export all fields from default section (not skip equal to factory)
    :arg strict_data: Export all fields from data sections (not skip equal to default)
    :arg typecast: Force str type if field export_func result is not str
    :return: Sections with fields raw values
    :raise InputError: If wrong arguments provided
    :raise IOExportError: Any other error"""
    cfg = self._cfg
    # NOTE(review): argument is reported as 'profiles' while the parameter is named `sections`
    # - looks like a leftover of a rename, confirm before changing the message
    ie = (f'{self!r}.export_config()', 'profiles')
    try:
        result = {}
        if sections is not None and cfg.profiles is None:
            raise fmt_exc(ie, f'Profiles disabled, but provided: {sections!r}')

        # Export config support fields (section is omitted if there is nothing to store)
        if support := self.export_section(self._key_section):
            result[self._key_section] = support

        # Export config defaults
        cds = cfg.def_sect
        result[cds] = self.export_section(cds, strict=strict_defaults, typecast=typecast)

        # Export config data
        if cfg.profiles:
            exists = cfg.profiles.get
            # bug mypy: profiles cannot be None here
            selected = as_holder(sections, exists)  # type: ignore[arg-type]
            if selected != exists:
                check_extra(selected, exists, 'profile', input_exc=ie)
            # bug mypy: k cannot be None here
            for k in selected:
                result[k] = self.export_section(  # type: ignore[misc]
                    k, strict=strict_data, typecast=typecast)
        else:
            result[cfg.name] = self.export_section(strict=strict_data, typecast=typecast)
        return result

    except (InputError, IOExportError):
        raise
    except Exception as e:
        raise self._exc('export', repr(e)) from e  # not tested extreme case exception

214 

@staticmethod
def _import_field(field: Field, raw_value: str, typecast: bool) -> Any:
    """Convert raw value to field value without any error wrapping
    :arg field: Field description (its type and import_func are used)
    :arg raw_value: Field raw value to import
    :arg typecast: Passed to check_type() together with the expected field type
    :return: Result of check_type() for the converted value"""
    # Hook registered for the field type has priority over the field own import function
    hook = _IMPORT_HOOKS.get(field.type, field.import_func)
    return check_type(hook(raw_value), field.type, typecast, 'field', False)

219 

def import_field(self, name: str, raw_value: str, typecast=True) -> Any:
    """Import single field to field type
    :arg name: Field name
    :arg raw_value: Field raw value
    :arg typecast: Force field type if field import_func result has any other type
    :return: Field value
    :raise FieldError: Any error"""
    try:
        return self._import_field(self._cfg.get_fields[name], raw_value, typecast)
    except Exception as e:
        # Field is looked up again without raising: the failure could be the unknown name itself
        field = self._cfg.get_fields.get(name, None)
        kwargs = {}
        if field is not None:
            kwargs['by_func'] = GetName(field.import_func, code=True)
        raise FieldError('Import', self._cfg.name, name, from_value=raw_value, **kwargs,
                         type_name=self._cfg.type_name, reason=repr(e)) from e

234 

def import_section(self, raw_section: fields_t[str], name: str | None = None, typecast=True
                   ) -> fields_t:
    """Import single section to dict with fields values
    note: Import section directly into the config is not available and may not be!
    :arg raw_section: Fields raw values
    :arg name: Section name (only for error message)
    :arg typecast: Force field type if field import_func result has any other type
    :return: Fields values
    :raise InputError: If wrong arguments provided
    :raise IOImportError: Any other error"""
    fields = self._cfg.get_fields
    ie = (f'{self!r}.import_section()', 'raw_section')
    # Input checks stay outside of try block, their errors are not wrapped
    check_type(raw_section, Mapping, input_exc=ie)
    check_extra(raw_section, fields, 'field', input_exc=ie)
    try:
        # All fields are tried, errors are collected and reported together
        result, errors = {}, []
        _import = self._import_field
        for key, raw_value in raw_section.items():
            field = fields[key]
            try:
                result[key] = _import(field, raw_value, typecast)
            except Exception as e:
                fn = GetName(field.import_func, code=True)
                errors.append(_TEMPL_FIELD_DESC.format(key, with_type(raw_value), fn, e))
        if errors:
            raise CheckValueError("\n\t".join(('Errors:', *errors)))
        return result

    except CheckValueError as e:
        raise self._exc('import', str(e), name)
    except Exception as e:
        raise self._exc('import', repr(e), name) from e  # not tested extreme case exception

268 

def import_config(self, raw_config: Mapping[str, fields_t[str]],
                  sections: mb_holder_t[str] | None = None, typecast=True):
    """Import whole config, or specified section(s) from it.
    Nothing is applied to the config until all sections are imported successfully
    :arg raw_config: Sections with fields raw values
    :arg sections: Selected section name(s) to import or all (if not provided)
    :arg typecast: Force field type if field import_func result has any other type
    :return: None, imported values are applied to the config (defaults, then profiles or data)
    :raise InputError: If wrong arguments provided
    :raise IOImportError: Any other error"""
    cfg = self._cfg
    def_sect = cfg.def_sect
    profiles = cfg.profiles
    fields = tuple(cfg.get_fields)
    ie_func = f'{self!r}.import_config()'
    ie_cfg = (ie_func, 'raw_config')
    ie_sect = (ie_func, 'sections')
    try:
        # Own copy: sections are popped from it below, caller mapping stays untouched
        raw_config = dict(raw_config)

        # Import config support structure fields
        active = None
        active_fields = {}
        support = raw_config.pop(self._key_section, None)
        check_input(support, cfg.version or profiles, f'{self._key_section!r} section',
                    input_exc=ie_cfg)
        if support is not None:
            # Support values are stored as repr(), so they are parsed back with literal_eval
            version = support.get(self._key_version)
            if version:
                version = str(literal_eval(version))
            if check_input(version, cfg.version, 'version', input_exc=ie_cfg):
                raise NotImplementedError('Version import is not available yet')

            active = support.get(self._key_profile)
            if active:
                active = str(literal_eval(active))
            if check_input(active, profiles, 'profile', input_exc=ie_cfg):
                if active not in raw_config and active != def_sect:
                    raise fmt_exc(ie_cfg, f'Active profile is not provided: {active!r}')

                if self._key_fields in support:
                    active_fields_raw = support[self._key_fields]
                    try:
                        active_fields = dict(literal_eval(active_fields_raw))
                    except Exception as e:
                        raise fmt_exc(ie_cfg, 'Active fields dict is not parsed: '
                                              f'{active_fields_raw!r}') from e
                    check_extra(active_fields, raw_config, 'active fields profile',
                                input_exc=ie_cfg)
                    for profile_name, profile_fields in active_fields.items():
                        check_extra(profile_fields, fields,
                                    f'{profile_name!r} profile active field', input_exc=ie_cfg)

        # Check sections
        if sections:
            sections = as_holder(sections)
            if profiles:
                selected = tuple(raw_config)
            else:
                selected = ((def_sect,) if def_sect in raw_config else ()) + (cfg.name,)
            check_extra(sections, selected, 'section', input_exc=ie_sect)
            raw_config = {k: raw_config[k] for k in sections}

        # Import defaults
        if defaults := raw_config.pop(def_sect, {}):
            check_extra(defaults, fields, 'default field', input_exc=ie_cfg)
            defaults = self.import_section(defaults, def_sect, typecast)
        # NOTE(review): merge is placed outside of the `if` above, so absent or empty default
        # section gives factory defaults - nesting was reconstructed, confirm against upstream
        defaults = cfg.get_factory_defaults | defaults

        # Import data
        profiles_data = {}
        if profiles:
            for k, v in raw_config.items():
                p_data = self.import_section(v, k, typecast)
                if af := active_fields.get(k):
                    # Profile with declared active fields keeps only these fields
                    check_items(p_data, af, f'{k!r} profile field', input_exc=ie_cfg)
                    profiles_data[k] = {key: value for key, value in p_data.items()
                                        if key in af}
                else:
                    profiles_data[k] = defaults | p_data
        else:
            data = defaults | self.import_section(raw_config[cfg.name], cfg.name, typecast)

        # Apply successfully imported data
        cfg.set_defaults(defaults, typecheck=False)
        if profiles:
            profiles.clear()
            for name, profile in profiles_data.items():
                profiles.set(name, profile, defaults=False, typecheck=False)
            profiles.switch(active)
        else:
            cfg._set_fields(data)  # noqa

    except (InputError, IOImportError):
        raise
    except Exception as e:
        raise self._exc('import', repr(e)) from e