Source code for footprints

# -*- coding: utf-8 -*-

"""
A generic multi-purpose fabric for objects with tunable footprints,
i.e. some set of key/value pairs that attributes (possibly optional) could cover.
"""

from __future__ import print_function, absolute_import, division, unicode_literals

import six

import os
import re
import copy
import types
import weakref
import collections

from bronx.fancies import dump, loggers
from bronx.patterns import observer

from . import access, collectors, config, doc
from . import priorities, proxies, reporting, util
from .stdtypes import *


#: No automatic export

__all__ = []


# Default logging

logger = loggers.getLogger('footprints')


# Default setup

setup = config.get(
    docstrings=int(os.environ.get('FOOTPRINT_DOCSTRINGS', 0)),
    shortnames=int(os.environ.get('FOOTPRINT_SHORTNAMES', 0))
)


# Default proxy

proxy = proxies.get()


# Predefined constants

UNKNOWN = '__unknown__'
replattr = re.compile(r'\[(\w+)(?::+([:\w]+))?(?:#(\w+))?(?:%([^\]]+))?\]')


# Footprint exceptions

[docs]class FootprintException(Exception): pass
[docs]class FootprintMaxIter(FootprintException): pass
[docs]class FootprintUnreachableAttr(FootprintException): pass
[docs]class FootprintFatalError(FootprintException): pass
[docs]class FootprintInvalidDefinition(FootprintException): pass
# Module interface
[docs]def pickup(rd): """Find in current description the attributes that are collected under the ``tag`` name.""" return collectors.get(tag=rd.pop('tag', 'garbage'), report=setup.report, lreport_len=setup.lreport_len, report_style=setup.report_style).pickup(rd)
[docs]def load(**kw): """ Same as pickup but operates on an expanded dictionary. Return either ``None`` or an object compatible with the ``tag``. """ return collectors.get(tag=kw.pop('tag', 'garbage'), report=setup.report, lreport_len=setup.lreport_len, report_style=setup.report_style).load(**kw)
[docs]def default(**kw): """ Try to find in existing instances tracked by the ``tag`` collector a suitable candidate according to description. """ return collectors.get(tag=kw.pop('tag', 'garbage'), report=setup.report, lreport_len=setup.lreport_len, report_style=setup.report_style).default(**kw)
[docs]def grep(**kw): """Try to find any instance in all collectors that could match given attributes.""" allgrep = list() for c in collectors.values(): allgrep.extend(c.grep(**kw)) return allgrep
[docs]def collected_classes(): """Return a set of all collected footprint-based classes.""" c_classes = list() for kv in collectors.values(): c_classes.extend(kv.items()) return set(c_classes)
[docs]def collected_priorities(tag): """Print a table of collected classes with a priority level higher or equal to ``tag``.""" plevel = priorities.top.level(tag) for cl in sorted(set([c for cv in collectors.values() for c in cv.filter_higher_level(plevel)]), key=lambda z: z.fullname()): pl = cl.footprint_level() print(pl.rjust(10), '-', cl.fullname())
[docs]def reset_package_priority(packname, tag): """Reset priority level in all collectors for the specified ``package``.""" for c in collectors.values(): c.reset_package_level(packname, tag)
# Base classes
[docs]class Footprint(object): """ This class defines the objects in charge of handling the footprint definition itself and the resolution mecanism through keys-values description matching. """ def __init__(self, *args, **kw): """Initialisation and checking of a given set of footprint.""" myclsname = kw.pop('myclsname', 'unknown class') if kw.pop('nodefault', False): fp = dict(attr=dict()) else: fp = dict( attr=dict(), bind=list(), info='Not documented', only=dict(), priority=dict( level=priorities.top.DEFAULT # @UndefinedVariable ) ) typescheck = collections.defaultdict(list) for a in args: adict = None if isinstance(a, dict) and bool(a): logger.debug('Init Footprint updated with dict %s', a) adict = util.list2dict(a, ('attr', 'only')) if isinstance(a, Footprint) and (bool(a.attr) or bool(a.only)): logger.debug('Init Footprint updated with object %s', a) adict = a.as_dict() if adict is not None: util.dictmerge(fp, adict) if 'attr' in adict: for attr, attrdict in adict['attr'].items(): if 'type' in attrdict: typescheck[attr].append(attrdict['type']) # Check that the type of a given attribute is consistent among # footprints (warning only) for attr, typelist in typescheck.items(): if len(typelist) > 1: fine = True for i in range(len(typelist) - 1, 0, -1): fine = fine and issubclass(typelist[i], typelist[i - 1]) if not fine: logger.warning('%s: Type inconsistency among footprints for attribute %s: %s', myclsname, attr, ",".join([repr(x) for x in typelist])) util.dictmerge(fp, util.list2dict(kw, ('attr', 'only'))) for a in fp['attr'].keys(): fp['attr'][a].setdefault('default', None) fp['attr'][a].setdefault('optional', False) fp['attr'][a].setdefault('access', 'rxx') fp['attr'][a].setdefault('doc_visibility', doc.visibility.DEFAULT) # @UndefinedVariable fp['attr'][a].setdefault('doc_zorder', 0) # doc_zorder is beetween -100 and 100 fp['attr'][a]['doc_zorder'] = min(max(-100, fp['attr'][a]['doc_zorder']), 100) fp['attr'][a]['alias'] = set(fp['attr'][a].get('alias', set())) fp['attr'][a]['remap'] = dict(fp['attr'][a].get('remap', dict())) autoremap = fp['attr'][a]['remap'].pop('autoremap', None) if autoremap is not None: autoremap = util.mktuple(autoremap) if 'first' in autoremap: vfirst = fp['attr'][a]['values'][0] for x in fp['attr'][a]['values'][1:]: fp['attr'][a]['remap'][x] = vfirst fp['attr'][a]['values'] = set(fp['attr'][a].get('values', set())) fp['attr'][a]['outcast'] = set(fp['attr'][a].get('outcast', set())) ktype = fp['attr'][a].get('type', six.text_type) kargs = fp['attr'][a].get('args', dict()) for autoreclass in ('values', 'outcast'): for v in fp['attr'][a][autoreclass]: if not isinstance(v, ktype): fp['attr'][a][autoreclass].remove(v) try: v = ktype(v, **kargs) fp['attr'][a][autoreclass].add(v) logger.debug('Init Footprint [%s] %s reclassed = %s', autoreclass, a, v) except Exception: logger.error('Bad init footprint in [%s]', autoreclass) raise self._fp = fp self._firstguess_keys_internal = None self._resolve_keys_internal = None # Instance docstring... if setup.docstrings: self.__doc__ = doc.format_docstring(self, setup.docstrings, abstractfpobj=True) @property def _fastkeys(self): return self._fp.get('fastkeys', set()) @property def _resolve_keys(self): if self._resolve_keys_internal is None: candidates = list() for k, item in self.attr.items(): candidates.append((not item['optional'], k in self._fastkeys or k in setup.fastkeys, k)) candidates.sort(reverse=True) self._resolve_keys_internal = [list(), list(), None] for item in candidates: self._resolve_keys_internal[0].append(item[2]) # key name self._resolve_keys_internal[1].append(item[1]) # fast self._resolve_keys_internal[0] = collections.deque(self._resolve_keys_internal[0]) self._resolve_keys_internal[1] = collections.deque(self._resolve_keys_internal[1]) self._resolve_keys_internal[2] = set(self._resolve_keys_internal[0]) return [collections.deque(self._resolve_keys_internal[0]), collections.deque(self._resolve_keys_internal[1]), set(self._resolve_keys_internal[2])] @property def _firstguess_keys(self): if self._firstguess_keys_internal is None: self._firstguess_keys_internal = collections.deque() for k, item in self.attr.items(): kdefault = item['default'] self._firstguess_keys_internal.append( (k, item['optional'], item['alias'], kdefault, hasattr(kdefault, 'footprint_value')) ) return self._firstguess_keys_internal def __str__(self): return six.text_type(self.attr) def __repr__(self): """A condensed string representation of the present Footprint.""" return ('<{:s} object at {!s} | info="{:s}">' .format(self.__class__.__name__, hex(id(self)), self.info))
[docs] def allkeys(self): """Return a set of possible keys for the footprint's attributes.""" allk = set() atfp = self.attr for a in atfp: allk.add(a) allk |= atfp[a]['alias'] return allk
[docs] def as_dict(self): """ Returns a shallow copy of the internal footprint structure as a pure dictionary. """ return dict(self._fp)
[docs] def as_copy(self): """ Returns a deep copy of the internal footprint structure as a pure dictionary. Be aware that some objects such as compiled regular expressions remain identical through this indeep copy operation. """ return copy.deepcopy(self._fp)
[docs] def as_opts(self): """Returns the list of all the possible values as attributes or aliases.""" opts = list() for k in self.attr.keys(): opts.extend(self.attr[k]['alias']) opts.extend(self.attr.keys()) return set(opts)
[docs] def nice(self): """Returns a nice dump version of the actual footprint.""" return dump.get().cleandump(self._fp)
[docs] def track(self, desc): """Returns if the items of ``desc`` are found in the specified footstep ``fp``.""" fpa = self._fp['attr'] attrs = list(fpa.keys()) aliases = [] for x in attrs: aliases.extend(fpa[x]['alias']) return [a for a in desc if a in attrs or a in aliases]
[docs] def optional(self, a): """Returns whether the given attribute ``a`` is optional or not in the current footprint.""" return self._fp['attr'][a]['optional']
[docs] def mandatory(self): """Returns the list of mandatory attributes in the current footprint.""" fpa = self._fp['attr'] return [x for x in fpa.keys() if not fpa[x]['optional']]
def _firstguess(self, desc, resolvecache=None): """Produces a complete guess of the actual footprint according to actual description ``desc``.""" if resolvecache is None: resolvecache = collectors.ResolveCache() guess = dict() param = resolvecache.defaults inputattr = set() for k, kopt, kalias, kdefault, kxdefault in self._firstguess_keys: if k in desc and not (kopt and desc[k] is None): guess[k] = desc[k] inputattr.add(k) # logger.debug(' > Attr %s in description : %s', k, desc[k]) else: alias_ok = False for a in kalias: if a in desc and not (kopt and desc[a] is None): guess[k] = desc[a] inputattr.add(k) alias_ok = True break if not alias_ok: if k in param: guess[k] = param[k] inputattr.add(k) else: if kopt: if kdefault is None: guess[k] = UNKNOWN else: if kxdefault: guess[k] = kdefault.footprint_value() else: guess[k] = kdefault else: guess[k] = None return (guess, inputattr) def _findextras(self, desc, resolvecache=None): """ Return a flat dictionary including ground values as defined by ``setup.extras`` extended by a dictionary view of any :class:`FootprintBase` object found in ``desc`` values. """ if resolvecache is None: resolvecache = collectors.ResolveCache() extras = dict(resolvecache.extras) for vdesc in desc.values(): if isinstance(vdesc, FootprintBase): additems = resolvecache.get_shallow_fp(vdesc) extras.update(additems) # if extras: # logger.debug(' > Extras : %s', extras) return extras def _addextras(self, extras, guess, more): """ Extend the specified ``extras`` dictionay with pairs of key/value suggested in the ``more`` dictionary which are not already defined in ``extras`` or the actual ``guess``. """ for k in more.keys(): if k not in extras and k not in guess: extras[k] = more[k] def _process_replm(self, replkv, replm, guessk, changed, guess, extras, myautofmt, requeue=False): """ Deal with calls to properties or methods during the replacement process. """ starter = replkv replms = re.split(':+', replm) for replm in replms: subattr = getattr(starter, replm, None) if subattr is None: guessk = None break else: if callable(subattr): if isinstance(subattr, types.BuiltinFunctionType): starter = subattr() else: try: starter = subattr(guess, extras) except Exception as trouble: logger.critical(trouble) if requeue: starter = '__SKIP__' changed = 0 break else: raise if starter is None: guessk = None break else: starter = subattr if guessk is not None and starter != '__SKIP__': guessk = replattr.sub(myautofmt(starter), guessk, 1) return guessk, changed def _replacement(self, nbpass, k, kfast, guess, extras, todok, todokfast, todokset): """ Try to resolve any replacement sequence inside the ``guess[k]`` value according to actual values in the ``guess`` or ``extras`` current dictionaries. A replacement sequence is a list of one or more items in brackets of the form: * '[key-name]' * '[key-name:attr-name]' or '[key-name::attr-name]' * '[key-name:meth-name]' or '[key-name::meth-name]' If the ``key-name`` could not be found in the actual ``guess`` or ``extras`` dictionaries the method raises an :exception:`FootprintUnreachableAttr`. Additional flags can be added: * '[key-name#01]' will result in '01' if key-name is not in ``guess`` nor in ``extras`` (instead of raising a :exception:`FootprintUnreachableAttr` exception.) * '[key-name%03d]' will print the value of key-name using the '03d' format string. If the format string is incorrect, or if it can not be applied to key-name, a :exception:`ValueError` exception will be raised """ if nbpass > 50: logger.error('Resolve probably cycling too much... %d tries ?', nbpass) raise FootprintMaxIter('Too many Footprint replacements') guessk = guess[k] changed = 1 while changed: changed = 0 if isinstance(guessk, six.string_types): mobj = replattr.search(guessk) if mobj: replk = mobj.group(1) replm = mobj.group(2) replx = mobj.group(3) def myautofmt(repl, myfmt=mobj.group(4)): if myfmt: f_formatter = util.FoxyFormatter() thefmt = ("{0" + myfmt + "}" if (':' in myfmt or '!' in myfmt) else "{0:" + myfmt + "}") try: return f_formatter.format(thefmt, repl) except (ValueError, AttributeError): logger.error('Formating failed for %s. Please check the format string.', mobj.group(0)) raise else: return six.text_type(repl) if replk not in guess and replk not in extras: if replx: changed = 1 # Here we do not call _autofmt since replx is already a str guessk = replattr.sub(replx, guessk, 1) else: logger.error('No %s attribute in guess:', replk) logger.error('%s', guess) logger.error('No %s attribute in extras:', replk) logger.error('%s', extras) logger.error('Actual defaults: %s', setup.defaults) raise FootprintUnreachableAttr('Could not replace attribute ' + replk) if replk in guess: if replk not in todok: changed = 1 if replm: replk_v = guess[replk] guessk, changed = self._process_replm(replk_v, replm, guessk, changed, guess, extras, myautofmt, requeue=False) else: guessk = replattr.sub(myautofmt(guess[replk]), guessk, 1) elif replk in extras: changed = 1 if replm: replk_v = extras[replk] guessk, changed = self._process_replm(replk_v, replm, guessk, changed, guess, extras, myautofmt, requeue=True) else: guessk = replattr.sub(myautofmt(extras[replk]), guessk, 1) if (guessk is not None and isinstance(guessk, six.string_types) and replattr.search(guessk)): # logger.debug(' > Requeue resolve < %s > : %s (npass=%d)', k, guessk, nbpass) todok.append(k) todokfast.append(kfast) todokset.add(k) return False else: # logger.debug(' > No more substitution for %s (npass=%d)', k, nbpass) guess[k] = guessk return True
[docs] def in_values(self, item, values): """Check that item is inside ``values`` or compares as equal to one of these values.""" if item in values: return True else: return bool([x for x in values if x == item])
[docs] def resolve(self, desc, **kw): """Try to guess how the given description ``desc`` could possibly match the current footprint.""" opts = dict(fatal=setup.fatal, fast=setup.fastmode) opts.update(kw) opts_fatal = opts['fatal'] # Shortcut for faster execution opts_fast = opts['fast'] # Shortcut for faster execution report = opts.pop('report', False) or setup.nullreport resolvecache = opts.pop('resolvecache', None) if resolvecache is None: resolvecache = collectors.ResolveCache() guess, attr_input = self._firstguess(desc, resolvecache=resolvecache) extras = self._findextras(desc, resolvecache=resolvecache) attr_seen = set() # Add arguments from current description not yet used to extra parameters self._addextras(extras, guess, desc) if setup.extended: # + Add arguments from defaults footprint not already defined to extra parameters self._addextras(extras, guess, setup.defaults) attrs = self.attr if None in guess.values(): todok = [] todokfast = [] todokset = set() else: todok, todokfast, todokset = self._resolve_keys nbpass = 0 diags = dict() while todok: k = todok.popleft() kfast = todokfast.popleft() todokset.discard(k) kdef = attrs[k] nbpass += 1 if (not self._replacement(nbpass, k, kfast, guess, extras, todok, todokfast, todokset) or guess[k] is None): continue attr_seen.add(k) while guess[k].__hash__ is not None and guess[k] in kdef['remap']: # logger.debug(' > Attr %s remap(%s) = %s', k, guess[k], kdef['remap'][guess[k]]) guess[k] = kdef['remap'][guess[k]] if guess[k] is not UNKNOWN: ktype = kdef.get('type', six.text_type) if kdef.get('isclass', False): if not issubclass(guess[k], ktype): # logger.debug(' > Attr %s class %s not a subclass %s', k, guess[k], ktype) report.add(attribute=k, why=reporting.REPORT_WHY_SUBCLASS, args=ktype.__name__) diags[k] = True guess[k] = None elif not isinstance(guess[k], ktype): # logger.debug(' > Attr %s reclass(%s) as %s', k, guess[k], ktype) kwargs = kdef.get('args', dict()) try: guess[k] = ktype(guess[k], **kwargs) # logger.debug(' > Attr %s reclassed = %s', k, guess[k]) except (ValueError, TypeError, FootprintException): # logger.debug(' > Attr %s badly reclassed as %s = %s', k, ktype, guess[k]) report.add(attribute=k, why=reporting.REPORT_WHY_RECLASS, args=(ktype.__name__, six.text_type(guess[k]))) diags[k] = True guess[k] = None if kdef['values'] and not self.in_values(guess[k], kdef['values']): # logger.debug(' > Attr %s value not in range = %s %s', k, guess[k], kdef['values']) report.add(attribute=k, why=reporting.REPORT_WHY_OUTSIDE, args=guess[k]) diags[k] = True guess[k] = None if kdef['outcast'] and self.in_values(guess[k], kdef['outcast']): # logger.debug(' > Attr %s value excluded from range = %s %s', k, guess[k], kdef['outcast']) report.add(attribute=k, why=reporting.REPORT_WHY_OUTCAST, args=guess[k]) diags[k] = True guess[k] = None if (opts_fast or kfast) and guess[k] is None: # logger.debug(' > Fast exit from resolve on key "%s" (fast=%s, fastkey=%s)', # k, str(opts_fast), str(kfast)) break for k in attrs.keys(): if guess[k] == 'None': guess[k] = None logger.warning(' > Attr %s is a null string', k) if k not in diags: report.add(attribute=k, why=reporting.REPORT_WHY_INVALID) if guess[k] is None: attr_input.discard(k) if k not in diags: report.add(attribute=k, why=reporting.REPORT_WHY_MISSING) if opts_fatal: logger.info('No valid attribute "%s" is fatal', k) raise FootprintFatalError('No attribute `' + k + '` is fatal') # else: # logger.debug(' > No valid attribute %s', k) else: if 'weak' in attrs[k]['access']: guess[k] = weakref.proxy(guess[k]) return (guess, attr_input, attr_seen)
[docs] def checkonly(self, rd, report=setup.nullreport, resolvecache=None): """Ensure that the resolved description also matches at least one item per ``only`` feature.""" if resolvecache is None: resolvecache = collectors.ResolveCache() params = resolvecache.defaults for k, v in self.only.items(): if not hasattr(v, '__iter__'): v = (v,) actualattr = k after, before = False, False if k.startswith('after_'): after = True if k.startswith('before_'): before = True if after or before: actualattr = k.partition('_')[-1] actualvalue = rd.get(actualattr, params.get(actualattr, None)) if actualvalue is None: rd = False report.add(attribute=actualattr, only=reporting.REPORT_ONLY_NOTFOUND, args=k) break checkflag = False for checkvalue in v: if after: checkflag = checkflag or bool(actualvalue >= checkvalue) elif before: checkflag = checkflag or bool(actualvalue < checkvalue) elif hasattr(checkvalue, 'match'): checkflag = checkflag or bool(checkvalue.match(actualvalue)) else: checkflag = checkflag or actualvalue == checkvalue if not checkflag: rd = False if report: report.add(attribute=actualattr, only=reporting.REPORT_ONLY_NOTMATCH, args=v) break return rd
[docs] def get_values(self, attrname): """Return acceptable values for a given ``attrname``.""" return tuple(self.attr[attrname]['values'])
[docs] def get_outcast(self, attrname): """Return inacceptable values for a given ``attrname``.""" return tuple(self.attr[attrname]['outcast'])
@property def info(self): """Read-only property. Direct access to internal footprint informative description.""" return self._fp['info'] @property def attr(self): """Read-only property. Direct access to internal footprint set of attributes.""" return self._fp['attr'] @property def bind(self): """Read-only property. Direct access to internal footprint binding between attributes.""" return self._fp['bind'] @property def only(self): """Read-only property. Direct access to internal footprint restriction rules.""" return self._fp['only'] @property def priority(self): """Read-only property. Direct access to internal footprint priority rules.""" return self._fp['priority'] @property def level(self): """Read-only property. Direct access to internal footprint priority level.""" return self.priority['level']
[docs]class DecorativeFootprint(Footprint): """ This class extends the :class:`Footprint` class. In addition to the definition and processing of the footprint itself, a class decorator can be registered. The :class:`FootprintBaseMeta` will apply this decorator on the target class only when the footprint is used directly (i.e. not when inherited). """ def __init__(self, *args, **kw): super(DecorativeFootprint, self).__init__(*args, **kw) self._decorators = list() for a in args: if isinstance(a, DecorativeFootprint): self._decorators.extend(a.decorators) if 'decorator' in kw: self._decorators.extend(kw['decorator'] if isinstance(kw['decorator'], list) else [kw['decorator'], ]) if not all([callable(d) for d in self._decorators]): raise ValueError("Class decorators must be callables") @property def decorators(self): return self._decorators def as_footprint(self): return Footprint(self._fp)
[docs]class FootprintBaseMeta(type): """ Meta class constructor for :class:`FootprintBase`. The current :data:`_footprint` data which could be a simple dict or a :class:`Footprint` object is used to instantiate a new :class:`Footprint`, built as a merge of the footprint of the base classes. """ def __new__(cls, n, b, d): """ This meta-constructor is in charge of the footprints merging, class registering in footprint collectors and documentation setting. """ logger.debug('Base class for footprint usage "%s / %s", bc = (%s), internal = %s', cls, n, b, d) abstract = d.setdefault('_abstract', False) mkshort = d.setdefault('_mkshort', setup.shortnames) # Footprint merging fplocal = d.get('_footprint', dict()) localdeco = list() bcfp = [c.__dict__.get('_footprint', dict()) for c in b] bcfp.reverse() # That way, footprint's inheritance is consistent with python's if type(fplocal) is list: bcfp.extend(fplocal) for fptmp in fplocal: if isinstance(fptmp, DecorativeFootprint): localdeco.extend(fptmp.decorators) else: bcfp.append(fplocal) if isinstance(fplocal, DecorativeFootprint): localdeco.extend(fplocal.decorators) thisfp = d['_footprint'] = Footprint(*bcfp, myclsname=n) # Setting descriptors for footprint attributes d['_fp_auth'] = hash(d['__module__'] + '.' + n) active_accessors = access.attr_descriptors() for k in thisfp.attr.keys(): k_info = thisfp.attr[k].get('info', None) if setup.docstrings > 1: k_info = (k_info or '').rstrip('.') + ' (see the documentation above for more details).' if isinstance(thisfp.attr[k]['access'], access.FootprintAttrDescriptor): d[k] = thisfp.attr[k]['access'](k, auth=d['_fp_auth'], doc=k_info) else: try: d[k] = active_accessors[thisfp.attr[k]['access']]( k, auth=d['_fp_auth'], doc=k_info ) except AttributeError: logger.error('Could not find any local descriptor with acces mode %s', thisfp.attr['access']) raise # Possibly use short method names if mkshort: for k in [x for x in d.keys() if x.startswith('footprint_')]: kshort = k.replace('footprint_', '') if kshort in d: logger.warning('Shortcut to already defined attribute [%s]', k) else: d[kshort] = d.get(k) # At least build the class itself as a default type realcls = super(FootprintBaseMeta, cls).__new__(cls, n, b, d) # Apply local decorators for deco in localdeco: realcls = deco(realcls) # A class that is not abstrat should register in dedicated collectors if not abstract: if realcls._explicit and not realcls.footprint_mandatory(): raise FootprintInvalidDefinition('Explicit class without any mandatory footprint attribute.') # Add all classes in collectors but take into accout the abstract key for cname in realcls._collector: if cname in thisfp.allkeys(): raise FootprintInvalidDefinition('A attribute or alias name is equal to collector tag: ' + cname) thiscollector = collectors.get(tag=cname, report=setup.report, lreport_len=setup.lreport_len, report_style=setup.report_style) thiscollector.add(realcls, abstract=abstract) if not abstract and thiscollector.register: observer.get(tag=realcls.fullname()).register(thiscollector) logger.debug('Register class %s in collector %s (%s)', realcls, thiscollector, cname) # Docstring building basedoc = realcls.__doc__ if not basedoc: basedoc = 'Not documented yet.' realcls.__doc__ = basedoc if setup.docstrings: realcls.__doc__ += doc.format_docstring(realcls._footprint, setup.docstrings) return realcls
# noinspection PyUnresolvedReferences
[docs]@six.add_metaclass(FootprintBaseMeta) class FootprintBase(object): """ Base class for any other thematic class that would need to incorporate a :class:`Footprint`. Its metaclass is :class:`FootprintBaseMeta`. """ _footprint = Footprint() _abstract = True _explicit = True _reusable = True _collector = ('garbage',) def __init__(self, *args, **kw): logger.debug('Abstract %s init', self.__class__) if self.__class__._abstract: raise FootprintInvalidDefinition('Could not instanciate abstract class.') checked = kw.pop('checked', False) self._attributes = dict() self._puredict = None for a in args: logger.debug('FootprintBase %s arg %s', object.__repr__(self), a) if isinstance(a, dict): self._attributes.update(a) self._attributes.update(kw) if not checked: logger.debug('Resolve attributes at footprint init %s', object.__repr__(self)) self._attributes, u_attr_input, u_attr_seen = self._footprint.resolve(self._attributes, # @UnusedVariable fatal=True) self._observer = observer.get(tag=self.__class__.fullname()) self.footprint_riseup()
[docs] @classmethod def footprint_clskind(cls): """Return a lower-case string of the name of the current footprint class.""" return cls.__name__.lower()
[docs] @classmethod def footprint_clsrealkind(cls): """Return the ``realkind`` property value of the current class.""" return getattr(cls, 'realkind').fget(cls)
@property def realkind(self): """Actual footprint kind, by default the clskind.""" return 'footprintbase' @property def footprint(self): """Footprint associated to current object's class.""" return self.__class__._footprint
[docs] def footprint_clsname(self): """Returns the short name of the object's class.""" return self.__class__.__name__
[docs] @classmethod def footprint_retrieve(cls, **kw): # @UnusedVariable """Returns the internal checked ``footprint`` of the current class object.""" return cls._footprint
[docs] @classmethod def footprint_reusable(cls): """Returns whether the current class could be used for default loading.""" return cls._reusable
[docs] @classmethod def footprint_abstract(cls): """Returns whether the current class could be instanciated or not.""" return cls._abstract
[docs] @classmethod def fullname(cls): """Returns a nicely formatted name of the current class (dump usage).""" return '{0:s}.{1:s}'.format(cls.__module__, cls.__name__)
[docs] def SUPER(self): """A kind of shortcut to parent class. Warning: use with care.""" return super(self.__class__, self)
[docs] def footprint_riseup(self): """Things to do after new or init construction.""" self._observer.notify_new(self, dict())
def __getstate__(self): d = self.__dict__.copy() del d['_observer'] return d def __setstate__(self, state): self._observer = observer.get(tag=self.__class__.fullname()) self.__dict__.update(state) self.footprint_riseup()
[docs] def footprint_getattr(self, attr, auth=None): # @UnusedVariable """Return actual attribute value in internal storage. Protected method.""" thisattr = self._attributes.get(attr, None) if thisattr is UNKNOWN: thisattr = None return thisattr
[docs] def footprint_setattr(self, attr, value, auth=None): """Set actual attribute to the value specified. Protected method.""" if auth != self._fp_auth: raise AttributeError("Can't set attribute without valid authorization") self._attributes[attr] = value
[docs] def footprint_delattr(self, attr, auth=None): """Delete actual attribute. Protected method.""" if auth != self._fp_auth: raise AttributeError("Can't set attribute without valid authorization") del self._attributes[attr]
[docs] def footprint_undefs(self): """Return list of attributes which are still None.""" return [a for a in self.footprint_attributes if self.footprint_getattr(a) is None]
[docs] def footprint_clone(self, full=False, extra=None): """ Return a deep copy of the current object as a brand new one. Only footprint attributes are carried around. Attributes to be replaced or added can be specified in dict **extra**. """ attrs = self._attributes.copy() if extra is not None: attrs.update(extra) objcp = self.__class__(**attrs) if full: for a in [x for x in self.__dict__.keys() if not x.startswith('_')]: setattr(objcp, a, getattr(self, a)) return objcp
[docs] def footprint_has_attribute(self, attr): """Check if the footprint contains the **attr** attribute""" return attr in self._attributes
@property def footprint_attributes(self): """Returns the list of current attributes.""" return sorted(self._attributes.keys()) @property def footprint_attributes_values(self): """Returns the list of current attributes values.""" return sorted(self._attributes.values())
[docs] def footprint_as_shallow_dict(self): """Returns a dictionary that contains the current attributes (shallow copy).""" _puredict = dict() for k in self._attributes.keys(): _puredict[k] = getattr(self, k) return _puredict
[docs] def footprint_as_dict(self): """Returns a dictionary that contains a deepcopy of the current attributes.""" puredict = dict() for k in self._attributes.keys(): puredict[k] = copy.deepcopy(getattr(self, k)) return puredict
[docs] def footprint_export(self): """See the current footprint as a pure dictionary when exported.""" exd = dict() for k in self._attributes.keys(): exportmethod = 'footprint_export_' + k if hasattr(self, exportmethod): exd[k] = getattr(self, exportmethod)() else: thisattr = getattr(self, k) if hasattr(thisattr, 'footprint_export'): exd[k] = thisattr.footprint_export() elif hasattr(thisattr, 'export_dict'): exd[k] = thisattr.export_dict() else: exd[k] = copy.deepcopy(thisattr) return exd
def _str_more(self): """Additional information to be combined in repr output.""" return 'footprint={!s}'.format(len(self._attributes)) def __str__(self): """ Basic layout for nicely formatted print, built as the concatenation of the class full name and some :meth:`_str_more` additional information. """ return '{0:s} | {1:s}>'.format(repr(self).rstrip('>'), self._str_more()) @property def footprint_info(self): """Information from the current footprint.""" return self._footprint.info
[docs] @classmethod def footprint_mandatory(cls): """ Returns the attributes that should be present in a description in order to be able to match the current object. """ return cls._footprint.mandatory()
[docs] @classmethod def footprint_optional(cls, a): """Returns whether the specified attribute ``a`` is optional or not.""" return cls._footprint.optional(a)
[docs] @classmethod def footprint_couldbe(cls, rd, report=None, mkreport=False, resolvecache=None): """ This is the heart of any selection purpose, particularly in relation with the :meth:`find_all` mechanism of :class:`footprints.Collector` classes. It returns the *resolved* form in which the current ``rd`` description could be recognized as a footprint of the current class, :data:`False` otherwise. """ if mkreport and not report: report = reporting.get(tag='void') report.add(collector=proxy.garbages) if report: report.add(candidate=cls) fp = cls._footprint resolved, attr_input, u_attr_seen = fp.resolve(rd, fatal=False, report=report, # @UnusedVariable resolvecache=resolvecache) if resolved and None not in resolved.values(): return (fp.checkonly(resolved, report, resolvecache=resolvecache), attr_input) else: if mkreport: report.last.lightdump() return (False, attr_input)
[docs] def footprint_compatible(self, rd): """ Resolve a subset of a description according to my footprint, and then compare to my actual values. """ fp = self.footprint resolved, u_inputattr, u_attr_seen = fp.resolve(rd, fatal=False, report=None) # @UnusedVariable rc = resolved and None not in resolved.values() if rc: for k in resolved.keys(): if self._attributes[k] != resolved[k]: rc = False break return rc
[docs] def footprint_cleanup(self, rd): """ Removes in the specified ``rd`` description the keys that are tracked as part of the footprint of the current object. """ fp = self.footprint for attr in fp.track(rd): logger.debug('Removing attribute %s : %s', attr, rd[attr]) del rd[attr] return rd
[docs] @classmethod def footprint_weight(cls, realinputs): """Tuple with ordered weights to make a choice possible between various electible footprints.""" fp = cls._footprint return (fp.priority['level'].rank, realinputs)
[docs] @classmethod def footprint_values(cls, attrname): """Return the list of authorized values of a footprint attribute (if any).""" return list(cls._footprint.attr[attrname]['values'])
[docs] @classmethod def footprint_access(cls, attrname): """Return the access mode of a footprint attribute.""" rwd = cls._footprint.attr[attrname]['access'] if isinstance(rwd, access.FootprintAttrDescriptor): rwd = rwd.access_mode return rwd
[docs] @classmethod def footprint_pl(cls): """Return the priority level of the current class footprint object.""" return cls._footprint.level
[docs] @classmethod def footprint_level(cls): """Return the tag name of the priority level of the current class footprint object.""" return cls._footprint.level.tag