X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f04ff34e47cc5ff1e625e9a5c1b1b0cbeb810d8c..c663e037c8ad2c8dab1f4867a51e3d045357c9f6:/tools/crunchstat-summary/crunchstat_summary/summarizer.py

diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 68fe2b4885..b2f6f1bb70 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -37,6 +37,7 @@ WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
 class Task(object):
     def __init__(self):
         self.starttime = None
+        self.finishtime = None
         self.series = collections.defaultdict(list)
 
 
@@ -70,8 +71,12 @@ class Summarizer(object):
 
     def run(self):
         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
+        with self._logdata as logdata:
+            self._run(logdata)
+
+    def _run(self, logdata):
         self.detected_crunch1 = False
-        for line in self._logdata:
+        for line in logdata:
             if not self.detected_crunch1 and '-8i9sb-' in line:
                 self.detected_crunch1 = True
 
@@ -111,12 +116,14 @@ class Summarizer(object):
                     logger.debug('%s: done %s', self.label, uuid)
                     continue
 
-                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+                # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
+                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                 if not m:
                     continue
             else:
                 # crunch2
-                m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+                # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
+                m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                 if not m:
                     continue
 
@@ -130,7 +137,7 @@ class Summarizer(object):
                 continue
             elif m.group('category') in ('error', 'caught'):
                 continue
-            elif m.group('category') in ['read', 'open', 'cgroup', 'CID']:
+            elif m.group('category') in ('read', 'open', 'cgroup', 'CID', 'Running'):
                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
                 # (old logs are less careful with unprefixed error messages)
                 continue
@@ -154,15 +161,24 @@ class Summarizer(object):
                 raise ValueError("Cannot parse timestamp {!r}".format(
                     timestamp))
 
-            if not task.starttime:
-                task.starttime = timestamp
+            if task.starttime is None:
                 logger.debug('%s: task %s starttime %s',
                              self.label, task_id, timestamp)
-            task.finishtime = timestamp
+            if task.starttime is None or timestamp < task.starttime:
+                task.starttime = timestamp
+            if task.finishtime is None or timestamp > task.finishtime:
+                task.finishtime = timestamp
 
-            if not self.starttime:
+            if self.starttime is None or timestamp < task.starttime:
                 self.starttime = timestamp
-            self.finishtime = timestamp
+            if self.finishtime is None or timestamp < task.finishtime:
+                self.finishtime = timestamp
+
+            if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
+                elapsed = (task.finishtime - task.starttime).seconds
+                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
+                if elapsed > self.stats_max['time']['elapsed']:
+                    self.stats_max['time']['elapsed'] = elapsed
 
             this_interval_s = None
             for group in ['current', 'interval']:
@@ -178,8 +194,16 @@
                     else:
                         stats[stat] = int(val)
                 except ValueError as e:
-                    logger.warning('Error parsing {} stat: {!r}'.format(
-                        stat, e))
+                    # If the line doesn't start with 'crunchstat:' we
+                    # might have mistaken an error message for a
+                    # structured crunchstat line.
+                    if m.group("crunchstat") is None or m.group("category") == "crunchstat":
+                        logger.warning("%s: log contains message\n %s", self.label, line)
+                    else:
+                        logger.warning(
+                            '%s: Error parsing value %r (stat %r, category %r): %r',
+                            self.label, val, stat, category, e)
+                        logger.warning('%s', line)
                     continue
             if 'user' in stats or 'sys' in stats:
                 stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
@@ -223,6 +247,8 @@ class Summarizer(object):
 
     def long_label(self):
         label = self.label
+        if hasattr(self, 'process') and self.process['uuid'] not in label:
+            label = '{} ({})'.format(label, self.process['uuid'])
         if self.finishtime:
             label += ' -- elapsed time '
             s = (self.finishtime - self.starttime).total_seconds()
@@ -431,11 +457,14 @@ class CollectionSummarizer(Summarizer):
         self.label = collection_id
 
 
-def NewSummarizer(process, **kwargs):
+def NewSummarizer(process_or_uuid, **kwargs):
     """Construct with the appropriate subclass for this uuid/object."""
 
