import arvados
import collections
import crunchstat_summary.chartjs
+import crunchstat_summary.reader
import datetime
import functools
import itertools
import math
import re
import sys
+import threading
from arvados.api import OrderedJsonModel
from crunchstat_summary import logger
class Summarizer(object):
- existing_constraints = {}
-
def __init__(self, logdata, label=None, skip_child_jobs=False):
    """Set up empty statistics containers for one log source.

    Arguments:
    logdata -- iterable of log lines; it is stored here and iterated
               later by run().
    label -- name used as a prefix in log messages.
    skip_child_jobs -- stored as _skip_child_jobs; when true, run()
                       omits stats from queued child jobs.
    """
    self._logdata = logdata
    self._skip_child_jobs = skip_child_jobs
    self.label = label
    self.starttime = None
    self.finishtime = None

    def new_stat_map():
        # Unseen stats read as 0.
        return collections.defaultdict(lambda: 0)

    def new_category_map():
        return collections.defaultdict(dict)

    # stats_max: {category: {stat: val}}
    self.stats_max = collections.defaultdict(new_stat_map)
    # task_stats: {task_id: {category: {stat: val}}}
    self.task_stats = collections.defaultdict(new_category_map)

    self.seq_to_uuid = {}
    self.tasks = collections.defaultdict(Task)

    # If the constraints the job was run with are known and already
    # suitable, there is no point recommending new ones. Nothing is
    # known at this level; a subclass constructor may replace this
    # with the job's real runtime constraints.
    self.existing_constraints = {}

    logger.debug("%s: logdata %s", self.label, logdata)
def run(self):
- logger.debug("%s: parsing log data", self.label)
+ logger.debug("%s: parsing logdata %s", self.label, self._logdata)
for line in self._logdata:
m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
if m:
logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
continue
- m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
if m:
task_id = self.seq_to_uuid[int(m.group('seq'))]
elapsed = int(m.group('elapsed'))
m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
if m:
uuid = m.group('uuid')
- if not self._include_child_jobs:
- logger.warning('%s: omitting %s (try --include-child-job)',
+ if self._skip_child_jobs:
+ logger.warning('%s: omitting stats from child job %s'
+ ' because --skip-child-jobs flag is on',
self.label, uuid)
continue
logger.debug('%s: follow %s', self.label, uuid)
child_summarizer.stats_max = self.stats_max
child_summarizer.task_stats = self.task_stats
child_summarizer.tasks = self.tasks
+ child_summarizer.starttime = self.starttime
child_summarizer.run()
logger.debug('%s: done %s', self.label, uuid)
continue
- m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
+ m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
if not m:
continue
val = val / this_interval_s
if stat in ['user+sys__rate', 'tx+rx__rate']:
task.series[category, stat].append(
- (timestamp - task.starttime, val))
+ (timestamp - self.starttime, val))
else:
if stat in ['rss']:
task.series[category, stat].append(
- (timestamp - task.starttime, val))
+ (timestamp - self.starttime, val))
self.task_stats[task_id][category][stat] = val
if val > self.stats_max[category][stat]:
self.stats_max[category][stat] = val
return label
def text_report(self):
    """Return the full plain-text report as a single string.

    The result is the lines from _text_report_gen() followed by the
    lines from _recommend_gen(), each terminated by a newline. When no
    tasks were recorded, a fixed placeholder line is returned instead.
    """
    if not self.tasks:
        return "(no report generated)\n"
    report_lines = itertools.chain(
        self._text_report_gen(),
        self._recommend_gen())
    body = "\n".join(report_lines)
    return body + "\n"
