17072: Fix imports. Use task_queue from cwltool.
author Peter Amstutz <peter.amstutz@curii.com>
Wed, 25 Nov 2020 21:10:57 +0000 (16:10 -0500)
committer Peter Amstutz <peter.amstutz@curii.com>
Wed, 25 Nov 2020 21:10:57 +0000 (16:10 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/arvados_cwl/task_queue.py [deleted file]
sdk/cwl/tests/test_submit.py
sdk/cwl/tests/test_tq.py

index 99d82f3398332d10e0ac0cbf95decb0d5870bd5a..d3521099c4d83d2379808d25045280634645c1f2 100644 (file)
@@ -21,8 +21,7 @@ import ruamel.yaml as yaml
 
 from cwltool.errors import WorkflowException
 from cwltool.process import UnsupportedRequirement, shortname
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.utils import aslist
+from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.job import JobBase
 
 import arvados.collection
index 56c6f39e9d76654816e60b30595841f80b106d34..6067ae9f442b70c6d42db62df1581ab32a7cea37 100644 (file)
@@ -17,7 +17,7 @@ from cwltool.pack import pack
 from cwltool.load_tool import fetch_document, resolve_and_validate_document
 from cwltool.process import shortname
 from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
+from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.context import LoadingContext
 
 import ruamel.yaml as yaml
index 68141586decdc7f656ca715e14bec1a6c436528a..947b630bab9d861deebf3772bb1ef53376fb2be4 100644 (file)
@@ -37,7 +37,7 @@ from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
-from .task_queue import TaskQueue
+from cwltool.task_queue import TaskQueue
 from .context import ArvLoadingContext, ArvRuntimeContext
 from ._version import __version__
 
index 5bad290773be9f49ef2e87b10b2dac48e70ef75b..e0b2d25bc5e89155bccc09567998fe1afcda2888 100644 (file)
@@ -21,7 +21,9 @@ import arvados.collection
 from schema_salad.sourceline import SourceLine
 
 from arvados.errors import ApiError
-from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
+from cwltool.pathmapper import PathMapper, MapperEnt
+from cwltool.utils import adjustFileObjs, adjustDirObjs
+from cwltool.stdfsaccess import abspath
 from cwltool.workflow import WorkflowException
 
 from .http import http_to_keep
index f0645311651c244c7d4f18dc3710663d01c2c481..bad8f1e40c2ff1ff480e5de805f15f610e8da6f5 100644 (file)
@@ -31,8 +31,7 @@ import cwltool.workflow
 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                              shortname, Process, fill_in_defaults)
 from cwltool.load_tool import fetch_document
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.utils import aslist
+from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.builder import substitute
 from cwltool.pack import pack
 from cwltool.update import INTERNAL_VERSION
diff --git a/sdk/cwl/arvados_cwl/task_queue.py b/sdk/cwl/arvados_cwl/task_queue.py
deleted file mode 100644 (file)
index d75fec6..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-from future import standard_library
-standard_library.install_aliases()
-from builtins import range
-from builtins import object
-
-import queue
-import threading
-import logging
-
-logger = logging.getLogger('arvados.cwl-runner')
-
-class TaskQueue(object):
-    def __init__(self, lock, thread_count):
-        self.thread_count = thread_count
-        self.task_queue = queue.Queue(maxsize=self.thread_count)
-        self.task_queue_threads = []
-        self.lock = lock
-        self.in_flight = 0
-        self.error = None
-
-        for r in range(0, self.thread_count):
-            t = threading.Thread(target=self.task_queue_func)
-            self.task_queue_threads.append(t)
-            t.start()
-
-    def task_queue_func(self):
-        while True:
-            task = self.task_queue.get()
-            if task is None:
-                return
-            try:
-                task()
-            except Exception as e:
-                logger.exception("Unhandled exception running task")
-                self.error = e
-
-            with self.lock:
-                self.in_flight -= 1
-
-    def add(self, task, unlock, check_done):
-        if self.thread_count > 1:
-            with self.lock:
-                self.in_flight += 1
-        else:
-            task()
-            return
-
-        while True:
-            try:
-                unlock.release()
-                if check_done.is_set():
-                    return
-                self.task_queue.put(task, block=True, timeout=3)
-                return
-            except queue.Full:
-                pass
-            finally:
-                unlock.acquire()
-
-
-    def drain(self):
-        try:
-            # Drain queue
-            while not self.task_queue.empty():
-                self.task_queue.get(True, .1)
-        except queue.Empty:
-            pass
-
-    def join(self):
-        for t in self.task_queue_threads:
-            self.task_queue.put(None)
-        for t in self.task_queue_threads:
-            t.join()
index 517ca000bb3674817592855879a64370b4d6381c..dc7e32bfe489c6473cdc96bfc1fffd754ecf1a45 100644 (file)
@@ -525,7 +525,7 @@ class TestSubmit(unittest.TestCase):
                          stubs.expect_container_request_uuid + '\n')
         self.assertEqual(exited, 0)
 
-    @mock.patch("arvados_cwl.task_queue.TaskQueue")
+    @mock.patch("cwltool.task_queue.TaskQueue")
     @mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
     @mock.patch("arvados_cwl.executor.ArvCwlExecutor.make_output_collection")
     @stubs
@@ -546,7 +546,7 @@ class TestSubmit(unittest.TestCase):
         make_output.assert_called_with(u'Output of submit_wf.cwl', ['foo'], '', 'zzzzz-4zz18-zzzzzzzzzzzzzzzz')
         self.assertEqual(exited, 0)
 
-    @mock.patch("arvados_cwl.task_queue.TaskQueue")
+    @mock.patch("cwltool.task_queue.TaskQueue")
     @mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
     @mock.patch("arvados_cwl.executor.ArvCwlExecutor.make_output_collection")
     @stubs
index a094890650e1a3049f177e9f01ec2330df7c7451..05e5116d722fb75a59973cb4bfc0373999dff50d 100644 (file)
@@ -11,7 +11,7 @@ import logging
 import os
 import threading
 
-from arvados_cwl.task_queue import TaskQueue
+from cwltool.task_queue import TaskQueue
 
 def success_task():
     pass