-    if not isinstance(process, dict):
-        uuid = process
+    if isinstance(process_or_uuid, dict):
+        process = process_or_uuid
+        uuid = process['uuid']
+    else:
+        uuid = process_or_uuid
         process = None
 
     arv = arvados.api('v1', model=OrderedJsonModel())
@@ -450,7 +479,7 @@
     elif '-8i9sb-' in uuid:
         if process is None:
             process = arv.jobs().get(uuid=uuid).execute()
-        klass = JobSummarizer
+        klass = JobTreeSummarizer
     elif '-d1hrv-' in uuid:
         if process is None:
             process = arv.pipeline_instances().get(uuid=uuid).execute()
@@ -521,16 +550,56 @@ class MultiSummarizer(object):
 
     def text_report(self):
         txt = ''
-        for cname, child in self.children.iteritems():
-            if len(self.children) > 1:
+        d = self._descendants()
+        for child in d.itervalues():
+            if len(d) > 1:
                 txt += '### Summary for {} ({})\n'.format(
-                    cname, child.process['uuid'])
+                    child.label, child.process['uuid'])
             txt += child.text_report()
             txt += '\n'
         return txt
 
+    def _descendants(self):
+        """Dict of self and all descendants.
+
+        Nodes with nothing of their own to report (like
+        MultiSummarizers) are omitted.
+ """ + d = collections.OrderedDict() + for key, child in self.children.iteritems(): + if isinstance(child, Summarizer): + d[key] = child + if isinstance(child, MultiSummarizer): + d.update(child._descendants()) + return d + def html_report(self): - return WEBCHART_CLASS(self.label, self.children.itervalues()).html() + return WEBCHART_CLASS(self.label, self._descendants().itervalues()).html() + + +class JobTreeSummarizer(MultiSummarizer): + """Summarizes a job and all children listed in its components field.""" + def __init__(self, job, label=None, **kwargs): + arv = arvados.api('v1', model=OrderedJsonModel()) + label = label or job.get('name', job['uuid']) + children = collections.OrderedDict() + children[job['uuid']] = JobSummarizer(job, label=label, **kwargs) + if job.get('components', None): + preloaded = {} + for j in arv.jobs().index( + limit=len(job['components']), + filters=[['uuid','in',job['components'].values()]]).execute()['items']: + preloaded[j['uuid']] = j + for cname in sorted(job['components'].keys()): + child_uuid = job['components'][cname] + j = (preloaded.get(child_uuid) or + arv.jobs().get(uuid=child_uuid).execute()) + children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs) + + super(JobTreeSummarizer, self).__init__( + children=children, + label=label, + **kwargs) class PipelineSummarizer(MultiSummarizer): @@ -543,7 +612,7 @@ class PipelineSummarizer(MultiSummarizer): else: logger.info( "%s: job %s", cname, component['job']['uuid']) - summarizer = JobSummarizer(component['job'], **kwargs) + summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs) summarizer.label = '{} {}'.format( cname, component['job']['uuid']) children[cname] = summarizer @@ -554,7 +623,7 @@ class PipelineSummarizer(MultiSummarizer): class ContainerTreeSummarizer(MultiSummarizer): - def __init__(self, root, **kwargs): + def __init__(self, root, skip_child_jobs=False, **kwargs): arv = arvados.api('v1', model=OrderedJsonModel()) label = kwargs.pop('label', None) or root.get('name') or root['uuid'] @@ -575,15 +644,20 @@ class ContainerTreeSummarizer(MultiSummarizer): page_filters = [] while True: - items = arv.container_requests().index( + child_crs = arv.container_requests().index( order=['uuid asc'], filters=page_filters+[ ['requesting_container_uuid', '=', current['uuid']]], - ).execute()['items'] - if not items: + ).execute() + if not child_crs['items']: + break + elif skip_child_jobs: + logger.warning('%s: omitting stats from %d child containers' + ' because --skip-child-jobs flag is on', + label, child_crs['items_available']) break - page_filters = [['uuid', '>', items[-1]['uuid']]] - for cr in items: + page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]] + for cr in child_crs['items']: if cr['container_uuid']: logger.debug('%s: container req %s', current['uuid'], cr['uuid']) cr['name'] = cr.get('name') or cr['uuid']