"""
Handling of footprints collectors.
Module's usage is mostly dedicated to the main footprints package.
The footprints proxy could make some part of the interface visible as well.
"""
from weakref import WeakSet
import collections
import logging
from bronx.fancies import dump, loggers
from bronx.stdtypes.catalog import Catalog
from bronx.patterns import observer, getbytag
from . import config, priorities, reporting
#: No automatic export
__all__ = []
logger = loggers.getLogger(__name__)
# Module Interface
[docs]def get(**kw):
"""Return actual collector object matching description."""
return Collector(**kw)
[docs]def keys():
"""Return the list of current entries names collected."""
return Collector.tag_keys()
[docs]def values():
"""Return the list of current entries values collected."""
return Collector.tag_values()
[docs]def items():
"""Return the items of the collectors table."""
return Collector.tag_items()
# Base class
[docs]class Collector(getbytag.GetByTag, Catalog, observer.Observer):
"""
A class collector is devoted to the gathering of class references that inherit
from a given class (here a class with a footprint), according to some other optional criteria.
:param int non_ambiguous_loglevel: The loglevel used in the find_best method when
several choices are possible but the priority of the various candidates
makes the choice easy (default: logging.INFO).
"""
_tag_default = 'garbage'
def __init__(self, **kw):
logger.debug('Footprints collector init %s', str(self))
self.instances = Catalog(weak=True)
self.abstract_classes = Catalog(weak=True)
self.register = True
self.report = config.ONERROR_REPORTING
self.lreport_len = config.DFLT_MAXLEN_LIGHT_REPORTING
self.report_auto = True
self.report_tag = None
self.report_style = config.RAW_REPORTINGSTYLE
self.non_ambiguous_loglevel = logging.INFO
getbytag.GetByTag.__init__(self)
Catalog.__init__(self, **kw)
if self.report_tag is None:
self.report_tag = 'footprint-' + self.tag
r_maxlen = None if self.report == config.FULL_REPORTING else self.lreport_len
self.report_log = reporting.get(tag=self.report_tag, log_maxlen=r_maxlen)
config.add2proxies(self)
self._del_fasttrack()
[docs] @classmethod
def tag_clean(cls, tag):
"""Return a lower-case string without any "s" at the end."""
return tag.lower().rstrip('s')
[docs] def newobsitem(self, item, info):
"""Register a new instance of some of the classes in the current collector."""
logger.debug('Notified {!r} new item {!r}'.format(self, item))
self.instances.add(item)
[docs] def delobsitem(self, item, info):
"""Unregister an existing object in the current collector of instances."""
logger.debug('Notified {!r} del item {!r}'.format(self, item))
self.instances.discard(item)
[docs] def filter_package(self, packname):
"""Find in current collector classes with name starting with ``packname``."""
return [cl for cl in self.items() if cl.fullname().startswith(packname)]
[docs] def discard_package(self, packname, verbose=True):
"""Discard from current collector classes with name starting with ``packname``."""
for x in self.filter_package(packname):
if verbose:
print('Bye...', x)
self.discard(x)
[docs] def filter_onflag(self, flagmethod, default=True):
"""Find in current collector classes with method ``flagmethod`` returning ``default``."""
return [cl for cl in self.items() if hasattr(cl, flagmethod) and getattr(cl, flagmethod)() == default]
[docs] def discard_onflag(self, flagmethod, default=True, verbose=True):
"""Discard from current collector classes with method ``flagmethod`` returning ``default``."""
for x in self.filter_onflag(flagmethod, default):
if verbose:
print('Bye...', x)
self.discard(x)
[docs] def filter_higher_level(self, tag):
"""Find in current collector classes with priority level higher or equal to ``level``."""
plevel = priorities.top.level(tag)
return [cl for cl in self.items() if cl.footprint_pl() >= plevel]
[docs] def discard_higher_level(self, tag, verbose=True):
"""Discard from current collector classes with priority level higher or equal to ``level``."""
for x in self.filter_higher_level(tag):
if verbose:
print('Bye...', x)
self.discard(x)
[docs] def filter_lower_level(self, tag):
"""Find in current collector classes with priority level lower than ``level``."""
plevel = priorities.top.level(tag)
return [cl for cl in self.items() if cl.footprint_pl() < plevel]
[docs] def discard_lower_level(self, tag, verbose=True):
"""Discard from current collector classes with priority level lower than ``level``."""
for x in self.filter_lower_level(tag):
if verbose:
print('Bye...', x)
self.discard(x)
[docs] def reset_package_level(self, packname, tag):
"""Reset priority level current collector classes with name starting with ``packname``."""
plevel = priorities.top.level(tag)
for cl in self.filter_package(packname):
fp = cl.footprint_retrieve()
fp.priority['level'] = plevel
def _upd_fasttrack_index(self, cls):
attrerror = set()
for myattr in self._fasttrack_attr:
myfp = cls.footprint_retrieve()
if myattr in myfp.mandatory():
myvalues = myfp.get_values(myattr)
# Is there some restrictions on values ?
if myvalues:
# Ensure that the attribute types are consistent
if self._fasttrack_type[myattr] is None:
self._fasttrack_type[myattr] = myfp.attr[myattr].get('type', str)
self._fasttrack_typeargs[myattr] = myfp.attr[myattr].get('args', dict())
else:
if not (self._fasttrack_type[myattr] is myfp.attr[myattr].get('type', str) and
self._fasttrack_typeargs[myattr] == myfp.attr[myattr].get('args', dict())):
logger.warning("Inconsistent types (%s vs %s) for fasttrack attributes (class: %s). " +
"Removing it (%s) from the fasttrack list.",
str(self._fasttrack_type[myattr]),
str(myfp.attr[myattr].get('type', str)),
repr(cls), myattr)
attrerror.add(myattr)
continue
# Let's go...
for myvalue in myvalues:
self._fasttrack_index[myattr][myvalue].add(cls)
# No restrictions on values so it's a potential candidate for everyone
else:
self._fasttrack_trap[myattr].add(cls)
# The attribute is optional or missing so it's a possible candidate for everyone
else:
self._fasttrack_trap[myattr].add(cls)
# Process errors
if attrerror:
for myattr in set(self._fasttrack_attr):
if myattr in attrerror:
self._fasttrack_attr.remove(myattr)
del self._fasttrack_type[myattr]
del self._fasttrack_typeargs[myattr]
del self._fasttrack_index[myattr]
del self._fasttrack_trap[myattr]
def _upd_fasttrack_delete(self, cls):
for fvalues in self._fasttrack_index.values():
for classes in fvalues.values():
classes.discard(cls)
for classes in self._fasttrack_trap.values():
classes.discard(cls)
def _del_fasttrack(self):
self._fasttrack_attr = set()
self._fasttrack_index = dict()
self._fasttrack_type = dict()
self._fasttrack_typeargs = dict()
self._fasttrack_trap = dict()
def _set_fasttrack(self, attrset):
self._del_fasttrack()
self._fasttrack_attr = set(attrset)
for myattr in self._fasttrack_attr:
self._fasttrack_type[myattr] = None
self._fasttrack_typeargs[myattr] = dict()
self._fasttrack_index[myattr] = collections.defaultdict(WeakSet)
self._fasttrack_trap[myattr] = WeakSet()
for mycls in self._items:
self._upd_fasttrack_index(mycls)
def _get_fasttrack(self):
return set(self._fasttrack_attr)
fasttrack = property(_get_fasttrack, _set_fasttrack)
def _fasttrack_subsetting(self, desc):
if self._fasttrack_attr:
objgroup_list = list()
for k, v in desc.items():
if k in self._fasttrack_attr:
# Check if the key's value is in the index
if v in self._fasttrack_index[k]:
indexkey = v
else:
# A type conversion might be usefull
try:
v_conv = self._fasttrack_type[k](v, ** self._fasttrack_typeargs[k])
except (ValueError, TypeError):
v_conv = None
if v_conv is not None and v_conv in self._fasttrack_index[k]:
indexkey = v_conv
else:
# Ok we give up...
indexkey = None
if indexkey is not None:
logger.debug('Fasttrack subsetting took place for key %s', k)
objgroup_list.append(self._fasttrack_index[k][indexkey] |
self._fasttrack_trap[k])
if objgroup_list:
if len(objgroup_list) > 1:
finalset = objgroup_list[0]
for objgroup in objgroup_list[1:]:
finalset = finalset.intersection(objgroup)
return finalset
else:
return objgroup_list[0]
return self._items
[docs] def add(self, *items, **kwargs):
"""Add the ``items`` entries in the current catalog."""
if kwargs.get('abstract', False):
self.abstract_classes.add(*items)
else:
super().add(*items)
for item in items:
self._upd_fasttrack_index(item)
[docs] def discard(self, bye):
"""Remove the ``bye`` entry from current catalog."""
super().discard(bye)
self._upd_fasttrack_delete(bye)
[docs] def pickup_and_cache(self, desc, resolvecache=None):
"""Try to pickup inside the collector an item that could match the description."""
logger.debug('Pick up a "{:s}" in description {!s} with collector {!r}'.format(self.tag, desc, self))
if resolvecache is None:
resolvecache = ResolveCache()
emptywarning = desc.pop('_emptywarning', True)
mkstdreport = desc.pop('_report', self.report_auto)
reportstyle = desc.pop('_report_style', self.report_style)
for hidden in [x for x in desc.keys() if x.startswith('_')]:
logger.warning('Hidden argument "%s" ignored in pickup attributes', hidden)
del desc[hidden]
if self.tag in desc and desc[self.tag] is not None:
logger.debug('A %s is already defined %s', self.tag, str(desc[self.tag]))
else:
desc[self.tag] = self.find_best(desc, resolvecache=resolvecache)
if desc[self.tag] is not None:
desc = desc[self.tag].footprint_cleanup(desc)
elif emptywarning:
dumper = dump.get()
logger.warning("No %s found in description \n%s", self.tag, dumper.cleandump(desc))
if mkstdreport and self.report:
print("\n", self.report_log.info(), "\n")
if reportstyle == config.RAW_REPORTINGSTYLE:
self.report_last.lightdump()
if reportstyle == config.FLAT_REPORTINGSTYLE:
altreport = self.report_last.as_flat()
altreport.reshuffle(['why', 'attribute'], skip=False)
altreport.fulldump()
altreport.reshuffle(['only', 'attribute'], skip=False)
altreport.fulldump()
if reportstyle == config.FACTORIZED1_REPORTINGSTYLE:
altreport = self.report_sorted()
altreport.orderedprint()
if reportstyle == config.FACTORIZED2_REPORTINGSTYLE:
altreport = self.report_sorted()
altreport.dumper()
return desc, resolvecache
[docs] def pickup(self, desc, resolvecache=None):
"""Try to pickup inside the collector an item that could match the description."""
return self.pickup_and_cache(desc, resolvecache=resolvecache)[0]
[docs] def find_any(self, desc, resolvecache=None):
"""
Return the first item of the collector that :meth:`footprint_couldbe`
as described by argument ``desc``.
"""
logger.debug('Search any %s in collector %s', str(desc), str(self._items))
if resolvecache is None:
resolvecache = ResolveCache()
requeue = True
report_log = None if self.report == config.ONERROR_REPORTING else self.report_log
while requeue:
requeue = False
if self.report and report_log is not None:
report_log.add(collector=self)
for item in self._fasttrack_subsetting(desc):
resolved, u_input = item.footprint_couldbe(desc, # @UnusedVariable
resolvecache=resolvecache,
report=report_log)
if resolved:
return item(resolved, checked=True)
if (self.report == config.ONERROR_REPORTING and
report_log is None):
requeue = True
report_log = self.report_log
return None
[docs] def find_all(self, desc, resolvecache=None):
"""
Returns all the items of the collector that :meth:`footprint_couldbe`
as described by argument ``desc``.
"""
logger.debug('Search all %s in collector %s', str(desc), str(self._items))
if resolvecache is None:
resolvecache = ResolveCache()
requeue = True
report_log = None if self.report == config.ONERROR_REPORTING else self.report_log
while requeue:
requeue = False
found = list()
if self.report and report_log is not None:
report_log.add(collector=self)
for item in self._fasttrack_subsetting(desc):
resolved, theinput = item.footprint_couldbe(desc,
resolvecache=resolvecache,
report=report_log)
if resolved:
found.append((item, resolved, theinput))
if (not found and
self.report == config.ONERROR_REPORTING and
report_log is None):
requeue = True
report_log = self.report_log
return found
[docs] def find_best(self, desc, resolvecache=None):
"""
Returns the best of the items returned by the :meth:`find_all` method
according to potential priority rules.
"""
logger.debug('Search best %s in collector %s', str(desc), str(self._items))
candidates = self.find_all(desc, resolvecache=resolvecache)
if not candidates:
return None
if len(candidates) > 1:
candidates.sort(key=lambda x: x[0].footprint_weight(len(x[2])), reverse=True)
ambiguous = candidates[0][0].footprint_pl() == candidates[1][0].footprint_pl()
loglevel = logging.WARNING if ambiguous else self.non_ambiguous_loglevel
dumper = dump.get()
logger.log(loglevel, "Multiple %s candidates \n%s", self.tag, dumper.cleandump(desc))
for i, c in enumerate(candidates):
thisclass, u_resolved, theinput = c # @UnusedVariable
logger.log(loglevel, 'no.%d in.%d is %s', i + 1, len(theinput), str(thisclass))
topcl, topr, u_topinput = candidates[0] # @UnusedVariable
return topcl(topr, checked=True)
[docs] def load(self, **desc):
"""Return the value matching current collector's tag after pickup of attributes."""
return self.pickup(desc).get(self.tag, None)
[docs] def almost_clone(self, original, **extra):
"""Return an almost clone, with some extra or different attributes."""
assert hasattr(original, 'footprint_as_dict')
attrs = original.footprint_as_dict()
attrs.update(extra)
return self.load(**attrs)
[docs] def default(self, **kw):
"""
Try to find in existing instances tracked by the ``tag`` collector
a suitable candidate according to description.
"""
for inst in self.instances():
if inst.footprint_reusable() and inst.footprint_compatible(kw):
return inst
return self.load(**kw)
[docs] def grep(self, **kw):
"""
Grep in the current instances of the collector items that match
the set of attributes given as named arguments.
"""
okmatch = list()
for item in self.instances:
ok = True
for k, v in kw.items():
if not hasattr(item, k) or getattr(item, k) != v:
ok = False
break
if ok:
okmatch.append(item)
return okmatch
[docs] def build_attrmap(self, attrmap=None, only=None):
"""Build a reversed attr-class map."""
if attrmap is None:
attrmap = dict()
if only is not None and not hasattr(only, '__contains__'):
only = (only,)
for c in self:
fp = c.footprint_retrieve()
for k in [ka for ka in fp.attr.keys() if only is None or ka in only]:
opt = ' [optional]' if fp.optional(k) else ''
alist = attrmap.setdefault(k + opt, list())
alist.append(dict(
name=c.__name__,
module=c.__module__,
values=fp.get_values(k),
outcast=fp.get_outcast(k)
))
return attrmap
[docs] def show_attrmap(self, only=None):
"""
Show the complete set of attributes that could be found in classes
collected by the current collector, documented with ``values``
or ``outcast`` sets if present.
"""
attrmap = self.build_attrmap(only=only)
for a in sorted(attrmap.keys()):
print(' *', a + ':')
for info in sorted(attrmap[a], key=lambda x: x['name']):
print(' ' * 4, info['name'].ljust(22), '+', info['module'])
for k in [x for x in info.keys() if x not in ('name', 'module') and info[x]]:
print(' ' * 29, '|', k, '=',
str(info[k]).replace("'", '').replace('(', '').replace(')', '').strip(','))
print()
[docs] def show_attrkeys(self, only=None):
"""
Show the list of attribute names that could be found in classes
collected by the current collector.
"""
attrmap = self.build_attrmap(only=only)
for a in [x.split() + [''] for x in sorted(attrmap.keys())]:
print(' *', a[0].ljust(24), a[1])
[docs] def get_values(self, attrname):
"""Complete set of values which are explicitly authorized for a given attribute."""
allvalues = set()
for c in self:
fp = c.footprint_retrieve()
if attrname in fp.attr:
for v in fp.get_values(attrname):
allvalues.add(v)
return sorted(allvalues)
[docs] def report_dump(self, stamp=False):
"""Print a nicelly formatted dump report as a dict."""
self.report_log.fulldump(stamp=stamp)
@property
def report_last(self):
"""
Return the subpart of the report related to the last sequence
of evaluation through the current collector.
"""
return self.report_log.last
[docs] def report_sorted(self, **kw):
"""
Return the subpart of the report related to the last sequence
of evaluation through the current collector ordered by args.
"""
return self.report_last.as_tree(**kw)
[docs] def report_dumplast(self):
"""Print a nicely formatted dump report as a dict."""
print(dump.fulldump(self.report_last.as_dict()))
[docs] def report_whynot(self, classname):
"""
Report why any class matching the ``classname`` pattern
has not been selected through the last evaluation.
"""
return self.report_log.whynot(classname)
# Utility classes that cache some results in order to speed-up the resolution
[docs]class ResolveCache:
def __init__(self):
setup = config.get()
self.defaults = setup.defaults
self.extras = setup.extras()
self._shallow_cache = dict()
def get_shallow_fp(self, obj):
if obj not in self._shallow_cache:
self._shallow_cache[obj] = obj.footprint_as_shallow_dict()
return self._shallow_cache[obj]