Arvados-DCO-1.1-Signed-off-by: Tom Morris <tfmorris@veritasgenetics.com>
import argparse
import gzip
import argparse
import gzip
import logging
import sys
import logging
import sys
help='Log more information (once for progress, twice for debug)')
help='Log more information (once for progress, twice for debug)')
+class UTF8Decode(object):
+ '''Wrap a file-like iterable to decode UTF-8 bytes into strings
+
+ Each item pulled from the wrapped object with next() is decoded
+ from UTF-8 before it is returned, so callers iterate over text
+ rather than bytes. The wrapper is its own iterator and can also be
+ used as a context manager; leaving the "with" block calls close(),
+ which deliberately leaves the wrapped object open.
+ '''
+ def __init__(self, fh):
+ # fh is consumed with next() in __next__, so it must be an iterator
+ # whose items provide decode() (e.g. bytes lines).
+ self.fh = fh
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ # StopIteration raised by the wrapped object propagates unchanged,
+ # which is what ends iteration over this wrapper.
+ return next(self.fh).decode('utf-8')
+
+ # Python 2 names the iterator method next() instead of __next__()
+ next = __next__
+
+ def close(self):
+ # mimic Gzip behavior and don't close underlying object
+ pass
+
+
class Command(object):
def __init__(self, args):
self.args = args
class Command(object):
def __init__(self, args):
self.args = args
self.summer = summarizer.NewSummarizer(self.args.job, **kwargs)
elif self.args.log_file:
if self.args.log_file.endswith('.gz'):
self.summer = summarizer.NewSummarizer(self.args.job, **kwargs)
elif self.args.log_file:
if self.args.log_file.endswith('.gz'):
- fh = gzip.open(self.args.log_file)
+ fh = UTF8Decode(gzip.open(self.args.log_file))
- fh = open(self.args.log_file)
+ fh = open(self.args.log_file, mode = 'r', encoding = 'utf-8')
self.summer = summarizer.Summarizer(fh, **kwargs)
else:
self.summer = summarizer.Summarizer(sys.stdin, **kwargs)
self.summer = summarizer.Summarizer(fh, **kwargs)
else:
self.summer = summarizer.Summarizer(sys.stdin, **kwargs)
'data': self._collate_data(tasks, stat),
'options': {
'connectSeparatedPoints': True,
'data': self._collate_data(tasks, stat),
'options': {
'connectSeparatedPoints': True,
- 'labels': ['elapsed']+[uuid for uuid, _ in tasks.iteritems()],
+ 'labels': ['elapsed']+[uuid for uuid, _ in tasks.items()],
'title': '{}: {} {}'.format(label, stat[0], stat[1]),
},
}
'title': '{}: {} {}'.format(label, stat[0], stat[1]),
},
}
def _collate_data(self, tasks, stat):
data = []
nulls = []
def _collate_data(self, tasks, stat):
data = []
nulls = []
- for uuid, task in tasks.iteritems():
+ for uuid, task in tasks.items():
for pt in task.series[stat]:
data.append([pt[0].total_seconds()] + nulls + [pt[1]])
nulls.append(None)
for pt in task.series[stat]:
data.append([pt[0].total_seconds()] + nulls + [pt[1]])
nulls.append(None)
#
# SPDX-License-Identifier: AGPL-3.0
#
# SPDX-License-Identifier: AGPL-3.0
-from __future__ import print_function
-
import arvados
import itertools
import arvados
import itertools
import threading
from crunchstat_summary import logger
import threading
from crunchstat_summary import logger
self._queue.put(self.EOF)
def __iter__(self):
self._queue.put(self.EOF)
def __iter__(self):
- self._queue = Queue.Queue()
+ self._queue = queue.Queue()
self._thread = threading.Thread(target=self._get_all_pages)
self._thread.daemon = True
self._thread.start()
return self
self._thread = threading.Thread(target=self._get_all_pages)
self._thread.daemon = True
self._thread.start()
return self
line = self._queue.get()
if line is self.EOF:
self._thread.join()
raise StopIteration
return line
line = self._queue.get()
if line is self.EOF:
self._thread.join()
raise StopIteration
return line
+ next = __next__ # for Python 2
+
def __enter__(self):
return self
def __enter__(self):
return self
#
# SPDX-License-Identifier: AGPL-3.0
#
# SPDX-License-Identifier: AGPL-3.0
-from __future__ import print_function
-
import arvados
import collections
import crunchstat_summary.dygraphs
import arvados
import collections
import crunchstat_summary.dygraphs
stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
if 'tx' in stats or 'rx' in stats:
stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
if 'tx' in stats or 'rx' in stats:
stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
- for stat, val in stats.iteritems():
+ for stat, val in stats.items():
if group == 'interval':
if stat == 'seconds':
this_interval_s = val
if group == 'interval':
if stat == 'seconds':
this_interval_s = val
self.job_tot = collections.defaultdict(
functools.partial(collections.defaultdict, int))
self.job_tot = collections.defaultdict(
functools.partial(collections.defaultdict, int))
- for task_id, task_stat in self.task_stats.iteritems():
- for category, stat_last in task_stat.iteritems():
- for stat, val in stat_last.iteritems():
+ for task_id, task_stat in self.task_stats.items():
+ for category, stat_last in task_stat.items():
+ for stat, val in stat_last.items():
if stat in ['cpus', 'cache', 'swap', 'rss']:
# meaningless stats like 16 cpu cores x 5 tasks = 80
continue
if stat in ['cpus', 'cache', 'swap', 'rss']:
# meaningless stats like 16 cpu cores x 5 tasks = 80
continue
def _text_report_gen(self):
yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
def _text_report_gen(self):
yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
- for category, stat_max in sorted(self.stats_max.iteritems()):
- for stat, val in sorted(stat_max.iteritems()):
+ for category, stat_max in sorted(self.stats_max.items()):
+ for stat, val in sorted(stat_max.items()):
if stat.endswith('__rate'):
continue
max_rate = self._format(stat_max.get(stat+'__rate', '-'))
if stat.endswith('__rate'):
continue
max_rate = self._format(stat_max.get(stat+'__rate', '-'))
def run(self):
threads = []
def run(self):
threads = []
- for child in self.children.itervalues():
+ for child in self.children.values():
self.throttle.acquire()
t = threading.Thread(target=self.run_and_release, args=(child.run, ))
t.daemon = True
self.throttle.acquire()
t = threading.Thread(target=self.run_and_release, args=(child.run, ))
t.daemon = True
def text_report(self):
txt = ''
d = self._descendants()
def text_report(self):
txt = ''
d = self._descendants()
- for child in d.itervalues():
+ for child in d.values():
if len(d) > 1:
txt += '### Summary for {} ({})\n'.format(
child.label, child.process['uuid'])
if len(d) > 1:
txt += '### Summary for {} ({})\n'.format(
child.label, child.process['uuid'])
MultiSummarizers) are omitted.
"""
d = collections.OrderedDict()
MultiSummarizers) are omitted.
"""
d = collections.OrderedDict()
- for key, child in self.children.iteritems():
+ for key, child in self.children.items():
if isinstance(child, Summarizer):
d[key] = child
if isinstance(child, MultiSummarizer):
if isinstance(child, Summarizer):
d[key] = child
if isinstance(child, MultiSummarizer):
return d
def html_report(self):
return d
def html_report(self):
- return WEBCHART_CLASS(self.label, self._descendants().itervalues()).html()
+ return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()
class JobTreeSummarizer(MultiSummarizer):
class JobTreeSummarizer(MultiSummarizer):
preloaded = {}
for j in arv.jobs().index(
limit=len(job['components']),
preloaded = {}
for j in arv.jobs().index(
limit=len(job['components']),
- filters=[['uuid','in',job['components'].values()]]).execute()['items']:
+ filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
preloaded[j['uuid']] = j
for cname in sorted(job['components'].keys()):
child_uuid = job['components'][cname]
preloaded[j['uuid']] = j
for cname in sorted(job['components'].keys()):
child_uuid = job['components'][cname]
class PipelineSummarizer(MultiSummarizer):
def __init__(self, instance, **kwargs):
children = collections.OrderedDict()
class PipelineSummarizer(MultiSummarizer):
def __init__(self, instance, **kwargs):
children = collections.OrderedDict()
- for cname, component in instance['components'].iteritems():
+ for cname, component in instance['components'].items():
if 'job' not in component:
logger.warning(
"%s: skipping component with no job assigned", cname)
if 'job' not in component:
logger.warning(
"%s: skipping component with no job assigned", cname)
cr['name'] = cr.get('name') or cr['uuid']
todo.append(cr)
sorted_children = collections.OrderedDict()
cr['name'] = cr.get('name') or cr['uuid']
todo.append(cr)
sorted_children = collections.OrderedDict()
- for uuid in sorted(children.keys(), key=lambda uuid: children[uuid].sort_key):
+ for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
sorted_children[uuid] = children[uuid]
super(ContainerTreeSummarizer, self).__init__(
children=sorted_children,
sorted_children[uuid] = children[uuid]
super(ContainerTreeSummarizer, self).__init__(
children=sorted_children,
def js(self):
return 'var chartdata = {};\n{}'.format(
json.dumps(self.sections()),
def js(self):
return 'var chartdata = {};\n{}'.format(
json.dumps(self.sections()),
- '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa) for jsa in self.JSASSETS]))
+ '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa).decode('utf-8') for jsa in self.JSASSETS]))
def sections(self):
return [
def sections(self):
return [
import difflib
import glob
import gzip
import difflib
import glob
import gzip
import mock
import os
import unittest
import mock
import os
import unittest
+from crunchstat_summary.command import UTF8Decode
+
TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
class ReportDiff(unittest.TestCase):
def diff_known_report(self, logfile, cmd):
expectfile = logfile+'.report'
TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
class ReportDiff(unittest.TestCase):
def diff_known_report(self, logfile, cmd):
expectfile = logfile+'.report'
- expect = open(expectfile).readlines()
+ expect = open(expectfile, encoding='utf-8').readlines()
self.diff_report(cmd, expect, expectfile=expectfile)
self.diff_report(cmd, expect, expectfile=expectfile)
- def diff_report(self, cmd, expect, expectfile=None):
+ def diff_report(self, cmd, expect, expectfile='(expected)'):
got = [x+"\n" for x in cmd.report().strip("\n").split("\n")]
self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
expect, got, fromfile=expectfile, tofile="(generated)")))
got = [x+"\n" for x in cmd.report().strip("\n").split("\n")]
self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
expect, got, fromfile=expectfile, tofile="(generated)")))
cmd.run()
self.assertRegexpMatches(cmd.report(), r'(?is)<html>.*</html>\s*$')
cmd.run()
self.assertRegexpMatches(cmd.report(), r'(?is)<html>.*</html>\s*$')
class SummarizeEdgeCases(unittest.TestCase):
def test_error_messages(self):
class SummarizeEdgeCases(unittest.TestCase):
def test_error_messages(self):
- logfile = open(os.path.join(TESTS_DIR, 'crunchstat_error_messages.txt'))
+ logfile = open(os.path.join(TESTS_DIR, 'crunchstat_error_messages.txt'), encoding='utf-8')
s = crunchstat_summary.summarizer.Summarizer(logfile)
s.run()
s = crunchstat_summary.summarizer.Summarizer(logfile)
s.run()
'container.json', 'crunchstat.txt', 'arv-mount.txt']
def _open(n):
if n == "crunchstat.txt":
'container.json', 'crunchstat.txt', 'arv-mount.txt']
def _open(n):
if n == "crunchstat.txt":
- return gzip.open(self.logfile)
+ return UTF8Decode(gzip.open(self.logfile))
elif n == "arv-mount.txt":
elif n == "arv-mount.txt":
- return gzip.open(self.arvmountlog)
+ return UTF8Decode(gzip.open(self.arvmountlog))
mock_cr().open.side_effect = _open
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--job', self.fake_request['uuid']])
mock_cr().open.side_effect = _open
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--job', self.fake_request['uuid']])
def test_job_report(self, mock_api, mock_cr):
mock_api().jobs().get().execute.return_value = self.fake_job
mock_cr().__iter__.return_value = ['fake-logfile.txt']
def test_job_report(self, mock_api, mock_cr):
mock_api().jobs().get().execute.return_value = self.fake_job
mock_cr().__iter__.return_value = ['fake-logfile.txt']
- mock_cr().open.return_value = gzip.open(self.logfile)
+ mock_cr().open.return_value = UTF8Decode(gzip.open(self.logfile))
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--job', self.fake_job_uuid])
cmd = crunchstat_summary.command.Command(args)
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--job', self.fake_job_uuid])
cmd = crunchstat_summary.command.Command(args)
mock_api().pipeline_instances().get().execute. \
return_value = self.fake_instance
mock_cr().__iter__.return_value = ['fake-logfile.txt']
mock_api().pipeline_instances().get().execute. \
return_value = self.fake_instance
mock_cr().__iter__.return_value = ['fake-logfile.txt']
- mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+ mock_cr().open.side_effect = [UTF8Decode(gzip.open(logfile)) for _ in range(3)]
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--pipeline-instance', self.fake_instance['uuid']])
cmd = crunchstat_summary.command.Command(args)
cmd.run()
job_report = [
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--pipeline-instance', self.fake_instance['uuid']])
cmd = crunchstat_summary.command.Command(args)
cmd.run()
job_report = [
- line for line in open(logfile+'.report').readlines()
+ line for line in open(logfile+'.report', encoding='utf-8').readlines()
if not line.startswith('#!! ')]
expect = (
['### Summary for foo (zzzzz-8i9sb-000000000000000)\n'] +
if not line.startswith('#!! ')]
expect = (
['### Summary for foo (zzzzz-8i9sb-000000000000000)\n'] +
mock_api().jobs().index().execute.return_value = self.fake_jobs_index
mock_api().jobs().get().execute.return_value = self.fake_job
mock_cr().__iter__.return_value = ['fake-logfile.txt']
mock_api().jobs().index().execute.return_value = self.fake_jobs_index
mock_api().jobs().get().execute.return_value = self.fake_job
mock_cr().__iter__.return_value = ['fake-logfile.txt']
- mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+ mock_cr().open.side_effect = [UTF8Decode(gzip.open(logfile)) for _ in range(3)]
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--job', self.fake_job['uuid']])
cmd = crunchstat_summary.command.Command(args)
cmd.run()
job_report = [
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--job', self.fake_job['uuid']])
cmd = crunchstat_summary.command.Command(args)
cmd.run()
job_report = [
- line for line in open(logfile+'.report').readlines()
+ line for line in open(logfile+'.report', encoding='utf-8').readlines()
if not line.startswith('#!! ')]
expect = (
['### Summary for zzzzz-8i9sb-i3e77t9z5y8j9cc (partial) (zzzzz-8i9sb-i3e77t9z5y8j9cc)\n',
if not line.startswith('#!! ')]
expect = (
['### Summary for zzzzz-8i9sb-i3e77t9z5y8j9cc (partial) (zzzzz-8i9sb-i3e77t9z5y8j9cc)\n',