Use default of 1 if no cores requested. refs #12026
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
index ea30d0c5493ee26216b29127be375d818a9a1599..96da98cdd9561cffd92f30d041ee78da60a5b153 100644 (file)
@@ -2,8 +2,6 @@
 #
 # SPDX-License-Identifier: AGPL-3.0
 
-from __future__ import print_function
-
 import arvados
 import collections
 import crunchstat_summary.dygraphs
@@ -24,7 +22,7 @@ from crunchstat_summary import logger
 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
 # that have amounts like 7.5 GiB according to the kernel.)
 AVAILABLE_RAM_RATIO = 0.95
-
+MB=2**20
 
 # Workaround datetime.datetime.strptime() thread-safety bug by calling
 # it once before starting threads.  https://bugs.python.org/issue7980
@@ -37,6 +35,7 @@ WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
 class Task(object):
     def __init__(self):
         self.starttime = None
+        self.finishtime = None
         self.series = collections.defaultdict(list)
 
 
@@ -116,13 +115,13 @@ class Summarizer(object):
                     continue
 
                 # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
-                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                 if not m:
                     continue
             else:
                 # crunch2
                 # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
-                m = re.search(r'^(?P<timestamp>\S+) (crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+                m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                 if not m:
                     continue
 
@@ -160,15 +159,24 @@ class Summarizer(object):
                 raise ValueError("Cannot parse timestamp {!r}".format(
                     timestamp))
 
-            if not task.starttime:
-                task.starttime = timestamp
+            if task.starttime is None:
                 logger.debug('%s: task %s starttime %s',
                              self.label, task_id, timestamp)
-            task.finishtime = timestamp
+            if task.starttime is None or timestamp < task.starttime:
+                task.starttime = timestamp
+            if task.finishtime is None or timestamp > task.finishtime:
+                task.finishtime = timestamp
 
-            if not self.starttime:
+            if self.starttime is None or timestamp < task.starttime:
                 self.starttime = timestamp
-            self.finishtime = timestamp
+            if self.finishtime is None or timestamp < task.finishtime:
+                self.finishtime = timestamp
+
+            if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
+                elapsed = (task.finishtime - task.starttime).seconds
+                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
+                if elapsed > self.stats_max['time']['elapsed']:
+                    self.stats_max['time']['elapsed'] = elapsed
 
             this_interval_s = None
             for group in ['current', 'interval']:
@@ -184,16 +192,22 @@ class Summarizer(object):
                         else:
                             stats[stat] = int(val)
                 except ValueError as e:
-                    logger.warning(
-                        'Error parsing value %r (stat %r, category %r): %r',
-                        val, stat, category, e)
-                    logger.warning('%s', line)
+                    # If the line doesn't start with 'crunchstat:' we
+                    # might have mistaken an error message for a
+                    # structured crunchstat line.
+                    if m.group("crunchstat") is None or m.group("category") == "crunchstat":
+                        logger.warning("%s: log contains message\n  %s", self.label, line)
+                    else:
+                        logger.warning(
+                            '%s: Error parsing value %r (stat %r, category %r): %r',
+                            self.label, val, stat, category, e)
+                        logger.warning('%s', line)
                     continue
                 if 'user' in stats or 'sys' in stats:
                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                 if 'tx' in stats or 'rx' in stats:
                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
-                for stat, val in stats.iteritems():
+                for stat, val in stats.items():
                     if group == 'interval':
                         if stat == 'seconds':
                             this_interval_s = val
@@ -220,9 +234,9 @@ class Summarizer(object):
 
         self.job_tot = collections.defaultdict(
             functools.partial(collections.defaultdict, int))
-        for task_id, task_stat in self.task_stats.iteritems():
-            for category, stat_last in task_stat.iteritems():
-                for stat, val in stat_last.iteritems():
+        for task_id, task_stat in self.task_stats.items():
+            for category, stat_last in task_stat.items():
+                for stat, val in stat_last.items():
                     if stat in ['cpus', 'cache', 'swap', 'rss']:
                         # meaningless stats like 16 cpu cores x 5 tasks = 80
                         continue
@@ -257,8 +271,8 @@ class Summarizer(object):
 
     def _text_report_gen(self):
         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
-        for category, stat_max in sorted(self.stats_max.iteritems()):
-            for stat, val in sorted(stat_max.iteritems()):
+        for category, stat_max in sorted(self.stats_max.items()):
+            for stat, val in sorted(stat_max.items()):
                 if stat.endswith('__rate'):
                     continue
                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
@@ -276,7 +290,7 @@ class Summarizer(object):
                  self.stats_max['cpu']['user+sys__rate'],
                  lambda x: x * 100),
                 ('Overall CPU usage: {}%',
-                 self.job_tot['cpu']['user+sys'] /
+                 float(self.job_tot['cpu']['user+sys']) /
                  self.job_tot['time']['elapsed']
                  if self.job_tot['time']['elapsed'] > 0 else 0,
                  lambda x: x * 100),
@@ -309,6 +323,7 @@ class Summarizer(object):
             yield "# "+format_string.format(self._format(val))
 
     def _recommend_gen(self):
