From 99dd11ec75589f79c8c0abc185deba6951936373 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 22 Feb 2024 11:02:38 -0500 Subject: [PATCH] 19744: Fix up threading Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- .../crunchstat_summary/command.py | 2 + .../crunchstat_summary/summarizer.py | 70 +++++++++---------- 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/tools/crunchstat-summary/crunchstat_summary/command.py b/tools/crunchstat-summary/crunchstat_summary/command.py index 71eae51996..4ece5c3b2e 100644 --- a/tools/crunchstat-summary/crunchstat_summary/command.py +++ b/tools/crunchstat-summary/crunchstat_summary/command.py @@ -7,6 +7,7 @@ import gzip from io import open import logging import sys +import arvados from crunchstat_summary import logger, summarizer from crunchstat_summary._version import __version__ @@ -86,6 +87,7 @@ class Command(object): kwargs = { 'skip_child_jobs': self.args.skip_child_jobs, 'threads': self.args.threads, + 'arv': arvados.api('v1') } if self.args.pipeline_instance: self.summer = summarizer.NewSummarizer(self.args.pipeline_instance, **kwargs) diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py index a721ff36b6..a67b72f89c 100644 --- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py +++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py @@ -12,8 +12,10 @@ import itertools import math import re import sys -import threading import _strptime +import arvados.util + +from concurrent.futures import ThreadPoolExecutor from crunchstat_summary import logger @@ -63,8 +65,9 @@ class Summarizer(object): # are already suitable. If applicable, the subclass # constructor will overwrite this with something useful. self.existing_constraints = {} + self.node_info = {} - logger.debug("%s: logdata %s", self.label, logdata) + logger.info("%s: logdata %s", self.label, logdata) def run(self): logger.debug("%s: parsing logdata %s", self.label, self._logdata) @@ -73,6 +76,10 @@ class Summarizer(object): def _run(self, logdata): self.detected_crunch1 = False + + if not self.node_info: + self.node_info = logdata.node_info() + for line in logdata: if not self.detected_crunch1 and '-8i9sb-' in line: self.detected_crunch1 = True @@ -639,8 +646,6 @@ class ProcessSummarizer(Summarizer): uuid = self.process.get('container_uuid', self.process.get('uuid')) rdr = crunchstat_summary.reader.LiveLogReader(uuid) label = label + ' (partial)' - else: - self.node_info = rdr.node_info() super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs) self.existing_constraints = self.process.get('runtime_constraints', {}) @@ -664,26 +669,26 @@ class ContainerRequestSummarizer(ProcessSummarizer): class MultiSummarizer(object): def __init__(self, children={}, label=None, threads=1, **kwargs): - self.throttle = threading.Semaphore(threads) self.children = children self.label = label - - def run_and_release(self, target, *args, **kwargs): - try: - return target(*args, **kwargs) - finally: - self.throttle.release() + self.threadcount = threads def run(self): - threads = [] - for child in self.children.values(): - self.throttle.acquire() - t = threading.Thread(target=self.run_and_release, args=(child.run, )) - t.daemon = True - t.start() - threads.append(t) - for t in threads: - t.join() + if self.threadcount > 1 and len(self.children) > 1: + completed = 0 + def run_and_progress(child): + try: + child.run() + except Exception as e: + logger.exception("parse error") + completed += 1 + logger.info("%s/%s summarized %s", completed, len(self.children), child.label) + with ThreadPoolExecutor(max_workers=self.threadcount) as tpe: + for child in self.children.values(): + tpe.submit(run_and_progress, child) + else: + for child in self.children.values(): + child.run() def text_report(self): txt = '' @@ -791,22 +796,15 @@ class ContainerRequestTreeSummarizer(MultiSummarizer): summer.sort_key = sort_key children[current['uuid']] = summer - page_filters = [] - while True: - child_crs = arv.container_requests().index( - order=['uuid asc'], - filters=page_filters+[ - ['requesting_container_uuid', '=', current['container_uuid']]], - ).execute() - if not child_crs['items']: - break - elif skip_child_jobs: - logger.warning('%s: omitting stats from %d child containers' - ' because --skip-child-jobs flag is on', - label, child_crs['items_available']) - break - page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]] - for cr in child_crs['items']: + if skip_child_jobs: + child_crs = arv.container_requests().list(filters=[['requesting_container_uuid', '=', current['container_uuid']]], + limit=0).execute() + logger.warning('%s: omitting stats from child containers' + ' because --skip-child-jobs flag is on', + label, child_crs['items_available']) + else: + for cr in arvados.util.keyset_list_all(arv.container_requests().list, + filters=[['requesting_container_uuid', '=', current['container_uuid']]]): if cr['container_uuid']: logger.debug('%s: container req %s', current['uuid'], cr['uuid']) cr['name'] = cr.get('name') or cr['uuid'] -- 2.30.2