From 68fee3aff2bc0e189827956720de58c3e8668bdc Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 15 Aug 2017 16:26:18 -0400 Subject: [PATCH] 11309: Fix parsing and labels. Add --threads option. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- .../crunchstat_summary/command.py | 4 + .../crunchstat_summary/summarizer.py | 228 +++++++++++------- ...e4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz | Bin 0 -> 622 bytes ...2-mjfb0i5hzojp16a-crunchstat.txt.gz.report | 23 ++ .../crunchstat-summary/tests/test_examples.py | 30 +++ 5 files changed, 202 insertions(+), 83 deletions(-) create mode 100644 tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz create mode 100644 tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz.report diff --git a/tools/crunchstat-summary/crunchstat_summary/command.py b/tools/crunchstat-summary/crunchstat_summary/command.py index 046f01c764..a70e4b2d03 100644 --- a/tools/crunchstat-summary/crunchstat_summary/command.py +++ b/tools/crunchstat-summary/crunchstat_summary/command.py @@ -33,6 +33,9 @@ class ArgumentParser(argparse.ArgumentParser): self.add_argument( '--format', type=str, choices=('html', 'text'), default='text', help='Report format') + self.add_argument( + '--threads', type=int, default=8, + help='Maximum worker threads to run') self.add_argument( '--verbose', '-v', action='count', default=0, help='Log more information (once for progress, twice for debug)') @@ -46,6 +49,7 @@ class Command(object): def run(self): kwargs = { 'skip_child_jobs': self.args.skip_child_jobs, + 'threads': self.args.threads, } if self.args.pipeline_instance: self.summer = summarizer.NewSummarizer(self.args.pipeline_instance, **kwargs) diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py index 60f1f70f60..e8a842d3a9 100644 --- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py +++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py @@ -41,9 +41,10 @@ class Task(object): class Summarizer(object): - def __init__(self, logdata, label=None, skip_child_jobs=False): + def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs): self._logdata = logdata + self.uuid = uuid self.label = label self.starttime = None self.finishtime = None @@ -69,55 +70,55 @@ class Summarizer(object): def run(self): logger.debug("%s: parsing logdata %s", self.label, self._logdata) + detected_crunch1 = False for line in self._logdata: - m = re.search(r'^\S+ \S+ \d+ (?P\d+) job_task (?P\S+)$', line) - if m: - seq = int(m.group('seq')) - uuid = m.group('task_uuid') - self.seq_to_uuid[seq] = uuid - logger.debug('%s: seq %d is task %s', self.label, seq, uuid) - continue + if not detected_crunch1 and '-8i9sb-' in line: + detected_crunch1 = True - m = re.search(r'^\S+ \S+ \d+ (?P\d+) (success in|failure \(#., permanent\) after) (?P\d+) seconds', line) - if m: - task_id = self.seq_to_uuid[int(m.group('seq'))] - elapsed = int(m.group('elapsed')) - self.task_stats[task_id]['time'] = {'elapsed': elapsed} - if elapsed > self.stats_max['time']['elapsed']: - self.stats_max['time']['elapsed'] = elapsed - continue + if detected_crunch1: + m = re.search(r'^\S+ \S+ \d+ (?P\d+) job_task (?P\S+)$', line) + if m: + seq = int(m.group('seq')) + uuid = m.group('task_uuid') + self.seq_to_uuid[seq] = uuid + logger.debug('%s: seq %d is task %s', self.label, seq, uuid) + continue - m = re.search(r'^\S+ \S+ \d+ (?P\d+) stderr Queued job (?P\S+)$', line) - if m: - uuid = m.group('uuid') - if self._skip_child_jobs: - logger.warning('%s: omitting stats from child job %s' - ' because --skip-child-jobs flag is on', - self.label, uuid) + m = re.search(r'^\S+ \S+ \d+ (?P\d+) (success in|failure \(#., permanent\) after) (?P\d+) seconds', line) + if m: + task_id = self.seq_to_uuid[int(m.group('seq'))] + elapsed = int(m.group('elapsed')) + self.task_stats[task_id]['time'] = {'elapsed': elapsed} + if elapsed > self.stats_max['time']['elapsed']: + self.stats_max['time']['elapsed'] = elapsed continue - logger.debug('%s: follow %s', self.label, uuid) - child_summarizer = ProcessSummarizer(uuid) - child_summarizer.stats_max = self.stats_max - child_summarizer.task_stats = self.task_stats - child_summarizer.tasks = self.tasks - child_summarizer.starttime = self.starttime - child_summarizer.run() - logger.debug('%s: done %s', self.label, uuid) - continue - m = re.search(r'^(?P[^\s.]+)(\.\d+)? (?P\S+) \d+ (?P\d+) stderr crunchstat: (?P\S+) (?P.*?)( -- interval (?P.*))?\n$', line) - if m: - # crunch1 job - task_id = self.seq_to_uuid[int(m.group('seq'))] + m = re.search(r'^\S+ \S+ \d+ (?P\d+) stderr Queued job (?P\S+)$', line) + if m: + uuid = m.group('uuid') + if self._skip_child_jobs: + logger.warning('%s: omitting stats from child job %s' + ' because --skip-child-jobs flag is on', + self.label, uuid) + continue + logger.debug('%s: follow %s', self.label, uuid) + child_summarizer = ProcessSummarizer(uuid) + child_summarizer.stats_max = self.stats_max + child_summarizer.task_stats = self.task_stats + child_summarizer.tasks = self.tasks + child_summarizer.starttime = self.starttime + child_summarizer.run() + logger.debug('%s: done %s', self.label, uuid) + continue + + m = re.search(r'^(?P[^\s.]+)(\.\d+)? (?P\S+) \d+ (?P\d+) stderr crunchstat: (?P\S+) (?P.*?)( -- interval (?P.*))?\n$', line) + if not m: + continue else: + # crunch2 m = re.search(r'^(?P\S+) (?P\S+) (?P.*?)( -- interval (?P.*))?\n$', line) - if m: - # crunch2 container (seq/task don't apply) - task_id = 'container' - else: - # not a crunchstat log + if not m: continue - task = self.tasks[task_id] if self.label is None: try: @@ -129,11 +130,17 @@ class Summarizer(object): continue elif m.group('category') in ('error', 'caught'): continue - elif m.group('category') == 'read': + elif m.group('category') in ['read', 'open', 'cgroup', 'CID']: # "stderr crunchstat: read /proc/1234/net/dev: ..." - # (crunchstat formatting fixed, but old logs still say this) + # (old logs are less careful with unprefixed error messages) continue + if detected_crunch1: + task_id = self.seq_to_uuid[int(m.group('seq'))] + else: + task_id = 'container' + task = self.tasks[task_id] + # Use the first and last crunchstat timestamps as # approximations of starttime and finishtime. timestamp = m.group('timestamp') @@ -410,47 +417,47 @@ def NewSummarizer(process, **kwargs): process = None arv = arvados.api('v1', model=OrderedJsonModel()) - if re.search('-dz642-', uuid): + if '-dz642-' in uuid: if process is None: process = arv.containers().get(uuid=uuid).execute() - return ContainerSummarizer(process, **kwargs) - elif re.search('-xvhdp-', uuid): + klass = ContainerTreeSummarizer + elif '-xvhdp-' in uuid: if process is None: - ctrReq = arv.container_requests().get(uuid=uuid).execute() - ctrUUID = ctrReq['container_uuid'] - process = arv.containers().get(uuid=ctrUUID).execute() - return ContainerSummarizer(process, **kwargs) - elif re.search('-8i9sb-', uuid): + process = arv.container_requests().get(uuid=uuid).execute() + klass = ContainerTreeSummarizer + elif '-8i9sb-' in uuid: if process is None: process = arv.jobs().get(uuid=uuid).execute() - return JobSummarizer(process, **kwargs) - elif re.search('-d1hrv-', uuid): + klass = JobSummarizer + elif '-d1hrv-' in uuid: if process is None: process = arv.pipeline_instances().get(uuid=uuid).execute() - return PipelineSummarizer(process, **kwargs) + klass = PipelineSummarizer + elif '-4zz18-' in uuid: + return CollectionSummarizer(collection_id=uuid) else: raise ArgumentError("Unrecognized uuid %s", uuid) + return klass(process, uuid=uuid, **kwargs) class ProcessSummarizer(Summarizer): """Process is a job, pipeline, container, or container request.""" - def __init__(self, process, **kwargs): + def __init__(self, process, label=None, **kwargs): rdr = None self.process = process + if label is None: + label = self.process.get('name', self.process['uuid']) if self.process.get('log'): try: rdr = crunchstat_summary.reader.CollectionReader(self.process['log']) except arvados.errors.NotFoundError as e: logger.warning("Trying event logs after failing to read " "log collection %s: %s", self.process['log'], e) - else: - label = self.process['uuid'] if rdr is None: rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid']) - label = self.process['uuid'] + ' (partial)' - super(ProcessSummarizer, self).__init__(rdr, **kwargs) - self.label = label + label = label + ' (partial)' + super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs) self.existing_constraints = self.process.get('runtime_constraints', {}) @@ -462,26 +469,23 @@ class ContainerSummarizer(ProcessSummarizer): pass -class PipelineSummarizer(object): - def __init__(self, instance, **kwargs): - self.summarizers = collections.OrderedDict() - for cname, component in instance['components'].iteritems(): - if 'job' not in component: - logger.warning( - "%s: skipping component with no job assigned", cname) - else: - logger.info( - "%s: job %s", cname, component['job']['uuid']) - summarizer = JobSummarizer(component['job'], **kwargs) - summarizer.label = '{} {}'.format( - cname, component['job']['uuid']) - self.summarizers[cname] = summarizer - self.label = instance['uuid'] +class MultiSummarizer(object): + def __init__(self, children={}, label=None, threads=1, **kwargs): + self.throttle = threading.Semaphore(threads) + self.children = children + self.label = label + + def run_and_release(self, target, *args, **kwargs): + try: + return target(*args, **kwargs) + finally: + self.throttle.release() def run(self): threads = [] - for summarizer in self.summarizers.itervalues(): - t = threading.Thread(target=summarizer.run) + for child in self.children.itervalues(): + self.throttle.acquire() + t = threading.Thread(target=self.run_and_release, args=(child.run, )) t.daemon = True t.start() threads.append(t) @@ -490,12 +494,70 @@ class PipelineSummarizer(object): def text_report(self): txt = '' - for cname, summarizer in self.summarizers.iteritems(): - txt += '### Summary for {} ({})\n'.format( - cname, summarizer.process['uuid']) - txt += summarizer.text_report() + for cname, child in self.children.iteritems(): + if len(self.children) > 1: + txt += '### Summary for {} ({})\n'.format( + cname, child.process['uuid']) + txt += child.text_report() txt += '\n' return txt def html_report(self): - return WEBCHART_CLASS(self.label, self.summarizers.itervalues()).html() + return WEBCHART_CLASS(self.label, self.children.itervalues()).html() + + +class PipelineSummarizer(MultiSummarizer): + def __init__(self, instance, **kwargs): + children = collections.OrderedDict() + for cname, component in instance['components'].iteritems(): + if 'job' not in component: + logger.warning( + "%s: skipping component with no job assigned", cname) + else: + logger.info( + "%s: job %s", cname, component['job']['uuid']) + summarizer = JobSummarizer(component['job'], **kwargs) + summarizer.label = '{} {}'.format( + cname, component['job']['uuid']) + children[cname] = summarizer + super(PipelineSummarizer, self).__init__( + children=children, + label=instance['uuid'], + **kwargs) + + +class ContainerTreeSummarizer(MultiSummarizer): + def __init__(self, root, **kwargs): + arv = arvados.api('v1', model=OrderedJsonModel()) + + label = kwargs.pop('label', None) or root.get('name') or root['uuid'] + root['name'] = label + + children = collections.OrderedDict() + todo = collections.deque((root, )) + while len(todo) > 0: + current = todo.popleft() + label = current['name'] + if current['uuid'].find('-xvhdp-') > 0: + current = arv.containers().get(uuid=current['container_uuid']).execute() + children[current['uuid']] = ContainerSummarizer( + current, label=label, **kwargs) + page_filters = [] + while True: + items = arv.container_requests().index( + order=['uuid asc'], + filters=page_filters+[ + ['requesting_container_uuid', '=', current['uuid']]], + ).execute()['items'] + if not items: + break + page_filters = [['uuid', '>', items[-1]['uuid']]] + for cr in items: + if cr['container_uuid']: + logger.debug('%s: container req %s', current['uuid'], cr['uuid']) + cr['name'] = label + ' / ' + (cr.get('name') or cr['uuid']) + todo.append(cr) + super(ContainerTreeSummarizer, self).__init__( + children=children, + label=root['name'], + **kwargs) diff --git a/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz b/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz new file mode 100644 index 0000000000000000000000000000000000000000..8b069e7fb32542d323371af536c4f3471733c1ba GIT binary patch literal 622 zcmV-!0+Ib6iwFoWF_T#U17mM)bYW?3WpZCRbY*2UEo6E&G%_u1YGz_EX*FniZ)$Kc zHeoGea&>NFXmfO7bS`vwbO61S&32+d5QX=7ihcp1{`$}7$jXfeuu06QZ~$lG+fN}G zqZ7@H8#grPP}R5Y*S8@+KZ7tsd?uQ+o0HD~qIVvwTbNxltgAFPT`E@V-HYjmV(3lX zHCvPShd!_Syn5-HeVbS9zNo4p&-=~3+vfZAY6%24Gwz z@3``XwI!7nN_HGzWs8cE4;*aZgay^;v8*xG)>j+~sBQ4>LpE!>rs|*Qx(lsCmU~=m z)3)gj4>K{zW#c^8Js@BJ;IJ@SYQR)Q^_q0;e-*7R?aQ`!tBd_+FgjBZ38w3B;6NZE zxDUs}4|US}76|K+TE5dopB`#L<+5?P=^;25?ZRk{feZ-1Xiqo3qrU6-hQNHcgAmev z7qr~`ST_%Ya8%3I%~Eas)Crg6VMtfp@X?Ls5a}c2#DT$5xXe1zMcN%}+8x}8`*Bki z^bq0%nKC+qtXJv+vYE}ydN=5##m1l>0FJ>7sXZE7Vl(+PqYa50|YqJL!u;GCEI53fnoJ157xeB$LuXhfjUy`yWqygP}Z=VQRE;E49=Hz`@* I6VnF(08ivF(*OVf literal 0 HcmV?d00001 diff --git a/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz.report b/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz.report new file mode 100644 index 0000000000..daabb32d29 --- /dev/null +++ b/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz.report @@ -0,0 +1,23 @@ +category metric task_max task_max_rate job_total +cpu cpus 20 - - +cpu sys 0.82 0.08 0.82 +cpu user 2.31 0.22 2.31 +cpu user+sys 3.13 0.30 3.13 +mem cache 23846912 - - +mem pgmajfault 121 - 121 +mem rss 65470464 - - +mem swap 0 - - +net:eth0 rx 500762 951.15 500762 +net:eth0 tx 36242 226.61 36242 +net:eth0 tx+rx 537004 1177.76 537004 +# Number of tasks: 1 +# Max CPU time spent by a single task: 3.13s +# Max CPU usage in a single interval: 29.89% +# Overall CPU usage: 0% +# Max memory used by a single task: 0.07GB +# Max network traffic in a single task: 0.00GB +# Max network speed in a single interval: 0.00MB/s +# Keep cache miss rate 0.00% +# Keep cache utilization 0.00% +#!! container max CPU usage was 30% -- try runtime_constraints "min_cores_per_node":1 +#!! container max RSS was 63 MiB -- try runtime_constraints "min_ram_mb_per_node":972 diff --git a/tools/crunchstat-summary/tests/test_examples.py b/tools/crunchstat-summary/tests/test_examples.py index d060becec3..6e8604cc0a 100644 --- a/tools/crunchstat-summary/tests/test_examples.py +++ b/tools/crunchstat-summary/tests/test_examples.py @@ -59,6 +59,36 @@ class SummarizeEdgeCases(unittest.TestCase): s.run() +class SummarizeContainer(ReportDiff): + fake_container = { + 'uuid': '9tee4-dz642-mjfb0i5hzojp16a', + 'log': '9tee4-4zz18-ihyzym9tcwjwg4r', + } + fake_request = { + 'uuid': '9tee4-xvhdp-uper95jktm10d3w', + 'name': 'container', + 'container_uuid': fake_container['uuid'], + } + logfile = os.path.join( + TESTS_DIR, 'container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz') + + @mock.patch('arvados.collection.CollectionReader') + @mock.patch('arvados.api') + def test_container(self, mock_api, mock_cr): + mock_api().container_requests().index().execute.return_value = {'items':[]} + mock_api().container_requests().get().execute.return_value = self.fake_request + mock_api().containers().get().execute.return_value = self.fake_container + mock_cr().__iter__.return_value = [ + 'crunch-run.txt', 'stderr.txt', 'node-info.txt', + 'container.json', 'crunchstat.txt'] + mock_cr().open.return_value = gzip.open(self.logfile) + args = crunchstat_summary.command.ArgumentParser().parse_args( + ['--job', self.fake_request['uuid']]) + cmd = crunchstat_summary.command.Command(args) + cmd.run() + self.diff_known_report(self.logfile, cmd) + + class SummarizeJob(ReportDiff): fake_job_uuid = '4xphq-8i9sb-jq0ekny1xou3zoh' fake_log_id = 'fake-log-collection-id' -- 2.39.5