+        # TODO recommend fixing job granularity if elapsed time is too short
         return itertools.chain(
             self._recommend_cpu(),
             self._recommend_ram(),
@@ -319,18 +334,22 @@ class Summarizer(object):
 
         constraint_key = self._map_runtime_constraint('vcpus')
         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
-        if cpu_max_rate == float('-Inf'):
+        if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
             logger.warning('%s: no CPU usage data', self.label)
             return
+        # TODO Don't necessarily want to recommend on isolated max peak
+        # take average CPU usage into account as well or % time at max
         used_cores = max(1, int(math.ceil(cpu_max_rate)))
         asked_cores = self.existing_constraints.get(constraint_key)
-        if asked_cores is None or used_cores < asked_cores:
+        if asked_cores is None:
+            asked_cores = 1
+        if used_cores < asked_cores:
             yield (
                 '#!! {} max CPU usage was {}% -- '
                 'try runtime_constraints "{}":{}'
             ).format(
                 self.label,
-                int(math.ceil(cpu_max_rate*100)),
+                math.ceil(cpu_max_rate*100),
                 constraint_key,
                 int(used_cores))
 
@@ -373,12 +392,12 @@ class Summarizer(object):
         if used_bytes == float('-Inf'):
             logger.warning('%s: no memory usage data', self.label)
             return
-        used_mib = math.ceil(float(used_bytes) / 1048576)
+        used_mib = math.ceil(float(used_bytes) / MB)
         asked_mib = self.existing_constraints.get(constraint_key)
 
         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
-        if asked_mib is None or (
-                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
+        if used_mib > 0 and (asked_mib is None or (
+                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
             yield (
                 '#!! {} max RSS was {} MiB -- '
                 'try runtime_constraints "{}":{}'
@@ -386,7 +405,7 @@ class Summarizer(object):
                 self.label,
                 int(used_mib),
                 constraint_key,
-                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(2**20)/self._runtime_constraint_mem_unit()))
+                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))
 
     def _recommend_keep_cache(self):
         """Recommend increasing keep cache if utilization < 80%"""
@@ -395,7 +414,8 @@ class Summarizer(object):
             return
         utilization = (float(self.job_tot['blkio:0:0']['read']) /
                        float(self.job_tot['net:keep0']['rx']))
-        asked_mib = self.existing_constraints.get(constraint_key, 256)
+        # FIXME: the default on this get won't work correctly
+        asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
 
         if utilization < 0.8:
             yield (
@@ -405,7 +425,7 @@ class Summarizer(object):
                 self.label,
                 utilization * 100.0,
                 constraint_key,
-                asked_mib*2*(2**20)/self._runtime_constraint_mem_unit())
+                math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
 
 
     def _format(self, val):
@@ -497,7 +517,7 @@ class ProcessSummarizer(Summarizer):
 
 
 class JobSummarizer(ProcessSummarizer):
-    runtime_constraint_mem_unit = 1048576
+    runtime_constraint_mem_unit = MB
     map_runtime_constraint = {
         'keep_cache_ram': 'keep_cache_mb_per_task',
         'ram': 'min_ram_mb_per_node',
@@ -523,7 +543,7 @@ class MultiSummarizer(object):
 
     def run(self):
         threads = []
-        for child in self.children.itervalues():
+        for child in self.children.values():
             self.throttle.acquire()
             t = threading.Thread(target=self.run_and_release, args=(child.run, ))
             t.daemon = True
@@ -535,7 +555,7 @@ class MultiSummarizer(object):
     def text_report(self):
         txt = ''
         d = self._descendants()
-        for child in d.itervalues():
+        for child in d.values():
             if len(d) > 1:
                 txt += '### Summary for {} ({})\n'.format(
                     child.label, child.process['uuid'])
@@ -550,7 +570,7 @@ class MultiSummarizer(object):
         MultiSummarizers) are omitted.
         """
         d = collections.OrderedDict()
-        for key, child in self.children.iteritems():
+        for key, child in self.children.items():
             if isinstance(child, Summarizer):
                 d[key] = child
             if isinstance(child, MultiSummarizer):
@@ -558,7 +578,7 @@ class MultiSummarizer(object):
         return d
 
     def html_report(self):
-        return WEBCHART_CLASS(self.label, self._descendants().itervalues()).html()
+        return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()
 
 
 class JobTreeSummarizer(MultiSummarizer):
@@ -572,7 +592,7 @@ class JobTreeSummarizer(MultiSummarizer):
             preloaded = {}
             for j in arv.jobs().index(
                     limit=len(job['components']),
-                    filters=[['uuid','in',job['components'].values()]]).execute()['items']:
+                    filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
                 preloaded[j['uuid']] = j
             for cname in sorted(job['components'].keys()):
                 child_uuid = job['components'][cname]
@@ -589,7 +609,7 @@ class JobTreeSummarizer(MultiSummarizer):
 class PipelineSummarizer(MultiSummarizer):
     def __init__(self, instance, **kwargs):
         children = collections.OrderedDict()
-        for cname, component in instance['components'].iteritems():
+        for cname, component in instance['components'].items():
             if 'job' not in component:
                 logger.warning(
                     "%s: skipping component with no job assigned", cname)
@@ -647,7 +667,7 @@ class ContainerTreeSummarizer(MultiSummarizer):
                         cr['name'] = cr.get('name') or cr['uuid']
                         todo.append(cr)
         sorted_children = collections.OrderedDict()
-        for uuid in sorted(children.keys(), key=lambda uuid: children[uuid].sort_key):
+        for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
             sorted_children[uuid] = children[uuid]
         super(ContainerTreeSummarizer, self).__init__(
             children=sorted_children,