8341: Include Keep network activity in net stats.
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index f648e9b6b65aa47a24ca19a1f1d4f1a30d16a4a7..cf748ff7039360a5b3dce73833e74365a04df4d6 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -3,6 +3,7 @@ from __future__ import print_function
 import arvados
 import collections
 import crunchstat_summary.chartjs
+import crunchstat_summary.reader
 import datetime
 import functools
 import itertools
@@ -26,20 +27,17 @@ class Task(object):
 
 
 class Summarizer(object):
-    existing_constraints = {}
-
-    def __init__(self, logdata, label=None, include_child_jobs=True):
+    def __init__(self, logdata, label=None, skip_child_jobs=False):
         self._logdata = logdata
 
         self.label = label
         self.starttime = None
         self.finishtime = None
-        self._include_child_jobs = include_child_jobs
+        self._skip_child_jobs = skip_child_jobs
 
         # stats_max: {category: {stat: val}}
         self.stats_max = collections.defaultdict(
-            functools.partial(collections.defaultdict,
-                              lambda: float('-Inf')))
+            functools.partial(collections.defaultdict, lambda: 0))
         # task_stats: {task_id: {category: {stat: val}}}
         self.task_stats = collections.defaultdict(
             functools.partial(collections.defaultdict, dict))
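A note on the defaultdict change above: with the default switched from float('-Inf') to 0, any (category, stat) pair that never appears in the logs reads as 0, so the eth0+keep0 sums added later in this commit stay well-defined. A minimal standalone sketch (values invented):

    import collections
    import functools

    # Same shape as stats_max: {category: {stat: val}}, defaulting to 0.
    stats_max = collections.defaultdict(
        functools.partial(collections.defaultdict, lambda: 0))

    stats_max['net:eth0']['tx+rx'] = 12345678
    # 'net:keep0' never appeared in the logs; it reads as 0 rather than
    # -Inf, so adding the two categories still yields a sensible number.
    print(stats_max['net:eth0']['tx+rx'] + stats_max['net:keep0']['tx+rx'])
    # -> 12345678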
@@ -47,10 +45,16 @@ class Summarizer(object):
         self.seq_to_uuid = {}
         self.tasks = collections.defaultdict(Task)
 
-        logger.debug("%s: logdata %s", self.label, repr(logdata))
+        # We won't bother recommending new runtime constraints if the
+        # constraints given when running the job are known to us and
+        # are already suitable.  If applicable, the subclass
+        # constructor will overwrite this with something useful.
+        self.existing_constraints = {}
+
+        logger.debug("%s: logdata %s", self.label, logdata)
 
     def run(self):
-        logger.debug("%s: parsing log data", self.label)
+        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
         for line in self._logdata:
             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
             if m:
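For reference, the job_task pattern being matched here maps each task's sequence number to its UUID for later lookups. A hedged example with an invented log line (timestamp and UUIDs are made up):

    import re

    line = ('2015-11-02_23:14:27 qr1hi-8i9sb-xxxxxxxxxxxxxxx 27898 4'
            ' job_task qr1hi-ot0gb-xxxxxxxxxxxxxxx\n')
    m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$',
                  line)
    print(m.group('seq'), m.group('task_uuid'))
    # -> 4 qr1hi-ot0gb-xxxxxxxxxxxxxxx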
@@ -72,8 +76,9 @@ class Summarizer(object):
             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
             if m:
                 uuid = m.group('uuid')
-                if not self._include_child_jobs:
-                    logger.warning('%s: omitting %s (try --include-child-job)',
+                if self._skip_child_jobs:
+                    logger.warning('%s: omitting stats from child job %s'
+                                   ' because the --skip-child-jobs flag is on',
                                    self.label, uuid)
                     continue
                 logger.debug('%s: follow %s', self.label, uuid)
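The child summarizer below is handed the parent's dicts rather than copies, so parent and child stats aggregate in one place. A rough sketch of the effect (the helper and values are hypothetical):

    # Both summarizers write into the same dicts, so the parent's report
    # reflects maxima across its own tasks and all child-job tasks.
    stats_max = {}

    def record_max(stats, category, stat, val):
        # Hypothetical helper mimicking the max-tracking at the end of run().
        prev = stats.setdefault(category, {}).setdefault(stat, 0)
        if val > prev:
            stats[category][stat] = val

    record_max(stats_max, 'mem', 'rss', 2 * 10**9)   # parent's task
    record_max(stats_max, 'mem', 'rss', 5 * 10**9)   # child job's task
    print(stats_max['mem']['rss'])  # -> 5000000000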
@@ -81,11 +86,12 @@ class Summarizer(object):
                 child_summarizer.stats_max = self.stats_max
                 child_summarizer.task_stats = self.task_stats
                 child_summarizer.tasks = self.tasks
+                child_summarizer.starttime = self.starttime
                 child_summarizer.run()
                 logger.debug('%s: done %s', self.label, uuid)
                 continue
 
-            m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
+            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
             if not m:
                 continue
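The widened timestamp pattern tolerates a fractional-seconds suffix: [^\s.]+ stops at the first dot and the optional (\.\d+)? consumes the rest, whereas the old \S+ would have captured the fraction too, presumably tripping up downstream timestamp parsing. An invented example showing both forms yield the same capture:

    import re

    pat = (r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+'
           r' (?P<seq>\d+) stderr crunchstat: (?P<category>\S+)'
           r' (?P<current>.*?)( -- interval (?P<interval>.*))?\n')
    for ts in ('2015-11-02_23:14:27', '2015-11-02_23:14:27.123456'):
        line = (ts + ' qr1hi-8i9sb-xxxxxxxxxxxxxxx 27898 4 stderr'
                ' crunchstat: cpu 123.9000 user 2.0700 sys -- interval'
                ' 10.0000 seconds 1.2300 user 0.0700 sys\n')
        m = re.search(pat, line)
        print(m.group('timestamp'))  # both print '2015-11-02_23:14:27'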
 
@@ -95,7 +101,7 @@ class Summarizer(object):
             if m.group('category').endswith(':'):
                 # "stderr crunchstat: notice: ..."
                 continue
-            elif m.group('category') == 'error':
+            elif m.group('category') in ('error', 'caught'):
                 continue
             elif m.group('category') == 'read':
                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
@@ -154,11 +160,11 @@ class Summarizer(object):
                             val = val / this_interval_s
                             if stat in ['user+sys__rate', 'tx+rx__rate']:
                                 task.series[category, stat].append(
-                                    (timestamp - task.starttime, val))
+                                    (timestamp - self.starttime, val))
                     else:
                         if stat in ['rss']:
                             task.series[category, stat].append(
-                                (timestamp - task.starttime, val))
+                                (timestamp - self.starttime, val))
                         self.task_stats[task_id][category][stat] = val
                     if val > self.stats_max[category][stat]:
                         self.stats_max[category][stat] = val
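Plot series are now timestamped relative to the job's start rather than each task's own start, which (together with propagating starttime to child summarizers above) puts every task on one shared time axis. A toy illustration with invented epoch seconds:

    job_start = 1446506067     # job starts
    task_b_start = 1446506667  # task B starts 10 minutes into the job
    sample_ts = 1446506727     # a crunchstat sample from task B

    # Relative to the task the sample sits at t=60s; relative to the
    # job it sits at t=660s, which is where it belongs on a chart that
    # overlays all tasks (including child-job tasks).
    print(sample_ts - task_b_start)  # -> 60
    print(sample_ts - job_start)     # -> 660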
@@ -225,10 +231,12 @@ class Summarizer(object):
                  self.stats_max['mem']['rss'],
                  lambda x: x / 1e9),
                 ('Max network traffic in a single task: {}GB',
-                 self.stats_max['net:eth0']['tx+rx'],
+                 self.stats_max['net:eth0']['tx+rx'] +
+                 self.stats_max['net:keep0']['tx+rx'],
                  lambda x: x / 1e9),
                 ('Max network speed in a single interval: {}MB/s',
-                 self.stats_max['net:eth0']['tx+rx__rate'],
+                 self.stats_max['net:eth0']['tx+rx__rate'] +
+                 self.stats_max['net:keep0']['tx+rx__rate'],
                  lambda x: x / 1e6)):
             format_string, val, transform = args
-            if val == float('-Inf'):
+            if val == 0:
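This is the headline change of commit 8341: crunchstat reports Keep traffic under its own interface-style category, net:keep0, so the report now sums it with net:eth0. A self-contained sketch with made-up byte counts:

    stats_max = {
        'net:eth0':  {'tx+rx': 9.1e9, 'tx+rx__rate': 45e6},
        'net:keep0': {'tx+rx': 3.2e9, 'tx+rx__rate': 30e6},
    }

    traffic_gb = (stats_max['net:eth0']['tx+rx'] +
                  stats_max['net:keep0']['tx+rx']) / 1e9
    # The two per-category maxima may come from different intervals, so
    # the summed rate is an upper bound rather than an observed peak.
    speed_mbs = (stats_max['net:eth0']['tx+rx__rate'] +
                 stats_max['net:keep0']['tx+rx__rate']) / 1e6
    print('Max network traffic in a single task: {:.2f}GB'.format(traffic_gb))
    # -> 12.30GB
    print('Max network speed in a single interval: {:.2f}MB/s'.format(speed_mbs))
    # -> 75.00MB/s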
@@ -250,7 +258,7 @@ class Summarizer(object):
             logger.warning('%s: no CPU usage data', self.label)
             return
         used_cores = int(math.ceil(cpu_max_rate))
-        asked_cores =  self.existing_constraints.get('min_cores_per_node')
+        asked_cores = self.existing_constraints.get('min_cores_per_node')
         if asked_cores is None or used_cores < asked_cores:
             yield (
                 '#!! {} max CPU usage was {}% -- '
@@ -261,24 +269,56 @@ class Summarizer(object):
                 int(used_cores))
 
     def _recommend_ram(self):
-        """Recommend asking for (2048*0.95) MiB RAM if max rss was 1248 MiB"""
-
-        used_ram = self.stats_max['mem']['rss']
-        if used_ram == float('-Inf'):
+        """Recommend an economical RAM constraint for this job.
+
+        Nodes that are advertised as "8 gibibytes" actually have what
+        we might call "8 nearlygibs" of memory available for jobs.
+        Here, we calculate a whole number of nearlygibs that would
+        have sufficed to run the job, then recommend requesting a node
+        with that number of nearlygibs (expressed as mebibytes).
+
+        Requesting a node with "nearly 8 gibibytes" is our best hope
+        of getting a node that actually has nearly 8 gibibytes
+        available.  If the node manager is smart enough to account for
+        the discrepancy itself when choosing/creating a node, we'll
+        get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
+        advertised size of the next-size-smaller node (say, 6 GiB)
+        will be too low to satisfy our request, so we will effectively
+        get rounded up to 8 GiB.
+
+        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
+        we will generally get a node that is advertised as "8 GiB" and
+        has at least 7500 MiB available.  However, asking for 8192 MiB
+        would either result in an unnecessarily expensive 12 GiB node
+        (if node manager knows about the discrepancy), or an 8 GiB
+        node which has less than 8192 MiB available and is therefore
+        considered by crunch-dispatch to be too small to meet our
+        constraint.
+
+        When node manager learns how to predict the available memory
+        for each node type such that crunch-dispatch always agrees
+        that a node is big enough to run the job it was brought up
+        for, all this will be unnecessary.  We'll just ask for exactly
+        the memory we want -- even if that happens to be 8192 MiB.
+        """
+
+        used_bytes = self.stats_max['mem']['rss']
+        if used_bytes == 0:
             logger.warning('%s: no memory usage data', self.label)
             return
-        used_ram = math.ceil(float(used_ram) / (1<<20))
-        asked_ram = self.existing_constraints.get('min_ram_mb_per_node')
-        if asked_ram is None or (
-                math.ceil((used_ram/AVAILABLE_RAM_RATIO)/(1<<10)) <
-                (asked_ram/AVAILABLE_RAM_RATIO)/(1<<10)):
+        used_mib = math.ceil(float(used_bytes) / 1048576)
+        asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
+
+        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
+        if asked_mib is None or (
+                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
             yield (
                 '#!! {} max RSS was {} MiB -- '
                 'try runtime_constraints "min_ram_mb_per_node":{}'
             ).format(
                 self.label,
-                int(used_ram),
-                int(math.ceil((used_ram/AVAILABLE_RAM_RATIO)/(1<<10))*(1<<10)*AVAILABLE_RAM_RATIO))
+                int(used_mib),
+                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
 
     def _format(self, val):
         """Return a string representation of a stat.
@@ -292,36 +332,30 @@ class Summarizer(object):
 
 class CollectionSummarizer(Summarizer):
     def __init__(self, collection_id, **kwargs):
-        logger.debug('load collection %s', collection_id)
-        collection = arvados.collection.CollectionReader(collection_id)
-        filenames = [filename for filename in collection]
-        if len(filenames) != 1:
-            raise ValueError(
-                "collection {} has {} files; need exactly one".format(
-                    collection_id, len(filenames)))
         super(CollectionSummarizer, self).__init__(
-            collection.open(filenames[0]), **kwargs)
+            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
         self.label = collection_id
 
 
-class JobSummarizer(CollectionSummarizer):
+class JobSummarizer(Summarizer):
     def __init__(self, job, **kwargs):
         arv = arvados.api('v1')
-        if isinstance(job, str):
+        if isinstance(job, basestring):
             self.job = arv.jobs().get(uuid=job).execute()
         else:
             self.job = job
-        self.label = self.job['uuid']
+        if self.job['log']:
+            rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
+            label = self.job['uuid']
+        else:
+            rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
+            label = self.job['uuid'] + ' (partial)'
+        super(JobSummarizer, self).__init__(rdr, **kwargs)
+        self.label = label
         self.existing_constraints = self.job.get('runtime_constraints', {})
-        if not self.job['log']:
-            raise ValueError(
-                "job {} has no log; live summary not implemented".format(
-                    self.job['uuid']))
-        super(JobSummarizer, self).__init__(self.job['log'], **kwargs)
-        self.label = self.job['uuid']
 
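With the switch from CollectionSummarizer to Summarizer as the base class, JobSummarizer can now summarize a still-running job by falling back to LiveLogReader. A hypothetical usage sketch (the UUID is invented, and text_report is assumed to be the reporting method used by the CLI):

    import crunchstat_summary.summarizer

    # For a job that has not finished, job['log'] is empty, so the
    # summarizer tails the live event log and labels the result
    # "<uuid> (partial)".
    s = crunchstat_summary.summarizer.JobSummarizer(
        'qr1hi-8i9sb-xxxxxxxxxxxxxxx')
    s.run()
    print(s.text_report())  # assumed reporting entry point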
 
-class PipelineSummarizer():
+class PipelineSummarizer(object):
     def __init__(self, pipeline_instance_uuid, **kwargs):
         arv = arvados.api('v1', model=OrderedJsonModel())
         instance = arv.pipeline_instances().get(