"""Collapse multiple config-sources and instantiate from them.
A lot of extra documentation on this is in dev-notes/config.rst.
"""
__all__ = (
"CollapsedConfig",
"ConfigManager",
)
import typing
import warnings
import weakref
from collections import defaultdict, deque, namedtuple
from snakeoil import klass, mappings
from snakeoil.compatibility import IGNORED_EXCEPTIONS
from . import basics, errors
_section_data = namedtuple("_section_data", ["name", "section"])
class _ConfigMapping(mappings.DictMixin):
"""Minimal dict-like wrapper returning config sections by type.
Similar to :class:`mappings.LazyValDict` but __getitem__
does not call the key func for __getitem__.
Careful: getting the keys for this mapping will collapse all of
central's configs to get at their types, which might be slow if
any of them are remote!
"""
typename: str
def __init__(self, manager, typename: str) -> None:
super().__init__()
self.manager = manager
self.typename = typename
def __getitem__(self, key: str) -> typing.Any:
conf = self.manager.collapse_named_section(key, raise_on_missing=False)
if conf is None or conf.type.name != self.typename:
raise KeyError(key)
return conf.instantiate()
def keys(self) -> typing.Iterator[str]:
for name in self.manager.sections():
try:
collapsed = self.manager.collapse_named_section(name)
except errors.ConfigError:
# Cannot be collapsed, ignore it (this is not
# an error, it can be used as base for
# something that can be collapsed)
pass
else:
if collapsed.type.name == self.typename:
yield name
def __contains__(self, key: str) -> bool:
conf = self.manager.collapse_named_section(key, raise_on_missing=False)
return conf is not None and conf.type.name == self.typename
class _ConfigStack(defaultdict[str, list[typing.Any]]):
def __init__(self) -> None:
super().__init__(list)
def render_value(
self, manager, key: str, type_name: str
) -> typing.Optional[typing.Any]:
for data in self.get(key, ()):
if key in data.section:
return data.section.render_value(manager, key, type_name)
return None
[docs]
class CollapsedConfig:
"""A collapsed config section.
:type type: :obj:`basics.ConfigType`
:ivar type: Our type.
:type config: dict
:ivar config: The supplied configuration values.
:ivar debug: if True exception wrapping is disabled.
:ivar default: True if this section is a default.
:type name: C{str} or C{None}
:ivar name: our section name or C{None} for an anonymous section.
"""
type_obj: basics.ConfigType
config: dict[str, typing.Any]
debug: bool
default: bool
def __init__(
self,
type_obj: basics.ConfigType,
config: dict[str, typing.Any],
manager,
debug: bool = False,
default: bool = False,
) -> None:
"""Initialize instance vars."""
# Check if we got all values required to instantiate.
missing = set(type_obj.required) - set(config)
if missing:
module = type_obj.callable.__module__
name = type_obj.callable.__name__
missing_vars = ", ".join(map(repr, missing))
raise errors.ConfigurationError(
f"type {module}.{name} needs settings for {missing_vars}"
)
self.name = None
self.default = default
self.debug = debug
self.type = type_obj
self.config = config
# Cached instance if we have one.
self._instance = None
if manager is not None:
manager = weakref.ref(manager)
self.manager = manager
[docs]
def instantiate(self) -> typing.Any:
if self._instance is None:
try:
self._instance = self._instantiate()
except IGNORED_EXCEPTIONS:
raise
except Exception as e:
raise errors.InstantiationError(self.name) from e
return self._instance
def _instantiate(self) -> typing.Any:
"""Call our type's callable, cache and return the result.
Calling instantiate more than once will return the cached value.
"""
# Needed because this code can run twice even with instance
# caching if we trigger an ComplexInstantiationError.
config = mappings.ProtectedDict(self.config)
# Instantiate section refs.
# Careful: not everything we have for needs to be in the conf dict
# (because of default values) and not everything in the conf dict
# needs to have a type (because of allow_unknowns).
for name, val in config.items():
typename = self.type.types.get(name)
if typename is None:
continue
# central already checked the type, no need to repeat that here.
unlist_it = False
if typename.startswith("ref:"):
val = [val]
unlist_it = True
if typename.startswith("refs:") or unlist_it:
try:
final_val = []
for ref in val:
final_val.append(ref.instantiate())
except IGNORED_EXCEPTIONS:
raise
except Exception as e:
raise errors.ConfigurationError(
f"Instantiating reference {name!r} pointing at {ref.name!r}"
) from e
if unlist_it:
final_val = final_val[0]
config[name] = final_val
if self.type.requires_config:
if self.manager is None:
raise Exception(
"configuration internal error; "
"requires_config is enabled "
"but we have no config manager to return "
)
manager = self.manager()
if manager is None:
raise Exception(
"Configuration internal error, potentially "
"client code error; manager requested, but the config "
"manager is no longer in memory"
)
config[self.type.requires_config] = manager
callable_obj = self.type.callable
# return raw, uninstantiated class object if requested
if self.type.raw_class:
return callable_obj
pargs = []
for var in self.type.positional:
pargs.append(config.pop(var))
# Python is basically the worst language ever:
# TypeError: repo() argument after ** must be a dictionary
configdict = dict(config)
try:
self._instance = callable_obj(*pargs, **configdict)
except IGNORED_EXCEPTIONS:
raise
except Exception as e:
source = errors._identify_functor_source(self.type.callable)
raise errors.InstantiationError(
self.name, f"exception caught from {source!r}"
) from e
if self._instance is None:
raise errors.ComplexInstantiationError(
"No object returned",
callable_obj=callable_obj,
pargs=pargs,
kwargs=configdict,
)
return self._instance
def __getstate__(self) -> dict[str, typing.Any]:
d = self.__dict__.copy()
# pull actual value from weakref
d["manager"] = d["manager"]()
return d
def __setstate__(self, state: dict[str, typing.Any]) -> None:
self.__dict__ = state.copy()
# reset weakref
self.__dict__["manager"] = weakref.ref(self.__dict__["manager"])
class _ConfigObjMap:
def __init__(self, manager):
self._manager = manager
def __getattr__(self, attr: str) -> typing.Any:
return _ConfigMapping(self._manager, attr)
def __getitem__(self, key: str) -> typing.Any:
val = getattr(self._manager.objects, key, klass.sentinel)
if val is None:
raise KeyError(key)
return val
def __getstate__(self) -> dict[str, typing.Any]:
# Explicitly defined to force pickling to work as expected without
# trying to pull __getstate__ from _ConfigMapping due to __getattr__.
return self.__dict__.copy()
def __setstate__(self, state: dict[str, typing.Any]) -> None:
self.__dict__.update(state)
class CompatConfigManager:
"""This is a compatibility hack to alias attribute access to the new location
See commit fa90aff05306fb4935604e64645f2d1d2049233e for when this was introduced.
This should be removed once consumers are converted in their access."""
def __init__(self, manager) -> None:
self._manager = manager
def __getattr__(self, attr: str) -> typing.Any:
if attr == "_manager":
return object.__getattribute__(self, "_manager")
obj = getattr(self._manager, attr, klass.sentinel)
if obj is klass.sentinel:
obj = getattr(self._manager.objects, attr)
warnings.warn(
f"Access to central objects must be done via '.objects.{attr}' rather than '.{attr}'",
stacklevel=2,
)
return obj
__dir__ = klass.DirProxy("_manager")
[docs]
class ConfigManager:
"""Combine config type definitions and configuration sections.
Creates instances of a requested type and name by pulling the
required data from any number of provided configuration sources.
The following special type names are recognized:
- configsection: instantiated and used the same way as an entry in the
configs :obj:`__init__` arg.
These "magic" typenames are only recognized if they are used by a
section with a name starting with "autoload".
"""
def __init__(self, configs=(), debug: bool = False):
"""Initialize.
:type configs: sequence of mappings of string to ConfigSection.
:param configs: configuration to use.
Can define extra configs that are also loaded.
:param debug: if set to True exception wrapping is disabled.
This means things can raise other exceptions than
ConfigurationError but tracebacks are complete.
"""
self.original_config_sources = tuple(map(self._compat_mangle_config, configs))
# Set of encountered section names, used to catch recursive references.
self._refs = set()
self.debug = debug
self.reload()
# cycle...
self.objects = _ConfigObjMap(self)
def _compat_mangle_config(self, config):
if hasattr(config, "sections"):
return config
return basics.GeneratedConfigSource(config, "unknown")
[docs]
def reload(self) -> None:
"""Reinitialize from the configured config sources.
This throws away all cached instances.
"""
# "Attribute defined outside __init__"
# pylint: disable-msg=W0201
self.configs = []
self.config_sources = []
# Cache mapping confname to CollapsedConfig.
self.rendered_sections = {}
self.sections_lookup = defaultdict(deque)
# force regeneration.
self._types = klass._uncached_singleton
for config in self.original_config_sources:
self._integrate_config_source(config)
[docs]
def add_config_source(self, config) -> None:
"""Add the given config source and reload the internal rendering"""
self.original_config_sources += (config,)
self.reload()
def _integrate_config_source(self, config) -> None:
"""Pull extra type and config sections from configs and use them.
Things loaded this way are added after already loaded things
(meaning the config containing the autoload section overrides
the config(s) added by that section).
"""
config = self._compat_mangle_config(config)
config_data = config.sections()
collision = set(self.rendered_sections)
collision.intersection_update(config_data)
if collision:
# If this matches something we previously instantiated
# we should probably blow up to prevent massive
# amounts of confusion (and recursive autoloads)
sections = ", ".join(repr(x) for x in sorted(collision))
raise errors.ConfigurationError(
"New config is trying to modify existing section(s) "
f"{sections} that was already instantiated."
)
self.configs.append(config_data)
self.config_sources.append(config)
for name in config_data:
self.sections_lookup[name].appendleft(config_data[name])
# Do not even touch the ConfigSection if it's not an autoload.
if not name.startswith("autoload"):
continue
try:
collapsed = self.collapse_named_section(name)
except IGNORED_EXCEPTIONS:
raise
except Exception as e:
raise errors.ConfigurationError(
f"Failed collapsing autoload section {name!r}"
) from e
if collapsed.type.name != "configsection":
raise errors.ConfigurationError(
f"Section {name!r} is marked as autoload but "
f"type is {collapsed.type.name}, not configsection"
)
try:
instance = collapsed.instantiate()
except IGNORED_EXCEPTIONS:
raise
except Exception as e:
raise errors.AutoloadInstantiationError(name) from e
if collapsed.type.name == "configsection":
self._integrate_config_source(instance)
[docs]
def sections(self) -> typing.Iterator[str]:
"""Return an iterator of all section names."""
return iter(self.sections_lookup.keys())
[docs]
def collapse_named_section(self, name: str, raise_on_missing: bool = True):
"""Collapse a config by name, possibly returning a cached instance.
@returns: :obj:`CollapsedConfig`.
If there is no section with this name a ConfigurationError is raised,
unless raise_on_missing is False in which case None is returned.
"""
if name in self._refs:
raise errors.ConfigurationError(f"Reference to {name!r} is recursive")
self._refs.add(name)
try:
result = self.rendered_sections.get(name)
if result is not None:
return result
section_stack = self.sections_lookup.get(name)
if section_stack is None:
if not raise_on_missing:
return None
raise errors.ConfigurationError(f"no section called {name!r}")
try:
result = self.collapse_section(section_stack, name)
result.name = name
except IGNORED_EXCEPTIONS:
raise
except Exception as e:
raise errors.ConfigurationError(
f"Collapsing section named {name!r}"
) from e
self.rendered_sections[name] = result
return result
finally:
self._refs.remove(name)
def _get_inherited_sections(self, name: str, sections):
# List of (name, ConfigSection, index) tuples, most specific first.
slist = [(name, sections)]
# first map out inherits.
inherit_names = set([name])
for current_section, section_stack in slist:
current_conf = section_stack[0]
if "inherit" not in current_conf:
continue
inherits = current_conf.render_value(self, "inherit", "list")
for inherit in inherits:
if inherit == current_section:
# self-inherit. Mkae use of section_stack to handle this.
if len(section_stack) == 1:
# nothing else to self inherit.
raise errors.ConfigurationError(
f"Self-inherit {inherit!r} cannot be found"
)
if isinstance(section_stack, deque):
slist.append((inherit, list(section_stack)[1:]))
else:
slist.append((inherit, section_stack[1:]))
else:
if inherit in inherit_names:
raise errors.ConfigurationError(
f"Inherit {inherit!r} is recursive"
)
inherit_names.add(inherit)
target = self.sections_lookup.get(inherit)
if target is None:
raise errors.ConfigurationError(
f"Inherit target {inherit!r} cannot be found"
)
slist.append((inherit, target))
return [_section_data(name, stack[0]) for (name, stack) in slist]
def _section_is_inherit_only(self, section) -> bool:
if "inherit-only" in section:
if section.render_value(self, "inherit-only", "bool"):
return True
return False
[docs]
def collapse_section(self, sections, _name: typing.Optional[str] = None):
"""Collapse a ConfigSection to a :obj:`CollapsedConfig`."""
if self._section_is_inherit_only(sections[0]):
if sections[0].render_value(self, "inherit-only", "bool"):
raise errors.CollapseInheritOnly("cannot collapse inherit-only section")
relevant_sections = self._get_inherited_sections(_name, sections)
config_stack = _ConfigStack()
for data in relevant_sections:
for key in data.section.keys():
config_stack[key].append(data)
kls = config_stack.render_value(self, "class", "callable")
if kls is None:
raise errors.ConfigurationError("no class specified")
type_obj = basics.ConfigType(kls)
is_default = bool(config_stack.render_value(self, "default", "bool"))
for key in ("inherit", "inherit-only", "class", "default"):
config_stack.pop(key, None)
collapsed = CollapsedConfig(
type_obj,
self._render_config_stack(type_obj, config_stack),
self,
default=is_default,
debug=self.debug,
)
return collapsed
@klass.jit_attr
def types(self) -> dict[str, dict[str, typing.Any]]:
type_map = defaultdict(dict)
for name, sections in self.sections_lookup.items():
if self._section_is_inherit_only(sections[0]):
continue
obj = self.collapse_named_section(name)
type_map[obj.type.name][name] = obj
return mappings.ImmutableDict(
(k, mappings.ImmutableDict(v)) for k, v in type_map.items()
)
def _render_config_stack(self, type_obj, config_stack) -> dict[str, typing.Any]:
conf = {}
for key in config_stack:
typename = type_obj.types.get(key)
if typename is None:
if not type_obj.allow_unknowns:
raise errors.ConfigurationError(f"Type of {key!r} unknown")
typename = "str"
is_ref = typename.startswith("ref:")
is_refs = typename.startswith("refs:")
if typename.startswith("lazy_"):
typename = typename[5:]
result = config_stack.render_value(self, key, typename)
if is_ref:
result = [result]
is_refs = True
if is_refs:
try:
result = [ref.collapse() for ref in result]
except IGNORED_EXCEPTIONS:
raise
except Exception as e:
raise errors.ConfigurationError(
f"Failed collapsing section key {key!r}"
) from e
if is_ref:
result = result[0]
conf[key] = result
# Check if we got all values required to instantiate.
missing = set(type_obj.required) - set(conf)
if missing:
module = type_obj.callable.__module__
name = type_obj.callable.__name__
missing_vars = ", ".join(map(repr, missing))
raise errors.ConfigurationError(
f"type {module}.{name} needs settings for {missing_vars}"
)
return mappings.ImmutableDict(conf)
[docs]
def get_default(self, type_name: str) -> typing.Optional[typing.Any]:
"""Finds the configuration specified default obj of type_name.
Returns C{None} if no defaults.
"""
try:
defaults = self.types.get(type_name, {}).items()
except IGNORED_EXCEPTIONS:
raise
except Exception as e:
raise errors.ConfigurationError(
f"Collapsing defaults for {type_name!r}"
) from e
defaults = [(name, section) for name, section in defaults if section.default]
if not defaults:
return None
if len(defaults) > 1:
defaults = ", ".join(map(repr, sorted(x[0] for x in defaults)))
raise errors.ConfigurationError(
f"type {type_name} incorrectly has multiple default sections: {defaults}"
)
try:
return defaults[0][1].instantiate()
except IGNORED_EXCEPTIONS:
raise
except Exception as e:
raise errors.ConfigurationError(
f"failed instantiating default {type_name} {defaults[0][0]!r}"
) from e
return None