sdk/R
tools/sync-groups
tools/crunchstat-summary
+tools/crunchstat-summary:py3
tools/keep-exercise
tools/keep-rsync
tools/keep-block-check
s.add_runtime_dependency 'arvados', '~> 1.3.0', '>= 1.3.0'
# Our google-api-client dependency used to be < 0.9, but that could be
# satisfied by the buggy 0.9.pre*. https://dev.arvados.org/issues/9213
- s.add_runtime_dependency 'cure-google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
- s.add_runtime_dependency 'activesupport', '>= 3.2.13', '< 5'
+ s.add_runtime_dependency 'arvados-google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
+ s.add_runtime_dependency 'activesupport', '>= 3.2.13', '< 5.1'
s.add_runtime_dependency 'json', '>= 1.7.7', '<3'
s.add_runtime_dependency 'optimist', '~> 3.0'
s.add_runtime_dependency 'andand', '~> 1.3', '>= 1.3.3'
s.add_dependency('andand', '~> 1.3', '>= 1.3.3')
# Our google-api-client dependency used to be < 0.9, but that could be
# satisfied by the buggy 0.9.pre*. https://dev.arvados.org/issues/9213
- s.add_dependency('cure-google-api-client', '>= 0.7', '< 0.8.9')
+ s.add_dependency('arvados-google-api-client', '>= 0.7', '< 0.8.9')
# work around undeclared dependency on i18n in some activesupport 3.x.x:
s.add_dependency('i18n', '~> 0')
s.add_dependency('json', '>= 1.7.7', '<3')
import collections
import functools
import arvados.keep
+from prometheus_client import Summary
import Queue
"""
+ fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
+ read_time = fuse_time.labels(op='read')
+ write_time = fuse_time.labels(op='write')
+ destroy_time = fuse_time.labels(op='destroy')
+ on_event_time = fuse_time.labels(op='on_event')
+ getattr_time = fuse_time.labels(op='getattr')
+ setattr_time = fuse_time.labels(op='setattr')
+ lookup_time = fuse_time.labels(op='lookup')
+ forget_time = fuse_time.labels(op='forget')
+ open_time = fuse_time.labels(op='open')
+ release_time = fuse_time.labels(op='release')
+ opendir_time = fuse_time.labels(op='opendir')
+ readdir_time = fuse_time.labels(op='readdir')
+ statfs_time = fuse_time.labels(op='statfs')
+ create_time = fuse_time.labels(op='create')
+ mkdir_time = fuse_time.labels(op='mkdir')
+ unlink_time = fuse_time.labels(op='unlink')
+ rmdir_time = fuse_time.labels(op='rmdir')
+ rename_time = fuse_time.labels(op='rename')
+ flush_time = fuse_time.labels(op='flush')
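+ # Each .labels(op=...) child above records a per-operation call count and a
+ # running total of seconds; the @<op>_time.time() decorators on the handlers
+ # below add one observation per call. A minimal sketch of the
+ # prometheus_client.Summary semantics this relies on (illustrative only):
+ #
+ #   @read_time.time()            # same as wrapping the body in
+ #   def read(...): ...           # "with read_time.time():"
+ #
+ # which bumps ..._count by 1 and adds the elapsed time to ..._sum.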
+
def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
super(Operations, self).__init__()
# Allow threads that are waiting for the driver to be finished
# initializing to continue
self.initlock.set()
+ def metric_samples(self):
+ return self.fuse_time.collect()[0].samples
+
+ def metric_op_names(self):
+ ops = []
+ for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
+ if cur_op not in ops:
+ ops.append(cur_op)
+ return ops
+
+ def metric_value(self, opname, metric):
+ op_value = [sample.value for sample in self.metric_samples()
+ if sample.name == metric and sample.labels['op'] == opname]
+ return op_value[0] if len(op_value) == 1 else None
+
+ def metric_sum_func(self, opname):
+ return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
+
+ def metric_count_func(self, opname):
+ return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
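+
+ # Illustrative use of the closures above (consumed by crunchstat's
+ # statlogger further down):
+ #   ops.metric_count_func('read')()  # number of read() calls so far
+ #   ops.metric_sum_func('read')()    # total seconds spent in read()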
+
+ @destroy_time.time()
@catch_exceptions
def destroy(self):
self._shutdown_started.set()
[["event_type", "in", ["create", "update", "delete"]]],
self.on_event)
+ @on_event_time.time()
@catch_exceptions
def on_event(self, ev):
if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
self.inodes.inode_cache.find_by_uuid(newowner)):
parent.child_event(ev)
+ @getattr_time.time()
@catch_exceptions
def getattr(self, inode, ctx=None):
if inode not in self.inodes:
return entry
+ @setattr_time.time()
@catch_exceptions
def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
entry = self.getattr(inode)
return entry
+ @lookup_time.time()
@catch_exceptions
def lookup(self, parent_inode, name, ctx=None):
name = unicode(name, self.inodes.encoding)
parent_inode, name)
raise llfuse.FUSEError(errno.ENOENT)
+ @forget_time.time()
@catch_exceptions
def forget(self, inodes):
if self._shutdown_started.is_set():
if ent.dec_ref(nlookup) == 0 and ent.dead:
self.inodes.del_entry(ent)
+ @open_time.time()
@catch_exceptions
def open(self, inode, flags, ctx=None):
if inode in self.inodes:
return fh
+ @read_time.time()
@catch_exceptions
def read(self, fh, off, size):
_logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
self.read_counter.add(len(r))
return r
+ @write_time.time()
@catch_exceptions
def write(self, fh, off, buf):
_logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
self.write_counter.add(w)
return w
+ @release_time.time()
@catch_exceptions
def release(self, fh):
if fh in self._filehandles:
def releasedir(self, fh):
self.release(fh)
+ @opendir_time.time()
@catch_exceptions
def opendir(self, inode, ctx=None):
_logger.debug("arv-mount opendir: inode %i", inode)
self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
return fh
+ @readdir_time.time()
@catch_exceptions
def readdir(self, fh, off):
_logger.debug("arv-mount readdir: fh %i off %i", fh, off)
yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
e += 1
+ @statfs_time.time()
@catch_exceptions
def statfs(self, ctx=None):
st = llfuse.StatvfsData()
return p
+ @create_time.time()
@catch_exceptions
def create(self, inode_parent, name, mode, flags, ctx=None):
_logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
f.inc_ref()
return (fh, self.getattr(f.inode))
+ @mkdir_time.time()
@catch_exceptions
def mkdir(self, inode_parent, name, mode, ctx=None):
_logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
d.inc_ref()
return self.getattr(d.inode)
+ @unlink_time.time()
@catch_exceptions
def unlink(self, inode_parent, name, ctx=None):
_logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
p = self._check_writable(inode_parent)
p.unlink(name)
+ @rmdir_time.time()
@catch_exceptions
def rmdir(self, inode_parent, name, ctx=None):
_logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
p = self._check_writable(inode_parent)
p.rmdir(name)
+ @rename_time.time()
@catch_exceptions
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
_logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
dest = self._check_writable(inode_parent_new)
dest.rename(name_old, name_new, src)
+ @flush_time.time()
@catch_exceptions
def flush(self, fh):
if fh in self._filehandles:
import sys
import time
+from collections import namedtuple
-class Stat(object):
- def __init__(self, prefix, interval,
- egr_name, ing_name,
- egr_func, ing_func):
+Stat = namedtuple("Stat", ['name', 'get'])
+
+class StatWriter(object):
+ def __init__(self, prefix, interval, stats):
self.prefix = prefix
self.interval = interval
- self.egr_name = egr_name
- self.ing_name = ing_name
- self.egress = egr_func
- self.ingress = ing_func
- self.egr_prev = self.egress()
- self.ing_prev = self.ingress()
-
- def update(self):
- egr = self.egress()
- ing = self.ingress()
+ self.stats = stats
+ self.previous_stats = []
+ self.update_previous_stats()
- delta = " -- interval %.4f seconds %d %s %d %s" % (self.interval,
- egr - self.egr_prev,
- self.egr_name,
- ing - self.ing_prev,
- self.ing_name)
+ def update_previous_stats(self):
+ self.previous_stats = [stat.get() for stat in self.stats]
- sys.stderr.write("crunchstat: %s %d %s %d %s%s\n" % (self.prefix,
- egr,
- self.egr_name,
- ing,
- self.ing_name,
- delta))
+ def update(self):
+ def append_by_type(string, name, value):
+ if isinstance(value, float):
+ string += " %.6f %s" % (value, name)
+ else:
+ string += " %s %s" % (value, name)
+ return string
- self.egr_prev = egr
- self.ing_prev = ing
+ out = "crunchstat: %s" % self.prefix
+ delta = "-- interval %.4f seconds" % self.interval
+ for i, stat in enumerate(self.stats):
+ value = stat.get()
+ diff = value - self.previous_stats[i]
+ delta = append_by_type(delta, stat.name, diff)
+ out = append_by_type(out, stat.name, value)
+ sys.stderr.write("%s %s\n" % (out, delta))
+ self.update_previous_stats()
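+
+ # Example output line (values illustrative), in the crunchstat log format
+ # that crunchstat-summary parses:
+ #   crunchstat: keepcalls 10 put 5 get -- interval 10.0000 seconds 2 put 1 get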
def statlogger(interval, keep, ops):
- calls = Stat("keepcalls", interval, "put", "get",
- keep.put_counter.get,
- keep.get_counter.get)
- net = Stat("net:keep0", interval, "tx", "rx",
- keep.upload_counter.get,
- keep.download_counter.get)
- cache = Stat("keepcache", interval, "hit", "miss",
- keep.hits_counter.get,
- keep.misses_counter.get)
- fuseops = Stat("fuseops", interval,"write", "read",
- ops.write_ops_counter.get,
- ops.read_ops_counter.get)
- blk = Stat("blkio:0:0", interval, "write", "read",
- ops.write_counter.get,
- ops.read_counter.get)
+ calls = StatWriter("keepcalls", interval, [
+ Stat("put", keep.put_counter.get),
+ Stat("get", keep.get_counter.get)
+ ])
+ net = StatWriter("net:keep0", interval, [
+ Stat("tx", keep.upload_counter.get),
+ Stat("rx", keep.download_counter.get)
+ ])
+ cache = StatWriter("keepcache", interval, [
+ Stat("hit", keep.hits_counter.get),
+ Stat("miss", keep.misses_counter.get)
+ ])
+ fuseops = StatWriter("fuseops", interval, [
+ Stat("write", ops.write_ops_counter.get),
+ Stat("read", ops.read_ops_counter.get)
+ ])
+ fusetimes = []
+ for cur_op in ops.metric_op_names():
+ name = "fuseop:{0}".format(cur_op)
+ fusetimes.append(StatWriter(name, interval, [
+ Stat("count", ops.metric_count_func(cur_op)),
+ Stat("time", ops.metric_sum_func(cur_op))
+ ]))
+ blk = StatWriter("blkio:0:0", interval, [
+ Stat("write", ops.write_counter.get),
+ Stat("read", ops.read_counter.get)
+ ])
while True:
time.sleep(interval)
calls.update()
net.update()
cache.update()
- fuseops.update()
blk.update()
+ fuseops.update()
+ for ftime in fusetimes:
+ ftime.update()
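+
+ # Each per-op StatWriter above pairs the Prometheus count and sum, emitting
+ # lines such as (illustrative):
+ #   crunchstat: fuseop:read 5 count 0.123456 time -- interval 10.0000 seconds 2 count 0.045678 time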
'llfuse >=1.2, <1.3.4',
'python-daemon',
'ciso8601 >= 2.0.0',
- 'setuptools'
+ 'setuptools',
+ "prometheus_client"
],
extras_require={
':python_version<"3"': ['pytz'],
import argparse
import gzip
+from io import open
import logging
import sys
help='Log more information (once for progress, twice for debug)')
+class UTF8Decode(object):
+ '''Wrap a file-like iterable to decode UTF-8 bytes into strings
+ '''
+ def __init__(self, fh):
+ self.fh = fh
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ return next(self.fh).decode('utf-8')
+
+ next = __next__
+
+ def close(self):
+ # mimic GzipFile behavior and don't close the underlying object
+ pass
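+
+ # Usage sketch (illustrative): the wrapper iterates decoded text lines and
+ # works as a context manager on both Python 2 and 3:
+ #   with UTF8Decode(gzip.open('some-log.txt.gz')) as fh:
+ #       for line in fh:
+ #           ...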
+
+
class Command(object):
def __init__(self, args):
self.args = args
self.summer = summarizer.NewSummarizer(self.args.job, **kwargs)
elif self.args.log_file:
if self.args.log_file.endswith('.gz'):
- fh = gzip.open(self.args.log_file)
+ fh = UTF8Decode(gzip.open(self.args.log_file))
else:
- fh = open(self.args.log_file)
+ fh = open(self.args.log_file, mode='r', encoding='utf-8')
self.summer = summarizer.Summarizer(fh, **kwargs)
else:
self.summer = summarizer.Summarizer(sys.stdin, **kwargs)
'data': self._collate_data(tasks, stat),
'options': {
'connectSeparatedPoints': True,
- 'labels': ['elapsed']+[uuid for uuid, _ in tasks.iteritems()],
+ 'labels': ['elapsed']+[uuid for uuid, _ in tasks.items()],
'title': '{}: {} {}'.format(label, stat[0], stat[1]),
},
}
def _collate_data(self, tasks, stat):
data = []
nulls = []
- for uuid, task in tasks.iteritems():
+ for uuid, task in tasks.items():
for pt in task.series[stat]:
data.append([pt[0].total_seconds()] + nulls + [pt[1]])
nulls.append(None)
#
# SPDX-License-Identifier: AGPL-3.0
-from __future__ import print_function
-
import arvados
import itertools
-import Queue
+import queue
import threading
from crunchstat_summary import logger
self._queue.put(self.EOF)
def __iter__(self):
- self._queue = Queue.Queue()
+ self._queue = queue.Queue()
self._thread = threading.Thread(target=self._get_all_pages)
self._thread.daemon = True
self._thread.start()
return self
- def next(self):
+ def __next__(self):
line = self._queue.get()
if line is self.EOF:
self._thread.join()
raise StopIteration
return line
+ next = __next__ # for Python 2
+
def __enter__(self):
return self
#
# SPDX-License-Identifier: AGPL-3.0
-from __future__ import print_function
-
import arvados
import collections
import crunchstat_summary.dygraphs
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95
-
+MB = 2**20
# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads. https://bugs.python.org/issue7980
stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
if 'tx' in stats or 'rx' in stats:
stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
- for stat, val in stats.iteritems():
+ for stat, val in stats.items():
if group == 'interval':
if stat == 'seconds':
this_interval_s = val
self.job_tot = collections.defaultdict(
functools.partial(collections.defaultdict, int))
- for task_id, task_stat in self.task_stats.iteritems():
- for category, stat_last in task_stat.iteritems():
- for stat, val in stat_last.iteritems():
+ for task_id, task_stat in self.task_stats.items():
+ for category, stat_last in task_stat.items():
+ for stat, val in stat_last.items():
if stat in ['cpus', 'cache', 'swap', 'rss']:
# meaningless stats like 16 cpu cores x 5 tasks = 80
continue
def _text_report_gen(self):
yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
- for category, stat_max in sorted(self.stats_max.iteritems()):
- for stat, val in sorted(stat_max.iteritems()):
+ for category, stat_max in sorted(self.stats_max.items()):
+ for stat, val in sorted(stat_max.items()):
if stat.endswith('__rate'):
continue
max_rate = self._format(stat_max.get(stat+'__rate', '-'))
self.stats_max['cpu']['user+sys__rate'],
lambda x: x * 100),
('Overall CPU usage: {}%',
- self.job_tot['cpu']['user+sys'] /
+ float(self.job_tot['cpu']['user+sys']) /
self.job_tot['time']['elapsed']
if self.job_tot['time']['elapsed'] > 0 else 0,
lambda x: x * 100),
yield "# "+format_string.format(self._format(val))
def _recommend_gen(self):
+ # TODO recommend fixing job granularity if elapsed time is too short
return itertools.chain(
self._recommend_cpu(),
self._recommend_ram(),
constraint_key = self._map_runtime_constraint('vcpus')
cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
- if cpu_max_rate == float('-Inf'):
+ if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
logger.warning('%s: no CPU usage data', self.label)
return
+ # TODO: don't recommend based solely on an isolated max peak;
+ # take average CPU usage (or % of time at max) into account as well
used_cores = max(1, int(math.ceil(cpu_max_rate)))
asked_cores = self.existing_constraints.get(constraint_key)
- if asked_cores is None or used_cores < asked_cores:
+ if asked_cores is None:
+ asked_cores = 1
+ # TODO: This should be more nuanced in cases where max >> avg
+ if used_cores < asked_cores:
yield (
'#!! {} max CPU usage was {}% -- '
- 'try runtime_constraints "{}":{}'
+ 'try reducing runtime_constraints to "{}":{}'
).format(
self.label,
- int(math.ceil(cpu_max_rate*100)),
+ math.ceil(cpu_max_rate*100),
constraint_key,
int(used_cores))
+ # FIXME: This needs to be updated to account for current nodemanager algorithms
def _recommend_ram(self):
"""Recommend an economical RAM constraint for this job.
if used_bytes == float('-Inf'):
logger.warning('%s: no memory usage data', self.label)
return
- used_mib = math.ceil(float(used_bytes) / 1048576)
+ used_mib = math.ceil(float(used_bytes) / MB)
asked_mib = self.existing_constraints.get(constraint_key)
nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
- if asked_mib is None or (
- math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
+ if used_mib > 0 and (asked_mib is None or (
+ math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
yield (
'#!! {} max RSS was {} MiB -- '
- 'try runtime_constraints "{}":{}'
+ 'try reducing runtime_constraints to "{}":{}'
).format(
self.label,
int(used_mib),
constraint_key,
- int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(2**20)/self._runtime_constraint_mem_unit()))
+ int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*MB/self._runtime_constraint_mem_unit()))
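+ # Worked example (cf. the expected test output below): used RSS 334 MiB ->
+ # nearlygibs(334) = 334/0.95/1024 ~= 0.34, ceil -> 1, giving
+ # int(1*0.95*1024) = 972 when the constraint unit is MiB ("min_ram_mb_per_node":972).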
def _recommend_keep_cache(self):
"""Recommend increasing keep cache if utilization < 80%"""
return
utilization = (float(self.job_tot['blkio:0:0']['read']) /
float(self.job_tot['net:keep0']['rx']))
- asked_mib = self.existing_constraints.get(constraint_key, 256)
+ # FIXME: the 256 default assumes a MiB-based constraint unit and is
+ # wrong when _runtime_constraint_mem_unit() is bytes
+ asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
if utilization < 0.8:
yield (
'#!! {} Keep cache utilization was {:.2f}% -- '
- 'try runtime_constraints "{}":{} (or more)'
+ 'try doubling runtime_constraints to "{}":{} (or more)'
).format(
self.label,
utilization * 100.0,
constraint_key,
- asked_mib*2*(2**20)/self._runtime_constraint_mem_unit())
+ math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
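+ # e.g. starting from the 256 default with a MiB-based constraint unit,
+ # utilization under 80% suggests doubling to "keep_cache_mb_per_task":512.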
def _format(self, val):
class JobSummarizer(ProcessSummarizer):
- runtime_constraint_mem_unit = 1048576
+ runtime_constraint_mem_unit = MB
map_runtime_constraint = {
'keep_cache_ram': 'keep_cache_mb_per_task',
'ram': 'min_ram_mb_per_node',
def run(self):
threads = []
- for child in self.children.itervalues():
+ for child in self.children.values():
self.throttle.acquire()
t = threading.Thread(target=self.run_and_release, args=(child.run, ))
t.daemon = True
def text_report(self):
txt = ''
d = self._descendants()
- for child in d.itervalues():
+ for child in d.values():
if len(d) > 1:
txt += '### Summary for {} ({})\n'.format(
child.label, child.process['uuid'])
MultiSummarizers) are omitted.
"""
d = collections.OrderedDict()
- for key, child in self.children.iteritems():
+ for key, child in self.children.items():
if isinstance(child, Summarizer):
d[key] = child
if isinstance(child, MultiSummarizer):
return d
def html_report(self):
- return WEBCHART_CLASS(self.label, self._descendants().itervalues()).html()
+ return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()
class JobTreeSummarizer(MultiSummarizer):
preloaded = {}
for j in arv.jobs().index(
limit=len(job['components']),
- filters=[['uuid','in',job['components'].values()]]).execute()['items']:
+ filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
preloaded[j['uuid']] = j
for cname in sorted(job['components'].keys()):
child_uuid = job['components'][cname]
class PipelineSummarizer(MultiSummarizer):
def __init__(self, instance, **kwargs):
children = collections.OrderedDict()
- for cname, component in instance['components'].iteritems():
+ for cname, component in instance['components'].items():
if 'job' not in component:
logger.warning(
"%s: skipping component with no job assigned", cname)
cr['name'] = cr.get('name') or cr['uuid']
todo.append(cr)
sorted_children = collections.OrderedDict()
- for uuid in sorted(children.keys(), key=lambda uuid: children[uuid].sort_key):
+ for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
sorted_children[uuid] = children[uuid]
super(ContainerTreeSummarizer, self).__init__(
children=sorted_children,
#
# SPDX-License-Identifier: AGPL-3.0
-import cgi
+try:
+ from html import escape
+except ImportError:
+ from cgi import escape
+
import json
import pkg_resources
<script type="text/javascript">{}</script>
{}
</head><body></body></html>
- '''.format(cgi.escape(self.label),
+ '''.format(escape(self.label),
self.JSLIB, self.js(), self.headHTML())
def js(self):
return 'var chartdata = {};\n{}'.format(
json.dumps(self.sections()),
- '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa) for jsa in self.JSASSETS]))
+ '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa).decode('utf-8') for jsa in self.JSASSETS]))
def sections(self):
return [
# Number of tasks: 1
# Max CPU time spent by a single task: 0s
# Max CPU usage in a single interval: 0%
-# Overall CPU usage: 0%
+# Overall CPU usage: 0.00%
# Max memory used by a single task: 0.00GB
# Max network traffic in a single task: 0.00GB
# Max network speed in a single interval: 0.00MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! container max CPU usage was 0% -- try runtime_constraints "vcpus":1
-#!! container max RSS was 0 MiB -- try runtime_constraints "ram":0
# Max network speed in a single interval: 0.00MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! container max CPU usage was 24% -- try runtime_constraints "vcpus":1
-#!! container max RSS was 67 MiB -- try runtime_constraints "ram":1020054732
+#!! container max RSS was 67 MiB -- try reducing runtime_constraints to "ram":1020054732
# Max network speed in a single interval: 0.00MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! container max CPU usage was 24% -- try runtime_constraints "vcpus":1
-#!! container max RSS was 67 MiB -- try runtime_constraints "ram":1020054732
+#!! container max RSS was 67 MiB -- try reducing runtime_constraints to "ram":1020054732
# Max network speed in a single interval: 42.58MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! 4xphq-8i9sb-jq0ekny1xou3zoh max CPU usage was 13% -- try runtime_constraints "min_cores_per_node":1
-#!! 4xphq-8i9sb-jq0ekny1xou3zoh max RSS was 334 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-jq0ekny1xou3zoh max RSS was 334 MiB -- try reducing runtime_constraints to "min_ram_mb_per_node":972
# Max network speed in a single interval: 0.00MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! 4xphq-8i9sb-zvb2ocfycpomrup max CPU usage was 0% -- try runtime_constraints "min_cores_per_node":1
-#!! 4xphq-8i9sb-zvb2ocfycpomrup max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-zvb2ocfycpomrup max RSS was 1 MiB -- try reducing runtime_constraints to "min_ram_mb_per_node":972
# Max network speed in a single interval: 0.00MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! 4xphq-8i9sb-v831jm2uq0g2g9x max CPU usage was 0% -- try runtime_constraints "min_cores_per_node":1
-#!! 4xphq-8i9sb-v831jm2uq0g2g9x max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-v831jm2uq0g2g9x max RSS was 1 MiB -- try reducing runtime_constraints to "min_ram_mb_per_node":972
import difflib
import glob
import gzip
+from io import open
import mock
import os
+import sys
import unittest
+from crunchstat_summary.command import UTF8Decode
+
TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
class ReportDiff(unittest.TestCase):
def diff_known_report(self, logfile, cmd):
expectfile = logfile+'.report'
- expect = open(expectfile).readlines()
+ with open(expectfile, encoding='utf-8') as f:
+ expect = f.readlines()
self.diff_report(cmd, expect, expectfile=expectfile)
- def diff_report(self, cmd, expect, expectfile=None):
+ def diff_report(self, cmd, expect, expectfile='(expected)'):
got = [x+"\n" for x in cmd.report().strip("\n").split("\n")]
self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
expect, got, fromfile=expectfile, tofile="(generated)")))
['--format=html', '--log-file', logfile])
cmd = crunchstat_summary.command.Command(args)
cmd.run()
- self.assertRegexpMatches(cmd.report(), r'(?is)<html>.*</html>\s*$')
+ if sys.version_info >= (3, 2):
+ self.assertRegex(cmd.report(), r'(?is)<html>.*</html>\s*$')
+ else:
+ self.assertRegexpMatches(cmd.report(), r'(?is)<html>.*</html>\s*$')
class SummarizeEdgeCases(unittest.TestCase):
def test_error_messages(self):
- logfile = open(os.path.join(TESTS_DIR, 'crunchstat_error_messages.txt'))
+ logfile = open(os.path.join(TESTS_DIR, 'crunchstat_error_messages.txt'), encoding='utf-8')
s = crunchstat_summary.summarizer.Summarizer(logfile)
s.run()
'container.json', 'crunchstat.txt', 'arv-mount.txt']
def _open(n):
if n == "crunchstat.txt":
- return gzip.open(self.logfile)
+ return UTF8Decode(gzip.open(self.logfile))
elif n == "arv-mount.txt":
- return gzip.open(self.arvmountlog)
+ return UTF8Decode(gzip.open(self.arvmountlog))
mock_cr().open.side_effect = _open
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--job', self.fake_request['uuid']])
def test_job_report(self, mock_api, mock_cr):
mock_api().jobs().get().execute.return_value = self.fake_job
mock_cr().__iter__.return_value = ['fake-logfile.txt']
- mock_cr().open.return_value = gzip.open(self.logfile)
+ mock_cr().open.return_value = UTF8Decode(gzip.open(self.logfile))
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--job', self.fake_job_uuid])
cmd = crunchstat_summary.command.Command(args)
mock_api().pipeline_instances().get().execute. \
return_value = self.fake_instance
mock_cr().__iter__.return_value = ['fake-logfile.txt']
- mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+ mock_cr().open.side_effect = [UTF8Decode(gzip.open(logfile)) for _ in range(3)]
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--pipeline-instance', self.fake_instance['uuid']])
cmd = crunchstat_summary.command.Command(args)
cmd.run()
- job_report = [
- line for line in open(logfile+'.report').readlines()
- if not line.startswith('#!! ')]
+ with open(logfile+'.report', encoding='utf-8') as f:
+ job_report = [line for line in f if not line.startswith('#!! ')]
expect = (
['### Summary for foo (zzzzz-8i9sb-000000000000000)\n'] +
job_report + ['\n'] +
mock_api().jobs().index().execute.return_value = self.fake_jobs_index
mock_api().jobs().get().execute.return_value = self.fake_job
mock_cr().__iter__.return_value = ['fake-logfile.txt']
- mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+ mock_cr().open.side_effect = [UTF8Decode(gzip.open(logfile)) for _ in range(3)]
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--job', self.fake_job['uuid']])
cmd = crunchstat_summary.command.Command(args)
cmd.run()
- job_report = [
- line for line in open(logfile+'.report').readlines()
- if not line.startswith('#!! ')]
+ with open(logfile+'.report', encoding='utf-8') as f:
+ job_report = [line for line in f if not line.startswith('#!! ')]
expect = (
['### Summary for zzzzz-8i9sb-i3e77t9z5y8j9cc (partial) (zzzzz-8i9sb-i3e77t9z5y8j9cc)\n',
'(no report generated)\n',