Merge branch '14198-fed-testing' refs #14198
author    Peter Amstutz <pamstutz@veritasgenetics.com>
          Wed, 21 Nov 2018 18:06:00 +0000 (13:06 -0500)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
          Wed, 21 Nov 2018 18:06:00 +0000 (13:06 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

73 files changed:
.licenseignore
build/build.list
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arv-cwl-schema.yml
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvdocker.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/arvtool.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/context.py
sdk/cwl/arvados_cwl/crunch_script.py
sdk/cwl/arvados_cwl/executor.py [new file with mode: 0644]
sdk/cwl/arvados_cwl/fsaccess.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
sdk/cwl/tests/federation/README [new file with mode: 0644]
sdk/cwl/tests/federation/arvbox-make-federation.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/arvbox/fed-config.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/arvbox/mkdir.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/arvbox/setup-user.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/arvbox/setup_user.py [new file with mode: 0644]
sdk/cwl/tests/federation/arvbox/start.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/arvbox/stop.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/cases/base-case.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/cases/cat.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/cases/hint-on-tool.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/cases/hint-on-wf.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/cases/md5sum-tool-hint.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/cases/md5sum.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/cases/remote-case.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/cases/rev-input-to-output.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/cases/rev.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/cases/runner-home-step-remote.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/cases/runner-remote-step-home.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/cases/scatter-gather.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/cases/threestep-remote.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/cases/twostep-both-remote.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/cases/twostep-home-to-remote.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/cases/twostep-remote-copy-to-home.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/cases/twostep-remote-to-home.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/data/base-case-input.txt [new file with mode: 0644]
sdk/cwl/tests/federation/data/hint-on-tool.txt [new file with mode: 0644]
sdk/cwl/tests/federation/data/hint-on-wf.txt [new file with mode: 0644]
sdk/cwl/tests/federation/data/remote-case-input.txt [new file with mode: 0644]
sdk/cwl/tests/federation/data/runner-home-step-remote-input.txt [new file with mode: 0644]
sdk/cwl/tests/federation/data/runner-remote-step-home-input.txt [new file with mode: 0644]
sdk/cwl/tests/federation/data/scatter-gather-s1.txt [new file with mode: 0644]
sdk/cwl/tests/federation/data/scatter-gather-s2.txt [new file with mode: 0644]
sdk/cwl/tests/federation/data/scatter-gather-s3.txt [new file with mode: 0644]
sdk/cwl/tests/federation/data/threestep-remote.txt [new file with mode: 0644]
sdk/cwl/tests/federation/data/twostep-both-remote.txt [new file with mode: 0644]
sdk/cwl/tests/federation/data/twostep-home-to-remote.txt [new file with mode: 0644]
sdk/cwl/tests/federation/data/twostep-remote-copy-to-home.txt [new file with mode: 0644]
sdk/cwl/tests/federation/data/twostep-remote-to-home.txt [new file with mode: 0644]
sdk/cwl/tests/federation/framework/check-exist.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/framework/check_exist.py [new file with mode: 0644]
sdk/cwl/tests/federation/framework/dockerbuild.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/framework/prepare.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/framework/prepare.py [new file with mode: 0644]
sdk/cwl/tests/federation/framework/run-acr.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/framework/testcase.cwl [new file with mode: 0644]
sdk/cwl/tests/federation/main.cwl [new file with mode: 0755]
sdk/cwl/tests/test_container.py
sdk/cwl/tests/test_job.py
sdk/cwl/tests/test_make_output.py
sdk/cwl/tests/test_pathmapper.py
sdk/cwl/tests/test_submit.py
sdk/cwl/tests/wf/expect_packed.cwl
sdk/cwl/tests/wf/submit_wf_packed.cwl
sdk/dev-jobs.dockerfile
sdk/python/setup.py
services/api/app/controllers/arvados/v1/schema_controller.rb

.licenseignore
index 51a1e7cbd2f0aabca972527475630923f1f1ef75..113bf4fa4e5f2196f74a9609d4805117392d85a3 100644 (file)
@@ -46,6 +46,7 @@ docker/jobs/apt.arvados.org.list
 */script/rails
 sdk/cwl/tests/input/blorp.txt
 sdk/cwl/tests/tool/blub.txt
+sdk/cwl/tests/federation/data/*
 sdk/go/manifest/testdata/*_manifest
 sdk/java/.classpath
 sdk/java/pom.xml
build/build.list
index 4c3d740b0b82c21a71e274bfc0db1311bdcd3e43..f97897a848c983186854e10e3f12bcfb10139cc4 100644 (file)
@@ -22,7 +22,7 @@ debian8,debian9,ubuntu1404,centos7|pycurl|7.19.5.3|3|python|amd64
 debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|pyyaml|3.12|2|python|amd64
 debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|rdflib|4.2.2|2|python|all
 debian8,debian9,ubuntu1404,centos7|shellescape|3.4.1|2|python|all
-debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|mistune|0.7.3|2|python|all
+debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|mistune|0.8.1|2|python|all
 debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|typing|3.6.4|2|python|all
 debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|avro|1.8.1|2|python|all
 debian8,debian9,ubuntu1404,centos7|ruamel.ordereddict|0.4.9|2|python|amd64
@@ -33,7 +33,7 @@ debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|docker-py|1.7.2|2|pytho
 debian8,debian9,centos7|six|1.10.0|2|python3|all
 debian8,debian9,ubuntu1404,centos7|requests|2.12.4|2|python3|all
 debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|websocket-client|0.37.0|2|python3|all
-ubuntu1404|requests|2.4.3|2|python|all
+debian8,ubuntu1404,centos7|requests|2.6.1|2|python|all
 centos7|contextlib2|0.5.4|2|python|all
 centos7|isodate|0.5.4|2|python|all
 centos7|python-daemon|2.1.2|1|python|all
@@ -44,7 +44,7 @@ centos7|networkx|1.11|0|python|all
 centos7|psutil|5.0.1|0|python|all
 debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|lockfile|0.12.2|2|python|all|--epoch 1
 debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|subprocess32|3.5.1|2|python|all
-all|ruamel.yaml|0.14.12|2|python|amd64|--python-setup-py-arguments --single-version-externally-managed
+all|ruamel.yaml|0.15.77|0|python|amd64|--python-setup-py-arguments --single-version-externally-managed
 all|cwltest|1.0.20180518074130|4|python|all|--depends 'python-futures >= 3.0.5' --depends 'python-subprocess32 >= 3.5.0'
 all|junit-xml|1.8|3|python|all
 all|rdflib-jsonld|0.4.0|2|python|all
sdk/cwl/arvados_cwl/__init__.py
index 2e1ea50a389485ec0d5cdcdf0f044ac20e7d1080..7b2731ea900a8055f9ec0c114d161674b534d8fb 100644 (file)
@@ -10,51 +10,38 @@ import argparse
 import logging
 import os
 import sys
-import threading
-import hashlib
-import copy
-import json
 import re
-from functools import partial
 import pkg_resources  # part of setuptools
-import Queue
-import time
-import signal
-import thread
 
-from cwltool.errors import WorkflowException
+from schema_salad.sourceline import SourceLine
+import schema_salad.validate as validate
 import cwltool.main
 import cwltool.workflow
 import cwltool.process
-from schema_salad.sourceline import SourceLine
-import schema_salad.validate as validate
 import cwltool.argparser
+from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
 
 import arvados
 import arvados.config
 from arvados.keep import KeepClient
 from arvados.errors import ApiError
 import arvados.commands._util as arv_cmd
+from arvados.api import OrderedJsonModel
 
-from .arvcontainer import ArvadosContainer, RunnerContainer
-from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
-from .arvtool import ArvadosCommandTool
-from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
 from .perf import Perf
-from .pathmapper import NoFollowPathMapper
-from .task_queue import TaskQueue
-from .context import ArvLoadingContext, ArvRuntimeContext
-from .util import get_current_container
 from ._version import __version__
+from .executor import ArvCwlExecutor
 
-from cwltool.pack import pack
-from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
-from cwltool.command_line_tool import compute_checksums
-
-from arvados.api import OrderedJsonModel
+# These aren't used directly in this file but
+# other code expects to import them from here
+from .arvcontainer import ArvadosContainer
+from .arvjob import ArvadosJob
+from .arvtool import ArvadosCommandTool
+from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
+from .util import get_current_container
+from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
+from .arvworkflow import ArvadosWorkflow
 
 logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
@@ -64,679 +51,6 @@ arvados.log_handler.setFormatter(logging.Formatter(
         '%(asctime)s %(name)s %(levelname)s: %(message)s',
         '%Y-%m-%d %H:%M:%S'))
 
-DEFAULT_PRIORITY = 500
-
-class RuntimeStatusLoggingHandler(logging.Handler):
-    """
-    Intercepts logging calls and report them as runtime statuses on runner
-    containers.
-    """
-    def __init__(self, runtime_status_update_func):
-        super(RuntimeStatusLoggingHandler, self).__init__()
-        self.runtime_status_update = runtime_status_update_func
-
-    def emit(self, record):
-        kind = None
-        if record.levelno >= logging.ERROR:
-            kind = 'error'
-        elif record.levelno >= logging.WARNING:
-            kind = 'warning'
-        if kind is not None:
-            log_msg = record.getMessage()
-            if '\n' in log_msg:
-                # If the logged message is multi-line, use its first line as status
-                # and the rest as detail.
-                status, detail = log_msg.split('\n', 1)
-                self.runtime_status_update(
-                    kind,
-                    "%s: %s" % (record.name, status),
-                    detail
-                )
-            else:
-                self.runtime_status_update(
-                    kind,
-                    "%s: %s" % (record.name, record.getMessage())
-                )
-
-class ArvCwlRunner(object):
-    """Execute a CWL tool or workflow, submit work (using either jobs or
-    containers API), wait for them to complete, and report output.
-
-    """
-
-    def __init__(self, api_client,
-                 arvargs=None,
-                 keep_client=None,
-                 num_retries=4,
-                 thread_count=4):
-
-        if arvargs is None:
-            arvargs = argparse.Namespace()
-            arvargs.work_api = None
-            arvargs.output_name = None
-            arvargs.output_tags = None
-            arvargs.thread_count = 1
-
-        self.api = api_client
-        self.processes = {}
-        self.workflow_eval_lock = threading.Condition(threading.RLock())
-        self.final_output = None
-        self.final_status = None
-        self.num_retries = num_retries
-        self.uuid = None
-        self.stop_polling = threading.Event()
-        self.poll_api = None
-        self.pipeline = None
-        self.final_output_collection = None
-        self.output_name = arvargs.output_name
-        self.output_tags = arvargs.output_tags
-        self.project_uuid = None
-        self.intermediate_output_ttl = 0
-        self.intermediate_output_collections = []
-        self.trash_intermediate = False
-        self.thread_count = arvargs.thread_count
-        self.poll_interval = 12
-        self.loadingContext = None
-
-        if keep_client is not None:
-            self.keep_client = keep_client
-        else:
-            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
-
-        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
-
-        self.fetcher_constructor = partial(CollectionFetcher,
-                                           api_client=self.api,
-                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
-                                           num_retries=self.num_retries)
-
-        self.work_api = None
-        expected_api = ["jobs", "containers"]
-        for api in expected_api:
-            try:
-                methods = self.api._rootDesc.get('resources')[api]['methods']
-                if ('httpMethod' in methods['create'] and
-                    (arvargs.work_api == api or arvargs.work_api is None)):
-                    self.work_api = api
-                    break
-            except KeyError:
-                pass
-
-        if not self.work_api:
-            if arvargs.work_api is None:
-                raise Exception("No supported APIs")
-            else:
-                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
-
-        if self.work_api == "jobs":
-            logger.warn("""
-*******************************
-Using the deprecated 'jobs' API.
-
-To get rid of this warning:
-
-Users: read about migrating at
-http://doc.arvados.org/user/cwl/cwl-style.html#migrate
-and use the option --api=containers
-
-Admins: configure the cluster to disable the 'jobs' API as described at:
-http://doc.arvados.org/install/install-api-server.html#disable_api_methods
-*******************************""")
-
-        self.loadingContext = ArvLoadingContext(vars(arvargs))
-        self.loadingContext.fetcher_constructor = self.fetcher_constructor
-        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
-        self.loadingContext.construct_tool_object = self.arv_make_tool
-
-        # Add a custom logging handler to the root logger for runtime status reporting
-        # if running inside a container
-        if get_current_container(self.api, self.num_retries, logger):
-            root_logger = logging.getLogger('')
-            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
-            root_logger.addHandler(handler)
-
-    def arv_make_tool(self, toolpath_object, loadingContext):
-        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
-            return ArvadosCommandTool(self, toolpath_object, loadingContext)
-        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
-            return ArvadosWorkflow(self, toolpath_object, loadingContext)
-        else:
-            return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
-
-    def output_callback(self, out, processStatus):
-        with self.workflow_eval_lock:
-            if processStatus == "success":
-                logger.info("Overall process status is %s", processStatus)
-                state = "Complete"
-            else:
-                logger.error("Overall process status is %s", processStatus)
-                state = "Failed"
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                        body={"state": state}).execute(num_retries=self.num_retries)
-            self.final_status = processStatus
-            self.final_output = out
-            self.workflow_eval_lock.notifyAll()
-
-
-    def start_run(self, runnable, runtimeContext):
-        self.task_queue.add(partial(runnable.run, runtimeContext))
-
-    def process_submitted(self, container):
-        with self.workflow_eval_lock:
-            self.processes[container.uuid] = container
-
-    def process_done(self, uuid, record):
-        with self.workflow_eval_lock:
-            j = self.processes[uuid]
-            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
-            self.task_queue.add(partial(j.done, record))
-            del self.processes[uuid]
-
-    def runtime_status_update(self, kind, message, detail=None):
-        """
-        Updates the runtime_status field on the runner container.
-        Called when there's a need to report errors, warnings or just
-        activity statuses, for example in the RuntimeStatusLoggingHandler.
-        """
-        with self.workflow_eval_lock:
-            current = get_current_container(self.api, self.num_retries, logger)
-            if current is None:
-                return
-            runtime_status = current.get('runtime_status', {})
-            # In case of status being an error, only report the first one.
-            if kind == 'error':
-                if not runtime_status.get('error'):
-                    runtime_status.update({
-                        'error': message
-                    })
-                    if detail is not None:
-                        runtime_status.update({
-                            'errorDetail': detail
-                        })
-                # Further errors are only mentioned as a count.
-                else:
-                    # Get anything before an optional 'and N more' string.
-                    try:
-                        error_msg = re.match(
-                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
-                        more_failures = re.match(
-                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
-                    except TypeError:
-                        # Ignore tests stubbing errors
-                        return
-                    if more_failures:
-                        failure_qty = int(more_failures.groups()[0])
-                        runtime_status.update({
-                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
-                        })
-                    else:
-                        runtime_status.update({
-                            'error': "%s (and 1 more)" % error_msg
-                        })
-            elif kind in ['warning', 'activity']:
-                # Record the last warning/activity status without regard of
-                # previous occurences.
-                runtime_status.update({
-                    kind: message
-                })
-                if detail is not None:
-                    runtime_status.update({
-                        kind+"Detail": detail
-                    })
-            else:
-                # Ignore any other status kind
-                return
-            try:
-                self.api.containers().update(uuid=current['uuid'],
-                                            body={
-                                                'runtime_status': runtime_status,
-                                            }).execute(num_retries=self.num_retries)
-            except Exception as e:
-                logger.info("Couldn't update runtime_status: %s", e)
-
-    def wrapped_callback(self, cb, obj, st):
-        with self.workflow_eval_lock:
-            cb(obj, st)
-            self.workflow_eval_lock.notifyAll()
-
-    def get_wrapped_callback(self, cb):
-        return partial(self.wrapped_callback, cb)
-
-    def on_message(self, event):
-        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
-            uuid = event["object_uuid"]
-            if event["properties"]["new_attributes"]["state"] == "Running":
-                with self.workflow_eval_lock:
-                    j = self.processes[uuid]
-                    if j.running is False:
-                        j.running = True
-                        j.update_pipeline_component(event["properties"]["new_attributes"])
-                        logger.info("%s %s is Running", self.label(j), uuid)
-            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
-                self.process_done(uuid, event["properties"]["new_attributes"])
-
-    def label(self, obj):
-        return "[%s %s]" % (self.work_api[0:-1], obj.name)
-
-    def poll_states(self):
-        """Poll status of jobs or containers listed in the processes dict.
-
-        Runs in a separate thread.
-        """
-
-        try:
-            remain_wait = self.poll_interval
-            while True:
-                if remain_wait > 0:
-                    self.stop_polling.wait(remain_wait)
-                if self.stop_polling.is_set():
-                    break
-                with self.workflow_eval_lock:
-                    keys = list(self.processes.keys())
-                if not keys:
-                    remain_wait = self.poll_interval
-                    continue
-
-                begin_poll = time.time()
-                if self.work_api == "containers":
-                    table = self.poll_api.container_requests()
-                elif self.work_api == "jobs":
-                    table = self.poll_api.jobs()
-
-                try:
-                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
-                except Exception as e:
-                    logger.warn("Error checking states on API server: %s", e)
-                    remain_wait = self.poll_interval
-                    continue
-
-                for p in proc_states["items"]:
-                    self.on_message({
-                        "object_uuid": p["uuid"],
-                        "event_type": "update",
-                        "properties": {
-                            "new_attributes": p
-                        }
-                    })
-                finish_poll = time.time()
-                remain_wait = self.poll_interval - (finish_poll - begin_poll)
-        except:
-            logger.exception("Fatal error in state polling thread.")
-            with self.workflow_eval_lock:
-                self.processes.clear()
-                self.workflow_eval_lock.notifyAll()
-        finally:
-            self.stop_polling.set()
-
-    def add_intermediate_output(self, uuid):
-        if uuid:
-            self.intermediate_output_collections.append(uuid)
-
-    def trash_intermediate_output(self):
-        logger.info("Cleaning up intermediate output collections")
-        for i in self.intermediate_output_collections:
-            try:
-                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
-            except:
-                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
-            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
-                break
-
-    def check_features(self, obj):
-        if isinstance(obj, dict):
-            if obj.get("writable") and self.work_api != "containers":
-                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
-            if obj.get("class") == "DockerRequirement":
-                if obj.get("dockerOutputDirectory"):
-                    if self.work_api != "containers":
-                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
-                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
-                    if not obj.get("dockerOutputDirectory").startswith('/'):
-                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
-                            "Option 'dockerOutputDirectory' must be an absolute path.")
-            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
-                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
-            for v in obj.itervalues():
-                self.check_features(v)
-        elif isinstance(obj, list):
-            for i,v in enumerate(obj):
-                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
-                    self.check_features(v)
-
-    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
-        outputObj = copy.deepcopy(outputObj)
-
-        files = []
-        def capture(fileobj):
-            files.append(fileobj)
-
-        adjustDirObjs(outputObj, capture)
-        adjustFileObjs(outputObj, capture)
-
-        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
-
-        final = arvados.collection.Collection(api_client=self.api,
-                                              keep_client=self.keep_client,
-                                              num_retries=self.num_retries)
-
-        for k,v in generatemapper.items():
-            if k.startswith("_:"):
-                if v.type == "Directory":
-                    continue
-                if v.type == "CreateFile":
-                    with final.open(v.target, "wb") as f:
-                        f.write(v.resolved.encode("utf-8"))
-                    continue
-
-            if not k.startswith("keep:"):
-                raise Exception("Output source is not in keep or a literal")
-            sp = k.split("/")
-            srccollection = sp[0][5:]
-            try:
-                reader = self.collection_cache.get(srccollection)
-                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
-                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
-            except arvados.errors.ArgumentError as e:
-                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
-                raise
-            except IOError as e:
-                logger.warn("While preparing output collection: %s", e)
-
-        def rewrite(fileobj):
-            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
-            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
-                if k in fileobj:
-                    del fileobj[k]
-
-        adjustDirObjs(outputObj, rewrite)
-        adjustFileObjs(outputObj, rewrite)
-
-        with final.open("cwl.output.json", "w") as f:
-            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
-
-        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
-
-        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
-                    final.api_response()["name"],
-                    final.manifest_locator())
-
-        final_uuid = final.manifest_locator()
-        tags = tagsString.split(',')
-        for tag in tags:
-             self.api.links().create(body={
-                "head_uuid": final_uuid, "link_class": "tag", "name": tag
-                }).execute(num_retries=self.num_retries)
-
-        def finalcollection(fileobj):
-            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
-
-        adjustDirObjs(outputObj, finalcollection)
-        adjustFileObjs(outputObj, finalcollection)
-
-        return (outputObj, final)
-
-    def set_crunch_output(self):
-        if self.work_api == "containers":
-            current = get_current_container(self.api, self.num_retries, logger)
-            if current is None:
-                return
-            try:
-                self.api.containers().update(uuid=current['uuid'],
-                                             body={
-                                                 'output': self.final_output_collection.portable_data_hash(),
-                                             }).execute(num_retries=self.num_retries)
-                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
-                                              body={
-                                                  'is_trashed': True
-                                              }).execute(num_retries=self.num_retries)
-            except Exception as e:
-                logger.info("Setting container output: %s", e)
-        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
-            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
-                                   body={
-                                       'output': self.final_output_collection.portable_data_hash(),
-                                       'success': self.final_status == "success",
-                                       'progress':1.0
-                                   }).execute(num_retries=self.num_retries)
-
-    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
-        self.debug = runtimeContext.debug
-
-        tool.visit(self.check_features)
-
-        self.project_uuid = runtimeContext.project_uuid
-        self.pipeline = None
-        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
-        self.secret_store = runtimeContext.secret_store
-
-        self.trash_intermediate = runtimeContext.trash_intermediate
-        if self.trash_intermediate and self.work_api != "containers":
-            raise Exception("--trash-intermediate is only supported with --api=containers.")
-
-        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
-        if self.intermediate_output_ttl and self.work_api != "containers":
-            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
-        if self.intermediate_output_ttl < 0:
-            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
-
-        if runtimeContext.submit_request_uuid and self.work_api != "containers":
-            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
-
-        if not runtimeContext.name:
-            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
-
-        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
-        # Also uploads docker images.
-        merged_map = upload_workflow_deps(self, tool)
-
-        # Reload tool object which may have been updated by
-        # upload_workflow_deps
-        # Don't validate this time because it will just print redundant errors.
-        loadingContext = self.loadingContext.copy()
-        loadingContext.loader = tool.doc_loader
-        loadingContext.avsc_names = tool.doc_schema
-        loadingContext.metadata = tool.metadata
-        loadingContext.do_validate = False
-
-        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
-                                  loadingContext)
-
-        # Upload local file references in the job order.
-        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
-                                     tool, job_order)
-
-        existing_uuid = runtimeContext.update_workflow
-        if existing_uuid or runtimeContext.create_workflow:
-            # Create a pipeline template or workflow record and exit.
-            if self.work_api == "jobs":
-                tmpl = RunnerTemplate(self, tool, job_order,
-                                      runtimeContext.enable_reuse,
-                                      uuid=existing_uuid,
-                                      submit_runner_ram=runtimeContext.submit_runner_ram,
-                                      name=runtimeContext.name,
-                                      merged_map=merged_map)
-                tmpl.save()
-                # cwltool.main will write our return value to stdout.
-                return (tmpl.uuid, "success")
-            elif self.work_api == "containers":
-                return (upload_workflow(self, tool, job_order,
-                                        self.project_uuid,
-                                        uuid=existing_uuid,
-                                        submit_runner_ram=runtimeContext.submit_runner_ram,
-                                        name=runtimeContext.name,
-                                        merged_map=merged_map),
-                        "success")
-
-        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
-        self.eval_timeout = runtimeContext.eval_timeout
-
-        runtimeContext = runtimeContext.copy()
-        runtimeContext.use_container = True
-        runtimeContext.tmpdir_prefix = "tmp"
-        runtimeContext.work_api = self.work_api
-
-        if self.work_api == "containers":
-            if self.ignore_docker_for_reuse:
-                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
-            runtimeContext.outdir = "/var/spool/cwl"
-            runtimeContext.docker_outdir = "/var/spool/cwl"
-            runtimeContext.tmpdir = "/tmp"
-            runtimeContext.docker_tmpdir = "/tmp"
-        elif self.work_api == "jobs":
-            if runtimeContext.priority != DEFAULT_PRIORITY:
-                raise Exception("--priority not implemented for jobs API.")
-            runtimeContext.outdir = "$(task.outdir)"
-            runtimeContext.docker_outdir = "$(task.outdir)"
-            runtimeContext.tmpdir = "$(task.tmpdir)"
-
-        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
-            raise Exception("--priority must be in the range 1..1000.")
-
-        runnerjob = None
-        if runtimeContext.submit:
-            # Submit a runner job to run the workflow for us.
-            if self.work_api == "containers":
-                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
-                    runtimeContext.runnerjob = tool.tool["id"]
-                    runnerjob = tool.job(job_order,
-                                         self.output_callback,
-                                         runtimeContext).next()
-                else:
-                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
-                                                self.output_name,
-                                                self.output_tags,
-                                                submit_runner_ram=runtimeContext.submit_runner_ram,
-                                                name=runtimeContext.name,
-                                                on_error=runtimeContext.on_error,
-                                                submit_runner_image=runtimeContext.submit_runner_image,
-                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
-                                                merged_map=merged_map,
-                                                priority=runtimeContext.priority,
-                                                secret_store=self.secret_store)
-            elif self.work_api == "jobs":
-                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
-                                      self.output_name,
-                                      self.output_tags,
-                                      submit_runner_ram=runtimeContext.submit_runner_ram,
-                                      name=runtimeContext.name,
-                                      on_error=runtimeContext.on_error,
-                                      submit_runner_image=runtimeContext.submit_runner_image,
-                                      merged_map=merged_map)
-        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
-            # Create pipeline for local run
-            self.pipeline = self.api.pipeline_instances().create(
-                body={
-                    "owner_uuid": self.project_uuid,
-                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
-                    "components": {},
-                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
-            logger.info("Pipeline instance %s", self.pipeline["uuid"])
-
-        if runnerjob and not runtimeContext.wait:
-            submitargs = runtimeContext.copy()
-            submitargs.submit = False
-            runnerjob.run(submitargs)
-            return (runnerjob.uuid, "success")
-
-        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
-        self.polling_thread = threading.Thread(target=self.poll_states)
-        self.polling_thread.start()
-
-        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
-
-        if runnerjob:
-            jobiter = iter((runnerjob,))
-        else:
-            if runtimeContext.cwl_runner_job is not None:
-                self.uuid = runtimeContext.cwl_runner_job.get('uuid')
-            jobiter = tool.job(job_order,
-                               self.output_callback,
-                               runtimeContext)
-
-        try:
-            self.workflow_eval_lock.acquire()
-            # Holds the lock while this code runs and releases it when
-            # it is safe to do so in self.workflow_eval_lock.wait(),
-            # at which point on_message can update job state and
-            # process output callbacks.
-
-            loopperf = Perf(metrics, "jobiter")
-            loopperf.__enter__()
-            for runnable in jobiter:
-                loopperf.__exit__()
-
-                if self.stop_polling.is_set():
-                    break
-
-                if self.task_queue.error is not None:
-                    raise self.task_queue.error
-
-                if runnable:
-                    with Perf(metrics, "run"):
-                        self.start_run(runnable, runtimeContext)
-                else:
-                    if (self.task_queue.in_flight + len(self.processes)) > 0:
-                        self.workflow_eval_lock.wait(3)
-                    else:
-                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
-                        break
-                loopperf.__enter__()
-            loopperf.__exit__()
-
-            while (self.task_queue.in_flight + len(self.processes)) > 0:
-                if self.task_queue.error is not None:
-                    raise self.task_queue.error
-                self.workflow_eval_lock.wait(3)
-
-        except UnsupportedRequirement:
-            raise
-        except:
-            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
-                logger.error("Interrupted, workflow will be cancelled")
-            else:
-                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
-            if runnerjob and runnerjob.uuid and self.work_api == "containers":
-                self.api.container_requests().update(uuid=runnerjob.uuid,
-                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
-        finally:
-            self.workflow_eval_lock.release()
-            self.task_queue.drain()
-            self.stop_polling.set()
-            self.polling_thread.join()
-            self.task_queue.join()
-
-        if self.final_status == "UnsupportedRequirement":
-            raise UnsupportedRequirement("Check log for details.")
-
-        if self.final_output is None:
-            raise WorkflowException("Workflow did not return a result.")
-
-        if runtimeContext.submit and isinstance(runnerjob, Runner):
-            logger.info("Final output collection %s", runnerjob.final_output)
-        else:
-            if self.output_name is None:
-                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
-            if self.output_tags is None:
-                self.output_tags = ""
-
-            storage_classes = runtimeContext.storage_classes.strip().split(",")
-            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
-            self.set_crunch_output()
-
-        if runtimeContext.compute_checksum:
-            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
-            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
-
-        if self.trash_intermediate and self.final_status == "success":
-            self.trash_intermediate_output()
-
-        return (self.final_output, self.final_status)
-
-
 def versionstring():
     """Print version string of key packages for provenance and debugging."""
 
@@ -831,9 +145,19 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                         default=None)
 
-    parser.add_argument("--submit-request-uuid", type=str,
-                        default=None,
-                        help="Update and commit supplied container request instead of creating a new one (containers API only).")
+    parser.add_argument("--always-submit-runner", action="store_true",
+                        help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool",
+                        default=False)
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--submit-request-uuid", type=str,
+                         default=None,
+                         help="Update and commit to supplied container request instead of creating a new one (containers API only).",
+                         metavar="UUID")
+    exgroup.add_argument("--submit-runner-cluster", type=str,
+                         help="Submit workflow runner to a remote cluster (containers API only)",
+                         default=None,
+                         metavar="CLUSTER_ID")
 
     parser.add_argument("--name", type=str,
                         help="Name to use for workflow execution instance.",
@@ -898,7 +222,8 @@ def add_arv_hints():
         "http://arvados.org/cwl#APIRequirement",
         "http://commonwl.org/cwltool#LoadListingRequirement",
         "http://arvados.org/cwl#IntermediateOutput",
-        "http://arvados.org/cwl#ReuseRequirement"
+        "http://arvados.org/cwl#ReuseRequirement",
+        "http://arvados.org/cwl#ClusterTarget"
     ])
 
 def exit_signal_handler(sigcode, frame):
@@ -941,6 +266,10 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
 
     add_arv_hints()
 
+    for key, val in cwltool.argparser.get_default_args().items():
+        if not hasattr(arvargs, key):
+            setattr(arvargs, key, val)
+
     try:
         if api_client is None:
             api_client = arvados.safeapi.ThreadSafeApiCache(
@@ -951,7 +280,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
             api_client.users().current().execute()
         if keep_client is None:
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
-        runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
+        executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
     except Exception as e:
         logger.error(e)
         return 1
@@ -976,22 +305,13 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
     else:
         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
 
-    for key, val in cwltool.argparser.get_default_args().items():
-        if not hasattr(arvargs, key):
-            setattr(arvargs, key, val)
-
-    runtimeContext = ArvRuntimeContext(vars(arvargs))
-    runtimeContext.make_fs_access = partial(CollectionFsAccess,
-                             collection_cache=runner.collection_cache)
-    runtimeContext.http_timeout = arvargs.http_timeout
-
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
-                             executor=runner.arv_executor,
+                             executor=executor.arv_executor,
                              versionfunc=versionstring,
                              job_order_object=job_order_object,
                              logger_handler=arvados.log_handler,
                              custom_schema_callback=add_arv_hints,
-                             loadingContext=runner.loadingContext,
-                             runtimeContext=runtimeContext)
+                             loadingContext=executor.loadingContext,
+                             runtimeContext=executor.runtimeContext)
sdk/cwl/arvados_cwl/arv-cwl-schema.yml
index 4f762192a2a386f3c08c0d17e5704eccbf8f65e3..902b1ffba299240438c60c8a0a866db598b2a101 100644 (file)
@@ -232,4 +232,24 @@ $graph:
     coresMin:
       type: int?
       doc: Minimum cores allocated to cwl-runner
-      jsonldPredicate: "https://w3id.org/cwl/cwl#ResourceRequirement/coresMin"
\ No newline at end of file
+      jsonldPredicate: "https://w3id.org/cwl/cwl#ResourceRequirement/coresMin"
+
+- name: ClusterTarget
+  type: record
+  extends: cwl:ProcessRequirement
+  inVocab: false
+  doc: |
+    Specify where a workflow step should run
+  fields:
+    class:
+      type: string
+      doc: "Always 'arv:ClusterTarget'"
+      jsonldPredicate:
+        _id: "@type"
+        _type: "@vocab"
+    cluster_id:
+      type: string?
+      doc: The cluster to run the container
+    project_uuid:
+      type: string?
+      doc: The project that will own the container requests and intermediate collections
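
[Editor's note: the record above registers the arv:ClusterTarget hint (also added to the custom schema list in add_arv_hints below). A minimal sketch of a workflow step carrying the hint follows; the cluster id, project UUID, step name, and port names are placeholders, and per set_cluster_target() in the arvtool.py diff below both fields may also be given as CWL expressions.]

    $namespaces:
      arv: "http://arvados.org/cwl#"
    cwlVersion: v1.0
    class: Workflow
    inputs:
      inputfile: File
    outputs: []
    steps:
      md5sum_remote:
        run: md5sum.cwl
        in:
          inputfile: inputfile
        out: [hash]
        hints:
          - class: arv:ClusterTarget
            cluster_id: clsr2                         # placeholder remote cluster id
            project_uuid: clsr2-j7d0g-zzzzzzzzzzzzzzz  # placeholder project on that cluster

[When such a hint is present, set_cluster_target() copies the runtime context, and arvcontainer.py forwards the value as the cluster_id parameter on container_requests().create()/update(), as shown in the extra_submit_params changes below.]
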
sdk/cwl/arvados_cwl/arvcontainer.py
index b4d01019fc099179dc0cae6fcb36821bfeab0471..278ff08e91cb5fca9b5578a72297bb2045f47415 100644 (file)
@@ -12,7 +12,7 @@ import ciso8601
 import uuid
 import math
 
-from arvados_cwl.util import get_current_container, get_intermediate_collection_info
+import arvados_cwl.util
 import ruamel.yaml as yaml
 
 from cwltool.errors import WorkflowException
@@ -36,7 +36,7 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics')
 class ArvadosContainer(JobBase):
     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
 
-    def __init__(self, runner,
+    def __init__(self, runner, job_runtime,
                  builder,   # type: Builder
                  joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                  make_path_mapper,  # type: Callable[..., PathMapper]
@@ -46,6 +46,7 @@ class ArvadosContainer(JobBase):
     ):
         super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
         self.arvrunner = runner
+        self.job_runtime = job_runtime
         self.running = False
         self.uuid = None
 
@@ -60,6 +61,8 @@ class ArvadosContainer(JobBase):
         # ArvadosContainer object by CommandLineTool.job() before
         # run() is called.
 
+        runtimeContext = self.job_runtime
+
         container_request = {
             "command": self.command_line,
             "name": self.name,
@@ -168,8 +171,8 @@ class ArvadosContainer(JobBase):
                 keepemptydirs(vwd)
 
                 if not runtimeContext.current_container:
-                    runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
-                info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
+                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+                info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                 vwd.save_new(name=info["name"],
                              owner_uuid=self.arvrunner.project_uuid,
                              ensure_unique_name=True,
@@ -212,9 +215,9 @@ class ArvadosContainer(JobBase):
             docker_req = {"dockerImageId": "arvados/jobs"}
 
         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
-                                                                     docker_req,
-                                                                     runtimeContext.pull_image,
-                                                                     self.arvrunner.project_uuid)
+                                                                    docker_req,
+                                                                    runtimeContext.pull_image,
+                                                                    self.arvrunner.project_uuid)
 
         api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
         if api_req:
@@ -250,6 +253,10 @@ class ArvadosContainer(JobBase):
         if self.timelimit is not None:
             scheduling_parameters["max_run_time"] = self.timelimit
 
+        extra_submit_params = {}
+        if runtimeContext.submit_runner_cluster:
+            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
+
         container_request["output_name"] = "Output for step %s" % (self.name)
         container_request["output_ttl"] = self.output_ttl
         container_request["mounts"] = mounts
@@ -277,11 +284,13 @@ class ArvadosContainer(JobBase):
             if runtimeContext.submit_request_uuid:
                 response = self.arvrunner.api.container_requests().update(
                     uuid=runtimeContext.submit_request_uuid,
-                    body=container_request
+                    body=container_request,
+                    **extra_submit_params
                 ).execute(num_retries=self.arvrunner.num_retries)
             else:
                 response = self.arvrunner.api.container_requests().create(
-                    body=container_request
+                    body=container_request,
+                    **extra_submit_params
                 ).execute(num_retries=self.arvrunner.num_retries)
 
             self.uuid = response["uuid"]
@@ -479,14 +488,20 @@ class RunnerContainer(Runner):
         if self.arvrunner.project_uuid:
             job_spec["owner_uuid"] = self.arvrunner.project_uuid
 
+        extra_submit_params = {}
+        if runtimeContext.submit_runner_cluster:
+            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
+
         if runtimeContext.submit_request_uuid:
             response = self.arvrunner.api.container_requests().update(
                 uuid=runtimeContext.submit_request_uuid,
-                body=job_spec
+                body=job_spec,
+                **extra_submit_params
             ).execute(num_retries=self.arvrunner.num_retries)
         else:
             response = self.arvrunner.api.container_requests().create(
-                body=job_spec
+                body=job_spec,
+                **extra_submit_params
             ).execute(num_retries=self.arvrunner.num_retries)
 
         self.uuid = response["uuid"]
sdk/cwl/arvados_cwl/arvdocker.py
index 7508febb08cc8bd704d251cc0490ea045a75053b..84006b47d2a8ba86fd97f88b63772a53e3d711f6 100644 (file)
@@ -21,6 +21,9 @@ cached_lookups_lock = threading.Lock()
 def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
     """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
 
+    if "http://arvados.org/cwl#dockerCollectionPDH" in dockerRequirement:
+        return dockerRequirement["http://arvados.org/cwl#dockerCollectionPDH"]
+
     if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
         dockerRequirement = copy.deepcopy(dockerRequirement)
         dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
@@ -31,7 +34,7 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
     global cached_lookups_lock
     with cached_lookups_lock:
         if dockerRequirement["dockerImageId"] in cached_lookups:
-            return dockerRequirement["dockerImageId"]
+            return cached_lookups[dockerRequirement["dockerImageId"]]
 
     with SourceLine(dockerRequirement, "dockerImageId", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
         sp = dockerRequirement["dockerImageId"].split(":")
@@ -70,10 +73,12 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
         if not images:
             raise WorkflowException("Could not find Docker image %s:%s" % (image_name, image_tag))
 
+        pdh = api_client.collections().get(uuid=images[0][0]).execute()["portable_data_hash"]
+
         with cached_lookups_lock:
-            cached_lookups[dockerRequirement["dockerImageId"]] = True
+            cached_lookups[dockerRequirement["dockerImageId"]] = pdh
 
-    return dockerRequirement["dockerImageId"]
+    return pdh
 
 def arv_docker_clear_cache():
     global cached_lookups
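
[Editor's note: the dockerCollectionPDH check added at the top of this file's diff lets a caller skip the Keep lookup when the image has already been resolved to a collection; together with caching the portable data hash instead of True, arv_docker_get_image() now always returns a PDH. A rough sketch of a DockerRequirement carrying that annotation, assuming arv: maps to http://arvados.org/cwl#; the PDH value is a placeholder.]

    hints:
      DockerRequirement:
        dockerPull: arvados/jobs:latest
        # placeholder portable data hash of the uploaded image collection
        arv:dockerCollectionPDH: "99999999999999999999999999999999+2560"
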
sdk/cwl/arvados_cwl/arvjob.py
index 1287fbb6eaf7b8387ca3fe700c7c97cf0678b867..9a03372d32de9375e9401fe4fc4099dce61f1181 100644 (file)
@@ -18,7 +18,7 @@ from cwltool.job import JobBase
 
 from schema_salad.sourceline import SourceLine
 
-from arvados_cwl.util import get_current_container, get_intermediate_collection_info
+import arvados_cwl.util
 import ruamel.yaml as yaml
 
 import arvados.collection
@@ -30,6 +30,7 @@ from .pathmapper import VwdPathMapper, trim_listing
 from .perf import Perf
 from . import done
 from ._version import __version__
+from .util import get_intermediate_collection_info
 
 logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
@@ -77,9 +78,7 @@ class ArvadosJob(JobBase):
 
                 if vwd:
                     with Perf(metrics, "generatefiles.save_new %s" % self.name):
-                        if not runtimeContext.current_container:
-                            runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
-                        info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
+                        info = get_intermediate_collection_info(self.name, None, runtimeContext.intermediate_output_ttl)
                         vwd.save_new(name=info["name"],
                                      owner_uuid=self.arvrunner.project_uuid,
                                      ensure_unique_name=True,
sdk/cwl/arvados_cwl/arvtool.py
index 119acc30392ceb9f124a6d0101c0868beeb6c1ae..5257645304fad30e819633c6ee09164ddbc6f382 100644 (file)
@@ -7,6 +7,34 @@ from .arvjob import ArvadosJob
 from .arvcontainer import ArvadosContainer
 from .pathmapper import ArvPathMapper
 from functools import partial
+from schema_salad.sourceline import SourceLine
+from cwltool.errors import WorkflowException
+
+def validate_cluster_target(arvrunner, runtimeContext):
+    if (runtimeContext.submit_runner_cluster and
+        runtimeContext.submit_runner_cluster not in arvrunner.api._rootDesc["remoteHosts"] and
+        runtimeContext.submit_runner_cluster != arvrunner.api._rootDesc["uuidPrefix"]):
+        raise WorkflowException("Unknown or invalid cluster id '%s' known remote clusters are %s" % (runtimeContext.submit_runner_cluster,
+                                                                                                  ", ".join(arvrunner.api._rootDesc["remoteHosts"].keys())))
+def set_cluster_target(tool, arvrunner, builder, runtimeContext):
+    cluster_target_req = None
+    for field in ("hints", "requirements"):
+        if field not in tool:
+            continue
+        for item in tool[field]:
+            if item["class"] == "http://arvados.org/cwl#ClusterTarget":
+                cluster_target_req = item
+
+    if cluster_target_req is None:
+        return runtimeContext
+
+    with SourceLine(cluster_target_req, None, WorkflowException, runtimeContext.debug):
+        runtimeContext = runtimeContext.copy()
+        runtimeContext.submit_runner_cluster = builder.do_eval(cluster_target_req.get("cluster_id")) or runtimeContext.submit_runner_cluster
+        runtimeContext.project_uuid = builder.do_eval(cluster_target_req.get("project_uuid")) or runtimeContext.project_uuid
+        validate_cluster_target(arvrunner, runtimeContext)
+
+    return runtimeContext
 
 class ArvadosCommandTool(CommandLineTool):
     """Wrap cwltool CommandLineTool to override selected methods."""
@@ -17,7 +45,7 @@ class ArvadosCommandTool(CommandLineTool):
 
     def make_job_runner(self, runtimeContext):
         if runtimeContext.work_api == "containers":
-            return partial(ArvadosContainer, self.arvrunner)
+            return partial(ArvadosContainer, self.arvrunner, runtimeContext)
         elif runtimeContext.work_api == "jobs":
             return partial(ArvadosJob, self.arvrunner)
         else:
@@ -34,15 +62,8 @@ class ArvadosCommandTool(CommandLineTool):
                                  "$(task.keep)/%s/%s")
 
     def job(self, joborder, output_callback, runtimeContext):
-
-        # Workaround for #13365
-        builderargs = runtimeContext.copy()
-        builderargs.toplevel = True
-        builderargs.tmp_outdir_prefix = ""
-        builder = self._init_job(joborder, builderargs)
-        joborder = builder.job
-
-        runtimeContext = runtimeContext.copy()
+        builder = self._init_job(joborder, runtimeContext)
+        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
 
         if runtimeContext.work_api == "containers":
             dockerReq, is_req = self.get_requirement("DockerRequirement")
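
For reference, set_cluster_target() introduced above looks for an http://arvados.org/cwl#ClusterTarget entry in a tool's hints or requirements and, when present, re-evaluates which federated cluster and project the step should be submitted to. A hypothetical hint as the code would see it (the cluster id and project UUID below are made up):

    # Hypothetical ClusterTarget entry as it appears in tool["hints"] or tool["requirements"].
    cluster_target_req = {
        "class": "http://arvados.org/cwl#ClusterTarget",
        "cluster_id": "clsr2",                           # literal or CWL expression
        "project_uuid": "clsr2-j7d0g-000000000000000",   # literal or CWL expression
    }
    # set_cluster_target() evaluates these with builder.do_eval(), copies the results
    # into runtimeContext.submit_runner_cluster and runtimeContext.project_uuid, and
    # then calls validate_cluster_target() to check the cluster id against
    # api._rootDesc["remoteHosts"].
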
index ae90625102ff155cd67daa44d4ab4384aa996866..4cc01a91ce80e08aca68af320a03c8b199d2b3fe 100644 (file)
@@ -12,7 +12,7 @@ from schema_salad.sourceline import SourceLine, cmap
 from cwltool.pack import pack
 from cwltool.load_tool import fetch_document
 from cwltool.process import shortname
-from cwltool.workflow import Workflow, WorkflowException
+from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.builder import Builder
 from cwltool.context import LoadingContext
@@ -22,7 +22,7 @@ import ruamel.yaml as yaml
 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
                      trim_anonymous_location, remove_redundant_fields, discover_secondary_files)
 from .pathmapper import ArvPathMapper, trim_listing
-from .arvtool import ArvadosCommandTool
+from .arvtool import ArvadosCommandTool, set_cluster_target
 from .perf import Perf
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -118,171 +118,201 @@ def get_overall_res_req(res_reqs):
             overall_res_req["class"] = "ResourceRequirement"
         return cmap(overall_res_req)
 
+class ArvadosWorkflowStep(WorkflowStep):
+    def __init__(self,
+                 toolpath_object,      # type: Dict[Text, Any]
+                 pos,                  # type: int
+                 loadingContext,       # type: LoadingContext
+                 arvrunner,
+                 *argc,
+                 **argv
+                ):  # type: (...) -> None
+
+        super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
+        self.tool["class"] = "WorkflowStep"
+        self.arvrunner = arvrunner
+
+    def job(self, joborder, output_callback, runtimeContext):
+        runtimeContext = runtimeContext.copy()
+        runtimeContext.toplevel = True  # Preserve behavior for #13365
+
+        builder = self._init_job({shortname(k): v for k,v in joborder.items()}, runtimeContext)
+        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
+        return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)
+
+
 class ArvadosWorkflow(Workflow):
     """Wrap cwltool Workflow to override selected methods."""
 
     def __init__(self, arvrunner, toolpath_object, loadingContext):
-        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
         self.arvrunner = arvrunner
         self.wf_pdh = None
         self.dynamic_resource_req = []
         self.static_resource_req = []
         self.wf_reffiles = []
         self.loadingContext = loadingContext
+        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
+        self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
 
     def job(self, joborder, output_callback, runtimeContext):
+
+        builder = self._init_job(joborder, runtimeContext)
+        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
+
         req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
-        if req:
-            with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
-                if "id" not in self.tool:
-                    raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
-            document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
+        if not req:
+            return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)
 
-            discover_secondary_files(self.tool["inputs"], joborder)
+        # RunInSingleContainer is true
+
+        with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
+            if "id" not in self.tool:
+                raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
+        document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
+
+        discover_secondary_files(self.tool["inputs"], joborder)
+
+        with Perf(metrics, "subworkflow upload_deps"):
+            upload_dependencies(self.arvrunner,
+                                os.path.basename(joborder.get("id", "#")),
+                                document_loader,
+                                joborder,
+                                joborder.get("id", "#"),
+                                False)
+
+            if self.wf_pdh is None:
+                workflowobj["requirements"] = dedup_reqs(self.requirements)
+                workflowobj["hints"] = dedup_reqs(self.hints)
+
+                packed = pack(document_loader, workflowobj, uri, self.metadata)
+
+                def visit(item):
+                    for t in ("hints", "requirements"):
+                        if t not in item:
+                            continue
+                        for req in item[t]:
+                            if req["class"] == "ResourceRequirement":
+                                dyn = False
+                                for k in max_res_pars + sum_res_pars:
+                                    if k in req:
+                                        if isinstance(req[k], basestring):
+                                            if item["id"] == "#main":
+                                                # only the top-level requirements/hints may contain expressions
+                                                self.dynamic_resource_req.append(req)
+                                                dyn = True
+                                                break
+                                            else:
+                                                with SourceLine(req, k, WorkflowException):
+                                                    raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
+                                if not dyn:
+                                    self.static_resource_req.append(req)
+
+                visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
+
+                if self.static_resource_req:
+                    self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
 
-            with Perf(metrics, "subworkflow upload_deps"):
                 upload_dependencies(self.arvrunner,
-                                    os.path.basename(joborder.get("id", "#")),
+                                    runtimeContext.name,
                                     document_loader,
-                                    joborder,
-                                    joborder.get("id", "#"),
+                                    packed,
+                                    uri,
                                     False)
 
-                if self.wf_pdh is None:
-                    workflowobj["requirements"] = dedup_reqs(self.requirements)
-                    workflowobj["hints"] = dedup_reqs(self.hints)
-
-                    packed = pack(document_loader, workflowobj, uri, self.metadata)
-
-                    builder = Builder(joborder,
-                                      requirements=workflowobj["requirements"],
-                                      hints=workflowobj["hints"],
-                                      resources={})
-
-                    def visit(item):
-                        for t in ("hints", "requirements"):
-                            if t not in item:
-                                continue
-                            for req in item[t]:
-                                if req["class"] == "ResourceRequirement":
-                                    dyn = False
-                                    for k in max_res_pars + sum_res_pars:
-                                        if k in req:
-                                            if isinstance(req[k], basestring):
-                                                if item["id"] == "#main":
-                                                    # only the top-level requirements/hints may contain expressions
-                                                    self.dynamic_resource_req.append(req)
-                                                    dyn = True
-                                                    break
-                                                else:
-                                                    with SourceLine(req, k, WorkflowException):
-                                                        raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
-                                    if not dyn:
-                                        self.static_resource_req.append(req)
-
-                    visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
-
-                    if self.static_resource_req:
-                        self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
-
-                    upload_dependencies(self.arvrunner,
-                                        runtimeContext.name,
-                                        document_loader,
-                                        packed,
-                                        uri,
-                                        False)
-
-                    # Discover files/directories referenced by the
-                    # workflow (mainly "default" values)
-                    visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)
-
-
-            if self.dynamic_resource_req:
-                builder = Builder(joborder,
-                                  requirements=self.requirements,
-                                  hints=self.hints,
-                                  resources={})
-
-                # Evaluate dynamic resource requirements using current builder
-                rs = copy.copy(self.static_resource_req)
-                for dyn_rs in self.dynamic_resource_req:
-                    eval_req = {"class": "ResourceRequirement"}
-                    for a in max_res_pars + sum_res_pars:
-                        if a in dyn_rs:
-                            eval_req[a] = builder.do_eval(dyn_rs[a])
-                    rs.append(eval_req)
-                job_res_reqs = [get_overall_res_req(rs)]
-            else:
-                job_res_reqs = self.static_resource_req
-
-            with Perf(metrics, "subworkflow adjust"):
-                joborder_resolved = copy.deepcopy(joborder)
-                joborder_keepmount = copy.deepcopy(joborder)
-
-                reffiles = []
-                visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
-
-                mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
-                                       "/keep/%s",
-                                       "/keep/%s/%s")
-
-                # For containers API, we need to make sure any extra
-                # referenced files (ie referenced by the workflow but
-                # not in the inputs) are included in the mounts.
-                if self.wf_reffiles:
-                    runtimeContext = runtimeContext.copy()
-                    runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)
-
-                def keepmount(obj):
-                    remove_redundant_fields(obj)
-                    with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
-                        if "location" not in obj:
-                            raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
-                    with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
-                        if obj["location"].startswith("keep:"):
-                            obj["location"] = mapper.mapper(obj["location"]).target
-                            if "listing" in obj:
-                                del obj["listing"]
-                        elif obj["location"].startswith("_:"):
-                            del obj["location"]
-                        else:
-                            raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])
-
-                visit_class(joborder_keepmount, ("File", "Directory"), keepmount)
-
-                def resolved(obj):
-                    if obj["location"].startswith("keep:"):
-                        obj["location"] = mapper.mapper(obj["location"]).resolved
-
-                visit_class(joborder_resolved, ("File", "Directory"), resolved)
-
-                if self.wf_pdh is None:
-                    adjustFileObjs(packed, keepmount)
-                    adjustDirObjs(packed, keepmount)
-                    self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
-
-            wf_runner = cmap({
-                "class": "CommandLineTool",
-                "baseCommand": "cwltool",
-                "inputs": self.tool["inputs"],
-                "outputs": self.tool["outputs"],
-                "stdout": "cwl.output.json",
-                "requirements": self.requirements+job_res_reqs+[
-                    {"class": "InlineJavascriptRequirement"},
-                    {
-                    "class": "InitialWorkDirRequirement",
-                    "listing": [{
-                            "entryname": "workflow.cwl",
-                            "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
-                        }, {
-                            "entryname": "cwl.input.yml",
-                            "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
-                        }]
-                }],
-                "hints": self.hints,
-                "arguments": ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl#main", "cwl.input.yml"],
-                "id": "#"
-            })
-            return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
+                # Discover files/directories referenced by the
+                # workflow (mainly "default" values)
+                visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)
+
+
+        if self.dynamic_resource_req:
+            # Evaluate dynamic resource requirements using current builder
+            rs = copy.copy(self.static_resource_req)
+            for dyn_rs in self.dynamic_resource_req:
+                eval_req = {"class": "ResourceRequirement"}
+                for a in max_res_pars + sum_res_pars:
+                    if a in dyn_rs:
+                        eval_req[a] = builder.do_eval(dyn_rs[a])
+                rs.append(eval_req)
+            job_res_reqs = [get_overall_res_req(rs)]
         else:
-            return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)
+            job_res_reqs = self.static_resource_req
+
+        with Perf(metrics, "subworkflow adjust"):
+            joborder_resolved = copy.deepcopy(joborder)
+            joborder_keepmount = copy.deepcopy(joborder)
+
+            reffiles = []
+            visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
+
+            mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
+                                   "/keep/%s",
+                                   "/keep/%s/%s")
+
+            # For containers API, we need to make sure any extra
+            # referenced files (ie referenced by the workflow but
+            # not in the inputs) are included in the mounts.
+            if self.wf_reffiles:
+                runtimeContext = runtimeContext.copy()
+                runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)
+
+            def keepmount(obj):
+                remove_redundant_fields(obj)
+                with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
+                    if "location" not in obj:
+                        raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
+                with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
+                    if obj["location"].startswith("keep:"):
+                        obj["location"] = mapper.mapper(obj["location"]).target
+                        if "listing" in obj:
+                            del obj["listing"]
+                    elif obj["location"].startswith("_:"):
+                        del obj["location"]
+                    else:
+                        raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])
+
+            visit_class(joborder_keepmount, ("File", "Directory"), keepmount)
+
+            def resolved(obj):
+                if obj["location"].startswith("keep:"):
+                    obj["location"] = mapper.mapper(obj["location"]).resolved
+
+            visit_class(joborder_resolved, ("File", "Directory"), resolved)
+
+            if self.wf_pdh is None:
+                adjustFileObjs(packed, keepmount)
+                adjustDirObjs(packed, keepmount)
+                self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
+
+        wf_runner = cmap({
+            "class": "CommandLineTool",
+            "baseCommand": "cwltool",
+            "inputs": self.tool["inputs"],
+            "outputs": self.tool["outputs"],
+            "stdout": "cwl.output.json",
+            "requirements": self.requirements+job_res_reqs+[
+                {"class": "InlineJavascriptRequirement"},
+                {
+                "class": "InitialWorkDirRequirement",
+                "listing": [{
+                        "entryname": "workflow.cwl",
+                        "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
+                    }, {
+                        "entryname": "cwl.input.yml",
+                        "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
+                    }]
+            }],
+            "hints": self.hints,
+            "arguments": ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl#main", "cwl.input.yml"],
+            "id": "#"
+        })
+        return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
+
+    def make_workflow_step(self,
+                           toolpath_object,      # type: Dict[Text, Any]
+                           pos,                  # type: int
+                           loadingContext,       # type: LoadingContext
+                           *argc,
+                           **argv
+    ):
+        # type: (...) -> WorkflowStep
+        return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)
index 48a3edec5227aa54a6900a6ff1de6084781b83bb..7831e1cfd0822abbcac5a77c460a33e8ff492714 100644 (file)
@@ -3,6 +3,7 @@
 # SPDX-License-Identifier: Apache-2.0
 
 from cwltool.context import LoadingContext, RuntimeContext
+from collections import namedtuple
 
 class ArvLoadingContext(LoadingContext):
     def __init__(self, kwargs=None):
@@ -30,5 +31,11 @@ class ArvRuntimeContext(RuntimeContext):
         self.storage_classes = "default"
         self.current_container = None
         self.http_timeout = 300
+        self.submit_runner_cluster = None
+        self.cluster_target_id = 0
+        self.always_submit_runner = False
 
         super(ArvRuntimeContext, self).__init__(kwargs)
+
+        if self.submit_request_uuid:
+            self.submit_runner_cluster = self.submit_request_uuid[0:5]
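
The ArvRuntimeContext change above derives the submit cluster from the submit request UUID: Arvados UUIDs begin with a five-character cluster prefix. A small illustration with a made-up UUID:

    # Hypothetical container request UUID; the first five characters name the cluster.
    submit_request_uuid = "clsr1-xvhdp-000000000000000"
    submit_runner_cluster = submit_request_uuid[0:5]   # -> "clsr1"
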
index 9f0c91f111b0f547c2bb60f3f9c48faf0bbe0404..7512d5bef27f28014f650d897d24e3d59cb7b3c4 100644 (file)
@@ -104,7 +104,7 @@ def run():
         arvargs.output_tags = output_tags
         arvargs.thread_count = 1
 
-        runner = arvados_cwl.ArvCwlRunner(api_client=arvados.safeapi.ThreadSafeApiCache(
+        runner = arvados_cwl.ArvCwlExecutor(api_client=arvados.safeapi.ThreadSafeApiCache(
             api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}),
                                           arvargs=arvargs)
 
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
new file mode 100644 (file)
index 0000000..6cac709
--- /dev/null
@@ -0,0 +1,732 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import argparse
+import logging
+import os
+import sys
+import threading
+import copy
+import json
+import re
+from functools import partial
+import time
+
+from cwltool.errors import WorkflowException
+import cwltool.workflow
+from schema_salad.sourceline import SourceLine
+import schema_salad.validate as validate
+
+import arvados
+import arvados.config
+from arvados.keep import KeepClient
+from arvados.errors import ApiError
+
+import arvados_cwl.util
+from .arvcontainer import RunnerContainer
+from .arvjob import RunnerJob, RunnerTemplate
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
+from .arvtool import ArvadosCommandTool, validate_cluster_target
+from .arvworkflow import ArvadosWorkflow, upload_workflow
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
+from .perf import Perf
+from .pathmapper import NoFollowPathMapper
+from .task_queue import TaskQueue
+from .context import ArvLoadingContext, ArvRuntimeContext
+from ._version import __version__
+
+from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
+from cwltool.command_line_tool import compute_checksums
+
+logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
+
+DEFAULT_PRIORITY = 500
+
+class RuntimeStatusLoggingHandler(logging.Handler):
+    """
+    Intercepts logging calls and reports them as runtime statuses on runner
+    containers.
+    """
+    def __init__(self, runtime_status_update_func):
+        super(RuntimeStatusLoggingHandler, self).__init__()
+        self.runtime_status_update = runtime_status_update_func
+
+    def emit(self, record):
+        kind = None
+        if record.levelno >= logging.ERROR:
+            kind = 'error'
+        elif record.levelno >= logging.WARNING:
+            kind = 'warning'
+        if kind is not None:
+            log_msg = record.getMessage()
+            if '\n' in log_msg:
+                # If the logged message is multi-line, use its first line as status
+                # and the rest as detail.
+                status, detail = log_msg.split('\n', 1)
+                self.runtime_status_update(
+                    kind,
+                    "%s: %s" % (record.name, status),
+                    detail
+                )
+            else:
+                self.runtime_status_update(
+                    kind,
+                    "%s: %s" % (record.name, record.getMessage())
+                )
+
+class ArvCwlExecutor(object):
+    """Execute a CWL tool or workflow, submit work (using either jobs or
+    containers API), wait for them to complete, and report output.
+
+    """
+
+    def __init__(self, api_client,
+                 arvargs=None,
+                 keep_client=None,
+                 num_retries=4,
+                 thread_count=4):
+
+        if arvargs is None:
+            arvargs = argparse.Namespace()
+            arvargs.work_api = None
+            arvargs.output_name = None
+            arvargs.output_tags = None
+            arvargs.thread_count = 1
+
+        self.api = api_client
+        self.processes = {}
+        self.workflow_eval_lock = threading.Condition(threading.RLock())
+        self.final_output = None
+        self.final_status = None
+        self.num_retries = num_retries
+        self.uuid = None
+        self.stop_polling = threading.Event()
+        self.poll_api = None
+        self.pipeline = None
+        self.final_output_collection = None
+        self.output_name = arvargs.output_name
+        self.output_tags = arvargs.output_tags
+        self.project_uuid = None
+        self.intermediate_output_ttl = 0
+        self.intermediate_output_collections = []
+        self.trash_intermediate = False
+        self.thread_count = arvargs.thread_count
+        self.poll_interval = 12
+        self.loadingContext = None
+
+        if keep_client is not None:
+            self.keep_client = keep_client
+        else:
+            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
+
+        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+
+        self.fetcher_constructor = partial(CollectionFetcher,
+                                           api_client=self.api,
+                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
+                                           num_retries=self.num_retries)
+
+        self.work_api = None
+        expected_api = ["jobs", "containers"]
+        for api in expected_api:
+            try:
+                methods = self.api._rootDesc.get('resources')[api]['methods']
+                if ('httpMethod' in methods['create'] and
+                    (arvargs.work_api == api or arvargs.work_api is None)):
+                    self.work_api = api
+                    break
+            except KeyError:
+                pass
+
+        if not self.work_api:
+            if arvargs.work_api is None:
+                raise Exception("No supported APIs")
+            else:
+                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
+
+        if self.work_api == "jobs":
+            logger.warn("""
+*******************************
+Using the deprecated 'jobs' API.
+
+To get rid of this warning:
+
+Users: read about migrating at
+http://doc.arvados.org/user/cwl/cwl-style.html#migrate
+and use the option --api=containers
+
+Admins: configure the cluster to disable the 'jobs' API as described at:
+http://doc.arvados.org/install/install-api-server.html#disable_api_methods
+*******************************""")
+
+        self.loadingContext = ArvLoadingContext(vars(arvargs))
+        self.loadingContext.fetcher_constructor = self.fetcher_constructor
+        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
+        self.loadingContext.construct_tool_object = self.arv_make_tool
+
+        # Add a custom logging handler to the root logger for runtime status reporting
+        # if running inside a container
+        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
+            root_logger = logging.getLogger('')
+            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
+            root_logger.addHandler(handler)
+
+        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
+        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
+                                                     collection_cache=self.collection_cache)
+
+        validate_cluster_target(self, self.runtimeContext)
+
+
+    def arv_make_tool(self, toolpath_object, loadingContext):
+        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
+            return ArvadosCommandTool(self, toolpath_object, loadingContext)
+        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
+            return ArvadosWorkflow(self, toolpath_object, loadingContext)
+        else:
+            return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
+
+    def output_callback(self, out, processStatus):
+        with self.workflow_eval_lock:
+            if processStatus == "success":
+                logger.info("Overall process status is %s", processStatus)
+                state = "Complete"
+            else:
+                logger.error("Overall process status is %s", processStatus)
+                state = "Failed"
+            if self.pipeline:
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                        body={"state": state}).execute(num_retries=self.num_retries)
+            self.final_status = processStatus
+            self.final_output = out
+            self.workflow_eval_lock.notifyAll()
+
+
+    def start_run(self, runnable, runtimeContext):
+        self.task_queue.add(partial(runnable.run, runtimeContext))
+
+    def process_submitted(self, container):
+        with self.workflow_eval_lock:
+            self.processes[container.uuid] = container
+
+    def process_done(self, uuid, record):
+        with self.workflow_eval_lock:
+            j = self.processes[uuid]
+            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
+            self.task_queue.add(partial(j.done, record))
+            del self.processes[uuid]
+
+    def runtime_status_update(self, kind, message, detail=None):
+        """
+        Updates the runtime_status field on the runner container.
+        Called when there's a need to report errors, warnings or just
+        activity statuses, for example in the RuntimeStatusLoggingHandler.
+        """
+        with self.workflow_eval_lock:
+            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
+            if current is None:
+                return
+            runtime_status = current.get('runtime_status', {})
+            # In case of status being an error, only report the first one.
+            if kind == 'error':
+                if not runtime_status.get('error'):
+                    runtime_status.update({
+                        'error': message
+                    })
+                    if detail is not None:
+                        runtime_status.update({
+                            'errorDetail': detail
+                        })
+                # Further errors are only mentioned as a count.
+                else:
+                    # Get anything before an optional 'and N more' string.
+                    try:
+                        error_msg = re.match(
+                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
+                        more_failures = re.match(
+                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
+                    except TypeError:
+                        # Ignore tests stubbing errors
+                        return
+                    if more_failures:
+                        failure_qty = int(more_failures.groups()[0])
+                        runtime_status.update({
+                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
+                        })
+                    else:
+                        runtime_status.update({
+                            'error': "%s (and 1 more)" % error_msg
+                        })
+            elif kind in ['warning', 'activity']:
+                # Record the last warning/activity status regardless of
+                # previous occurrences.
+                runtime_status.update({
+                    kind: message
+                })
+                if detail is not None:
+                    runtime_status.update({
+                        kind+"Detail": detail
+                    })
+            else:
+                # Ignore any other status kind
+                return
+            try:
+                self.api.containers().update(uuid=current['uuid'],
+                                            body={
+                                                'runtime_status': runtime_status,
+                                            }).execute(num_retries=self.num_retries)
+            except Exception as e:
+                logger.info("Couldn't update runtime_status: %s", e)
+
+    def wrapped_callback(self, cb, obj, st):
+        with self.workflow_eval_lock:
+            cb(obj, st)
+            self.workflow_eval_lock.notifyAll()
+
+    def get_wrapped_callback(self, cb):
+        return partial(self.wrapped_callback, cb)
+
+    def on_message(self, event):
+        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
+            uuid = event["object_uuid"]
+            if event["properties"]["new_attributes"]["state"] == "Running":
+                with self.workflow_eval_lock:
+                    j = self.processes[uuid]
+                    if j.running is False:
+                        j.running = True
+                        j.update_pipeline_component(event["properties"]["new_attributes"])
+                        logger.info("%s %s is Running", self.label(j), uuid)
+            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
+                self.process_done(uuid, event["properties"]["new_attributes"])
+
+    def label(self, obj):
+        return "[%s %s]" % (self.work_api[0:-1], obj.name)
+
+    def poll_states(self):
+        """Poll status of jobs or containers listed in the processes dict.
+
+        Runs in a separate thread.
+        """
+
+        try:
+            remain_wait = self.poll_interval
+            while True:
+                if remain_wait > 0:
+                    self.stop_polling.wait(remain_wait)
+                if self.stop_polling.is_set():
+                    break
+                with self.workflow_eval_lock:
+                    keys = list(self.processes.keys())
+                if not keys:
+                    remain_wait = self.poll_interval
+                    continue
+
+                begin_poll = time.time()
+                if self.work_api == "containers":
+                    table = self.poll_api.container_requests()
+                elif self.work_api == "jobs":
+                    table = self.poll_api.jobs()
+
+                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
+
+                while keys:
+                    page = keys[:pageSize]
+                    keys = keys[pageSize:]
+                    try:
+                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
+                    except Exception as e:
+                        logger.warn("Error checking states on API server: %s", e)
+                        remain_wait = self.poll_interval
+                        continue
+
+                    for p in proc_states["items"]:
+                        self.on_message({
+                            "object_uuid": p["uuid"],
+                            "event_type": "update",
+                            "properties": {
+                                "new_attributes": p
+                            }
+                        })
+                finish_poll = time.time()
+                remain_wait = self.poll_interval - (finish_poll - begin_poll)
+        except:
+            logger.exception("Fatal error in state polling thread.")
+            with self.workflow_eval_lock:
+                self.processes.clear()
+                self.workflow_eval_lock.notifyAll()
+        finally:
+            self.stop_polling.set()
+
+    def add_intermediate_output(self, uuid):
+        if uuid:
+            self.intermediate_output_collections.append(uuid)
+
+    def trash_intermediate_output(self):
+        logger.info("Cleaning up intermediate output collections")
+        for i in self.intermediate_output_collections:
+            try:
+                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
+            except:
+                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+                break
+
+    def check_features(self, obj):
+        if isinstance(obj, dict):
+            if obj.get("writable") and self.work_api != "containers":
+                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
+            if obj.get("class") == "DockerRequirement":
+                if obj.get("dockerOutputDirectory"):
+                    if self.work_api != "containers":
+                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
+                    if not obj.get("dockerOutputDirectory").startswith('/'):
+                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
+                            "Option 'dockerOutputDirectory' must be an absolute path.")
+            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
+                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
+            for v in obj.itervalues():
+                self.check_features(v)
+        elif isinstance(obj, list):
+            for i,v in enumerate(obj):
+                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
+                    self.check_features(v)
+
+    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
+        outputObj = copy.deepcopy(outputObj)
+
+        files = []
+        def capture(fileobj):
+            files.append(fileobj)
+
+        adjustDirObjs(outputObj, capture)
+        adjustFileObjs(outputObj, capture)
+
+        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
+
+        final = arvados.collection.Collection(api_client=self.api,
+                                              keep_client=self.keep_client,
+                                              num_retries=self.num_retries)
+
+        for k,v in generatemapper.items():
+            if k.startswith("_:"):
+                if v.type == "Directory":
+                    continue
+                if v.type == "CreateFile":
+                    with final.open(v.target, "wb") as f:
+                        f.write(v.resolved.encode("utf-8"))
+                    continue
+
+            if not k.startswith("keep:"):
+                raise Exception("Output source is not in keep or a literal")
+            sp = k.split("/")
+            srccollection = sp[0][5:]
+            try:
+                reader = self.collection_cache.get(srccollection)
+                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
+                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
+            except arvados.errors.ArgumentError as e:
+                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+                raise
+            except IOError as e:
+                logger.warn("While preparing output collection: %s", e)
+
+        def rewrite(fileobj):
+            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
+            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
+                if k in fileobj:
+                    del fileobj[k]
+
+        adjustDirObjs(outputObj, rewrite)
+        adjustFileObjs(outputObj, rewrite)
+
+        with final.open("cwl.output.json", "w") as f:
+            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
+
+        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
+
+        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
+                    final.api_response()["name"],
+                    final.manifest_locator())
+
+        final_uuid = final.manifest_locator()
+        tags = tagsString.split(',')
+        for tag in tags:
+             self.api.links().create(body={
+                "head_uuid": final_uuid, "link_class": "tag", "name": tag
+                }).execute(num_retries=self.num_retries)
+
+        def finalcollection(fileobj):
+            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
+
+        adjustDirObjs(outputObj, finalcollection)
+        adjustFileObjs(outputObj, finalcollection)
+
+        return (outputObj, final)
+
+    def set_crunch_output(self):
+        if self.work_api == "containers":
+            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
+            if current is None:
+                return
+            try:
+                self.api.containers().update(uuid=current['uuid'],
+                                             body={
+                                                 'output': self.final_output_collection.portable_data_hash(),
+                                             }).execute(num_retries=self.num_retries)
+                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
+                                              body={
+                                                  'is_trashed': True
+                                              }).execute(num_retries=self.num_retries)
+            except Exception as e:
+                logger.info("Setting container output: %s", e)
+        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
+            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
+                                   body={
+                                       'output': self.final_output_collection.portable_data_hash(),
+                                       'success': self.final_status == "success",
+                                       'progress':1.0
+                                   }).execute(num_retries=self.num_retries)
+
+    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
+        self.debug = runtimeContext.debug
+
+        tool.visit(self.check_features)
+
+        self.project_uuid = runtimeContext.project_uuid
+        self.pipeline = None
+        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
+        self.secret_store = runtimeContext.secret_store
+
+        self.trash_intermediate = runtimeContext.trash_intermediate
+        if self.trash_intermediate and self.work_api != "containers":
+            raise Exception("--trash-intermediate is only supported with --api=containers.")
+
+        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
+        if self.intermediate_output_ttl and self.work_api != "containers":
+            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
+        if self.intermediate_output_ttl < 0:
+            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
+
+        if runtimeContext.submit_request_uuid and self.work_api != "containers":
+            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
+
+        if not runtimeContext.name:
+            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+
+        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
+        # Also uploads docker images.
+        merged_map = upload_workflow_deps(self, tool)
+
+        # Reload tool object which may have been updated by
+        # upload_workflow_deps
+        # Don't validate this time because it will just print redundant errors.
+        loadingContext = self.loadingContext.copy()
+        loadingContext.loader = tool.doc_loader
+        loadingContext.avsc_names = tool.doc_schema
+        loadingContext.metadata = tool.metadata
+        loadingContext.do_validate = False
+
+        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
+                                  loadingContext)
+
+        # Upload local file references in the job order.
+        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
+                                     tool, job_order)
+
+        existing_uuid = runtimeContext.update_workflow
+        if existing_uuid or runtimeContext.create_workflow:
+            # Create a pipeline template or workflow record and exit.
+            if self.work_api == "jobs":
+                tmpl = RunnerTemplate(self, tool, job_order,
+                                      runtimeContext.enable_reuse,
+                                      uuid=existing_uuid,
+                                      submit_runner_ram=runtimeContext.submit_runner_ram,
+                                      name=runtimeContext.name,
+                                      merged_map=merged_map)
+                tmpl.save()
+                # cwltool.main will write our return value to stdout.
+                return (tmpl.uuid, "success")
+            elif self.work_api == "containers":
+                return (upload_workflow(self, tool, job_order,
+                                        self.project_uuid,
+                                        uuid=existing_uuid,
+                                        submit_runner_ram=runtimeContext.submit_runner_ram,
+                                        name=runtimeContext.name,
+                                        merged_map=merged_map),
+                        "success")
+
+        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
+        self.eval_timeout = runtimeContext.eval_timeout
+
+        runtimeContext = runtimeContext.copy()
+        runtimeContext.use_container = True
+        runtimeContext.tmpdir_prefix = "tmp"
+        runtimeContext.work_api = self.work_api
+
+        if self.work_api == "containers":
+            if self.ignore_docker_for_reuse:
+                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
+            runtimeContext.outdir = "/var/spool/cwl"
+            runtimeContext.docker_outdir = "/var/spool/cwl"
+            runtimeContext.tmpdir = "/tmp"
+            runtimeContext.docker_tmpdir = "/tmp"
+        elif self.work_api == "jobs":
+            if runtimeContext.priority != DEFAULT_PRIORITY:
+                raise Exception("--priority not implemented for jobs API.")
+            runtimeContext.outdir = "$(task.outdir)"
+            runtimeContext.docker_outdir = "$(task.outdir)"
+            runtimeContext.tmpdir = "$(task.tmpdir)"
+
+        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
+            raise Exception("--priority must be in the range 1..1000.")
+
+        runnerjob = None
+        if runtimeContext.submit:
+            # Submit a runner job to run the workflow for us.
+            if self.work_api == "containers":
+                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
+                    runtimeContext.runnerjob = tool.tool["id"]
+                    runnerjob = tool.job(job_order,
+                                         self.output_callback,
+                                         runtimeContext).next()
+                else:
+                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
+                                                self.output_name,
+                                                self.output_tags,
+                                                submit_runner_ram=runtimeContext.submit_runner_ram,
+                                                name=runtimeContext.name,
+                                                on_error=runtimeContext.on_error,
+                                                submit_runner_image=runtimeContext.submit_runner_image,
+                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
+                                                merged_map=merged_map,
+                                                priority=runtimeContext.priority,
+                                                secret_store=self.secret_store)
+            elif self.work_api == "jobs":
+                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
+                                      self.output_name,
+                                      self.output_tags,
+                                      submit_runner_ram=runtimeContext.submit_runner_ram,
+                                      name=runtimeContext.name,
+                                      on_error=runtimeContext.on_error,
+                                      submit_runner_image=runtimeContext.submit_runner_image,
+                                      merged_map=merged_map)
+        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
+            # Create pipeline for local run
+            self.pipeline = self.api.pipeline_instances().create(
+                body={
+                    "owner_uuid": self.project_uuid,
+                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
+                    "components": {},
+                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+            logger.info("Pipeline instance %s", self.pipeline["uuid"])
+
+        if runnerjob and not runtimeContext.wait:
+            submitargs = runtimeContext.copy()
+            submitargs.submit = False
+            runnerjob.run(submitargs)
+            return (runnerjob.uuid, "success")
+
+        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
+        if current_container:
+            logger.info("Running inside container %s", current_container.get("uuid"))
+
+        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
+        self.polling_thread = threading.Thread(target=self.poll_states)
+        self.polling_thread.start()
+
+        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
+
+        try:
+            self.workflow_eval_lock.acquire()
+            if runnerjob:
+                jobiter = iter((runnerjob,))
+            else:
+                if runtimeContext.cwl_runner_job is not None:
+                    self.uuid = runtimeContext.cwl_runner_job.get('uuid')
+                jobiter = tool.job(job_order,
+                                   self.output_callback,
+                                   runtimeContext)
+
+            # Holds the lock while this code runs and releases it when
+            # it is safe to do so in self.workflow_eval_lock.wait(),
+            # at which point on_message can update job state and
+            # process output callbacks.
+
+            loopperf = Perf(metrics, "jobiter")
+            loopperf.__enter__()
+            for runnable in jobiter:
+                loopperf.__exit__()
+
+                if self.stop_polling.is_set():
+                    break
+
+                if self.task_queue.error is not None:
+                    raise self.task_queue.error
+
+                if runnable:
+                    with Perf(metrics, "run"):
+                        self.start_run(runnable, runtimeContext)
+                else:
+                    if (self.task_queue.in_flight + len(self.processes)) > 0:
+                        self.workflow_eval_lock.wait(3)
+                    else:
+                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
+                        break
+                loopperf.__enter__()
+            loopperf.__exit__()
+
+            while (self.task_queue.in_flight + len(self.processes)) > 0:
+                if self.task_queue.error is not None:
+                    raise self.task_queue.error
+                self.workflow_eval_lock.wait(3)
+
+        except UnsupportedRequirement:
+            raise
+        except:
+            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+                logger.error("Interrupted, workflow will be cancelled")
+            else:
+                logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+            if self.pipeline:
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            if runnerjob and runnerjob.uuid and self.work_api == "containers":
+                self.api.container_requests().update(uuid=runnerjob.uuid,
+                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
+        finally:
+            self.workflow_eval_lock.release()
+            self.task_queue.drain()
+            self.stop_polling.set()
+            self.polling_thread.join()
+            self.task_queue.join()
+
+        if self.final_status == "UnsupportedRequirement":
+            raise UnsupportedRequirement("Check log for details.")
+
+        if self.final_output is None:
+            raise WorkflowException("Workflow did not return a result.")
+
+        if runtimeContext.submit and isinstance(runnerjob, Runner):
+            logger.info("Final output collection %s", runnerjob.final_output)
+        else:
+            if self.output_name is None:
+                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
+            if self.output_tags is None:
+                self.output_tags = ""
+
+            storage_classes = runtimeContext.storage_classes.strip().split(",")
+            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
+            self.set_crunch_output()
+
+        if runtimeContext.compute_checksum:
+            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
+            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
+
+        if self.trash_intermediate and self.final_status == "success":
+            self.trash_intermediate_output()
+
+        return (self.final_output, self.final_status)
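
The comment in the loop above describes a standard condition-variable handoff: the evaluation loop owns workflow_eval_lock and only yields it inside wait(), which is the window in which completion callbacks may update shared state and wake the loop. Below is a minimal, self-contained sketch of that pattern using hypothetical names; it is not the executor's real classes, just the locking idiom under simplified assumptions.

    import threading

    lock = threading.Condition(threading.RLock())   # stands in for workflow_eval_lock
    processes = {}                                   # stands in for self.processes

    def on_done(uuid):
        # Runs in a worker thread (like a completion callback): take the
        # lock, update shared state, then wake the waiting evaluation loop.
        with lock:
            processes.pop(uuid, None)
            lock.notify_all()

    def run_all(job_uuids):
        with lock:
            for uuid in job_uuids:
                processes[uuid] = "running"
                threading.Timer(0.1, on_done, args=(uuid,)).start()
            while processes:
                # wait() releases the lock (letting on_done run) and
                # reacquires it before returning, mirroring
                # workflow_eval_lock.wait(3) in the loop above.
                lock.wait(3)

    run_all(["job-1", "job-2"])
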
index 316a652529b384205661827e2c46d056025d5506..b22b2ffd6f4574ea6a774064d782fbfd07c7a94d 100644 (file)
@@ -265,6 +265,12 @@ class CollectionFetcher(DefaultFetcher):
 
         return super(CollectionFetcher, self).urljoin(base_url, url)
 
+    schemes = [u"file", u"http", u"https", u"mailto", u"keep"]
+
+    def supported_schemes(self):  # type: () -> List[Text]
+        return self.schemes
+
+
 workflow_uuid_pattern = re.compile(r'[a-z0-9]{5}-7fd4e-[a-z0-9]{15}')
 pipeline_template_uuid_pattern = re.compile(r'[a-z0-9]{5}-p5p6p-[a-z0-9]{15}')
 
index d083b78f5a061906164a5978530af9230e767473..26c85d300ddcb17c8038d31c4d0f8cd1d39aabc9 100644 (file)
@@ -8,7 +8,7 @@ import uuid
 import os
 import urllib
 
-from arvados_cwl.util import get_current_container, get_intermediate_collection_info
+import arvados_cwl.util
 import arvados.commands.run
 import arvados.collection
 
@@ -155,8 +155,8 @@ class ArvPathMapper(PathMapper):
                 for l in srcobj.get("listing", []):
                     self.addentry(l, c, ".", remap)
 
-                container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
-                info = get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
+                container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+                info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
 
                 c.save_new(name=info["name"],
                            owner_uuid=self.arvrunner.project_uuid,
@@ -174,8 +174,8 @@ class ArvPathMapper(PathMapper):
                                                   num_retries=self.arvrunner.num_retries                                                  )
                 self.addentry(srcobj, c, ".", remap)
 
-                container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
-                info = get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
+                container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+                info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
 
                 c.save_new(name=info["name"],
                            owner_uuid=self.arvrunner.project_uuid,
index 41166c5122fa50ee04b3880948a7205b6f4e9ba8..a846f2b0016931dcd6a938c47f9572083963d2bc 100644 (file)
@@ -26,7 +26,7 @@ from cwltool.pack import pack
 import arvados.collection
 import ruamel.yaml as yaml
 
-from .arvdocker import arv_docker_get_image
+import arvados_cwl.arvdocker
 from .pathmapper import ArvPathMapper, trim_listing
 from ._version import __version__
 from . import done
@@ -215,9 +215,9 @@ def upload_docker(arvrunner, tool):
                 # TODO: can be supported by containers API, but not jobs API.
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
-            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
         else:
-            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
@@ -244,6 +244,8 @@ def packed_workflow(arvrunner, tool, merged_map):
                 v["location"] = merged_map[cur_id].resolved[v["location"]]
             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+            if v.get("class") == "DockerRequirement":
+                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
@@ -324,10 +326,10 @@ def arvados_jobs_image(arvrunner, img):
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
 
     try:
-        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
+        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
-    return img
+
 
 def upload_workflow_collection(arvrunner, name, packed):
     collection = arvados.collection.Collection(api_client=arvrunner.api,
index 2b7b31b9f3f4b4070cbd14d986ffe87259989200..5d373282b6ca8ba3129b28c78e1bbe8934fda005 100644 (file)
@@ -33,12 +33,10 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20180806194258',
-          'schema-salad==2.7.20180719125426',
+          'cwltool==1.0.20181116032456',
+          'schema-salad==2.7.20181116024232',
           'typing >= 3.6.4',
-          # Need to limit ruamel.yaml version to 0.15.26 because of bug
-          # https://bitbucket.org/ruamel/yaml/issues/227/regression-parsing-flow-mapping
-          'ruamel.yaml >=0.13.11, <= 0.15.26',
+          'ruamel.yaml >=0.15.54, <=0.15.77',
           'arvados-python-client>=1.1.4.20180607143841',
           'setuptools',
           'ciso8601 >=1.0.6, <2.0.0',
diff --git a/sdk/cwl/tests/federation/README b/sdk/cwl/tests/federation/README
new file mode 100644 (file)
index 0000000..e5eb04c
--- /dev/null
@@ -0,0 +1,44 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+Arvados federated workflow testing
+
+Requires cwltool 1.0.20181109150732 or later
+
+Create main-test.json:
+
+{
+    "acr": "/path/to/arvados-cwl-runner",
+    "arvado_api_host_insecure": false,
+    "arvados_api_hosts": [
+        "c97qk.arvadosapi.com",
+        "4xphq.arvadosapi.com",
+        "9tee4.arvadosapi.com"
+    ],
+    "arvados_api_token": "...",
+    "arvados_cluster_ids": [
+        "c97qk",
+        "4xphq",
+        "9tee4"
+    ]
+}
+
+Or create an arvbox test cluster:
+
+$ cwltool --enable-ext arvbox-make-federation.cwl --arvbox_base ~/.arvbox/ --in_acr /path/to/arvados-cwl-runner > main-test.json
+
+
+Run tests:
+
+$ cwltool main.cwl main-test.json
+
+
+List test cases:
+
+$ cwltool --print-targets main.cwl
+
+
+Run a specific test case:
+
+$ cwltool -t twostep-remote-copy-to-home main.cwl main-test.json
diff --git a/sdk/cwl/tests/federation/arvbox-make-federation.cwl b/sdk/cwl/tests/federation/arvbox-make-federation.cwl
new file mode 100644 (file)
index 0000000..9a08195
--- /dev/null
@@ -0,0 +1,72 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+  cwltool: "http://commonwl.org/cwltool#"
+requirements:
+  ScatterFeatureRequirement: {}
+  StepInputExpressionRequirement: {}
+  cwltool:LoadListingRequirement:
+    loadListing: no_listing
+  InlineJavascriptRequirement: {}
+inputs:
+  containers:
+    type: string[]
+    default: [fedbox1, fedbox2, fedbox3]
+  arvbox_base: Directory
+  in_acr: string?
+  insecure:
+    type: boolean
+    default: true
+outputs:
+  arvados_api_token:
+    type: string
+    outputSource: setup-user/test_user_token
+  arvados_api_hosts:
+    type: string[]
+    outputSource: start/container_host
+  arvados_cluster_ids:
+    type: string[]
+    outputSource: start/cluster_id
+  acr:
+    type: string?
+    outputSource: in_acr
+  arvado_api_host_insecure:
+    type: boolean
+    outputSource: insecure
+steps:
+  mkdir:
+    in:
+      containers: containers
+      arvbox_base: arvbox_base
+    out: [arvbox_data]
+    run: arvbox/mkdir.cwl
+  start:
+    in:
+      container_name: containers
+      arvbox_data: mkdir/arvbox_data
+    out: [cluster_id, container_host, arvbox_data_out, superuser_token]
+    scatter: [container_name, arvbox_data]
+    scatterMethod: dotproduct
+    run: arvbox/start.cwl
+  fed-config:
+    in:
+      container_name: containers
+      this_cluster_id: start/cluster_id
+      cluster_ids: start/cluster_id
+      cluster_hosts: start/container_host
+      arvbox_data: start/arvbox_data_out
+    out: []
+    scatter: [container_name, this_cluster_id, arvbox_data]
+    scatterMethod: dotproduct
+    run: arvbox/fed-config.cwl
+  setup-user:
+    in:
+      container_host: {source: start/container_host, valueFrom: "$(self[0])"}
+      superuser_token: {source: start/superuser_token, valueFrom: "$(self[0])"}
+    out: [test_user_uuid, test_user_token]
+    run: arvbox/setup-user.cwl
diff --git a/sdk/cwl/tests/federation/arvbox/fed-config.cwl b/sdk/cwl/tests/federation/arvbox/fed-config.cwl
new file mode 100644 (file)
index 0000000..77567ee
--- /dev/null
@@ -0,0 +1,66 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+  cwltool: "http://commonwl.org/cwltool#"
+inputs:
+  container_name: string
+  this_cluster_id: string
+  cluster_ids: string[]
+  cluster_hosts: string[]
+  arvbox_data: Directory
+outputs:
+  arvbox_data_out:
+    type: Directory
+    outputBinding:
+      outputEval: $(inputs.arvbox_data)
+requirements:
+  EnvVarRequirement:
+    envDef:
+      ARVBOX_CONTAINER: $(inputs.container_name)
+      ARVBOX_DATA: $(inputs.arvbox_data.path)
+  InitialWorkDirRequirement:
+    listing:
+      - entryname: cluster_config.yml.override
+        entry: >-
+          ${
+          var remoteClusters = {};
+          for (var i = 0; i < inputs.cluster_ids.length; i++) {
+            remoteClusters[inputs.cluster_ids[i]] = {
+              "Host": inputs.cluster_hosts[i],
+              "Proxy": true,
+              "Insecure": true
+            };
+          }
+          var r = {"Clusters": {}};
+          r["Clusters"][inputs.this_cluster_id] = {"RemoteClusters": remoteClusters};
+          return JSON.stringify(r);
+          }
+      - entryname: application.yml.override
+        entry: >-
+          ${
+          var remoteClusters = {};
+          for (var i = 0; i < inputs.cluster_ids.length; i++) {
+            remoteClusters[inputs.cluster_ids[i]] = inputs.cluster_hosts[i];
+          }
+          return JSON.stringify({"development": {"remote_hosts": remoteClusters}});
+          }
+  cwltool:LoadListingRequirement:
+    loadListing: no_listing
+  ShellCommandRequirement: {}
+  InlineJavascriptRequirement: {}
+  cwltool:InplaceUpdateRequirement:
+    inplaceUpdate: true
+arguments:
+  - shellQuote: false
+    valueFrom: |
+      docker cp cluster_config.yml.override $(inputs.container_name):/var/lib/arvados
+      docker cp application.yml.override $(inputs.container_name):/usr/src/arvados/services/api/config
+      arvbox sv restart api
+      arvbox sv restart controller
+      arvbox sv restart keepstore0
+      arvbox sv restart keepstore1
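
For illustration only (hypothetical cluster ids and hosts, not values from this repository), the JavaScript expressions in the InitialWorkDirRequirement above produce configuration equivalent to what this Python sketch prints: cluster_config.yml.override lists every peer under RemoteClusters for the current cluster, and application.yml.override maps each cluster id straight to its host. Both files are then copied into the arvbox container by the shell arguments above.

    import json

    cluster_ids = ["x1111", "x2222", "x3333"]                       # hypothetical
    cluster_hosts = ["10.0.0.1:8000", "10.0.0.2:8000", "10.0.0.3:8000"]
    this_cluster_id = "x1111"

    # Equivalent of the cluster_config.yml.override expression.
    remote_clusters = {
        cid: {"Host": host, "Proxy": True, "Insecure": True}
        for cid, host in zip(cluster_ids, cluster_hosts)
    }
    cluster_config = {"Clusters": {this_cluster_id: {"RemoteClusters": remote_clusters}}}

    # Equivalent of the application.yml.override expression.
    application_config = {"development": {"remote_hosts": dict(zip(cluster_ids, cluster_hosts))}}

    print(json.dumps(cluster_config, indent=2))
    print(json.dumps(application_config, indent=2))
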
diff --git a/sdk/cwl/tests/federation/arvbox/mkdir.cwl b/sdk/cwl/tests/federation/arvbox/mkdir.cwl
new file mode 100644 (file)
index 0000000..727d491
--- /dev/null
@@ -0,0 +1,47 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+  cwltool: "http://commonwl.org/cwltool#"
+inputs:
+  containers:
+    type:
+      type: array
+      items: string
+      inputBinding:
+        position: 3
+        valueFrom: |
+          ${
+          return "base/"+self;
+          }
+  arvbox_base: Directory
+outputs:
+  arvbox_data:
+    type: Directory[]
+    outputBinding:
+      glob: |
+        ${
+        var r = [];
+        for (var i = 0; i < inputs.containers.length; i++) {
+          r.push("base/"+inputs.containers[i]);
+        }
+        return r;
+        }
+requirements:
+  InitialWorkDirRequirement:
+    listing:
+      - entry: $(inputs.arvbox_base)
+        entryname: base
+        writable: true
+  cwltool:LoadListingRequirement:
+    loadListing: no_listing
+  InlineJavascriptRequirement: {}
+  cwltool:InplaceUpdateRequirement:
+    inplaceUpdate: true
+arguments:
+  - mkdir
+  - "-p"
diff --git a/sdk/cwl/tests/federation/arvbox/setup-user.cwl b/sdk/cwl/tests/federation/arvbox/setup-user.cwl
new file mode 100644 (file)
index 0000000..0fddc1b
--- /dev/null
@@ -0,0 +1,34 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+  cwltool: "http://commonwl.org/cwltool#"
+requirements:
+  EnvVarRequirement:
+    envDef:
+      ARVADOS_API_HOST: $(inputs.container_host)
+      ARVADOS_API_TOKEN: $(inputs.superuser_token)
+      ARVADOS_API_HOST_INSECURE: "true"
+  cwltool:LoadListingRequirement:
+    loadListing: no_listing
+  InlineJavascriptRequirement: {}
+  cwltool:InplaceUpdateRequirement:
+    inplaceUpdate: true
+  DockerRequirement:
+    dockerPull: arvados/jobs
+inputs:
+  container_host: string
+  superuser_token: string
+  make_user_script:
+    type: File
+    default:
+      class: File
+      location: setup_user.py
+outputs:
+  test_user_uuid: string
+  test_user_token: string
+arguments: [python2, $(inputs.make_user_script)]
\ No newline at end of file
diff --git a/sdk/cwl/tests/federation/arvbox/setup_user.py b/sdk/cwl/tests/federation/arvbox/setup_user.py
new file mode 100644 (file)
index 0000000..a456976
--- /dev/null
@@ -0,0 +1,40 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import arvados
+import arvados.errors
+import time
+import json
+
+while True:
+    try:
+        api = arvados.api()
+        break
+    except arvados.errors.ApiError:
+        time.sleep(2)
+
+existing = api.users().list(filters=[["email", "=", "test@example.com"],
+                                     ["is_active", "=", True]], limit=1).execute()
+if existing["items"]:
+    u = existing["items"][0]
+else:
+    u = api.users().create(body={
+        'first_name': 'Test',
+        'last_name': 'User',
+        'email': 'test@example.com',
+        'is_admin': False
+    }).execute()
+    api.users().activate(uuid=u["uuid"]).execute()
+
+tok = api.api_client_authorizations().create(body={
+    "api_client_authorization": {
+        "owner_uuid": u["uuid"]
+    }
+}).execute()
+
+with open("cwl.output.json", "w") as f:
+    json.dump({
+        "test_user_uuid": u["uuid"],
+        "test_user_token": "v2/%s/%s" % (tok["uuid"], tok["api_token"])
+    }, f)
diff --git a/sdk/cwl/tests/federation/arvbox/start.cwl b/sdk/cwl/tests/federation/arvbox/start.cwl
new file mode 100644 (file)
index 0000000..f69775a
--- /dev/null
@@ -0,0 +1,72 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+  cwltool: "http://commonwl.org/cwltool#"
+inputs:
+  container_name: string
+  arvbox_data: Directory
+outputs:
+  cluster_id:
+    type: string
+    outputBinding:
+      glob: status.txt
+      loadContents: true
+      outputEval: |
+        ${
+        var sp = self[0].contents.split("\n");
+        for (var i = 0; i < sp.length; i++) {
+          if (sp[i].startsWith("Cluster id: ")) {
+            return sp[i].substr(12);
+          }
+        }
+        }
+  container_host:
+    type: string
+    outputBinding:
+      glob: status.txt
+      loadContents: true
+      outputEval: |
+        ${
+        var sp = self[0].contents.split("\n");
+        for (var i = 0; i < sp.length; i++) {
+          if (sp[i].startsWith("Container IP: ")) {
+            return sp[i].substr(14)+":8000";
+          }
+        }
+        }
+  superuser_token:
+    type: string
+    outputBinding:
+      glob: superuser_token.txt
+      loadContents: true
+      outputEval: $(self[0].contents.trim())
+  arvbox_data_out:
+    type: Directory
+    outputBinding:
+      outputEval: $(inputs.arvbox_data)
+requirements:
+  EnvVarRequirement:
+    envDef:
+      ARVBOX_CONTAINER: $(inputs.container_name)
+      ARVBOX_DATA: $(inputs.arvbox_data.path)
+  ShellCommandRequirement: {}
+  InitialWorkDirRequirement:
+    listing:
+      - entry: $(inputs.arvbox_data)
+        entryname: $(inputs.container_name)
+        writable: true
+  cwltool:InplaceUpdateRequirement:
+    inplaceUpdate: true
+  InlineJavascriptRequirement: {}
+arguments:
+  - shellQuote: false
+    valueFrom: |
+      set -e
+      arvbox start dev
+      arvbox status > status.txt
+      arvbox cat /var/lib/arvados/superuser_token > superuser_token.txt
\ No newline at end of file
diff --git a/sdk/cwl/tests/federation/arvbox/stop.cwl b/sdk/cwl/tests/federation/arvbox/stop.cwl
new file mode 100644 (file)
index 0000000..2ea4c0f
--- /dev/null
@@ -0,0 +1,17 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+  cwltool: "http://commonwl.org/cwltool#"
+inputs:
+  container_name: string
+outputs: []
+requirements:
+  EnvVarRequirement:
+    envDef:
+      ARVBOX_CONTAINER: $(inputs.container_name)
+arguments: [arvbox, stop]
\ No newline at end of file
diff --git a/sdk/cwl/tests/federation/cases/base-case.cwl b/sdk/cwl/tests/federation/cases/base-case.cwl
new file mode 100644 (file)
index 0000000..4ab3b20
--- /dev/null
@@ -0,0 +1,31 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  InlineJavascriptRequirement: {}
+  DockerRequirement:
+    dockerPull: arvados/fed-test:base-case
+inputs:
+  inp:
+    type: File
+    inputBinding: {}
+  runOnCluster: string
+outputs:
+  hash:
+    type: File
+    outputSource: md5sum/hash
+steps:
+  md5sum:
+    in:
+      inp: inp
+      runOnCluster: runOnCluster
+    out: [hash]
+    hints:
+      arv:ClusterTarget:
+        cluster_id: $(inputs.runOnCluster)
+    run: md5sum.cwl
diff --git a/sdk/cwl/tests/federation/cases/cat.cwl b/sdk/cwl/tests/federation/cases/cat.cwl
new file mode 100644 (file)
index 0000000..17132fe
--- /dev/null
@@ -0,0 +1,14 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+inputs:
+  inp:
+    type: File[]
+    inputBinding: {}
+outputs:
+  joined: stdout
+stdout: joined.txt
+baseCommand: cat
diff --git a/sdk/cwl/tests/federation/cases/hint-on-tool.cwl b/sdk/cwl/tests/federation/cases/hint-on-tool.cwl
new file mode 100644 (file)
index 0000000..93e6d2c
--- /dev/null
@@ -0,0 +1,28 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  InlineJavascriptRequirement: {}
+  DockerRequirement:
+    dockerPull: arvados/fed-test:hint-on-tool
+inputs:
+  inp:
+    type: File
+    inputBinding: {}
+  runOnCluster: string
+outputs:
+  hash:
+    type: File
+    outputSource: md5sum/hash
+steps:
+  md5sum:
+    in:
+      inp: inp
+      runOnCluster: runOnCluster
+    out: [hash]
+    run: md5sum-tool-hint.cwl
diff --git a/sdk/cwl/tests/federation/cases/hint-on-wf.cwl b/sdk/cwl/tests/federation/cases/hint-on-wf.cwl
new file mode 100644 (file)
index 0000000..4323659
--- /dev/null
@@ -0,0 +1,30 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  InlineJavascriptRequirement: {}
+  DockerRequirement:
+    dockerPull: arvados/fed-test:hint-on-wf
+hints:
+  arv:ClusterTarget:
+    cluster_id: $(inputs.runOnCluster)
+inputs:
+  inp:
+    type: File
+    inputBinding: {}
+  runOnCluster: string
+outputs:
+  hash:
+    type: File
+    outputSource: md5sum/hash
+steps:
+  md5sum:
+    in:
+      inp: inp
+    out: [hash]
+    run: md5sum.cwl
diff --git a/sdk/cwl/tests/federation/cases/md5sum-tool-hint.cwl b/sdk/cwl/tests/federation/cases/md5sum-tool-hint.cwl
new file mode 100644 (file)
index 0000000..726c33b
--- /dev/null
@@ -0,0 +1,24 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  InlineJavascriptRequirement: {}
+hints:
+  arv:ClusterTarget:
+    cluster_id: $(inputs.runOnCluster)
+inputs:
+  inp: File
+  runOnCluster: string
+outputs:
+  hash:
+    type: File
+    outputBinding:
+      glob: out.txt
+stdin: $(inputs.inp.path)
+stdout: out.txt
+arguments: ["md5sum", "-"]
diff --git a/sdk/cwl/tests/federation/cases/md5sum.cwl b/sdk/cwl/tests/federation/cases/md5sum.cwl
new file mode 100644 (file)
index 0000000..af11999
--- /dev/null
@@ -0,0 +1,21 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  InlineJavascriptRequirement: {}
+inputs:
+  inp:
+    type: File
+outputs:
+  hash:
+    type: File
+    outputBinding:
+      glob: out.txt
+stdin: $(inputs.inp.path)
+stdout: out.txt
+arguments: ["md5sum", "-"]
diff --git a/sdk/cwl/tests/federation/cases/remote-case.cwl b/sdk/cwl/tests/federation/cases/remote-case.cwl
new file mode 100644 (file)
index 0000000..6683062
--- /dev/null
@@ -0,0 +1,31 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  InlineJavascriptRequirement: {}
+  DockerRequirement:
+    dockerPull: arvados/fed-test:remote-case
+inputs:
+  inp:
+    type: File
+    inputBinding: {}
+  runOnCluster: string
+outputs:
+  hash:
+    type: File
+    outputSource: md5sum/hash
+steps:
+  md5sum:
+    in:
+      inp: inp
+      runOnCluster: runOnCluster
+    out: [hash]
+    hints:
+      arv:ClusterTarget:
+        cluster_id: $(inputs.runOnCluster)
+    run: md5sum.cwl
diff --git a/sdk/cwl/tests/federation/cases/rev-input-to-output.cwl b/sdk/cwl/tests/federation/cases/rev-input-to-output.cwl
new file mode 100644 (file)
index 0000000..0c247a8
--- /dev/null
@@ -0,0 +1,27 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  InlineJavascriptRequirement: {}
+  ShellCommandRequirement: {}
+inputs:
+  inp:
+    type: File
+outputs:
+  original:
+    type: File
+    outputBinding:
+      glob: $(inputs.inp.basename)
+  revhash:
+    type: stdout
+stdout: rev-$(inputs.inp.basename)
+arguments:
+  - shellQuote: false
+    valueFrom: |
+      ln -s $(inputs.inp.path) $(inputs.inp.basename) &&
+      rev $(inputs.inp.basename)
diff --git a/sdk/cwl/tests/federation/cases/rev.cwl b/sdk/cwl/tests/federation/cases/rev.cwl
new file mode 100644 (file)
index 0000000..8bbc565
--- /dev/null
@@ -0,0 +1,20 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  InlineJavascriptRequirement: {}
+inputs:
+  inp:
+    type: File
+outputs:
+  revhash:
+    type: File
+    outputBinding:
+      glob: out.txt
+stdout: out.txt
+arguments: [rev, $(inputs.inp)]
diff --git a/sdk/cwl/tests/federation/cases/runner-home-step-remote.cwl b/sdk/cwl/tests/federation/cases/runner-home-step-remote.cwl
new file mode 100644 (file)
index 0000000..182ca1e
--- /dev/null
@@ -0,0 +1,29 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  InlineJavascriptRequirement: {}
+  DockerRequirement:
+    dockerPull: arvados/fed-test:runner-home-step-remote
+inputs:
+  inp: File
+  runOnCluster: string
+outputs:
+  hash:
+    type: File
+    outputSource: md5sum/hash
+steps:
+  md5sum:
+    in:
+      inp: inp
+      runOnCluster: runOnCluster
+    hints:
+      arv:ClusterTarget:
+        cluster_id: $(inputs.runOnCluster)
+    out: [hash]
+    run: md5sum.cwl
\ No newline at end of file
diff --git a/sdk/cwl/tests/federation/cases/runner-remote-step-home.cwl b/sdk/cwl/tests/federation/cases/runner-remote-step-home.cwl
new file mode 100644 (file)
index 0000000..963c84f
--- /dev/null
@@ -0,0 +1,29 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  InlineJavascriptRequirement: {}
+  DockerRequirement:
+    dockerPull: arvados/fed-test:runner-remote-step-home
+inputs:
+  inp: File
+  runOnCluster: string
+outputs:
+  hash:
+    type: File
+    outputSource: md5sum/hash
+steps:
+  md5sum:
+    in:
+      inp: inp
+      runOnCluster: runOnCluster
+    out: [hash]
+    hints:
+      arv:ClusterTarget:
+        cluster_id: $(inputs.runOnCluster)
+    run: md5sum.cwl
\ No newline at end of file
diff --git a/sdk/cwl/tests/federation/cases/scatter-gather.cwl b/sdk/cwl/tests/federation/cases/scatter-gather.cwl
new file mode 100644 (file)
index 0000000..07403ed
--- /dev/null
@@ -0,0 +1,37 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  InlineJavascriptRequirement: {}
+  DockerRequirement:
+    dockerPull: arvados/fed-test:scatter-gather
+  ScatterFeatureRequirement: {}
+inputs:
+  shards: File[]
+  clusters: string[]
+outputs:
+  joined:
+    type: File
+    outputSource: cat/joined
+steps:
+  md5sum:
+    in:
+      inp: shards
+      runOnCluster: clusters
+    scatter: [inp, runOnCluster]
+    scatterMethod: dotproduct
+    out: [hash]
+    hints:
+      arv:ClusterTarget:
+        cluster_id: $(inputs.runOnCluster)
+    run: md5sum.cwl
+  cat:
+    in:
+      inp: md5sum/hash
+    out: [joined]
+    run: cat.cwl
diff --git a/sdk/cwl/tests/federation/cases/threestep-remote.cwl b/sdk/cwl/tests/federation/cases/threestep-remote.cwl
new file mode 100644 (file)
index 0000000..8dffc18
--- /dev/null
@@ -0,0 +1,50 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  InlineJavascriptRequirement: {}
+  DockerRequirement:
+    dockerPull: arvados/fed-test:threestep-remote
+  ScatterFeatureRequirement: {}
+inputs:
+  inp: File
+  clusterA: string
+  clusterB: string
+  clusterC: string
+outputs:
+  revhash:
+    type: File
+    outputSource: revC/revhash
+steps:
+  md5sum:
+    in:
+      inp: inp
+      runOnCluster: clusterA
+    out: [hash]
+    hints:
+      arv:ClusterTarget:
+        cluster_id: $(inputs.runOnCluster)
+    run: md5sum.cwl
+  revB:
+    in:
+      inp: md5sum/hash
+      runOnCluster: clusterB
+    out: [revhash]
+    hints:
+      arv:ClusterTarget:
+        cluster_id: $(inputs.runOnCluster)
+    run: rev-input-to-output.cwl
+  revC:
+    in:
+      inp: revB/revhash
+      runOnCluster: clusterC
+    out: [revhash]
+    hints:
+      arv:ClusterTarget:
+        cluster_id: $(inputs.runOnCluster)
+    run: rev-input-to-output.cwl
\ No newline at end of file
diff --git a/sdk/cwl/tests/federation/cases/twostep-both-remote.cwl b/sdk/cwl/tests/federation/cases/twostep-both-remote.cwl
new file mode 100644 (file)
index 0000000..b924c54
--- /dev/null
@@ -0,0 +1,41 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  InlineJavascriptRequirement: {}
+  DockerRequirement:
+    dockerPull: arvados/fed-test:twostep-both-remote
+inputs:
+  inp:
+    type: File
+    inputBinding: {}
+  md5sumCluster: string
+  revCluster: string
+outputs:
+  hash:
+    type: File
+    outputSource: md5sum/hash
+steps:
+  md5sum:
+    in:
+      inp: inp
+      runOnCluster: md5sumCluster
+    out: [hash]
+    hints:
+      arv:ClusterTarget:
+        cluster_id: $(inputs.runOnCluster)
+    run: md5sum.cwl
+  rev:
+    in:
+      inp: md5sum/hash
+      runOnCluster: revCluster
+    out: [revhash]
+    hints:
+      arv:ClusterTarget:
+        cluster_id: $(inputs.runOnCluster)
+    run: rev.cwl
diff --git a/sdk/cwl/tests/federation/cases/twostep-home-to-remote.cwl b/sdk/cwl/tests/federation/cases/twostep-home-to-remote.cwl
new file mode 100644 (file)
index 0000000..c74c247
--- /dev/null
@@ -0,0 +1,41 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  InlineJavascriptRequirement: {}
+  DockerRequirement:
+    dockerPull: arvados/fed-test:twostep-home-to-remote
+inputs:
+  inp:
+    type: File
+    inputBinding: {}
+  md5sumCluster: string
+  revCluster: string
+outputs:
+  hash:
+    type: File
+    outputSource: md5sum/hash
+steps:
+  md5sum:
+    in:
+      inp: inp
+      runOnCluster: md5sumCluster
+    out: [hash]
+    hints:
+      arv:ClusterTarget:
+        cluster_id: $(inputs.runOnCluster)
+    run: md5sum.cwl
+  rev:
+    in:
+      inp: md5sum/hash
+      runOnCluster: revCluster
+    out: [revhash]
+    hints:
+      arv:ClusterTarget:
+        cluster_id: $(inputs.runOnCluster)
+    run: rev.cwl
diff --git a/sdk/cwl/tests/federation/cases/twostep-remote-copy-to-home.cwl b/sdk/cwl/tests/federation/cases/twostep-remote-copy-to-home.cwl
new file mode 100644 (file)
index 0000000..3722c99
--- /dev/null
@@ -0,0 +1,41 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  InlineJavascriptRequirement: {}
+  DockerRequirement:
+    dockerPull: arvados/fed-test:twostep-remote-copy-to-home
+inputs:
+  inp:
+    type: File
+    inputBinding: {}
+  md5sumCluster: string
+  revCluster: string
+outputs:
+  hash:
+    type: File
+    outputSource: md5sum/hash
+steps:
+  md5sum:
+    in:
+      inp: inp
+      runOnCluster: md5sumCluster
+    out: [hash]
+    hints:
+      arv:ClusterTarget:
+        cluster_id: $(inputs.runOnCluster)
+    run: md5sum.cwl
+  rev:
+    in:
+      inp: md5sum/hash
+      runOnCluster: revCluster
+    out: [revhash]
+    hints:
+      arv:ClusterTarget:
+        cluster_id: $(inputs.runOnCluster)
+    run: rev-input-to-output.cwl
diff --git a/sdk/cwl/tests/federation/cases/twostep-remote-to-home.cwl b/sdk/cwl/tests/federation/cases/twostep-remote-to-home.cwl
new file mode 100644 (file)
index 0000000..e528914
--- /dev/null
@@ -0,0 +1,41 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  InlineJavascriptRequirement: {}
+  DockerRequirement:
+    dockerPull: arvados/fed-test:twostep-remote-to-home
+inputs:
+  inp:
+    type: File
+    inputBinding: {}
+  md5sumCluster: string
+  revCluster: string
+outputs:
+  hash:
+    type: File
+    outputSource: md5sum/hash
+steps:
+  md5sum:
+    in:
+      inp: inp
+      runOnCluster: md5sumCluster
+    out: [hash]
+    hints:
+      arv:ClusterTarget:
+        cluster_id: $(inputs.runOnCluster)
+    run: md5sum.cwl
+  rev:
+    in:
+      inp: md5sum/hash
+      runOnCluster: revCluster
+    out: [revhash]
+    hints:
+      arv:ClusterTarget:
+        cluster_id: $(inputs.runOnCluster)
+    run: rev.cwl
diff --git a/sdk/cwl/tests/federation/data/base-case-input.txt b/sdk/cwl/tests/federation/data/base-case-input.txt
new file mode 100644 (file)
index 0000000..761b840
--- /dev/null
@@ -0,0 +1,16 @@
+Call me base-case. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
diff --git a/sdk/cwl/tests/federation/data/hint-on-tool.txt b/sdk/cwl/tests/federation/data/hint-on-tool.txt
new file mode 100644 (file)
index 0000000..c396125
--- /dev/null
@@ -0,0 +1,16 @@
+Call me hint-on-tool. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
diff --git a/sdk/cwl/tests/federation/data/hint-on-wf.txt b/sdk/cwl/tests/federation/data/hint-on-wf.txt
new file mode 100644 (file)
index 0000000..f4aa872
--- /dev/null
@@ -0,0 +1,16 @@
+Call me hint-on-wf. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
diff --git a/sdk/cwl/tests/federation/data/remote-case-input.txt b/sdk/cwl/tests/federation/data/remote-case-input.txt
new file mode 100644 (file)
index 0000000..21e87fb
--- /dev/null
@@ -0,0 +1,16 @@
+Call me remote-case. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
diff --git a/sdk/cwl/tests/federation/data/runner-home-step-remote-input.txt b/sdk/cwl/tests/federation/data/runner-home-step-remote-input.txt
new file mode 100644 (file)
index 0000000..91ab77d
--- /dev/null
@@ -0,0 +1,16 @@
+Call me runner-home-step-remote. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
diff --git a/sdk/cwl/tests/federation/data/runner-remote-step-home-input.txt b/sdk/cwl/tests/federation/data/runner-remote-step-home-input.txt
new file mode 100644 (file)
index 0000000..e5673b8
--- /dev/null
@@ -0,0 +1,16 @@
+Call me runner-remote-step-home. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
diff --git a/sdk/cwl/tests/federation/data/scatter-gather-s1.txt b/sdk/cwl/tests/federation/data/scatter-gather-s1.txt
new file mode 100644 (file)
index 0000000..cc732e3
--- /dev/null
@@ -0,0 +1,16 @@
+Call me scatter-gather-s1. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
diff --git a/sdk/cwl/tests/federation/data/scatter-gather-s2.txt b/sdk/cwl/tests/federation/data/scatter-gather-s2.txt
new file mode 100644 (file)
index 0000000..3b57ee1
--- /dev/null
@@ -0,0 +1,16 @@
+Call me scatter-gather-s2. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
diff --git a/sdk/cwl/tests/federation/data/scatter-gather-s3.txt b/sdk/cwl/tests/federation/data/scatter-gather-s3.txt
new file mode 100644 (file)
index 0000000..06f77d2
--- /dev/null
@@ -0,0 +1,16 @@
+Call me scatter-gather-s3. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
diff --git a/sdk/cwl/tests/federation/data/threestep-remote.txt b/sdk/cwl/tests/federation/data/threestep-remote.txt
new file mode 100644 (file)
index 0000000..39dd99b
--- /dev/null
@@ -0,0 +1,16 @@
+Call me threestep-remote. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
diff --git a/sdk/cwl/tests/federation/data/twostep-both-remote.txt b/sdk/cwl/tests/federation/data/twostep-both-remote.txt
new file mode 100644 (file)
index 0000000..6218bb5
--- /dev/null
@@ -0,0 +1,16 @@
+Call me twostep-both-remote. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
diff --git a/sdk/cwl/tests/federation/data/twostep-home-to-remote.txt b/sdk/cwl/tests/federation/data/twostep-home-to-remote.txt
new file mode 100644 (file)
index 0000000..6430ad5
--- /dev/null
@@ -0,0 +1,16 @@
+Call me twostep-home-to-remote. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
diff --git a/sdk/cwl/tests/federation/data/twostep-remote-copy-to-home.txt b/sdk/cwl/tests/federation/data/twostep-remote-copy-to-home.txt
new file mode 100644 (file)
index 0000000..c0f72ef
--- /dev/null
@@ -0,0 +1,16 @@
+Call me twostep-remote-copy-to-home. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
diff --git a/sdk/cwl/tests/federation/data/twostep-remote-to-home.txt b/sdk/cwl/tests/federation/data/twostep-remote-to-home.txt
new file mode 100644 (file)
index 0000000..2318025
--- /dev/null
@@ -0,0 +1,16 @@
+Call me twostep-remote-to-home. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
diff --git a/sdk/cwl/tests/federation/framework/check-exist.cwl b/sdk/cwl/tests/federation/framework/check-exist.cwl
new file mode 100644 (file)
index 0000000..ebb0fb2
--- /dev/null
@@ -0,0 +1,42 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+requirements:
+  InitialWorkDirRequirement:
+    listing:
+      - entryname: config.json
+        entry: |-
+          ${
+          return JSON.stringify({
+            check_collections: inputs.check_collections
+          });
+          }
+  EnvVarRequirement:
+    envDef:
+      ARVADOS_API_HOST: $(inputs.arvados_api_host)
+      ARVADOS_API_TOKEN: $(inputs.arvados_api_token)
+      ARVADOS_API_HOST_INSECURE: $(""+inputs.arvado_api_host_insecure)
+  InlineJavascriptRequirement: {}
+inputs:
+  arvados_api_token: string
+  arvado_api_host_insecure: boolean
+  arvados_api_host: string
+  check_collections: string[]
+  preparescript:
+    type: File
+    default:
+      class: File
+      location: check_exist.py
+    inputBinding:
+      position: 1
+outputs:
+  success:
+    type: boolean
+    outputBinding:
+      glob: success
+      loadContents: true
+      outputEval: $(self[0].contents=="true")
+baseCommand: python2
\ No newline at end of file
diff --git a/sdk/cwl/tests/federation/framework/check_exist.py b/sdk/cwl/tests/federation/framework/check_exist.py
new file mode 100644 (file)
index 0000000..b333893
--- /dev/null
@@ -0,0 +1,25 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import arvados
+import json
+
+api = arvados.api()
+
+with open("config.json") as f:
+    config = json.load(f)
+
+success = True
+for c in config["check_collections"]:
+    try:
+        api.collections().get(uuid=c).execute()
+    except Exception as e:
+        print("Checking for %s got exception %s" % (c, e))
+        success = False
+
+with open("success", "w") as f:
+    if success:
+        f.write("true")
+    else:
+        f.write("false")
diff --git a/sdk/cwl/tests/federation/framework/dockerbuild.cwl b/sdk/cwl/tests/federation/framework/dockerbuild.cwl
new file mode 100644 (file)
index 0000000..d00b3e2
--- /dev/null
@@ -0,0 +1,21 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+inputs:
+  testcase: string
+outputs:
+  imagename:
+    type: string
+    outputBinding:
+      outputEval: $(inputs.testcase)
+requirements:
+  InitialWorkDirRequirement:
+    listing:
+      - entryname: Dockerfile
+        entry: |-
+          FROM debian@sha256:0a5fcee6f52d5170f557ee2447d7a10a5bdcf715dd7f0250be0b678c556a501b
+          LABEL org.arvados.testcase="$(inputs.testcase)"
+arguments: [docker, build, -t, $(inputs.testcase), "."]
diff --git a/sdk/cwl/tests/federation/framework/prepare.cwl b/sdk/cwl/tests/federation/framework/prepare.cwl
new file mode 100644 (file)
index 0000000..03f792c
--- /dev/null
@@ -0,0 +1,48 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+requirements:
+  InitialWorkDirRequirement:
+    listing:
+      - entryname: input.json
+        entry: $(JSON.stringify(inputs.obj))
+      - entryname: config.json
+        entry: |-
+          ${
+          return JSON.stringify({
+            arvados_cluster_ids: inputs.arvados_cluster_ids,
+            scrub_images: [inputs.scrub_image],
+            scrub_collections: inputs.scrub_collections
+          });
+          }
+  EnvVarRequirement:
+    envDef:
+      ARVADOS_API_HOST: $(inputs.arvados_api_host)
+      ARVADOS_API_TOKEN: $(inputs.arvados_api_token)
+      ARVADOS_API_HOST_INSECURE: $(""+inputs.arvado_api_host_insecure)
+  InlineJavascriptRequirement: {}
+inputs:
+  arvados_api_token: string
+  arvado_api_host_insecure: boolean
+  arvados_api_host: string
+  arvados_cluster_ids: string[]
+  wf: File
+  obj: Any
+  scrub_image: string
+  scrub_collections: string[]
+  preparescript:
+    type: File
+    default:
+      class: File
+      location: prepare.py
+    inputBinding:
+      position: 1
+outputs:
+  done:
+    type: boolean
+    outputBinding:
+      outputEval: $(true)
+baseCommand: python2
\ No newline at end of file
diff --git a/sdk/cwl/tests/federation/framework/prepare.py b/sdk/cwl/tests/federation/framework/prepare.py
new file mode 100644 (file)
index 0000000..6fe9081
--- /dev/null
@@ -0,0 +1,41 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import arvados
+import json
+
+api = arvados.api()
+
+with open("config.json") as f:
+    config = json.load(f)
+
+scrub_collections = set(config["scrub_collections"])
+
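+# On every cluster, find the docker_image_repo+tag links for the test image,
+# add the portable data hashes of the underlying image collections to the
+# scrub list, and delete the links themselves.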
+for cluster_id in config["arvados_cluster_ids"]:
+    images = []
+    for scrub_image in config["scrub_images"]:
+        sp = scrub_image.split(":")
+        image_name = sp[0]
+        image_tag = sp[1] if len(sp) > 1 else "latest"
+        images.append('{}:{}'.format(image_name, image_tag))
+
+    search_links = api.links().list(
+        filters=[['link_class', '=', 'docker_image_repo+tag'],
+                 ['name', 'in', images]],
+        cluster_id=cluster_id).execute()
+
+    head_uuids = [lk["head_uuid"] for lk in search_links["items"]]
+    cols = api.collections().list(filters=[["uuid", "in", head_uuids]],
+                                  cluster_id=cluster_id).execute()
+    for c in cols["items"]:
+        scrub_collections.add(c["portable_data_hash"])
+    for lk in search_links["items"]:
+        api.links().delete(uuid=lk["uuid"]).execute()
+
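+# Then delete, on every cluster, each collection whose portable data hash is
+# on the scrub list, so the test starts from a clean slate.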
+for cluster_id in config["arvados_cluster_ids"]:
+    matches = api.collections().list(filters=[["portable_data_hash", "in", list(scrub_collections)]],
+                                     select=["uuid", "portable_data_hash"], cluster_id=cluster_id).execute()
+    for m in matches["items"]:
+        api.collections().delete(uuid=m["uuid"]).execute()
+        print("Scrubbed %s (%s)" % (m["uuid"], m["portable_data_hash"]))
diff --git a/sdk/cwl/tests/federation/framework/run-acr.cwl b/sdk/cwl/tests/federation/framework/run-acr.cwl
new file mode 100644 (file)
index 0000000..5c8971b
--- /dev/null
@@ -0,0 +1,56 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
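+# Run arvados-cwl-runner against the target cluster with --disable-reuse and
+# --always-submit-runner; stdout (the runner's JSON output) is captured as
+# output.json and passed through as the "out" result.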
+cwlVersion: v1.0
+class: CommandLineTool
+inputs:
+  acr:
+    type: string?
+    default: arvados-cwl-runner
+    inputBinding:
+      position: 1
+  arvados_api_host: string
+  arvados_api_token: string
+  arvado_api_host_insecure:
+    type: boolean
+    default: false
+  runner_cluster:
+    type: string?
+    inputBinding:
+      prefix: --submit-runner-cluster
+      position: 2
+  wf:
+    type: File
+    inputBinding:
+      position: 3
+  obj: Any
+requirements:
+  InitialWorkDirRequirement:
+    listing:
+      - entryname: input.json
+        entry: $(JSON.stringify(inputs.obj))
+  EnvVarRequirement:
+    envDef:
+      ARVADOS_API_HOST: $(inputs.arvados_api_host)
+      ARVADOS_API_TOKEN: $(inputs.arvados_api_token)
+      ARVADOS_API_HOST_INSECURE: $(""+inputs.arvado_api_host_insecure)
+  InlineJavascriptRequirement: {}
+outputs:
+  out:
+    type: Any
+    outputBinding:
+      glob: output.json
+      loadContents: true
+      #outputEval: $(JSON.parse(self[0].contents))
+      outputEval: $(self[0].contents)
+stdout: output.json
+arguments:
+  - valueFrom: --disable-reuse
+    position: 2
+  - valueFrom: --always-submit-runner
+    position: 2
+  - valueFrom: --api=containers
+    position: 2
+  - valueFrom: input.json
+    position: 4
\ No newline at end of file
diff --git a/sdk/cwl/tests/federation/framework/testcase.cwl b/sdk/cwl/tests/federation/framework/testcase.cwl
new file mode 100644 (file)
index 0000000..89aa3f9
--- /dev/null
@@ -0,0 +1,77 @@
+#!/usr/bin/env cwl-runner
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
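+# Framework for one federation test case: build the per-test Docker image,
+# scrub leftovers from earlier runs (prepare), run arvados-cwl-runner on the
+# test workflow, then check that the expected output collections exist.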
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+  cwltool: "http://commonwl.org/cwltool#"
+hints:
+  cwltool:Secrets:
+    secrets: [arvados_api_token]
+requirements:
+  StepInputExpressionRequirement: {}
+  InlineJavascriptRequirement: {}
+  SubworkflowFeatureRequirement: {}
+inputs:
+  arvados_api_token: string
+  arvado_api_host_insecure:
+    type: boolean
+    default: false
+  arvados_api_hosts: string[]
+  arvados_cluster_ids: string[]
+  acr: string?
+  wf: File
+  obj: Any
+  scrub_image: string
+  scrub_collections: string[]
+  runner_cluster: string?
+outputs:
+  out:
+    type: Any
+    outputSource: run-acr/out
+  success:
+    type: boolean
+    outputSource: check-result/success
+steps:
+  dockerbuild:
+    in:
+      testcase: scrub_image
+    out: [imagename]
+    run: dockerbuild.cwl
+  prepare:
+    in:
+      arvados_api_token: arvados_api_token
+      arvado_api_host_insecure: arvado_api_host_insecure
+      arvados_api_host: {source: arvados_api_hosts, valueFrom: "$(self[0])"}
+      arvados_cluster_ids: arvados_cluster_ids
+      wf: wf
+      obj: obj
+      scrub_image: scrub_image
+      scrub_collections: scrub_collections
+    out: [done]
+    run: prepare.cwl
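+  # "prepare" and "image-ready" are not inputs of run-acr.cwl; they appear to
+  # be listed here only so run-acr waits for the prepare and dockerbuild
+  # steps to finish first.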
+  run-acr:
+    in:
+      prepare: prepare/done
+      image-ready: dockerbuild/imagename
+      arvados_api_token: arvados_api_token
+      arvado_api_host_insecure: arvado_api_host_insecure
+      arvados_api_host: {source: arvados_api_hosts, valueFrom: "$(self[0])"}
+      runner_cluster: runner_cluster
+      acr: acr
+      wf: wf
+      obj: obj
+    out: [out]
+    run: run-acr.cwl
+  check-result:
+    in:
+      acr-done: run-acr/out
+      arvados_api_token: arvados_api_token
+      arvado_api_host_insecure: arvado_api_host_insecure
+      arvados_api_host: {source: arvados_api_hosts, valueFrom: "$(self[0])"}
+      check_collections: scrub_collections
+    out: [success]
+    run: check-exist.cwl
\ No newline at end of file
diff --git a/sdk/cwl/tests/federation/main.cwl b/sdk/cwl/tests/federation/main.cwl
new file mode 100755 (executable)
index 0000000..a00e6d3
--- /dev/null
@@ -0,0 +1,545 @@
+#!/usr/bin/env cwl-runner
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
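+# Top-level driver for the federation integration tests: each step below runs
+# one test case through framework/testcase.cwl against an arvbox federation.
+# (Assumed usage, not part of this commit: ./main.cwl fed.yml, where fed.yml
+# supplies arvados_api_hosts, arvados_cluster_ids and arvados_api_token for
+# the clusters created by arvbox-make-federation.cwl.)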
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+  cwltool: "http://commonwl.org/cwltool#"
+hints:
+  cwltool:Secrets:
+    secrets: [arvados_api_token]
+requirements:
+  StepInputExpressionRequirement: {}
+  InlineJavascriptRequirement: {}
+  SubworkflowFeatureRequirement: {}
+inputs:
+  arvados_api_token: string
+  arvado_api_host_insecure:
+    type: boolean
+    default: false
+  arvados_api_hosts: string[]
+  arvados_cluster_ids: string[]
+  acr: string?
+  testcases:
+    type: string[]
+    default:
+      - base-case
+      - runner-home-step-remote
+      - runner-remote-step-home
+outputs:
+  base-case-success:
+    type: Any
+    outputSource: base-case/success
+  runner-home-step-remote-success:
+    type: Any
+    outputSource: runner-home-step-remote/success
+  runner-remote-step-home-success:
+    type: Any
+    outputSource: runner-remote-step-home/success
+  remote-case-success:
+    type: Any
+    outputSource: remote-case/success
+  twostep-home-to-remote-success:
+    type: Any
+    outputSource: twostep-home-to-remote/success
+  twostep-remote-to-home-success:
+    type: Any
+    outputSource: twostep-remote-to-home/success
+  twostep-both-remote-success:
+    type: Any
+    outputSource: twostep-both-remote/success
+  twostep-remote-copy-to-home-success:
+    type: Any
+    outputSource: twostep-remote-copy-to-home/success
+  scatter-gather-success:
+    type: Any
+    outputSource: scatter-gather/success
+  threestep-remote-success:
+    type: Any
+    outputSource: threestep-remote/success
+  hint-on-wf-success:
+    type: Any
+    outputSource: hint-on-wf/success
+  hint-on-tool-success:
+    type: Any
+    outputSource: hint-on-tool/success
+
+steps:
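+  # Each test case below passes a job order ("obj") whose valueFrom block
+  # injects the target cluster ids (runOnCluster, md5sumCluster, etc.) from
+  # arvados_cluster_ids before handing it to framework/testcase.cwl.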
+  base-case:
+    doc: |
+      Base case (no federation): a single step workflow with both the
+      runner and the step on the same cluster.
+    in:
+      arvados_api_token: arvados_api_token
+      arvado_api_host_insecure: arvado_api_host_insecure
+      arvados_api_hosts: arvados_api_hosts
+      arvados_cluster_ids: arvados_cluster_ids
+      acr: acr
+      wf:
+        default:
+          class: File
+          location: cases/base-case.cwl
+          secondaryFiles:
+            - class: File
+              location: cases/md5sum.cwl
+      obj:
+        default:
+          inp:
+            class: File
+            location: data/base-case-input.txt
+        valueFrom: |-
+          ${
+          self["runOnCluster"] = inputs.arvados_cluster_ids[0];
+          return self;
+          }
+      scrub_image: {default: "arvados/fed-test:base-case"}
+      scrub_collections:
+        default:
+          - 031a4ced0aa99de90fb630568afc6e9b+67   # input collection
+          - eb93a6718eb1a1a8ee9f66ee7d683472+51   # md5sum output collection
+          - f654d4048612135f4a5e7707ec0fcf3e+112  # final output json
+    out: [out, success]
+    run: framework/testcase.cwl
+
+  runner-home-step-remote:
+    doc: |
+      Single step workflow with the runner on the home cluster and the
+      step on the remote cluster.  ClusterTarget hint is on the workflow step.
+    in:
+      arvados_api_token: arvados_api_token
+      arvado_api_host_insecure: arvado_api_host_insecure
+      arvados_api_hosts: arvados_api_hosts
+      arvados_cluster_ids: arvados_cluster_ids
+      acr: acr
+      wf:
+        default:
+          class: File
+          location: cases/runner-home-step-remote.cwl
+          secondaryFiles:
+            - class: File
+              location: cases/md5sum.cwl
+      obj:
+        default:
+          inp:
+            class: File
+            location: data/runner-home-step-remote-input.txt
+        valueFrom: |-
+          ${
+          self["runOnCluster"] = inputs.arvados_cluster_ids[1];
+          return self;
+          }
+      runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+      scrub_image: {default: "arvados/fed-test:runner-home-step-remote"}
+      scrub_collections:
+        default:
+          - 3bc373e38751fe13dcbd62778d583242+81   # input collection
+          - 428e6d91e41a3af3ae287b453949e7fd+51   # md5sum output collection
+          - a4b0ddd866525655e8480f83a1ca83c6+112  # runner output json
+    out: [out, success]
+    run: framework/testcase.cwl
+
+  runner-remote-step-home:
+    doc: |
+      Single step workflow with the runner on the remote cluster and the
+      step on the home cluster.
+    in:
+      arvados_api_token: arvados_api_token
+      arvado_api_host_insecure: arvado_api_host_insecure
+      arvados_api_hosts: arvados_api_hosts
+      arvados_cluster_ids: arvados_cluster_ids
+      acr: acr
+      wf:
+        default:
+          class: File
+          location: cases/runner-remote-step-home.cwl
+          secondaryFiles:
+            - class: File
+              location: cases/md5sum.cwl
+      obj:
+        default:
+          inp:
+            class: File
+            location: data/runner-remote-step-home-input.txt
+        valueFrom: |-
+          ${
+          self["runOnCluster"] = inputs.arvados_cluster_ids[0];
+          return self;
+          }
+      runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[1])" }
+      scrub_image: {default: "arvados/fed-test:runner-remote-step-home"}
+      scrub_collections:
+        default:
+          - 25fe10d8e8530329a738de69d9bc8ab5+81   # input collection
+          - 7f052d1a04b851b6f73fba77c7802e1d+51   # md5sum output collection
+          - ecb639201f454b6493757f5117f540df+112  # runner output json
+    out: [out, success]
+    run: framework/testcase.cwl
+
+  remote-case:
+    doc: |
+      Single step workflow with both the runner and the step on the
+      remote cluster.
+    in:
+      arvados_api_token: arvados_api_token
+      arvado_api_host_insecure: arvado_api_host_insecure
+      arvados_api_hosts: arvados_api_hosts
+      arvados_cluster_ids: arvados_cluster_ids
+      acr: acr
+      wf:
+        default:
+          class: File
+          location: cases/remote-case.cwl
+          secondaryFiles:
+            - class: File
+              location: cases/md5sum.cwl
+      obj:
+        default:
+          inp:
+            class: File
+            location: data/remote-case-input.txt
+        valueFrom: |-
+          ${
+          self["runOnCluster"] = inputs.arvados_cluster_ids[1];
+          return self;
+          }
+      runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[1])" }
+      scrub_image: {default: "arvados/fed-test:remote-case"}
+      scrub_collections:
+        default:
+          - fccd49fdef8e452295f718208abafd88+69   # input collection
+          - 58c0e8ea6b148134ef8577ee11307eec+51   # md5sum output collection
+          - 1fd679c5ab64c123b9764024dbf560f0+112  # final output json
+    out: [out, success]
+    run: framework/testcase.cwl
+
+  twostep-home-to-remote:
+    doc: |
+      Two step workflow.  The runner is on the home cluster, the first
+      step is on the home cluster, the second step is on the remote
+      cluster.
+    in:
+      arvados_api_token: arvados_api_token
+      arvado_api_host_insecure: arvado_api_host_insecure
+      arvados_api_hosts: arvados_api_hosts
+      arvados_cluster_ids: arvados_cluster_ids
+      acr: acr
+      wf:
+        default:
+          class: File
+          location: cases/twostep-home-to-remote.cwl
+          secondaryFiles:
+            - class: File
+              location: cases/md5sum.cwl
+            - class: File
+              location: cases/rev.cwl
+      obj:
+        default:
+          inp:
+            class: File
+            location: data/twostep-home-to-remote.txt
+        valueFrom: |-
+          ${
+          self["md5sumCluster"] = inputs.arvados_cluster_ids[0];
+          self["revCluster"] = inputs.arvados_cluster_ids[1];
+          return self;
+          }
+      runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+      scrub_image: {default: "arvados/fed-test:twostep-home-to-remote"}
+      scrub_collections:
+        default:
+          - 268a54947fb75115cfe05bb54cc62c30+74   # input collection
+          - 400f03b8c5d2dc3dcb513a21b626ef88+51   # md5sum output collection
+          - 3738166916ca5f6f6ad12bf7e06b4a21+51   # rev output collection
+          - bc37c17a37aa25229e5de1339b27fbcc+112  # runner output json
+    out: [out, success]
+    run: framework/testcase.cwl
+
+  twostep-remote-to-home:
+    doc: |
+      Two step workflow.  The runner is on the home cluster, the first
+      step is on the remote cluster, the second step is on the home
+      cluster.
+    in:
+      arvados_api_token: arvados_api_token
+      arvado_api_host_insecure: arvado_api_host_insecure
+      arvados_api_hosts: arvados_api_hosts
+      arvados_cluster_ids: arvados_cluster_ids
+      acr: acr
+      wf:
+        default:
+          class: File
+          location: cases/twostep-remote-to-home.cwl
+          secondaryFiles:
+            - class: File
+              location: cases/md5sum.cwl
+            - class: File
+              location: cases/rev.cwl
+      obj:
+        default:
+          inp:
+            class: File
+            location: data/twostep-remote-to-home.txt
+        valueFrom: |-
+          ${
+          self["md5sumCluster"] = inputs.arvados_cluster_ids[1];
+          self["revCluster"] = inputs.arvados_cluster_ids[0];
+          return self;
+          }
+      runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+      scrub_image: {default: "arvados/fed-test:twostep-remote-to-home"}
+      scrub_collections:
+        default:
+          - cce89b9f7b6e163978144051ce5f071a+74   # input collection
+          - 0c358c3af63644c6343766feff1b7238+51   # md5sum output collection
+          - 33fb7d512bf21f04847eca58cea46e74+51   # rev output collection
+          - 912e04aa3db04aba008cf5cd46c277b2+112  # runner output json
+    out: [out, success]
+    run: framework/testcase.cwl
+
+  twostep-both-remote:
+    doc: |
+      Two step workflow.  The runner is on the home cluster, both steps are
+      on the remote cluster.
+    in:
+      arvados_api_token: arvados_api_token
+      arvado_api_host_insecure: arvado_api_host_insecure
+      arvados_api_hosts: arvados_api_hosts
+      arvados_cluster_ids: arvados_cluster_ids
+      acr: acr
+      wf:
+        default:
+          class: File
+          location: cases/twostep-both-remote.cwl
+          secondaryFiles:
+            - class: File
+              location: cases/md5sum.cwl
+            - class: File
+              location: cases/rev.cwl
+      obj:
+        default:
+          inp:
+            class: File
+            location: data/twostep-both-remote.txt
+        valueFrom: |-
+          ${
+          self["md5sumCluster"] = inputs.arvados_cluster_ids[1];
+          self["revCluster"] = inputs.arvados_cluster_ids[1];
+          return self;
+          }
+      runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+      scrub_image: {default: "arvados/fed-test:twostep-both-remote"}
+      scrub_collections:
+        default:
+          - 3c5e39939cf197d304ac1eac20841238+71   # input collection
+          - 3edb99aa607731593969cdab663d65b4+51   # md5sum output collection
+          - a91625b7139e60fe61a88cae42fbee13+51   # rev output collection
+          - ddfa58a81953dad08436d571615dd584+112  # runner output json
+    out: [out, success]
+    run: framework/testcase.cwl
+
+  twostep-remote-copy-to-home:
+    doc: |
+      Two step workflow.  The runner is on the home cluster, the first
+      step is on the remote cluster, and the second step is on the home
+      cluster; the second step passes its input file straight through to
+      its output by symlinking the input file into the output directory.
+      Tests that crunch-run copies blocks from the remote cluster to the
+      local cluster when preparing the output collection.
+    in:
+      arvados_api_token: arvados_api_token
+      arvado_api_host_insecure: arvado_api_host_insecure
+      arvados_api_hosts: arvados_api_hosts
+      arvados_cluster_ids: arvados_cluster_ids
+      acr: acr
+      wf:
+        default:
+          class: File
+          location: cases/twostep-remote-copy-to-home.cwl
+          secondaryFiles:
+            - class: File
+              location: cases/md5sum.cwl
+            - class: File
+              location: cases/rev-input-to-output.cwl
+      obj:
+        default:
+          inp:
+            class: File
+            location: data/twostep-remote-copy-to-home.txt
+        valueFrom: |-
+          ${
+          self["md5sumCluster"] = inputs.arvados_cluster_ids[1];
+          self["revCluster"] = inputs.arvados_cluster_ids[0];
+          return self;
+          }
+      runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+      scrub_image: {default: "arvados/fed-test:twostep-remote-copy-to-home"}
+      scrub_collections:
+        default:
+          - 538887bc29a3098bf79abdb8536d17bd+79   # input collection
+          - 14da0e0d52d7ab2945427074b275e9ee+51   # md5sum output collection
+          - 2d3a4a840077390a0d7788f169eaba89+112  # rev output collection
+          - 2d3a4a840077390a0d7788f169eaba89+112  # runner output json
+    out: [out, success]
+    run: framework/testcase.cwl
+
+  scatter-gather:
+    doc: ""
+    in:
+      arvados_api_token: arvados_api_token
+      arvado_api_host_insecure: arvado_api_host_insecure
+      arvados_api_hosts: arvados_api_hosts
+      arvados_cluster_ids: arvados_cluster_ids
+      acr: acr
+      wf:
+        default:
+          class: File
+          location: cases/scatter-gather.cwl
+          secondaryFiles:
+            - class: File
+              location: cases/md5sum.cwl
+            - class: File
+              location: cases/cat.cwl
+      obj:
+        default:
+          shards:
+            - class: File
+              location: data/scatter-gather-s1.txt
+            - class: File
+              location: data/scatter-gather-s2.txt
+            - class: File
+              location: data/scatter-gather-s3.txt
+        valueFrom: |-
+          ${
+          self["clusters"] = inputs.arvados_cluster_ids;
+          return self;
+          }
+      runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+      scrub_image: {default: "arvados/fed-test:scatter-gather"}
+      scrub_collections:
+        default:
+          - 99cc18329bce1b4a5fe6c4cf60477668+209  # input collection
+          - 2e570e844e03c7027baad148642d726f+51   # s1 md5sum output collection
+          - 61c88ee7811d0b849b5c06376eb065a6+51   # s2 md5sum output collection
+          - 85aaf18d638045fe609e025d3a319b2a+51   # s3 md5sum output collection
+          - ec44bcba77e65128f1a8f843d881ede4+56   # cat output collection
+          - 89de265942800ae36549109969940363+117  # runner output json
+    out: [out, success]
+    run: framework/testcase.cwl
+
+  threestep-remote:
+    doc: ""
+    in:
+      arvados_api_token: arvados_api_token
+      arvado_api_host_insecure: arvado_api_host_insecure
+      arvados_api_hosts: arvados_api_hosts
+      arvados_cluster_ids: arvados_cluster_ids
+      acr: acr
+      wf:
+        default:
+          class: File
+          location: cases/threestep-remote.cwl
+          secondaryFiles:
+            - class: File
+              location: cases/md5sum.cwl
+            - class: File
+              location: cases/rev-input-to-output.cwl
+      obj:
+        default:
+          inp:
+            class: File
+            location: data/threestep-remote.txt
+        valueFrom: |-
+          ${
+          self["clusterA"] = inputs.arvados_cluster_ids[0];
+          self["clusterB"] = inputs.arvados_cluster_ids[1];
+          self["clusterC"] = inputs.arvados_cluster_ids[2];
+          return self;
+          }
+      runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+      scrub_image: {default: "arvados/fed-test:threestep-remote"}
+      scrub_collections:
+        default:
+          - 9fbf33e62876357fe134f619865cc5a5+68   # input collection
+          - 210c5f2a716f6689b04316acd4928c10+51   # md5sum output collection
+          - 3abea7506269d5ebf61fb17c78bbd2af+105  # revB output
+          - 9e1b3acb28949759ad07e4c9740bbaa5+113  # revC output
+          - 8c86dbec7de7948871b5e168ede417e1+120  # runner output json
+    out: [out, success]
+    run: framework/testcase.cwl
+
+  hint-on-wf:
+    doc: |
+      Single step workflow with the runner on the home cluster and the
+      step on the remote cluster.  ClusterTarget hint is at the workflow level.
+    in:
+      arvados_api_token: arvados_api_token
+      arvado_api_host_insecure: arvado_api_host_insecure
+      arvados_api_hosts: arvados_api_hosts
+      arvados_cluster_ids: arvados_cluster_ids
+      acr: acr
+      wf:
+        default:
+          class: File
+          location: cases/hint-on-wf.cwl
+          secondaryFiles:
+            - class: File
+              location: cases/md5sum.cwl
+      obj:
+        default:
+          inp:
+            class: File
+            location: data/hint-on-wf.txt
+        valueFrom: |-
+          ${
+          self["runOnCluster"] = inputs.arvados_cluster_ids[1];
+          return self;
+          }
+      runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+      scrub_image: {default: "arvados/fed-test:hint-on-wf"}
+      scrub_collections:
+        default:
+          - 862433f328041b2525c90b1dc3c462fd+62   # input collection
+          - 9a68b0b9720977faba8a28e75a4398b7+51   # md5sum output collection
+          - 6a601cddb36ee2f766783b1aa9ff8d66+112  # runner output json
+    out: [out, success]
+    run: framework/testcase.cwl
+
+  hint-on-tool:
+    doc: |
+      Single step workflow with the runner on the home cluster and the
+      step on the remote cluster.  ClusterTarget hint is at the tool level.
+    in:
+      arvados_api_token: arvados_api_token
+      arvado_api_host_insecure: arvado_api_host_insecure
+      arvados_api_hosts: arvados_api_hosts
+      arvados_cluster_ids: arvados_cluster_ids
+      acr: acr
+      wf:
+        default:
+          class: File
+          location: cases/hint-on-tool.cwl
+          secondaryFiles:
+            - class: File
+              location: cases/md5sum-tool-hint.cwl
+      obj:
+        default:
+          inp:
+            class: File
+            location: data/hint-on-tool.txt
+        valueFrom: |-
+          ${
+          self["runOnCluster"] = inputs.arvados_cluster_ids[1];
+          return self;
+          }
+      runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+      scrub_image: {default: "arvados/fed-test:hint-on-tool"}
+      scrub_collections:
+        default:
+          - 6803004a4f8db9f8d1d54f6229851599+64   # input collection
+          - cacb0d56235564b5ff485c5b31215ab5+51   # md5sum output collection
+          - 2b50af43fdd84a9e906be2d54b92cddf+112  # runner output json
+    out: [out, success]
+    run: framework/testcase.cwl
index 46184325f187d18436b28bf6dfbcb162757c0654..c875c07851bfd93b0ec84ded6f744f06d9a3d4c8 100644 (file)
@@ -4,6 +4,7 @@
 
 import arvados_cwl
 import arvados_cwl.context
+import arvados_cwl.util
 from arvados_cwl.arvdocker import arv_docker_clear_cache
 import copy
 import arvados.config
@@ -132,7 +133,7 @@ class TestContainer(unittest.TestCase):
                         'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                         'output_path': '/var/spool/cwl',
                         'output_ttl': 0,
-                        'container_image': 'arvados/jobs',
+                        'container_image': '99999999999999999999999999999993+99',
                         'command': ['ls', '/var/spool/cwl'],
                         'cwd': '/var/spool/cwl',
                         'scheduling_parameters': {},
@@ -219,7 +220,7 @@ class TestContainer(unittest.TestCase):
             'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
             'output_path': '/var/spool/cwl',
             'output_ttl': 7200,
-            'container_image': 'arvados/jobs',
+            'container_image': '99999999999999999999999999999993+99',
             'command': ['ls'],
             'cwd': '/var/spool/cwl',
             'scheduling_parameters': {
@@ -351,7 +352,7 @@ class TestContainer(unittest.TestCase):
             'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
             'output_path': '/var/spool/cwl',
             'output_ttl': 0,
-            'container_image': 'arvados/jobs',
+            'container_image': '99999999999999999999999999999993+99',
             'command': ['ls'],
             'cwd': '/var/spool/cwl',
             'scheduling_parameters': {
@@ -439,7 +440,7 @@ class TestContainer(unittest.TestCase):
                     'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                     'output_path': '/var/spool/cwl',
                     'output_ttl': 0,
-                    'container_image': 'arvados/jobs',
+                    'container_image': '99999999999999999999999999999993+99',
                     'command': ['ls', '/var/spool/cwl'],
                     'cwd': '/var/spool/cwl',
                     'scheduling_parameters': {},
@@ -465,7 +466,10 @@ class TestContainer(unittest.TestCase):
 
         col().open.return_value = []
 
+        loadingContext, runtimeContext = self.helper(runner)
+
         arvjob = arvados_cwl.ArvadosContainer(runner,
+                                              runtimeContext,
                                               mock.MagicMock(),
                                               {},
                                               None,
@@ -496,7 +500,7 @@ class TestContainer(unittest.TestCase):
         arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
         runner.add_intermediate_output.assert_called_with("zzzzz-4zz18-zzzzzzzzzzzzzz2")
 
-    @mock.patch("arvados_cwl.get_current_container")
+    @mock.patch("arvados_cwl.util.get_current_container")
     @mock.patch("arvados.collection.CollectionReader")
     @mock.patch("arvados.collection.Collection")
     def test_child_failure(self, col, reader, gcc_mock):
@@ -507,11 +511,11 @@ class TestContainer(unittest.TestCase):
         # Set up runner with mocked runtime_status_update()
         self.assertFalse(gcc_mock.called)
         runtime_status_update = mock.MagicMock()
-        arvados_cwl.ArvCwlRunner.runtime_status_update = runtime_status_update
-        runner = arvados_cwl.ArvCwlRunner(api)
+        arvados_cwl.ArvCwlExecutor.runtime_status_update = runtime_status_update
+        runner = arvados_cwl.ArvCwlExecutor(api)
         self.assertEqual(runner.work_api, 'containers')
 
-        # Make sure ArvCwlRunner thinks it's running inside a container so it
+        # Make sure ArvCwlExecutor thinks it's running inside a container so it
         # adds the logging handler that will call runtime_status_update() mock
         gcc_mock.return_value = {"uuid" : "zzzzz-dz642-zzzzzzzzzzzzzzz"}
         self.assertTrue(gcc_mock.called)
@@ -536,7 +540,10 @@ class TestContainer(unittest.TestCase):
 
         col().open.return_value = []
 
+        loadingContext, runtimeContext = self.helper(runner)
+
         arvjob = arvados_cwl.ArvadosContainer(runner,
+                                              runtimeContext,
                                               mock.MagicMock(),
                                               {},
                                               None,
@@ -648,7 +655,7 @@ class TestContainer(unittest.TestCase):
                     'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                     'output_path': '/var/spool/cwl',
                     'output_ttl': 0,
-                    'container_image': 'arvados/jobs',
+                    'container_image': '99999999999999999999999999999994+99',
                     'command': ['ls', '/var/spool/cwl'],
                     'cwd': '/var/spool/cwl',
                     'scheduling_parameters': {},
@@ -741,7 +748,7 @@ class TestContainer(unittest.TestCase):
                     'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                     'output_path': '/var/spool/cwl',
                     'output_ttl': 0,
-                    'container_image': 'arvados/jobs',
+                    'container_image': '99999999999999999999999999999993+99',
                     'command': ['md5sum', 'example.conf'],
                     'cwd': '/var/spool/cwl',
                     'scheduling_parameters': {},
index 20efe1513981585b3c699f73d0dbba6994f7c682..2aaac0ae50699f5c012f36ba2f28eee1ccd281c4 100644 (file)
@@ -13,6 +13,7 @@ import StringIO
 
 import arvados
 import arvados_cwl
+import arvados_cwl.executor
 import cwltool.process
 from arvados.errors import ApiError
 from schema_salad.ref_resolver import Loader
@@ -373,7 +374,7 @@ class TestWorkflow(unittest.TestCase):
         api = mock.MagicMock()
         api._rootDesc = get_rootDesc()
 
-        runner = arvados_cwl.ArvCwlRunner(api)
+        runner = arvados_cwl.executor.ArvCwlExecutor(api)
         self.assertEqual(runner.work_api, 'jobs')
 
         list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
@@ -455,7 +456,7 @@ class TestWorkflow(unittest.TestCase):
         api = mock.MagicMock()
         api._rootDesc = get_rootDesc()
 
-        runner = arvados_cwl.ArvCwlRunner(api)
+        runner = arvados_cwl.executor.ArvCwlExecutor(api)
         self.assertEqual(runner.work_api, 'jobs')
 
         list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
@@ -517,5 +518,5 @@ class TestWorkflow(unittest.TestCase):
         api = mock.MagicMock()
         api._rootDesc = copy.deepcopy(get_rootDesc())
         del api._rootDesc.get('resources')['jobs']['methods']['create']
-        runner = arvados_cwl.ArvCwlRunner(api)
+        runner = arvados_cwl.executor.ArvCwlExecutor(api)
         self.assertEqual(runner.work_api, 'containers')
index 590c82d207d590784c677a5831721ce577c99554..baeb4145ee6dbc5ba4db326f88acd54ce04352f4 100644 (file)
@@ -12,6 +12,7 @@ import unittest
 
 import arvados
 import arvados_cwl
+import arvados_cwl.executor
 from .mock_discovery import get_rootDesc
 
 class TestMakeOutput(unittest.TestCase):
@@ -23,7 +24,7 @@ class TestMakeOutput(unittest.TestCase):
     @mock.patch("arvados.collection.CollectionReader")
     def test_make_output_collection(self, reader, col):
         keep_client = mock.MagicMock()
-        runner = arvados_cwl.ArvCwlRunner(self.api, keep_client=keep_client)
+        runner = arvados_cwl.executor.ArvCwlExecutor(self.api, keep_client=keep_client)
         runner.project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
 
         final = mock.MagicMock()
index eaa57114222233d6bcbd02ff2674c89f5169b168..fb3c257d93e1be9cac211defc97d3282100ccdbc 100644 (file)
@@ -14,6 +14,7 @@ import arvados
 import arvados.keep
 import arvados.collection
 import arvados_cwl
+import arvados_cwl.executor
 
 from cwltool.pathmapper import MapperEnt
 from .mock_discovery import get_rootDesc
@@ -34,7 +35,7 @@ class TestPathmap(unittest.TestCase):
     def test_keepref(self):
         """Test direct keep references."""
 
-        arvrunner = arvados_cwl.ArvCwlRunner(self.api)
+        arvrunner = arvados_cwl.executor.ArvCwlExecutor(self.api)
 
         p = ArvPathMapper(arvrunner, [{
             "class": "File",
@@ -49,7 +50,7 @@ class TestPathmap(unittest.TestCase):
     def test_upload(self, statfile, upl):
         """Test pathmapper uploading files."""
 
-        arvrunner = arvados_cwl.ArvCwlRunner(self.api)
+        arvrunner = arvados_cwl.executor.ArvCwlExecutor(self.api)
 
         def statfile_mock(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)", raiseOSError=False):
             st = arvados.commands.run.UploadFile("", "tests/hw.py")
@@ -70,7 +71,7 @@ class TestPathmap(unittest.TestCase):
     @mock.patch("arvados.commands.run.statfile")
     def test_statfile(self, statfile, upl):
         """Test pathmapper handling ArvFile references."""
-        arvrunner = arvados_cwl.ArvCwlRunner(self.api)
+        arvrunner = arvados_cwl.executor.ArvCwlExecutor(self.api)
 
         # An ArvFile object returned from arvados.commands.run.statfile means the file is located on a
         # keep mount, so we can construct a direct reference directly without upload.
@@ -92,7 +93,7 @@ class TestPathmap(unittest.TestCase):
     @mock.patch("os.stat")
     def test_missing_file(self, stat):
         """Test pathmapper handling missing references."""
-        arvrunner = arvados_cwl.ArvCwlRunner(self.api)
+        arvrunner = arvados_cwl.executor.ArvCwlExecutor(self.api)
 
         stat.side_effect = OSError(2, "No such file or directory")
 
index f718a86b369f756be677d292f6b33c2d5261a975..92ea553ea8ec4854af1f3292a11fd37829099181 100644 (file)
@@ -15,6 +15,7 @@ import unittest
 import arvados
 import arvados.collection
 import arvados_cwl
+import arvados_cwl.executor
 import arvados_cwl.runner
 import arvados.keep
 
@@ -46,7 +47,16 @@ def stubs(func):
         keep_client2.put.side_effect = putstub
 
         stubs.keep_client = keep_client2
-        stubs.keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+        stubs.docker_images = {
+            "arvados/jobs:"+arvados_cwl.__version__: [("zzzzz-4zz18-zzzzzzzzzzzzzd3", "")],
+            "debian:8": [("zzzzz-4zz18-zzzzzzzzzzzzzd4", "")],
+            "arvados/jobs:123": [("zzzzz-4zz18-zzzzzzzzzzzzzd5", "")],
+            "arvados/jobs:latest": [("zzzzz-4zz18-zzzzzzzzzzzzzd6", "")],
+        }
+        def kd(a, b, image_name=None, image_tag=None):
+            return stubs.docker_images.get("%s:%s" % (image_name, image_tag), [])
+        stubs.keepdocker.side_effect = kd
+
         stubs.fake_user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
         stubs.fake_container_uuid = "zzzzz-dz642-zzzzzzzzzzzzzzz"
 
@@ -69,7 +79,7 @@ def stubs(func):
 
         def collection_createstub(created_collections, body, ensure_unique_name=None):
             mt = body["manifest_text"]
-            uuid = "zzzzz-4zz18-zzzzzzzzzzzzzz%d" % len(created_collections)
+            uuid = "zzzzz-4zz18-zzzzzzzzzzzzzx%d" % len(created_collections)
             pdh = "%s+%i" % (hashlib.md5(mt).hexdigest(), len(mt))
             created_collections[uuid] = {
                 "uuid": uuid,
@@ -93,6 +103,26 @@ def stubs(func):
                 "uuid": "",
                 "portable_data_hash": "99999999999999999999999999999994+99",
                 "manifest_text": ". 99999999999999999999999999999994+99 0:0:expect_arvworkflow.cwl"
+            },
+            "zzzzz-4zz18-zzzzzzzzzzzzzd3": {
+                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd3",
+                "portable_data_hash": "999999999999999999999999999999d3+99",
+                "manifest_text": ""
+            },
+            "zzzzz-4zz18-zzzzzzzzzzzzzd4": {
+                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd4",
+                "portable_data_hash": "999999999999999999999999999999d4+99",
+                "manifest_text": ""
+            },
+            "zzzzz-4zz18-zzzzzzzzzzzzzd5": {
+                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd5",
+                "portable_data_hash": "999999999999999999999999999999d5+99",
+                "manifest_text": ""
+            },
+            "zzzzz-4zz18-zzzzzzzzzzzzzd6": {
+                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd6",
+                "portable_data_hash": "999999999999999999999999999999d6+99",
+                "manifest_text": ""
             }
         }
         stubs.api.collections().create.side_effect = functools.partial(collection_createstub, created_collections)
@@ -117,7 +147,7 @@ def stubs(func):
         }
         stubs.expect_job_spec = {
             'runtime_constraints': {
-                'docker_image': 'arvados/jobs:'+arvados_cwl.__version__,
+                'docker_image': '999999999999999999999999999999d3+99',
                 'min_ram_mb_per_node': 1024
             },
             'script_parameters': {
@@ -141,7 +171,7 @@ def stubs(func):
                     }],
                     'class': 'Directory'
                 },
-                'cwl:tool': '3fffdeaa75e018172e1b583425f4ebff+60/workflow.cwl#main'
+                'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main'
             },
             'repository': 'arvados',
             'script_version': 'master',
@@ -155,7 +185,7 @@ def stubs(func):
             'owner_uuid': None,
             "components": {
                 "cwl-runner": {
-                    'runtime_constraints': {'docker_image': 'arvados/jobs:'+arvados_cwl.__version__, 'min_ram_mb_per_node': 1024},
+                    'runtime_constraints': {'docker_image': '999999999999999999999999999999d3+99', 'min_ram_mb_per_node': 1024},
                     'script_parameters': {
                         'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
                         'x': {"value": {
@@ -173,7 +203,7 @@ def stubs(func):
                                       'size': 0
                                   }
                               ]}},
-                        'cwl:tool': '3fffdeaa75e018172e1b583425f4ebff+60/workflow.cwl#main',
+                        'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main',
                         'arv:debug': True,
                         'arv:enable_reuse': True,
                         'arv:on_error': 'continue'
@@ -247,7 +277,7 @@ def stubs(func):
                         '--enable-reuse', '--debug', '--on-error=continue',
                         '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
             'name': 'submit_wf.cwl',
-            'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
+            'container_image': '999999999999999999999999999999d3+99',
             'output_path': '/var/spool/cwl',
             'cwd': '/var/spool/cwl',
             'runtime_constraints': {
@@ -277,10 +307,17 @@ def stubs(func):
 
 
 class TestSubmit(unittest.TestCase):
-    @mock.patch("arvados_cwl.runner.arv_docker_get_image")
+    @mock.patch("arvados_cwl.arvdocker.arv_docker_get_image")
     @mock.patch("time.sleep")
     @stubs
     def test_submit(self, stubs, tm, arvdock):
+        def get_image(api_client, dockerRequirement, pull_image, project_uuid):
+            if dockerRequirement["dockerPull"] == 'arvados/jobs:'+arvados_cwl.__version__:
+                return '999999999999999999999999999999d3+99'
+            elif dockerRequirement["dockerPull"] == "debian:8":
+                return '999999999999999999999999999999d4+99'
+        arvdock.side_effect = get_image
+
         capture_stdout = cStringIO.StringIO()
         exited = arvados_cwl.main(
             ["--submit", "--no-wait", "--api=jobs", "--debug",
@@ -303,13 +340,14 @@ class TestSubmit(unittest.TestCase):
             }), ensure_unique_name=False),
             mock.call(body=JsonDiffMatcher({
                 'manifest_text':
-                '. 61df2ed9ee3eb7dd9b799e5ca35305fa+1217 0:1217:workflow.cwl\n',
+                ". 68089141fbf7e020ac90a9d6a575bc8f+1312 0:1312:workflow.cwl\n",
                 'replication_desired': None,
                 'name': 'submit_wf.cwl',
             }), ensure_unique_name=True)        ])
 
         arvdock.assert_has_calls([
             mock.call(stubs.api, {"class": "DockerRequirement", "dockerPull": "debian:8"}, True, None),
+            mock.call(stubs.api, {"class": "DockerRequirement", "dockerPull": "debian:8", 'http://arvados.org/cwl#dockerCollectionPDH': '999999999999999999999999999999d4+99'}, True, None),
             mock.call(stubs.api, {'dockerPull': 'arvados/jobs:'+arvados_cwl.__version__}, True, None)
         ])
 
@@ -646,7 +684,7 @@ class TestSubmit(unittest.TestCase):
 
     @mock.patch("arvados_cwl.task_queue.TaskQueue")
     @mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
-    @mock.patch("arvados_cwl.ArvCwlRunner.make_output_collection", return_value = (None, None))
+    @mock.patch("arvados_cwl.executor.ArvCwlExecutor.make_output_collection", return_value = (None, None))
     @stubs
     def test_storage_classes_correctly_propagate_to_make_output_collection(self, stubs, make_output, job, tq):
         def set_final_output(job_order, output_callback, runtimeContext):
@@ -667,7 +705,7 @@ class TestSubmit(unittest.TestCase):
 
     @mock.patch("arvados_cwl.task_queue.TaskQueue")
     @mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
-    @mock.patch("arvados_cwl.ArvCwlRunner.make_output_collection", return_value = (None, None))
+    @mock.patch("arvados_cwl.executor.ArvCwlExecutor.make_output_collection", return_value = (None, None))
     @stubs
     def test_default_storage_classes_correctly_propagate_to_make_output_collection(self, stubs, make_output, job, tq):
         def set_final_output(job_order, output_callback, runtimeContext):
@@ -835,7 +873,7 @@ class TestSubmit(unittest.TestCase):
             }, 'state': 'Committed',
             'output_path': '/var/spool/cwl',
             'name': 'expect_arvworkflow.cwl#main',
-            'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
+            'container_image': '999999999999999999999999999999d3+99',
             'command': ['arvados-cwl-runner', '--local', '--api=containers',
                         '--no-log-timestamps', '--disable-validate',
                         '--eval-timeout=20', '--thread-count=4',
@@ -934,7 +972,11 @@ class TestSubmit(unittest.TestCase):
                                         'id': '#submit_tool.cwl/x'}
                                 ],
                                 'requirements': [
-                                    {'dockerPull': 'debian:8', 'class': 'DockerRequirement'}
+                                    {
+                                        'dockerPull': 'debian:8',
+                                        'class': 'DockerRequirement',
+                                        "http://arvados.org/cwl#dockerCollectionPDH": "999999999999999999999999999999d4+99"
+                                    }
                                 ],
                                 'id': '#submit_tool.cwl',
                                 'outputs': [],
@@ -953,7 +995,7 @@ class TestSubmit(unittest.TestCase):
             }, 'state': 'Committed',
             'output_path': '/var/spool/cwl',
             'name': 'a test workflow',
-            'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
+            'container_image': "999999999999999999999999999999d3+99",
             'command': ['arvados-cwl-runner', '--local', '--api=containers',
                         '--no-log-timestamps', '--disable-validate',
                         '--eval-timeout=20', '--thread-count=4',
@@ -1090,7 +1132,7 @@ class TestSubmit(unittest.TestCase):
         except:
             logging.exception("")
 
-        stubs.expect_pipeline_instance["components"]["cwl-runner"]["runtime_constraints"]["docker_image"] = "arvados/jobs:123"
+        stubs.expect_pipeline_instance["components"]["cwl-runner"]["runtime_constraints"]["docker_image"] = "999999999999999999999999999999d5+99"
 
         expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
         stubs.api.pipeline_instances().create.assert_called_with(
@@ -1110,7 +1152,7 @@ class TestSubmit(unittest.TestCase):
         except:
             logging.exception("")
 
-        stubs.expect_container_spec["container_image"] = "arvados/jobs:123"
+        stubs.expect_container_spec["container_image"] = "999999999999999999999999999999d5+99"
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         stubs.api.container_requests().create.assert_called_with(
@@ -1174,11 +1216,15 @@ class TestSubmit(unittest.TestCase):
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_container_request_uuid + '\n')
 
+    def tearDown(self):
+        arvados_cwl.arvdocker.arv_docker_clear_cache()
 
     @mock.patch("arvados.commands.keepdocker.find_one_image_hash")
     @mock.patch("cwltool.docker.DockerCommandLineJob.get_image")
     @mock.patch("arvados.api")
     def test_arvados_jobs_image(self, api, get_image, find_one_image_hash):
+        arvados_cwl.arvdocker.arv_docker_clear_cache()
+
         arvrunner = mock.MagicMock()
         arvrunner.project_uuid = ""
         api.return_value = mock.MagicMock()
@@ -1204,9 +1250,12 @@ class TestSubmit(unittest.TestCase):
                                                                               "properties": ""
                                                                           }], "items_available": 1, "offset": 0},)
         arvrunner.api.collections().create().execute.return_value = {"uuid": ""}
-        self.assertEqual("arvados/jobs:"+arvados_cwl.__version__,
+        arvrunner.api.collections().get().execute.return_value = {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
+                                                                  "portable_data_hash": "9999999999999999999999999999999b+99"}
+        self.assertEqual("9999999999999999999999999999999b+99",
                          arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
 
+
     @stubs
     def test_submit_secrets(self, stubs):
         capture_stdout = cStringIO.StringIO()
@@ -1235,7 +1284,7 @@ class TestSubmit(unittest.TestCase):
                 "/var/lib/cwl/workflow.json#main",
                 "/var/lib/cwl/cwl.input.json"
             ],
-            "container_image": "arvados/jobs:"+arvados_cwl.__version__,
+            "container_image": "999999999999999999999999999999d3+99",
             "cwd": "/var/spool/cwl",
             "mounts": {
                 "/var/lib/cwl/cwl.input.json": {
@@ -1297,7 +1346,8 @@ class TestSubmit(unittest.TestCase):
                                 "hints": [
                                     {
                                         "class": "DockerRequirement",
-                                        "dockerPull": "debian:8"
+                                        "dockerPull": "debian:8",
+                                        "http://arvados.org/cwl#dockerCollectionPDH": "999999999999999999999999999999d4+99"
                                     },
                                     {
                                         "class": "http://commonwl.org/cwltool#Secrets",
@@ -1395,7 +1445,26 @@ class TestSubmit(unittest.TestCase):
             logging.exception("")
 
         stubs.api.container_requests().update.assert_called_with(
-            uuid="zzzzz-xvhdp-yyyyyyyyyyyyyyy", body=JsonDiffMatcher(stubs.expect_container_spec))
+            uuid="zzzzz-xvhdp-yyyyyyyyyyyyyyy", body=JsonDiffMatcher(stubs.expect_container_spec), cluster_id="zzzzz")
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+    @stubs
+    def test_submit_container_cluster_id(self, stubs):
+        capture_stdout = cStringIO.StringIO()
+        try:
+            exited = arvados_cwl.main(
+                ["--submit", "--no-wait", "--api=containers", "--debug", "--submit-runner-cluster=zbbbb",
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+            self.assertEqual(exited, 0)
+        except:
+            logging.exception("")
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=JsonDiffMatcher(expect_container), cluster_id="zbbbb")
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_container_request_uuid + '\n')
 
@@ -1697,12 +1766,12 @@ class TestTemplateInputs(unittest.TestCase):
         "components": {
             "inputs_test.cwl": {
                 'runtime_constraints': {
-                    'docker_image': 'arvados/jobs:'+arvados_cwl.__version__,
+                    'docker_image': '999999999999999999999999999999d3+99',
                     'min_ram_mb_per_node': 1024
                 },
                 'script_parameters': {
                     'cwl:tool':
-                    '6c5ee1cd606088106d9f28367cde1e41+60/workflow.cwl#main',
+                    'a2de777156fb700f1363b1f2e370adca+60/workflow.cwl#main',
                     'optionalFloatInput': None,
                     'fileInput': {
                         'type': 'File',
@@ -1763,7 +1832,7 @@ class TestTemplateInputs(unittest.TestCase):
         params = expect_template[
             "components"]["inputs_test.cwl"]["script_parameters"]
         params["fileInput"]["value"] = '169f39d466a5438ac4a90e779bf750c7+53/blorp.txt'
-        params["cwl:tool"] = '6c5ee1cd606088106d9f28367cde1e41+60/workflow.cwl#main'
+        params["cwl:tool"] = 'a2de777156fb700f1363b1f2e370adca+60/workflow.cwl#main'
         params["floatInput"]["value"] = 1.234
         params["boolInput"]["value"] = True
 
index c84252c7b8c135b0eb6105881dab64f70424006b..cb2e5ff56e10aee4b26162df2e07ddf4bca3f5f3 100644 (file)
@@ -25,7 +25,8 @@
             "requirements": [
                 {
                     "class": "DockerRequirement",
-                    "dockerPull": "debian:8"
+                    "dockerPull": "debian:8",
+                    "http://arvados.org/cwl#dockerCollectionPDH": "999999999999999999999999999999d4+99"
                 }
             ]
         },
index 65704b4e5cf5e7dc70949e81c5f4eb345810c403..83ba584b2084b39b3e507d203ab1bc4554ebda76 100644 (file)
@@ -8,6 +8,7 @@ $graph:
   requirements:
   - class: DockerRequirement
     dockerPull: debian:8
+    'http://arvados.org/cwl#dockerCollectionPDH': 999999999999999999999999999999d4+99
   inputs:
   - id: '#submit_tool.cwl/x'
     type: File
index aa1f18052f8afcbe289da18d597b6e66d62d3db6..d33956ccc3f74caa7d6b64958b4c9863f09bbd70 100644 (file)
@@ -20,7 +20,7 @@ ENV DEBIAN_FRONTEND noninteractive
 
 RUN apt-get update -q && apt-get install -qy git python-pip python-virtualenv python-dev libcurl4-gnutls-dev libgnutls28-dev nodejs python-pyasn1-modules
 
-RUN pip install -U setuptools six
+RUN pip install -U setuptools six requests
 
 ARG sdk
 ARG runner
index 8f576196bc4cd623076ed59c4166e4f40a48f369..9b38f07140049807947c8c3f3221966136a7a3d9 100644 (file)
@@ -51,7 +51,7 @@ setup(name='arvados-python-client',
           'google-api-python-client >=1.6.2, <1.7',
           'httplib2 >=0.9.2',
           'pycurl >=7.19.5.1',
-          'ruamel.yaml >=0.13.11, <= 0.15.26',
+          'ruamel.yaml >=0.15.54, <=0.15.77',
           'setuptools',
           'ws4py >=0.4.2',
           'subprocess32 >=3.5.1',
index 2d0bc114fbb4549da0a8696111bfead0a9ea564a..771ef2b1fba0c4009630b1e02a1df3d3b33b8247 100644 (file)
@@ -50,6 +50,7 @@ class Arvados::V1::SchemaController < ApplicationController
         defaultTrashLifetime: Rails.application.config.default_trash_lifetime,
         blobSignatureTtl: Rails.application.config.blob_signature_ttl,
         maxRequestSize: Rails.application.config.max_request_size,
+        maxItemsPerResponse: Rails.application.config.max_items_per_response,
         dockerImageFormats: Rails.application.config.docker_image_formats,
         crunchLogBytesPerEvent: Rails.application.config.crunch_log_bytes_per_event,
         crunchLogSecondsBetweenEvents: Rails.application.config.crunch_log_seconds_between_events,