lambda x: x * 100),
('Overall CPU usage: {}%',
self.job_tot['cpu']['user+sys'] /
- self.job_tot['time']['elapsed'],
+ self.job_tot['time']['elapsed']
+ if self.job_tot['time']['elapsed'] > 0 else 0,
lambda x: x * 100),
('Max memory used by a single task: {}GB',
self.stats_max['mem']['rss'],
lambda x: x / 1e9),
('Max network traffic in a single task: {}GB',
- self.stats_max['net:eth0']['tx+rx'],
+ self.stats_max['net:eth0']['tx+rx'] +
+ self.stats_max['net:keep0']['tx+rx'],
lambda x: x / 1e9),
('Max network speed in a single interval: {}MB/s',
- self.stats_max['net:eth0']['tx+rx__rate'],
- lambda x: x / 1e6)):
+ self.stats_max['net:eth0']['tx+rx__rate'] +
+ self.stats_max['net:keep0']['tx+rx__rate'],
+ lambda x: x / 1e6),
+ ('Keep cache miss rate {}%',
+ (float(self.job_tot['keepcache']['miss']) /
+ float(self.job_tot['keepcalls']['get']))
+ if self.job_tot['keepcalls']['get'] > 0 else 0,
+ lambda x: x * 100.0),
+ ('Keep cache utilization {}%',
+ (float(self.job_tot['blkio:0:0']['read']) /
+ float(self.job_tot['net:keep0']['rx']))
+ if self.job_tot['net:keep0']['rx'] > 0 else 0,
+ lambda x: x * 100.0)):
format_string, val, transform = args
if val == float('-Inf'):
continue
def _recommend_gen(self):
return itertools.chain(
self._recommend_cpu(),
- self._recommend_ram())
+ self._recommend_ram(),
+ self._recommend_keep_cache())
def _recommend_cpu(self):
"""Recommend asking for 4 cores if max CPU usage was 333%"""
if cpu_max_rate == float('-Inf'):
logger.warning('%s: no CPU usage data', self.label)
return
- used_cores = int(math.ceil(cpu_max_rate))
- asked_cores = self.existing_constraints.get('min_cores_per_node')
+ used_cores = max(1, int(math.ceil(cpu_max_rate)))
+ asked_cores = self.existing_constraints.get('min_cores_per_node')
if asked_cores is None or used_cores < asked_cores:
yield (
'#!! {} max CPU usage was {}% -- '
int(used_cores))
def _recommend_ram(self):
    """Recommend an economical RAM constraint for this job.

    Nodes that are advertised as "8 gibibytes" actually have what
    we might call "8 nearlygibs" of memory available for jobs.
    Here, we calculate a whole number of nearlygibs that would
    have sufficed to run the job, then recommend requesting a node
    with that number of nearlygibs (expressed as mebibytes).

    Requesting a node with "nearly 8 gibibytes" is our best hope
    of getting a node that actually has nearly 8 gibibytes
    available.  If the node manager is smart enough to account for
    the discrepancy itself when choosing/creating a node, we'll
    get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
    advertised size of the next-size-smaller node (say, 6 GiB)
    will be too low to satisfy our request, so we will effectively
    get rounded up to 8 GiB.

    For example, if we need 7500 MiB, we can ask for 7500 MiB, and
    we will generally get a node that is advertised as "8 GiB" and
    has at least 7500 MiB available.  However, asking for 8192 MiB
    would either result in an unnecessarily expensive 12 GiB node
    (if node manager knows about the discrepancy), or an 8 GiB
    node which has less than 8192 MiB available and is therefore
    considered by crunch-dispatch to be too small to meet our
    constraint.

    When node manager learns how to predict the available memory
    for each node type such that crunch-dispatch always agrees
    that a node is big enough to run the job it was brought up
    for, all this will be unnecessary.  We'll just ask for exactly
    the memory we want -- even if that happens to be 8192 MiB.

    Yields at most one recommendation line.  Yields nothing (and logs
    a warning) when no memory usage was recorded.
    """

    used_bytes = self.stats_max['mem']['rss']
    # stats_max entries read as 0 until a sample is recorded (see the
    # defaultdict built in __init__), so "no data" shows up as 0, not
    # -Inf.  Testing "<= 0" covers both, and avoids recommending a
    # 0 MiB constraint for a job with no memory samples.
    if used_bytes <= 0:
        logger.warning('%s: no memory usage data', self.label)
        return
    used_mib = math.ceil(float(used_bytes) / 1048576)
    asked_mib = self.existing_constraints.get('min_ram_mb_per_node')

    def nearlygibs(mebibytes):
        # Convert MiB to "nearlygibs": GiB-sized units scaled by the
        # fraction of advertised RAM that is actually available.
        return mebibytes / AVAILABLE_RAM_RATIO / 1024

    needed_nearlygibs = math.ceil(nearlygibs(used_mib))
    if asked_mib is None or needed_nearlygibs < nearlygibs(asked_mib):
        yield (
            '#!! {} max RSS was {} MiB -- '
            'try runtime_constraints "min_ram_mb_per_node":{}'
        ).format(
            self.label,
            int(used_mib),
            int(needed_nearlygibs * AVAILABLE_RAM_RATIO * 1024))
