+ max_rate = self._format(stat_max.get(stat+'__rate', '-'))
+ val = self._format(val)
+ tot = self._format(self.job_tot[category].get(stat, '-'))
+ yield "\t".join([category, stat, str(val), max_rate, tot])
+ for args in (
+ ('Number of tasks: {}',
+ len(self.tasks),
+ None),
+ ('Max CPU time spent by a single task: {}s',
+ self.stats_max['cpu']['user+sys'],
+ None),
+ ('Max CPU usage in a single interval: {}%',
+ self.stats_max['cpu']['user+sys__rate'],
+ lambda x: x * 100),
+ ('Overall CPU usage: {}%',
+ self.job_tot['cpu']['user+sys'] /
+ self.job_tot['time']['elapsed']
+ if self.job_tot['time']['elapsed'] > 0 else 0,
+ lambda x: x * 100),
+ ('Max memory used by a single task: {}GB',
+ self.stats_max['mem']['rss'],
+ lambda x: x / 1e9),
+ ('Max network traffic in a single task: {}GB',
+ self.stats_max['net:eth0']['tx+rx'] +
+ self.stats_max['net:keep0']['tx+rx'],
+ lambda x: x / 1e9),
+ ('Max network speed in a single interval: {}MB/s',
+ self.stats_max['net:eth0']['tx+rx__rate'] +
+ self.stats_max['net:keep0']['tx+rx__rate'],
+ lambda x: x / 1e6),
+ ('Keep cache miss rate {}%',
+ (float(self.job_tot['keepcache']['miss']) /
+ float(self.job_tot['keepcalls']['get']))
+ if self.job_tot['keepcalls']['get'] > 0 else 0,
+ lambda x: x * 100.0),
+ ('Keep cache utilization {}%',
+ (float(self.job_tot['blkio:0:0']['read']) /
+ float(self.job_tot['net:keep0']['rx']))
+ if self.job_tot['net:keep0']['rx'] > 0 else 0,
+ lambda x: x * 100.0)):
+ format_string, val, transform = args
+ if val == float('-Inf'):
+ continue
+ if transform:
+ val = transform(val)
+ yield "# "+format_string.format(self._format(val))
+
+ def _recommend_gen(self):
+ return itertools.chain(
+ self._recommend_cpu(),
+ self._recommend_ram(),
+ self._recommend_keep_cache())
+
+ def _recommend_cpu(self):
+ """Recommend asking for 4 cores if max CPU usage was 333%"""
+
+ cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
+ if cpu_max_rate == float('-Inf'):
+ logger.warning('%s: no CPU usage data', self.label)
+ return
+ used_cores = max(1, int(math.ceil(cpu_max_rate)))
+ asked_cores = self.existing_constraints.get('min_cores_per_node')
+ if asked_cores is None or used_cores < asked_cores:
+ yield (
+ '#!! {} max CPU usage was {}% -- '
+ 'try runtime_constraints "min_cores_per_node":{}'
+ ).format(
+ self.label,
+ int(math.ceil(cpu_max_rate*100)),
+ int(used_cores))
+
+ def _recommend_ram(self):
+ """Recommend an economical RAM constraint for this job.
+
+ Nodes that are advertised as "8 gibibytes" actually have what
+ we might call "8 nearlygibs" of memory available for jobs.
+ Here, we calculate a whole number of nearlygibs that would
+ have sufficed to run the job, then recommend requesting a node
+ with that number of nearlygibs (expressed as mebibytes).
+
+ Requesting a node with "nearly 8 gibibytes" is our best hope
+ of getting a node that actually has nearly 8 gibibytes
+ available. If the node manager is smart enough to account for
+ the discrepancy itself when choosing/creating a node, we'll
+ get an 8 GiB node with nearly 8 GiB available. Otherwise, the
+ advertised size of the next-size-smaller node (say, 6 GiB)
+ will be too low to satisfy our request, so we will effectively
+ get rounded up to 8 GiB.
+
+ For example, if we need 7500 MiB, we can ask for 7500 MiB, and
+ we will generally get a node that is advertised as "8 GiB" and
+ has at least 7500 MiB available. However, asking for 8192 MiB
+ would either result in an unnecessarily expensive 12 GiB node
+ (if node manager knows about the discrepancy), or an 8 GiB
+ node which has less than 8192 MiB available and is therefore
+ considered by crunch-dispatch to be too small to meet our
+ constraint.
+
+ When node manager learns how to predict the available memory
+ for each node type such that crunch-dispatch always agrees
+ that a node is big enough to run the job it was brought up
+ for, all this will be unnecessary. We'll just ask for exactly
+ the memory we want -- even if that happens to be 8192 MiB.
+ """
+
+ used_bytes = self.stats_max['mem']['rss']
+ if used_bytes == float('-Inf'):
+ logger.warning('%s: no memory usage data', self.label)
+ return
+ used_mib = math.ceil(float(used_bytes) / 1048576)
+ asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
+
+ nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
+ if asked_mib is None or (
+ math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
+ yield (
+ '#!! {} max RSS was {} MiB -- '
+ 'try runtime_constraints "min_ram_mb_per_node":{}'
+ ).format(
+ self.label,
+ int(used_mib),
+ int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
+
+ def _recommend_keep_cache(self):
+ """Recommend increasing keep cache if utilization < 80%"""
+ if self.job_tot['net:keep0']['rx'] == 0:
+ return
+ utilization = (float(self.job_tot['blkio:0:0']['read']) /
+ float(self.job_tot['net:keep0']['rx']))
+ asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
+
+ if utilization < 0.8:
+ yield (
+ '#!! {} Keep cache utilization was {:.2f}% -- '
+ 'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
+ ).format(
+ self.label,
+ utilization * 100.0,
+ asked_mib*2)
+
+
+ def _format(self, val):
+ """Return a string representation of a stat.
+
+ {:.2f} for floats, default format for everything else."""
+ if isinstance(val, float):
+ return '{:.2f}'.format(val)
+ else:
+ return '{}'.format(val)
+
+
class CollectionSummarizer(Summarizer):
    """Summarizer that reads crunchstat data out of a collection."""

    def __init__(self, collection_id, **kwargs):
        reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(reader, **kwargs)
        # Label reports with the collection we summarized.
        self.label = collection_id
+
+
+class JobSummarizer(Summarizer):
+ def __init__(self, job, **kwargs):
+ arv = arvados.api('v1')
+ if isinstance(job, basestring):
+ self.job = arv.jobs().get(uuid=job).execute()