from cwltool.errors import WorkflowException
from cwltool.process import UnsupportedRequirement, shortname
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.utils import aslist
+from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.job import JobBase
import arvados.collection
from cwltool.load_tool import fetch_document, resolve_and_validate_document
from cwltool.process import shortname
from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
+from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.context import LoadingContext
import ruamel.yaml as yaml
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
-from .task_queue import TaskQueue
+from cwltool.task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__
from schema_salad.sourceline import SourceLine
from arvados.errors import ApiError
-from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
+from cwltool.pathmapper import PathMapper, MapperEnt
+from cwltool.utils import adjustFileObjs, adjustDirObjs
+from cwltool.stdfsaccess import abspath
from cwltool.workflow import WorkflowException
from .http import http_to_keep
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.utils import aslist
+from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-from future import standard_library
-standard_library.install_aliases()
-from builtins import range
-from builtins import object
-
-import queue
-import threading
-import logging
-
-logger = logging.getLogger('arvados.cwl-runner')
-
-class TaskQueue(object):
- def __init__(self, lock, thread_count):
- self.thread_count = thread_count
- self.task_queue = queue.Queue(maxsize=self.thread_count)
- self.task_queue_threads = []
- self.lock = lock
- self.in_flight = 0
- self.error = None
-
- for r in range(0, self.thread_count):
- t = threading.Thread(target=self.task_queue_func)
- self.task_queue_threads.append(t)
- t.start()
-
- def task_queue_func(self):
- while True:
- task = self.task_queue.get()
- if task is None:
- return
- try:
- task()
- except Exception as e:
- logger.exception("Unhandled exception running task")
- self.error = e
-
- with self.lock:
- self.in_flight -= 1
-
- def add(self, task, unlock, check_done):
- if self.thread_count > 1:
- with self.lock:
- self.in_flight += 1
- else:
- task()
- return
-
- while True:
- try:
- unlock.release()
- if check_done.is_set():
- return
- self.task_queue.put(task, block=True, timeout=3)
- return
- except queue.Full:
- pass
- finally:
- unlock.acquire()
-
-
- def drain(self):
- try:
- # Drain queue
- while not self.task_queue.empty():
- self.task_queue.get(True, .1)
- except queue.Empty:
- pass
-
- def join(self):
- for t in self.task_queue_threads:
- self.task_queue.put(None)
- for t in self.task_queue_threads:
- t.join()
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @mock.patch("arvados_cwl.task_queue.TaskQueue")
+ @mock.patch("cwltool.task_queue.TaskQueue")
@mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
@mock.patch("arvados_cwl.executor.ArvCwlExecutor.make_output_collection")
@stubs
make_output.assert_called_with(u'Output of submit_wf.cwl', ['foo'], '', 'zzzzz-4zz18-zzzzzzzzzzzzzzzz')
self.assertEqual(exited, 0)
- @mock.patch("arvados_cwl.task_queue.TaskQueue")
+ @mock.patch("cwltool.task_queue.TaskQueue")
@mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
@mock.patch("arvados_cwl.executor.ArvCwlExecutor.make_output_collection")
@stubs
import os
import threading
-from arvados_cwl.task_queue import TaskQueue
+from cwltool.task_queue import TaskQueue
def success_task():
pass