from __future__ import print_function

import arvados
import collections
import crunchstat_summary.chartjs
import crunchstat_summary.reader
import datetime
import functools
import itertools
import math
import re

from arvados.api import OrderedJsonModel
from crunchstat_summary import logger

# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB,
# but report amounts like 7.5 GiB available according to the kernel.)
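# For example (illustrative numbers): a job with peak RSS of 6000 MiB
# needs ceil(6000 / 0.95 / 1024) = 7 "nearlygibs", so we would
# recommend min_ram_mb_per_node = int(7 * 0.95 * 1024) = 6809.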
AVAILABLE_RAM_RATIO = 0.95


class Task(object):
    def __init__(self):
        self.starttime = None
        self.series = collections.defaultdict(list)


class Summarizer(object):
    def __init__(self, logdata, label=None, skip_child_jobs=False):
        self._logdata = logdata

        self.label = label
        self.starttime = None
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict,
                              lambda: float('-Inf')))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        self.seq_to_uuid = {}
        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable. If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        logger.debug("%s: logdata %s", self.label, repr(logdata))

    def run(self):
        logger.debug("%s: parsing log data", self.label)
        for line in self._logdata:
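            # Dispatch on line type; first, task announcement lines.
            # An illustrative (made-up) example matching the regex below:
            #   2015-04-01_12:00:01 zzzzz-8i9sb-xxxxxxxxxxxxxxx 27651 5 job_task zzzzz-ot0gb-xxxxxxxxxxxxxxx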
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
            if m:
                seq = int(m.group('seq'))
                uuid = m.group('task_uuid')
                self.seq_to_uuid[seq] = uuid
                logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
                continue

            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
            if m:
                task_id = self.seq_to_uuid[int(m.group('seq'))]
                elapsed = int(m.group('elapsed'))
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed
                continue

            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
            if m:
                uuid = m.group('uuid')
                if self._skip_child_jobs:
                    logger.warning('%s: omitting stats from child job %s'
                                   ' because --skip-child-jobs flag is on',
                                   self.label, uuid)
                    continue
                logger.debug('%s: follow %s', self.label, uuid)
                child_summarizer = JobSummarizer(uuid)
                child_summarizer.stats_max = self.stats_max
                child_summarizer.task_stats = self.task_stats
                child_summarizer.tasks = self.tasks
                child_summarizer.starttime = self.starttime
                child_summarizer.run()
                logger.debug('%s: done %s', self.label, uuid)
                continue
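
            # An illustrative (made-up) crunchstat line for the regex
            # below; the optional "-- interval" suffix carries deltas
            # for the most recent sampling period:
            #   2015-04-01_12:00:11 zzzzz-8i9sb-xxxxxxxxxxxxxxx 27651 5 stderr crunchstat: cpu 69.6000 user 3.4000 sys 8 cpus -- interval 10.0000 seconds 0.6000 user 0.0700 sys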
            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
            if not m:
                continue

            if self.label is None:
                self.label = m.group('job_uuid')
                logger.debug('%s: using job uuid as label', self.label)
            if m.group('category').endswith(':'):
                # "stderr crunchstat: notice: ..."
                continue
            elif m.group('category') in ('error', 'caught'):
                continue
            elif m.group('category') == 'read':
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (crunchstat's formatting has since been fixed, but old
                # logs still contain lines like this)
                continue
            task_id = self.seq_to_uuid[int(m.group('seq'))]
            task = self.tasks[task_id]

            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = datetime.datetime.strptime(
                m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
            if not task.starttime:
                task.starttime = timestamp
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            task.finishtime = timestamp

            if not self.starttime:
                self.starttime = timestamp
            self.finishtime = timestamp

            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                    continue
                category = m.group('category')
                words = m.group(group).split(' ')
                stats = {}
                for val, stat in zip(words[::2], words[1::2]):
                    try:
                        if '.' in val:
                            stats[stat] = float(val)
                        else:
                            stats[stat] = int(val)
                    except ValueError as e:
                        raise ValueError(
                            'Error parsing {} stat in "{}": {!r}'.format(
                                stat, line, e))
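                # Derived stats: combined user+kernel CPU time and
                # combined send+receive network traffic, so the report
                # can track one number per category.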
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                for stat, val in stats.iteritems():
                    if group == 'interval':
                        if stat == 'seconds':
                            this_interval_s = val
                            continue
                        elif not (this_interval_s > 0):
                            logger.error(
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                            continue
                        else:
                            stat = stat + '__rate'
                            val = val / this_interval_s
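                            # Illustrative example: "-- interval 10.0000
                            # seconds 0.6000 user 0.0700 sys" yields
                            # user+sys__rate == (0.6 + 0.07) / 10
                            # == 0.067 CPU cores used over that interval.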
                            if stat in ['user+sys__rate', 'tx+rx__rate']:
                                task.series[category, stat].append(
                                    (timestamp - self.starttime, val))
                    else:
                        if stat in ['rss']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.iteritems():
            for category, stat_last in task_stat.iteritems():
                for stat, val in stat_last.iteritems():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # Skip gauge-type stats that don't sum meaningfully
                        # across tasks (e.g., 16 CPU cores x 5 tasks = 80).
                        continue
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)

    def long_label(self):
        label = self.label
        if self.finishtime:
            label += ' -- elapsed time '
            s = (self.finishtime - self.starttime).total_seconds()
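            # Illustrative example: s == 90061 seconds renders below as
            # "1d1h1m1s".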
            if s > 86400:
                label += '{}d'.format(int(s/86400))
            if s > 3600:
                label += '{}h'.format(int(s/3600) % 24)
            if s > 60:
                label += '{}m'.format(int(s/60) % 60)
            label += '{}s'.format(int(s) % 60)
        return label

    def text_report(self):
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"

    def html_report(self):
        return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()

    def _text_report_gen(self):
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
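        # Each subsequent row is tab-separated, e.g. (illustrative values):
        #   cpu    user+sys    69.70    1.04    139.40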
        for category, stat_max in sorted(self.stats_max.iteritems()):
            for stat, val in sorted(stat_max.iteritems()):
                if stat.endswith('__rate'):
                    continue
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        for args in (
                ('Number of tasks: {}',
                 len(self.tasks),
                 None),
                ('Max CPU time spent by a single task: {}s',
                 self.stats_max['cpu']['user+sys'],
                 None),
                ('Max CPU usage in a single interval: {}%',
                 self.stats_max['cpu']['user+sys__rate'],
                 lambda x: x * 100),
                ('Overall CPU usage: {}%',
                 self.job_tot['cpu']['user+sys'] /
                 self.job_tot['time']['elapsed'],
                 lambda x: x * 100),
                ('Max memory used by a single task: {}GB',
                 self.stats_max['mem']['rss'],
                 lambda x: x / 1e9),
                ('Max network traffic in a single task: {}GB',
                 self.stats_max['net:eth0']['tx+rx'],
                 lambda x: x / 1e9),
                ('Max network speed in a single interval: {}MB/s',
                 self.stats_max['net:eth0']['tx+rx__rate'],
                 lambda x: x / 1e6)):
            format_string, val, transform = args
            if val == float('-Inf'):
                continue
            if transform:
                val = transform(val)
            yield "# "+format_string.format(self._format(val))

    def _recommend_gen(self):
        return itertools.chain(
            self._recommend_cpu(),
            self._recommend_ram())

    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%."""

        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf'):
            logger.warning('%s: no CPU usage data', self.label)
            return
        used_cores = int(math.ceil(cpu_max_rate))
        asked_cores = self.existing_constraints.get('min_cores_per_node')
        if asked_cores is None or used_cores < asked_cores:
            yield (
                '#!! {} max CPU usage was {}% -- '
                'try runtime_constraints "min_cores_per_node":{}'
            ).format(
                self.label,
                int(math.ceil(cpu_max_rate*100)),
                used_cores)

    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available. If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available. Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available. However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        memory requirement.

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary. We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """
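        # Illustrative example: used_bytes == 6e9 -> used_mib == 5723;
        # nearlygibs(5723) == ~5.88 -> ceil == 6 -> recommend
        # int(6 * 0.95 * 1024) == 5836 MiB.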
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
            return
        used_mib = math.ceil(float(used_bytes) / 1048576)
        asked_mib = self.existing_constraints.get('min_ram_mb_per_node')

        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
        if asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
            yield (
                '#!! {} max RSS was {} MiB -- '
                'try runtime_constraints "min_ram_mb_per_node":{}'
            ).format(
                self.label,
                int(used_mib),
                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))

    def _format(self, val):
        """Return a string representation of a stat.

        {:.2f} for floats, default format for everything else."""
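        # e.g. _format(0.678) -> '0.68'; _format(1234) -> '1234'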
        if isinstance(val, float):
            return '{:.2f}'.format(val)
        else:
            return '{}'.format(val)


class CollectionSummarizer(Summarizer):
    def __init__(self, collection_id, **kwargs):
        super(CollectionSummarizer, self).__init__(
            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
        self.label = collection_id


class JobSummarizer(Summarizer):
    def __init__(self, job, **kwargs):
        arv = arvados.api('v1')
        if isinstance(job, basestring):
            self.job = arv.jobs().get(uuid=job).execute()
        else:
            self.job = job
        if self.job['log']:
            rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
            label = self.job['uuid']
        else:
            rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
            label = self.job['uuid'] + ' (partial)'
        super(JobSummarizer, self).__init__(rdr, **kwargs)
        self.label = label
        self.existing_constraints = self.job.get('runtime_constraints', {})


class PipelineSummarizer(object):
    def __init__(self, pipeline_instance_uuid, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())
        instance = arv.pipeline_instances().get(
            uuid=pipeline_instance_uuid).execute()
        self.summarizers = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            if 'job' not in component:
                logger.warning(
                    "%s: skipping component with no job assigned", cname)
            elif component['job'].get('log') is None:
                logger.warning(
                    "%s: skipping job %s with no log available",
                    cname, component['job'].get('uuid'))
            else:
                logger.debug(
                    "%s: logdata %s", cname, component['job']['log'])
                summarizer = JobSummarizer(component['job'], **kwargs)
                summarizer.label = cname
                self.summarizers[cname] = summarizer
        self.label = pipeline_instance_uuid

    def run(self):
        for summarizer in self.summarizers.itervalues():
            summarizer.run()

    def text_report(self):
        txt = ''
        for cname, summarizer in self.summarizers.iteritems():
            txt += '### Summary for {} ({})\n'.format(
                cname, summarizer.job['uuid'])
            txt += summarizer.text_report()
            txt += '\n'
        return txt

    def html_report(self):
        return crunchstat_summary.chartjs.ChartJS(
            self.label, self.summarizers.itervalues()).html()
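

# Typical usage (illustrative; assumes a valid job UUID and API access):
#   summarizer = JobSummarizer('zzzzz-8i9sb-xxxxxxxxxxxxxxx')
#   summarizer.run()
#   print(summarizer.text_report())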