8123: Generate multiple sets of charts when data source is a pipeline instance.
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 from __future__ import print_function
2
3 import arvados
4 import collections
5 import crunchstat_summary.chartjs
6 import datetime
7 import functools
8 import itertools
9 import math
10 import re
11 import sys
12
13 from arvados.api import OrderedJsonModel
14 from crunchstat_summary import logger
15
16 # Recommend memory constraints that are this multiple of an integral
17 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
18 # that have amounts like 7.5 GiB according to the kernel.)
19 AVAILABLE_RAM_RATIO = 0.95
20
21
22 class Task(object):
23     def __init__(self):
24         self.starttime = None
25         self.series = collections.defaultdict(list)
26
27
28 class Summarizer(object):
29     existing_constraints = {}
30
31     def __init__(self, logdata, label=None):
32         self._logdata = logdata
33         self.label = label
34         self.starttime = None
35         self.finishtime = None
36         logger.debug("%s: logdata %s", self.label, repr(logdata))
37
38     def run(self):
39         logger.debug("%s: parsing log data", self.label)
40         # stats_max: {category: {stat: val}}
41         self.stats_max = collections.defaultdict(
42             functools.partial(collections.defaultdict,
43                               lambda: float('-Inf')))
44         # task_stats: {task_id: {category: {stat: val}}}
45         self.task_stats = collections.defaultdict(
46             functools.partial(collections.defaultdict, dict))
47         self.tasks = collections.defaultdict(Task)
48         for line in self._logdata:
49             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
50             if m:
51                 task_id = m.group('seq')
52                 elapsed = int(m.group('elapsed'))
53                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
54                 if elapsed > self.stats_max['time']['elapsed']:
55                     self.stats_max['time']['elapsed'] = elapsed
56                 continue
57             m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
58             if not m:
59                 continue
60             if self.label is None:
61                 self.label = m.group('job_uuid')
62                 logger.debug('%s: using job uuid as label', self.label)
63             if m.group('category').endswith(':'):
64                 # "notice:" etc.
65                 continue
66             elif m.group('category') == 'error':
67                 continue
68             task_id = m.group('seq')
69             task = self.tasks[task_id]
70
71             # Use the first and last crunchstat timestamps as
72             # approximations of starttime and finishtime.
73             timestamp = datetime.datetime.strptime(
74                 m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
75             if not task.starttime:
76                 task.starttime = timestamp
77                 logger.debug('%s: task %s starttime %s',
78                              self.label, task_id, timestamp)
79             task.finishtime = timestamp
80
81             if not self.starttime:
82                 self.starttime = timestamp
83             self.finishtime = timestamp
84
85             this_interval_s = None
86             for group in ['current', 'interval']:
87                 if not m.group(group):
88                     continue
89                 category = m.group('category')
90                 words = m.group(group).split(' ')
91                 stats = {}
92                 for val, stat in zip(words[::2], words[1::2]):
93                     try:
94                         if '.' in val:
95                             stats[stat] = float(val)
96                         else:
97                             stats[stat] = int(val)
98                     except ValueError as e:
99                         raise ValueError(
100                             'Error parsing {} stat in "{}": {!r}'.format(
101                                 stat, line, e))
102                 if 'user' in stats or 'sys' in stats:
103                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
104                 if 'tx' in stats or 'rx' in stats:
105                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
106                 for stat, val in stats.iteritems():
107                     if group == 'interval':
108                         if stat == 'seconds':
109                             this_interval_s = val
110                             continue
111                         elif not (this_interval_s > 0):
112                             logger.error(
113                                 "BUG? interval stat given with duration {!r}".
114                                 format(this_interval_s))
115                             continue
116                         else:
117                             stat = stat + '__rate'
118                             val = val / this_interval_s
119                             if stat in ['user+sys__rate', 'tx+rx__rate']:
120                                 task.series[category, stat].append(
121                                     (timestamp - task.starttime, val))
122                     else:
123                         if stat in ['rss']:
124                             task.series[category, stat].append(
125                                 (timestamp - task.starttime, val))
126                         self.task_stats[task_id][category][stat] = val
127                     if val > self.stats_max[category][stat]:
128                         self.stats_max[category][stat] = val
129         self.job_tot = collections.defaultdict(
130             functools.partial(collections.defaultdict, int))
131         for task_id, task_stat in self.task_stats.iteritems():
132             for category, stat_last in task_stat.iteritems():
133                 for stat, val in stat_last.iteritems():
134                     if stat in ['cpus', 'cache', 'swap', 'rss']:
135                         # meaningless stats like 16 cpu cores x 5 tasks = 80
136                         continue
137                     self.job_tot[category][stat] += val
138
139     def long_label(self):
140         label = self.label
141         if self.finishtime:
142             label += ' -- elapsed time '
143             s = (self.finishtime - self.starttime).total_seconds()
144             if s > 86400:
145                 label += '{}d'.format(int(s/86400))
146             if s > 3600:
147                 label += '{}h'.format(int(s/3600) % 24)
148             if s > 60:
149                 label += '{}m'.format(int(s/60) % 60)
150             label += '{}s'.format(int(s) % 60)
151         return label
152
153     def text_report(self):
154         return "\n".join(itertools.chain(
155             self._text_report_gen(),
156             self._recommend_gen())) + "\n"
157
158     def html_report(self):
159         return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
160
161     def _text_report_gen(self):
162         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
163         for category, stat_max in sorted(self.stats_max.iteritems()):
164             for stat, val in sorted(stat_max.iteritems()):
165                 if stat.endswith('__rate'):
166                     continue
167                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
168                 val = self._format(val)
169                 tot = self._format(self.job_tot[category].get(stat, '-'))
170                 yield "\t".join([category, stat, str(val), max_rate, tot])
171         for args in (
172                 ('Max CPU time spent by a single task: {}s',
173                  self.stats_max['cpu']['user+sys'],
174                  None),
175                 ('Max CPU usage in a single interval: {}%',
176                  self.stats_max['cpu']['user+sys__rate'],
177                  lambda x: x * 100),
178                 ('Overall CPU usage: {}%',
179                  self.job_tot['cpu']['user+sys'] /
180                  self.job_tot['time']['elapsed'],
181                  lambda x: x * 100),
182                 ('Max memory used by a single task: {}GB',
183                  self.stats_max['mem']['rss'],
184                  lambda x: x / 1e9),
185                 ('Max network traffic in a single task: {}GB',
186                  self.stats_max['net:eth0']['tx+rx'],
187                  lambda x: x / 1e9),
188                 ('Max network speed in a single interval: {}MB/s',
189                  self.stats_max['net:eth0']['tx+rx__rate'],
190                  lambda x: x / 1e6)):
191             format_string, val, transform = args
192             if val == float('-Inf'):
193                 continue
194             if transform:
195                 val = transform(val)
196             yield "# "+format_string.format(self._format(val))
197
198     def _recommend_gen(self):
199         return itertools.chain(
200             self._recommend_cpu(),
201             self._recommend_ram())
202
203     def _recommend_cpu(self):
204         """Recommend asking for 4 cores if max CPU usage was 333%"""
205
206         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
207         if cpu_max_rate == float('-Inf'):
208             logger.warning('%s: no CPU usage data', self.label)
209             return
210         used_cores = int(math.ceil(cpu_max_rate))
211         asked_cores =  self.existing_constraints.get('min_cores_per_node')
212         if asked_cores is None or used_cores < asked_cores:
213             yield (
214                 '#!! {} max CPU usage was {}% -- '
215                 'try runtime_constraints "min_cores_per_node":{}'
216             ).format(
217                 self.label,
218                 int(math.ceil(cpu_max_rate*100)),
219                 int(used_cores))
220
221     def _recommend_ram(self):
222         """Recommend asking for (2048*0.95) MiB RAM if max rss was 1248 MiB"""
223
224         used_ram = self.stats_max['mem']['rss']
225         if used_ram == float('-Inf'):
226             logger.warning('%s: no memory usage data', self.label)
227             return
228         used_ram = math.ceil(float(used_ram) / (1<<20))
229         asked_ram = self.existing_constraints.get('min_ram_mb_per_node')
230         if asked_ram is None or (
231                 math.ceil((used_ram/AVAILABLE_RAM_RATIO)/(1<<10)) <
232                 (asked_ram/AVAILABLE_RAM_RATIO)/(1<<10)):
233             yield (
234                 '#!! {} max RSS was {} MiB -- '
235                 'try runtime_constraints "min_ram_mb_per_node":{}'
236             ).format(
237                 self.label,
238                 int(used_ram),
239                 int(math.ceil((used_ram/AVAILABLE_RAM_RATIO)/(1<<10))*(1<<10)*AVAILABLE_RAM_RATIO))
240
241     def _format(self, val):
242         """Return a string representation of a stat.
243
244         {:.2f} for floats, default format for everything else."""
245         if isinstance(val, float):
246             return '{:.2f}'.format(val)
247         else:
248             return '{}'.format(val)
249
250
251 class CollectionSummarizer(Summarizer):
252     def __init__(self, collection_id):
253         collection = arvados.collection.CollectionReader(collection_id)
254         filenames = [filename for filename in collection]
255         if len(filenames) != 1:
256             raise ValueError(
257                 "collection {} has {} files; need exactly one".format(
258                     collection_id, len(filenames)))
259         super(CollectionSummarizer, self).__init__(
260             collection.open(filenames[0]))
261         self.label = collection_id
262
263
264 class JobSummarizer(CollectionSummarizer):
265     def __init__(self, job):
266         arv = arvados.api('v1')
267         if isinstance(job, str):
268             self.job = arv.jobs().get(uuid=job).execute()
269         else:
270             self.job = job
271         self.label = self.job['uuid']
272         self.existing_constraints = self.job.get('runtime_constraints', {})
273         if not self.job['log']:
274             raise ValueError(
275                 "job {} has no log; live summary not implemented".format(
276                     self.job['uuid']))
277         super(JobSummarizer, self).__init__(self.job['log'])
278         self.label = self.job['uuid']
279
280
281 class PipelineSummarizer():
282     def __init__(self, pipeline_instance_uuid):
283         arv = arvados.api('v1', model=OrderedJsonModel())
284         instance = arv.pipeline_instances().get(
285             uuid=pipeline_instance_uuid).execute()
286         self.summarizers = collections.OrderedDict()
287         for cname, component in instance['components'].iteritems():
288             if 'job' not in component:
289                 logger.warning(
290                     "%s: skipping component with no job assigned", cname)
291             elif component['job'].get('log') is None:
292                 logger.warning(
293                     "%s: skipping job %s with no log available",
294                     cname, component['job'].get('uuid'))
295             else:
296                 logger.info(
297                     "%s: logdata %s", cname, component['job']['log'])
298                 summarizer = JobSummarizer(component['job'])
299                 summarizer.label = cname
300                 self.summarizers[cname] = summarizer
301         self.label = pipeline_instance_uuid
302
303     def run(self):
304         for summarizer in self.summarizers.itervalues():
305             summarizer.run()
306
307     def text_report(self):
308         txt = ''
309         for cname, summarizer in self.summarizers.iteritems():
310             txt += '### Summary for {} ({})\n'.format(
311                 cname, summarizer.job['uuid'])
312             txt += summarizer.text_report()
313             txt += '\n'
314         return txt
315
316     def html_report(self):
317         return crunchstat_summary.chartjs.ChartJS(
318             self.label, self.summarizers.itervalues()).html()