Source code for footprints.reporting

"""
Hierarchical documents to store footprints information.
StandardReport is derived from :class:`xml.dom.minidom.Document`.
"""

import collections
from datetime import datetime
import re
import weakref

from bronx.fancies import dump
from bronx.patterns import getbytag

#: No automatic export
__all__ = []

REPORT_WHY_MISSING = 'Missing value'
REPORT_WHY_INVALID = 'Invalid value'
REPORT_WHY_OUTSIDE = 'Not in values'
REPORT_WHY_OUTCAST = 'Outcast value'
REPORT_WHY_RECLASS = 'Could not reclass'
REPORT_WHY_SUBCLASS = 'Not a subclass'

REPORT_ONLY_NOTFOUND = 'No value found'
REPORT_ONLY_NOTMATCH = 'Do not match'


# Module Interface

[docs]def get(**kw): """Return actual footprint log object matching description.""" return FootprintLog(**kw)
[docs]def keys(): """Return the list of current entries names collected.""" return FootprintLog.tag_keys()
[docs]def values(): """Return the list of current entries values collected.""" return FootprintLog.tag_values()
[docs]def items(): """Return the items of the footprint logs table.""" return FootprintLog.tag_items()
[docs]class FootprintBadLogEntry(Exception): """Exception raised when inappropriate log entries are met.""" pass
[docs]class NullReport: """Fake reporting: accept any log report command but do nothing.""" def __init__(self, *args, **kw): self._maxlen = 1000 self._blindlog = collections.deque(maxlen=self._maxlen) def __len__(self): return len(self._blindlog)
[docs] def items(self): """Internal list of items recorded.""" return self._blindlog
[docs] def clear(self): """Rewind internal raw list of log commands.""" self._blindlog = collections.deque(maxlen=self._maxlen)
[docs] def add(self, *args, **kw): """Push any arg provided to the internal raw list of log commands.""" if args: self._blindlog.append(args) if kw: self._blindlog.append(kw)
[docs]class FootprintLogEntry: """ Generic entry item in the footprint log. Could be : * a collector item * a candidate item (i.e.: a class) """ def __init__(self, node, **kw): self.context = 'void' self.stamp = datetime.now() self._items = list() self._weak = kw.pop('weak', True) self.__dict__.update(kw) if self._weak: self._node = weakref.ref(node) else: self._node = node @property def node(self): if self._weak: return self._node() else: return self._node
[docs] def add(self, item): """Push the specified ``item`` at the end of the internal log list.""" self._items.append(item)
[docs]class FootprintLogCollector(FootprintLogEntry): """Dedicated entry to :class:`footprints.Collector` items.""" def __init__(self, node, **kw): """Default name is the ``node`` entry keypoint.""" super().__init__(node, **kw) self.name = self.node.tag def __iter__(self): """Iterates on :class:`FootprintLogClass` items.""" yield from sorted(self._items, key=lambda item: item.name)
[docs] def feed_xml(self, xmlnode): """Insert in the specified ``xmlnode`` informations relative to candidate classes.""" xmlbase = xmlnode.new_entry('collector', name=self.name, stamp=self.stamp.isoformat()) for kid in self: xmlnode.current(xmlbase) kid.feed_xml(xmlnode)
[docs] def as_dict(self): """Convenient method for retrieving some handy dictionary.""" dico = dict() for item in self._items: dico[item.name] = item.as_dict() return dico
[docs] def as_tree(self, **kw): """Feed a :class:`FactorizedReport` according to the order specified.""" fr = FactorizedReport(**kw) for kid in self: for item in kid: info = item.copy() info['class'] = kid.name fr.add(**info) return fr
[docs] def as_flat(self, **kw): """Feed a :class:`FlatReport` according to the order specified.""" flat = FlatReport(**kw) for kid in self: for item in kid: info = item.copy() info['attribute'] = info.pop('name') flat.add(focus=kid.name, **info) return flat
[docs] def lightdump(self, **kw): """Pseudo structured dump of the current collector item report.""" for kid in self: kid.lightdump(**kw)
[docs]class FootprintLogClass(FootprintLogEntry): """Dedicated entry to :class:`footprints.FootprintBase` items.""" def __init__(self, node, parent, **kw): """Default name is the ``node`` fullname method output.""" super().__init__(node, **kw) self.name = self.node.fullname() self.parent = parent self.parent.add(self) def __iter__(self): yield from self._items
[docs] def feed_xml(self, xmlnode): """Insert in the specified ``xmlnode`` informations relative to attributes of the candidate class.""" xmlnode.current(xmlnode.add('class', name=self.name)) for kid in self._items: kidstr = {k: str(v) for k, v in kid.items()} xmlnode.add('attribute', **kidstr)
[docs] def as_dict(self): """Convenient method for retrieving a handy dictionary.""" dico = dict() for item in self._items: info = item.copy() attr = info.pop('name') dico[attr] = info return dico
[docs] def lightdump(self, indent=' ', attrjust=10): """Pseudo structured dump of the current class item report.""" if self._items: print(indent, self.name) for item in self._items: info = item.copy() print(indent * 2, info.pop('name').ljust(attrjust), ':', info) else: print('=>'.rjust(len(indent)), self.name) print()
[docs]class FootprintLog(getbytag.GetByTag): """Collect log informations to produce footprints reports.""" def __init__(self, log_maxlen=None, weak=True): self._log = collections.deque(maxlen=log_maxlen) self._weak = weak self._current = None self._xml = None self._dict = dict() self._touch = False
[docs] def info(self): """Return a simple description as a string.""" return 'Report ' + self.tag.title() + ':'
@property def weak(self): """Boolean value, true if the log was built with weak references (default).""" return self._weak def __len__(self): return len(self._log) def __iter__(self): yield from self._log
[docs] def items(self): """Internal list of items recorded.""" return self._log
[docs] def clear(self): """Start a fresh new log history.""" self._log = list()
[docs] def reduce_to_last(self): """Remove from the current log history all but the last collector resolution attempt.""" self._log[0:-1] = []
@property def last(self): return self._log[-1] if self._log else None
[docs] def current(self, node=None): """Return current active entry (collector or class) of the log.""" if node: self._current = node return self._current
[docs] def add_collector(self, node, **kw): """Insert a collector entry into the log.""" self._current = FootprintLogCollector(node, **kw) self._log.append(self._current) self._touch = True
[docs] def add_candidate(self, node, **kw): """Insert a class entry into the log.""" if self._current is not None and isinstance(self._current, FootprintLogClass): self._current = self._current.parent if self._current is None or not isinstance(self._current, FootprintLogCollector): raise FootprintBadLogEntry('Current log context is either empty or not a collector') self._current = FootprintLogClass(node, parent=self._current, **kw) self._touch = True
[docs] def add_attribute(self, name, **kw): """Insert an attribute resolution information entry into the log.""" if self._current is None or not isinstance(self._current, FootprintLogClass): raise FootprintBadLogEntry('Current log context is either empty or not a class candidate') kw['name'] = name self._current.add(kw) self._touch = True
[docs] def add(self, **kw): """ Add an entry to the current log. One of these arguments should be provided: * collector * candidate * attribute """ this_collector = kw.pop('collector', None) if this_collector is not None: return self.add_collector(this_collector, weak=self.weak, **kw) this_candidate = kw.pop('candidate', None) if this_candidate is not None: return self.add_candidate(this_candidate, weak=self.weak, **kw) this_attribute = kw.pop('attribute', None) if this_attribute is not None: return self.add_attribute(this_attribute, **kw) raise FootprintBadLogEntry('Log entry should be a collector, a class candidate or an attribute.')
[docs] def whynot(self, select): """ Diagnostic method for retrieving valuable information on the reason why some class candidates have failed. The ``select`` argument is used as a pattern matching on full class names (case insensitive). """ if self.last: info = self.last.as_dict() for k in [x for x in info if not re.search(select, x, re.IGNORECASE) or not info[x]]: del info[k] return info else: return None
[docs] def as_xml(self, force=False): """Return a true class:`xml.dom.minidom.Document`.""" if not self._xml or self._touch or force: self._xml = StandardReport(tag=self.tag) for item in self._log: item.feed_xml(self._xml) self._touch = False return self._xml
[docs] def as_dict(self, force=False, stamp=True): """Convenient method for retrieving some handy dictionary.""" if not self._dict or self._touch or force: self._dict = dict() for i, item in enumerate(self._log): key = item.name if stamp: key += ' ' + item.stamp.isoformat() else: key += '_{:04d}'.format(i + 1) self._dict[key] = item.as_dict() self._touch = False return self._dict
[docs] def fulldump(self, stamp=False): """Shortcut to :mod:``dump`` facilities.""" print(dump.fulldump(self.as_dict(force=True, stamp=stamp)))
[docs]class StandardReport: """XML structured report.""" def __init__(self, doc=None, tag=None): if doc is None: import xml.dom.minidom self._doc = xml.dom.minidom.Document() else: self._doc = doc self.root = self._doc.createElement('report') self.root.setAttribute('tag', tag) self._doc.appendChild(self.root) self._current = self.root def __call__(self): """Print the complete dump of the current report object.""" print(self.dump_all()) @property def doc(self): return self._doc
[docs] def add(self, key, **kw): """Add a information node to the ``base`` or to the current node.""" base = kw.pop('base', self.current()) entry = self.doc.createElement(key) for k, v in sorted(kw.items()): entry.setAttribute(k, v) base.appendChild(entry) return base.lastChild
[docs] def new_entry(self, key, **kw): """Insert a top level entry (child of the root node).""" kw['base'] = self.root return self.add(key, **kw)
[docs] def current(self, node=None): """Return the current active node of the document.""" if node: self._current = node return self._current
[docs] def dump_all(self): """Return a string with a complete formatted dump of the document.""" return self.doc.toprettyxml(indent=' ')
[docs] def dump_last(self): """Return a string with a complete formatted dump of the last entry.""" return self.root.lastChild.toprettyxml(indent=' ')
[docs] def iter_last(self): """Iterate on last node and return ( class, name, why ) information.""" for kid in self.root.lastChild.childNodes: dico = dict(classname=kid.getAttribute('name')) for subkid in kid.childNodes: dico['name'] = subkid.getAttribute('name') dico['why'] = subkid.getAttribute('why') yield dico
[docs]class FlatReport: """Store entries as simple dictionaries that could be hierarchically reshuffled afterward.""" def __init__(self, sortlist=None): """By default the report is empty.""" self._items = list() self._tree = dict() if sortlist is None: sortlist = list() self._sort = list(sortlist)
[docs] def add(self, **kw): """Push the current key-value description as a new report entry.""" self._items.append(kw)
[docs] def reshuffle(self, sortlist=None, skip=True): """ Sort the entire set of items as a hierarchical tree driven by keys of the specified ``sortlist``. """ self._tree = dict() if sortlist is not None: self._sort = sortlist[:] for item in self._items: current = self._tree info = item.copy() done = True for k in self._sort: if k in info: entry = k + ': ' + info.pop(k) if entry not in current: current[entry] = dict() current = current[entry] else: done = False if not skip: break if done or skip: focus = info.pop('focus') if info: current[focus] = ' / '.join([str(x) + ': ' + str(info[x]) for x in info.keys()]) else: current[focus] = None
[docs] def fulldump(self): """Print out the internal tree.""" print('- ' * 5, "\n") print(self.__class__.__name__, 'shuffle', self._sort) print(dump.fulldump(self._tree)) print()
[docs]class FactorizedReport: def __init__( self, focus='class', indent=' ', ordering=( (('name', ), ('kind', )), (('why', 'only'), (REPORT_WHY_MISSING, REPORT_WHY_INVALID, REPORT_WHY_OUTSIDE, REPORT_WHY_OUTCAST, REPORT_WHY_RECLASS, REPORT_WHY_SUBCLASS, REPORT_ONLY_NOTFOUND, REPORT_ONLY_NOTMATCH)), ), renaming=(('name', 'attribute_name'),)): """ Generates a report whose items are sorted using some parameters: * ``tag`` is the end-level entry that has to be sorted * ``ordering`` describes the sorting options. Ordering must be a list of pair (key-name, selected-values) where: * the order of the list defines the priority order for sorting * the selected-values is a tuple of values to focus on if encountered. """ self.focus = focus self._define = collections.OrderedDict(ordering) self._renaming = dict(renaming) self._indent = indent self._tree = dict() def _depth_key(self, depth): return list(self.keys())[depth] def get_order(self, dic, depth): order = list() other = list(dic.keys()) for val in self.interestingValues(list(self.keys())[depth]): for v in other: if v[1].startswith(val): order.append(v) other.remove(v) order.extend(other) return order def keys(self): return self._define.keys() def __len__(self): return len(self._define) def interestingValues(self, key): return self._define[key] def add(self, **kw): tagvalue = kw[self.focus] dic = self._tree for k in self.keys(): for ki in k: kj = self._renaming.get(ki, ki) v = kw.get(ki, None) if v is not None: if (kj, v) not in dic: dic[(kj, v)] = dict() dic = dic[(kj, v)] break if v is None: raise KeyError("Ordering key not found: {:s}".format(k)) info = kw.get('args', '') dic[tagvalue] = str(info) def printer(self, dic, currentindent, depth, ordered=False): if depth == len(self): for tagValue in sorted(dic.keys()): print(currentindent, self.focus, ':', tagValue, '(' + dic[tagValue] + ')') else: if ordered: order = self.get_order(dic, depth) else: order = dic for v in order: print('{:s} {:s} = {:s}'.format(currentindent, *v)) self.printer(dic[v], currentindent + self._indent, depth + 1, ordered) def softprint(self): self.printer(self._tree, self._indent, 0) def orderedprint(self): self.printer(self._tree, self._indent, 0, ordered=True) def simpleprinter(self, dic, depth, msg=None, space=True): if depth == len(self): if space: print() for tagValue in sorted(dic.keys()): print(self._indent, self.focus, ':', tagValue, '(' + dic[tagValue] + ')') if msg: print(self._indent * 3, msg) else: for v in self.get_order(dic, depth): newmsg = msg + ' | ' if msg else '' newmsg += '{:s} = {:s}'.format(*v) self.simpleprinter(dic[v], depth + 1, newmsg) def niceprinter(self, dic, depth, maxdepth, group, msg=None, separator='+'): if depth == maxdepth: self.simpleprinter(dic, depth, msg, depth % group != 0) else: toprint = None if depth % group == 0: toprint = msg msg = None if toprint: separator = {'+': '-', '-': '~'}.get(separator, separator) for v in self.get_order(dic, depth): newmsg = msg + ' | ' if msg else '' newmsg += '{:s} = {:s}'.format(*v) self.niceprinter(dic[v], depth + 1, maxdepth, group, newmsg, separator) if depth % group == 0: print(self._indent + (separator * (40 + 5 * len(self._indent)))) if toprint: print(self._indent * ((maxdepth - depth) // group + 4), toprint) def dumper(self, maxdepth=1, group=1): if maxdepth > len(self): maxdepth = len(self) self.niceprinter(self._tree, 0, maxdepth, group)