062be3a65a929b91bc818a01de915aafdb439596
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import arvados
6 import collections
7 import crunchstat_summary.dygraphs
8 import crunchstat_summary.reader
9 import datetime
10 import functools
11 import itertools
12 import math
13 import re
14 import sys
15 import threading
16 import _strptime
17
18 from crunchstat_summary import logger
19
20 # Recommend memory constraints that are this multiple of an integral
21 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
22 # that have amounts like 7.5 GiB according to the kernel.)
23 AVAILABLE_RAM_RATIO = 0.90
24 MB=2**20
25
26 # Workaround datetime.datetime.strptime() thread-safety bug by calling
27 # it once before starting threads.  https://bugs.python.org/issue7980
28 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
29
30
31 WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
32
33
34 class Task(object):
35     def __init__(self):
36         self.starttime = None
37         self.finishtime = None
38         self.series = collections.defaultdict(list)
39
40
41 class Summarizer(object):
42     def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
43         self._logdata = logdata
44
45         self.uuid = uuid
46         self.label = label
47         self.starttime = None
48         self.finishtime = None
49         self._skip_child_jobs = skip_child_jobs
50
51         # stats_max: {category: {stat: val}}
52         self.stats_max = collections.defaultdict(
53             functools.partial(collections.defaultdict, lambda: 0))
54         # task_stats: {task_id: {category: {stat: val}}}
55         self.task_stats = collections.defaultdict(
56             functools.partial(collections.defaultdict, dict))
57
58         self.seq_to_uuid = {}
59         self.tasks = collections.defaultdict(Task)
60
61         # We won't bother recommending new runtime constraints if the
62         # constraints given when running the job are known to us and
63         # are already suitable.  If applicable, the subclass
64         # constructor will overwrite this with something useful.
65         self.existing_constraints = {}
66
67         logger.debug("%s: logdata %s", self.label, logdata)
68
69     def run(self):
70         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
71         with self._logdata as logdata:
72             self._run(logdata)
73
74     def _run(self, logdata):
75         self.detected_crunch1 = False
76         for line in logdata:
77             if not self.detected_crunch1 and '-8i9sb-' in line:
78                 self.detected_crunch1 = True
79
80             if self.detected_crunch1:
81                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
82                 if m:
83                     seq = int(m.group('seq'))
84                     uuid = m.group('task_uuid')
85                     self.seq_to_uuid[seq] = uuid
86                     logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
87                     continue
88
89                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
90                 if m:
91                     task_id = self.seq_to_uuid[int(m.group('seq'))]
92                     elapsed = int(m.group('elapsed'))
93                     self.task_stats[task_id]['time'] = {'elapsed': elapsed}
94                     if elapsed > self.stats_max['time']['elapsed']:
95                         self.stats_max['time']['elapsed'] = elapsed
96                     continue
97
98                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
99                 if m:
100                     uuid = m.group('uuid')
101                     if self._skip_child_jobs:
102                         logger.warning('%s: omitting stats from child job %s'
103                                        ' because --skip-child-jobs flag is on',
104                                        self.label, uuid)
105                         continue
106                     logger.debug('%s: follow %s', self.label, uuid)
107                     child_summarizer = NewSummarizer(uuid)
108                     child_summarizer.stats_max = self.stats_max
109                     child_summarizer.task_stats = self.task_stats
110                     child_summarizer.tasks = self.tasks
111                     child_summarizer.starttime = self.starttime
112                     child_summarizer.run()
113                     logger.debug('%s: done %s', self.label, uuid)
114                     continue
115
116                 # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
117                 m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
118                 if not m:
119                     continue
120             else:
121                 # crunch2
122                 # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
123                 m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
124                 if not m:
125                     continue
126
127             if self.label is None:
128                 try:
129                     self.label = m.group('job_uuid')
130                 except IndexError:
131                     self.label = 'label #1'
132             category = m.group('category')
133             if category.endswith(':'):
134                 # "stderr crunchstat: notice: ..."
135                 continue
136             elif category in ('error', 'caught'):
137                 continue
138             elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
139                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
140                 # (old logs are less careful with unprefixed error messages)
141                 continue
142
143             if self.detected_crunch1:
144                 task_id = self.seq_to_uuid[int(m.group('seq'))]
145             else:
146                 task_id = 'container'
147             task = self.tasks[task_id]
148
149             # Use the first and last crunchstat timestamps as
150             # approximations of starttime and finishtime.
151             timestamp = m.group('timestamp')
152             if timestamp[10:11] == '_':
153                 timestamp = datetime.datetime.strptime(
154                     timestamp, '%Y-%m-%d_%H:%M:%S')
155             elif timestamp[10:11] == 'T':
156                 timestamp = datetime.datetime.strptime(
157                     timestamp[:19], '%Y-%m-%dT%H:%M:%S')
158             else:
159                 raise ValueError("Cannot parse timestamp {!r}".format(
160                     timestamp))
161
162             if task.starttime is None:
163                 logger.debug('%s: task %s starttime %s',
164                              self.label, task_id, timestamp)
165             if task.starttime is None or timestamp < task.starttime:
166                 task.starttime = timestamp
167             if task.finishtime is None or timestamp > task.finishtime:
168                 task.finishtime = timestamp
169
170             if self.starttime is None or timestamp < self.starttime:
171                 self.starttime = timestamp
172             if self.finishtime is None or timestamp > self.finishtime:
173                 self.finishtime = timestamp
174
175             if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
176                 elapsed = (task.finishtime - task.starttime).seconds
177                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
178                 if elapsed > self.stats_max['time']['elapsed']:
179                     self.stats_max['time']['elapsed'] = elapsed
180
181             this_interval_s = None
182             for group in ['current', 'interval']:
183                 if not m.group(group):
184                     continue
185                 category = m.group('category')
186                 words = m.group(group).split(' ')
187                 stats = {}
188                 try:
189                     for val, stat in zip(words[::2], words[1::2]):
190                         if '.' in val:
191                             stats[stat] = float(val)
192                         else:
193                             stats[stat] = int(val)
194                 except ValueError as e:
195                     # If the line doesn't start with 'crunchstat:' we
196                     # might have mistaken an error message for a
197                     # structured crunchstat line.
198                     if m.group("crunchstat") is None or m.group("category") == "crunchstat":
199                         logger.warning("%s: log contains message\n  %s", self.label, line)
200                     else:
201                         logger.warning(
202                             '%s: Error parsing value %r (stat %r, category %r): %r',
203                             self.label, val, stat, category, e)
204                         logger.warning('%s', line)
205                     continue
206                 if 'user' in stats or 'sys' in stats:
207                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
208                 if 'tx' in stats or 'rx' in stats:
209                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
210                 if group == 'interval':
211                     if 'seconds' in stats:
212                         this_interval_s = stats.get('seconds',0)
213                         del stats['seconds']
214                         if this_interval_s <= 0:
215                             logger.error(
216                                 "BUG? interval stat given with duration {!r}".
217                                 format(this_interval_s))
218                     else:
219                         logger.error('BUG? interval stat missing duration')
220                 for stat, val in stats.items():
221                     if group == 'interval' and this_interval_s:
222                             stat = stat + '__rate'
223                             val = val / this_interval_s
224                             if stat in ['user+sys__rate', 'user__rate', 'sys__rate', 'tx+rx__rate', 'rx__rate', 'tx__rate']:
225                                 task.series[category, stat].append(
226                                     (timestamp - self.starttime, val))
227                     else:
228                         if stat in ['rss','used','total']:
229                             task.series[category, stat].append(
230                                 (timestamp - self.starttime, val))
231                         self.task_stats[task_id][category][stat] = val
232                     if val > self.stats_max[category][stat]:
233                         self.stats_max[category][stat] = val
234         logger.debug('%s: done parsing', self.label)
235
236         self.job_tot = collections.defaultdict(
237             functools.partial(collections.defaultdict, int))
238         for task_id, task_stat in self.task_stats.items():
239             for category, stat_last in task_stat.items():
240                 for stat, val in stat_last.items():
241                     if stat in ['cpus', 'cache', 'swap', 'rss']:
242                         # meaningless stats like 16 cpu cores x 5 tasks = 80
243                         continue
244                     self.job_tot[category][stat] += val
245         logger.debug('%s: done totals', self.label)
246
247         missing_category = {
248             'cpu': 'CPU',
249             'mem': 'memory',
250             'net:': 'network I/O',
251             'statfs': 'storage space',
252         }
253         for task_stat in self.task_stats.values():
254             for category in task_stat.keys():
255                 for checkcat in missing_category:
256                     if checkcat.endswith(':'):
257                         if category.startswith(checkcat):
258                             missing_category.pop(checkcat)
259                             break
260                     else:
261                         if category == checkcat:
262                             missing_category.pop(checkcat)
263                             break
264         for catlabel in missing_category.values():
265             logger.warning('%s: %s stats are missing -- possible cluster configuration issue',
266                         self.label, catlabel)
267
268     def long_label(self):
269         label = self.label
270         if hasattr(self, 'process') and self.process['uuid'] not in label:
271             label = '{} ({})'.format(label, self.process['uuid'])
272         return label
273
274     def elapsed_time(self):
275         if not self.finishtime:
276             return ""
277         label = ""
278         s = (self.finishtime - self.starttime).total_seconds()
279         if s > 86400:
280             label += '{}d'.format(int(s/86400))
281         if s > 3600:
282             label += '{}h'.format(int(s/3600) % 24)
283         if s > 60:
284             label += '{}m'.format(int(s/60) % 60)
285         label += '{}s'.format(int(s) % 60)
286         return label
287
288     def text_report(self):
289         if not self.tasks:
290             return "(no report generated)\n"
291         return "\n".join(itertools.chain(
292             self._text_report_table_gen(lambda x: "\t".join(x),
293                                   lambda x: "\t".join(x)),
294             self._text_report_agg_gen(lambda x: "# {}: {}{}".format(x[0], x[1], x[2])),
295             self._recommend_gen(lambda x: "#!! "+x))) + "\n"
296
297     def html_report(self):
298         return WEBCHART_CLASS(self.label, [self]).html()
299
300     def _text_report_table_gen(self, headerformat, rowformat):
301         yield headerformat(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
302         for category, stat_max in sorted(self.stats_max.items()):
303             for stat, val in sorted(stat_max.items()):
304                 if stat.endswith('__rate'):
305                     continue
306                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
307                 val = self._format(val)
308                 tot = self._format(self.job_tot[category].get(stat, '-'))
309                 yield rowformat([category, stat, str(val), max_rate, tot])
310
311     def _text_report_agg_gen(self, aggformat):
312         by_single_task = ""
313         if len(self.tasks) > 1:
314             by_single_task = " by a single task"
315         metrics = [
316             ('Elapsed time',
317              self.elapsed_time(),
318              None,
319              ''),
320                 ('CPU time spent{}'.format(by_single_task),
321                  self.stats_max['cpu']['user+sys'],
322                  None,
323                  's'),
324                 ('Max CPU usage in a single interval',
325                  self.stats_max['cpu']['user+sys__rate'],
326                  lambda x: x * 100,
327                  '%'),
328                 ('Overall CPU usage',
329                  float(self.job_tot['cpu']['user+sys']) /
330                  self.job_tot['time']['elapsed']
331                  if self.job_tot['time']['elapsed'] > 0 else 0,
332                  lambda x: x * 100,
333                  '%'),
334                 ('Max memory used{}'.format(by_single_task),
335                  self.stats_max['mem']['rss'],
336                  lambda x: x / 1e9,
337                  'GB'),
338                 ('Max network traffic{}'.format(by_single_task),
339                  self.stats_max['net:eth0']['tx+rx'] +
340                  self.stats_max['net:keep0']['tx+rx'],
341                  lambda x: x / 1e9,
342                  'GB'),
343                 ('Max network speed in a single interval',
344                  self.stats_max['net:eth0']['tx+rx__rate'] +
345                  self.stats_max['net:keep0']['tx+rx__rate'],
346                  lambda x: x / 1e6,
347                  'MB/s'),
348                 ('Keep cache miss rate',
349                  (float(self.job_tot['keepcache']['miss']) /
350                  float(self.job_tot['keepcalls']['get']))
351                  if self.job_tot['keepcalls']['get'] > 0 else 0,
352                  lambda x: x * 100.0,
353                  '%'),
354                 ('Keep cache utilization',
355                  (float(self.job_tot['blkio:0:0']['read']) /
356                  float(self.job_tot['net:keep0']['rx']))
357                  if self.job_tot['net:keep0']['rx'] > 0 else 0,
358                  lambda x: x * 100.0,
359                  '%'),
360                ('Temp disk utilization',
361                  (float(self.job_tot['statfs']['used']) /
362                  float(self.job_tot['statfs']['total']))
363                  if self.job_tot['statfs']['total'] > 0 else 0,
364                  lambda x: x * 100.0,
365                 '%'),
366         ]
367
368         if len(self.tasks) > 1:
369             metrics.insert(0, ('Number of tasks',
370                  len(self.tasks),
371                  None,
372                  ''))
373         for args in metrics:
374             format_string, val, transform, suffix = args
375             if val == float('-Inf'):
376                 continue
377             if transform:
378                 val = transform(val)
379             yield aggformat((format_string, self._format(val), suffix))
380
381     def _recommend_gen(self, recommendformat):
382         # TODO recommend fixing job granularity if elapsed time is too short
383         return itertools.chain(
384             self._recommend_cpu(recommendformat),
385             self._recommend_ram(recommendformat),
386             self._recommend_keep_cache(recommendformat),
387             self._recommend_temp_disk(recommendformat),
388             )
389
390     def _recommend_cpu(self, recommendformat):
391         """Recommend asking for 4 cores if max CPU usage was 333%"""
392
393         constraint_key = self._map_runtime_constraint('vcpus')
394         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
395         if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
396             logger.warning('%s: no CPU usage data', self.label)
397             return
398         # TODO Don't necessarily want to recommend on isolated max peak
399         # take average CPU usage into account as well or % time at max
400         used_cores = max(1, int(math.ceil(cpu_max_rate)))
401         asked_cores = self.existing_constraints.get(constraint_key)
402         if asked_cores is None:
403             asked_cores = 1
404         # TODO: This should be more nuanced in cases where max >> avg
405         if used_cores < asked_cores:
406             yield recommendformat(
407                 '{} max CPU usage was {}% -- '
408                 'try reducing runtime_constraints to "{}":{}'
409             ).format(
410                 self.label,
411                 math.ceil(cpu_max_rate*100),
412                 constraint_key,
413                 int(used_cores))
414
415     # FIXME: This needs to be updated to account for current a-d-c algorithms
416     def _recommend_ram(self, recommendformat):
417         """Recommend an economical RAM constraint for this job.
418
419         Nodes that are advertised as "8 gibibytes" actually have what
420         we might call "8 nearlygibs" of memory available for jobs.
421         Here, we calculate a whole number of nearlygibs that would
422         have sufficed to run the job, then recommend requesting a node
423         with that number of nearlygibs (expressed as mebibytes).
424
425         Requesting a node with "nearly 8 gibibytes" is our best hope
426         of getting a node that actually has nearly 8 gibibytes
427         available.  If the node manager is smart enough to account for
428         the discrepancy itself when choosing/creating a node, we'll
429         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
430         advertised size of the next-size-smaller node (say, 6 GiB)
431         will be too low to satisfy our request, so we will effectively
432         get rounded up to 8 GiB.
433
434         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
435         we will generally get a node that is advertised as "8 GiB" and
436         has at least 7500 MiB available.  However, asking for 8192 MiB
437         would either result in an unnecessarily expensive 12 GiB node
438         (if node manager knows about the discrepancy), or an 8 GiB
439         node which has less than 8192 MiB available and is therefore
440         considered by crunch-dispatch to be too small to meet our
441         constraint.
442
443         When node manager learns how to predict the available memory
444         for each node type such that crunch-dispatch always agrees
445         that a node is big enough to run the job it was brought up
446         for, all this will be unnecessary.  We'll just ask for exactly
447         the memory we want -- even if that happens to be 8192 MiB.
448         """
449
450         constraint_key = self._map_runtime_constraint('ram')
451         used_bytes = self.stats_max['mem']['rss']
452         if used_bytes == float('-Inf'):
453             logger.warning('%s: no memory usage data', self.label)
454             return
455         used_mib = math.ceil(float(used_bytes) / MB)
456         asked_mib = self.existing_constraints.get(constraint_key) / MB
457
458         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
459         ratio = 0.5
460         recommend_mib = int(math.ceil(nearlygibs(used_mib/ratio))*AVAILABLE_RAM_RATIO*1024)
461         if used_mib > 0 and (used_mib / asked_mib) < ratio and asked_mib > recommend_mib:
462             yield recommendformat(
463                 '{} requested {} MiB of RAM but actual RAM usage was below {}% at {} MiB -- '
464                 'suggest reducing RAM request to {} MiB'
465             ).format(
466                 self.label,
467                 int(asked_mib),
468                 int(100*ratio),
469                 int(used_mib),
470                 recommend_mib)
471
472     def _recommend_keep_cache(self, recommendformat):
473         """Recommend increasing keep cache if utilization < 80%"""
474         constraint_key = self._map_runtime_constraint('keep_cache_ram')
475         if self.job_tot['net:keep0']['rx'] == 0:
476             return
477         utilization = (float(self.job_tot['blkio:0:0']['read']) /
478                        float(self.job_tot['net:keep0']['rx']))
479         # FIXME: the default on this get won't work correctly
480         asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
481
482         if utilization < 0.8:
483             yield recommendformat(
484                 '{} Keep cache utilization was {:.2f}% -- '
485                 'try doubling runtime_constraints to "{}":{} (or more)'
486             ).format(
487                 self.label,
488                 utilization * 100.0,
489                 constraint_key,
490                 math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
491
492
493     def _recommend_temp_disk(self, recommendformat):
494         """Recommend decreasing temp disk if utilization < 50%"""
495         total = float(self.job_tot['statfs']['total'])
496         utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0
497
498         if utilization < 50.8 and total > 0:
499             yield recommendformat(
500                 '{} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
501                 'consider reducing "tmpdirMin" and/or "outdirMin"'
502             ).format(
503                 self.label,
504                 utilization * 100.0,
505                 total / MB)
506
507
508     def _format(self, val):
509         """Return a string representation of a stat.
510
511         {:.2f} for floats, default format for everything else."""
512         if isinstance(val, float):
513             return '{:.2f}'.format(val)
514         else:
515             return '{}'.format(val)
516
517     def _runtime_constraint_mem_unit(self):
518         if hasattr(self, 'runtime_constraint_mem_unit'):
519             return self.runtime_constraint_mem_unit
520         elif self.detected_crunch1:
521             return JobSummarizer.runtime_constraint_mem_unit
522         else:
523             return ContainerRequestSummarizer.runtime_constraint_mem_unit
524
525     def _map_runtime_constraint(self, key):
526         if hasattr(self, 'map_runtime_constraint'):
527             return self.map_runtime_constraint[key]
528         elif self.detected_crunch1:
529             return JobSummarizer.map_runtime_constraint[key]
530         else:
531             return key
532
533
534 class CollectionSummarizer(Summarizer):
535     def __init__(self, collection_id, **kwargs):
536         super(CollectionSummarizer, self).__init__(
537             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
538         self.label = collection_id
539
540
541 def NewSummarizer(process_or_uuid, **kwargs):
542     """Construct with the appropriate subclass for this uuid/object."""
543
544     if isinstance(process_or_uuid, dict):
545         process = process_or_uuid
546         uuid = process['uuid']
547     else:
548         uuid = process_or_uuid
549         process = None
550         arv = kwargs.get("arv") or arvados.api('v1')
551
552     if '-dz642-' in uuid:
553         if process is None:
554             # Get the associated CR. Doesn't matter which since they all have the same logs
555             crs = arv.container_requests().list(filters=[['container_uuid','=',uuid]],limit=1).execute()['items']
556             if len(crs) > 0:
557                 process = crs[0]
558         klass = ContainerRequestTreeSummarizer
559     elif '-xvhdp-' in uuid:
560         if process is None:
561             process = arv.container_requests().get(uuid=uuid).execute()
562         klass = ContainerRequestTreeSummarizer
563     elif '-8i9sb-' in uuid:
564         if process is None:
565             process = arv.jobs().get(uuid=uuid).execute()
566         klass = JobTreeSummarizer
567     elif '-d1hrv-' in uuid:
568         if process is None:
569             process = arv.pipeline_instances().get(uuid=uuid).execute()
570         klass = PipelineSummarizer
571     elif '-4zz18-' in uuid:
572         return CollectionSummarizer(collection_id=uuid)
573     else:
574         raise ArgumentError("Unrecognized uuid %s", uuid)
575     return klass(process, uuid=uuid, **kwargs)
576
577
578 class ProcessSummarizer(Summarizer):
579     """Process is a job, pipeline, or container request."""
580
581     def __init__(self, process, label=None, **kwargs):
582         rdr = None
583         self.process = process
584         if label is None:
585             label = self.process.get('name', self.process['uuid'])
586         # Pre-Arvados v1.4 everything is in 'log'
587         # For 1.4+ containers have no logs and container_requests have them in 'log_uuid', not 'log'
588         log_collection = self.process.get('log', self.process.get('log_uuid'))
589         if log_collection and self.process.get('state') != 'Uncommitted': # arvados.util.CR_UNCOMMITTED:
590             try:
591                 rdr = crunchstat_summary.reader.CollectionReader(log_collection)
592             except arvados.errors.NotFoundError as e:
593                 logger.warning("Trying event logs after failing to read "
594                                "log collection %s: %s", self.process['log'], e)
595         if rdr is None:
596             uuid = self.process.get('container_uuid', self.process.get('uuid'))
597             rdr = crunchstat_summary.reader.LiveLogReader(uuid)
598             label = label + ' (partial)'
599         super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
600         self.existing_constraints = self.process.get('runtime_constraints', {})
601
602
603 class JobSummarizer(ProcessSummarizer):
604     runtime_constraint_mem_unit = MB
605     map_runtime_constraint = {
606         'keep_cache_ram': 'keep_cache_mb_per_task',
607         'ram': 'min_ram_mb_per_node',
608         'vcpus': 'min_cores_per_node',
609     }
610
611
612 class ContainerRequestSummarizer(ProcessSummarizer):
613     runtime_constraint_mem_unit = 1
614
615
616 class MultiSummarizer(object):
617     def __init__(self, children={}, label=None, threads=1, **kwargs):
618         self.throttle = threading.Semaphore(threads)
619         self.children = children
620         self.label = label
621
622     def run_and_release(self, target, *args, **kwargs):
623         try:
624             return target(*args, **kwargs)
625         finally:
626             self.throttle.release()
627
628     def run(self):
629         threads = []
630         for child in self.children.values():
631             self.throttle.acquire()
632             t = threading.Thread(target=self.run_and_release, args=(child.run, ))
633             t.daemon = True
634             t.start()
635             threads.append(t)
636         for t in threads:
637             t.join()
638
639     def text_report(self):
640         txt = ''
641         d = self._descendants()
642         for child in d.values():
643             if len(d) > 1:
644                 txt += '### Summary for {} ({})\n'.format(
645                     child.label, child.process['uuid'])
646             txt += child.text_report()
647             txt += '\n'
648         return txt
649
650     def _descendants(self):
651         """Dict of self and all descendants.
652
653         Nodes with nothing of their own to report (like
654         MultiSummarizers) are omitted.
655         """
656         d = collections.OrderedDict()
657         for key, child in self.children.items():
658             if isinstance(child, Summarizer):
659                 d[key] = child
660             if isinstance(child, MultiSummarizer):
661                 d.update(child._descendants())
662         return d
663
664     def html_report(self):
665         tophtml = ""
666         bottomhtml = ""
667         label = self.label
668         if len(self._descendants()) == 1:
669             summarizer = next(iter(self._descendants().values()))
670             tophtml = """{}\n<table class='aggtable'><tbody>{}</tbody></table>\n""".format(
671                 "\n".join(summarizer._recommend_gen(lambda x: "<p>{}</p>".format(x))),
672                 "\n".join(summarizer._text_report_agg_gen(lambda x: "<tr><th>{}</th><td>{}{}</td></tr>".format(*x))))
673
674             bottomhtml = """<table class='metricstable'><tbody>{}</tbody></table>\n""".format(
675                 "\n".join(summarizer._text_report_table_gen(lambda x: "<tr><th>{}</th><th>{}</th><th>{}</th><th>{}</th><th>{}</th></tr>".format(*x),
676                                                             lambda x: "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>".format(*x))))
677             label = summarizer.long_label()
678
679         return WEBCHART_CLASS(label, iter(self._descendants().values())).html(tophtml, bottomhtml)
680
681
682 class JobTreeSummarizer(MultiSummarizer):
683     """Summarizes a job and all children listed in its components field."""
684     def __init__(self, job, label=None, **kwargs):
685         arv = kwargs.get("arv") or arvados.api('v1')
686         label = label or job.get('name', job['uuid'])
687         children = collections.OrderedDict()
688         children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
689         if job.get('components', None):
690             preloaded = {}
691             for j in arv.jobs().index(
692                     limit=len(job['components']),
693                     filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
694                 preloaded[j['uuid']] = j
695             for cname in sorted(job['components'].keys()):
696                 child_uuid = job['components'][cname]
697                 j = (preloaded.get(child_uuid) or
698                      arv.jobs().get(uuid=child_uuid).execute())
699                 children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
700
701         super(JobTreeSummarizer, self).__init__(
702             children=children,
703             label=label,
704             **kwargs)
705
706
707 class PipelineSummarizer(MultiSummarizer):
708     def __init__(self, instance, **kwargs):
709         children = collections.OrderedDict()
710         for cname, component in instance['components'].items():
711             if 'job' not in component:
712                 logger.warning(
713                     "%s: skipping component with no job assigned", cname)
714             else:
715                 logger.info(
716                     "%s: job %s", cname, component['job']['uuid'])
717                 summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
718                 summarizer.label = '{} {}'.format(
719                     cname, component['job']['uuid'])
720                 children[cname] = summarizer
721         super(PipelineSummarizer, self).__init__(
722             children=children,
723             label=instance['uuid'],
724             **kwargs)
725
726
727 class ContainerRequestTreeSummarizer(MultiSummarizer):
728     def __init__(self, root, skip_child_jobs=False, **kwargs):
729         arv = kwargs.get("arv") or arvados.api('v1')
730
731         label = kwargs.pop('label', None) or root.get('name') or root['uuid']
732         root['name'] = label
733
734         children = collections.OrderedDict()
735         todo = collections.deque((root, ))
736         while len(todo) > 0:
737             current = todo.popleft()
738             label = current['name']
739             sort_key = current['created_at']
740
741             summer = ContainerRequestSummarizer(current, label=label, **kwargs)
742             summer.sort_key = sort_key
743             children[current['uuid']] = summer
744
745             page_filters = []
746             while True:
747                 child_crs = arv.container_requests().index(
748                     order=['uuid asc'],
749                     filters=page_filters+[
750                         ['requesting_container_uuid', '=', current['container_uuid']]],
751                 ).execute()
752                 if not child_crs['items']:
753                     break
754                 elif skip_child_jobs:
755                     logger.warning('%s: omitting stats from %d child containers'
756                                    ' because --skip-child-jobs flag is on',
757                                    label, child_crs['items_available'])
758                     break
759                 page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
760                 for cr in child_crs['items']:
761                     if cr['container_uuid']:
762                         logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
763                         cr['name'] = cr.get('name') or cr['uuid']
764                         todo.append(cr)
765         sorted_children = collections.OrderedDict()
766         for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
767             sorted_children[uuid] = children[uuid]
768         super(ContainerRequestTreeSummarizer, self).__init__(
769             children=sorted_children,
770             label=root['name'],
771             **kwargs)