class Summarizer(object):
existing_constraints = {}
- def __init__(self, logdata, label=None):
+ def __init__(self, logdata, label=None, include_child_jobs=True):
self._logdata = logdata
+
self.label = label
self.starttime = None
self.finishtime = None
- logger.debug("%s: logdata %s", self.label, repr(logdata))
+ self._include_child_jobs = include_child_jobs
- def run(self):
- logger.debug("%s: parsing log data", self.label)
# stats_max: {category: {stat: val}}
self.stats_max = collections.defaultdict(
functools.partial(collections.defaultdict,
# task_stats: {task_id: {category: {stat: val}}}
self.task_stats = collections.defaultdict(
functools.partial(collections.defaultdict, dict))
+
+ self.seq_to_uuid = {}
self.tasks = collections.defaultdict(Task)
+
+ logger.debug("%s: logdata %s", self.label, repr(logdata))
+
+ def run(self):
+ logger.debug("%s: parsing log data", self.label)
for line in self._logdata:
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
+ if m:
+ seq = int(m.group('seq'))
+ uuid = m.group('task_uuid')
+ self.seq_to_uuid[seq] = uuid
+ logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
+ continue
+
m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
if m:
- task_id = m.group('seq')
+ task_id = self.seq_to_uuid[int(m.group('seq'))]
elapsed = int(m.group('elapsed'))
self.task_stats[task_id]['time'] = {'elapsed': elapsed}
if elapsed > self.stats_max['time']['elapsed']:
self.stats_max['time']['elapsed'] = elapsed
continue
+
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
+ if m:
+ uuid = m.group('uuid')
+ if not self._include_child_jobs:
+                logger.warning('%s: omitting %s (try --include-child-jobs)',
+ self.label, uuid)
+ continue
+ logger.debug('%s: follow %s', self.label, uuid)
+ child_summarizer = JobSummarizer(uuid)
+ child_summarizer.stats_max = self.stats_max
+ child_summarizer.task_stats = self.task_stats
+ child_summarizer.tasks = self.tasks
+ child_summarizer.run()
+ logger.debug('%s: done %s', self.label, uuid)
+ continue
+
m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
if not m:
continue
+
if self.label is None:
self.label = m.group('job_uuid')
logger.debug('%s: using job uuid as label', self.label)
continue
elif m.group('category') == 'error':
continue
- task_id = m.group('seq')
+ task_id = self.seq_to_uuid[int(m.group('seq'))]
task = self.tasks[task_id]
# Use the first and last crunchstat timestamps as
self.task_stats[task_id][category][stat] = val
if val > self.stats_max[category][stat]:
self.stats_max[category][stat] = val
+ logger.debug('%s: done parsing', self.label)
+
self.job_tot = collections.defaultdict(
functools.partial(collections.defaultdict, int))
for task_id, task_stat in self.task_stats.iteritems():
# meaningless stats like 16 cpu cores x 5 tasks = 80
continue
self.job_tot[category][stat] += val
+ logger.debug('%s: done totals', self.label)
def long_label(self):
label = self.label
tot = self._format(self.job_tot[category].get(stat, '-'))
yield "\t".join([category, stat, str(val), max_rate, tot])
for args in (
+ ('Number of tasks: {}',
+ len(self.tasks),
+ None),
('Max CPU time spent by a single task: {}s',
self.stats_max['cpu']['user+sys'],
None),
class CollectionSummarizer(Summarizer):
- def __init__(self, collection_id):
+ def __init__(self, collection_id, **kwargs):
+ logger.debug('load collection %s', collection_id)
collection = arvados.collection.CollectionReader(collection_id)
filenames = [filename for filename in collection]
if len(filenames) != 1:
"collection {} has {} files; need exactly one".format(
collection_id, len(filenames)))
super(CollectionSummarizer, self).__init__(
- collection.open(filenames[0]))
+ collection.open(filenames[0]), **kwargs)
self.label = collection_id
class JobSummarizer(CollectionSummarizer):
- def __init__(self, job):
+ def __init__(self, job, **kwargs):
arv = arvados.api('v1')
if isinstance(job, str):
self.job = arv.jobs().get(uuid=job).execute()
raise ValueError(
"job {} has no log; live summary not implemented".format(
self.job['uuid']))
- super(JobSummarizer, self).__init__(self.job['log'])
+ super(JobSummarizer, self).__init__(self.job['log'], **kwargs)
self.label = self.job['uuid']
class PipelineSummarizer():
- def __init__(self, pipeline_instance_uuid):
+ def __init__(self, pipeline_instance_uuid, **kwargs):
arv = arvados.api('v1', model=OrderedJsonModel())
instance = arv.pipeline_instances().get(
uuid=pipeline_instance_uuid).execute()
else:
logger.info(
"%s: logdata %s", cname, component['job']['log'])
- summarizer = JobSummarizer(component['job'])
+ summarizer = JobSummarizer(component['job'], **kwargs)
summarizer.label = cname
self.summarizers[cname] = summarizer
self.label = pipeline_instance_uuid