21204: Merge branch '21204-stable-log-sort' from arvados-workbench2.git
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import arvados
6 import collections
7 import crunchstat_summary.dygraphs
8 import crunchstat_summary.reader
9 import datetime
10 import functools
11 import itertools
12 import math
13 import re
14 import sys
15 import threading
16 import _strptime
17
18 from crunchstat_summary import logger
19
20 # Recommend memory constraints that are this multiple of an integral
21 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
22 # that have amounts like 7.5 GiB according to the kernel.)
23 AVAILABLE_RAM_RATIO = 0.95
24 MB=2**20
25
26 # Workaround datetime.datetime.strptime() thread-safety bug by calling
27 # it once before starting threads.  https://bugs.python.org/issue7980
28 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
29
30
31 WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
32
33
34 class Task(object):
35     def __init__(self):
36         self.starttime = None
37         self.finishtime = None
38         self.series = collections.defaultdict(list)
39
40
41 class Summarizer(object):
42     def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
43         self._logdata = logdata
44
45         self.uuid = uuid
46         self.label = label
47         self.starttime = None
48         self.finishtime = None
49         self._skip_child_jobs = skip_child_jobs
50
51         # stats_max: {category: {stat: val}}
52         self.stats_max = collections.defaultdict(
53             functools.partial(collections.defaultdict, lambda: 0))
54         # task_stats: {task_id: {category: {stat: val}}}
55         self.task_stats = collections.defaultdict(
56             functools.partial(collections.defaultdict, dict))
57
58         self.seq_to_uuid = {}
59         self.tasks = collections.defaultdict(Task)
60
61         # We won't bother recommending new runtime constraints if the
62         # constraints given when running the job are known to us and
63         # are already suitable.  If applicable, the subclass
64         # constructor will overwrite this with something useful.
65         self.existing_constraints = {}
66
67         logger.debug("%s: logdata %s", self.label, logdata)
68
69     def run(self):
70         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
71         with self._logdata as logdata:
72             self._run(logdata)
73
74     def _run(self, logdata):
75         self.detected_crunch1 = False
76         for line in logdata:
77             if not self.detected_crunch1 and '-8i9sb-' in line:
78                 self.detected_crunch1 = True
79
80             if self.detected_crunch1:
81                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
82                 if m:
83                     seq = int(m.group('seq'))
84                     uuid = m.group('task_uuid')
85                     self.seq_to_uuid[seq] = uuid
86                     logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
87                     continue
88
89                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
90                 if m:
91                     task_id = self.seq_to_uuid[int(m.group('seq'))]
92                     elapsed = int(m.group('elapsed'))
93                     self.task_stats[task_id]['time'] = {'elapsed': elapsed}
94                     if elapsed > self.stats_max['time']['elapsed']:
95                         self.stats_max['time']['elapsed'] = elapsed
96                     continue
97
98                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
99                 if m:
100                     uuid = m.group('uuid')
101                     if self._skip_child_jobs:
102                         logger.warning('%s: omitting stats from child job %s'
103                                        ' because --skip-child-jobs flag is on',
104                                        self.label, uuid)
105                         continue
106                     logger.debug('%s: follow %s', self.label, uuid)
107                     child_summarizer = NewSummarizer(uuid)
108                     child_summarizer.stats_max = self.stats_max
109                     child_summarizer.task_stats = self.task_stats
110                     child_summarizer.tasks = self.tasks
111                     child_summarizer.starttime = self.starttime
112                     child_summarizer.run()
113                     logger.debug('%s: done %s', self.label, uuid)
114                     continue
115
116                 # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
117                 m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
118                 if not m:
119                     continue
120             else:
121                 # crunch2
122                 # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
123                 m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
124                 if not m:
125                     continue
126
127             if self.label is None:
128                 try:
129                     self.label = m.group('job_uuid')
130                 except IndexError:
131                     self.label = 'label #1'
132             category = m.group('category')
133             if category.endswith(':'):
134                 # "stderr crunchstat: notice: ..."
135                 continue
136             elif category in ('error', 'caught'):
137                 continue
138             elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
139                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
140                 # (old logs are less careful with unprefixed error messages)
141                 continue
142
143             if self.detected_crunch1:
144                 task_id = self.seq_to_uuid[int(m.group('seq'))]
145             else:
146                 task_id = 'container'
147             task = self.tasks[task_id]
148
149             # Use the first and last crunchstat timestamps as
150             # approximations of starttime and finishtime.
151             timestamp = m.group('timestamp')
152             if timestamp[10:11] == '_':
153                 timestamp = datetime.datetime.strptime(
154                     timestamp, '%Y-%m-%d_%H:%M:%S')
155             elif timestamp[10:11] == 'T':
156                 timestamp = datetime.datetime.strptime(
157                     timestamp[:19], '%Y-%m-%dT%H:%M:%S')
158             else:
159                 raise ValueError("Cannot parse timestamp {!r}".format(
160                     timestamp))
161
162             if task.starttime is None:
163                 logger.debug('%s: task %s starttime %s',
164                              self.label, task_id, timestamp)
165             if task.starttime is None or timestamp < task.starttime:
166                 task.starttime = timestamp
167             if task.finishtime is None or timestamp > task.finishtime:
168                 task.finishtime = timestamp
169
170             if self.starttime is None or timestamp < self.starttime:
171                 self.starttime = timestamp
172             if self.finishtime is None or timestamp > self.finishtime:
173                 self.finishtime = timestamp
174
175             if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
176                 elapsed = (task.finishtime - task.starttime).seconds
177                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
178                 if elapsed > self.stats_max['time']['elapsed']:
179                     self.stats_max['time']['elapsed'] = elapsed
180
181             this_interval_s = None
182             for group in ['current', 'interval']:
183                 if not m.group(group):
184                     continue
185                 category = m.group('category')
186                 words = m.group(group).split(' ')
187                 stats = {}
188                 try:
189                     for val, stat in zip(words[::2], words[1::2]):
190                         if '.' in val:
191                             stats[stat] = float(val)
192                         else:
193                             stats[stat] = int(val)
194                 except ValueError as e:
195                     # If the line doesn't start with 'crunchstat:' we
196                     # might have mistaken an error message for a
197                     # structured crunchstat line.
198                     if m.group("crunchstat") is None or m.group("category") == "crunchstat":
199                         logger.warning("%s: log contains message\n  %s", self.label, line)
200                     else:
201                         logger.warning(
202                             '%s: Error parsing value %r (stat %r, category %r): %r',
203                             self.label, val, stat, category, e)
204                         logger.warning('%s', line)
205                     continue
206                 if 'user' in stats or 'sys' in stats:
207                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
208                 if 'tx' in stats or 'rx' in stats:
209                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
210                 if group == 'interval':
211                     if 'seconds' in stats:
212                         this_interval_s = stats.get('seconds',0)
213                         del stats['seconds']
214                         if this_interval_s <= 0:
215                             logger.error(
216                                 "BUG? interval stat given with duration {!r}".
217                                 format(this_interval_s))
218                     else:
219                         logger.error('BUG? interval stat missing duration')
220                 for stat, val in stats.items():
221                     if group == 'interval' and this_interval_s:
222                             stat = stat + '__rate'
223                             val = val / this_interval_s
224                             if stat in ['user+sys__rate', 'user__rate', 'sys__rate', 'tx+rx__rate', 'rx__rate', 'tx__rate']:
225                                 task.series[category, stat].append(
226                                     (timestamp - self.starttime, val))
227                     else:
228                         if stat in ['rss','used','total']:
229                             task.series[category, stat].append(
230                                 (timestamp - self.starttime, val))
231                         self.task_stats[task_id][category][stat] = val
232                     if val > self.stats_max[category][stat]:
233                         self.stats_max[category][stat] = val
234         logger.debug('%s: done parsing', self.label)
235
236         self.job_tot = collections.defaultdict(
237             functools.partial(collections.defaultdict, int))
238         for task_id, task_stat in self.task_stats.items():
239             for category, stat_last in task_stat.items():
240                 for stat, val in stat_last.items():
241                     if stat in ['cpus', 'cache', 'swap', 'rss']:
242                         # meaningless stats like 16 cpu cores x 5 tasks = 80
243                         continue
244                     self.job_tot[category][stat] += val
245         logger.debug('%s: done totals', self.label)
246
247         missing_category = {
248             'cpu': 'CPU',
249             'mem': 'memory',
250             'net:': 'network I/O',
251             'statfs': 'storage space',
252         }
253         for task_stat in self.task_stats.values():
254             for category in task_stat.keys():
255                 for checkcat in missing_category:
256                     if checkcat.endswith(':'):
257                         if category.startswith(checkcat):
258                             missing_category.pop(checkcat)
259                             break
260                     else:
261                         if category == checkcat:
262                             missing_category.pop(checkcat)
263                             break
264         for catlabel in missing_category.values():
265             logger.warning('%s: %s stats are missing -- possible cluster configuration issue',
266                         self.label, catlabel)
267
268     def long_label(self):
269         label = self.label
270         if hasattr(self, 'process') and self.process['uuid'] not in label:
271             label = '{} ({})'.format(label, self.process['uuid'])
272         if self.finishtime:
273             label += ' -- elapsed time '
274             s = (self.finishtime - self.starttime).total_seconds()
275             if s > 86400:
276                 label += '{}d'.format(int(s/86400))
277             if s > 3600:
278                 label += '{}h'.format(int(s/3600) % 24)
279             if s > 60:
280                 label += '{}m'.format(int(s/60) % 60)
281             label += '{}s'.format(int(s) % 60)
282         return label
283
284     def text_report(self):
285         if not self.tasks:
286             return "(no report generated)\n"
287         return "\n".join(itertools.chain(
288             self._text_report_gen(),
289             self._recommend_gen())) + "\n"
290
291     def html_report(self):
292         return WEBCHART_CLASS(self.label, [self]).html()
293
294     def _text_report_gen(self):
295         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
296         for category, stat_max in sorted(self.stats_max.items()):
297             for stat, val in sorted(stat_max.items()):
298                 if stat.endswith('__rate'):
299                     continue
300                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
301                 val = self._format(val)
302                 tot = self._format(self.job_tot[category].get(stat, '-'))
303                 yield "\t".join([category, stat, str(val), max_rate, tot])
304         for args in (
305                 ('Number of tasks: {}',
306                  len(self.tasks),
307                  None),
308                 ('Max CPU time spent by a single task: {}s',
309                  self.stats_max['cpu']['user+sys'],
310                  None),
311                 ('Max CPU usage in a single interval: {}%',
312                  self.stats_max['cpu']['user+sys__rate'],
313                  lambda x: x * 100),
314                 ('Overall CPU usage: {}%',
315                  float(self.job_tot['cpu']['user+sys']) /
316                  self.job_tot['time']['elapsed']
317                  if self.job_tot['time']['elapsed'] > 0 else 0,
318                  lambda x: x * 100),
319                 ('Max memory used by a single task: {}GB',
320                  self.stats_max['mem']['rss'],
321                  lambda x: x / 1e9),
322                 ('Max network traffic in a single task: {}GB',
323                  self.stats_max['net:eth0']['tx+rx'] +
324                  self.stats_max['net:keep0']['tx+rx'],
325                  lambda x: x / 1e9),
326                 ('Max network speed in a single interval: {}MB/s',
327                  self.stats_max['net:eth0']['tx+rx__rate'] +
328                  self.stats_max['net:keep0']['tx+rx__rate'],
329                  lambda x: x / 1e6),
330                 ('Keep cache miss rate {}%',
331                  (float(self.job_tot['keepcache']['miss']) /
332                  float(self.job_tot['keepcalls']['get']))
333                  if self.job_tot['keepcalls']['get'] > 0 else 0,
334                  lambda x: x * 100.0),
335                 ('Keep cache utilization {}%',
336                  (float(self.job_tot['blkio:0:0']['read']) /
337                  float(self.job_tot['net:keep0']['rx']))
338                  if self.job_tot['net:keep0']['rx'] > 0 else 0,
339                  lambda x: x * 100.0),
340                ('Temp disk utilization {}%',
341                  (float(self.job_tot['statfs']['used']) /
342                  float(self.job_tot['statfs']['total']))
343                  if self.job_tot['statfs']['total'] > 0 else 0,
344                  lambda x: x * 100.0),
345                 ):
346             format_string, val, transform = args
347             if val == float('-Inf'):
348                 continue
349             if transform:
350                 val = transform(val)
351             yield "# "+format_string.format(self._format(val))
352
353     def _recommend_gen(self):
354         # TODO recommend fixing job granularity if elapsed time is too short
355         return itertools.chain(
356             self._recommend_cpu(),
357             self._recommend_ram(),
358             self._recommend_keep_cache(),
359             self._recommend_temp_disk(),
360             )
361
362     def _recommend_cpu(self):
363         """Recommend asking for 4 cores if max CPU usage was 333%"""
364
365         constraint_key = self._map_runtime_constraint('vcpus')
366         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
367         if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
368             logger.warning('%s: no CPU usage data', self.label)
369             return
370         # TODO Don't necessarily want to recommend on isolated max peak
371         # take average CPU usage into account as well or % time at max
372         used_cores = max(1, int(math.ceil(cpu_max_rate)))
373         asked_cores = self.existing_constraints.get(constraint_key)
374         if asked_cores is None:
375             asked_cores = 1
376         # TODO: This should be more nuanced in cases where max >> avg
377         if used_cores < asked_cores:
378             yield (
379                 '#!! {} max CPU usage was {}% -- '
380                 'try reducing runtime_constraints to "{}":{}'
381             ).format(
382                 self.label,
383                 math.ceil(cpu_max_rate*100),
384                 constraint_key,
385                 int(used_cores))
386
387     # FIXME: This needs to be updated to account for current a-d-c algorithms
388     def _recommend_ram(self):
389         """Recommend an economical RAM constraint for this job.
390
391         Nodes that are advertised as "8 gibibytes" actually have what
392         we might call "8 nearlygibs" of memory available for jobs.
393         Here, we calculate a whole number of nearlygibs that would
394         have sufficed to run the job, then recommend requesting a node
395         with that number of nearlygibs (expressed as mebibytes).
396
397         Requesting a node with "nearly 8 gibibytes" is our best hope
398         of getting a node that actually has nearly 8 gibibytes
399         available.  If the node manager is smart enough to account for
400         the discrepancy itself when choosing/creating a node, we'll
401         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
402         advertised size of the next-size-smaller node (say, 6 GiB)
403         will be too low to satisfy our request, so we will effectively
404         get rounded up to 8 GiB.
405
406         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
407         we will generally get a node that is advertised as "8 GiB" and
408         has at least 7500 MiB available.  However, asking for 8192 MiB
409         would either result in an unnecessarily expensive 12 GiB node
410         (if node manager knows about the discrepancy), or an 8 GiB
411         node which has less than 8192 MiB available and is therefore
412         considered by crunch-dispatch to be too small to meet our
413         constraint.
414
415         When node manager learns how to predict the available memory
416         for each node type such that crunch-dispatch always agrees
417         that a node is big enough to run the job it was brought up
418         for, all this will be unnecessary.  We'll just ask for exactly
419         the memory we want -- even if that happens to be 8192 MiB.
420         """
421
422         constraint_key = self._map_runtime_constraint('ram')
423         used_bytes = self.stats_max['mem']['rss']
424         if used_bytes == float('-Inf'):
425             logger.warning('%s: no memory usage data', self.label)
426             return
427         used_mib = math.ceil(float(used_bytes) / MB)
428         asked_mib = self.existing_constraints.get(constraint_key)
429
430         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
431         if used_mib > 0 and (asked_mib is None or (
432                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
433             yield (
434                 '#!! {} max RSS was {} MiB -- '
435                 'try reducing runtime_constraints to "{}":{}'
436             ).format(
437                 self.label,
438                 int(used_mib),
439                 constraint_key,
440                 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))
441
442     def _recommend_keep_cache(self):
443         """Recommend increasing keep cache if utilization < 80%"""
444         constraint_key = self._map_runtime_constraint('keep_cache_ram')
445         if self.job_tot['net:keep0']['rx'] == 0:
446             return
447         utilization = (float(self.job_tot['blkio:0:0']['read']) /
448                        float(self.job_tot['net:keep0']['rx']))
449         # FIXME: the default on this get won't work correctly
450         asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
451
452         if utilization < 0.8:
453             yield (
454                 '#!! {} Keep cache utilization was {:.2f}% -- '
455                 'try doubling runtime_constraints to "{}":{} (or more)'
456             ).format(
457                 self.label,
458                 utilization * 100.0,
459                 constraint_key,
460                 math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
461
462
463     def _recommend_temp_disk(self):
464         """Recommend decreasing temp disk if utilization < 50%"""
465         total = float(self.job_tot['statfs']['total'])
466         utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0
467
468         if utilization < 50.8 and total > 0:
469             yield (
470                 '#!! {} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
471                 'consider reducing "tmpdirMin" and/or "outdirMin"'
472             ).format(
473                 self.label,
474                 utilization * 100.0,
475                 total / MB)
476
477
478     def _format(self, val):
479         """Return a string representation of a stat.
480
481         {:.2f} for floats, default format for everything else."""
482         if isinstance(val, float):
483             return '{:.2f}'.format(val)
484         else:
485             return '{}'.format(val)
486
487     def _runtime_constraint_mem_unit(self):
488         if hasattr(self, 'runtime_constraint_mem_unit'):
489             return self.runtime_constraint_mem_unit
490         elif self.detected_crunch1:
491             return JobSummarizer.runtime_constraint_mem_unit
492         else:
493             return ContainerRequestSummarizer.runtime_constraint_mem_unit
494
495     def _map_runtime_constraint(self, key):
496         if hasattr(self, 'map_runtime_constraint'):
497             return self.map_runtime_constraint[key]
498         elif self.detected_crunch1:
499             return JobSummarizer.map_runtime_constraint[key]
500         else:
501             return key
502
503
504 class CollectionSummarizer(Summarizer):
505     def __init__(self, collection_id, **kwargs):
506         super(CollectionSummarizer, self).__init__(
507             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
508         self.label = collection_id
509
510
511 def NewSummarizer(process_or_uuid, **kwargs):
512     """Construct with the appropriate subclass for this uuid/object."""
513
514     if isinstance(process_or_uuid, dict):
515         process = process_or_uuid
516         uuid = process['uuid']
517     else:
518         uuid = process_or_uuid
519         process = None
520         arv = arvados.api('v1')
521
522     if '-dz642-' in uuid:
523         if process is None:
524             # Get the associated CR. Doesn't matter which since they all have the same logs
525             crs = arv.container_requests().list(filters=[['container_uuid','=',uuid]],limit=1).execute()['items']
526             if len(crs) > 0:
527                 process = crs[0]
528         klass = ContainerRequestTreeSummarizer
529     elif '-xvhdp-' in uuid:
530         if process is None:
531             process = arv.container_requests().get(uuid=uuid).execute()
532         klass = ContainerRequestTreeSummarizer
533     elif '-8i9sb-' in uuid:
534         if process is None:
535             process = arv.jobs().get(uuid=uuid).execute()
536         klass = JobTreeSummarizer
537     elif '-d1hrv-' in uuid:
538         if process is None:
539             process = arv.pipeline_instances().get(uuid=uuid).execute()
540         klass = PipelineSummarizer
541     elif '-4zz18-' in uuid:
542         return CollectionSummarizer(collection_id=uuid)
543     else:
544         raise ArgumentError("Unrecognized uuid %s", uuid)
545     return klass(process, uuid=uuid, **kwargs)
546
547
548 class ProcessSummarizer(Summarizer):
549     """Process is a job, pipeline, or container request."""
550
551     def __init__(self, process, label=None, **kwargs):
552         rdr = None
553         self.process = process
554         if label is None:
555             label = self.process.get('name', self.process['uuid'])
556         # Pre-Arvados v1.4 everything is in 'log'
557         # For 1.4+ containers have no logs and container_requests have them in 'log_uuid', not 'log'
558         log_collection = self.process.get('log', self.process.get('log_uuid'))
559         if log_collection and self.process.get('state') != 'Uncommitted': # arvados.util.CR_UNCOMMITTED:
560             try:
561                 rdr = crunchstat_summary.reader.CollectionReader(log_collection)
562             except arvados.errors.NotFoundError as e:
563                 logger.warning("Trying event logs after failing to read "
564                                "log collection %s: %s", self.process['log'], e)
565         if rdr is None:
566             uuid = self.process.get('container_uuid', self.process.get('uuid'))
567             rdr = crunchstat_summary.reader.LiveLogReader(uuid)
568             label = label + ' (partial)'
569         super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
570         self.existing_constraints = self.process.get('runtime_constraints', {})
571
572
573 class JobSummarizer(ProcessSummarizer):
574     runtime_constraint_mem_unit = MB
575     map_runtime_constraint = {
576         'keep_cache_ram': 'keep_cache_mb_per_task',
577         'ram': 'min_ram_mb_per_node',
578         'vcpus': 'min_cores_per_node',
579     }
580
581
582 class ContainerRequestSummarizer(ProcessSummarizer):
583     runtime_constraint_mem_unit = 1
584
585
586 class MultiSummarizer(object):
587     def __init__(self, children={}, label=None, threads=1, **kwargs):
588         self.throttle = threading.Semaphore(threads)
589         self.children = children
590         self.label = label
591
592     def run_and_release(self, target, *args, **kwargs):
593         try:
594             return target(*args, **kwargs)
595         finally:
596             self.throttle.release()
597
598     def run(self):
599         threads = []
600         for child in self.children.values():
601             self.throttle.acquire()
602             t = threading.Thread(target=self.run_and_release, args=(child.run, ))
603             t.daemon = True
604             t.start()
605             threads.append(t)
606         for t in threads:
607             t.join()
608
609     def text_report(self):
610         txt = ''
611         d = self._descendants()
612         for child in d.values():
613             if len(d) > 1:
614                 txt += '### Summary for {} ({})\n'.format(
615                     child.label, child.process['uuid'])
616             txt += child.text_report()
617             txt += '\n'
618         return txt
619
620     def _descendants(self):
621         """Dict of self and all descendants.
622
623         Nodes with nothing of their own to report (like
624         MultiSummarizers) are omitted.
625         """
626         d = collections.OrderedDict()
627         for key, child in self.children.items():
628             if isinstance(child, Summarizer):
629                 d[key] = child
630             if isinstance(child, MultiSummarizer):
631                 d.update(child._descendants())
632         return d
633
634     def html_report(self):
635         return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()
636
637
638 class JobTreeSummarizer(MultiSummarizer):
639     """Summarizes a job and all children listed in its components field."""
640     def __init__(self, job, label=None, **kwargs):
641         arv = arvados.api('v1')
642         label = label or job.get('name', job['uuid'])
643         children = collections.OrderedDict()
644         children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
645         if job.get('components', None):
646             preloaded = {}
647             for j in arv.jobs().index(
648                     limit=len(job['components']),
649                     filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
650                 preloaded[j['uuid']] = j
651             for cname in sorted(job['components'].keys()):
652                 child_uuid = job['components'][cname]
653                 j = (preloaded.get(child_uuid) or
654                      arv.jobs().get(uuid=child_uuid).execute())
655                 children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
656
657         super(JobTreeSummarizer, self).__init__(
658             children=children,
659             label=label,
660             **kwargs)
661
662
663 class PipelineSummarizer(MultiSummarizer):
664     def __init__(self, instance, **kwargs):
665         children = collections.OrderedDict()
666         for cname, component in instance['components'].items():
667             if 'job' not in component:
668                 logger.warning(
669                     "%s: skipping component with no job assigned", cname)
670             else:
671                 logger.info(
672                     "%s: job %s", cname, component['job']['uuid'])
673                 summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
674                 summarizer.label = '{} {}'.format(
675                     cname, component['job']['uuid'])
676                 children[cname] = summarizer
677         super(PipelineSummarizer, self).__init__(
678             children=children,
679             label=instance['uuid'],
680             **kwargs)
681
682
683 class ContainerRequestTreeSummarizer(MultiSummarizer):
684     def __init__(self, root, skip_child_jobs=False, **kwargs):
685         arv = arvados.api('v1')
686
687         label = kwargs.pop('label', None) or root.get('name') or root['uuid']
688         root['name'] = label
689
690         children = collections.OrderedDict()
691         todo = collections.deque((root, ))
692         while len(todo) > 0:
693             current = todo.popleft()
694             label = current['name']
695             sort_key = current['created_at']
696
697             summer = ContainerRequestSummarizer(current, label=label, **kwargs)
698             summer.sort_key = sort_key
699             children[current['uuid']] = summer
700
701             page_filters = []
702             while True:
703                 child_crs = arv.container_requests().index(
704                     order=['uuid asc'],
705                     filters=page_filters+[
706                         ['requesting_container_uuid', '=', current['container_uuid']]],
707                 ).execute()
708                 if not child_crs['items']:
709                     break
710                 elif skip_child_jobs:
711                     logger.warning('%s: omitting stats from %d child containers'
712                                    ' because --skip-child-jobs flag is on',
713                                    label, child_crs['items_available'])
714                     break
715                 page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
716                 for cr in child_crs['items']:
717                     if cr['container_uuid']:
718                         logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
719                         cr['name'] = cr.get('name') or cr['uuid']
720                         todo.append(cr)
721         sorted_children = collections.OrderedDict()
722         for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
723             sorted_children[uuid] = children[uuid]
724         super(ContainerRequestTreeSummarizer, self).__init__(
725             children=sorted_children,
726             label=root['name'],
727             **kwargs)