8123: Recommend more economical values for resource_constraints.
authorTom Clegg <tom@curoverse.com>
Mon, 21 Dec 2015 21:53:37 +0000 (16:53 -0500)
committerTom Clegg <tom@curoverse.com>
Tue, 12 Jan 2016 15:47:20 +0000 (10:47 -0500)
tools/crunchstat-summary/bin/crunchstat-summary
tools/crunchstat-summary/crunchstat_summary/summarizer.py
tools/crunchstat-summary/tests/logfile_20151204190335.txt.gz.report
tools/crunchstat-summary/tests/logfile_20151210063411.txt.gz.report
tools/crunchstat-summary/tests/logfile_20151210063439.txt.gz.report
tools/crunchstat-summary/tests/test_examples.py

index f42d30b3c72035471b60f65334ba7a72ce23c00f..c32b50e3a8204203a17023daceb3c96c72c984c2 100755 (executable)
@@ -4,8 +4,11 @@ from __future__ import print_function
 
 import crunchstat_summary.command
 import crunchstat_summary.summarizer
+import logging
 import sys
 
+logging.getLogger().addHandler(logging.StreamHandler())
+
 args = crunchstat_summary.command.ArgumentParser().parse_args(sys.argv[1:])
 s = crunchstat_summary.command.Command(args).summarizer()
 s.run()
index 1b7f95075e51ba3614b1dc235d87c34a1973177c..49b67ffa1f4c4cf4fccd33bec0e74c4b4e58ffef 100644 (file)
@@ -3,13 +3,21 @@ from __future__ import print_function
 import arvados
 import collections
 import functools
+import itertools
+import logging
+import math
 import re
 import sys
 
+logger = logging.getLogger(__name__)
+logger.addHandler(logging.NullHandler())
 
 class Summarizer(object):
-    def __init__(self, logdata):
+    existing_constraints = {}
+
+    def __init__(self, logdata, label='job'):
         self._logdata = logdata
+        self.label = label
 
     def run(self):
         # stats_max: {category: {stat: val}}
@@ -64,9 +72,9 @@ class Summarizer(object):
                             this_interval_s = val
                             continue
                         elif not (this_interval_s > 0):
-                            print("BUG? interval stat given with duration {!r}".
-                                  format(this_interval_s),
-                                  file=sys.stderr)
+                            logger.error(
+                                "BUG? interval stat given with duration {!r}".
+                                format(this_interval_s))
                             continue
                         else:
                             stat = stat + '__rate'
@@ -75,12 +83,7 @@ class Summarizer(object):
                         self.task_stats[task_id][category][stat] = val
                     if val > self.stats_max[category][stat]:
                         self.stats_max[category][stat] = val
