class Summarizer(object):
# Base class: holds a crunchstat log source (`logdata`, iterated line by line
# in run()) plus the label and start/finish timestamps of the summary.
# (Diff view: "-" = removed line, "+" = added line; the remainder of this
# constructor -- presumably where self._skip_child_jobs, self.tasks, etc. are
# set, since run() reads them -- is not shown here.)
- def __init__(self, logdata, label=None, skip_child_jobs=False):
# New signature: adds `uuid` (stored as self.uuid) and **kwargs.
# NOTE(review): **kwargs is not used in the lines visible here, so unknown
# keywords (e.g. `threads`, if a caller hands it to a multi-summarizer that
# forwards **kwargs to its children) are presumably accepted and ignored --
# a misspelled option will not raise. Confirm against the elided lines.
+ def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
self._logdata = logdata
+ self.uuid = uuid
self.label = label
self.starttime = None
self.finishtime = None
def run(self):
# Summarizer.run: scan every log line, pick out crunchstat samples and task
# bookkeeping lines, and accumulate them into self.tasks / self.task_stats /
# self.stats_max. (Diff view: "-" = removed, "+" = added; some unchanged lines
# of the body are elided.)
logger.debug("%s: parsing logdata %s", self.label, self._logdata)
+ detected_crunch1 = False
for line in self._logdata:
- m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
- if m:
- seq = int(m.group('seq'))
- uuid = m.group('task_uuid')
- self.seq_to_uuid[seq] = uuid
- logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
- continue
# Switch to crunch1 (job log) parsing the first time a line contains a job
# uuid infix ("-8i9sb-"); the flag is never reset for the rest of the log.
# NOTE(review): the substring test matches anywhere in the line, so a crunch2
# log whose message text mentions a job uuid would also flip it -- confirm
# that is acceptable.
+ if not detected_crunch1 and '-8i9sb-' in line:
+ detected_crunch1 = True
- m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
- if m:
- task_id = self.seq_to_uuid[int(m.group('seq'))]
- elapsed = int(m.group('elapsed'))
- self.task_stats[task_id]['time'] = {'elapsed': elapsed}
- if elapsed > self.stats_max['time']['elapsed']:
- self.stats_max['time']['elapsed'] = elapsed
- continue
+ if detected_crunch1:
# crunch1: "job_task <uuid>" lines map a task sequence number to its uuid.
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
+ if m:
+ seq = int(m.group('seq'))
+ uuid = m.group('task_uuid')
+ self.seq_to_uuid[seq] = uuid
+ logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
+ continue
- m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
- if m:
- uuid = m.group('uuid')
- if self._skip_child_jobs:
- logger.warning('%s: omitting stats from child job %s'
- ' because --skip-child-jobs flag is on',
- self.label, uuid)
# crunch1: task exit lines record elapsed seconds per task and the maximum.
# NOTE(review): seq_to_uuid[...] raises KeyError if this seq's job_task line
# was not seen earlier in the log.
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
+ if m:
+ task_id = self.seq_to_uuid[int(m.group('seq'))]
+ elapsed = int(m.group('elapsed'))
+ self.task_stats[task_id]['time'] = {'elapsed': elapsed}
+ if elapsed > self.stats_max['time']['elapsed']:
+ self.stats_max['time']['elapsed'] = elapsed
continue
- logger.debug('%s: follow %s', self.label, uuid)
- child_summarizer = ProcessSummarizer(uuid)
- child_summarizer.stats_max = self.stats_max
- child_summarizer.task_stats = self.task_stats
- child_summarizer.tasks = self.tasks
- child_summarizer.starttime = self.starttime
- child_summarizer.run()
- logger.debug('%s: done %s', self.label, uuid)
- continue
- m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
- if m:
- # crunch1 job
- task_id = self.seq_to_uuid[int(m.group('seq'))]
# crunch1: "Queued job <uuid>" means a child job was started; unless
# --skip-child-jobs is on, a child summarizer is run and shares this
# summarizer's stats_max / task_stats / tasks dicts so its stats merge in.
# NOTE(review): ProcessSummarizer(uuid) passes a uuid *string*, but
# ProcessSummarizer.__init__ reads process.get(...) / process['uuid'] as a
# dict -- this call looks like it would raise AttributeError. Presumably
# NewSummarizer(uuid) (which fetches the record) was intended -- confirm.
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
+ if m:
+ uuid = m.group('uuid')
+ if self._skip_child_jobs:
+ logger.warning('%s: omitting stats from child job %s'
+ ' because --skip-child-jobs flag is on',
+ self.label, uuid)
+ continue
+ logger.debug('%s: follow %s', self.label, uuid)
+ child_summarizer = ProcessSummarizer(uuid)
+ child_summarizer.stats_max = self.stats_max
+ child_summarizer.task_stats = self.task_stats
+ child_summarizer.tasks = self.tasks
+ child_summarizer.starttime = self.starttime
+ child_summarizer.run()
+ logger.debug('%s: done %s', self.label, uuid)
+ continue
+
# crunch1 crunchstat sample: timestamp, job uuid, seq, category, values and
# an optional "-- interval ..." suffix. Any other line is skipped.
+ m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+ if not m:
+ continue
else:
+ # crunch2
# crunch2 crunchstat lines carry no seq/task fields; non-matching lines are
# skipped.
m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
- if m:
- # crunch2 container (seq/task don't apply)
- task_id = 'container'
- else:
- # not a crunchstat log
+ if not m:
continue
- task = self.tasks[task_id]
# (Some unchanged lines between here and the category checks are elided in
# this diff view.)
if self.label is None:
try:
continue
elif m.group('category') in ('error', 'caught'):
continue
- elif m.group('category') == 'read':
+ elif m.group('category') in ['read', 'open', 'cgroup', 'CID']:
# "stderr crunchstat: read /proc/1234/net/dev: ..."
- # (crunchstat formatting fixed, but old logs still say this)
+ # (old logs are less careful with unprefixed error messages)
continue
# Attribute the sample to a task: crunch1 uses the seq -> task uuid map,
# crunch2 uses the single pseudo-task 'container'.
# NOTE(review): as above, seq_to_uuid[...] raises KeyError for an unmapped seq.
+ if detected_crunch1:
+ task_id = self.seq_to_uuid[int(m.group('seq'))]
+ else:
+ task_id = 'container'
+ task = self.tasks[task_id]
+
# Use the first and last crunchstat timestamps as
# approximations of starttime and finishtime.
timestamp = m.group('timestamp')
# Tail of the factory that picks a summarizer class from the uuid's type
# infix: dz642 -> container, xvhdp -> container request, 8i9sb -> job,
# d1hrv -> pipeline instance, 4zz18 -> collection. Each branch fetches the
# API record only when `process` was not already supplied. Its `def` line and
# the code that sets `uuid` / `process` are elided from this diff view.
process = None
arv = arvados.api('v1', model=OrderedJsonModel())
- if re.search('-dz642-', uuid):
+ if '-dz642-' in uuid:
if process is None:
process = arv.containers().get(uuid=uuid).execute()
- return ContainerSummarizer(process, **kwargs)
- elif re.search('-xvhdp-', uuid):
+ klass = ContainerTreeSummarizer
+ elif '-xvhdp-' in uuid:
if process is None:
- ctrReq = arv.container_requests().get(uuid=uuid).execute()
- ctrUUID = ctrReq['container_uuid']
- process = arv.containers().get(uuid=ctrUUID).execute()
- return ContainerSummarizer(process, **kwargs)
- elif re.search('-8i9sb-', uuid):
# A container request is now passed through as-is; ContainerTreeSummarizer
# resolves it to its container itself.
+ process = arv.container_requests().get(uuid=uuid).execute()
+ klass = ContainerTreeSummarizer
+ elif '-8i9sb-' in uuid:
if process is None:
process = arv.jobs().get(uuid=uuid).execute()
- return JobSummarizer(process, **kwargs)
- elif re.search('-d1hrv-', uuid):
+ klass = JobSummarizer
+ elif '-d1hrv-' in uuid:
if process is None:
process = arv.pipeline_instances().get(uuid=uuid).execute()
- return PipelineSummarizer(process, **kwargs)
+ klass = PipelineSummarizer
# NOTE(review): unlike the other branches, neither **kwargs (label, threads,
# skip_child_jobs, ...) nor uuid= is forwarded to CollectionSummarizer --
# confirm that is intentional.
+ elif '-4zz18-' in uuid:
+ return CollectionSummarizer(collection_id=uuid)
else:
# NOTE(review): "%s" is never interpolated here -- uuid is passed as a second
# exception argument instead. If ArgumentError is argparse.ArgumentError, its
# (argument, message) signature does not match this call either. Consider
# "Unrecognized uuid %s" % uuid; confirm which ArgumentError is imported.
raise ArgumentError("Unrecognized uuid %s", uuid)
# Every remaining branch builds the chosen class the same way, tagging it
# with the uuid that was requested.
+ return klass(process, uuid=uuid, **kwargs)
class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, container, or container request."""

    def __init__(self, process, label=None, **kwargs):
        """Set up a summarizer that reads *process*'s crunchstat logs.

        The finished log collection (``process['log']``) is read when the
        record has one. Otherwise, or if that collection cannot be found,
        the live event logs are read instead and the label gets a
        ``' (partial)'`` suffix.

        :param process: API record of the process; must contain ``'uuid'``.
        :param label: display label. Defaults to the record's ``'name'``, or
            to its uuid when the name is missing, ``None`` or empty.
        :param kwargs: passed through to :class:`Summarizer`.
        """
        rdr = None
        self.process = process
        if label is None:
            # A record can carry 'name': None (this module already guards
            # with `.get('name') or uuid` elsewhere). get('name', uuid) would
            # return that None, and `label + ' (partial)'` below would then
            # raise TypeError.
            label = self.process.get('name') or self.process['uuid']
        if self.process.get('log'):
            try:
                rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.process['log'], e)
        if rdr is None:
            # No usable log collection: fall back to the live event logs,
            # which may not cover the whole run.
            rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
            label = label + ' (partial)'
        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
        self.existing_constraints = self.process.get('runtime_constraints', {})
# NOTE(review): trailing body of a class whose header is elided from this diff
# view (presumably a ProcessSummarizer subclass such as JobSummarizer).
pass
class MultiSummarizer(object):
    """Aggregate several child summarizers and run them concurrently.

    *children* maps a display name to a summarizer exposing ``run()``,
    ``text_report()`` and a ``process`` dict with a ``'uuid'`` key. At most
    *threads* children run at the same time. Extra keyword arguments are
    accepted and ignored, so subclasses can forward their whole **kwargs.
    """

    def __init__(self, children=None, label=None, threads=1, **kwargs):
        # A fresh dict per instance: a shared `{}` default would be the same
        # object for every instance built without explicit children.
        self.children = {} if children is None else children
        # Semaphore(0) (or None -> TypeError) would stall run() forever, so
        # allow at least one worker.
        self.throttle = threading.Semaphore(max(1, threads or 1))
        self.label = label

    def run_and_release(self, target, *args, **kwargs):
        """Call ``target(*args, **kwargs)``, then free one throttle slot."""
        try:
            return target(*args, **kwargs)
        finally:
            self.throttle.release()

    def run(self):
        """Run every child's ``run()`` in worker threads and wait for all."""
        threads = []
        for child in self.children.values():
            # Blocks while `threads` children are already running.
            self.throttle.acquire()
            t = threading.Thread(target=self.run_and_release, args=(child.run, ))
            t.daemon = True
            try:
                t.start()
            except Exception:
                # The worker never ran, so it can never release its slot.
                self.throttle.release()
                raise
            threads.append(t)
        # Reports are built from the children's results, so wait for them.
        for t in threads:
            t.join()

    def text_report(self):
        """Concatenate the children's text reports.

        Each report is followed by a blank line. When there is more than one
        child, each report is preceded by a "### Summary for NAME (UUID)"
        heading.
        """
        with_headings = len(self.children) > 1
        parts = []
        for cname, child in self.children.items():
            if with_headings:
                parts.append('### Summary for {} ({})\n'.format(
                    cname, child.process['uuid']))
            parts.append(child.text_report())
            parts.append('\n')
        return ''.join(parts)

    def html_report(self):
        """Render all children into one chart page labelled ``self.label``."""
        return WEBCHART_CLASS(self.label, self.children.values()).html()