+
+ def _recommend_keep_cache(self):
+ """Recommend increasing keep cache if utilization < 80%"""
+ if self.job_tot['net:keep0']['rx'] == 0:
+ return
+ utilization = (float(self.job_tot['blkio:0:0']['read']) /
+ float(self.job_tot['net:keep0']['rx']))
+ asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
+
+ if utilization < 0.8:
+ yield (
+ '#!! {} Keep cache utilization was {:.2f}% -- '
+ 'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
+ ).format(
+ self.label,
+ utilization * 100.0,
+ asked_mib*2)
+
def _format(self, val):
"""Return a string representation of a stat.
class CollectionSummarizer(Summarizer):
    """Summarizer whose log data comes from a collection.

    The collection is wrapped in crunchstat_summary.reader.CollectionReader
    and handed to Summarizer as its logdata.
    """

    def __init__(self, collection_id, **kwargs):
        """Build the reader for collection_id and label the summary with it.

        Remaining keyword arguments are passed through to Summarizer.
        The label is always set to collection_id afterwards, replacing
        any label given in kwargs.
        """
        log_reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(log_reader, **kwargs)
        self.label = collection_id
class JobSummarizer(Summarizer):
    """Summarizer for a single job, given as a UUID string or a job record."""

    def __init__(self, job, **kwargs):
        """Choose a log source for the job and initialize the summarizer.

        Arguments:
        job -- a job UUID (looked up through the API) or an
               already-fetched job record.

        The job's log collection is used when it is set and readable.
        Otherwise the summarizer falls back to a LiveLogReader for the
        job UUID and marks the label as "(partial)".  Remaining keyword
        arguments are passed through to Summarizer.
        """
        arv = arvados.api('v1')
        self.job = (arv.jobs().get(uuid=job).execute()
                    if isinstance(job, basestring) else job)

        log_reader = None
        log_id = self.job.get('log')
        if log_id:
            try:
                log_reader = crunchstat_summary.reader.CollectionReader(
                    log_id)
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", log_id, e)

        if log_reader is not None:
            label = self.job['uuid']
        else:
            # No usable log collection: read from the live log instead.
            log_reader = crunchstat_summary.reader.LiveLogReader(
                self.job['uuid'])
            label = self.job['uuid'] + ' (partial)'

        super(JobSummarizer, self).__init__(log_reader, **kwargs)
        # Assigned after the base constructor, which sets both of these
        # attributes itself.
        self.label = label
        self.existing_constraints = self.job.get('runtime_constraints', {})
-class PipelineSummarizer():
+class PipelineSummarizer(object):
def __init__(self, pipeline_instance_uuid, **kwargs):
arv = arvados.api('v1', model=OrderedJsonModel())
instance = arv.pipeline_instances().get(
if 'job' not in component:
logger.warning(
"%s: skipping component with no job assigned", cname)
- elif component['job'].get('log') is None:
- logger.warning(
- "%s: skipping job %s with no log available",
- cname, component['job'].get('uuid'))
else:
logger.info(
- "%s: logdata %s", cname, component['job']['log'])
+ "%s: job %s", cname, component['job']['uuid'])
summarizer = JobSummarizer(component['job'], **kwargs)
- summarizer.label = cname
+ summarizer.label = '{} {}'.format(
+ cname, component['job']['uuid'])
self.summarizers[cname] = summarizer
self.label = pipeline_instance_uuid
def run(self):
    """Run every component summarizer concurrently and wait for all.

    Each summarizer's run() is executed in its own daemon thread; this
    method returns once every thread has finished.
    """
    # NOTE(review): an exception raised inside a worker thread is not
    # re-raised by join(), so a failed summarizer does not make this
    # method fail.
    workers = []
    for child in self.summarizers.itervalues():
        worker = threading.Thread(target=child.run)
        worker.daemon = True
        worker.start()
        workers.append(worker)
    for worker in workers:
        worker.join()
def text_report(self):
txt = ''