from __future__ import print_function

import arvados
import collections
import functools
import itertools
import logging
import math
import re

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())


class Summarizer(object):
    existing_constraints = {}

    def __init__(self, logdata, label='job'):
        self._logdata = logdata
        self.label = label

    def run(self):
        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict,
                              lambda: float('-Inf')))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))
        for line in self._logdata:
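            # Task completion lines look like this (illustrative sample,
            # reconstructed to match the regexp below):
            #   2015-02-04_01:23:45 zzzzz-8i9sb-xxxxxxxxxxxxxxx 24218 0 success in 123 seconds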
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
            if m:
                task_id = m.group('seq')
                elapsed = int(m.group('elapsed'))
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed
                continue
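
            # crunchstat lines report resource usage as "value name" pairs,
            # optionally followed by an interval, e.g. (values invented):
            #   ... 0 stderr crunchstat: cpu 1935.4300 user 59.4100 sys 8 cpus -- interval 10.0000 seconds 12.9900 user 0.9900 sys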
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
            if not m:
                continue
            if m.group('category').endswith(':'):
                # "stderr crunchstat: notice: ..." and similar messages
                continue
            elif m.group('category') == 'error':
                continue
            task_id = m.group('seq')
            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                    continue
                category = m.group('category')
                words = m.group(group).split(' ')
                stats = {}
                for val, stat in zip(words[::2], words[1::2]):
                    try:
                        if '.' in val:
                            stats[stat] = float(val)
                        else:
                            stats[stat] = int(val)
                    except ValueError as e:
                        raise ValueError(
                            'Error parsing {} stat in "{}": {!r}'.format(
                                stat, line, e))
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
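                # stats now maps stat names to parsed numbers plus the
                # derived aggregates, e.g. (illustrative):
                #   {'user': 1935.43, 'sys': 59.41, 'cpus': 8, 'user+sys': 1994.84}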
                for stat, val in stats.iteritems():
                    if group == 'interval':
                        if stat == 'seconds':
                            this_interval_s = val
                            continue
                        elif not (this_interval_s > 0):
                            logger.error(
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                            continue
                        else:
                            # Report interval stats as per-second rates.
                            stat = stat + '__rate'
                            val = val / this_interval_s
                    self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
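
        # Sum each task's final stats into whole-job totals, e.g.
        # (illustrative) job_tot['cpu']['user+sys'] is the sum of every
        # task's 'user+sys' value.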
        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.iteritems():
            for category, stat_last in task_stat.iteritems():
                for stat, val in stat_last.iteritems():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                        continue
                    self.job_tot[category][stat] += val

    def report(self):
        return "\n".join(itertools.chain(
            self._report_gen(),
            self._recommend_gen())) + "\n"
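
    # Sketch of report() output (tab-separated, abridged; all values
    # invented for illustration):
    #
    #   category  metric    task_max   task_max_rate  job_total
    #   cpu       user+sys  1994.84    1.25           1994.84
    #   mem       rss       334102528  -              -
    #   time      elapsed   123        -              123
    #   # Max CPU time spent by a single task: 1994.84s
    #   #!! job max CPU usage was 125% -- try runtime_constraints "min_cores_per_node":2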

    def _report_gen(self):
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.iteritems()):
            for stat, val in sorted(stat_max.iteritems()):
                if stat.endswith('__rate'):
                    continue
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        for args in (
                ('Max CPU time spent by a single task: {}s',
                 self.stats_max['cpu']['user+sys'],
                 None),
                ('Max CPU usage in a single interval: {}%',
                 self.stats_max['cpu']['user+sys__rate'],
                 lambda x: x * 100),
                ('Overall CPU usage: {}%',
                 self.job_tot['cpu']['user+sys'] /
                 self.job_tot['time']['elapsed'],
                 lambda x: x * 100),
                ('Max memory used by a single task: {}GB',
                 self.stats_max['mem']['rss'],
                 lambda x: x / 1e9),
                ('Max network traffic in a single task: {}GB',
                 self.stats_max['net:eth0']['tx+rx'],
                 lambda x: x / 1e9),
                ('Max network speed in a single interval: {}MB/s',
                 self.stats_max['net:eth0']['tx+rx__rate'],
                 lambda x: x / 1e6)):
            format_string, val, transform = args
            if val == float('-Inf'):
                continue
            if transform:
                val = transform(val)
            yield "# "+format_string.format(self._format(val))

    def _recommend_gen(self):
        return itertools.chain(
            self._recommend_cpu(),
            self._recommend_ram())

    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""

        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf'):
            logger.warning('%s: no CPU usage data', self.label)
            return
        used_cores = int(math.ceil(cpu_max_rate))
        asked_cores = self.existing_constraints.get('min_cores_per_node')
        if asked_cores is None or used_cores < asked_cores:
            yield (
                '#!! {} max CPU usage was {}% -- '
                'try runtime_constraints "min_cores_per_node":{}'
            ).format(
                self.label,
                int(math.ceil(cpu_max_rate*100)),
                used_cores)
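
    # Worked example (illustrative): cpu_max_rate == 3.5 gives
    # used_cores == int(math.ceil(3.5)) == 4 and a reported usage of
    # int(math.ceil(3.5*100)) == 350 percent.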

    def _recommend_ram(self):
        """Recommend asking for 2048 MiB RAM if max rss was 1248 MiB"""

        used_ram = self.stats_max['mem']['rss']
        if used_ram == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
            return
        used_ram = math.ceil(float(used_ram) / (1<<20))
        asked_ram = self.existing_constraints.get('min_ram_mb_per_node')
        if asked_ram is None or math.ceil(used_ram/(1<<10)) < asked_ram/(1<<10):
            yield (
                '#!! {} never used more than {} MiB RAM -- '
                'try runtime_constraints "min_ram_mb_per_node":{}'
            ).format(
                self.label,
                int(used_ram),
                int(math.ceil(used_ram/(1<<10))*(1<<10)))
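
    # Worked example (illustrative): a peak rss of 1248 MiB gives
    # used_ram == 1248.0, which rounds up to math.ceil(1248.0/(1<<10)) == 2
    # GiB, so the suggested constraint is 2*(1<<10) == 2048 MiB, matching
    # the docstring above.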

    def _format(self, val):
        """Return a string representation of a stat.

        {:.2f} for floats, default format for everything else."""
        if isinstance(val, float):
            return '{:.2f}'.format(val)
        else:
            return '{}'.format(val)


class CollectionSummarizer(Summarizer):
    def __init__(self, collection_id):
        collection = arvados.collection.CollectionReader(collection_id)
        filenames = [filename for filename in collection]
        if len(filenames) != 1:
            raise ValueError(
                "collection {} has {} files; need exactly one".format(
                    collection_id, len(filenames)))
        super(CollectionSummarizer, self).__init__(
            collection.open(filenames[0]))


class JobSummarizer(CollectionSummarizer):
    def __init__(self, job):
        arv = arvados.api('v1')
        if isinstance(job, str):
            self.job = arv.jobs().get(uuid=job).execute()
        else:
            self.job = job
        self.label = self.job['uuid']
        self.existing_constraints = self.job.get('runtime_constraints', {})
        if not self.job['log']:
            raise ValueError(
                "job {} has no log; live summary not implemented".format(
                    self.job['uuid']))
        super(JobSummarizer, self).__init__(self.job['log'])


class PipelineSummarizer(object):
    def __init__(self, pipeline_instance_uuid):
        arv = arvados.api('v1')
        instance = arv.pipeline_instances().get(
            uuid=pipeline_instance_uuid).execute()
        self.summarizers = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            if 'job' not in component:
                logger.warning(
                    "%s: skipping component with no job assigned", cname)
            elif component['job'].get('log') is None:
                logger.warning(
                    "%s: skipping job %s with no log available",
                    cname, component['job'].get('uuid'))
            else:
                logger.debug(
                    "%s: reading log from %s", cname, component['job']['log'])
                summarizer = JobSummarizer(component['job'])
                summarizer.label = cname
                self.summarizers[cname] = summarizer

    def run(self):
        for summarizer in self.summarizers.itervalues():
            summarizer.run()

    def report(self):
        txt = ''
        for cname, summarizer in self.summarizers.iteritems():
            txt += '### Summary for {} ({})\n'.format(
                cname, summarizer.job['uuid'])
            txt += summarizer.report()
            txt += '\n'
        return txt
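

# Minimal usage sketch (hypothetical UUIDs; assumes an Arvados client
# environment with ARVADOS_API_HOST and ARVADOS_API_TOKEN configured):
#
#   summarizer = JobSummarizer('zzzzz-8i9sb-xxxxxxxxxxxxxxx')
#   summarizer.run()
#   print(summarizer.report())
#
#   pipeline = PipelineSummarizer('zzzzz-d1hrv-xxxxxxxxxxxxxxx')
#   pipeline.run()
#   print(pipeline.report())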