+
+
class PipelineSummarizer(MultiSummarizer):
    """Summarize every job of a pipeline instance.

    One JobSummarizer is created per component that has a job assigned,
    keyed by component name and labelled "<component> <job uuid>".
    Components without a job are skipped with a warning.
    """

    def __init__(self, instance, **kwargs):
        # The multi-summarizer's own label is passed explicitly to super()
        # below. Leaving a caller-supplied 'label' in kwargs as well would
        # raise "got multiple values for keyword argument 'label'", so take
        # it out here and prefer it over the instance uuid.
        label = kwargs.pop('label', None) or instance['uuid']
        children = collections.OrderedDict()
        for cname, component in instance['components'].items():
            if 'job' not in component:
                logger.warning(
                    "%s: skipping component with no job assigned", cname)
                continue
            job_uuid = component['job']['uuid']
            logger.info("%s: job %s", cname, job_uuid)
            summarizer = JobSummarizer(component['job'], **kwargs)
            summarizer.label = '{} {}'.format(cname, job_uuid)
            children[cname] = summarizer
        # NOTE(review): kwargs may carry uuid=<pipeline uuid> (the factory
        # passes uuid=...), which every JobSummarizer above also receives --
        # confirm that is the intended value of each child's .uuid.
        super(PipelineSummarizer, self).__init__(
            children=children,
            label=label,
            **kwargs)
