import math
import re
import sys
-import threading
import _strptime
+import arvados.util
+
+from concurrent.futures import ThreadPoolExecutor, as_completed
from crunchstat_summary import logger
# are already suitable. If applicable, the subclass
# constructor will overwrite this with something useful.
self.existing_constraints = {}
+ self.node_info = {}
- logger.debug("%s: logdata %s", self.label, logdata)
+ logger.info("%s: logdata %s", self.label, logdata)
def run(self):
logger.debug("%s: parsing logdata %s", self.label, self._logdata)
def _run(self, logdata):
self.detected_crunch1 = False
+
+ if not self.node_info:
+ self.node_info = logdata.node_info()
+
for line in logdata:
if not self.detected_crunch1 and '-8i9sb-' in line:
self.detected_crunch1 = True
uuid = self.process.get('container_uuid', self.process.get('uuid'))
rdr = crunchstat_summary.reader.LiveLogReader(uuid)
label = label + ' (partial)'
- else:
- self.node_info = rdr.node_info()
super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
self.existing_constraints = self.process.get('runtime_constraints', {})
class MultiSummarizer(object):
- def __init__(self, children={}, label=None, threads=1, **kwargs):
+    def __init__(self, children=None, label=None, threads=1, **kwargs):
+        """Group several summarizers so they can be run and reported together.
+
+        children -- dict whose values are summarizers (objects providing
+            run() and label); defaults to a new empty dict per instance.
+        label -- label stored on this group.
+        threads -- maximum number of worker threads run() may use.
+        Extra keyword arguments are accepted and ignored here.
+        """
- self.throttle = threading.Semaphore(threads)
- self.children = children
+        # A fresh dict per instance: a `{}` default would be one dict shared
+        # by every MultiSummarizer constructed without children.
+        self.children = children if children is not None else {}
self.label = label
-
- def run_and_release(self, target, *args, **kwargs):
- try:
- return target(*args, **kwargs)
- finally:
- self.throttle.release()
+        self.threadcount = threads
def run(self):
- threads = []
- for child in self.children.values():
- self.throttle.acquire()
- t = threading.Thread(target=self.run_and_release, args=(child.run, ))
- t.daemon = True
- t.start()
- threads.append(t)
- for t in threads:
- t.join()
+        """Summarize every child.
+
+        When more than one thread is allowed and there is more than one
+        child, children run in a thread pool. An exception from a child is
+        logged as "parse error" and the remaining children still run, and a
+        progress line is logged as each child finishes. Otherwise children
+        run one after another in the calling thread and any exception
+        propagates to the caller.
+        """
+        if self.threadcount > 1 and len(self.children) > 1:
+            total = len(self.children)
+            with ThreadPoolExecutor(max_workers=self.threadcount) as tpe:
+                # Remember which child each future belongs to so progress
+                # messages can name it.
+                futures = {tpe.submit(child.run): child
+                           for child in self.children.values()}
+                # Count completions here in the calling thread rather than in
+                # a worker closure. No shared counter needs locking, and
+                # because every future's result() is checked, no worker
+                # exception goes unnoticed.
+                for completed, future in enumerate(as_completed(futures), 1):
+                    child = futures[future]
+                    try:
+                        future.result()
+                    except Exception:
+                        logger.exception("parse error")
+                    logger.info("%s/%s summarized %s",
+                                completed, total, child.label)
+        else:
+            for child in self.children.values():
+                child.run()
def text_report(self):
txt = ''
summer.sort_key = sort_key
children[current['uuid']] = summer
- page_filters = []
- while True:
- child_crs = arv.container_requests().index(
- order=['uuid asc'],
- filters=page_filters+[
- ['requesting_container_uuid', '=', current['container_uuid']]],
- ).execute()
- if not child_crs['items']:
- break
- elif skip_child_jobs:
- logger.warning('%s: omitting stats from %d child containers'
- ' because --skip-child-jobs flag is on',
- label, child_crs['items_available'])
- break
- page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
- for cr in child_crs['items']:
+ if skip_child_jobs:
+ child_crs = arv.container_requests().list(filters=[['requesting_container_uuid', '=', current['container_uuid']]],
+ limit=0).execute()
+ logger.warning('%s: omitting stats from child containers'
+ ' because --skip-child-jobs flag is on',
+ label, child_crs['items_available'])
+ else:
+ for cr in arvados.util.keyset_list_all(arv.container_requests().list,
+ filters=[['requesting_container_uuid', '=', current['container_uuid']]]):
if cr['container_uuid']:
logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
cr['name'] = cr.get('name') or cr['uuid']