logger.debug("%s: logdata %s", self.label, logdata)
+ def run_child(self, uuid):
+ if self._skip_child_jobs:
+ logger.warning('%s: omitting stats from child job %s'
+ ' because --skip-child-jobs flag is on',
+ self.label, uuid)
+ return
+ logger.debug('%s: follow %s', self.label, uuid)
+ child_summarizer = JobSummarizer(uuid)
+ child_summarizer.stats_max = self.stats_max
+ child_summarizer.task_stats = self.task_stats
+ child_summarizer.tasks = self.tasks
+ child_summarizer.starttime = self.starttime
+ child_summarizer.run()
+ logger.debug('%s: done %s', self.label, uuid)
+
def run(self):
logger.debug("%s: parsing logdata %s", self.label, self._logdata)
for line in self._logdata:
self.stats_max['time']['elapsed'] = elapsed
continue
+ # Old style job logs only - newer style uses job['components']
+ uuid = None
m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
if m:
- uuid = m.group('uuid')
- if self._skip_child_jobs:
- logger.warning('%s: omitting stats from child job %s'
- ' because --skip-child-jobs flag is on',
- self.label, uuid)
- continue
- logger.debug('%s: follow %s', self.label, uuid)
- child_summarizer = JobSummarizer(uuid)
- child_summarizer.stats_max = self.stats_max
- child_summarizer.task_stats = self.task_stats
- child_summarizer.tasks = self.tasks
- child_summarizer.starttime = self.starttime
- child_summarizer.run()
- logger.debug('%s: done %s', self.label, uuid)
+ self.run_child(m.group('uuid'))
continue
m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
self.stats_max[category][stat] = val
except Exception as e:
logger.info('Skipping malformed line: {}Error was: {}\n'.format(line, e))
- logger.debug('%s: done parsing', self.label)
+ logger.debug('%s: done parsing log', self.label)
+
+ # Enabling this will roll up stats for all subjobs into the parent job
+ if False and 'components' in self.job:
+ for cname, component in self.job['components'].iteritems():
+ self.run_child(component)
self.job_tot = collections.defaultdict(
functools.partial(collections.defaultdict, int))
class PipelineSummarizer(object):
def __init__(self, pipeline_instance_uuid, **kwargs):
- arv = arvados.api('v1', model=OrderedJsonModel())
- instance = arv.pipeline_instances().get(
+ self.arv = arvados.api('v1', model=OrderedJsonModel())
+ instance = self.arv.pipeline_instances().get(
uuid=pipeline_instance_uuid).execute()
self.summarizers = collections.OrderedDict()
for cname, component in instance['components'].iteritems():
logger.warning(
"%s: skipping component with no job assigned", cname)
else:
- logger.info(
- "%s: job %s", cname, component['job']['uuid'])
- summarizer = JobSummarizer(component['job'], **kwargs)
- summarizer.label = '{} {}'.format(
- cname, component['job']['uuid'])
- self.summarizers[cname] = summarizer
+ self.summarize_job(cname, component['job'], **kwargs)
self.label = pipeline_instance_uuid
+ def summarize_job(self, cname, job, **kwargs):
+ uuid = job['uuid']
+ logger.info("%s: job %s", cname, uuid)
+ summarizer = JobSummarizer(job, **kwargs)
+ summarizer.label = '{} {}'.format(cname, uuid)
+ self.summarizers[cname] = summarizer
+ if 'components' in job:
+ for cname, uuid in job['components'].iteritems():
+ subjob = self.arv.jobs().get(uuid=uuid).execute()
+ self.summarize_job(cname, subjob, **kwargs)
+
def run(self):
threads = []
for summarizer in self.summarizers.itervalues():