From 138fef8ee97f3cbd335434ad6acd26771fd0b762 Mon Sep 17 00:00:00 2001
From: Peter Amstutz
Date: Tue, 10 Apr 2018 14:19:05 -0400
Subject: [PATCH] 13108: Add test for taskqueue

Also tighten up code in a few places.

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz
---
 sdk/cwl/arvados_cwl/__init__.py           | 40 +++++++++---------
 sdk/cwl/arvados_cwl/task_queue.py         |  4 +-
 sdk/cwl/tests/test_tq.py                  | 50 +++++++++++++++++++++++
 sdk/python/arvados/commands/keepdocker.py |  7 +---
 4 files changed, 74 insertions(+), 27 deletions(-)
 create mode 100644 sdk/cwl/tests/test_tq.py

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 7affade073..c2f43fe368 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -66,7 +66,8 @@ class ArvCwlRunner(object):
     """

     def __init__(self, api_client, work_api=None, keep_client=None,
-                 output_name=None, output_tags=None, num_retries=4):
+                 output_name=None, output_tags=None, num_retries=4,
+                 thread_count=4):
         self.api = api_client
         self.processes = {}
         self.workflow_eval_lock = threading.Condition(threading.RLock())
@@ -85,7 +86,7 @@ class ArvCwlRunner(object):
         self.intermediate_output_ttl = 0
         self.intermediate_output_collections = []
         self.trash_intermediate = False
-        self.thread_count = 4
+        self.thread_count = thread_count
         self.poll_interval = 12

         if keep_client is not None:
@@ -165,21 +166,20 @@ class ArvCwlRunner(object):
         return partial(self.wrapped_callback, cb)

     def on_message(self, event):
-        if "object_uuid" in event:
-            if event["object_uuid"] in self.processes and event["event_type"] == "update":
-                uuid = event["object_uuid"]
-                if event["properties"]["new_attributes"]["state"] == "Running":
-                    with self.workflow_eval_lock:
-                        j = self.processes[uuid]
-                        if j.running is False:
-                            j.running = True
-                            j.update_pipeline_component(event["properties"]["new_attributes"])
-                            logger.info("%s %s is Running", self.label(j), uuid)
-                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
-                    with self.workflow_eval_lock:
-                        j = self.processes[uuid]
-                        self.task_queue.add(partial(j.done, event["properties"]["new_attributes"]))
-                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
+        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
+            uuid = event["object_uuid"]
+            if event["properties"]["new_attributes"]["state"] == "Running":
+                with self.workflow_eval_lock:
+                    j = self.processes[uuid]
+                    if j.running is False:
+                        j.running = True
+                        j.update_pipeline_component(event["properties"]["new_attributes"])
+                        logger.info("%s %s is Running", self.label(j), uuid)
+            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
+                with self.workflow_eval_lock:
+                    j = self.processes[uuid]
+                    self.task_queue.add(partial(j.done, event["properties"]["new_attributes"]))
+                    logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])

     def label(self, obj):
         return "[%s %s]" % (self.work_api[0:-1], obj.name)
@@ -386,7 +386,6 @@ class ArvCwlRunner(object):
                                               collection_cache=self.collection_cache)
         self.fs_access = make_fs_access(kwargs["basedir"])
         self.secret_store = kwargs.get("secret_store")
-        self.thread_count = kwargs.get("thread_count", 4)

         self.trash_intermediate = kwargs["trash_intermediate"]
         if self.trash_intermediate and self.work_api != "containers":
@@ -551,7 +550,7 @@ class ArvCwlRunner(object):
                     if (self.task_queue.in_flight + len(self.processes)) > 0:
                         self.workflow_eval_lock.wait(3)
                     else:
-                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pendingjobs.")
+                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                         break
                 loopperf.__enter__()
             loopperf.__exit__()
@@ -794,7 +793,8 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                               num_retries=4, output_name=arvargs.output_name,
-                              output_tags=arvargs.output_tags)
+                              output_tags=arvargs.output_tags,
+                              thread_count=arvargs.thread_count)
     except Exception as e:
         logger.error(e)
         return 1
diff --git a/sdk/cwl/arvados_cwl/task_queue.py b/sdk/cwl/arvados_cwl/task_queue.py
index 7efb08aec2..b9fd09807b 100644
--- a/sdk/cwl/arvados_cwl/task_queue.py
+++ b/sdk/cwl/arvados_cwl/task_queue.py
@@ -31,7 +31,7 @@ class TaskQueue(object):
             try:
                 task()
             except Exception as e:
-                logger.exception("Unexpected error running task")
+                logger.exception("Unhandled exception running task")
                 self.error = e

             with self.lock:
@@ -49,7 +49,7 @@ class TaskQueue(object):
         try:
             # Drain queue
             while not self.task_queue.empty():
-                self.task_queue.get()
+                self.task_queue.get(True, .1)
         except Queue.Empty:
             pass

diff --git a/sdk/cwl/tests/test_tq.py b/sdk/cwl/tests/test_tq.py
new file mode 100644
index 0000000000..2afbe0cff2
--- /dev/null
+++ b/sdk/cwl/tests/test_tq.py
@@ -0,0 +1,50 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import functools
+import mock
+import sys
+import unittest
+import json
+import logging
+import os
+import threading
+
+from arvados_cwl.task_queue import TaskQueue
+
+def success_task():
+    pass
+
+def fail_task():
+    raise Exception("Testing error handling")
+
+class TestTaskQueue(unittest.TestCase):
+    def test_tq(self):
+        tq = TaskQueue(threading.Lock(), 2)
+
+        self.assertIsNone(tq.error)
+
+        tq.add(success_task)
+        tq.add(success_task)
+        tq.add(success_task)
+        tq.add(success_task)
+
+        tq.join()
+
+        self.assertIsNone(tq.error)
+
+
+    def test_tq_error(self):
+        tq = TaskQueue(threading.Lock(), 2)
+
+        self.assertIsNone(tq.error)
+
+        tq.add(success_task)
+        tq.add(success_task)
+        tq.add(fail_task)
+        tq.add(success_task)
+
+        tq.join()
+
+        self.assertIsNotNone(tq.error)
diff --git a/sdk/python/arvados/commands/keepdocker.py b/sdk/python/arvados/commands/keepdocker.py
index 6097b5d949..ff7201a75b 100644
--- a/sdk/python/arvados/commands/keepdocker.py
+++ b/sdk/python/arvados/commands/keepdocker.py
@@ -422,11 +422,8 @@ def main(arguments=None, stdout=sys.stdout):
     # Check if this image is already in Arvados.

     # Project where everything should be owned
-    if args.project_uuid:
-        parent_project_uuid = args.project_uuid
-    else:
-        parent_project_uuid = api.users().current().execute(
-            num_retries=args.retries)['uuid']
+    parent_project_uuid = args.project_uuid or api.users().current().execute(
+        num_retries=args.retries)['uuid']

     # Find image hash tags
     existing_links = _get_docker_links(
-- 
2.30.2
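
For reference, a short caller-side sketch (not part of the patch) of the TaskQueue
interface exercised above: TaskQueue(lock, thread_count), add(), join(), drain(),
and the error attribute. The run_tasks helper is illustrative only, and the
join()/drain() semantics are inferred from the new test and the drain() hunk.

    # Usage sketch only; assumes the TaskQueue interface shown in this patch.
    import threading

    from arvados_cwl.task_queue import TaskQueue

    def run_tasks(callbacks):
        tq = TaskQueue(threading.Lock(), 2)   # two worker threads, as in the test
        try:
            for cb in callbacks:
                tq.add(cb)                    # hand each callable to the workers
            tq.join()                         # wait for queued tasks, as the test does
            if tq.error is not None:
                raise tq.error                # re-raise the first worker failure here
        finally:
            tq.drain()                        # discard anything still queued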