+
+
class ContainerTreeSummarizer(MultiSummarizer):
    """Summarize a container (or container request) and all its descendants.

    Starting at *root*, container requests issued by each container are
    followed breadth-first. One ContainerSummarizer is created per container,
    keyed by container uuid in discovery order. Each child is labelled with
    its ancestry path ("root / child / grandchild").
    """

    def __init__(self, root, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())

        root_label = kwargs.pop('label', None) or root.get('name') or root['uuid']
        # Work on a copy so the caller's record is not modified.
        root = dict(root)
        root['name'] = root_label

        children = collections.OrderedDict()
        todo = collections.deque((root, ))
        while todo:
            current = todo.popleft()
            label = current['name']
            if '-xvhdp-' in current['uuid']:
                # Container request: summarize the container it ran. Child
                # requests without a container are filtered out below; this
                # guard covers a root request that has none.
                if not current.get('container_uuid'):
                    logger.warning('%s: container request %s has no container, skipping',
                                   label, current['uuid'])
                    continue
                current = arv.containers().get(uuid=current['container_uuid']).execute()
            children[current['uuid']] = ContainerSummarizer(
                current, label=label, **kwargs)
            # Page through the requests this container issued, in uuid order,
            # resuming each page after the last uuid seen.
            page_filters = []
            while True:
                items = arv.container_requests().index(
                    order=['uuid asc'],
                    filters=page_filters + [
                        ['requesting_container_uuid', '=', current['uuid']]],
                ).execute()['items']
                if not items:
                    break
                page_filters = [['uuid', '>', items[-1]['uuid']]]
                for cr in items:
                    if cr['container_uuid']:
                        logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
                        cr['name'] = label + ' / ' + (cr.get('name') or cr['uuid'])
                        todo.append(cr)
        # NOTE(review): kwargs may carry uuid=<root uuid> (the factory passes
        # uuid=...), which every ContainerSummarizer above also receives --
        # confirm that is the intended value of each child's .uuid.
        super(ContainerTreeSummarizer, self).__init__(
            children=children,
            label=root_label,
            **kwargs)