ccee1e24d20e045c2f6cfd9b171167a182e57c96
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 from __future__ import print_function
2
3 import arvados
4 import collections
5 import crunchstat_summary.chartjs
6 import datetime
7 import functools
8 import itertools
9 import math
10 import re
11 import sys
12
13 from arvados.api import OrderedJsonModel
14 from crunchstat_summary import logger
15
16 # Recommend memory constraints that are this multiple of an integral
17 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
18 # that have amounts like 7.5 GiB according to the kernel.)
19 AVAILABLE_RAM_RATIO = 0.95
20
21
22 class Task(object):
23     def __init__(self):
24         self.starttime = None
25         self.series = collections.defaultdict(list)
26
27
28 class Summarizer(object):
29     existing_constraints = {}
30
31     def __init__(self, logdata, label=None, include_child_jobs=True):
32         self._logdata = logdata
33
34         self.label = label
35         self.starttime = None
36         self.finishtime = None
37         self._include_child_jobs = include_child_jobs
38
39         # stats_max: {category: {stat: val}}
40         self.stats_max = collections.defaultdict(
41             functools.partial(collections.defaultdict,
42                               lambda: float('-Inf')))
43         # task_stats: {task_id: {category: {stat: val}}}
44         self.task_stats = collections.defaultdict(
45             functools.partial(collections.defaultdict, dict))
46
47         self.seq_to_uuid = {}
48         self.tasks = collections.defaultdict(Task)
49
50         logger.debug("%s: logdata %s", self.label, repr(logdata))
51
52     def run(self):
53         logger.debug("%s: parsing log data", self.label)
54         for line in self._logdata:
55             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
56             if m:
57                 seq = int(m.group('seq'))
58                 uuid = m.group('task_uuid')
59                 self.seq_to_uuid[seq] = uuid
60                 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
61                 continue
62
63             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
64             if m:
65                 task_id = self.seq_to_uuid[int(m.group('seq'))]
66                 elapsed = int(m.group('elapsed'))
67                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
68                 if elapsed > self.stats_max['time']['elapsed']:
69                     self.stats_max['time']['elapsed'] = elapsed
70                 continue
71
72             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
73             if m:
74                 uuid = m.group('uuid')
75                 if not self._include_child_jobs:
76                     logger.warning('%s: omitting %s (try --include-child-job)',
77                                    self.label, uuid)
78                     continue
79                 logger.debug('%s: follow %s', self.label, uuid)
80                 child_summarizer = JobSummarizer(uuid)
81                 child_summarizer.stats_max = self.stats_max
82                 child_summarizer.task_stats = self.task_stats
83                 child_summarizer.tasks = self.tasks
84                 child_summarizer.run()
85                 logger.debug('%s: done %s', self.label, uuid)
86                 continue
87
88             m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
89             if not m:
90                 continue
91
92             if self.label is None:
93                 self.label = m.group('job_uuid')
94                 logger.debug('%s: using job uuid as label', self.label)
95             if m.group('category').endswith(':'):
96                 # "notice:" etc.
97                 continue
98             elif m.group('category') == 'error':
99                 continue
100             task_id = self.seq_to_uuid[int(m.group('seq'))]
101             task = self.tasks[task_id]
102
103             # Use the first and last crunchstat timestamps as
104             # approximations of starttime and finishtime.
105             timestamp = datetime.datetime.strptime(
106                 m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
107             if not task.starttime:
108                 task.starttime = timestamp
109                 logger.debug('%s: task %s starttime %s',
110                              self.label, task_id, timestamp)
111             task.finishtime = timestamp
112
113             if not self.starttime:
114                 self.starttime = timestamp
115             self.finishtime = timestamp
116
117             this_interval_s = None
118             for group in ['current', 'interval']:
119                 if not m.group(group):
120                     continue
121                 category = m.group('category')
122                 words = m.group(group).split(' ')
123                 stats = {}
124                 for val, stat in zip(words[::2], words[1::2]):
125                     try:
126                         if '.' in val:
127                             stats[stat] = float(val)
128                         else:
129                             stats[stat] = int(val)
130                     except ValueError as e:
131                         raise ValueError(
132                             'Error parsing {} stat in "{}": {!r}'.format(
133                                 stat, line, e))
134                 if 'user' in stats or 'sys' in stats:
135                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
136                 if 'tx' in stats or 'rx' in stats:
137                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
138                 for stat, val in stats.iteritems():
139                     if group == 'interval':
140                         if stat == 'seconds':
141                             this_interval_s = val
142                             continue
143                         elif not (this_interval_s > 0):
144                             logger.error(
145                                 "BUG? interval stat given with duration {!r}".
146                                 format(this_interval_s))
147                             continue
148                         else:
149                             stat = stat + '__rate'
150                             val = val / this_interval_s
151                             if stat in ['user+sys__rate', 'tx+rx__rate']:
152                                 task.series[category, stat].append(
153                                     (timestamp - task.starttime, val))
154                     else:
155                         if stat in ['rss']:
156                             task.series[category, stat].append(
157                                 (timestamp - task.starttime, val))
158                         self.task_stats[task_id][category][stat] = val
159                     if val > self.stats_max[category][stat]:
160                         self.stats_max[category][stat] = val
161         logger.debug('%s: done parsing', self.label)
162
163         self.job_tot = collections.defaultdict(
164             functools.partial(collections.defaultdict, int))
165         for task_id, task_stat in self.task_stats.iteritems():
166             for category, stat_last in task_stat.iteritems():
167                 for stat, val in stat_last.iteritems():
168                     if stat in ['cpus', 'cache', 'swap', 'rss']:
169                         # meaningless stats like 16 cpu cores x 5 tasks = 80
170                         continue
171                     self.job_tot[category][stat] += val
172         logger.debug('%s: done totals', self.label)
173
174     def long_label(self):
175         label = self.label
176         if self.finishtime:
177             label += ' -- elapsed time '
178             s = (self.finishtime - self.starttime).total_seconds()
179             if s > 86400:
180                 label += '{}d'.format(int(s/86400))
181             if s > 3600:
182                 label += '{}h'.format(int(s/3600) % 24)
183             if s > 60:
184                 label += '{}m'.format(int(s/60) % 60)
185             label += '{}s'.format(int(s) % 60)
186         return label
187
188     def text_report(self):
189         return "\n".join(itertools.chain(
190             self._text_report_gen(),
191             self._recommend_gen())) + "\n"
192
193     def html_report(self):
194         return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
195
196     def _text_report_gen(self):
197         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
198         for category, stat_max in sorted(self.stats_max.iteritems()):
199             for stat, val in sorted(stat_max.iteritems()):
200                 if stat.endswith('__rate'):
201                     continue
202                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
203                 val = self._format(val)
204                 tot = self._format(self.job_tot[category].get(stat, '-'))
205                 yield "\t".join([category, stat, str(val), max_rate, tot])
206         for args in (
207                 ('Number of tasks: {}',
208                  len(self.tasks),
209                  None),
210                 ('Max CPU time spent by a single task: {}s',
211                  self.stats_max['cpu']['user+sys'],
212                  None),
213                 ('Max CPU usage in a single interval: {}%',
214                  self.stats_max['cpu']['user+sys__rate'],
215                  lambda x: x * 100),
216                 ('Overall CPU usage: {}%',
217                  self.job_tot['cpu']['user+sys'] /
218                  self.job_tot['time']['elapsed'],
219                  lambda x: x * 100),
220                 ('Max memory used by a single task: {}GB',
221                  self.stats_max['mem']['rss'],
222                  lambda x: x / 1e9),
223                 ('Max network traffic in a single task: {}GB',
224                  self.stats_max['net:eth0']['tx+rx'],
225                  lambda x: x / 1e9),
226                 ('Max network speed in a single interval: {}MB/s',
227                  self.stats_max['net:eth0']['tx+rx__rate'],
228                  lambda x: x / 1e6)):
229             format_string, val, transform = args
230             if val == float('-Inf'):
231                 continue
232             if transform:
233                 val = transform(val)
234             yield "# "+format_string.format(self._format(val))
235
236     def _recommend_gen(self):
237         return itertools.chain(
238             self._recommend_cpu(),
239             self._recommend_ram())
240
241     def _recommend_cpu(self):
242         """Recommend asking for 4 cores if max CPU usage was 333%"""
243
244         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
245         if cpu_max_rate == float('-Inf'):
246             logger.warning('%s: no CPU usage data', self.label)
247             return
248         used_cores = int(math.ceil(cpu_max_rate))
249         asked_cores =  self.existing_constraints.get('min_cores_per_node')
250         if asked_cores is None or used_cores < asked_cores:
251             yield (
252                 '#!! {} max CPU usage was {}% -- '
253                 'try runtime_constraints "min_cores_per_node":{}'
254             ).format(
255                 self.label,
256                 int(math.ceil(cpu_max_rate*100)),
257                 int(used_cores))
258
259     def _recommend_ram(self):
260         """Recommend asking for (2048*0.95) MiB RAM if max rss was 1248 MiB"""
261
262         used_ram = self.stats_max['mem']['rss']
263         if used_ram == float('-Inf'):
264             logger.warning('%s: no memory usage data', self.label)
265             return
266         used_ram = math.ceil(float(used_ram) / (1<<20))
267         asked_ram = self.existing_constraints.get('min_ram_mb_per_node')
268         if asked_ram is None or (
269                 math.ceil((used_ram/AVAILABLE_RAM_RATIO)/(1<<10)) <
270                 (asked_ram/AVAILABLE_RAM_RATIO)/(1<<10)):
271             yield (
272                 '#!! {} max RSS was {} MiB -- '
273                 'try runtime_constraints "min_ram_mb_per_node":{}'
274             ).format(
275                 self.label,
276                 int(used_ram),
277                 int(math.ceil((used_ram/AVAILABLE_RAM_RATIO)/(1<<10))*(1<<10)*AVAILABLE_RAM_RATIO))
278
279     def _format(self, val):
280         """Return a string representation of a stat.
281
282         {:.2f} for floats, default format for everything else."""
283         if isinstance(val, float):
284             return '{:.2f}'.format(val)
285         else:
286             return '{}'.format(val)
287
288
289 class CollectionSummarizer(Summarizer):
290     def __init__(self, collection_id, **kwargs):
291         logger.debug('load collection %s', collection_id)
292         collection = arvados.collection.CollectionReader(collection_id)
293         filenames = [filename for filename in collection]
294         if len(filenames) != 1:
295             raise ValueError(
296                 "collection {} has {} files; need exactly one".format(
297                     collection_id, len(filenames)))
298         super(CollectionSummarizer, self).__init__(
299             collection.open(filenames[0]), **kwargs)
300         self.label = collection_id
301
302
303 class JobSummarizer(CollectionSummarizer):
304     def __init__(self, job, **kwargs):
305         arv = arvados.api('v1')
306         if isinstance(job, str):
307             self.job = arv.jobs().get(uuid=job).execute()
308         else:
309             self.job = job
310         self.label = self.job['uuid']
311         self.existing_constraints = self.job.get('runtime_constraints', {})
312         if not self.job['log']:
313             raise ValueError(
314                 "job {} has no log; live summary not implemented".format(
315                     self.job['uuid']))
316         super(JobSummarizer, self).__init__(self.job['log'], **kwargs)
317         self.label = self.job['uuid']
318
319
320 class PipelineSummarizer():
321     def __init__(self, pipeline_instance_uuid, **kwargs):
322         arv = arvados.api('v1', model=OrderedJsonModel())
323         instance = arv.pipeline_instances().get(
324             uuid=pipeline_instance_uuid).execute()
325         self.summarizers = collections.OrderedDict()
326         for cname, component in instance['components'].iteritems():
327             if 'job' not in component:
328                 logger.warning(
329                     "%s: skipping component with no job assigned", cname)
330             elif component['job'].get('log') is None:
331                 logger.warning(
332                     "%s: skipping job %s with no log available",
333                     cname, component['job'].get('uuid'))
334             else:
335                 logger.info(
336                     "%s: logdata %s", cname, component['job']['log'])
337                 summarizer = JobSummarizer(component['job'], **kwargs)
338                 summarizer.label = cname
339                 self.summarizers[cname] = summarizer
340         self.label = pipeline_instance_uuid
341
342     def run(self):
343         for summarizer in self.summarizers.itervalues():
344             summarizer.run()
345
346     def text_report(self):
347         txt = ''
348         for cname, summarizer in self.summarizers.iteritems():
349             txt += '### Summary for {} ({})\n'.format(
350                 cname, summarizer.job['uuid'])
351             txt += summarizer.text_report()
352             txt += '\n'
353         return txt
354
355     def html_report(self):
356         return crunchstat_summary.chartjs.ChartJS(
357             self.label, self.summarizers.itervalues()).html()