+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
from __future__ import print_function
import arvados
import re
import sys
import threading
+import _strptime
from arvados.api import OrderedJsonModel
from crunchstat_summary import logger
AVAILABLE_RAM_RATIO = 0.95
+# Workaround datetime.datetime.strptime() thread-safety bug by calling
+# it once before starting threads. https://bugs.python.org/issue7980
+datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
+
+
class Task(object):
def __init__(self):
self.starttime = None
if not m:
continue
- if self.label is None:
- self.label = m.group('job_uuid')
- logger.debug('%s: using job uuid as label', self.label)
- if m.group('category').endswith(':'):
- # "stderr crunchstat: notice: ..."
- continue
- elif m.group('category') in ('error', 'caught'):
- continue
- elif m.group('category') == 'read':
- # "stderr crunchstat: read /proc/1234/net/dev: ..."
- # (crunchstat formatting fixed, but old logs still say this)
- continue
- task_id = self.seq_to_uuid[int(m.group('seq'))]
- task = self.tasks[task_id]
-
- # Use the first and last crunchstat timestamps as
- # approximations of starttime and finishtime.
- timestamp = datetime.datetime.strptime(
- m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
- if not task.starttime:
- task.starttime = timestamp
- logger.debug('%s: task %s starttime %s',
- self.label, task_id, timestamp)
- task.finishtime = timestamp
-
- if not self.starttime:
- self.starttime = timestamp
- self.finishtime = timestamp
-
- this_interval_s = None
- for group in ['current', 'interval']:
- if not m.group(group):
+ try:
+ if self.label is None:
+ self.label = m.group('job_uuid')
+ logger.debug('%s: using job uuid as label', self.label)
+ if m.group('category').endswith(':'):
+ # "stderr crunchstat: notice: ..."
continue
- category = m.group('category')
- words = m.group(group).split(' ')
- stats = {}
- for val, stat in zip(words[::2], words[1::2]):
- try:
- if '.' in val:
- stats[stat] = float(val)
- else:
- stats[stat] = int(val)
- except ValueError as e:
- raise ValueError(
- 'Error parsing {} stat in "{}": {!r}'.format(
- stat, line, e))
- if 'user' in stats or 'sys' in stats:
- stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
- if 'tx' in stats or 'rx' in stats:
- stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
- for stat, val in stats.iteritems():
- if group == 'interval':
- if stat == 'seconds':
- this_interval_s = val
- continue
- elif not (this_interval_s > 0):
- logger.error(
- "BUG? interval stat given with duration {!r}".
- format(this_interval_s))
- continue
+ elif m.group('category') in ('error', 'caught'):
+ continue
+ elif m.group('category') == 'read':
+ # "stderr crunchstat: read /proc/1234/net/dev: ..."
+ # (crunchstat formatting fixed, but old logs still say this)
+ continue
+ task_id = self.seq_to_uuid[int(m.group('seq'))]
+ task = self.tasks[task_id]
+
+ # Use the first and last crunchstat timestamps as
+ # approximations of starttime and finishtime.
+ timestamp = datetime.datetime.strptime(
+ m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
+ if not task.starttime:
+ task.starttime = timestamp
+ logger.debug('%s: task %s starttime %s',
+ self.label, task_id, timestamp)
+ task.finishtime = timestamp
+
+ if not self.starttime:
+ self.starttime = timestamp
+ self.finishtime = timestamp
+
+ this_interval_s = None
+ for group in ['current', 'interval']:
+ if not m.group(group):
+ continue
+ category = m.group('category')
+ words = m.group(group).split(' ')
+ stats = {}
+ for val, stat in zip(words[::2], words[1::2]):
+ try:
+ if '.' in val:
+ stats[stat] = float(val)
+ else:
+ stats[stat] = int(val)
+ except ValueError as e:
+ raise ValueError(
+ 'Error parsing {} stat: {!r}'.format(
+ stat, e))
+ if 'user' in stats or 'sys' in stats:
+ stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
+ if 'tx' in stats or 'rx' in stats:
+ stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
+ for stat, val in stats.iteritems():
+ if group == 'interval':
+ if stat == 'seconds':
+ this_interval_s = val
+ continue
+ elif not (this_interval_s > 0):
+ logger.error(
+ "BUG? interval stat given with duration {!r}".
+ format(this_interval_s))
+ continue
+ else:
+ stat = stat + '__rate'
+ val = val / this_interval_s
+ if stat in ['user+sys__rate', 'tx+rx__rate']:
+ task.series[category, stat].append(
+ (timestamp - self.starttime, val))
else:
- stat = stat + '__rate'
- val = val / this_interval_s
- if stat in ['user+sys__rate', 'tx+rx__rate']:
+ if stat in ['rss']:
task.series[category, stat].append(
(timestamp - self.starttime, val))
- else:
- if stat in ['rss']:
- task.series[category, stat].append(
- (timestamp - self.starttime, val))
- self.task_stats[task_id][category][stat] = val
- if val > self.stats_max[category][stat]:
- self.stats_max[category][stat] = val
+ self.task_stats[task_id][category][stat] = val
+ if val > self.stats_max[category][stat]:
+ self.stats_max[category][stat] = val
+ except Exception as e:
+ logger.info('Skipping malformed line: {}Error was: {}\n'.format(line, e))
logger.debug('%s: done parsing', self.label)
self.job_tot = collections.defaultdict(
return label
def text_report(self):
+ if not self.tasks:
+ return "(no report generated)\n"
return "\n".join(itertools.chain(
self._text_report_gen(),
self._recommend_gen())) + "\n"
lambda x: x * 100),
('Overall CPU usage: {}%',
self.job_tot['cpu']['user+sys'] /
- self.job_tot['time']['elapsed'],
+ self.job_tot['time']['elapsed']
+ if self.job_tot['time']['elapsed'] > 0 else 0,
lambda x: x * 100),
('Max memory used by a single task: {}GB',
self.stats_max['mem']['rss'],
int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
def _recommend_keep_cache(self):
- """Recommend increasing keep cache if miss rate is above 0.2%"""
- if self.job_tot['keepcalls']['get'] == 0:
+ """Recommend increasing keep cache if utilization < 80%"""
+ if self.job_tot['net:keep0']['rx'] == 0:
return
- miss_rate = float(self.job_tot['keepcache']['miss']) / float(self.job_tot['keepcalls']['get']) * 100.0
+ utilization = (float(self.job_tot['blkio:0:0']['read']) /
+ float(self.job_tot['net:keep0']['rx']))
asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
- if miss_rate > 0.2:
+ if utilization < 0.8:
yield (
- '#!! {} Keep cache miss rate was {:.2f}% -- '
- 'try runtime_constraints "keep_cache_mb_per_task":{}'
+ '#!! {} Keep cache utilization was {:.2f}% -- '
+ 'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
).format(
self.label,
- miss_rate,
+ utilization * 100.0,
asked_mib*2)
else:
self.job = job
rdr = None
- if self.job['log']:
+ if self.job.get('log'):
try:
rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
except arvados.errors.NotFoundError as e:
if 'job' not in component:
logger.warning(
"%s: skipping component with no job assigned", cname)
- elif component['job'].get('log') is None:
- logger.warning(
- "%s: skipping job %s with no log available",
- cname, component['job'].get('uuid'))
else:
logger.info(
- "%s: logdata %s", cname, component['job']['log'])
+ "%s: job %s", cname, component['job']['uuid'])
summarizer = JobSummarizer(component['job'], **kwargs)
- summarizer.label = cname
+ summarizer.label = '{} {}'.format(
+ cname, component['job']['uuid'])
self.summarizers[cname] = summarizer
self.label = pipeline_instance_uuid