import arvados
import collections
import crunchstat_summary.chartjs
+import crunchstat_summary.reader
import datetime
import functools
import itertools
class Summarizer(object):
- existing_constraints = {}
-
- def __init__(self, logdata, label=None, include_child_jobs=True):
+ def __init__(self, logdata, label=None, skip_child_jobs=False):
self._logdata = logdata
self.label = label
self.starttime = None
self.finishtime = None
- self._include_child_jobs = include_child_jobs
+ self._skip_child_jobs = skip_child_jobs
# stats_max: {category: {stat: val}}
self.stats_max = collections.defaultdict(
- functools.partial(collections.defaultdict,
- lambda: float('-Inf')))
+ functools.partial(collections.defaultdict, lambda: 0))
# task_stats: {task_id: {category: {stat: val}}}
self.task_stats = collections.defaultdict(
functools.partial(collections.defaultdict, dict))
self.seq_to_uuid = {}
self.tasks = collections.defaultdict(Task)
- logger.debug("%s: logdata %s", self.label, repr(logdata))
+ # We won't bother recommending new runtime constraints if the
+ # constraints given when running the job are known to us and
+ # are already suitable. If applicable, the subclass
+ # constructor will overwrite this with something useful.
+ self.existing_constraints = {}
+
+ logger.debug("%s: logdata %s", self.label, logdata)
def run(self):
- logger.debug("%s: parsing log data", self.label)
+ logger.debug("%s: parsing logdata %s", self.label, self._logdata)
for line in self._logdata:
m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
if m:
m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
if m:
uuid = m.group('uuid')
- if not self._include_child_jobs:
- logger.warning('%s: omitting %s (try --include-child-job)',
+ if self._skip_child_jobs:
+ logger.warning('%s: omitting stats from child job %s'
+ ' because --skip-child-jobs flag is on',
self.label, uuid)
continue
logger.debug('%s: follow %s', self.label, uuid)
child_summarizer.stats_max = self.stats_max
child_summarizer.task_stats = self.task_stats
child_summarizer.tasks = self.tasks
+ child_summarizer.starttime = self.starttime
child_summarizer.run()
logger.debug('%s: done %s', self.label, uuid)
continue
- m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
+ m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
if not m:
continue
if m.group('category').endswith(':'):
# "stderr crunchstat: notice: ..."
continue
- elif m.group('category') == 'error':
+ elif m.group('category') in ('error', 'caught'):
continue
elif m.group('category') == 'read':
# "stderr crunchstat: read /proc/1234/net/dev: ..."
val = val / this_interval_s
if stat in ['user+sys__rate', 'tx+rx__rate']:
task.series[category, stat].append(
- (timestamp - task.starttime, val))
+ (timestamp - self.starttime, val))
else:
if stat in ['rss']:
task.series[category, stat].append(
- (timestamp - task.starttime, val))
+ (timestamp - self.starttime, val))
self.task_stats[task_id][category][stat] = val
if val > self.stats_max[category][stat]:
self.stats_max[category][stat] = val
self.stats_max['mem']['rss'],
lambda x: x / 1e9),
('Max network traffic in a single task: {}GB',
- self.stats_max['net:eth0']['tx+rx'],
+ self.stats_max['net:eth0']['tx+rx'] +
+ self.stats_max['net:keep0']['tx+rx'],
lambda x: x / 1e9),
('Max network speed in a single interval: {}MB/s',
- self.stats_max['net:eth0']['tx+rx__rate'],
+ self.stats_max['net:eth0']['tx+rx__rate'] +
+ self.stats_max['net:keep0']['tx+rx__rate'],
lambda x: x / 1e6)):
format_string, val, transform = args
if val == float('-Inf'):
logger.warning('%s: no CPU usage data', self.label)
return
used_cores = int(math.ceil(cpu_max_rate))
- asked_cores = self.existing_constraints.get('min_cores_per_node')
+ asked_cores = self.existing_constraints.get('min_cores_per_node')
if asked_cores is None or used_cores < asked_cores:
yield (
'#!! {} max CPU usage was {}% -- '
int(used_cores))
def _recommend_ram(self):
- """Recommend asking for (2048*0.95) MiB RAM if max rss was 1248 MiB"""
-
- used_ram = self.stats_max['mem']['rss']
- if used_ram == float('-Inf'):
+ """Recommend an economical RAM constraint for this job.
+
+ Nodes that are advertised as "8 gibibytes" actually have what
+ we might call "8 nearlygibs" of memory available for jobs.
+ Here, we calculate a whole number of nearlygibs that would
+ have sufficed to run the job, then recommend requesting a node
+ with that number of nearlygibs (expressed as mebibytes).
+
+ Requesting a node with "nearly 8 gibibytes" is our best hope
+ of getting a node that actually has nearly 8 gibibytes
+ available. If the node manager is smart enough to account for
+ the discrepancy itself when choosing/creating a node, we'll
+ get an 8 GiB node with nearly 8 GiB available. Otherwise, the
+ advertised size of the next-size-smaller node (say, 6 GiB)
+ will be too low to satisfy our request, so we will effectively
+ get rounded up to 8 GiB.
+
+ For example, if we need 7500 MiB, we can ask for 7500 MiB, and
+ we will generally get a node that is advertised as "8 GiB" and
+ has at least 7500 MiB available. However, asking for 8192 MiB
+ would either result in an unnecessarily expensive 12 GiB node
+ (if node manager knows about the discrepancy), or an 8 GiB
+ node which has less than 8192 MiB available and is therefore
+ considered by crunch-dispatch to be too small to meet our
+ constraint.
+
+ When node manager learns how to predict the available memory
+ for each node type such that crunch-dispatch always agrees
+ that a node is big enough to run the job it was brought up
+ for, all this will be unnecessary. We'll just ask for exactly
+ the memory we want -- even if that happens to be 8192 MiB.
+ """
+
+ used_bytes = self.stats_max['mem']['rss']
+ # stats_max now defaults to 0 for stats that were never reported,
+ # so "no data" shows up as 0 here rather than -Inf. Testing <= 0
+ # catches both values and avoids recommending a 0 MiB constraint.
+ if used_bytes <= 0:
logger.warning('%s: no memory usage data', self.label)
return
- used_ram = math.ceil(float(used_ram) / (1<<20))
- asked_ram = self.existing_constraints.get('min_ram_mb_per_node')
- if asked_ram is None or (
- math.ceil((used_ram/AVAILABLE_RAM_RATIO)/(1<<10)) <
- (asked_ram/AVAILABLE_RAM_RATIO)/(1<<10)):
+ used_mib = math.ceil(float(used_bytes) / 1048576)
+ asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
+
+ nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
+ if asked_mib is None or (
+ math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
yield (
'#!! {} max RSS was {} MiB -- '
'try runtime_constraints "min_ram_mb_per_node":{}'
).format(
self.label,
- int(used_ram),
- int(math.ceil((used_ram/AVAILABLE_RAM_RATIO)/(1<<10))*(1<<10)*AVAILABLE_RAM_RATIO))
+ int(used_mib),
+ int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
def _format(self, val):
"""Return a string representation of a stat.
class CollectionSummarizer(Summarizer):
def __init__(self, collection_id, **kwargs):
- logger.debug('load collection %s', collection_id)
- collection = arvados.collection.CollectionReader(collection_id)
- filenames = [filename for filename in collection]
- if len(filenames) != 1:
- raise ValueError(
- "collection {} has {} files; need exactly one".format(
- collection_id, len(filenames)))
+ # Hand the label to the base constructor so that its debug
+ # message names this collection instead of logging None as the
+ # label. The final value of self.label is unchanged.
+ kwargs['label'] = collection_id
super(CollectionSummarizer, self).__init__(
- collection.open(filenames[0]), **kwargs)
+ crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
self.label = collection_id
-class JobSummarizer(CollectionSummarizer):
+class JobSummarizer(Summarizer):
def __init__(self, job, **kwargs):
arv = arvados.api('v1')
- if isinstance(job, str):
+ if isinstance(job, basestring):
self.job = arv.jobs().get(uuid=job).execute()
else:
self.job = job
- self.label = self.job['uuid']
+ if self.job['log']:
+ rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
+ label = self.job['uuid']
+ else:
+ rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
+ label = self.job['uuid'] + ' (partial)'
+ # Hand the label to the base constructor so that its debug
+ # message names this job instead of logging None as the label.
+ # The base constructor stores it in self.label, so no separate
+ # assignment is needed afterwards.
+ kwargs['label'] = label
+ super(JobSummarizer, self).__init__(rdr, **kwargs)
self.existing_constraints = self.job.get('runtime_constraints', {})
- if not self.job['log']:
- raise ValueError(
- "job {} has no log; live summary not implemented".format(
- self.job['uuid']))
- super(JobSummarizer, self).__init__(self.job['log'], **kwargs)
- self.label = self.job['uuid']
-class PipelineSummarizer():
+class PipelineSummarizer(object):
def __init__(self, pipeline_instance_uuid, **kwargs):
arv = arvados.api('v1', model=OrderedJsonModel())
instance = arv.pipeline_instances().get(