# -*- coding: utf-8 -*-
"""
Handling of footprints collectors.
Module's usage is mostly dedicated to the main footprints package.
The footprints proxy could make some part of the interface visible as well.
"""
from __future__ import print_function, absolute_import, division, unicode_literals
import six
from weakref import WeakSet
import collections
import logging
from bronx.fancies import dump, loggers
from bronx.stdtypes.catalog import Catalog
from bronx.patterns import observer, getbytag
from . import config, priorities, reporting
#: No automatic export
__all__ = []
logger = loggers.getLogger(__name__)
# Module Interface
[docs]def get(**kw):
"""Return actual collector object matching description."""
return Collector(**kw)
[docs]def keys():
"""Return the list of current entries names collected."""
return Collector.tag_keys()
[docs]def values():
"""Return the list of current entries values collected."""
return Collector.tag_values()
[docs]def items():
"""Return the items of the collectors table."""
return Collector.tag_items()
# Base class
[docs]class Collector(getbytag.GetByTag, Catalog, observer.Observer):
"""
A class collector is devoted to the gathering of class references that inherit
from a given class (here a class with a footprint), according to some other optional criteria.
:param int non_ambiguous_loglevel: The loglevel used in the find_best method when
several choices are possible but the priority of the various candidates
makes the choice easy (default: logging.INFO).
"""
_tag_default = 'garbage'
def __init__(self, **kw):
logger.debug('Footprints collector init %s', str(self))
self.instances = Catalog(weak=True)
self.abstract_classes = Catalog(weak=True)
self.register = True
self.report = config.ONERROR_REPORTING
self.lreport_len = config.DFLT_MAXLEN_LIGHT_REPORTING
self.report_auto = True
self.report_tag = None
self.report_style = config.RAW_REPORTINGSTYLE
self.non_ambiguous_loglevel = logging.INFO
getbytag.GetByTag.__init__(self)
Catalog.__init__(self, **kw)
if self.report_tag is None:
self.report_tag = 'footprint-' + self.tag
r_maxlen = None if self.report == config.FULL_REPORTING else self.lreport_len
self.report_log = reporting.get(tag=self.report_tag, log_maxlen=r_maxlen)
config.add2proxies(self)
self._del_fasttrack()
[docs] @classmethod
def tag_clean(cls, tag):
"""Return a lower-case string without any "s" at the end."""
return tag.lower().rstrip('s')
[docs] def newobsitem(self, item, info):
"""Register a new instance of some of the classes in the current collector."""
logger.debug('Notified {!r} new item {!r}'.format(self, item))
self.instances.add(item)
[docs] def delobsitem(self, item, info):
"""Unregister an existing object in the current collector of instances."""
logger.debug('Notified {!r} del item {!r}'.format(self, item))
self.instances.discard(item)
[docs] def filter_package(self, packname):
"""Find in current collector classes with name starting with ``packname``."""
return [cl for cl in self.items() if cl.fullname().startswith(packname)]
[docs] def discard_package(self, packname, verbose=True):
"""Discard from current collector classes with name starting with ``packname``."""
for x in self.filter_package(packname):
if verbose:
print('Bye...', x)
self.discard(x)
[docs] def filter_onflag(self, flagmethod, default=True):
"""Find in current collector classes with method ``flagmethod`` returning ``default``."""
return [cl for cl in self.items() if hasattr(cl, flagmethod) and getattr(cl, flagmethod)() == default]
[docs] def discard_onflag(self, flagmethod, default=True, verbose=True):
"""Discard from current collector classes with method ``flagmethod`` returning ``default``."""
for x in self.filter_onflag(flagmethod, default):
if verbose:
print('Bye...', x)
self.discard(x)
[docs] def filter_higher_level(self, tag):
"""Find in current collector classes with priority level higher or equal to ``level``."""
plevel = priorities.top.level(tag)
return [cl for cl in self.items() if cl.footprint_pl() >= plevel]
[docs] def discard_higher_level(self, tag, verbose=True):
"""Discard from current collector classes with priority level higher or equal to ``level``."""
for x in self.filter_higher_level(tag):
if verbose:
print('Bye...', x)
self.discard(x)
[docs] def filter_lower_level(self, tag):
"""Find in current collector classes with priority level lower than ``level``."""
plevel = priorities.top.level(tag)
return [cl for cl in self.items() if cl.footprint_pl() < plevel]
[docs] def discard_lower_level(self, tag, verbose=True):
"""Discard from current collector classes with priority level lower than ``level``."""
for x in self.filter_lower_level(tag):
if verbose:
print('Bye...', x)
self.discard(x)
[docs] def reset_package_level(self, packname, tag):
"""Reset priority level current collector classes with name starting with ``packname``."""
plevel = priorities.top.level(tag)
for cl in self.filter_package(packname):
fp = cl.footprint_retrieve()
fp.priority['level'] = plevel
def _upd_fasttrack_index(self, cls):
attrerror = set()
for myattr in self._fasttrack_attr:
myfp = cls.footprint_retrieve()
if myattr in myfp.mandatory():
myvalues = myfp.get_values(myattr)
# Is there some restrictions on values ?
if myvalues:
# Ensure that the attribute types are consistent
if self._fasttrack_type[myattr] is None:
self._fasttrack_type[myattr] = myfp.attr[myattr].get('type', six.text_type)
self._fasttrack_typeargs[myattr] = myfp.attr[myattr].get('args', dict())
else:
if not (self._fasttrack_type[myattr] is myfp.attr[myattr].get('type', six.text_type) and
self._fasttrack_typeargs[myattr] == myfp.attr[myattr].get('args', dict())):
logger.warning("Inconsistent types (%s vs %s) for fasttrack attributes (class: %s). " +
"Removing it (%s) from the fasttrack list.",
str(self._fasttrack_type[myattr]),
str(myfp.attr[myattr].get('type', str)),
repr(cls), myattr)
attrerror.add(myattr)
continue
# Let's go...
for myvalue in myvalues:
self._fasttrack_index[myattr][myvalue].add(cls)
# No restrictions on values so it's a potential candidate for everyone
else:
self._fasttrack_trap[myattr].add(cls)
# The attribute is optional or missing so it's a possible candidate for everyone
else:
self._fasttrack_trap[myattr].add(cls)
# Process errors
if attrerror:
for myattr in set(self._fasttrack_attr):
if myattr in attrerror:
self._fasttrack_attr.remove(myattr)
del self._fasttrack_type[myattr]
del self._fasttrack_typeargs[myattr]
del self._fasttrack_index[myattr]
del self._fasttrack_trap[myattr]
def _upd_fasttrack_delete(self, cls):
for fvalues in six.itervalues(self._fasttrack_index):
for classes in six.itervalues(fvalues):
classes.discard(cls)
for classes in six.itervalues(self._fasttrack_trap):
classes.discard(cls)
def _del_fasttrack(self):
self._fasttrack_attr = set()
self._fasttrack_index = dict()
self._fasttrack_type = dict()
self._fasttrack_typeargs = dict()
self._fasttrack_trap = dict()
def _set_fasttrack(self, attrset):
self._del_fasttrack()
self._fasttrack_attr = set(attrset)
for myattr in self._fasttrack_attr:
self._fasttrack_type[myattr] = None
self._fasttrack_typeargs[myattr] = dict()
self._fasttrack_index[myattr] = collections.defaultdict(WeakSet)
self._fasttrack_trap[myattr] = WeakSet()
for mycls in self._items:
self._upd_fasttrack_index(mycls)
def _get_fasttrack(self):
return set(self._fasttrack_attr)
fasttrack = property(_get_fasttrack, _set_fasttrack)
def _fasttrack_subsetting(self, desc):
if self._fasttrack_attr:
objgroup_list = list()
for k, v in six.iteritems(desc):
if k in self._fasttrack_attr:
# Check if the key's value is in the index
if v in self._fasttrack_index[k]:
indexkey = v
else:
# A type conversion might be usefull
try:
v_conv = self._fasttrack_type[k](v, ** self._fasttrack_typeargs[k])
except (ValueError, TypeError):
v_conv = None
if v_conv is not None and v_conv in self._fasttrack_index[k]:
indexkey = v_conv
else:
# Ok we give up...
indexkey = None
if indexkey is not None:
logger.debug('Fasttrack subsetting took place for key %s', k)
objgroup_list.append(self._fasttrack_index[k][indexkey] |
self._fasttrack_trap[k])
if objgroup_list:
if len(objgroup_list) > 1:
finalset = objgroup_list[0]
for objgroup in objgroup_list[1:]:
finalset = finalset.intersection(objgroup)
return finalset
else:
return objgroup_list[0]
return self._items
[docs] def add(self, *items, **kwargs):
"""Add the ``items`` entries in the current catalog."""
if kwargs.get('abstract', False):
self.abstract_classes.add(*items)
else:
super(Collector, self).add(*items)
for item in items:
self._upd_fasttrack_index(item)
[docs] def discard(self, bye):
"""Remove the ``bye`` entry from current catalog."""
super(Collector, self).discard(bye)
self._upd_fasttrack_delete(bye)
[docs] def pickup_and_cache(self, desc, resolvecache=None):
"""Try to pickup inside the collector an item that could match the description."""
logger.debug('Pick up a "{:s}" in description {!s} with collector {!r}'.format(self.tag, desc, self))
if resolvecache is None:
resolvecache = ResolveCache()
emptywarning = desc.pop('_emptywarning', True)
mkstdreport = desc.pop('_report', self.report_auto)
reportstyle = desc.pop('_report_style', self.report_style)
for hidden in [x for x in desc.keys() if x.startswith('_')]:
logger.warning('Hidden argument "%s" ignored in pickup attributes', hidden)
del desc[hidden]
if self.tag in desc and desc[self.tag] is not None:
logger.debug('A %s is already defined %s', self.tag, str(desc[self.tag]))
else:
desc[self.tag] = self.find_best(desc, resolvecache=resolvecache)
if desc[self.tag] is not None:
desc = desc[self.tag].footprint_cleanup(desc)
elif emptywarning:
dumper = dump.get()
logger.warning("No %s found in description \n%s", self.tag, dumper.cleandump(desc))
if mkstdreport and self.report:
print("\n", self.report_log.info(), "\n")
if reportstyle == config.RAW_REPORTINGSTYLE:
self.report_last.lightdump()
if reportstyle == config.FLAT_REPORTINGSTYLE:
altreport = self.report_last.as_flat()
altreport.reshuffle([str('why'), str('attribute')], skip=False)
altreport.fulldump()
altreport.reshuffle([str('only'), str('attribute')], skip=False)
altreport.fulldump()
if reportstyle == config.FACTORIZED1_REPORTINGSTYLE:
altreport = self.report_sorted()
altreport.orderedprint()
if reportstyle == config.FACTORIZED2_REPORTINGSTYLE:
altreport = self.report_sorted()
altreport.dumper()
return desc, resolvecache
[docs] def pickup(self, desc, resolvecache=None):
"""Try to pickup inside the collector an item that could match the description."""
return self.pickup_and_cache(desc, resolvecache=resolvecache)[0]
[docs] def find_any(self, desc, resolvecache=None):
"""
Return the first item of the collector that :meth:`footprint_couldbe`
as described by argument ``desc``.
"""
logger.debug('Search any %s in collector %s', str(desc), str(self._items))
if resolvecache is None:
resolvecache = ResolveCache()
requeue = True
report_log = None if self.report == config.ONERROR_REPORTING else self.report_log
while requeue:
requeue = False
if self.report and report_log is not None:
report_log.add(collector=self)
for item in self._fasttrack_subsetting(desc):
resolved, u_input = item.footprint_couldbe(desc, # @UnusedVariable
resolvecache=resolvecache,
report=report_log)
if resolved:
return item(resolved, checked=True)
if (self.report == config.ONERROR_REPORTING and
report_log is None):
requeue = True
report_log = self.report_log
return None
[docs] def find_all(self, desc, resolvecache=None):
"""
Returns all the items of the collector that :meth:`footprint_couldbe`
as described by argument ``desc``.
"""
logger.debug('Search all %s in collector %s', str(desc), str(self._items))
if resolvecache is None:
resolvecache = ResolveCache()
requeue = True
report_log = None if self.report == config.ONERROR_REPORTING else self.report_log
while requeue:
requeue = False
found = list()
if self.report and report_log is not None:
report_log.add(collector=self)
for item in self._fasttrack_subsetting(desc):
resolved, theinput = item.footprint_couldbe(desc,
resolvecache=resolvecache,
report=report_log)
if resolved:
found.append((item, resolved, theinput))
if (not found and
self.report == config.ONERROR_REPORTING and
report_log is None):
requeue = True
report_log = self.report_log
return found
[docs] def find_best(self, desc, resolvecache=None):
"""
Returns the best of the items returned by the :meth:`find_all` method
according to potential priority rules.
"""
logger.debug('Search best %s in collector %s', str(desc), str(self._items))
candidates = self.find_all(desc, resolvecache=resolvecache)
if not candidates:
return None
if len(candidates) > 1:
candidates.sort(key=lambda x: x[0].footprint_weight(len(x[2])), reverse=True)
ambiguous = candidates[0][0].footprint_pl() == candidates[1][0].footprint_pl()
loglevel = logging.WARNING if ambiguous else self.non_ambiguous_loglevel
dumper = dump.get()
logger.log(loglevel, "Multiple %s candidates \n%s", self.tag, dumper.cleandump(desc))
for i, c in enumerate(candidates):
thisclass, u_resolved, theinput = c # @UnusedVariable
logger.log(loglevel, 'no.%d in.%d is %s', i + 1, len(theinput), str(thisclass))
topcl, topr, u_topinput = candidates[0] # @UnusedVariable
return topcl(topr, checked=True)
[docs] def load(self, **desc):
"""Return the value matching current collector's tag after pickup of attributes."""
return self.pickup(desc).get(self.tag, None)
[docs] def almost_clone(self, original, **extra):
"""Return an almost clone, with some extra or different attributes."""
assert hasattr(original, 'footprint_as_dict')
attrs = original.footprint_as_dict()
attrs.update(extra)
return self.load(**attrs)
[docs] def default(self, **kw):
"""
Try to find in existing instances tracked by the ``tag`` collector
a suitable candidate according to description.
"""
for inst in self.instances():
if inst.footprint_reusable() and inst.footprint_compatible(kw):
return inst
return self.load(**kw)
[docs] def grep(self, **kw):
"""
Grep in the current instances of the collector items that match
the set of attributes given as named arguments.
"""
okmatch = list()
for item in self.instances:
ok = True
for k, v in kw.items():
if not hasattr(item, k) or getattr(item, k) != v:
ok = False
break
if ok:
okmatch.append(item)
return okmatch
[docs] def build_attrmap(self, attrmap=None, only=None):
"""Build a reversed attr-class map."""
if attrmap is None:
attrmap = dict()
if only is not None and not hasattr(only, '__contains__'):
only = (only,)
for c in self:
fp = c.footprint_retrieve()
for k in [ka for ka in fp.attr.keys() if only is None or ka in only]:
opt = ' [optional]' if fp.optional(k) else ''
alist = attrmap.setdefault(k + opt, list())
alist.append(dict(
name=c.__name__,
module=c.__module__,
values=fp.get_values(k),
outcast=fp.get_outcast(k)
))
return attrmap
[docs] def show_attrmap(self, only=None):
"""
Show the complete set of attributes that could be found in classes
collected by the current collector, documented with ``values``
or ``outcast`` sets if present.
"""
attrmap = self.build_attrmap(only=only)
for a in sorted(attrmap.keys()):
print(' *', a + ':')
for info in sorted(attrmap[a], key=lambda x: x['name']):
print(' ' * 4, info['name'].ljust(22), '+', info['module'])
for k in [x for x in info.keys() if x not in ('name', 'module') and info[x]]:
print(' ' * 29, '|', k, '=',
str(info[k]).replace("'", '').replace('(', '').replace(')', '').strip(','))
print()
[docs] def show_attrkeys(self, only=None):
"""
Show the list of attribute names that could be found in classes
collected by the current collector.
"""
attrmap = self.build_attrmap(only=only)
for a in [x.split() + [''] for x in sorted(attrmap.keys())]:
print(' *', a[0].ljust(24), a[1])
[docs] def get_values(self, attrname):
"""Complete set of values which are explicitly authorized for a given attribute."""
allvalues = set()
for c in self:
fp = c.footprint_retrieve()
if attrname in fp.attr:
for v in fp.get_values(attrname):
allvalues.add(v)
return sorted(allvalues)
[docs] def report_dump(self, stamp=False):
"""Print a nicelly formatted dump report as a dict."""
self.report_log.fulldump(stamp=stamp)
@property
def report_last(self):
"""
Return the subpart of the report related to the last sequence
of evaluation through the current collector.
"""
return self.report_log.last
[docs] def report_sorted(self, **kw):
"""
Return the subpart of the report related to the last sequence
of evaluation through the current collector ordered by args.
"""
return self.report_last.as_tree(**kw)
[docs] def report_dumplast(self):
"""Print a nicely formatted dump report as a dict."""
print(dump.fulldump(self.report_last.as_dict()))
[docs] def report_whynot(self, classname):
"""
Report why any class matching the ``classname`` pattern
has not been selected through the last evaluation.
"""
return self.report_log.whynot(classname)
# Utility classes that cache some results in order to speed-up the resolution
[docs]class ResolveCache(object):
def __init__(self):
setup = config.get()
self.defaults = setup.defaults
self.extras = setup.extras()
self._shallow_cache = dict()
def get_shallow_fp(self, obj):
if obj not in self._shallow_cache:
self._shallow_cache[obj] = obj.footprint_as_shallow_dict()
return self._shallow_cache[obj]