Source code for footprints.collectors

# -*- coding: utf-8 -*-

"""
Handling of footprints collectors.
Module's usage is mostly dedicated to the main footprints package.
The footprints proxy could make some part of the interface visible as well.
"""

from __future__ import print_function, absolute_import, division, unicode_literals

import six

from weakref import WeakSet
import collections
import logging

from bronx.fancies import dump, loggers
from bronx.stdtypes.catalog import Catalog
from bronx.patterns import observer, getbytag

from . import config, priorities, reporting

#: No automatic export
__all__ = []

logger = loggers.getLogger(__name__)


# Module Interface

def get(**kw):
    """Return the collector object matching the given description.

    Every keyword argument is forwarded untouched to the
    :class:`Collector` constructor.
    """
    collector = Collector(**kw)
    return collector
def keys():
    """Return the names of the entries currently collected.

    This is a module-level shortcut to :meth:`Collector.tag_keys`.
    """
    names = Collector.tag_keys()
    return names
def values():
    """Return the values of the entries currently collected.

    This is a module-level shortcut to :meth:`Collector.tag_values`.
    """
    collected = Collector.tag_values()
    return collected
def items():
    """Return the items of the collectors table.

    This is a module-level shortcut to :meth:`Collector.tag_items`.
    """
    table = Collector.tag_items()
    return table
# Base class
class Collector(getbytag.GetByTag, Catalog, observer.Observer):
    """
    A class collector is devoted to the gathering of class references that
    inherit from a given class (here a class with a footprint), according
    to some other optional criteria.

    :param int non_ambiguous_loglevel: The loglevel used in the find_best
        method when several choices are possible but the priority of the
        various candidates makes the choice easy (default: logging.INFO).
    """

    _tag_default = 'garbage'

    def __init__(self, **kw):
        """Set the default settings, then initialise the parent classes.

        The keyword arguments ``kw`` are handed over to ``Catalog.__init__``.
        """
        logger.debug('Footprints collector init %s', str(self))

        # Side catalogs: live instances and abstract classes.
        self.instances = Catalog(weak=True)
        self.abstract_classes = Catalog(weak=True)

        # Default settings. They are assigned *before* ``Catalog.__init__``
        # receives ``kw``.
        # NOTE(review): presumably so that ``kw`` may override them - confirm
        # against the bronx Catalog implementation.
        self.register = True
        self.report = config.ONERROR_REPORTING
        self.lreport_len = config.DFLT_MAXLEN_LIGHT_REPORTING
        self.report_auto = True
        self.report_tag = None
        self.report_style = config.RAW_REPORTINGSTYLE
        self.non_ambiguous_loglevel = logging.INFO

        getbytag.GetByTag.__init__(self)
        Catalog.__init__(self, **kw)

        # Without an explicit report tag, derive one from the collector's tag.
        if self.report_tag is None:
            self.report_tag = 'footprint-' + self.tag

        # Full reporting keeps an unbounded log; otherwise the length is capped.
        if self.report == config.FULL_REPORTING:
            r_maxlen = None
        else:
            r_maxlen = self.lreport_len
        self.report_log = reporting.get(tag=self.report_tag, log_maxlen=r_maxlen)

        config.add2proxies(self)
        # Start with an empty (disabled) fasttrack configuration.
        self._del_fasttrack()
