1 from __future__ import print_function
5 import crunchstat_summary.chartjs
13 from arvados.api import OrderedJsonModel
14 from crunchstat_summary import logger
# Recommend memory constraints that are this fraction of a whole
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that report amounts like 7.5 GiB according to the kernel.)
# For example, a job whose max RSS was 1248 MiB is advised to request
# (2048 * 0.95) MiB rather than a full 2048 MiB.
AVAILABLE_RAM_RATIO = 0.95
25 self.series = collections.defaultdict(list)
28 class Summarizer(object):
29 existing_constraints = {}
31 def __init__(self, logdata, label=None):
32 self._logdata = logdata
35 self.finishtime = None
36 logger.debug("%s: logdata %s", self.label, repr(logdata))
39 logger.debug("%s: parsing log data", self.label)
40 # stats_max: {category: {stat: val}}
41 self.stats_max = collections.defaultdict(
42 functools.partial(collections.defaultdict,
43 lambda: float('-Inf')))
44 # task_stats: {task_id: {category: {stat: val}}}
45 self.task_stats = collections.defaultdict(
46 functools.partial(collections.defaultdict, dict))
47 self.tasks = collections.defaultdict(Task)
48 for line in self._logdata:
49 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
51 task_id = m.group('seq')
52 elapsed = int(m.group('elapsed'))
53 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
54 if elapsed > self.stats_max['time']['elapsed']:
55 self.stats_max['time']['elapsed'] = elapsed
57 m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
60 if self.label is None:
61 self.label = m.group('job_uuid')
62 logger.debug('%s: using job uuid as label', self.label)
63 if m.group('category').endswith(':'):
66 elif m.group('category') == 'error':
68 task_id = m.group('seq')
69 task = self.tasks[task_id]
71 # Use the first and last crunchstat timestamps as
72 # approximations of starttime and finishtime.
73 timestamp = datetime.datetime.strptime(
74 m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
75 if not task.starttime:
76 task.starttime = timestamp
77 logger.debug('%s: task %s starttime %s',
78 self.label, task_id, timestamp)
79 task.finishtime = timestamp
81 if not self.starttime:
82 self.starttime = timestamp
83 self.finishtime = timestamp
85 this_interval_s = None
86 for group in ['current', 'interval']:
87 if not m.group(group):
89 category = m.group('category')
90 words = m.group(group).split(' ')
92 for val, stat in zip(words[::2], words[1::2]):
95 stats[stat] = float(val)
97 stats[stat] = int(val)
98 except ValueError as e:
100 'Error parsing {} stat in "{}": {!r}'.format(
102 if 'user' in stats or 'sys' in stats:
103 stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
104 if 'tx' in stats or 'rx' in stats:
105 stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
106 for stat, val in stats.iteritems():
107 if group == 'interval':
108 if stat == 'seconds':
109 this_interval_s = val
111 elif not (this_interval_s > 0):
113 "BUG? interval stat given with duration {!r}".
114 format(this_interval_s))
117 stat = stat + '__rate'
118 val = val / this_interval_s
119 if stat in ['user+sys__rate', 'tx+rx__rate']:
120 task.series[category, stat].append(
121 (timestamp - task.starttime, val))
124 task.series[category, stat].append(
125 (timestamp - task.starttime, val))
126 self.task_stats[task_id][category][stat] = val
127 if val > self.stats_max[category][stat]:
128 self.stats_max[category][stat] = val
129 self.job_tot = collections.defaultdict(
130 functools.partial(collections.defaultdict, int))
131 for task_id, task_stat in self.task_stats.iteritems():
132 for category, stat_last in task_stat.iteritems():
133 for stat, val in stat_last.iteritems():
134 if stat in ['cpus', 'cache', 'swap', 'rss']:
135 # meaningless stats like 16 cpu cores x 5 tasks = 80
137 self.job_tot[category][stat] += val
139 def long_label(self):
142 label += ' -- elapsed time '
143 s = (self.finishtime - self.starttime).total_seconds()
145 label += '{}d'.format(int(s/86400))
147 label += '{}h'.format(int(s/3600) % 24)
149 label += '{}m'.format(int(s/60) % 60)
150 label += '{}s'.format(int(s) % 60)
153 def text_report(self):
154 return "\n".join(itertools.chain(
155 self._text_report_gen(),
156 self._recommend_gen())) + "\n"
def html_report(self):
    """Return an HTML page charting this summary on its own.

    Delegates rendering to crunchstat_summary.chartjs.ChartJS, passing
    this summarizer's label and a one-element list containing itself.
    """
    chart = crunchstat_summary.chartjs.ChartJS(self.label, [self])
    return chart.html()
161 def _text_report_gen(self):
162 yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
163 for category, stat_max in sorted(self.stats_max.iteritems()):
164 for stat, val in sorted(stat_max.iteritems()):
165 if stat.endswith('__rate'):
167 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
168 val = self._format(val)
169 tot = self._format(self.job_tot[category].get(stat, '-'))
170 yield "\t".join([category, stat, str(val), max_rate, tot])
172 ('Max CPU time spent by a single task: {}s',
173 self.stats_max['cpu']['user+sys'],
175 ('Max CPU usage in a single interval: {}%',
176 self.stats_max['cpu']['user+sys__rate'],
178 ('Overall CPU usage: {}%',
179 self.job_tot['cpu']['user+sys'] /
180 self.job_tot['time']['elapsed'],
182 ('Max memory used by a single task: {}GB',
183 self.stats_max['mem']['rss'],
185 ('Max network traffic in a single task: {}GB',
186 self.stats_max['net:eth0']['tx+rx'],
188 ('Max network speed in a single interval: {}MB/s',
189 self.stats_max['net:eth0']['tx+rx__rate'],
191 format_string, val, transform = args
192 if val == float('-Inf'):
196 yield "# "+format_string.format(self._format(val))
198 def _recommend_gen(self):
199 return itertools.chain(
200 self._recommend_cpu(),
201 self._recommend_ram())
203 def _recommend_cpu(self):
204 """Recommend asking for 4 cores if max CPU usage was 333%"""
206 cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
207 if cpu_max_rate == float('-Inf'):
208 logger.warning('%s: no CPU usage data', self.label)
210 used_cores = int(math.ceil(cpu_max_rate))
211 asked_cores = self.existing_constraints.get('min_cores_per_node')
212 if asked_cores is None or used_cores < asked_cores:
214 '#!! {} max CPU usage was {}% -- '
215 'try runtime_constraints "min_cores_per_node":{}'
218 int(math.ceil(cpu_max_rate*100)),
221 def _recommend_ram(self):
222 """Recommend asking for (2048*0.95) MiB RAM if max rss was 1248 MiB"""
224 used_ram = self.stats_max['mem']['rss']
225 if used_ram == float('-Inf'):
226 logger.warning('%s: no memory usage data', self.label)
228 used_ram = math.ceil(float(used_ram) / (1<<20))
229 asked_ram = self.existing_constraints.get('min_ram_mb_per_node')
230 if asked_ram is None or (
231 math.ceil((used_ram/AVAILABLE_RAM_RATIO)/(1<<10)) <
232 (asked_ram/AVAILABLE_RAM_RATIO)/(1<<10)):
234 '#!! {} max RSS was {} MiB -- '
235 'try runtime_constraints "min_ram_mb_per_node":{}'
239 int(math.ceil((used_ram/AVAILABLE_RAM_RATIO)/(1<<10))*(1<<10)*AVAILABLE_RAM_RATIO))
241 def _format(self, val):
242 """Return a string representation of a stat.
244 {:.2f} for floats, default format for everything else."""
245 if isinstance(val, float):
246 return '{:.2f}'.format(val)
248 return '{}'.format(val)
251 class CollectionSummarizer(Summarizer):
252 def __init__(self, collection_id):
253 collection = arvados.collection.CollectionReader(collection_id)
254 filenames = [filename for filename in collection]
255 if len(filenames) != 1:
257 "collection {} has {} files; need exactly one".format(
258 collection_id, len(filenames)))
259 super(CollectionSummarizer, self).__init__(
260 collection.open(filenames[0]))
261 self.label = collection_id
264 class JobSummarizer(CollectionSummarizer):
265 def __init__(self, job):
266 arv = arvados.api('v1')
267 if isinstance(job, str):
268 self.job = arv.jobs().get(uuid=job).execute()
271 self.label = self.job['uuid']
272 self.existing_constraints = self.job.get('runtime_constraints', {})
273 if not self.job['log']:
275 "job {} has no log; live summary not implemented".format(
277 super(JobSummarizer, self).__init__(self.job['log'])
278 self.label = self.job['uuid']
281 class PipelineSummarizer():
282 def __init__(self, pipeline_instance_uuid):
283 arv = arvados.api('v1', model=OrderedJsonModel())
284 instance = arv.pipeline_instances().get(
285 uuid=pipeline_instance_uuid).execute()
286 self.summarizers = collections.OrderedDict()
287 for cname, component in instance['components'].iteritems():
288 if 'job' not in component:
290 "%s: skipping component with no job assigned", cname)
291 elif component['job'].get('log') is None:
293 "%s: skipping job %s with no log available",
294 cname, component['job'].get('uuid'))
297 "%s: logdata %s", cname, component['job']['log'])
298 summarizer = JobSummarizer(component['job'])
299 summarizer.label = cname
300 self.summarizers[cname] = summarizer
301 self.label = pipeline_instance_uuid
304 for summarizer in self.summarizers.itervalues():
307 def text_report(self):
309 for cname, summarizer in self.summarizers.iteritems():
310 txt += '### Summary for {} ({})\n'.format(
311 cname, summarizer.job['uuid'])
312 txt += summarizer.text_report()
316 def html_report(self):
317 return crunchstat_summary.chartjs.ChartJS(
318 self.label, self.summarizers.itervalues()).html()