20862: Add 'sdk/ruby-google-api-client/' from commit '2f4be67955e48bb65d008ecd9ff6da9...
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import arvados
6 import collections
7 import crunchstat_summary.dygraphs
8 import crunchstat_summary.reader
9 import datetime
10 import functools
11 import itertools
12 import math
13 import re
14 import sys
15 import threading
16 import _strptime
17
18 from arvados.api import OrderedJsonModel
19 from crunchstat_summary import logger
20
21 # Recommend memory constraints that are this multiple of an integral
22 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
23 # that have amounts like 7.5 GiB according to the kernel.)
24 AVAILABLE_RAM_RATIO = 0.95
25 MB=2**20
26
27 # Workaround datetime.datetime.strptime() thread-safety bug by calling
28 # it once before starting threads.  https://bugs.python.org/issue7980
29 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
30
31
32 WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
33
34
35 class Task(object):
36     def __init__(self):
37         self.starttime = None
38         self.finishtime = None
39         self.series = collections.defaultdict(list)
40
41
42 class Summarizer(object):
43     def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
44         self._logdata = logdata
45
46         self.uuid = uuid
47         self.label = label
48         self.starttime = None
49         self.finishtime = None
50         self._skip_child_jobs = skip_child_jobs
51
52         # stats_max: {category: {stat: val}}
53         self.stats_max = collections.defaultdict(
54             functools.partial(collections.defaultdict, lambda: 0))
55         # task_stats: {task_id: {category: {stat: val}}}
56         self.task_stats = collections.defaultdict(
57             functools.partial(collections.defaultdict, dict))
58
59         self.seq_to_uuid = {}
60         self.tasks = collections.defaultdict(Task)
61
62         # We won't bother recommending new runtime constraints if the
63         # constraints given when running the job are known to us and
64         # are already suitable.  If applicable, the subclass
65         # constructor will overwrite this with something useful.
66         self.existing_constraints = {}
67
68         logger.debug("%s: logdata %s", self.label, logdata)
69
70     def run(self):
71         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
72         with self._logdata as logdata:
73             self._run(logdata)
74
75     def _run(self, logdata):
76         self.detected_crunch1 = False
77         for line in logdata:
78             if not self.detected_crunch1 and '-8i9sb-' in line:
79                 self.detected_crunch1 = True
80
81             if self.detected_crunch1:
82                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
83                 if m:
84                     seq = int(m.group('seq'))
85                     uuid = m.group('task_uuid')
86                     self.seq_to_uuid[seq] = uuid
87                     logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
88                     continue
89
90                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
91                 if m:
92                     task_id = self.seq_to_uuid[int(m.group('seq'))]
93                     elapsed = int(m.group('elapsed'))
94                     self.task_stats[task_id]['time'] = {'elapsed': elapsed}
95                     if elapsed > self.stats_max['time']['elapsed']:
96                         self.stats_max['time']['elapsed'] = elapsed
97                     continue
98
99                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
100                 if m:
101                     uuid = m.group('uuid')
102                     if self._skip_child_jobs:
103                         logger.warning('%s: omitting stats from child job %s'
104                                        ' because --skip-child-jobs flag is on',
105                                        self.label, uuid)
106                         continue
107                     logger.debug('%s: follow %s', self.label, uuid)
108                     child_summarizer = NewSummarizer(uuid)
109                     child_summarizer.stats_max = self.stats_max
110                     child_summarizer.task_stats = self.task_stats
111                     child_summarizer.tasks = self.tasks
112                     child_summarizer.starttime = self.starttime
113                     child_summarizer.run()
114                     logger.debug('%s: done %s', self.label, uuid)
115                     continue
116
117                 # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
118                 m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
119                 if not m:
120                     continue
121             else:
122                 # crunch2
123                 # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
124                 m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
125                 if not m:
126                     continue
127
128             if self.label is None:
129                 try:
130                     self.label = m.group('job_uuid')
131                 except IndexError:
132                     self.label = 'label #1'
133             category = m.group('category')
134             if category.endswith(':'):
135                 # "stderr crunchstat: notice: ..."
136                 continue
137             elif category in ('error', 'caught'):
138                 continue
139             elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
140                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
141                 # (old logs are less careful with unprefixed error messages)
142                 continue
143
144             if self.detected_crunch1:
145                 task_id = self.seq_to_uuid[int(m.group('seq'))]
146             else:
147                 task_id = 'container'
148             task = self.tasks[task_id]
149
150             # Use the first and last crunchstat timestamps as
151             # approximations of starttime and finishtime.
152             timestamp = m.group('timestamp')
153             if timestamp[10:11] == '_':
154                 timestamp = datetime.datetime.strptime(
155                     timestamp, '%Y-%m-%d_%H:%M:%S')
156             elif timestamp[10:11] == 'T':
157                 timestamp = datetime.datetime.strptime(
158                     timestamp[:19], '%Y-%m-%dT%H:%M:%S')
159             else:
160                 raise ValueError("Cannot parse timestamp {!r}".format(
161                     timestamp))
162
163             if task.starttime is None:
164                 logger.debug('%s: task %s starttime %s',
165                              self.label, task_id, timestamp)
166             if task.starttime is None or timestamp < task.starttime:
167                 task.starttime = timestamp
168             if task.finishtime is None or timestamp > task.finishtime:
169                 task.finishtime = timestamp
170
171             if self.starttime is None or timestamp < self.starttime:
172                 self.starttime = timestamp
173             if self.finishtime is None or timestamp > self.finishtime:
174                 self.finishtime = timestamp
175
176             if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
177                 elapsed = (task.finishtime - task.starttime).seconds
178                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
179                 if elapsed > self.stats_max['time']['elapsed']:
180                     self.stats_max['time']['elapsed'] = elapsed
181
182             this_interval_s = None
183             for group in ['current', 'interval']:
184                 if not m.group(group):
185                     continue
186                 category = m.group('category')
187                 words = m.group(group).split(' ')
188                 stats = {}
189                 try:
190                     for val, stat in zip(words[::2], words[1::2]):
191                         if '.' in val:
192                             stats[stat] = float(val)
193                         else:
194                             stats[stat] = int(val)
195                 except ValueError as e:
196                     # If the line doesn't start with 'crunchstat:' we
197                     # might have mistaken an error message for a
198                     # structured crunchstat line.
199                     if m.group("crunchstat") is None or m.group("category") == "crunchstat":
200                         logger.warning("%s: log contains message\n  %s", self.label, line)
201                     else:
202                         logger.warning(
203                             '%s: Error parsing value %r (stat %r, category %r): %r',
204                             self.label, val, stat, category, e)
205                         logger.warning('%s', line)
206                     continue
207                 if 'user' in stats or 'sys' in stats:
208                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
209                 if 'tx' in stats or 'rx' in stats:
210                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
211                 if group == 'interval':
212                     if 'seconds' in stats:
213                         this_interval_s = stats.get('seconds',0)
214                         del stats['seconds']
215                         if this_interval_s <= 0:
216                             logger.error(
217                                 "BUG? interval stat given with duration {!r}".
218                                 format(this_interval_s))
219                     else:
220                         logger.error('BUG? interval stat missing duration')
221                 for stat, val in stats.items():
222                     if group == 'interval' and this_interval_s:
223                             stat = stat + '__rate'
224                             val = val / this_interval_s
225                             if stat in ['user+sys__rate', 'user__rate', 'sys__rate', 'tx+rx__rate', 'rx__rate', 'tx__rate']:
226                                 task.series[category, stat].append(
227                                     (timestamp - self.starttime, val))
228                     else:
229                         if stat in ['rss','used','total']:
230                             task.series[category, stat].append(
231                                 (timestamp - self.starttime, val))
232                         self.task_stats[task_id][category][stat] = val
233                     if val > self.stats_max[category][stat]:
234                         self.stats_max[category][stat] = val
235         logger.debug('%s: done parsing', self.label)
236
237         self.job_tot = collections.defaultdict(
238             functools.partial(collections.defaultdict, int))
239         for task_id, task_stat in self.task_stats.items():
240             for category, stat_last in task_stat.items():
241                 for stat, val in stat_last.items():
242                     if stat in ['cpus', 'cache', 'swap', 'rss']:
243                         # meaningless stats like 16 cpu cores x 5 tasks = 80
244                         continue
245                     self.job_tot[category][stat] += val
246         logger.debug('%s: done totals', self.label)
247
248         missing_category = {
249             'cpu': 'CPU',
250             'mem': 'memory',
251             'net:': 'network I/O',
252             'statfs': 'storage space',
253         }
254         for task_stat in self.task_stats.values():
255             for category in task_stat.keys():
256                 for checkcat in missing_category:
257                     if checkcat.endswith(':'):
258                         if category.startswith(checkcat):
259                             missing_category.pop(checkcat)
260                             break
261                     else:
262                         if category == checkcat:
263                             missing_category.pop(checkcat)
264                             break
265         for catlabel in missing_category.values():
266             logger.warning('%s: %s stats are missing -- possible cluster configuration issue',
267                         self.label, catlabel)
268
269     def long_label(self):
270         label = self.label
271         if hasattr(self, 'process') and self.process['uuid'] not in label:
272             label = '{} ({})'.format(label, self.process['uuid'])
273         if self.finishtime:
274             label += ' -- elapsed time '
275             s = (self.finishtime - self.starttime).total_seconds()
276             if s > 86400:
277                 label += '{}d'.format(int(s/86400))
278             if s > 3600:
279                 label += '{}h'.format(int(s/3600) % 24)
280             if s > 60:
281                 label += '{}m'.format(int(s/60) % 60)
282             label += '{}s'.format(int(s) % 60)
283         return label
284
285     def text_report(self):
286         if not self.tasks:
287             return "(no report generated)\n"
288         return "\n".join(itertools.chain(
289             self._text_report_gen(),
290             self._recommend_gen())) + "\n"
291
292     def html_report(self):
293         return WEBCHART_CLASS(self.label, [self]).html()
294
295     def _text_report_gen(self):
296         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
297         for category, stat_max in sorted(self.stats_max.items()):
298             for stat, val in sorted(stat_max.items()):
299                 if stat.endswith('__rate'):
300                     continue
301                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
302                 val = self._format(val)
303                 tot = self._format(self.job_tot[category].get(stat, '-'))
304                 yield "\t".join([category, stat, str(val), max_rate, tot])
305         for args in (
306                 ('Number of tasks: {}',
307                  len(self.tasks),
308                  None),
309                 ('Max CPU time spent by a single task: {}s',
310                  self.stats_max['cpu']['user+sys'],
311                  None),
312                 ('Max CPU usage in a single interval: {}%',
313                  self.stats_max['cpu']['user+sys__rate'],
314                  lambda x: x * 100),
315                 ('Overall CPU usage: {}%',
316                  float(self.job_tot['cpu']['user+sys']) /
317                  self.job_tot['time']['elapsed']
318                  if self.job_tot['time']['elapsed'] > 0 else 0,
319                  lambda x: x * 100),
320                 ('Max memory used by a single task: {}GB',
321                  self.stats_max['mem']['rss'],
322                  lambda x: x / 1e9),
323                 ('Max network traffic in a single task: {}GB',
324                  self.stats_max['net:eth0']['tx+rx'] +
325                  self.stats_max['net:keep0']['tx+rx'],
326                  lambda x: x / 1e9),
327                 ('Max network speed in a single interval: {}MB/s',
328                  self.stats_max['net:eth0']['tx+rx__rate'] +
329                  self.stats_max['net:keep0']['tx+rx__rate'],
330                  lambda x: x / 1e6),
331                 ('Keep cache miss rate {}%',
332                  (float(self.job_tot['keepcache']['miss']) /
333                  float(self.job_tot['keepcalls']['get']))
334                  if self.job_tot['keepcalls']['get'] > 0 else 0,
335                  lambda x: x * 100.0),
336                 ('Keep cache utilization {}%',
337                  (float(self.job_tot['blkio:0:0']['read']) /
338                  float(self.job_tot['net:keep0']['rx']))
339                  if self.job_tot['net:keep0']['rx'] > 0 else 0,
340                  lambda x: x * 100.0),
341                ('Temp disk utilization {}%',
342                  (float(self.job_tot['statfs']['used']) /
343                  float(self.job_tot['statfs']['total']))
344                  if self.job_tot['statfs']['total'] > 0 else 0,
345                  lambda x: x * 100.0),
346                 ):
347             format_string, val, transform = args
348             if val == float('-Inf'):
349                 continue
350             if transform:
351                 val = transform(val)
352             yield "# "+format_string.format(self._format(val))
353
354     def _recommend_gen(self):
355         # TODO recommend fixing job granularity if elapsed time is too short
356         return itertools.chain(
357             self._recommend_cpu(),
358             self._recommend_ram(),
359             self._recommend_keep_cache(),
360             self._recommend_temp_disk(),
361             )
362
363     def _recommend_cpu(self):
364         """Recommend asking for 4 cores if max CPU usage was 333%"""
365
366         constraint_key = self._map_runtime_constraint('vcpus')
367         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
368         if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
369             logger.warning('%s: no CPU usage data', self.label)
370             return
371         # TODO Don't necessarily want to recommend on isolated max peak
372         # take average CPU usage into account as well or % time at max
373         used_cores = max(1, int(math.ceil(cpu_max_rate)))
374         asked_cores = self.existing_constraints.get(constraint_key)
375         if asked_cores is None:
376             asked_cores = 1
377         # TODO: This should be more nuanced in cases where max >> avg
378         if used_cores < asked_cores:
379             yield (
380                 '#!! {} max CPU usage was {}% -- '
381                 'try reducing runtime_constraints to "{}":{}'
382             ).format(
383                 self.label,
384                 math.ceil(cpu_max_rate*100),
385                 constraint_key,
386                 int(used_cores))
387
388     # FIXME: This needs to be updated to account for current a-d-c algorithms
389     def _recommend_ram(self):
390         """Recommend an economical RAM constraint for this job.
391
392         Nodes that are advertised as "8 gibibytes" actually have what
393         we might call "8 nearlygibs" of memory available for jobs.
394         Here, we calculate a whole number of nearlygibs that would
395         have sufficed to run the job, then recommend requesting a node
396         with that number of nearlygibs (expressed as mebibytes).
397
398         Requesting a node with "nearly 8 gibibytes" is our best hope
399         of getting a node that actually has nearly 8 gibibytes
400         available.  If the node manager is smart enough to account for
401         the discrepancy itself when choosing/creating a node, we'll
402         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
403         advertised size of the next-size-smaller node (say, 6 GiB)
404         will be too low to satisfy our request, so we will effectively
405         get rounded up to 8 GiB.
406
407         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
408         we will generally get a node that is advertised as "8 GiB" and
409         has at least 7500 MiB available.  However, asking for 8192 MiB
410         would either result in an unnecessarily expensive 12 GiB node
411         (if node manager knows about the discrepancy), or an 8 GiB
412         node which has less than 8192 MiB available and is therefore
413         considered by crunch-dispatch to be too small to meet our
414         constraint.
415
416         When node manager learns how to predict the available memory
417         for each node type such that crunch-dispatch always agrees
418         that a node is big enough to run the job it was brought up
419         for, all this will be unnecessary.  We'll just ask for exactly
420         the memory we want -- even if that happens to be 8192 MiB.
421         """
422
423         constraint_key = self._map_runtime_constraint('ram')
424         used_bytes = self.stats_max['mem']['rss']
425         if used_bytes == float('-Inf'):
426             logger.warning('%s: no memory usage data', self.label)
427             return
428         used_mib = math.ceil(float(used_bytes) / MB)
429         asked_mib = self.existing_constraints.get(constraint_key)
430
431         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
432         if used_mib > 0 and (asked_mib is None or (
433                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
434             yield (
435                 '#!! {} max RSS was {} MiB -- '
436                 'try reducing runtime_constraints to "{}":{}'
437             ).format(
438                 self.label,
439                 int(used_mib),
440                 constraint_key,
441                 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))
442
443     def _recommend_keep_cache(self):
444         """Recommend increasing keep cache if utilization < 80%"""
445         constraint_key = self._map_runtime_constraint('keep_cache_ram')
446         if self.job_tot['net:keep0']['rx'] == 0:
447             return
448         utilization = (float(self.job_tot['blkio:0:0']['read']) /
449                        float(self.job_tot['net:keep0']['rx']))
450         # FIXME: the default on this get won't work correctly
451         asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
452
453         if utilization < 0.8:
454             yield (
455                 '#!! {} Keep cache utilization was {:.2f}% -- '
456                 'try doubling runtime_constraints to "{}":{} (or more)'
457             ).format(
458                 self.label,
459                 utilization * 100.0,
460                 constraint_key,
461                 math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
462
463
464     def _recommend_temp_disk(self):
465         """Recommend decreasing temp disk if utilization < 50%"""
466         total = float(self.job_tot['statfs']['total'])
467         utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0
468
469         if utilization < 50.8 and total > 0:
470             yield (
471                 '#!! {} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
472                 'consider reducing "tmpdirMin" and/or "outdirMin"'
473             ).format(
474                 self.label,
475                 utilization * 100.0,
476                 total / MB)
477
478
479     def _format(self, val):
480         """Return a string representation of a stat.
481
482         {:.2f} for floats, default format for everything else."""
483         if isinstance(val, float):
484             return '{:.2f}'.format(val)
485         else:
486             return '{}'.format(val)
487
488     def _runtime_constraint_mem_unit(self):
489         if hasattr(self, 'runtime_constraint_mem_unit'):
490             return self.runtime_constraint_mem_unit
491         elif self.detected_crunch1:
492             return JobSummarizer.runtime_constraint_mem_unit
493         else:
494             return ContainerRequestSummarizer.runtime_constraint_mem_unit
495
496     def _map_runtime_constraint(self, key):
497         if hasattr(self, 'map_runtime_constraint'):
498             return self.map_runtime_constraint[key]
499         elif self.detected_crunch1:
500             return JobSummarizer.map_runtime_constraint[key]
501         else:
502             return key
503
504
505 class CollectionSummarizer(Summarizer):
506     def __init__(self, collection_id, **kwargs):
507         super(CollectionSummarizer, self).__init__(
508             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
509         self.label = collection_id
510
511
512 def NewSummarizer(process_or_uuid, **kwargs):
513     """Construct with the appropriate subclass for this uuid/object."""
514
515     if isinstance(process_or_uuid, dict):
516         process = process_or_uuid
517         uuid = process['uuid']
518     else:
519         uuid = process_or_uuid
520         process = None
521         arv = arvados.api('v1', model=OrderedJsonModel())
522
523     if '-dz642-' in uuid:
524         if process is None:
525             # Get the associated CR. Doesn't matter which since they all have the same logs
526             crs = arv.container_requests().list(filters=[['container_uuid','=',uuid]],limit=1).execute()['items']
527             if len(crs) > 0:
528                 process = crs[0]
529         klass = ContainerRequestTreeSummarizer
530     elif '-xvhdp-' in uuid:
531         if process is None:
532             process = arv.container_requests().get(uuid=uuid).execute()
533         klass = ContainerRequestTreeSummarizer
534     elif '-8i9sb-' in uuid:
535         if process is None:
536             process = arv.jobs().get(uuid=uuid).execute()
537         klass = JobTreeSummarizer
538     elif '-d1hrv-' in uuid:
539         if process is None:
540             process = arv.pipeline_instances().get(uuid=uuid).execute()
541         klass = PipelineSummarizer
542     elif '-4zz18-' in uuid:
543         return CollectionSummarizer(collection_id=uuid)
544     else:
545         raise ArgumentError("Unrecognized uuid %s", uuid)
546     return klass(process, uuid=uuid, **kwargs)
547
548
549 class ProcessSummarizer(Summarizer):
550     """Process is a job, pipeline, or container request."""
551
552     def __init__(self, process, label=None, **kwargs):
553         rdr = None
554         self.process = process
555         if label is None:
556             label = self.process.get('name', self.process['uuid'])
557         # Pre-Arvados v1.4 everything is in 'log'
558         # For 1.4+ containers have no logs and container_requests have them in 'log_uuid', not 'log'
559         log_collection = self.process.get('log', self.process.get('log_uuid'))
560         if log_collection and self.process.get('state') != 'Uncommitted': # arvados.util.CR_UNCOMMITTED:
561             try:
562                 rdr = crunchstat_summary.reader.CollectionReader(log_collection)
563             except arvados.errors.NotFoundError as e:
564                 logger.warning("Trying event logs after failing to read "
565                                "log collection %s: %s", self.process['log'], e)
566         if rdr is None:
567             uuid = self.process.get('container_uuid', self.process.get('uuid'))
568             rdr = crunchstat_summary.reader.LiveLogReader(uuid)
569             label = label + ' (partial)'
570         super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
571         self.existing_constraints = self.process.get('runtime_constraints', {})
572
573
574 class JobSummarizer(ProcessSummarizer):
575     runtime_constraint_mem_unit = MB
576     map_runtime_constraint = {
577         'keep_cache_ram': 'keep_cache_mb_per_task',
578         'ram': 'min_ram_mb_per_node',
579         'vcpus': 'min_cores_per_node',
580     }
581
582
583 class ContainerRequestSummarizer(ProcessSummarizer):
584     runtime_constraint_mem_unit = 1
585
586
587 class MultiSummarizer(object):
588     def __init__(self, children={}, label=None, threads=1, **kwargs):
589         self.throttle = threading.Semaphore(threads)
590         self.children = children
591         self.label = label
592
593     def run_and_release(self, target, *args, **kwargs):
594         try:
595             return target(*args, **kwargs)
596         finally:
597             self.throttle.release()
598
599     def run(self):
600         threads = []
601         for child in self.children.values():
602             self.throttle.acquire()
603             t = threading.Thread(target=self.run_and_release, args=(child.run, ))
604             t.daemon = True
605             t.start()
606             threads.append(t)
607         for t in threads:
608             t.join()
609
610     def text_report(self):
611         txt = ''
612         d = self._descendants()
613         for child in d.values():
614             if len(d) > 1:
615                 txt += '### Summary for {} ({})\n'.format(
616                     child.label, child.process['uuid'])
617             txt += child.text_report()
618             txt += '\n'
619         return txt
620
621     def _descendants(self):
622         """Dict of self and all descendants.
623
624         Nodes with nothing of their own to report (like
625         MultiSummarizers) are omitted.
626         """
627         d = collections.OrderedDict()
628         for key, child in self.children.items():
629             if isinstance(child, Summarizer):
630                 d[key] = child
631             if isinstance(child, MultiSummarizer):
632                 d.update(child._descendants())
633         return d
634
635     def html_report(self):
636         return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()
637
638
639 class JobTreeSummarizer(MultiSummarizer):
640     """Summarizes a job and all children listed in its components field."""
641     def __init__(self, job, label=None, **kwargs):
642         arv = arvados.api('v1', model=OrderedJsonModel())
643         label = label or job.get('name', job['uuid'])
644         children = collections.OrderedDict()
645         children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
646         if job.get('components', None):
647             preloaded = {}
648             for j in arv.jobs().index(
649                     limit=len(job['components']),
650                     filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
651                 preloaded[j['uuid']] = j
652             for cname in sorted(job['components'].keys()):
653                 child_uuid = job['components'][cname]
654                 j = (preloaded.get(child_uuid) or
655                      arv.jobs().get(uuid=child_uuid).execute())
656                 children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
657
658         super(JobTreeSummarizer, self).__init__(
659             children=children,
660             label=label,
661             **kwargs)
662
663
664 class PipelineSummarizer(MultiSummarizer):
665     def __init__(self, instance, **kwargs):
666         children = collections.OrderedDict()
667         for cname, component in instance['components'].items():
668             if 'job' not in component:
669                 logger.warning(
670                     "%s: skipping component with no job assigned", cname)
671             else:
672                 logger.info(
673                     "%s: job %s", cname, component['job']['uuid'])
674                 summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
675                 summarizer.label = '{} {}'.format(
676                     cname, component['job']['uuid'])
677                 children[cname] = summarizer
678         super(PipelineSummarizer, self).__init__(
679             children=children,
680             label=instance['uuid'],
681             **kwargs)
682
683
684 class ContainerRequestTreeSummarizer(MultiSummarizer):
685     def __init__(self, root, skip_child_jobs=False, **kwargs):
686         arv = arvados.api('v1', model=OrderedJsonModel())
687
688         label = kwargs.pop('label', None) or root.get('name') or root['uuid']
689         root['name'] = label
690
691         children = collections.OrderedDict()
692         todo = collections.deque((root, ))
693         while len(todo) > 0:
694             current = todo.popleft()
695             label = current['name']
696             sort_key = current['created_at']
697
698             summer = ContainerRequestSummarizer(current, label=label, **kwargs)
699             summer.sort_key = sort_key
700             children[current['uuid']] = summer
701
702             page_filters = []
703             while True:
704                 child_crs = arv.container_requests().index(
705                     order=['uuid asc'],
706                     filters=page_filters+[
707                         ['requesting_container_uuid', '=', current['container_uuid']]],
708                 ).execute()
709                 if not child_crs['items']:
710                     break
711                 elif skip_child_jobs:
712                     logger.warning('%s: omitting stats from %d child containers'
713                                    ' because --skip-child-jobs flag is on',
714                                    label, child_crs['items_available'])
715                     break
716                 page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
717                 for cr in child_crs['items']:
718                     if cr['container_uuid']:
719                         logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
720                         cr['name'] = cr.get('name') or cr['uuid']
721                         todo.append(cr)
722         sorted_children = collections.OrderedDict()
723         for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
724             sorted_children[uuid] = children[uuid]
725         super(ContainerRequestTreeSummarizer, self).__init__(
726             children=sorted_children,
727             label=root['name'],
728             **kwargs)