1 from __future__ import print_function
5 import crunchstat_summary.chartjs
13 from arvados.api import OrderedJsonModel
14 from crunchstat_summary import logger
16 # Recommend memory constraints that are this multiple of an integral
17 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
18 # that have amounts like 7.5 GiB according to the kernel.)
19 AVAILABLE_RAM_RATIO = 0.95
25 self.series = collections.defaultdict(list)
class Summarizer(object):
    """Accumulate per-task and per-job resource-usage statistics from
    crunchstat log lines.

    NOTE(review): several source lines are elided in this view (e.g.
    the ``def run(self):`` header and the ``if m:`` guards around the
    regex branches); the comments below describe only the visible code.
    """

    # Class-level default; JobSummarizer overwrites this per-instance
    # with the job's actual runtime_constraints.
    existing_constraints = {}

    def __init__(self, logdata, label=None, include_child_jobs=True):
        # logdata: iterable of crunchstat log lines (e.g. an open log
        # file or a collection file) consumed by the parsing loop below.
        self._logdata = logdata
        self.finishtime = None
        self._include_child_jobs = include_child_jobs

        # stats_max: {category: {stat: val}}
        # float('-Inf') is the "no data seen yet" sentinel: any real
        # sample compares greater than it.
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict,
                              lambda: float('-Inf')))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))
        self.tasks = collections.defaultdict(Task)
        # NOTE(review): self.label is read here but its assignment is
        # not visible in this view — presumably set from the `label`
        # argument on an elided line; confirm.
        logger.debug("%s: logdata %s", self.label, repr(logdata))

        # ------------------------------------------------------------
        # NOTE(review): the method header for the following parsing
        # loop (presumably ``def run(self):``) is elided in this view.
        # ------------------------------------------------------------
        logger.debug("%s: parsing log data", self.label)
        for line in self._logdata:
            # Branch 1: "job_task" lines map a sequence number to a
            # task UUID for later lookups.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
                seq = int(m.group('seq'))
                uuid = m.group('task_uuid')
                self.seq_to_uuid[seq] = uuid
                logger.debug('%s: seq %d is task %s', self.label, seq, uuid)

            # Branch 2: "success in N seconds" lines record a task's
            # elapsed time and update the job-wide maximum.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
                task_id = self.seq_to_uuid[int(m.group('seq'))]
                elapsed = int(m.group('elapsed'))
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed

            # Branch 3: "Queued job" lines spawn a child JobSummarizer
            # that shares this summarizer's accumulators, so child-job
            # stats fold into the same maxima/totals.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
                uuid = m.group('uuid')
                if not self._include_child_jobs:
                    # NOTE(review): message says "--include-child-job"
                    # but the constructor flag is include_child_jobs
                    # (plural) — looks like a typo; confirm the actual
                    # CLI option name.
                    logger.warning('%s: omitting %s (try --include-child-job)',
                    logger.debug('%s: follow %s', self.label, uuid)
                    child_summarizer = JobSummarizer(uuid)
                    child_summarizer.stats_max = self.stats_max
                    child_summarizer.task_stats = self.task_stats
                    child_summarizer.tasks = self.tasks
                    child_summarizer.run()
                    logger.debug('%s: done %s', self.label, uuid)

            # Branch 4: actual "crunchstat:" samples. The optional
            # "-- interval" suffix carries rate data for the last
            # sampling interval.
            m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
            if self.label is None:
                self.label = m.group('job_uuid')
                logger.debug('%s: using job uuid as label', self.label)
            # Skip non-stat notices ("category:" with a colon) and
            # error lines.
            if m.group('category').endswith(':'):
            elif m.group('category') == 'error':
            task_id = self.seq_to_uuid[int(m.group('seq'))]
            task = self.tasks[task_id]

            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = datetime.datetime.strptime(
                m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
            if not task.starttime:
                task.starttime = timestamp
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            task.finishtime = timestamp

            if not self.starttime:
                self.starttime = timestamp
            self.finishtime = timestamp

            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                category = m.group('category')
                # Stats come in "value name" pairs, e.g. "1234 rss".
                words = m.group(group).split(' ')
                for val, stat in zip(words[::2], words[1::2]):
                            stats[stat] = float(val)
                            stats[stat] = int(val)
                    except ValueError as e:
                            'Error parsing {} stat in "{}": {!r}'.format(
                # Synthesize combined stats from their components.
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                for stat, val in stats.iteritems():
                    if group == 'interval':
                        if stat == 'seconds':
                            this_interval_s = val
                        elif not (this_interval_s > 0):
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                            # Convert interval totals into per-second
                            # "__rate" stats.
                            stat = stat + '__rate'
                            val = val / this_interval_s
                            if stat in ['user+sys__rate', 'tx+rx__rate']:
                                task.series[category, stat].append(
                                    (timestamp - task.starttime, val))
                            task.series[category, stat].append(
                                (timestamp - task.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        # job_tot: {category: {stat: total across tasks}} — summed
        # from each task's last-seen value.
        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.iteritems():
            for category, stat_last in task_stat.iteritems():
                for stat, val in stat_last.iteritems():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)
    def long_label(self):
        # Build a human-readable label, appending the elapsed wall-clock
        # time as "XdYhZmWs" when finish/start timestamps are available.
        # NOTE(review): the label initialization, the guards around each
        # unit, and the return statement are elided in this view.
            label += ' -- elapsed time '
            # Total elapsed seconds between the first and last
            # crunchstat timestamps recorded during parsing.
            s = (self.finishtime - self.starttime).total_seconds()
                label += '{}d'.format(int(s/86400))
                label += '{}h'.format(int(s/3600) % 24)
                label += '{}m'.format(int(s/60) % 60)
            label += '{}s'.format(int(s) % 60)
188 def text_report(self):
189 return "\n".join(itertools.chain(
190 self._text_report_gen(),
191 self._recommend_gen())) + "\n"
193 def html_report(self):
194 return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
    def _text_report_gen(self):
        # Yield the tab-separated stats table (one row per category/
        # stat pair), then "# ..." human-readable summary lines.
        # NOTE(review): the `continue` statements, the `for args in (`
        # header, and several tuple-closing lines are elided in this
        # view.
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.iteritems()):
            for stat, val in sorted(stat_max.iteritems()):
                # "__rate" stats appear in the task_max_rate column of
                # their base stat rather than as rows of their own.
                if stat.endswith('__rate'):
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        # Summary lines: (format string, value, optional transform)
        # triples rendered as "# ..." comment rows below the table.
                ('Number of tasks: {}',
                ('Max CPU time spent by a single task: {}s',
                 self.stats_max['cpu']['user+sys'],
                ('Max CPU usage in a single interval: {}%',
                 self.stats_max['cpu']['user+sys__rate'],
                ('Overall CPU usage: {}%',
                 self.job_tot['cpu']['user+sys'] /
                 self.job_tot['time']['elapsed'],
                ('Max memory used by a single task: {}GB',
                 self.stats_max['mem']['rss'],
                ('Max network traffic in a single task: {}GB',
                 self.stats_max['net:eth0']['tx+rx'],
                ('Max network speed in a single interval: {}MB/s',
                 self.stats_max['net:eth0']['tx+rx__rate'],
            format_string, val, transform = args
            # -Inf sentinel means no sample was ever recorded; skip
            # the line entirely rather than printing the sentinel.
            if val == float('-Inf'):
            yield "# "+format_string.format(self._format(val))
236 def _recommend_gen(self):
237 return itertools.chain(
238 self._recommend_cpu(),
239 self._recommend_ram())
    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""
        # NOTE(review): the early return after the warning and the
        # `yield (` / `).format(` scaffolding are elided in this view.

        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        # -Inf sentinel: no CPU samples were parsed at all.
        if cpu_max_rate == float('-Inf'):
            logger.warning('%s: no CPU usage data', self.label)
        # Round peak concurrent CPU usage up to whole cores.
        used_cores = int(math.ceil(cpu_max_rate))
        asked_cores = self.existing_constraints.get('min_cores_per_node')
        # Only recommend when no constraint was set, or fewer cores
        # were actually used than were requested.
        if asked_cores is None or used_cores < asked_cores:
                '#!! {} max CPU usage was {}% -- '
                'try runtime_constraints "min_cores_per_node":{}'
                int(math.ceil(cpu_max_rate*100)),
    def _recommend_ram(self):
        """Recommend asking for (2048*0.95) MiB RAM if max rss was 1248 MiB"""
        # NOTE(review): the early return after the warning and the
        # `yield (` / `).format(` scaffolding are elided in this view.

        used_ram = self.stats_max['mem']['rss']
        # -Inf sentinel: no memory samples were parsed at all.
        if used_ram == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
        # Convert peak RSS from bytes to MiB, rounding up.
        used_ram = math.ceil(float(used_ram) / (1<<20))
        asked_ram = self.existing_constraints.get('min_ram_mb_per_node')
        # Compare used vs. asked RAM in units of "usable GiB nodes"
        # (AVAILABLE_RAM_RATIO discounts the kernel-reported shortfall
        # vs. advertised node size — see module-level comment).
        if asked_ram is None or (
                math.ceil((used_ram/AVAILABLE_RAM_RATIO)/(1<<10)) <
                (asked_ram/AVAILABLE_RAM_RATIO)/(1<<10)):
                '#!! {} max RSS was {} MiB -- '
                'try runtime_constraints "min_ram_mb_per_node":{}'
                int(math.ceil((used_ram/AVAILABLE_RAM_RATIO)/(1<<10))*(1<<10)*AVAILABLE_RAM_RATIO))
279 def _format(self, val):
280 """Return a string representation of a stat.
282 {:.2f} for floats, default format for everything else."""
283 if isinstance(val, float):
284 return '{:.2f}'.format(val)
286 return '{}'.format(val)
class CollectionSummarizer(Summarizer):
    """Summarize a crunchstat log stored in an Arvados collection.

    NOTE(review): the ``raise ValueError(`` line is elided in this
    view.
    """
    def __init__(self, collection_id, **kwargs):
        logger.debug('load collection %s', collection_id)
        collection = arvados.collection.CollectionReader(collection_id)
        filenames = [filename for filename in collection]
        # The collection must contain exactly one file: the log to
        # parse. Anything else is rejected.
        if len(filenames) != 1:
                "collection {} has {} files; need exactly one".format(
                    collection_id, len(filenames)))
        # Hand the open log file to the base Summarizer.
        super(CollectionSummarizer, self).__init__(
            collection.open(filenames[0]), **kwargs)
        self.label = collection_id
class JobSummarizer(CollectionSummarizer):
    """Summarize a single Arvados job, given its UUID or full record.

    NOTE(review): the ``else:`` branch accepting a job record and the
    ``raise ValueError(`` header are elided in this view.
    """
    def __init__(self, job, **kwargs):
        arv = arvados.api('v1')
        # A bare UUID string means we must fetch the job record
        # ourselves; presumably a dict record is accepted on the
        # elided else branch — confirm.
        if isinstance(job, str):
            self.job = arv.jobs().get(uuid=job).execute()
        self.label = self.job['uuid']
        # Expose the job's constraints so _recommend_* can compare
        # actual usage against what was requested.
        self.existing_constraints = self.job.get('runtime_constraints', {})
        # Without a finished log collection there is nothing to parse.
        if not self.job['log']:
                "job {} has no log; live summary not implemented".format(
        super(JobSummarizer, self).__init__(self.job['log'], **kwargs)
        self.label = self.job['uuid']
class PipelineSummarizer():
    """Aggregate one JobSummarizer per component of a pipeline instance.

    NOTE(review): several lines are elided in this view, including the
    ``def run(self):`` header, the logger call headers inside the
    component loop, and text_report's accumulator init/return.
    """
    def __init__(self, pipeline_instance_uuid, **kwargs):
        # OrderedJsonModel preserves the component order from the API
        # response, so reports follow the pipeline's own ordering.
        arv = arvados.api('v1', model=OrderedJsonModel())
        instance = arv.pipeline_instances().get(
            uuid=pipeline_instance_uuid).execute()
        self.summarizers = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            # Skip components that cannot be summarized: no job
            # assigned, or job has no log collection yet.
            if 'job' not in component:
                    "%s: skipping component with no job assigned", cname)
            elif component['job'].get('log') is None:
                    "%s: skipping job %s with no log available",
                    cname, component['job'].get('uuid'))
                    "%s: logdata %s", cname, component['job']['log'])
                summarizer = JobSummarizer(component['job'], **kwargs)
                # Label each summarizer with its component name.
                summarizer.label = cname
                self.summarizers[cname] = summarizer
        self.label = pipeline_instance_uuid

        # Run every component summarizer in turn (method header
        # elided in this view).
        for summarizer in self.summarizers.itervalues():

    def text_report(self):
        # Concatenate each component's text report under a "###"
        # heading naming the component and its job UUID.
        for cname, summarizer in self.summarizers.iteritems():
            txt += '### Summary for {} ({})\n'.format(
                cname, summarizer.job['uuid'])
            txt += summarizer.text_report()

    def html_report(self):
        # One combined HTML page charting all component summarizers.
        return crunchstat_summary.chartjs.ChartJS(
            self.label, self.summarizers.itervalues()).html()