[docs] @classmethod def tag_clean(cls, tag): """Return a lower-case string without any "s" at the end.""" return tag.lower().rstrip('s')
[docs] def newobsitem(self, item, info): """Register a new instance of some of the classes in the current collector.""" logger.debug('Notified {!r} new item {!r}'.format(self, item)) self.instances.add(item)
[docs] def delobsitem(self, item, info): """Unregister an existing object in the current collector of instances.""" logger.debug('Notified {!r} del item {!r}'.format(self, item)) self.instances.discard(item)
[docs] def filter_package(self, packname): """Find in current collector classes with name starting with ``packname``.""" return [cl for cl in self.items() if cl.fullname().startswith(packname)]
[docs] def discard_package(self, packname, verbose=True): """Discard from current collector classes with name starting with ``packname``.""" for x in self.filter_package(packname): if verbose: print('Bye...', x) self.discard(x)
[docs] def filter_onflag(self, flagmethod, default=True): """Find in current collector classes with method ``flagmethod`` returning ``default``.""" return [cl for cl in self.items() if hasattr(cl, flagmethod) and getattr(cl, flagmethod)() == default]
[docs] def discard_onflag(self, flagmethod, default=True, verbose=True): """Discard from current collector classes with method ``flagmethod`` returning ``default``.""" for x in self.filter_onflag(flagmethod, default): if verbose: print('Bye...', x) self.discard(x)
[docs] def filter_higher_level(self, tag): """Find in current collector classes with priority level higher or equal to ``level``.""" plevel = priorities.top.level(tag) return [cl for cl in self.items() if cl.footprint_pl() >= plevel]
[docs] def discard_higher_level(self, tag, verbose=True): """Discard from current collector classes with priority level higher or equal to ``level``.""" for x in self.filter_higher_level(tag): if verbose: print('Bye...', x) self.discard(x)
[docs] def filter_lower_level(self, tag): """Find in current collector classes with priority level lower than ``level``.""" plevel = priorities.top.level(tag) return [cl for cl in self.items() if cl.footprint_pl() < plevel]
[docs] def discard_lower_level(self, tag, verbose=True): """Discard from current collector classes with priority level lower than ``level``.""" for x in self.filter_lower_level(tag): if verbose: print('Bye...', x) self.discard(x)
[docs] def reset_package_level(self, packname, tag): """Reset priority level current collector classes with name starting with ``packname``.""" plevel = priorities.top.level(tag) for cl in self.filter_package(packname): fp = cl.footprint_retrieve() fp.priority['level'] = plevel
def _upd_fasttrack_index(self, cls): attrerror = set() for myattr in self._fasttrack_attr: myfp = cls.footprint_retrieve() if myattr in myfp.mandatory(): myvalues = myfp.get_values(myattr) # Is there some restrictions on values ? if myvalues: # Ensure that the attribute types are consistent if self._fasttrack_type[myattr] is None: self._fasttrack_type[myattr] = myfp.attr[myattr].get('type', six.text_type) self._fasttrack_typeargs[myattr] = myfp.attr[myattr].get('args', dict()) else: if not (self._fasttrack_type[myattr] is myfp.attr[myattr].get('type', six.text_type) and self._fasttrack_typeargs[myattr] == myfp.attr[myattr].get('args', dict())): logger.warning("Inconsistent types (%s vs %s) for fasttrack attributes (class: %s). " + "Removing it (%s) from the fasttrack list.", str(self._fasttrack_type[myattr]), str(myfp.attr[myattr].get('type', str)), repr(cls), myattr) attrerror.add(myattr) continue # Let's go... for myvalue in myvalues: self._fasttrack_index[myattr][myvalue].add(cls) # No restrictions on values so it's a potential candidate for everyone else: self._fasttrack_trap[myattr].add(cls) # The attribute is optional or missing so it's a possible candidate for everyone else: self._fasttrack_trap[myattr].add(cls) # Process errors if attrerror: for myattr in set(self._fasttrack_attr): if myattr in attrerror: self._fasttrack_attr.remove(myattr) del self._fasttrack_type[myattr] del self._fasttrack_typeargs[myattr] del self._fasttrack_index[myattr] del self._fasttrack_trap[myattr] def _upd_fasttrack_delete(self, cls): for fvalues in six.itervalues(self._fasttrack_index): for classes in six.itervalues(fvalues): classes.discard(cls) for classes in six.itervalues(self._fasttrack_trap): classes.discard(cls) def _del_fasttrack(self): self._fasttrack_attr = set() self._fasttrack_index = dict() self._fasttrack_type = dict() self._fasttrack_typeargs = dict() self._fasttrack_trap = dict() def _set_fasttrack(self, attrset): self._del_fasttrack() self._fasttrack_attr = 
set(attrset) for myattr in self._fasttrack_attr: self._fasttrack_type[myattr] = None self._fasttrack_typeargs[myattr] = dict() self._fasttrack_index[myattr] = collections.defaultdict(WeakSet) self._fasttrack_trap[myattr] = WeakSet() for mycls in self._items: self._upd_fasttrack_index(mycls) def _get_fasttrack(self): return set(self._fasttrack_attr) fasttrack = property(_get_fasttrack, _set_fasttrack) def _fasttrack_subsetting(self, desc): if self._fasttrack_attr: objgroup_list = list() for k, v in six.iteritems(desc): if k in self._fasttrack_attr: # Check if the key's value is in the index if v in self._fasttrack_index[k]: indexkey = v else: # A type conversion might be usefull try: v_conv = self._fasttrack_type[k](v, ** self._fasttrack_typeargs[k]) except (ValueError, TypeError): v_conv = None if v_conv is not None and v_conv in self._fasttrack_index[k]: indexkey = v_conv else: # Ok we give up... indexkey = None if indexkey is not None: logger.debug('Fasttrack subsetting took place for key %s', k) objgroup_list.append(self._fasttrack_index[k][indexkey] | self._fasttrack_trap[k]) if objgroup_list: if len(objgroup_list) > 1: finalset = objgroup_list[0] for objgroup in objgroup_list[1:]: finalset = finalset.intersection(objgroup) return finalset else: return objgroup_list[0] return self._items
[docs] def add(self, *items, **kwargs): """Add the ``items`` entries in the current catalog.""" if kwargs.get('abstract', False): self.abstract_classes.add(*items) else: super(Collector, self).add(*items) for item in items: self._upd_fasttrack_index(item)
[docs] def discard(self, bye): """Remove the ``bye`` entry from current catalog.""" super(Collector, self).discard(bye) self._upd_fasttrack_delete(bye)
[docs] def pickup_and_cache(self, desc, resolvecache=None): """Try to pickup inside the collector an item that could match the description.""" logger.debug('Pick up a "{:s}" in description {!s} with collector {!r}'.format(self.tag, desc, self)) if resolvecache is None: resolvecache = ResolveCache() emptywarning = desc.pop('_emptywarning', True) mkstdreport = desc.pop('_report', self.report_auto) reportstyle = desc.pop('_report_style', self.report_style) for hidden in [x for x in desc.keys() if x.startswith('_')]: logger.warning('Hidden argument "%s" ignored in pickup attributes', hidden) del desc[hidden] if self.tag in desc and desc[self.tag] is not None: logger.debug('A %s is already defined %s', self.tag, str(desc[self.tag])) else: desc[self.tag] = self.find_best(desc, resolvecache=resolvecache) if desc[self.tag] is not None: desc = desc[self.tag].footprint_cleanup(desc) elif emptywarning: dumper = dump.get() logger.warning("No %s found in description \n%s", self.tag, dumper.cleandump(desc)) if mkstdreport and self.report: print("\n", self.report_log.info(), "\n") if reportstyle == config.RAW_REPORTINGSTYLE: self.report_last.lightdump() if reportstyle == config.FLAT_REPORTINGSTYLE: altreport = self.report_last.as_flat() altreport.reshuffle([str('why'), str('attribute')], skip=False) altreport.fulldump() altreport.reshuffle([str('only'), str('attribute')], skip=False) altreport.fulldump() if reportstyle == config.FACTORIZED1_REPORTINGSTYLE: altreport = self.report_sorted() altreport.orderedprint() if reportstyle == config.FACTORIZED2_REPORTINGSTYLE: altreport = self.report_sorted() altreport.dumper() return desc, resolvecache
[docs] def pickup(self, desc, resolvecache=None): """Try to pickup inside the collector an item that could match the description.""" return self.pickup_and_cache(desc, resolvecache=resolvecache)[0]
[docs] def find_any(self, desc, resolvecache=None): """ Return the first item of the collector that :meth:`footprint_couldbe` as described by argument ``desc``. """ logger.debug('Search any %s in collector %s', str(desc), str(self._items)) if resolvecache is None: resolvecache = ResolveCache() requeue = True report_log = None if self.report == config.ONERROR_REPORTING else self.report_log while requeue: requeue = False if self.report and report_log is not None: report_log.add(collector=self) for item in self._fasttrack_subsetting(desc): resolved, u_input = item.footprint_couldbe(desc, # @UnusedVariable resolvecache=resolvecache, report=report_log) if resolved: return item(resolved, checked=True) if (self.report == config.ONERROR_REPORTING and report_log is None): requeue = True report_log = self.report_log return None
[docs] def find_all(self, desc, resolvecache=None): """ Returns all the items of the collector that :meth:`footprint_couldbe` as described by argument ``desc``. """ logger.debug('Search all %s in collector %s', str(desc), str(self._items)) if resolvecache is None: resolvecache = ResolveCache() requeue = True report_log = None if self.report == config.ONERROR_REPORTING else self.report_log while requeue: requeue = False found = list() if self.report and report_log is not None: report_log.add(collector=self) for item in self._fasttrack_subsetting(desc): resolved, theinput = item.footprint_couldbe(desc, resolvecache=resolvecache, report=report_log) if resolved: found.append((item, resolved, theinput)) if (not found and self.report == config.ONERROR_REPORTING and report_log is None): requeue = True report_log = self.report_log return found
[docs] def find_best(self, desc, resolvecache=None): """ Returns the best of the items returned by the :meth:`find_all` method according to potential priority rules. """ logger.debug('Search best %s in collector %s', str(desc), str(self._items)) candidates = self.find_all(desc, resolvecache=resolvecache) if not candidates: return None if len(candidates) > 1: candidates.sort(key=lambda x: x[0].footprint_weight(len(x[2])), reverse=True) ambiguous = candidates[0][0].footprint_pl() == candidates[1][0].footprint_pl() loglevel = logging.WARNING if ambiguous else self.non_ambiguous_loglevel dumper = dump.get() logger.log(loglevel, "Multiple %s candidates \n%s", self.tag, dumper.cleandump(desc)) for i, c in enumerate(candidates): thisclass, u_resolved, theinput = c # @UnusedVariable logger.log(loglevel, 'no.%d in.%d is %s', i + 1, len(theinput), str(thisclass)) topcl, topr, u_topinput = candidates[0] # @UnusedVariable return topcl(topr, checked=True)
[docs] def load(self, **desc): """Return the value matching current collector's tag after pickup of attributes.""" return self.pickup(desc).get(self.tag, None)
[docs] def almost_clone(self, original, **extra): """Return an almost clone, with some extra or different attributes.""" assert hasattr(original, 'footprint_as_dict') attrs = original.footprint_as_dict() attrs.update(extra) return self.load(**attrs)
[docs] def default(self, **kw): """ Try to find in existing instances tracked by the ``tag`` collector a suitable candidate according to description. """ for inst in self.instances(): if inst.footprint_reusable() and inst.footprint_compatible(kw): return inst return self.load(**kw)
[docs] def grep(self, **kw): """ Grep in the current instances of the collector items that match the set of attributes given as named arguments. """ okmatch = list() for item in self.instances: ok = True for k, v in kw.items(): if not hasattr(item, k) or getattr(item, k) != v: ok = False break if ok: okmatch.append(item) return okmatch
[docs] def build_attrmap(self, attrmap=None, only=None): """Build a reversed attr-class map.""" if attrmap is None: attrmap = dict() if only is not None and not hasattr(only, '__contains__'): only = (only,) for c in self: fp = c.footprint_retrieve() for k in [ka for ka in fp.attr.keys() if only is None or ka in only]: opt = ' [optional]' if fp.optional(k) else '' alist = attrmap.setdefault(k + opt, list()) alist.append(dict( name=c.__name__, module=c.__module__, values=fp.get_values(k), outcast=fp.get_outcast(k) )) return attrmap
[docs] def show_attrmap(self, only=None): """ Show the complete set of attributes that could be found in classes collected by the current collector, documented with ``values`` or ``outcast`` sets if present. """ attrmap = self.build_attrmap(only=only) for a in sorted(attrmap.keys()): print(' *', a + ':') for info in sorted(attrmap[a], key=lambda x: x['name']): print(' ' * 4, info['name'].ljust(22), '+', info['module']) for k in [x for x in info.keys() if x not in ('name', 'module') and info[x]]: print(' ' * 29, '|', k, '=', str(info[k]).replace("'", '').replace('(', '').replace(')', '').strip(',')) print()
[docs] def show_attrkeys(self, only=None): """ Show the list of attribute names that could be found in classes collected by the current collector. """ attrmap = self.build_attrmap(only=only) for a in [x.split() + [''] for x in sorted(attrmap.keys())]: print(' *', a[0].ljust(24), a[1])
[docs] def get_values(self, attrname): """Complete set of values which are explicitly authorized for a given attribute.""" allvalues = set() for c in self: fp = c.footprint_retrieve() if attrname in fp.attr: for v in fp.get_values(attrname): allvalues.add(v) return sorted(allvalues)
[docs] def report_dump(self, stamp=False): """Print a nicelly formatted dump report as a dict.""" self.report_log.fulldump(stamp=stamp)
@property def report_last(self): """ Return the subpart of the report related to the last sequence of evaluation through the current collector. """ return self.report_log.last
[docs] def report_sorted(self, **kw): """ Return the subpart of the report related to the last sequence of evaluation through the current collector ordered by args. """ return self.report_last.as_tree(**kw)
[docs] def report_dumplast(self): """Print a nicely formatted dump report as a dict.""" print(dump.fulldump(self.report_last.as_dict()))
[docs] def report_whynot(self, classname): """ Report why any class matching the ``classname`` pattern has not been selected through the last evaluation. """ return self.report_log.whynot(classname)
# Utility classes that cache some results in order to speed-up the resolution
class ResolveCache(object):
    """Cache some results in order to speed-up the footprints' resolution.

    The ``defaults`` and ``extras`` of the footprints' setup are captured
    once at creation time; shallow footprint dictionaries are cached on
    demand, per object.
    """

    def __init__(self):
        fp_setup = config.get()
        self.defaults = fp_setup.defaults
        self.extras = fp_setup.extras()
        # obj -> result of obj.footprint_as_shallow_dict()
        self._shallow_cache = dict()

    def get_shallow_fp(self, obj):
        """Return the (cached) ``footprint_as_shallow_dict()`` of ``obj``."""
        cache = self._shallow_cache
        try:
            return cache[obj]
        except KeyError:
            # First request for this object: compute and remember
            shallow = obj.footprint_as_shallow_dict()
            cache[obj] = shallow
            return cache[obj]