19744: Fix up threading
author: Peter Amstutz <peter.amstutz@curii.com>
Thu, 22 Feb 2024 16:02:38 +0000 (11:02 -0500)
committer: Peter Amstutz <peter.amstutz@curii.com>
Thu, 22 Feb 2024 16:02:38 +0000 (11:02 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

tools/crunchstat-summary/crunchstat_summary/command.py
tools/crunchstat-summary/crunchstat_summary/summarizer.py

index 71eae519969bf1f34ec10b491c3885c1425596df..4ece5c3b2e6d8c05d5a29946826e6e31fbdc1967 100644 (file)
@@ -7,6 +7,7 @@ import gzip
 from io import open
 import logging
 import sys
+import arvados
 
 from crunchstat_summary import logger, summarizer
 from crunchstat_summary._version import __version__
@@ -86,6 +87,7 @@ class Command(object):
         kwargs = {
             'skip_child_jobs': self.args.skip_child_jobs,
             'threads': self.args.threads,
+            'arv': arvados.api('v1')
         }
         if self.args.pipeline_instance:
             self.summer = summarizer.NewSummarizer(self.args.pipeline_instance, **kwargs)
index a721ff36b60d2be4bc4d4dc625fc05b92590610e..a67b72f89cdc84dc4736c77cff49ec3a0f142353 100644 (file)
@@ -12,8 +12,10 @@ import itertools
 import math
 import re
 import sys
-import threading
 import _strptime
+import arvados.util
+
+from concurrent.futures import ThreadPoolExecutor
 
 from crunchstat_summary import logger
 
@@ -63,8 +65,9 @@ class Summarizer(object):
         # are already suitable.  If applicable, the subclass
         # constructor will overwrite this with something useful.
         self.existing_constraints = {}
+        self.node_info = {}
 
-        logger.debug("%s: logdata %s", self.label, logdata)
+        logger.info("%s: logdata %s", self.label, logdata)
 
     def run(self):
         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
@@ -73,6 +76,10 @@ class Summarizer(object):
 
     def _run(self, logdata):
         self.detected_crunch1 = False
+
+        if not self.node_info:
+            self.node_info = logdata.node_info()
+
         for line in logdata:
             if not self.detected_crunch1 and '-8i9sb-' in line:
                 self.detected_crunch1 = True
@@ -639,8 +646,6 @@ class ProcessSummarizer(Summarizer):
             uuid = self.process.get('container_uuid', self.process.get('uuid'))
             rdr = crunchstat_summary.reader.LiveLogReader(uuid)
             label = label + ' (partial)'
-        else:
-            self.node_info = rdr.node_info()
 
         super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
         self.existing_constraints = self.process.get('runtime_constraints', {})
@@ -664,26 +669,26 @@ class ContainerRequestSummarizer(ProcessSummarizer):
 
 class MultiSummarizer(object):
     def __init__(self, children={}, label=None, threads=1, **kwargs):
-        self.throttle = threading.Semaphore(threads)
         self.children = children
         self.label = label
-
-    def run_and_release(self, target, *args, **kwargs):
-        try:
-            return target(*args, **kwargs)
-        finally:
-            self.throttle.release()
+        self.threadcount = threads  # max_workers for run()'s thread pool; run() is serial when this is <= 1
 
     def run(self):
-        threads = []
-        for child in self.children.values():
-            self.throttle.acquire()
-            t = threading.Thread(target=self.run_and_release, args=(child.run, ))
-            t.daemon = True
-            t.start()
-            threads.append(t)
-        for t in threads:
-            t.join()
+        if self.threadcount > 1 and len(self.children) > 1:
+            completed = 0
+            def run_and_progress(child):
+                try:
+                    child.run()
+                except Exception as e:
+                    logger.exception("parse error")
+                completed += 1
+                logger.info("%s/%s summarized %s", completed, len(self.children), child.label)
+            with ThreadPoolExecutor(max_workers=self.threadcount) as tpe:
+                for child in self.children.values():
+                    tpe.submit(run_and_progress, child)
+        else:
+            for child in self.children.values():
+                child.run()
 
     def text_report(self):
         txt = ''
@@ -791,22 +796,15 @@ class ContainerRequestTreeSummarizer(MultiSummarizer):
             summer.sort_key = sort_key
             children[current['uuid']] = summer
 
-            page_filters = []
-            while True:
-                child_crs = arv.container_requests().index(
-                    order=['uuid asc'],
-                    filters=page_filters+[
-                        ['requesting_container_uuid', '=', current['container_uuid']]],
-                ).execute()
-                if not child_crs['items']:
-                    break
-                elif skip_child_jobs:
-                    logger.warning('%s: omitting stats from %d child containers'
-                                   ' because --skip-child-jobs flag is on',
-                                   label, child_crs['items_available'])
-                    break
-                page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
-                for cr in child_crs['items']:
+            if skip_child_jobs:
+                # limit=0 returns no items, only the items_available count.
+                child_crs = arv.container_requests().list(filters=[['requesting_container_uuid', '=', current['container_uuid']]], limit=0).execute()
+                if child_crs['items_available']:  # like the old loop, only warn when children are actually omitted
+                    logger.warning('%s: omitting stats from %d child containers because --skip-child-jobs flag is on',
+                                   label, child_crs['items_available'])
+            else:
+                for cr in arvados.util.keyset_list_all(arv.container_requests().list,
+                                                       filters=[['requesting_container_uuid', '=', current['container_uuid']]]):
                     if cr['container_uuid']:
                         logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
                         cr['name'] = cr.get('name') or cr['uuid']