8123: Explain mysterious memory constraint logic.
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 from __future__ import print_function
2
3 import arvados
4 import collections
5 import crunchstat_summary.chartjs
6 import datetime
7 import functools
8 import itertools
9 import math
10 import re
11 import sys
12
13 from arvados.api import OrderedJsonModel
14 from crunchstat_summary import logger
15
16 # Recommend memory constraints that are this multiple of an integral
17 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
18 # that have amounts like 7.5 GiB according to the kernel.)
19 AVAILABLE_RAM_RATIO = 0.95
20
21
22 class Task(object):
23     def __init__(self):
24         self.starttime = None
25         self.series = collections.defaultdict(list)
26
27
28 class Summarizer(object):
29     existing_constraints = {}
30
31     def __init__(self, logdata, label=None, include_child_jobs=True):
32         self._logdata = logdata
33
34         self.label = label
35         self.starttime = None
36         self.finishtime = None
37         self._include_child_jobs = include_child_jobs
38
39         # stats_max: {category: {stat: val}}
40         self.stats_max = collections.defaultdict(
41             functools.partial(collections.defaultdict,
42                               lambda: float('-Inf')))
43         # task_stats: {task_id: {category: {stat: val}}}
44         self.task_stats = collections.defaultdict(
45             functools.partial(collections.defaultdict, dict))
46
47         self.seq_to_uuid = {}
48         self.tasks = collections.defaultdict(Task)
49
50         logger.debug("%s: logdata %s", self.label, repr(logdata))
51
52     def run(self):
53         logger.debug("%s: parsing log data", self.label)
54         for line in self._logdata:
55             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
56             if m:
57                 seq = int(m.group('seq'))
58                 uuid = m.group('task_uuid')
59                 self.seq_to_uuid[seq] = uuid
60                 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
61                 continue
62
63             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
64             if m:
65                 task_id = self.seq_to_uuid[int(m.group('seq'))]
66                 elapsed = int(m.group('elapsed'))
67                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
68                 if elapsed > self.stats_max['time']['elapsed']:
69                     self.stats_max['time']['elapsed'] = elapsed
70                 continue
71
72             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
73             if m:
74                 uuid = m.group('uuid')
75                 if not self._include_child_jobs:
76                     logger.warning('%s: omitting %s (try --include-child-job)',
77                                    self.label, uuid)
78                     continue
79                 logger.debug('%s: follow %s', self.label, uuid)
80                 child_summarizer = JobSummarizer(uuid)
81                 child_summarizer.stats_max = self.stats_max
82                 child_summarizer.task_stats = self.task_stats
83                 child_summarizer.tasks = self.tasks
84                 child_summarizer.run()
85                 logger.debug('%s: done %s', self.label, uuid)
86                 continue
87
88             m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
89             if not m:
90                 continue
91
92             if self.label is None:
93                 self.label = m.group('job_uuid')
94                 logger.debug('%s: using job uuid as label', self.label)
95             if m.group('category').endswith(':'):
96                 # "stderr crunchstat: notice: ..."
97                 continue
98             elif m.group('category') in ('error', 'caught'):
99                 continue
100             elif m.group('category') == 'read':
101                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
102                 # (crunchstat formatting fixed, but old logs still say this)
103                 continue
104             task_id = self.seq_to_uuid[int(m.group('seq'))]
105             task = self.tasks[task_id]
106
107             # Use the first and last crunchstat timestamps as
108             # approximations of starttime and finishtime.
109             timestamp = datetime.datetime.strptime(
110                 m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
111             if not task.starttime:
112                 task.starttime = timestamp
113                 logger.debug('%s: task %s starttime %s',
114                              self.label, task_id, timestamp)
115             task.finishtime = timestamp
116
117             if not self.starttime:
118                 self.starttime = timestamp
119             self.finishtime = timestamp
120
121             this_interval_s = None
122             for group in ['current', 'interval']:
123                 if not m.group(group):
124                     continue
125                 category = m.group('category')
126                 words = m.group(group).split(' ')
127                 stats = {}
128                 for val, stat in zip(words[::2], words[1::2]):
129                     try:
130                         if '.' in val:
131                             stats[stat] = float(val)
132                         else:
133                             stats[stat] = int(val)
134                     except ValueError as e:
135                         raise ValueError(
136                             'Error parsing {} stat in "{}": {!r}'.format(
137                                 stat, line, e))
138                 if 'user' in stats or 'sys' in stats:
139                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
140                 if 'tx' in stats or 'rx' in stats:
141                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
142                 for stat, val in stats.iteritems():
143                     if group == 'interval':
144                         if stat == 'seconds':
145                             this_interval_s = val
146                             continue
147                         elif not (this_interval_s > 0):
148                             logger.error(
149                                 "BUG? interval stat given with duration {!r}".
150                                 format(this_interval_s))
151                             continue
152                         else:
153                             stat = stat + '__rate'
154                             val = val / this_interval_s
155                             if stat in ['user+sys__rate', 'tx+rx__rate']:
156                                 task.series[category, stat].append(
157                                     (timestamp - task.starttime, val))
158                     else:
159                         if stat in ['rss']:
160                             task.series[category, stat].append(
161                                 (timestamp - task.starttime, val))
162                         self.task_stats[task_id][category][stat] = val
163                     if val > self.stats_max[category][stat]:
164                         self.stats_max[category][stat] = val
165         logger.debug('%s: done parsing', self.label)
166
167         self.job_tot = collections.defaultdict(
168             functools.partial(collections.defaultdict, int))
169         for task_id, task_stat in self.task_stats.iteritems():
170             for category, stat_last in task_stat.iteritems():
171                 for stat, val in stat_last.iteritems():
172                     if stat in ['cpus', 'cache', 'swap', 'rss']:
173                         # meaningless stats like 16 cpu cores x 5 tasks = 80
174                         continue
175                     self.job_tot[category][stat] += val
176         logger.debug('%s: done totals', self.label)
177
178     def long_label(self):
179         label = self.label
180         if self.finishtime:
181             label += ' -- elapsed time '
182             s = (self.finishtime - self.starttime).total_seconds()
183             if s > 86400:
184                 label += '{}d'.format(int(s/86400))
185             if s > 3600:
186                 label += '{}h'.format(int(s/3600) % 24)
187             if s > 60:
188                 label += '{}m'.format(int(s/60) % 60)
189             label += '{}s'.format(int(s) % 60)
190         return label
191
192     def text_report(self):
193         return "\n".join(itertools.chain(
194             self._text_report_gen(),
195             self._recommend_gen())) + "\n"
196
197     def html_report(self):
198         return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
199
200     def _text_report_gen(self):
201         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
202         for category, stat_max in sorted(self.stats_max.iteritems()):
203             for stat, val in sorted(stat_max.iteritems()):
204                 if stat.endswith('__rate'):
205                     continue
206                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
207                 val = self._format(val)
208                 tot = self._format(self.job_tot[category].get(stat, '-'))
209                 yield "\t".join([category, stat, str(val), max_rate, tot])
210         for args in (
211                 ('Number of tasks: {}',
212                  len(self.tasks),
213                  None),
214                 ('Max CPU time spent by a single task: {}s',
215                  self.stats_max['cpu']['user+sys'],
216                  None),
217                 ('Max CPU usage in a single interval: {}%',
218                  self.stats_max['cpu']['user+sys__rate'],
219                  lambda x: x * 100),
220                 ('Overall CPU usage: {}%',
221                  self.job_tot['cpu']['user+sys'] /
222                  self.job_tot['time']['elapsed'],
223                  lambda x: x * 100),
224                 ('Max memory used by a single task: {}GB',
225                  self.stats_max['mem']['rss'],
226                  lambda x: x / 1e9),
227                 ('Max network traffic in a single task: {}GB',
228                  self.stats_max['net:eth0']['tx+rx'],
229                  lambda x: x / 1e9),
230                 ('Max network speed in a single interval: {}MB/s',
231                  self.stats_max['net:eth0']['tx+rx__rate'],
232                  lambda x: x / 1e6)):
233             format_string, val, transform = args
234             if val == float('-Inf'):
235                 continue
236             if transform:
237                 val = transform(val)
238             yield "# "+format_string.format(self._format(val))
239
240     def _recommend_gen(self):
241         return itertools.chain(
242             self._recommend_cpu(),
243             self._recommend_ram())
244
245     def _recommend_cpu(self):
246         """Recommend asking for 4 cores if max CPU usage was 333%"""
247
248         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
249         if cpu_max_rate == float('-Inf'):
250             logger.warning('%s: no CPU usage data', self.label)
251             return
252         used_cores = int(math.ceil(cpu_max_rate))
253         asked_cores =  self.existing_constraints.get('min_cores_per_node')
254         if asked_cores is None or used_cores < asked_cores:
255             yield (
256                 '#!! {} max CPU usage was {}% -- '
257                 'try runtime_constraints "min_cores_per_node":{}'
258             ).format(
259                 self.label,
260                 int(math.ceil(cpu_max_rate*100)),
261                 int(used_cores))
262
263     def _recommend_ram(self):
264         """Recommend an economical RAM constraint for this job.
265
266         Nodes that are advertised as "8 gibibytes" actually have what
267         we might call "8 nearlygibs" of memory available for jobs.
268         Here, we calculate a whole number of nearlygibs that would
269         have sufficed to run the job, then recommend requesting a node
270         with that number of nearlygibs (expressed as mebibytes).
271
272         Requesting a node with "nearly 8 gibibytes" is our best hope
273         of getting a node that actually has nearly 8 gibibytes
274         available.  If the node manager is smart enough to account for
275         the discrepancy itself when choosing/creating a node, we'll
276         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
277         advertised size of the next-size-smaller node (say, 6 GiB)
278         will be too low to satisfy our request, so we will effectively
279         get rounded up to 8 GiB.
280
281         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
282         we will generally get a node that is advertised as "8 GiB" and
283         has at least 7500 MiB available.  However, asking for 8192 MiB
284         would either result in an unnecessarily expensive 12 GiB node
285         (if node manager knows about the discrepancy), or an 8 GiB
286         node which has less than 8192 MiB available and is therefore
287         considered by crunch-dispatch to be too small to meet our
288         constraint.
289
290         When node manager learns how to predict the available memory
291         for each node type such that crunch-dispatch always agrees
292         that a node is big enough to run the job it was brought up
293         for, all this will be unnecessary.  We'll just ask for exactly
294         the memory we want -- even if that happens to be 8192 MiB.
295         """
296
297         used_bytes = self.stats_max['mem']['rss']
298         if used_bytes == float('-Inf'):
299             logger.warning('%s: no memory usage data', self.label)
300             return
301         used_mib = math.ceil(float(used_bytes) / 1048576)
302         asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
303
304         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
305         if asked_mib is None or (
306                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
307             yield (
308                 '#!! {} max RSS was {} MiB -- '
309                 'try runtime_constraints "min_ram_mb_per_node":{}'
310             ).format(
311                 self.label,
312                 int(used_mib),
313                 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
314
315     def _format(self, val):
316         """Return a string representation of a stat.
317
318         {:.2f} for floats, default format for everything else."""
319         if isinstance(val, float):
320             return '{:.2f}'.format(val)
321         else:
322             return '{}'.format(val)
323
324
325 class CollectionSummarizer(Summarizer):
326     def __init__(self, collection_id, **kwargs):
327         logger.debug('load collection %s', collection_id)
328         collection = arvados.collection.CollectionReader(collection_id)
329         filenames = [filename for filename in collection]
330         if len(filenames) != 1:
331             raise ValueError(
332                 "collection {} has {} files; need exactly one".format(
333                     collection_id, len(filenames)))
334         super(CollectionSummarizer, self).__init__(
335             collection.open(filenames[0]), **kwargs)
336         self.label = collection_id
337
338
339 class JobSummarizer(CollectionSummarizer):
340     def __init__(self, job, **kwargs):
341         arv = arvados.api('v1')
342         if isinstance(job, str):
343             self.job = arv.jobs().get(uuid=job).execute()
344         else:
345             self.job = job
346         self.label = self.job['uuid']
347         self.existing_constraints = self.job.get('runtime_constraints', {})
348         if not self.job['log']:
349             raise ValueError(
350                 "job {} has no log; live summary not implemented".format(
351                     self.job['uuid']))
352         super(JobSummarizer, self).__init__(self.job['log'], **kwargs)
353         self.label = self.job['uuid']
354
355
356 class PipelineSummarizer():
357     def __init__(self, pipeline_instance_uuid, **kwargs):
358         arv = arvados.api('v1', model=OrderedJsonModel())
359         instance = arv.pipeline_instances().get(
360             uuid=pipeline_instance_uuid).execute()
361         self.summarizers = collections.OrderedDict()
362         for cname, component in instance['components'].iteritems():
363             if 'job' not in component:
364                 logger.warning(
365                     "%s: skipping component with no job assigned", cname)
366             elif component['job'].get('log') is None:
367                 logger.warning(
368                     "%s: skipping job %s with no log available",
369                     cname, component['job'].get('uuid'))
370             else:
371                 logger.info(
372                     "%s: logdata %s", cname, component['job']['log'])
373                 summarizer = JobSummarizer(component['job'], **kwargs)
374                 summarizer.label = cname
375                 self.summarizers[cname] = summarizer
376         self.label = pipeline_instance_uuid
377
378     def run(self):
379         for summarizer in self.summarizers.itervalues():
380             summarizer.run()
381
382     def text_report(self):
383         txt = ''
384         for cname, summarizer in self.summarizers.iteritems():
385             txt += '### Summary for {} ({})\n'.format(
386                 cname, summarizer.job['uuid'])
387             txt += summarizer.text_report()
388             txt += '\n'
389         return txt
390
391     def html_report(self):
392         return crunchstat_summary.chartjs.ChartJS(
393             self.label, self.summarizers.itervalues()).html()