1 from __future__ import print_function
5 import crunchstat_summary.chartjs
13 from arvados.api import OrderedJsonModel
14 from crunchstat_summary import logger
16 # Recommend memory constraints that are this multiple of an integral
17 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
18 # that have amounts like 7.5 GiB according to the kernel.)
19 AVAILABLE_RAM_RATIO = 0.95
        # Time series of (elapsed-timedelta, value) samples keyed by
        # (category, stat) tuples; appended to by Summarizer.run().
        self.series = collections.defaultdict(list)
class Summarizer(object):
    # Runtime constraints of the job being summarized, if known.
    # Subclasses (e.g. JobSummarizer) overwrite this per instance; the
    # _recommend_* methods compare observed usage against these.
    existing_constraints = {}

    def __init__(self, logdata, label=None, include_child_jobs=True):
        """Prepare a summarizer over an iterable of crunchstat log lines.

        logdata: iterable yielding raw log lines (e.g. an open log file).
        label: display label for reports; if None, the job UUID found in
            the log is used once parsing starts.
        include_child_jobs: when False, jobs queued by this job are
            omitted from the summary instead of being followed.
        """
        self._logdata = logdata
        self.finishtime = None
        self._include_child_jobs = include_child_jobs

        # stats_max: {category: {stat: val}}
        # Defaults to -Inf so any observed value becomes the new max.
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict,
                              lambda: float('-Inf')))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))
        self.tasks = collections.defaultdict(Task)
        logger.debug("%s: logdata %s", self.label, repr(logdata))
        logger.debug("%s: parsing log data", self.label)
        for line in self._logdata:
            # "job_task <uuid>" lines map crunchstat sequence numbers
            # to task UUIDs for later stat attribution.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
                seq = int(m.group('seq'))
                uuid = m.group('task_uuid')
                self.seq_to_uuid[seq] = uuid
                logger.debug('%s: seq %d is task %s', self.label, seq, uuid)

            # "success in N seconds" marks a finished task: record its
            # elapsed time and keep the per-job maximum up to date.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
                task_id = self.seq_to_uuid[int(m.group('seq'))]
                elapsed = int(m.group('elapsed'))
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed

            # "Queued job <uuid>": this job spawned a child job.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
                uuid = m.group('uuid')
                if not self._include_child_jobs:
                    # NOTE(review): message says --include-child-job but the
                    # constructor flag is include_child_jobs -- confirm the
                    # actual CLI option name before trusting this hint.
                    logger.warning('%s: omitting %s (try --include-child-job)',
                # Follow the child job, sharing this summarizer's
                # stats_max/task_stats/tasks so totals include it.
                logger.debug('%s: follow %s', self.label, uuid)
                child_summarizer = JobSummarizer(uuid)
                child_summarizer.stats_max = self.stats_max
                child_summarizer.task_stats = self.task_stats
                child_summarizer.tasks = self.tasks
                child_summarizer.run()
                logger.debug('%s: done %s', self.label, uuid)

            # crunchstat data line:
            # "<ts> <job_uuid> <pid> <seq> stderr crunchstat: <category>
            #  <current stats> [-- interval <interval stats>]"
            m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)

            if self.label is None:
                # No explicit label was given: fall back to the job UUID.
                self.label = m.group('job_uuid')
                logger.debug('%s: using job uuid as label', self.label)
            if m.group('category').endswith(':'):
                # "stderr crunchstat: notice: ..."
            elif m.group('category') in ('error', 'caught'):
            elif m.group('category') == 'read':
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (crunchstat formatting fixed, but old logs still say this)
            task_id = self.seq_to_uuid[int(m.group('seq'))]
            task = self.tasks[task_id]

            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = datetime.datetime.strptime(
                m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
            if not task.starttime:
                task.starttime = timestamp
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            task.finishtime = timestamp

            if not self.starttime:
                self.starttime = timestamp
            self.finishtime = timestamp

            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                category = m.group('category')
                words = m.group(group).split(' ')
                # Stats arrive as "<value> <name>" pairs; values with a
                # '.' are parsed as floats, others as ints.
                for val, stat in zip(words[::2], words[1::2]):
                            stats[stat] = float(val)
                            stats[stat] = int(val)
                    except ValueError as e:
                            'Error parsing {} stat in "{}": {!r}'.format(
                # Synthesize combined stats for reporting convenience.
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                for stat, val in stats.iteritems():
                    if group == 'interval':
                        if stat == 'seconds':
                            # Interval duration: the denominator for
                            # the per-second "__rate" stats below.
                            this_interval_s = val
                        elif not (this_interval_s > 0):
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                            # Convert interval totals to per-second rates.
                            stat = stat + '__rate'
                            val = val / this_interval_s
                            if stat in ['user+sys__rate', 'tx+rx__rate']:
                                task.series[category, stat].append(
                                    (timestamp - task.starttime, val))
                        task.series[category, stat].append(
                            (timestamp - task.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        # job_tot: {category: {stat: total across all tasks}}
        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.iteritems():
            for category, stat_last in task_stat.iteritems():
                for stat, val in stat_last.iteritems():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)
    def long_label(self):
        """Return the label with a human-readable elapsed-time suffix.

        Builds a compact duration like "1d2h3m4s" from starttime to
        finishtime; larger units appear only when nonzero.
        """
            label += ' -- elapsed time '
        s = (self.finishtime - self.starttime).total_seconds()
            label += '{}d'.format(int(s/86400))
            label += '{}h'.format(int(s/3600) % 24)
            label += '{}m'.format(int(s/60) % 60)
        label += '{}s'.format(int(s) % 60)
192 def text_report(self):
193 return "\n".join(itertools.chain(
194 self._text_report_gen(),
195 self._recommend_gen())) + "\n"
197 def html_report(self):
198 return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
    def _text_report_gen(self):
        """Yield the lines of the tab-separated stats report.

        One header row, then one row per (category, stat) with the
        per-task max, max rate, and job-wide total, followed by "# "
        prefixed human-readable summary lines.
        """
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.iteritems()):
            for stat, val in sorted(stat_max.iteritems()):
                # "__rate" stats appear as the task_max_rate column of
                # their base stat, not as their own row.
                if stat.endswith('__rate'):
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        # Summary lines: (format string, value, optional transform).
                ('Number of tasks: {}',
                ('Max CPU time spent by a single task: {}s',
                 self.stats_max['cpu']['user+sys'],
                ('Max CPU usage in a single interval: {}%',
                 self.stats_max['cpu']['user+sys__rate'],
                ('Overall CPU usage: {}%',
                 self.job_tot['cpu']['user+sys'] /
                 self.job_tot['time']['elapsed'],
                ('Max memory used by a single task: {}GB',
                 self.stats_max['mem']['rss'],
                ('Max network traffic in a single task: {}GB',
                 self.stats_max['net:eth0']['tx+rx'],
                ('Max network speed in a single interval: {}MB/s',
                 self.stats_max['net:eth0']['tx+rx__rate'],
            format_string, val, transform = args
            if val == float('-Inf'):
                # Stat was never observed; skip this summary line.
            yield "# "+format_string.format(self._format(val))
240 def _recommend_gen(self):
241 return itertools.chain(
242 self._recommend_cpu(),
243 self._recommend_ram())
    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""

        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf'):
            # No CPU samples were parsed; nothing to recommend.
            logger.warning('%s: no CPU usage data', self.label)
        # Round the peak usage up to a whole number of cores.
        used_cores = int(math.ceil(cpu_max_rate))
        asked_cores = self.existing_constraints.get('min_cores_per_node')
        # Only speak up when the job used fewer cores than it asked for
        # (or asked for none at all).
        if asked_cores is None or used_cores < asked_cores:
                '#!! {} max CPU usage was {}% -- '
                'try runtime_constraints "min_cores_per_node":{}'
                int(math.ceil(cpu_max_rate*100)),
    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available. If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available. Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available. However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        constraint.

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary. We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            # No memory samples were parsed; nothing to recommend.
            logger.warning('%s: no memory usage data', self.label)
        used_mib = math.ceil(float(used_bytes) / 1048576)
        asked_mib = self.existing_constraints.get('min_ram_mb_per_node')

        # Convert MiB to "nearlygibs" (see docstring).
        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
        if asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
                '#!! {} max RSS was {} MiB -- '
                'try runtime_constraints "min_ram_mb_per_node":{}'
                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
315 def _format(self, val):
316 """Return a string representation of a stat.
318 {:.2f} for floats, default format for everything else."""
319 if isinstance(val, float):
320 return '{:.2f}'.format(val)
322 return '{}'.format(val)
class CollectionSummarizer(Summarizer):
    """Summarizer that reads its log data from an Arvados collection.

    The collection must contain exactly one file: the log to parse.
    """
    def __init__(self, collection_id, **kwargs):
        logger.debug('load collection %s', collection_id)
        collection = arvados.collection.CollectionReader(collection_id)
        filenames = [filename for filename in collection]
        if len(filenames) != 1:
            # Ambiguous input: we cannot guess which file is the log.
                "collection {} has {} files; need exactly one".format(
                    collection_id, len(filenames)))
        super(CollectionSummarizer, self).__init__(
            collection.open(filenames[0]), **kwargs)
        self.label = collection_id
class JobSummarizer(CollectionSummarizer):
    """Summarizer for one Arvados job, given a job record or UUID.

    Reads the job's log collection and picks up the job's
    runtime_constraints so recommendations can compare against them.
    """
    def __init__(self, job, **kwargs):
        arv = arvados.api('v1')
        if isinstance(job, str):
            # Given a UUID: fetch the full job record from the API.
            self.job = arv.jobs().get(uuid=job).execute()
        self.label = self.job['uuid']
        self.existing_constraints = self.job.get('runtime_constraints', {})
        if not self.job['log']:
            # Without a finished log collection there is nothing to parse.
                "job {} has no log; live summary not implemented".format(
        super(JobSummarizer, self).__init__(self.job['log'], **kwargs)
        self.label = self.job['uuid']
class PipelineSummarizer():
    """Aggregate of JobSummarizers, one per pipeline-instance component.

    Components without an assigned job or without a log are skipped.
    """
    def __init__(self, pipeline_instance_uuid, **kwargs):
        # OrderedJsonModel preserves the component order from the API
        # response, so reports list components in pipeline order.
        arv = arvados.api('v1', model=OrderedJsonModel())
        instance = arv.pipeline_instances().get(
            uuid=pipeline_instance_uuid).execute()
        self.summarizers = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            if 'job' not in component:
                    "%s: skipping component with no job assigned", cname)
            elif component['job'].get('log') is None:
                    "%s: skipping job %s with no log available",
                    cname, component['job'].get('uuid'))
                    "%s: logdata %s", cname, component['job']['log'])
                summarizer = JobSummarizer(component['job'], **kwargs)
                # Label each child summarizer by its component name.
                summarizer.label = cname
                self.summarizers[cname] = summarizer
        self.label = pipeline_instance_uuid

        for summarizer in self.summarizers.itervalues():

    def text_report(self):
        # Concatenate each component's text report under a "###" header.
        for cname, summarizer in self.summarizers.iteritems():
            txt += '### Summary for {} ({})\n'.format(
                cname, summarizer.job['uuid'])
            txt += summarizer.text_report()

    def html_report(self):
        """Render one HTML page charting every component's data."""
        return crunchstat_summary.chartjs.ChartJS(
            self.label, self.summarizers.itervalues()).html()