# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

from __future__ import print_function

import arvados
import collections
import crunchstat_summary.dygraphs
import crunchstat_summary.reader
import datetime
import functools
import itertools
import math
import re
import sys
import threading
import _strptime

from arvados.api import OrderedJsonModel
from crunchstat_summary import logger

# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95
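# In other words, a node sold as "8 GiB" is treated as having roughly
# 8 * 0.95 = 7.6 GiB usable, and the RAM recommendations below are
# rounded to whole multiples of 0.95 GiB.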


# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads.  https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')


WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart


class Task(object):
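    """Per-task state: start/finish times and the per-(category, stat) data series."""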
    def __init__(self):
        self.starttime = None
        self.series = collections.defaultdict(list)


class Summarizer(object):
    def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
        self._logdata = logdata

        self.uuid = uuid
        self.label = label
        self.starttime = None
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        self.seq_to_uuid = {}
        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable.  If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        logger.debug("%s: logdata %s", self.label, logdata)

    def run(self):
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        with self._logdata as logdata:
            self._run(logdata)

    def _run(self, logdata):
        self.detected_crunch1 = False
        for line in logdata:
            if not self.detected_crunch1 and '-8i9sb-' in line:
                self.detected_crunch1 = True

            if self.detected_crunch1:
                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
                if m:
                    seq = int(m.group('seq'))
                    uuid = m.group('task_uuid')
                    self.seq_to_uuid[seq] = uuid
                    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
                    continue

                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
                if m:
                    task_id = self.seq_to_uuid[int(m.group('seq'))]
                    elapsed = int(m.group('elapsed'))
                    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                    if elapsed > self.stats_max['time']['elapsed']:
                        self.stats_max['time']['elapsed'] = elapsed
                    continue

                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
                if m:
                    uuid = m.group('uuid')
                    if self._skip_child_jobs:
                        logger.warning('%s: omitting stats from child job %s'
                                       ' because --skip-child-jobs flag is on',
                                       self.label, uuid)
                        continue
                    logger.debug('%s: follow %s', self.label, uuid)
                    child_summarizer = ProcessSummarizer(uuid)
                    child_summarizer.stats_max = self.stats_max
                    child_summarizer.task_stats = self.task_stats
                    child_summarizer.tasks = self.tasks
                    child_summarizer.starttime = self.starttime
                    child_summarizer.run()
                    logger.debug('%s: done %s', self.label, uuid)
                    continue

                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                if not m:
                    continue
            else:
                # crunch2
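                # Illustrative (made-up) example of a line the regex
                # below is meant to match:
                #   2017-05-09T18:48:22.000000000Z mem 12287 cache 0 swap 722 pgmajfault 87406592 rss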
                m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                if not m:
                    continue

            if self.label is None:
                try:
                    self.label = m.group('job_uuid')
                except IndexError:
                    self.label = 'container'
            if m.group('category').endswith(':'):
                # "stderr crunchstat: notice: ..."
                continue
            elif m.group('category') in ('error', 'caught'):
                continue
            elif m.group('category') in ('read', 'open', 'cgroup', 'CID', 'Running'):
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (old logs are less careful with unprefixed error messages)
                continue

            if self.detected_crunch1:
                task_id = self.seq_to_uuid[int(m.group('seq'))]
            else:
                task_id = 'container'
            task = self.tasks[task_id]

            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = m.group('timestamp')
            if timestamp[10:11] == '_':
                timestamp = datetime.datetime.strptime(
                    timestamp, '%Y-%m-%d_%H:%M:%S')
            elif timestamp[10:11] == 'T':
                timestamp = datetime.datetime.strptime(
                    timestamp[:19], '%Y-%m-%dT%H:%M:%S')
            else:
                raise ValueError("Cannot parse timestamp {!r}".format(
                    timestamp))

            if not task.starttime:
                task.starttime = timestamp
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            task.finishtime = timestamp

            if not self.starttime:
                self.starttime = timestamp
            self.finishtime = timestamp

            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                    continue
                category = m.group('category')
                words = m.group(group).split(' ')
                stats = {}
                try:
                    for val, stat in zip(words[::2], words[1::2]):
                        if '.' in val:
                            stats[stat] = float(val)
                        else:
                            stats[stat] = int(val)
                except ValueError as e:
                    logger.warning(
                        'Error parsing value %r (stat %r, category %r): %r',
                        val, stat, category, e)
                    logger.warning('%s', line)
                    continue
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                for stat, val in stats.iteritems():
                    if group == 'interval':
                        if stat == 'seconds':
                            this_interval_s = val
                            continue
                        elif not (this_interval_s > 0):
                            logger.error(
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                            continue
                        else:
                            stat = stat + '__rate'
                            val = val / this_interval_s
                            if stat in ['user+sys__rate', 'tx+rx__rate']:
                                task.series[category, stat].append(
                                    (timestamp - self.starttime, val))
                    else:
                        if stat in ['rss']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.iteritems():
            for category, stat_last in task_stat.iteritems():
                for stat, val in stat_last.iteritems():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                        continue
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)

    def long_label(self):
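        """Return the label, process uuid (if known), and elapsed time,
        e.g. "foo (zzzzz-xvhdp-012345678901234) -- elapsed time 1h2m3s"
        (illustrative).
        """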
        label = self.label
        if hasattr(self, 'process') and self.process['uuid'] not in label:
            label = '{} ({})'.format(label, self.process['uuid'])
        if self.finishtime:
            label += ' -- elapsed time '
            s = (self.finishtime - self.starttime).total_seconds()
            if s > 86400:
                label += '{}d'.format(int(s/86400))
            if s > 3600:
                label += '{}h'.format(int(s/3600) % 24)
            if s > 60:
                label += '{}m'.format(int(s/60) % 60)
            label += '{}s'.format(int(s) % 60)
        return label

    def text_report(self):
        if not self.tasks:
            return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"

    def html_report(self):
        return WEBCHART_CLASS(self.label, [self]).html()

    def _text_report_gen(self):
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.iteritems()):
            for stat, val in sorted(stat_max.iteritems()):
                if stat.endswith('__rate'):
                    continue
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        for args in (
                ('Number of tasks: {}',
                 len(self.tasks),
                 None),
                ('Max CPU time spent by a single task: {}s',
                 self.stats_max['cpu']['user+sys'],
                 None),
                ('Max CPU usage in a single interval: {}%',
                 self.stats_max['cpu']['user+sys__rate'],
                 lambda x: x * 100),
                ('Overall CPU usage: {}%',
                 self.job_tot['cpu']['user+sys'] /
                 self.job_tot['time']['elapsed']
                 if self.job_tot['time']['elapsed'] > 0 else 0,
                 lambda x: x * 100),
                ('Max memory used by a single task: {}GB',
                 self.stats_max['mem']['rss'],
                 lambda x: x / 1e9),
                ('Max network traffic in a single task: {}GB',
                 self.stats_max['net:eth0']['tx+rx'] +
                 self.stats_max['net:keep0']['tx+rx'],
                 lambda x: x / 1e9),
                ('Max network speed in a single interval: {}MB/s',
                 self.stats_max['net:eth0']['tx+rx__rate'] +
                 self.stats_max['net:keep0']['tx+rx__rate'],
                 lambda x: x / 1e6),
                ('Keep cache miss rate {}%',
                 (float(self.job_tot['keepcache']['miss']) /
                 float(self.job_tot['keepcalls']['get']))
                 if self.job_tot['keepcalls']['get'] > 0 else 0,
                 lambda x: x * 100.0),
                ('Keep cache utilization {}%',
                 (float(self.job_tot['blkio:0:0']['read']) /
                 float(self.job_tot['net:keep0']['rx']))
                 if self.job_tot['net:keep0']['rx'] > 0 else 0,
                 lambda x: x * 100.0)):
            format_string, val, transform = args
            if val == float('-Inf'):
                continue
            if transform:
                val = transform(val)
            yield "# "+format_string.format(self._format(val))

    def _recommend_gen(self):
        return itertools.chain(
            self._recommend_cpu(),
            self._recommend_ram(),
            self._recommend_keep_cache())

    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""

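        # For example, a peak rate of 3.33 CPU-seconds per second (333%)
        # rounds up to used_cores = 4, so the report suggests "vcpus":4
        # (or "min_cores_per_node":4 for crunch1 jobs).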
        constraint_key = self._map_runtime_constraint('vcpus')
        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf'):
            logger.warning('%s: no CPU usage data', self.label)
            return
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get(constraint_key)
        if asked_cores is None or used_cores < asked_cores:
            yield (
                '#!! {} max CPU usage was {}% -- '
                'try runtime_constraints "{}":{}'
            ).format(
                self.label,
                int(math.ceil(cpu_max_rate*100)),
                constraint_key,
                int(used_cores))

    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available.  If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available.  However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        constraint.

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary.  We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """

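        # Worked example: a max RSS of 7500 MiB works out to
        # ceil(7500 / 0.95 / 1024) = 8 nearlygibs, so the suggested
        # constraint is int(8 * 0.95 * 1024) = 7782 MiB (converted to
        # bytes for containers, whose "ram" constraint is in bytes).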
        constraint_key = self._map_runtime_constraint('ram')
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
            return
        used_mib = math.ceil(float(used_bytes) / 1048576)
        asked_mib = self.existing_constraints.get(constraint_key)

        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
        if asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
            yield (
                '#!! {} max RSS was {} MiB -- '
                'try runtime_constraints "{}":{}'
            ).format(
                self.label,
                int(used_mib),
                constraint_key,
                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(2**20)/self._runtime_constraint_mem_unit()))

    def _recommend_keep_cache(self):
        """Recommend increasing keep cache if utilization < 80%"""
        constraint_key = self._map_runtime_constraint('keep_cache_ram')
        if self.job_tot['net:keep0']['rx'] == 0:
            return
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
        asked_mib = self.existing_constraints.get(constraint_key, 256)

        if utilization < 0.8:
            yield (
                '#!! {} Keep cache utilization was {:.2f}% -- '
                'try runtime_constraints "{}":{} (or more)'
            ).format(
                self.label,
                utilization * 100.0,
                constraint_key,
                asked_mib*2*(2**20)/self._runtime_constraint_mem_unit())
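        # For example, with the default 256 MiB cache and 50% utilization,
        # this suggests doubling the cache: "keep_cache_mb_per_task":512
        # for crunch1 jobs, or "keep_cache_ram":536870912 (bytes) for
        # containers.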

    def _format(self, val):
        """Return a string representation of a stat.

        {:.2f} for floats, default format for everything else."""
        if isinstance(val, float):
            return '{:.2f}'.format(val)
        else:
            return '{}'.format(val)

    def _runtime_constraint_mem_unit(self):
        if hasattr(self, 'runtime_constraint_mem_unit'):
            return self.runtime_constraint_mem_unit
        elif self.detected_crunch1:
            return JobSummarizer.runtime_constraint_mem_unit
        else:
            return ContainerSummarizer.runtime_constraint_mem_unit

    def _map_runtime_constraint(self, key):
        if hasattr(self, 'map_runtime_constraint'):
            return self.map_runtime_constraint[key]
        elif self.detected_crunch1:
            return JobSummarizer.map_runtime_constraint[key]
        else:
            return key


class CollectionSummarizer(Summarizer):
    def __init__(self, collection_id, **kwargs):
        super(CollectionSummarizer, self).__init__(
            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
        self.label = collection_id


def NewSummarizer(process_or_uuid, **kwargs):
    """Construct with the appropriate subclass for this uuid/object."""

    if isinstance(process_or_uuid, dict):
        process = process_or_uuid
        uuid = process['uuid']
    else:
        uuid = process_or_uuid
        process = None
        arv = arvados.api('v1', model=OrderedJsonModel())

    if '-dz642-' in uuid:
        if process is None:
            process = arv.containers().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-xvhdp-' in uuid:
        if process is None:
            process = arv.container_requests().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-8i9sb-' in uuid:
        if process is None:
            process = arv.jobs().get(uuid=uuid).execute()
        klass = JobTreeSummarizer
    elif '-d1hrv-' in uuid:
        if process is None:
            process = arv.pipeline_instances().get(uuid=uuid).execute()
        klass = PipelineSummarizer
    elif '-4zz18-' in uuid:
        return CollectionSummarizer(collection_id=uuid)
    else:
        raise ValueError("Unrecognized uuid %s" % uuid)
    return klass(process, uuid=uuid, **kwargs)
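
# Illustrative usage (the uuid is a made-up placeholder; a configured
# Arvados API client is required):
#   summarizer = NewSummarizer('zzzzz-xvhdp-012345678901234')
#   summarizer.run()
#   print(summarizer.text_report())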


class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, container, or container request."""

    def __init__(self, process, label=None, **kwargs):
        rdr = None
        self.process = process
        if label is None:
            label = self.process.get('name', self.process['uuid'])
        if self.process.get('log'):
            try:
                rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.process['log'], e)
        if rdr is None:
            rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
            label = label + ' (partial)'
        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
        self.existing_constraints = self.process.get('runtime_constraints', {})


class JobSummarizer(ProcessSummarizer):
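    # Crunch1 jobs express memory constraints in MiB, under job-specific
    # runtime constraint names.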
    runtime_constraint_mem_unit = 1048576
    map_runtime_constraint = {
        'keep_cache_ram': 'keep_cache_mb_per_task',
        'ram': 'min_ram_mb_per_node',
        'vcpus': 'min_cores_per_node',
    }


class ContainerSummarizer(ProcessSummarizer):
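    # Containers express memory constraints in bytes and use the
    # constraint names as-is.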
    runtime_constraint_mem_unit = 1


class MultiSummarizer(object):
    def __init__(self, children={}, label=None, threads=1, **kwargs):
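        # Allow at most `threads` children to parse their logs concurrently.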
        self.throttle = threading.Semaphore(threads)
        self.children = children
        self.label = label

    def run_and_release(self, target, *args, **kwargs):
        try:
            return target(*args, **kwargs)
        finally:
            self.throttle.release()

    def run(self):
        threads = []
        for child in self.children.itervalues():
            self.throttle.acquire()
            t = threading.Thread(target=self.run_and_release, args=(child.run, ))
            t.daemon = True
            t.start()
            threads.append(t)
        for t in threads:
            t.join()

    def text_report(self):
        txt = ''
        d = self._descendants()
        for child in d.itervalues():
            if len(d) > 1:
                txt += '### Summary for {} ({})\n'.format(
                    child.label, child.process['uuid'])
            txt += child.text_report()
            txt += '\n'
        return txt

    def _descendants(self):
        """Dict of self and all descendants.

        Nodes with nothing of their own to report (like
        MultiSummarizers) are omitted.
        """
        d = collections.OrderedDict()
        for key, child in self.children.iteritems():
            if isinstance(child, Summarizer):
                d[key] = child
            if isinstance(child, MultiSummarizer):
                d.update(child._descendants())
        return d

    def html_report(self):
        return WEBCHART_CLASS(self.label, self._descendants().itervalues()).html()


class JobTreeSummarizer(MultiSummarizer):
    """Summarizes a job and all children listed in its components field."""
    def __init__(self, job, label=None, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())
        label = label or job.get('name', job['uuid'])
        children = collections.OrderedDict()
        children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
        if job.get('components', None):
            preloaded = {}
            for j in arv.jobs().index(
                    limit=len(job['components']),
                    filters=[['uuid','in',job['components'].values()]]).execute()['items']:
                preloaded[j['uuid']] = j
            for cname in sorted(job['components'].keys()):
                child_uuid = job['components'][cname]
                j = (preloaded.get(child_uuid) or
                     arv.jobs().get(uuid=child_uuid).execute())
                children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)

        super(JobTreeSummarizer, self).__init__(
            children=children,
            label=label,
            **kwargs)


class PipelineSummarizer(MultiSummarizer):
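    """Summarizes a pipeline instance and the job tree of each component."""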
    def __init__(self, instance, **kwargs):
        children = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            if 'job' not in component:
                logger.warning(
                    "%s: skipping component with no job assigned", cname)
            else:
                logger.info(
                    "%s: job %s", cname, component['job']['uuid'])
                summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
                summarizer.label = '{} {}'.format(
                    cname, component['job']['uuid'])
                children[cname] = summarizer
        super(PipelineSummarizer, self).__init__(
            children=children,
            label=instance['uuid'],
            **kwargs)


class ContainerTreeSummarizer(MultiSummarizer):
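    """Summarizes a container or container request and all of its
    descendant container requests (found via requesting_container_uuid).
    """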
    def __init__(self, root, skip_child_jobs=False, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())

        label = kwargs.pop('label', None) or root.get('name') or root['uuid']
        root['name'] = label

        children = collections.OrderedDict()
        todo = collections.deque((root, ))
        while len(todo) > 0:
            current = todo.popleft()
            label = current['name']
            sort_key = current['created_at']
            if current['uuid'].find('-xvhdp-') > 0:
                current = arv.containers().get(uuid=current['container_uuid']).execute()

            summer = ContainerSummarizer(current, label=label, **kwargs)
            summer.sort_key = sort_key
            children[current['uuid']] = summer

            page_filters = []
            while True:
                child_crs = arv.container_requests().index(
                    order=['uuid asc'],
                    filters=page_filters+[
                        ['requesting_container_uuid', '=', current['uuid']]],
                ).execute()
                if not child_crs['items']:
                    break
                elif skip_child_jobs:
                    logger.warning('%s: omitting stats from %d child containers'
                                   ' because --skip-child-jobs flag is on',
                                   label, child_crs['items_available'])
                    break
                page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
                for cr in child_crs['items']:
                    if cr['container_uuid']:
                        logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
                        cr['name'] = cr.get('name') or cr['uuid']
                        todo.append(cr)
        sorted_children = collections.OrderedDict()
        for uuid in sorted(children.keys(), key=lambda uuid: children[uuid].sort_key):
            sorted_children[uuid] = children[uuid]
        super(ContainerTreeSummarizer, self).__init__(
            children=sorted_children,
            label=root['name'],
            **kwargs)