-
-    def report(self):
-        return "\n".join(self._report_gen()) + "\n"
-
-    def _report_gen(self):
-        job_tot = collections.defaultdict(
+        self.job_tot = collections.defaultdict(
             functools.partial(collections.defaultdict, int))
         for task_id, task_stat in self.task_stats.iteritems():
             for category, stat_last in task_stat.iteritems():
@@ -88,7 +91,14 @@ class Summarizer(object):
                     if stat in ['cpus', 'cache', 'swap', 'rss']:
                         # meaningless stats like 16 cpu cores x 5 tasks = 80
                         continue
-                    job_tot[category][stat] += val
+                    self.job_tot[category][stat] += val
+
+    def report(self):
+        return "\n".join(itertools.chain(
+            self._report_gen(),
+            self._recommend_gen())) + "\n"
+
+    def _report_gen(self):
         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
         for category, stat_max in sorted(self.stats_max.iteritems()):
             for stat, val in sorted(stat_max.iteritems()):
@@ -96,7 +106,7 @@ class Summarizer(object):
                     continue
                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                 val = self._format(val)
-                tot = self._format(job_tot[category].get(stat, '-'))
+                tot = self._format(self.job_tot[category].get(stat, '-'))
                 yield "\t".join([category, stat, str(val), max_rate, tot])
         for args in (
                 ('Max CPU time spent by a single task: {}s',
@@ -106,7 +116,8 @@ class Summarizer(object):
                  self.stats_max['cpu']['user+sys__rate'],
                  lambda x: x * 100),
                 ('Overall CPU usage: {}%',
-                 job_tot['cpu']['user+sys'] / job_tot['time']['elapsed'],
+                 self.job_tot['cpu']['user+sys'] /
+                 self.job_tot['time']['elapsed'],
                  lambda x: x * 100),
                 ('Max memory used by a single task: {}GB',
                  self.stats_max['mem']['rss'],
@@ -124,6 +135,47 @@ class Summarizer(object):
                 val = transform(val)
             yield "# "+format_string.format(self._format(val))
 
+    def _recommend_gen(self):
+        return itertools.chain(
+            self._recommend_cpu(),
+            self._recommend_ram())
+
+    def _recommend_cpu(self):
+        """Recommend asking for 4 cores if max CPU usage was 333%"""
+
+        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
+        if cpu_max_rate == float('-Inf'):
+            logger.warning('%s: no CPU usage data', self.label)
+            return
+        used_cores = int(math.ceil(cpu_max_rate))
+        asked_cores =  self.existing_constraints.get('min_cores_per_node')
+        if asked_cores is None or used_cores < asked_cores:
+            yield (
+                '#!! {} max CPU usage was {}% -- '
+                'try runtime_constraints "min_cores_per_node":{}'
+            ).format(
+                self.label,
+                int(math.ceil(cpu_max_rate*100)),
+                int(used_cores))
+
+    def _recommend_ram(self):
+        """Recommend asking for 2048 MiB RAM if max rss was 1248 MiB"""
+
+        used_ram = self.stats_max['mem']['rss']
+        if used_ram == float('-Inf'):
+            logger.warning('%s: no memory usage data', self.label)
+            return
+        used_ram = math.ceil(float(used_ram) / (1<<20))
+        asked_ram = self.existing_constraints.get('min_ram_mb_per_node')
+        if asked_ram is None or math.ceil(used_ram/(1<<10)) < asked_ram/(1<<10):
+            yield (
+                '#!! {} never used more than {} MiB RAM -- '
+                'try runtime_constraints "min_ram_mb_per_node":{}'
+            ).format(
+                self.label,
+                int(used_ram),
+                int(math.ceil(used_ram/(1<<10))*(1<<10)))
+
     def _format(self, val):
         """Return a string representation of a stat.
 
@@ -133,6 +185,7 @@ class Summarizer(object):
         else:
             return '{}'.format(val)
 
+
 class CollectionSummarizer(Summarizer):
     def __init__(self, collection_id):
         collection = arvados.collection.CollectionReader(collection_id)
@@ -141,7 +194,9 @@ class CollectionSummarizer(Summarizer):
             raise ValueError(
                 "collection {} has {} files; need exactly one".format(
                     collection_id, len(filenames)))
-        super(CollectionSummarizer, self).__init__(collection.open(filenames[0]))
+        super(CollectionSummarizer, self).__init__(
+            collection.open(filenames[0]))
+
 
 class JobSummarizer(CollectionSummarizer):
     def __init__(self, job):
@@ -150,12 +205,15 @@ class JobSummarizer(CollectionSummarizer):
             self.job = arv.jobs().get(uuid=job).execute()
         else:
             self.job = job
+        self.label = self.job['uuid']
+        self.existing_constraints = self.job.get('runtime_constraints', {})
         if not self.job['log']:
             raise ValueError(
                 "job {} has no log; live summary not implemented".format(
                     self.job['uuid']))
         super(JobSummarizer, self).__init__(self.job['log'])
 
+
 class PipelineSummarizer():
     def __init__(self, pipeline_instance_uuid):
         arv = arvados.api('v1')
@@ -164,16 +222,17 @@ class PipelineSummarizer():
         self.summarizers = collections.OrderedDict()
         for cname, component in instance['components'].iteritems():
             if 'job' not in component:
-                print("{}: skipping component with no job assigned".format(
-                    cname), file=sys.stderr)
+                logger.warning(
+                    "%s: skipping component with no job assigned", cname)
             elif component['job'].get('log') is None:
-                print("{}: skipping component with no log available".format(
-                    cname), file=sys.stderr)
+                logger.warning(
+                    "%s: skipping job %s with no log available",
+                    cname, component['job'].get('uuid'))
             else:
-                print("{}: reading log from {}".format(
-                    cname, component['job']['log']), file=sys.stderr)
-                summarizer = CollectionSummarizer(component['job']['log'])
-                summarizer.job_uuid = component['job']['uuid']
+                logger.debug(
+                    "%s: reading log from %s", cname, component['job']['log'])
+                summarizer = JobSummarizer(component['job'])
+                summarizer.label = cname
                 self.summarizers[cname] = summarizer
 
     def run(self):
@@ -184,7 +243,7 @@ class PipelineSummarizer():
         txt = ''
         for cname, summarizer in self.summarizers.iteritems():
             txt += '### Summary for {} ({})\n'.format(
-                cname, summarizer.job_uuid)
+                cname, summarizer.job['uuid'])
             txt += summarizer.report()
             txt += '\n'
         return txt
index ef7beb11c0057009f8465aa2836349ca32235f5a..c94cd24d8a102b2847fe70166d7cfab4d5d00aee 100644 (file)
@@ -28,3 +28,5 @@ time  elapsed 80      -       80
 # Max memory used by a single task: 0.35GB
 # Max network traffic in a single task: 1.79GB
 # Max network speed in a single interval: 42.58MB/s
+#!! job max CPU usage was 13% -- try runtime_constraints "min_cores_per_node":1
+#!! job never used more than 334 MiB RAM -- try runtime_constraints "min_ram_mb_per_node":1024
index 38af3e7e8c7752eb07532f5a2f4fd54a616eab4e..e71182449374c7f90bc5a6a66166794ba42d26ac 100644 (file)
@@ -15,3 +15,4 @@ time  elapsed 2       -       4
 # Overall CPU usage: 0.00%
 # Max memory used by a single task: 0.00GB
 # Max network traffic in a single task: 0.00GB
+#!! job never used more than 1 MiB RAM -- try runtime_constraints "min_ram_mb_per_node":1024
index 7e42d612b753fadb25e4bac7cee7e941e01231e3..5772cb44504a5317c29075315d53735734efd953 100644 (file)
@@ -15,3 +15,4 @@ time  elapsed 2       -       3
 # Overall CPU usage: 0.00%
 # Max memory used by a single task: 0.00GB
 # Max network traffic in a single task: 0.00GB
+#!! job never used more than 1 MiB RAM -- try runtime_constraints "min_ram_mb_per_node":1024
index 1bce69301fa975c143138c6f2dbee6b0d06a3a59..a19d7ad0a4fdfa0465a2ee614a9f0d39bb6ee0b5 100644 (file)
@@ -24,7 +24,7 @@ class ReportDiff(unittest.TestCase):
             expect, got, fromfile=expectfile, tofile="(generated)")))
 
 
-class ExampleLogsTestCase(ReportDiff):
+class SummarizeFile(ReportDiff):
     def test_example_files(self):
         for fnm in glob.glob(os.path.join(TESTS_DIR, '*.txt.gz')):
             logfile = os.path.join(TESTS_DIR, fnm)
@@ -35,22 +35,28 @@ class ExampleLogsTestCase(ReportDiff):
             self.diff_known_report(logfile, summarizer)
 
 
-class LookupJobUUID(ReportDiff):
-    fake_uuid = 'zzzzz-8i9sb-jq0ekny1xou3zoh'
+class SummarizeJob(ReportDiff):
+    fake_job_uuid = 'zzzzz-8i9sb-jjjjjjjjjjjjjjj'
+    fake_log_id = 'fake-log-collection-id'
+    fake_job = {
+        'uuid': fake_job_uuid,
+        'log': fake_log_id,
+    }
+    logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
 
     @mock.patch('arvados.collection.CollectionReader')
     @mock.patch('arvados.api')
-    def test_job_uuid(self, mock_api, mock_cr):
-        logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
-        mock_api().jobs().get().execute.return_value = {'log': 'fake-uuid'}
+    def test_job_report(self, mock_api, mock_cr):
+        mock_api().jobs().get().execute.return_value = self.fake_job
         mock_cr().__iter__.return_value = ['fake-logfile.txt']
-        mock_cr().open.return_value = gzip.open(logfile)
+        mock_cr().open.return_value = gzip.open(self.logfile)
         args = crunchstat_summary.command.ArgumentParser().parse_args(
-            ['--job', self.fake_uuid])
+            ['--job', self.fake_job_uuid])
         summarizer = crunchstat_summary.command.Command(args).summarizer()
         summarizer.run()
-        self.diff_known_report(logfile, summarizer)
-        mock_api().jobs().get.assert_called_with(uuid=self.fake_uuid)
+        self.diff_known_report(self.logfile, summarizer)
+        mock_api().jobs().get.assert_called_with(uuid=self.fake_job_uuid)
+        mock_cr.assert_called_with(self.fake_log_id)
         mock_cr().open.assert_called_with('fake-logfile.txt')
 
 
@@ -63,12 +69,20 @@ class SummarizePipeline(ReportDiff):
                 'job': {
                     'uuid': 'zzzzz-8i9sb-000000000000000',
                     'log': 'fake-log-pdh-0',
+                    'runtime_constraints': {
+                        'min_ram_mb_per_node': 1024,
+                        'min_cores_per_node': 1,
+                    },
                 },
             }],
             ['bar', {
                 'job': {
                     'uuid': 'zzzzz-8i9sb-000000000000001',
                     'log': 'fake-log-pdh-1',
+                    'runtime_constraints': {
+                        'min_ram_mb_per_node': 1024,
+                        'min_cores_per_node': 1,
+                    },
                 },
             }],
             ['no-job-assigned', {}],
@@ -81,6 +95,10 @@ class SummarizePipeline(ReportDiff):
                 'job': {
                     'uuid': 'zzzzz-8i9sb-000000000000002',
                     'log': 'fake-log-pdh-2',
+                    'runtime_constraints': {
+                        'min_ram_mb_per_node': 1024,
+                        'min_cores_per_node': 1,
+                    },
                 },
             }]]),
     }
@@ -98,13 +116,16 @@ class SummarizePipeline(ReportDiff):
         summarizer = crunchstat_summary.command.Command(args).summarizer()
         summarizer.run()
 
+        job_report = [
+            line for line in open(logfile+'.report').readlines()
+            if not line.startswith('#!! ')]
         expect = (
             ['### Summary for foo (zzzzz-8i9sb-000000000000000)\n'] +
-            open(logfile+'.report').readlines() + ['\n'] +
+            job_report + ['\n'] +
             ['### Summary for bar (zzzzz-8i9sb-000000000000001)\n'] +
-            open(logfile+'.report').readlines() + ['\n'] +
+            job_report + ['\n'] +
             ['### Summary for baz (zzzzz-8i9sb-000000000000002)\n'] +
-            open(logfile+'.report').readlines())
+            job_report)
         self.diff_report(summarizer, expect)
         mock_cr.assert_has_calls(
             [