Merge branch '14807-prod-blockers'
author Tom Clegg <tclegg@veritasgenetics.com>
Thu, 21 Mar 2019 20:50:32 +0000 (16:50 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Thu, 21 Mar 2019 20:50:32 +0000 (16:50 -0400)
refs #14807

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

18 files changed:
build/run-tests.sh
sdk/cli/arvados-cli.gemspec
sdk/ruby/arvados.gemspec
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/crunchstat.py
services/fuse/setup.py
tools/crunchstat-summary/crunchstat_summary/command.py
tools/crunchstat-summary/crunchstat_summary/dygraphs.py
tools/crunchstat-summary/crunchstat_summary/reader.py
tools/crunchstat-summary/crunchstat_summary/summarizer.py
tools/crunchstat-summary/crunchstat_summary/webchart.py
tools/crunchstat-summary/tests/container_9tee4-dz642-lymtndkpy39eibk-arv-mount.txt.gz.report
tools/crunchstat-summary/tests/container_9tee4-dz642-lymtndkpy39eibk-crunchstat.txt.gz.report
tools/crunchstat-summary/tests/container_9tee4-dz642-lymtndkpy39eibk.txt.gz.report
tools/crunchstat-summary/tests/logfile_20151204190335.txt.gz.report
tools/crunchstat-summary/tests/logfile_20151210063411.txt.gz.report
tools/crunchstat-summary/tests/logfile_20151210063439.txt.gz.report
tools/crunchstat-summary/tests/test_examples.py

index d4afb52fa34382a33bca323ebbdf26b56e89efd4..a6858f315f873cf384e0ba3110ae430bf3a49bf1 100755 (executable)
@@ -123,6 +123,7 @@ sdk/cwl
 sdk/R
 tools/sync-groups
 tools/crunchstat-summary
+tools/crunchstat-summary:py3
 tools/keep-exercise
 tools/keep-rsync
 tools/keep-block-check
index c7e20e2a72947d2e74f147e6a6c0fd68d14254f8..60aeb1892b11e13c980c8c2a97e10da9fbc7a639 100644 (file)
@@ -33,8 +33,8 @@ Gem::Specification.new do |s|
   s.add_runtime_dependency 'arvados', '~> 1.3.0', '>= 1.3.0'
   # Our google-api-client dependency used to be < 0.9, but that could be
   # satisfied by the buggy 0.9.pre*.  https://dev.arvados.org/issues/9213
-  s.add_runtime_dependency 'cure-google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
-  s.add_runtime_dependency 'activesupport', '>= 3.2.13', '< 5'
+  s.add_runtime_dependency 'arvados-google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
+  s.add_runtime_dependency 'activesupport', '>= 3.2.13', '< 5.1'
   s.add_runtime_dependency 'json', '>= 1.7.7', '<3'
   s.add_runtime_dependency 'optimist', '~> 3.0'
   s.add_runtime_dependency 'andand', '~> 1.3', '>= 1.3.3'
index da919309f4e829f227f3241eb7d41759087dde08..d4f04eb370659fac2ade716e302a301ea7494577 100644 (file)
@@ -29,7 +29,7 @@ Gem::Specification.new do |s|
   s.add_dependency('andand', '~> 1.3', '>= 1.3.3')
   # Our google-api-client dependency used to be < 0.9, but that could be
   # satisfied by the buggy 0.9.pre*.  https://dev.arvados.org/issues/9213
-  s.add_dependency('cure-google-api-client', '>= 0.7', '< 0.8.9')
+  s.add_dependency('arvados-google-api-client', '>= 0.7', '< 0.8.9')
   # work around undeclared dependency on i18n in some activesupport 3.x.x:
   s.add_dependency('i18n', '~> 0')
   s.add_dependency('json', '>= 1.7.7', '<3')
index f1e49f5afcffff32143b9033c5f83dddcd0c7c65..b37ef695bde6708cdb8fd2bc7d3117275920460e 100644 (file)
@@ -71,6 +71,7 @@ import ciso8601
 import collections
 import functools
 import arvados.keep
+from prometheus_client import Summary
 
 import Queue
 
@@ -351,6 +352,27 @@ class Operations(llfuse.Operations):
 
     """
 
+    fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
+    read_time = fuse_time.labels(op='read')
+    write_time = fuse_time.labels(op='write')
+    destroy_time = fuse_time.labels(op='destroy')
+    on_event_time = fuse_time.labels(op='on_event')
+    getattr_time = fuse_time.labels(op='getattr')
+    setattr_time = fuse_time.labels(op='setattr')
+    lookup_time = fuse_time.labels(op='lookup')
+    forget_time = fuse_time.labels(op='forget')
+    open_time = fuse_time.labels(op='open')
+    release_time = fuse_time.labels(op='release')
+    opendir_time = fuse_time.labels(op='opendir')
+    readdir_time = fuse_time.labels(op='readdir')
+    statfs_time = fuse_time.labels(op='statfs')
+    create_time = fuse_time.labels(op='create')
+    mkdir_time = fuse_time.labels(op='mkdir')
+    unlink_time = fuse_time.labels(op='unlink')
+    rmdir_time = fuse_time.labels(op='rmdir')
+    rename_time = fuse_time.labels(op='rename')
+    flush_time = fuse_time.labels(op='flush')
+
     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
         super(Operations, self).__init__()
 
@@ -391,6 +413,28 @@ class Operations(llfuse.Operations):
         # initializing to continue
         self.initlock.set()
 
+    def metric_samples(self):
+        return self.fuse_time.collect()[0].samples
+
+    def metric_op_names(self):
+        ops = []
+        for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
+            if cur_op not in ops:
+                ops.append(cur_op)
+        return ops
+
+    def metric_value(self, opname, metric):
+        op_value = [sample.value for sample in self.metric_samples()
+                    if sample.name == metric and sample.labels['op'] == opname]
+        return op_value[0] if len(op_value) == 1 else None
+
+    def metric_sum_func(self, opname):
+        return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
+
+    def metric_count_func(self, opname):
+        return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
+
+    @destroy_time.time()
     @catch_exceptions
     def destroy(self):
         self._shutdown_started.set()
@@ -417,6 +461,7 @@ class Operations(llfuse.Operations):
             [["event_type", "in", ["create", "update", "delete"]]],
             self.on_event)
 
+    @on_event_time.time()
     @catch_exceptions
     def on_event(self, ev):
         if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
@@ -447,6 +492,7 @@ class Operations(llfuse.Operations):
                     self.inodes.inode_cache.find_by_uuid(newowner)):
                 parent.child_event(ev)
 
+    @getattr_time.time()
     @catch_exceptions
     def getattr(self, inode, ctx=None):
         if inode not in self.inodes:
@@ -493,6 +539,7 @@ class Operations(llfuse.Operations):
 
         return entry
 
+    @setattr_time.time()
     @catch_exceptions
     def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
         entry = self.getattr(inode)
@@ -516,6 +563,7 @@ class Operations(llfuse.Operations):
 
         return entry
 
+    @lookup_time.time()
     @catch_exceptions
     def lookup(self, parent_inode, name, ctx=None):
         name = unicode(name, self.inodes.encoding)
@@ -542,6 +590,7 @@ class Operations(llfuse.Operations):
                       parent_inode, name)
             raise llfuse.FUSEError(errno.ENOENT)
 
+    @forget_time.time()
     @catch_exceptions
     def forget(self, inodes):
         if self._shutdown_started.is_set():
@@ -552,6 +601,7 @@ class Operations(llfuse.Operations):
             if ent.dec_ref(nlookup) == 0 and ent.dead:
                 self.inodes.del_entry(ent)
 
+    @open_time.time()
     @catch_exceptions
     def open(self, inode, flags, ctx=None):
         if inode in self.inodes:
@@ -587,6 +637,7 @@ class Operations(llfuse.Operations):
 
         return fh
 
+    @read_time.time()
     @catch_exceptions
     def read(self, fh, off, size):
         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
@@ -604,6 +655,7 @@ class Operations(llfuse.Operations):
             self.read_counter.add(len(r))
         return r
 
+    @write_time.time()
     @catch_exceptions
     def write(self, fh, off, buf):
         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
@@ -624,6 +676,7 @@ class Operations(llfuse.Operations):
             self.write_counter.add(w)
         return w
 
+    @release_time.time()
     @catch_exceptions
     def release(self, fh):
         if fh in self._filehandles:
@@ -640,6 +693,7 @@ class Operations(llfuse.Operations):
     def releasedir(self, fh):
         self.release(fh)
 
+    @opendir_time.time()
     @catch_exceptions
     def opendir(self, inode, ctx=None):
         _logger.debug("arv-mount opendir: inode %i", inode)
@@ -664,6 +718,7 @@ class Operations(llfuse.Operations):
         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
         return fh
 
+    @readdir_time.time()
     @catch_exceptions
     def readdir(self, fh, off):
         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
@@ -679,6 +734,7 @@ class Operations(llfuse.Operations):
                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
             e += 1
 
+    @statfs_time.time()
     @catch_exceptions
     def statfs(self, ctx=None):
         st = llfuse.StatvfsData()
@@ -712,6 +768,7 @@ class Operations(llfuse.Operations):
 
         return p
 
+    @create_time.time()
     @catch_exceptions
     def create(self, inode_parent, name, mode, flags, ctx=None):
         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
@@ -728,6 +785,7 @@ class Operations(llfuse.Operations):
         f.inc_ref()
         return (fh, self.getattr(f.inode))
 
+    @mkdir_time.time()
     @catch_exceptions
     def mkdir(self, inode_parent, name, mode, ctx=None):
         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
@@ -741,18 +799,21 @@ class Operations(llfuse.Operations):
         d.inc_ref()
         return self.getattr(d.inode)
 
+    @unlink_time.time()
     @catch_exceptions
     def unlink(self, inode_parent, name, ctx=None):
         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.unlink(name)
 
+    @rmdir_time.time()
     @catch_exceptions
     def rmdir(self, inode_parent, name, ctx=None):
         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.rmdir(name)
 
+    @rename_time.time()
     @catch_exceptions
     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
@@ -760,6 +821,7 @@ class Operations(llfuse.Operations):
         dest = self._check_writable(inode_parent_new)
         dest.rename(name_old, name_new, src)
 
+    @flush_time.time()
     @catch_exceptions
     def flush(self, fh):
         if fh in self._filehandles:
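
For reference, the metrics plumbing added above is plain prometheus_client usage: one class-level Summary with an 'op' label, one labeled child per FUSE operation, and .time() applied as a decorator. A minimal standalone sketch of the same pattern (not part of this commit; the demo_* names are invented):

    import time
    from prometheus_client import Summary

    # One Summary, one time series per value of the 'op' label.
    demo_time = Summary('demo_operations_seconds',
                        'Time spent in demo operations', labelnames=['op'])
    read_time = demo_time.labels(op='read')

    @read_time.time()            # observes the duration of every call
    def demo_read():
        time.sleep(0.01)

    demo_read()
    # collect() exposes per-label samples such as demo_operations_seconds_count
    # and demo_operations_seconds_sum -- what metric_value() above searches.
    for sample in demo_time.collect()[0].samples:
        print(sample.name, sample.labels, sample.value)
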
index 47d89d8b9594a2befeff49a8ef2b13e89950b2ce..e99573752e1c0e69be8f2e07201530bf56a4024b 100644 (file)
@@ -4,64 +4,77 @@
 
 import sys
 import time
+from collections import namedtuple
 
-class Stat(object):
-    def __init__(self, prefix, interval,
-                 egr_name, ing_name,
-                 egr_func, ing_func):
+Stat = namedtuple("Stat", ['name', 'get'])
+
+class StatWriter(object):
+    def __init__(self, prefix, interval, stats):
         self.prefix = prefix
         self.interval = interval
-        self.egr_name = egr_name
-        self.ing_name = ing_name
-        self.egress = egr_func
-        self.ingress = ing_func
-        self.egr_prev = self.egress()
-        self.ing_prev = self.ingress()
-
-    def update(self):
-        egr = self.egress()
-        ing = self.ingress()
+        self.stats = stats
+        self.previous_stats = []
+        self.update_previous_stats()
 
-        delta = " -- interval %.4f seconds %d %s %d %s" % (self.interval,
-                                                           egr - self.egr_prev,
-                                                           self.egr_name,
-                                                           ing - self.ing_prev,
-                                                           self.ing_name)
+    def update_previous_stats(self):
+        self.previous_stats = [stat.get() for stat in self.stats]
 
-        sys.stderr.write("crunchstat: %s %d %s %d %s%s\n" % (self.prefix,
-                                                             egr,
-                                                             self.egr_name,
-                                                             ing,
-                                                             self.ing_name,
-                                                             delta))
+    def update(self):
+        def append_by_type(string, name, value):
+            if type(value) is float:
+                string += " %.6f %s" % (value, name)
+            else:
+                string += " %s %s" % (str(value), name)
+            return string
 
-        self.egr_prev = egr
-        self.ing_prev = ing
+        out = "crunchstat: %s" % self.prefix
+        delta = "-- interval %.4f seconds" % self.interval
+        for i, stat in enumerate(self.stats):
+            value = stat.get()
+            diff = value - self.previous_stats[i]
+            delta = append_by_type(delta, stat.name, diff)
+            out = append_by_type(out, stat.name, value)
 
+        sys.stderr.write("%s %s\n" % (out, delta))
+        self.update_previous_stats()
 
 def statlogger(interval, keep, ops):
-    calls = Stat("keepcalls", interval, "put", "get",
-                 keep.put_counter.get,
-                 keep.get_counter.get)
-    net = Stat("net:keep0", interval, "tx", "rx",
-               keep.upload_counter.get,
-               keep.download_counter.get)
-    cache = Stat("keepcache", interval, "hit", "miss",
-               keep.hits_counter.get,
-               keep.misses_counter.get)
-    fuseops = Stat("fuseops", interval,"write", "read",
-                   ops.write_ops_counter.get,
-                   ops.read_ops_counter.get)
-    blk = Stat("blkio:0:0", interval, "write", "read",
-               ops.write_counter.get,
-               ops.read_counter.get)
+    calls = StatWriter("keepcalls", interval, [
+        Stat("put", keep.put_counter.get), 
+        Stat("get", keep.get_counter.get)
+    ])
+    net = StatWriter("net:keep0", interval, [
+        Stat("tx", keep.upload_counter.get),
+        Stat("rx", keep.download_counter.get)
+    ])
+    cache = StatWriter("keepcache", interval, [
+        Stat("hit", keep.hits_counter.get), 
+        Stat("miss", keep.misses_counter.get)
+    ])
+    fuseops = StatWriter("fuseops", interval, [
+        Stat("write", ops.write_ops_counter.get), 
+        Stat("read", ops.read_ops_counter.get)
+    ])
+    fusetimes = []
+    for cur_op in ops.metric_op_names():
+        name = "fuseop:{0}".format(cur_op)
+        fusetimes.append(StatWriter(name, interval, [
+            Stat("count", ops.metric_count_func(cur_op)),
+            Stat("time", ops.metric_sum_func(cur_op))
+        ]))
+    blk = StatWriter("blkio:0:0", interval, [
+        Stat("write", ops.write_counter.get),
+        Stat("read", ops.read_counter.get)
+    ])
 
     while True:
         time.sleep(interval)
         calls.update()
         net.update()
         cache.update()
-        fuseops.update()
         blk.update()
+        fuseops.update()
+        for ftime in fusetimes:
+            ftime.update()
 
 
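The refactor above replaces the fixed two-counter Stat class with a StatWriter over an arbitrary list of (name, getter) pairs, which is what lets the per-operation fuseop:* timing stats be emitted generically. A hedged sketch of driving one StatWriter by hand (the dummy counter and "net:demo" prefix are invented; assumes this tree's module layout):

    import itertools
    from arvados_fuse.crunchstat import Stat, StatWriter

    counter = itertools.count(step=100)   # dummy monotonically increasing counter
    writer = StatWriter("net:demo", 1.0, [Stat("tx", lambda: next(counter))])
    # __init__ already sampled the counter once (0); update() samples it again (100)
    writer.update()
    # writes to stderr:
    # crunchstat: net:demo 100 tx -- interval 1.0000 seconds 100 tx
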
index 9b4b997cdc68dd14353e4621a77da7f30f7146a1..cbc9cb23edf5f36f0c6a83120d8c21cc0f582241 100644 (file)
@@ -43,7 +43,8 @@ setup(name='arvados_fuse',
         'llfuse >=1.2, <1.3.4',
         'python-daemon',
         'ciso8601 >= 2.0.0',
-        'setuptools'
+        'setuptools',
+        "prometheus_client"
         ],
       extras_require={
           ':python_version<"3"': ['pytz'],
index 71bf38357b885952fd9c327317a458a81ab48984..aadc775823caf136c7f7094a0d2b55fcb50f4478 100644 (file)
@@ -4,6 +4,7 @@
 
 import argparse
 import gzip
+from io import open
 import logging
 import sys
 
@@ -41,6 +42,31 @@ class ArgumentParser(argparse.ArgumentParser):
             help='Log more information (once for progress, twice for debug)')
 
 
+class UTF8Decode(object):
+    '''Wrap a file-like iterable to decode UTF-8 bytes into strings.
+    '''
+    def __init__(self, fh):
+        self.fh = fh
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        return next(self.fh).decode('utf-8')
+
+    next = __next__
+
+    def close(self):
+        # mimic Gzip behavior and don't close underlying object
+        pass
+
+
 class Command(object):
     def __init__(self, args):
         self.args = args
@@ -57,9 +83,9 @@ class Command(object):
             self.summer = summarizer.NewSummarizer(self.args.job, **kwargs)
         elif self.args.log_file:
             if self.args.log_file.endswith('.gz'):
-                fh = gzip.open(self.args.log_file)
+                fh = UTF8Decode(gzip.open(self.args.log_file))
             else:
-                fh = open(self.args.log_file)
+                fh = open(self.args.log_file, mode='r', encoding='utf-8')
             self.summer = summarizer.Summarizer(fh, **kwargs)
         else:
             self.summer = summarizer.Summarizer(sys.stdin, **kwargs)
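
UTF8Decode exists because gzip.open() yields bytes lines on Python 3 while the summarizer expects text; wrapping the gzip file yields str lines decoded from UTF-8 and, like GzipFile itself, leaves the underlying object open. A minimal usage sketch (the .gz path is invented):

    import gzip
    from crunchstat_summary.command import UTF8Decode

    with UTF8Decode(gzip.open('example-log.txt.gz')) as fh:
        for line in fh:              # each line arrives as str, not bytes
            print(line.rstrip())
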
index 1314e9df35612817e260e6644212ef2e8a387bc3..6df440a14e37f87f8fcea5cac7c57ca1269915b4 100644 (file)
@@ -18,7 +18,7 @@ class DygraphsChart(crunchstat_summary.webchart.WebChart):
             'data': self._collate_data(tasks, stat),
             'options': {
                 'connectSeparatedPoints': True,
-                'labels': ['elapsed']+[uuid for uuid, _ in tasks.iteritems()],
+                'labels': ['elapsed']+[uuid for uuid, _ in tasks.items()],
                 'title': '{}: {} {}'.format(label, stat[0], stat[1]),
             },
         }
@@ -26,7 +26,7 @@ class DygraphsChart(crunchstat_summary.webchart.WebChart):
     def _collate_data(self, tasks, stat):
         data = []
         nulls = []
-        for uuid, task in tasks.iteritems():
+        for uuid, task in tasks.items():
             for pt in task.series[stat]:
                 data.append([pt[0].total_seconds()] + nulls + [pt[1]])
             nulls.append(None)
index 311c006c07d882a40ee5af8eaae651ba1e3c7145..8ccdbc2fcf04e45ca3ab3ec6e2270933d050ea1c 100644 (file)
@@ -2,11 +2,9 @@
 #
 # SPDX-License-Identifier: AGPL-3.0
 
-from __future__ import print_function
-
 import arvados
 import itertools
-import Queue
+import queue
 import threading
 
 from crunchstat_summary import logger
@@ -87,19 +85,21 @@ class LiveLogReader(object):
             self._queue.put(self.EOF)
 
     def __iter__(self):
-        self._queue = Queue.Queue()
+        self._queue = queue.Queue()
         self._thread = threading.Thread(target=self._get_all_pages)
         self._thread.daemon = True
         self._thread.start()
         return self
 
-    def next(self):
+    def __next__(self):
         line = self._queue.get()
         if line is self.EOF:
             self._thread.join()
             raise StopIteration
         return line
 
+    next = __next__  # for Python 2
+
     def __enter__(self):
         return self
 
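The next = __next__ alias at the end of LiveLogReader is the standard way for one iterator class to satisfy both protocols: Python 3 calls __next__() while Python 2 calls next(). A minimal sketch of the pattern:

    class Countdown(object):
        def __init__(self, n):
            self.n = n

        def __iter__(self):
            return self

        def __next__(self):          # Python 3 iterator protocol
            if self.n <= 0:
                raise StopIteration
            self.n -= 1
            return self.n

        next = __next__              # alias so Python 2 for-loops work too

    print(list(Countdown(3)))        # [2, 1, 0]
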
index b2f6f1bb700b6d5d2a04f0212c699eb1ace15435..bf905a394606a4e9a1465979a575bf07e85e0657 100644 (file)
@@ -2,8 +2,6 @@
 #
 # SPDX-License-Identifier: AGPL-3.0
 
-from __future__ import print_function
-
 import arvados
 import collections
 import crunchstat_summary.dygraphs
@@ -24,7 +22,7 @@ from crunchstat_summary import logger
 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
 # that have amounts like 7.5 GiB according to the kernel.)
 AVAILABLE_RAM_RATIO = 0.95
-
+MB = 2**20
 
 # Workaround datetime.datetime.strptime() thread-safety bug by calling
 # it once before starting threads.  https://bugs.python.org/issue7980
@@ -209,7 +207,7 @@ class Summarizer(object):
                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                 if 'tx' in stats or 'rx' in stats:
                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
-                for stat, val in stats.iteritems():
+                for stat, val in stats.items():
                     if group == 'interval':
                         if stat == 'seconds':
                             this_interval_s = val
@@ -236,9 +234,9 @@ class Summarizer(object):
 
         self.job_tot = collections.defaultdict(
             functools.partial(collections.defaultdict, int))
-        for task_id, task_stat in self.task_stats.iteritems():
-            for category, stat_last in task_stat.iteritems():
-                for stat, val in stat_last.iteritems():
+        for task_id, task_stat in self.task_stats.items():
+            for category, stat_last in task_stat.items():
+                for stat, val in stat_last.items():
                     if stat in ['cpus', 'cache', 'swap', 'rss']:
                         # meaningless stats like 16 cpu cores x 5 tasks = 80
                         continue
@@ -273,8 +271,8 @@ class Summarizer(object):
 
     def _text_report_gen(self):
         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
-        for category, stat_max in sorted(self.stats_max.iteritems()):
-            for stat, val in sorted(stat_max.iteritems()):
+        for category, stat_max in sorted(self.stats_max.items()):
+            for stat, val in sorted(stat_max.items()):
                 if stat.endswith('__rate'):
                     continue
                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
@@ -292,7 +290,7 @@ class Summarizer(object):
                  self.stats_max['cpu']['user+sys__rate'],
                  lambda x: x * 100),
                 ('Overall CPU usage: {}%',
-                 self.job_tot['cpu']['user+sys'] /
+                 float(self.job_tot['cpu']['user+sys']) /
                  self.job_tot['time']['elapsed']
                  if self.job_tot['time']['elapsed'] > 0 else 0,
                  lambda x: x * 100),
@@ -325,6 +323,7 @@ class Summarizer(object):
             yield "# "+format_string.format(self._format(val))
 
     def _recommend_gen(self):
+        # TODO recommend fixing job granularity if elapsed time is too short
         return itertools.chain(
             self._recommend_cpu(),
             self._recommend_ram(),
@@ -335,21 +334,27 @@ class Summarizer(object):
 
         constraint_key = self._map_runtime_constraint('vcpus')
         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
-        if cpu_max_rate == float('-Inf'):
+        if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
             logger.warning('%s: no CPU usage data', self.label)
             return
+        # TODO: don't recommend based on the isolated max peak alone;
+        # take average CPU usage into account as well, or the % of time at max
         used_cores = max(1, int(math.ceil(cpu_max_rate)))
         asked_cores = self.existing_constraints.get(constraint_key)
-        if asked_cores is None or used_cores < asked_cores:
+        if asked_cores is None:
+            asked_cores = 1
+        # TODO: This should be more nuanced in cases where max >> avg
+        if used_cores < asked_cores:
             yield (
                 '#!! {} max CPU usage was {}% -- '
-                'try runtime_constraints "{}":{}'
+                'try reducing runtime_constraints to "{}":{}'
             ).format(
                 self.label,
-                int(math.ceil(cpu_max_rate*100)),
+                math.ceil(cpu_max_rate*100),
                 constraint_key,
                 int(used_cores))
 
+    # FIXME: This needs to be updated to account for current nodemanager algorithms
     def _recommend_ram(self):
         """Recommend an economical RAM constraint for this job.
 
@@ -389,20 +394,20 @@ class Summarizer(object):
         if used_bytes == float('-Inf'):
             logger.warning('%s: no memory usage data', self.label)
             return
-        used_mib = math.ceil(float(used_bytes) / 1048576)
+        used_mib = math.ceil(float(used_bytes) / MB)
         asked_mib = self.existing_constraints.get(constraint_key)
 
         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
-        if asked_mib is None or (
-                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
+        if used_mib > 0 and (asked_mib is None or (
+                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
             yield (
                 '#!! {} max RSS was {} MiB -- '
-                'try runtime_constraints "{}":{}'
+                'try reducing runtime_constraints to "{}":{}'
             ).format(
                 self.label,
                 int(used_mib),
                 constraint_key,
-                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(2**20)/self._runtime_constraint_mem_unit()))
+                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))
 
     def _recommend_keep_cache(self):
         """Recommend increasing keep cache if utilization < 80%"""
@@ -411,17 +416,18 @@ class Summarizer(object):
             return
         utilization = (float(self.job_tot['blkio:0:0']['read']) /
                        float(self.job_tot['net:keep0']['rx']))
-        asked_mib = self.existing_constraints.get(constraint_key, 256)
+        # FIXME: the default on this get won't work correctly
+        asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
 
         if utilization < 0.8:
             yield (
                 '#!! {} Keep cache utilization was {:.2f}% -- '
-                'try runtime_constraints "{}":{} (or more)'
+                'try doubling runtime_constraints to "{}":{} (or more)'
             ).format(
                 self.label,
                 utilization * 100.0,
                 constraint_key,
-                asked_mib*2*(2**20)/self._runtime_constraint_mem_unit())
+                math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
 
 
     def _format(self, val):
@@ -513,7 +519,7 @@ class ProcessSummarizer(Summarizer):
 
 
 class JobSummarizer(ProcessSummarizer):
-    runtime_constraint_mem_unit = 1048576
+    runtime_constraint_mem_unit = MB
     map_runtime_constraint = {
         'keep_cache_ram': 'keep_cache_mb_per_task',
         'ram': 'min_ram_mb_per_node',
@@ -539,7 +545,7 @@ class MultiSummarizer(object):
 
     def run(self):
         threads = []
-        for child in self.children.itervalues():
+        for child in self.children.values():
             self.throttle.acquire()
             t = threading.Thread(target=self.run_and_release, args=(child.run, ))
             t.daemon = True
@@ -551,7 +557,7 @@ class MultiSummarizer(object):
     def text_report(self):
         txt = ''
         d = self._descendants()
-        for child in d.itervalues():
+        for child in d.values():
             if len(d) > 1:
                 txt += '### Summary for {} ({})\n'.format(
                     child.label, child.process['uuid'])
@@ -566,7 +572,7 @@ class MultiSummarizer(object):
         MultiSummarizers) are omitted.
         """
         d = collections.OrderedDict()
-        for key, child in self.children.iteritems():
+        for key, child in self.children.items():
             if isinstance(child, Summarizer):
                 d[key] = child
             if isinstance(child, MultiSummarizer):
@@ -574,7 +580,7 @@ class MultiSummarizer(object):
         return d
 
     def html_report(self):
-        return WEBCHART_CLASS(self.label, self._descendants().itervalues()).html()
+        return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()
 
 
 class JobTreeSummarizer(MultiSummarizer):
@@ -588,7 +594,7 @@ class JobTreeSummarizer(MultiSummarizer):
             preloaded = {}
             for j in arv.jobs().index(
                     limit=len(job['components']),
-                    filters=[['uuid','in',job['components'].values()]]).execute()['items']:
+                    filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
                 preloaded[j['uuid']] = j
             for cname in sorted(job['components'].keys()):
                 child_uuid = job['components'][cname]
@@ -605,7 +611,7 @@ class JobTreeSummarizer(MultiSummarizer):
 class PipelineSummarizer(MultiSummarizer):
     def __init__(self, instance, **kwargs):
         children = collections.OrderedDict()
-        for cname, component in instance['components'].iteritems():
+        for cname, component in instance['components'].items():
             if 'job' not in component:
                 logger.warning(
                     "%s: skipping component with no job assigned", cname)
@@ -663,7 +669,7 @@ class ContainerTreeSummarizer(MultiSummarizer):
                         cr['name'] = cr.get('name') or cr['uuid']
                         todo.append(cr)
         sorted_children = collections.OrderedDict()
-        for uuid in sorted(children.keys(), key=lambda uuid: children[uuid].sort_key):
+        for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
             sorted_children[uuid] = children[uuid]
         super(ContainerTreeSummarizer, self).__init__(
             children=sorted_children,
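
One way to sanity-check the rewritten keep-cache recommendation above: the existing constraint is first normalized to bytes via the summarizer's memory unit, then doubled and converted back when printed. A hedged worked example with invented numbers (JobSummarizer units, where the unit is MB = 2**20 bytes; result shown for Python 3):

    import math

    MB = 2**20
    mem_unit = MB                        # JobSummarizer.runtime_constraint_mem_unit
    asked_mib = 256                      # e.g. keep_cache_mb_per_task constraint
    asked_cache = asked_mib * mem_unit   # constraint normalized to bytes
    print(math.ceil(asked_cache * 2 / mem_unit))   # 512 -> 'try doubling ... ":512"'
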
index 9d18883ce2506d71abe03e08abde2fee28006343..cf0c1e67aa1ffdcf7853b2b1271bb2f03b16bae2 100644 (file)
@@ -2,7 +2,11 @@
 #
 # SPDX-License-Identifier: AGPL-3.0
 
-import cgi
+try:
+    from html import escape
+except ImportError:
+    from cgi import escape
+
 import json
 import pkg_resources
 
@@ -27,13 +31,13 @@ class WebChart(object):
         <script type="text/javascript">{}</script>
         {}
         </head><body></body></html>
-        '''.format(cgi.escape(self.label),
+        '''.format(escape(self.label),
                    self.JSLIB, self.js(), self.headHTML())
 
     def js(self):
         return 'var chartdata = {};\n{}'.format(
             json.dumps(self.sections()),
-            '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa) for jsa in self.JSASSETS]))
+            '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa).decode('utf-8') for jsa in self.JSASSETS]))
 
     def sections(self):
         return [
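
The try/except import above prefers html.escape (added in Python 3.2) and falls back to cgi.escape on Python 2, where the html module does not exist; cgi.escape was deprecated in 3.2 and removed in 3.8, so the fallback only ever runs under Python 2. A one-line check of the behavior both functions share:

    try:
        from html import escape      # Python 3
    except ImportError:
        from cgi import escape       # Python 2 fallback

    print(escape('<b>&</b>'))        # &lt;b&gt;&amp;&lt;/b&gt;
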
index 98194619495816bfa4a612710ad1506dcbcab114..0691e4f1ef4ea7e1604a0b7b73787f25b7dd7e58 100644 (file)
@@ -14,11 +14,9 @@ time elapsed 10      -       10
 # Number of tasks: 1
 # Max CPU time spent by a single task: 0s
 # Max CPU usage in a single interval: 0%
-# Overall CPU usage: 0%
+# Overall CPU usage: 0.00%
 # Max memory used by a single task: 0.00GB
 # Max network traffic in a single task: 0.00GB
 # Max network speed in a single interval: 0.00MB/s
 # Keep cache miss rate 0.00%
 # Keep cache utilization 0.00%
-#!! container max CPU usage was 0% -- try runtime_constraints "vcpus":1
-#!! container max RSS was 0 MiB -- try runtime_constraints "ram":0
index b61da154ffbdfd720ce2f69d1ee76e01cf2b687a..c64c34c80ec6cd775e81031330070c047265a96d 100644 (file)
@@ -23,5 +23,4 @@ time  elapsed 20      -       20
 # Max network speed in a single interval: 0.00MB/s
 # Keep cache miss rate 0.00%
 # Keep cache utilization 0.00%
-#!! container max CPU usage was 24% -- try runtime_constraints "vcpus":1
-#!! container max RSS was 67 MiB -- try runtime_constraints "ram":1020054732
+#!! container max RSS was 67 MiB -- try reducing runtime_constraints to "ram":1020054732
index 9d3cd78d3f81f88239b1521c9a530b7898686e60..3075c24b951020d1444311bc083d269b581219b2 100644 (file)
@@ -34,5 +34,4 @@ time  elapsed 20      -       20
 # Max network speed in a single interval: 0.00MB/s
 # Keep cache miss rate 0.00%
 # Keep cache utilization 0.00%
-#!! container max CPU usage was 24% -- try runtime_constraints "vcpus":1
-#!! container max RSS was 67 MiB -- try runtime_constraints "ram":1020054732
+#!! container max RSS was 67 MiB -- try reducing runtime_constraints to "ram":1020054732
index f0a60957bba706b18b6d41288f5fc46ded754b1c..5e3ad152f7e0e48759312592344cdc936eb95f23 100644 (file)
@@ -31,5 +31,4 @@ time  elapsed 80      -       80
 # Max network speed in a single interval: 42.58MB/s
 # Keep cache miss rate 0.00%
 # Keep cache utilization 0.00%
-#!! 4xphq-8i9sb-jq0ekny1xou3zoh max CPU usage was 13% -- try runtime_constraints "min_cores_per_node":1
-#!! 4xphq-8i9sb-jq0ekny1xou3zoh max RSS was 334 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-jq0ekny1xou3zoh max RSS was 334 MiB -- try reducing runtime_constraints to "min_ram_mb_per_node":972
index f9a34cfb98c7c6d42a633166b946b7c49db77cc1..e260ca5bdeeed232ee61e094c17fe1ccfad5063f 100644 (file)
@@ -20,5 +20,4 @@ time  elapsed 2       -       4
 # Max network speed in a single interval: 0.00MB/s
 # Keep cache miss rate 0.00%
 # Keep cache utilization 0.00%
-#!! 4xphq-8i9sb-zvb2ocfycpomrup max CPU usage was 0% -- try runtime_constraints "min_cores_per_node":1
-#!! 4xphq-8i9sb-zvb2ocfycpomrup max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-zvb2ocfycpomrup max RSS was 1 MiB -- try reducing runtime_constraints to "min_ram_mb_per_node":972
index c54102d78a25d1158ba563a22de7279b3d39c2f2..ffe1072250123f2b05f67ddd62da2bf0881b35a1 100644 (file)
@@ -20,5 +20,4 @@ time  elapsed 2       -       3
 # Max network speed in a single interval: 0.00MB/s
 # Keep cache miss rate 0.00%
 # Keep cache utilization 0.00%
-#!! 4xphq-8i9sb-v831jm2uq0g2g9x max CPU usage was 0% -- try runtime_constraints "min_cores_per_node":1
-#!! 4xphq-8i9sb-v831jm2uq0g2g9x max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-v831jm2uq0g2g9x max RSS was 1 MiB -- try reducing runtime_constraints to "min_ram_mb_per_node":972
index af92becd80a6875d64e1d406d2b21f8bfbd6ec57..7603ea488c37e4a74f7946d3e108116329b0bad2 100644 (file)
@@ -8,20 +8,25 @@ import crunchstat_summary.command
 import difflib
 import glob
 import gzip
+from io import open
 import mock
 import os
+import sys
 import unittest
 
+from crunchstat_summary.command import UTF8Decode
+
 TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
 
 
 class ReportDiff(unittest.TestCase):
     def diff_known_report(self, logfile, cmd):
         expectfile = logfile+'.report'
-        expect = open(expectfile).readlines()
+        with open(expectfile, encoding='utf-8') as f:
+            expect = f.readlines()
         self.diff_report(cmd, expect, expectfile=expectfile)
 
-    def diff_report(self, cmd, expect, expectfile=None):
+    def diff_report(self, cmd, expect, expectfile='(expected)'):
         got = [x+"\n" for x in cmd.report().strip("\n").split("\n")]
         self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
             expect, got, fromfile=expectfile, tofile="(generated)")))
@@ -49,12 +54,15 @@ class HTMLFromFile(ReportDiff):
                 ['--format=html', '--log-file', logfile])
             cmd = crunchstat_summary.command.Command(args)
             cmd.run()
-            self.assertRegexpMatches(cmd.report(), r'(?is)<html>.*</html>\s*$')
+            if sys.version_info >= (3, 2):
+                self.assertRegex(cmd.report(), r'(?is)<html>.*</html>\s*$')
+            else:
+                self.assertRegexpMatches(cmd.report(), r'(?is)<html>.*</html>\s*$')
 
 
 class SummarizeEdgeCases(unittest.TestCase):
     def test_error_messages(self):
-        logfile = open(os.path.join(TESTS_DIR, 'crunchstat_error_messages.txt'))
+        logfile = open(os.path.join(TESTS_DIR, 'crunchstat_error_messages.txt'), encoding='utf-8')
         s = crunchstat_summary.summarizer.Summarizer(logfile)
         s.run()
 
@@ -89,9 +97,9 @@ class SummarizeContainer(ReportDiff):
             'container.json', 'crunchstat.txt', 'arv-mount.txt']
         def _open(n):
             if n == "crunchstat.txt":
-                return gzip.open(self.logfile)
+                return UTF8Decode(gzip.open(self.logfile))
             elif n == "arv-mount.txt":
-                return gzip.open(self.arvmountlog)
+                return UTF8Decode(gzip.open(self.arvmountlog))
         mock_cr().open.side_effect = _open
         args = crunchstat_summary.command.ArgumentParser().parse_args(
             ['--job', self.fake_request['uuid']])
@@ -114,7 +122,7 @@ class SummarizeJob(ReportDiff):
     def test_job_report(self, mock_api, mock_cr):
         mock_api().jobs().get().execute.return_value = self.fake_job
         mock_cr().__iter__.return_value = ['fake-logfile.txt']
-        mock_cr().open.return_value = gzip.open(self.logfile)
+        mock_cr().open.return_value = UTF8Decode(gzip.open(self.logfile))
         args = crunchstat_summary.command.ArgumentParser().parse_args(
             ['--job', self.fake_job_uuid])
         cmd = crunchstat_summary.command.Command(args)
@@ -175,15 +183,14 @@ class SummarizePipeline(ReportDiff):
         mock_api().pipeline_instances().get().execute. \
             return_value = self.fake_instance
         mock_cr().__iter__.return_value = ['fake-logfile.txt']
-        mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+        mock_cr().open.side_effect = [UTF8Decode(gzip.open(logfile)) for _ in range(3)]
         args = crunchstat_summary.command.ArgumentParser().parse_args(
             ['--pipeline-instance', self.fake_instance['uuid']])
         cmd = crunchstat_summary.command.Command(args)
         cmd.run()
 
-        job_report = [
-            line for line in open(logfile+'.report').readlines()
-            if not line.startswith('#!! ')]
+        with open(logfile+'.report', encoding='utf-8') as f:
+            job_report = [line for line in f if not line.startswith('#!! ')]
         expect = (
             ['### Summary for foo (zzzzz-8i9sb-000000000000000)\n'] +
             job_report + ['\n'] +
@@ -251,15 +258,14 @@ class SummarizeACRJob(ReportDiff):
         mock_api().jobs().index().execute.return_value = self.fake_jobs_index
         mock_api().jobs().get().execute.return_value = self.fake_job
         mock_cr().__iter__.return_value = ['fake-logfile.txt']
-        mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+        mock_cr().open.side_effect = [UTF8Decode(gzip.open(logfile)) for _ in range(3)]
         args = crunchstat_summary.command.ArgumentParser().parse_args(
             ['--job', self.fake_job['uuid']])
         cmd = crunchstat_summary.command.Command(args)
         cmd.run()
 
-        job_report = [
-            line for line in open(logfile+'.report').readlines()
-            if not line.startswith('#!! ')]
+        with open(logfile+'.report', encoding='utf-8') as f:
+            job_report = [line for line in f if not line.startswith('#!! ')]
         expect = (
             ['### Summary for zzzzz-8i9sb-i3e77t9z5y8j9cc (partial) (zzzzz-8i9sb-i3e77t9z5y8j9cc)\n',
              '(no report generated)\n',