*/script/rails
sdk/cwl/tests/input/blorp.txt
sdk/cwl/tests/tool/blub.txt
+sdk/cwl/tests/federation/data/*
sdk/go/manifest/testdata/*_manifest
sdk/java/.classpath
sdk/java/pom.xml
debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|pyyaml|3.12|2|python|amd64
debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|rdflib|4.2.2|2|python|all
debian8,debian9,ubuntu1404,centos7|shellescape|3.4.1|2|python|all
-debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|mistune|0.7.3|2|python|all
+debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|mistune|0.8.1|2|python|all
debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|typing|3.6.4|2|python|all
debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|avro|1.8.1|2|python|all
debian8,debian9,ubuntu1404,centos7|ruamel.ordereddict|0.4.9|2|python|amd64
debian8,debian9,centos7|six|1.10.0|2|python3|all
debian8,debian9,ubuntu1404,centos7|requests|2.12.4|2|python3|all
debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|websocket-client|0.37.0|2|python3|all
-ubuntu1404|requests|2.4.3|2|python|all
+debian8,ubuntu1404,centos7|requests|2.6.1|2|python|all
centos7|contextlib2|0.5.4|2|python|all
centos7|isodate|0.5.4|2|python|all
centos7|python-daemon|2.1.2|1|python|all
centos7|psutil|5.0.1|0|python|all
debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|lockfile|0.12.2|2|python|all|--epoch 1
debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|subprocess32|3.5.1|2|python|all
-all|ruamel.yaml|0.14.12|2|python|amd64|--python-setup-py-arguments --single-version-externally-managed
+all|ruamel.yaml|0.15.77|0|python|amd64|--python-setup-py-arguments --single-version-externally-managed
all|cwltest|1.0.20180518074130|4|python|all|--depends 'python-futures >= 3.0.5' --depends 'python-subprocess32 >= 3.5.0'
all|junit-xml|1.8|3|python|all
all|rdflib-jsonld|0.4.0|2|python|all
import logging
import os
import sys
-import threading
-import hashlib
-import copy
-import json
import re
-from functools import partial
import pkg_resources # part of setuptools
-import Queue
-import time
-import signal
-import thread
-from cwltool.errors import WorkflowException
+from schema_salad.sourceline import SourceLine
+import schema_salad.validate as validate
import cwltool.main
import cwltool.workflow
import cwltool.process
-from schema_salad.sourceline import SourceLine
-import schema_salad.validate as validate
import cwltool.argparser
+from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError
import arvados.commands._util as arv_cmd
+from arvados.api import OrderedJsonModel
-from .arvcontainer import ArvadosContainer, RunnerContainer
-from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
-from .arvtool import ArvadosCommandTool
-from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
-from .pathmapper import NoFollowPathMapper
-from .task_queue import TaskQueue
-from .context import ArvLoadingContext, ArvRuntimeContext
-from .util import get_current_container
from ._version import __version__
+from .executor import ArvCwlExecutor
-from cwltool.pack import pack
-from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
-from cwltool.command_line_tool import compute_checksums
-
-from arvados.api import OrderedJsonModel
+# These aren't used directly in this file, but
+# other code expects to import them from here.
+from .arvcontainer import ArvadosContainer
+from .arvjob import ArvadosJob
+from .arvtool import ArvadosCommandTool
+from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
+from .util import get_current_container
+from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
+from .arvworkflow import ArvadosWorkflow
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
'%(asctime)s %(name)s %(levelname)s: %(message)s',
'%Y-%m-%d %H:%M:%S'))
-DEFAULT_PRIORITY = 500
-
-class RuntimeStatusLoggingHandler(logging.Handler):
- """
- Intercepts logging calls and report them as runtime statuses on runner
- containers.
- """
- def __init__(self, runtime_status_update_func):
- super(RuntimeStatusLoggingHandler, self).__init__()
- self.runtime_status_update = runtime_status_update_func
-
- def emit(self, record):
- kind = None
- if record.levelno >= logging.ERROR:
- kind = 'error'
- elif record.levelno >= logging.WARNING:
- kind = 'warning'
- if kind is not None:
- log_msg = record.getMessage()
- if '\n' in log_msg:
- # If the logged message is multi-line, use its first line as status
- # and the rest as detail.
- status, detail = log_msg.split('\n', 1)
- self.runtime_status_update(
- kind,
- "%s: %s" % (record.name, status),
- detail
- )
- else:
- self.runtime_status_update(
- kind,
- "%s: %s" % (record.name, record.getMessage())
- )
-
-class ArvCwlRunner(object):
- """Execute a CWL tool or workflow, submit work (using either jobs or
- containers API), wait for them to complete, and report output.
-
- """
-
- def __init__(self, api_client,
- arvargs=None,
- keep_client=None,
- num_retries=4,
- thread_count=4):
-
- if arvargs is None:
- arvargs = argparse.Namespace()
- arvargs.work_api = None
- arvargs.output_name = None
- arvargs.output_tags = None
- arvargs.thread_count = 1
-
- self.api = api_client
- self.processes = {}
- self.workflow_eval_lock = threading.Condition(threading.RLock())
- self.final_output = None
- self.final_status = None
- self.num_retries = num_retries
- self.uuid = None
- self.stop_polling = threading.Event()
- self.poll_api = None
- self.pipeline = None
- self.final_output_collection = None
- self.output_name = arvargs.output_name
- self.output_tags = arvargs.output_tags
- self.project_uuid = None
- self.intermediate_output_ttl = 0
- self.intermediate_output_collections = []
- self.trash_intermediate = False
- self.thread_count = arvargs.thread_count
- self.poll_interval = 12
- self.loadingContext = None
-
- if keep_client is not None:
- self.keep_client = keep_client
- else:
- self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
-
- self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
-
- self.fetcher_constructor = partial(CollectionFetcher,
- api_client=self.api,
- fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
- num_retries=self.num_retries)
-
- self.work_api = None
- expected_api = ["jobs", "containers"]
- for api in expected_api:
- try:
- methods = self.api._rootDesc.get('resources')[api]['methods']
- if ('httpMethod' in methods['create'] and
- (arvargs.work_api == api or arvargs.work_api is None)):
- self.work_api = api
- break
- except KeyError:
- pass
-
- if not self.work_api:
- if arvargs.work_api is None:
- raise Exception("No supported APIs")
- else:
- raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
-
- if self.work_api == "jobs":
- logger.warn("""
-*******************************
-Using the deprecated 'jobs' API.
-
-To get rid of this warning:
-
-Users: read about migrating at
-http://doc.arvados.org/user/cwl/cwl-style.html#migrate
-and use the option --api=containers
-
-Admins: configure the cluster to disable the 'jobs' API as described at:
-http://doc.arvados.org/install/install-api-server.html#disable_api_methods
-*******************************""")
-
- self.loadingContext = ArvLoadingContext(vars(arvargs))
- self.loadingContext.fetcher_constructor = self.fetcher_constructor
- self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
- self.loadingContext.construct_tool_object = self.arv_make_tool
-
- # Add a custom logging handler to the root logger for runtime status reporting
- # if running inside a container
- if get_current_container(self.api, self.num_retries, logger):
- root_logger = logging.getLogger('')
- handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
- root_logger.addHandler(handler)
-
- def arv_make_tool(self, toolpath_object, loadingContext):
- if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
- return ArvadosCommandTool(self, toolpath_object, loadingContext)
- elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
- return ArvadosWorkflow(self, toolpath_object, loadingContext)
- else:
- return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
-
- def output_callback(self, out, processStatus):
- with self.workflow_eval_lock:
- if processStatus == "success":
- logger.info("Overall process status is %s", processStatus)
- state = "Complete"
- else:
- logger.error("Overall process status is %s", processStatus)
- state = "Failed"
- if self.pipeline:
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": state}).execute(num_retries=self.num_retries)
- self.final_status = processStatus
- self.final_output = out
- self.workflow_eval_lock.notifyAll()
-
-
- def start_run(self, runnable, runtimeContext):
- self.task_queue.add(partial(runnable.run, runtimeContext))
-
- def process_submitted(self, container):
- with self.workflow_eval_lock:
- self.processes[container.uuid] = container
-
- def process_done(self, uuid, record):
- with self.workflow_eval_lock:
- j = self.processes[uuid]
- logger.info("%s %s is %s", self.label(j), uuid, record["state"])
- self.task_queue.add(partial(j.done, record))
- del self.processes[uuid]
-
- def runtime_status_update(self, kind, message, detail=None):
- """
- Updates the runtime_status field on the runner container.
- Called when there's a need to report errors, warnings or just
- activity statuses, for example in the RuntimeStatusLoggingHandler.
- """
- with self.workflow_eval_lock:
- current = get_current_container(self.api, self.num_retries, logger)
- if current is None:
- return
- runtime_status = current.get('runtime_status', {})
- # In case of status being an error, only report the first one.
- if kind == 'error':
- if not runtime_status.get('error'):
- runtime_status.update({
- 'error': message
- })
- if detail is not None:
- runtime_status.update({
- 'errorDetail': detail
- })
- # Further errors are only mentioned as a count.
- else:
- # Get anything before an optional 'and N more' string.
- try:
- error_msg = re.match(
- r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
- more_failures = re.match(
- r'.*\(and (\d+) more\)', runtime_status.get('error'))
- except TypeError:
- # Ignore tests stubbing errors
- return
- if more_failures:
- failure_qty = int(more_failures.groups()[0])
- runtime_status.update({
- 'error': "%s (and %d more)" % (error_msg, failure_qty+1)
- })
- else:
- runtime_status.update({
- 'error': "%s (and 1 more)" % error_msg
- })
- elif kind in ['warning', 'activity']:
- # Record the last warning/activity status without regard of
- # previous occurences.
- runtime_status.update({
- kind: message
- })
- if detail is not None:
- runtime_status.update({
- kind+"Detail": detail
- })
- else:
- # Ignore any other status kind
- return
- try:
- self.api.containers().update(uuid=current['uuid'],
- body={
- 'runtime_status': runtime_status,
- }).execute(num_retries=self.num_retries)
- except Exception as e:
- logger.info("Couldn't update runtime_status: %s", e)
-
- def wrapped_callback(self, cb, obj, st):
- with self.workflow_eval_lock:
- cb(obj, st)
- self.workflow_eval_lock.notifyAll()
-
- def get_wrapped_callback(self, cb):
- return partial(self.wrapped_callback, cb)
-
- def on_message(self, event):
- if event.get("object_uuid") in self.processes and event["event_type"] == "update":
- uuid = event["object_uuid"]
- if event["properties"]["new_attributes"]["state"] == "Running":
- with self.workflow_eval_lock:
- j = self.processes[uuid]
- if j.running is False:
- j.running = True
- j.update_pipeline_component(event["properties"]["new_attributes"])
- logger.info("%s %s is Running", self.label(j), uuid)
- elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
- self.process_done(uuid, event["properties"]["new_attributes"])
-
- def label(self, obj):
- return "[%s %s]" % (self.work_api[0:-1], obj.name)
-
- def poll_states(self):
- """Poll status of jobs or containers listed in the processes dict.
-
- Runs in a separate thread.
- """
-
- try:
- remain_wait = self.poll_interval
- while True:
- if remain_wait > 0:
- self.stop_polling.wait(remain_wait)
- if self.stop_polling.is_set():
- break
- with self.workflow_eval_lock:
- keys = list(self.processes.keys())
- if not keys:
- remain_wait = self.poll_interval
- continue
-
- begin_poll = time.time()
- if self.work_api == "containers":
- table = self.poll_api.container_requests()
- elif self.work_api == "jobs":
- table = self.poll_api.jobs()
-
- try:
- proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
- except Exception as e:
- logger.warn("Error checking states on API server: %s", e)
- remain_wait = self.poll_interval
- continue
-
- for p in proc_states["items"]:
- self.on_message({
- "object_uuid": p["uuid"],
- "event_type": "update",
- "properties": {
- "new_attributes": p
- }
- })
- finish_poll = time.time()
- remain_wait = self.poll_interval - (finish_poll - begin_poll)
- except:
- logger.exception("Fatal error in state polling thread.")
- with self.workflow_eval_lock:
- self.processes.clear()
- self.workflow_eval_lock.notifyAll()
- finally:
- self.stop_polling.set()
-
- def add_intermediate_output(self, uuid):
- if uuid:
- self.intermediate_output_collections.append(uuid)
-
- def trash_intermediate_output(self):
- logger.info("Cleaning up intermediate output collections")
- for i in self.intermediate_output_collections:
- try:
- self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
- except:
- logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
- if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
- break
-
- def check_features(self, obj):
- if isinstance(obj, dict):
- if obj.get("writable") and self.work_api != "containers":
- raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
- if obj.get("class") == "DockerRequirement":
- if obj.get("dockerOutputDirectory"):
- if self.work_api != "containers":
- raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
- "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
- if not obj.get("dockerOutputDirectory").startswith('/'):
- raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
- "Option 'dockerOutputDirectory' must be an absolute path.")
- if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
- raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
- for v in obj.itervalues():
- self.check_features(v)
- elif isinstance(obj, list):
- for i,v in enumerate(obj):
- with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
- self.check_features(v)
-
- def make_output_collection(self, name, storage_classes, tagsString, outputObj):
- outputObj = copy.deepcopy(outputObj)
-
- files = []
- def capture(fileobj):
- files.append(fileobj)
-
- adjustDirObjs(outputObj, capture)
- adjustFileObjs(outputObj, capture)
-
- generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
-
- final = arvados.collection.Collection(api_client=self.api,
- keep_client=self.keep_client,
- num_retries=self.num_retries)
-
- for k,v in generatemapper.items():
- if k.startswith("_:"):
- if v.type == "Directory":
- continue
- if v.type == "CreateFile":
- with final.open(v.target, "wb") as f:
- f.write(v.resolved.encode("utf-8"))
- continue
-
- if not k.startswith("keep:"):
- raise Exception("Output source is not in keep or a literal")
- sp = k.split("/")
- srccollection = sp[0][5:]
- try:
- reader = self.collection_cache.get(srccollection)
- srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
- final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
- except arvados.errors.ArgumentError as e:
- logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
- raise
- except IOError as e:
- logger.warn("While preparing output collection: %s", e)
-
- def rewrite(fileobj):
- fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
- for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
- if k in fileobj:
- del fileobj[k]
-
- adjustDirObjs(outputObj, rewrite)
- adjustFileObjs(outputObj, rewrite)
-
- with final.open("cwl.output.json", "w") as f:
- json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
-
- final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
-
- logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
- final.api_response()["name"],
- final.manifest_locator())
-
- final_uuid = final.manifest_locator()
- tags = tagsString.split(',')
- for tag in tags:
- self.api.links().create(body={
- "head_uuid": final_uuid, "link_class": "tag", "name": tag
- }).execute(num_retries=self.num_retries)
-
- def finalcollection(fileobj):
- fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
-
- adjustDirObjs(outputObj, finalcollection)
- adjustFileObjs(outputObj, finalcollection)
-
- return (outputObj, final)
-
- def set_crunch_output(self):
- if self.work_api == "containers":
- current = get_current_container(self.api, self.num_retries, logger)
- if current is None:
- return
- try:
- self.api.containers().update(uuid=current['uuid'],
- body={
- 'output': self.final_output_collection.portable_data_hash(),
- }).execute(num_retries=self.num_retries)
- self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
- body={
- 'is_trashed': True
- }).execute(num_retries=self.num_retries)
- except Exception as e:
- logger.info("Setting container output: %s", e)
- elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
- self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
- body={
- 'output': self.final_output_collection.portable_data_hash(),
- 'success': self.final_status == "success",
- 'progress':1.0
- }).execute(num_retries=self.num_retries)
-
- def arv_executor(self, tool, job_order, runtimeContext, logger=None):
- self.debug = runtimeContext.debug
-
- tool.visit(self.check_features)
-
- self.project_uuid = runtimeContext.project_uuid
- self.pipeline = None
- self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
- self.secret_store = runtimeContext.secret_store
-
- self.trash_intermediate = runtimeContext.trash_intermediate
- if self.trash_intermediate and self.work_api != "containers":
- raise Exception("--trash-intermediate is only supported with --api=containers.")
-
- self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
- if self.intermediate_output_ttl and self.work_api != "containers":
- raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
- if self.intermediate_output_ttl < 0:
- raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
-
- if runtimeContext.submit_request_uuid and self.work_api != "containers":
- raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
-
- if not runtimeContext.name:
- runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
-
- # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
- # Also uploads docker images.
- merged_map = upload_workflow_deps(self, tool)
-
- # Reload tool object which may have been updated by
- # upload_workflow_deps
- # Don't validate this time because it will just print redundant errors.
- loadingContext = self.loadingContext.copy()
- loadingContext.loader = tool.doc_loader
- loadingContext.avsc_names = tool.doc_schema
- loadingContext.metadata = tool.metadata
- loadingContext.do_validate = False
-
- tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
- loadingContext)
-
- # Upload local file references in the job order.
- job_order = upload_job_order(self, "%s input" % runtimeContext.name,
- tool, job_order)
-
- existing_uuid = runtimeContext.update_workflow
- if existing_uuid or runtimeContext.create_workflow:
- # Create a pipeline template or workflow record and exit.
- if self.work_api == "jobs":
- tmpl = RunnerTemplate(self, tool, job_order,
- runtimeContext.enable_reuse,
- uuid=existing_uuid,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- merged_map=merged_map)
- tmpl.save()
- # cwltool.main will write our return value to stdout.
- return (tmpl.uuid, "success")
- elif self.work_api == "containers":
- return (upload_workflow(self, tool, job_order,
- self.project_uuid,
- uuid=existing_uuid,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- merged_map=merged_map),
- "success")
-
- self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
- self.eval_timeout = runtimeContext.eval_timeout
-
- runtimeContext = runtimeContext.copy()
- runtimeContext.use_container = True
- runtimeContext.tmpdir_prefix = "tmp"
- runtimeContext.work_api = self.work_api
-
- if self.work_api == "containers":
- if self.ignore_docker_for_reuse:
- raise Exception("--ignore-docker-for-reuse not supported with containers API.")
- runtimeContext.outdir = "/var/spool/cwl"
- runtimeContext.docker_outdir = "/var/spool/cwl"
- runtimeContext.tmpdir = "/tmp"
- runtimeContext.docker_tmpdir = "/tmp"
- elif self.work_api == "jobs":
- if runtimeContext.priority != DEFAULT_PRIORITY:
- raise Exception("--priority not implemented for jobs API.")
- runtimeContext.outdir = "$(task.outdir)"
- runtimeContext.docker_outdir = "$(task.outdir)"
- runtimeContext.tmpdir = "$(task.tmpdir)"
-
- if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
- raise Exception("--priority must be in the range 1..1000.")
-
- runnerjob = None
- if runtimeContext.submit:
- # Submit a runner job to run the workflow for us.
- if self.work_api == "containers":
- if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
- runtimeContext.runnerjob = tool.tool["id"]
- runnerjob = tool.job(job_order,
- self.output_callback,
- runtimeContext).next()
- else:
- runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
- self.output_name,
- self.output_tags,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- on_error=runtimeContext.on_error,
- submit_runner_image=runtimeContext.submit_runner_image,
- intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
- merged_map=merged_map,
- priority=runtimeContext.priority,
- secret_store=self.secret_store)
- elif self.work_api == "jobs":
- runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
- self.output_name,
- self.output_tags,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- on_error=runtimeContext.on_error,
- submit_runner_image=runtimeContext.submit_runner_image,
- merged_map=merged_map)
- elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
- # Create pipeline for local run
- self.pipeline = self.api.pipeline_instances().create(
- body={
- "owner_uuid": self.project_uuid,
- "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
- "components": {},
- "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
- logger.info("Pipeline instance %s", self.pipeline["uuid"])
-
- if runnerjob and not runtimeContext.wait:
- submitargs = runtimeContext.copy()
- submitargs.submit = False
- runnerjob.run(submitargs)
- return (runnerjob.uuid, "success")
-
- self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
- self.polling_thread = threading.Thread(target=self.poll_states)
- self.polling_thread.start()
-
- self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
-
- if runnerjob:
- jobiter = iter((runnerjob,))
- else:
- if runtimeContext.cwl_runner_job is not None:
- self.uuid = runtimeContext.cwl_runner_job.get('uuid')
- jobiter = tool.job(job_order,
- self.output_callback,
- runtimeContext)
-
- try:
- self.workflow_eval_lock.acquire()
- # Holds the lock while this code runs and releases it when
- # it is safe to do so in self.workflow_eval_lock.wait(),
- # at which point on_message can update job state and
- # process output callbacks.
-
- loopperf = Perf(metrics, "jobiter")
- loopperf.__enter__()
- for runnable in jobiter:
- loopperf.__exit__()
-
- if self.stop_polling.is_set():
- break
-
- if self.task_queue.error is not None:
- raise self.task_queue.error
-
- if runnable:
- with Perf(metrics, "run"):
- self.start_run(runnable, runtimeContext)
- else:
- if (self.task_queue.in_flight + len(self.processes)) > 0:
- self.workflow_eval_lock.wait(3)
- else:
- logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
- break
- loopperf.__enter__()
- loopperf.__exit__()
-
- while (self.task_queue.in_flight + len(self.processes)) > 0:
- if self.task_queue.error is not None:
- raise self.task_queue.error
- self.workflow_eval_lock.wait(3)
-
- except UnsupportedRequirement:
- raise
- except:
- if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
- logger.error("Interrupted, workflow will be cancelled")
- else:
- logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
- if self.pipeline:
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Failed"}).execute(num_retries=self.num_retries)
- if runnerjob and runnerjob.uuid and self.work_api == "containers":
- self.api.container_requests().update(uuid=runnerjob.uuid,
- body={"priority": "0"}).execute(num_retries=self.num_retries)
- finally:
- self.workflow_eval_lock.release()
- self.task_queue.drain()
- self.stop_polling.set()
- self.polling_thread.join()
- self.task_queue.join()
-
- if self.final_status == "UnsupportedRequirement":
- raise UnsupportedRequirement("Check log for details.")
-
- if self.final_output is None:
- raise WorkflowException("Workflow did not return a result.")
-
- if runtimeContext.submit and isinstance(runnerjob, Runner):
- logger.info("Final output collection %s", runnerjob.final_output)
- else:
- if self.output_name is None:
- self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
- if self.output_tags is None:
- self.output_tags = ""
-
- storage_classes = runtimeContext.storage_classes.strip().split(",")
- self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
- self.set_crunch_output()
-
- if runtimeContext.compute_checksum:
- adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
- adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
-
- if self.trash_intermediate and self.final_status == "success":
- self.trash_intermediate_output()
-
- return (self.final_output, self.final_status)
-
-
def versionstring():
"""Print version string of key packages for provenance and debugging."""
help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
default=None)
- parser.add_argument("--submit-request-uuid", type=str,
- default=None,
- help="Update and commit supplied container request instead of creating a new one (containers API only).")
+ parser.add_argument("--always-submit-runner", action="store_true",
+ help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool",
+ default=False)
+
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--submit-request-uuid", type=str,
+ default=None,
+ help="Update and commit to supplied container request instead of creating a new one (containers API only).",
+ metavar="UUID")
+ exgroup.add_argument("--submit-runner-cluster", type=str,
+ help="Submit workflow runner to a remote cluster (containers API only)",
+ default=None,
+ metavar="CLUSTER_ID")
parser.add_argument("--name", type=str,
help="Name to use for workflow execution instance.",
"http://arvados.org/cwl#APIRequirement",
"http://commonwl.org/cwltool#LoadListingRequirement",
"http://arvados.org/cwl#IntermediateOutput",
- "http://arvados.org/cwl#ReuseRequirement"
+ "http://arvados.org/cwl#ReuseRequirement",
+ "http://arvados.org/cwl#ClusterTarget"
])
def exit_signal_handler(sigcode, frame):
add_arv_hints()
+ for key, val in cwltool.argparser.get_default_args().items():
+ if not hasattr(arvargs, key):
+ setattr(arvargs, key, val)
+
try:
if api_client is None:
api_client = arvados.safeapi.ThreadSafeApiCache(
api_client.users().current().execute()
if keep_client is None:
keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
- runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
+ executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
except Exception as e:
logger.error(e)
return 1
else:
arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
- for key, val in cwltool.argparser.get_default_args().items():
- if not hasattr(arvargs, key):
- setattr(arvargs, key, val)
-
- runtimeContext = ArvRuntimeContext(vars(arvargs))
- runtimeContext.make_fs_access = partial(CollectionFsAccess,
- collection_cache=runner.collection_cache)
- runtimeContext.http_timeout = arvargs.http_timeout
-
return cwltool.main.main(args=arvargs,
stdout=stdout,
stderr=stderr,
- executor=runner.arv_executor,
+ executor=executor.arv_executor,
versionfunc=versionstring,
job_order_object=job_order_object,
logger_handler=arvados.log_handler,
custom_schema_callback=add_arv_hints,
- loadingContext=runner.loadingContext,
- runtimeContext=runtimeContext)
+ loadingContext=executor.loadingContext,
+ runtimeContext=executor.runtimeContext)
coresMin:
type: int?
doc: Minimum cores allocated to cwl-runner
- jsonldPredicate: "https://w3id.org/cwl/cwl#ResourceRequirement/coresMin"
\ No newline at end of file
+ jsonldPredicate: "https://w3id.org/cwl/cwl#ResourceRequirement/coresMin"
+
+- name: ClusterTarget
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Specify where a workflow step should run
+ fields:
+ class:
+ type: string
+ doc: "Always 'arv:ClusterTarget'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ cluster_id:
+ type: string?
+ doc: The cluster to run the container
+ project_uuid:
+ type: string?
+ doc: The project that will own the container requests and intermediate collections
import uuid
import math
-from arvados_cwl.util import get_current_container, get_intermediate_collection_info
+import arvados_cwl.util
import ruamel.yaml as yaml
from cwltool.errors import WorkflowException
class ArvadosContainer(JobBase):
"""Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
- def __init__(self, runner,
+ def __init__(self, runner, job_runtime,
builder, # type: Builder
joborder, # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
make_path_mapper, # type: Callable[..., PathMapper]
):
super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
self.arvrunner = runner
+ self.job_runtime = job_runtime
self.running = False
self.uuid = None
# ArvadosContainer object by CommandLineTool.job() before
# run() is called.
+ runtimeContext = self.job_runtime
+
container_request = {
"command": self.command_line,
"name": self.name,
keepemptydirs(vwd)
if not runtimeContext.current_container:
- runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
- info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
+ runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+ info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
vwd.save_new(name=info["name"],
owner_uuid=self.arvrunner.project_uuid,
ensure_unique_name=True,
docker_req = {"dockerImageId": "arvados/jobs"}
container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
- docker_req,
- runtimeContext.pull_image,
- self.arvrunner.project_uuid)
+ docker_req,
+ runtimeContext.pull_image,
+ self.arvrunner.project_uuid)
api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
if api_req:
if self.timelimit is not None:
scheduling_parameters["max_run_time"] = self.timelimit
+ extra_submit_params = {}
+ if runtimeContext.submit_runner_cluster:
+ extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
+
container_request["output_name"] = "Output for step %s" % (self.name)
container_request["output_ttl"] = self.output_ttl
container_request["mounts"] = mounts
if runtimeContext.submit_request_uuid:
response = self.arvrunner.api.container_requests().update(
uuid=runtimeContext.submit_request_uuid,
- body=container_request
+ body=container_request,
+ **extra_submit_params
).execute(num_retries=self.arvrunner.num_retries)
else:
response = self.arvrunner.api.container_requests().create(
- body=container_request
+ body=container_request,
+ **extra_submit_params
).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
if self.arvrunner.project_uuid:
job_spec["owner_uuid"] = self.arvrunner.project_uuid
+ extra_submit_params = {}
+ if runtimeContext.submit_runner_cluster:
+ extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
+
if runtimeContext.submit_request_uuid:
response = self.arvrunner.api.container_requests().update(
uuid=runtimeContext.submit_request_uuid,
- body=job_spec
+ body=job_spec,
+ **extra_submit_params
).execute(num_retries=self.arvrunner.num_retries)
else:
response = self.arvrunner.api.container_requests().create(
- body=job_spec
+ body=job_spec,
+ **extra_submit_params
).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
"""Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
+ if "http://arvados.org/cwl#dockerCollectionPDH" in dockerRequirement:
+ return dockerRequirement["http://arvados.org/cwl#dockerCollectionPDH"]
+
if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
dockerRequirement = copy.deepcopy(dockerRequirement)
dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
global cached_lookups_lock
with cached_lookups_lock:
if dockerRequirement["dockerImageId"] in cached_lookups:
- return dockerRequirement["dockerImageId"]
+ return cached_lookups[dockerRequirement["dockerImageId"]]
with SourceLine(dockerRequirement, "dockerImageId", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
sp = dockerRequirement["dockerImageId"].split(":")
if not images:
raise WorkflowException("Could not find Docker image %s:%s" % (image_name, image_tag))
+ pdh = api_client.collections().get(uuid=images[0][0]).execute()["portable_data_hash"]
+
with cached_lookups_lock:
- cached_lookups[dockerRequirement["dockerImageId"]] = True
+ cached_lookups[dockerRequirement["dockerImageId"]] = pdh
- return dockerRequirement["dockerImageId"]
+ return pdh
def arv_docker_clear_cache():
global cached_lookups
from schema_salad.sourceline import SourceLine
-from arvados_cwl.util import get_current_container, get_intermediate_collection_info
+import arvados_cwl.util
import ruamel.yaml as yaml
import arvados.collection
from .perf import Perf
from . import done
from ._version import __version__
+from .util import get_intermediate_collection_info
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
if vwd:
with Perf(metrics, "generatefiles.save_new %s" % self.name):
- if not runtimeContext.current_container:
- runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
- info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
+ info = get_intermediate_collection_info(self.name, None, runtimeContext.intermediate_output_ttl)
vwd.save_new(name=info["name"],
owner_uuid=self.arvrunner.project_uuid,
ensure_unique_name=True,
from .arvcontainer import ArvadosContainer
from .pathmapper import ArvPathMapper
from functools import partial
+from schema_salad.sourceline import SourceLine
+from cwltool.errors import WorkflowException
+
+def validate_cluster_target(arvrunner, runtimeContext):
+    """Check that runtimeContext.submit_runner_cluster names a known cluster.
+
+    The target is accepted when it is unset/falsy, equal to the local
+    cluster's "uuidPrefix", or one of the keys of "remoteHosts" in the API
+    server's discovery document (arvrunner.api._rootDesc).
+
+    Raises:
+        WorkflowException: the target is set but matches none of the above.
+    """
+    target = runtimeContext.submit_runner_cluster
+    if not target:
+        return
+
+    root_desc = arvrunner.api._rootDesc
+    # A discovery document without "remoteHosts" simply means no remote
+    # clusters are known; treat it as empty rather than failing with KeyError.
+    remote_hosts = root_desc.get("remoteHosts") or {}
+    if target in remote_hosts or target == root_desc.get("uuidPrefix"):
+        return
+
+    raise WorkflowException("Unknown or invalid cluster id '%s' known remote clusters are %s" % (
+        target, ", ".join(sorted(remote_hosts.keys()))))
+
+
+def set_cluster_target(tool, arvrunner, builder, runtimeContext):
+    """Apply an arv:ClusterTarget hint/requirement found on `tool`.
+
+    The "hints" list is scanned first, then "requirements", and the last
+    matching entry wins, so a requirement overrides a hint. With no match,
+    runtimeContext is returned unchanged. Otherwise the function returns a
+    copy of runtimeContext with these fields replaced:
+
+    - submit_runner_cluster: the evaluated cluster_id.
+    - project_uuid: the evaluated project_uuid.
+
+    If an expression evaluates to something falsy, the existing value is
+    kept. The copy is checked with validate_cluster_target before it is
+    returned.
+    """
+    matches = [entry
+               for section in ("hints", "requirements") if section in tool
+               for entry in tool[section]
+               if entry["class"] == "http://arvados.org/cwl#ClusterTarget"]
+    if not matches:
+        return runtimeContext
+    target_req = matches[-1]
+
+    # Errors raised while evaluating are annotated with target_req's source position.
+    with SourceLine(target_req, None, WorkflowException, runtimeContext.debug):
+        updated = runtimeContext.copy()
+        cluster_id = builder.do_eval(target_req.get("cluster_id"))
+        updated.submit_runner_cluster = cluster_id or updated.submit_runner_cluster
+        project_uuid = builder.do_eval(target_req.get("project_uuid"))
+        updated.project_uuid = project_uuid or updated.project_uuid
+        validate_cluster_target(arvrunner, updated)
+
+    return updated
class ArvadosCommandTool(CommandLineTool):
"""Wrap cwltool CommandLineTool to override selected methods."""
def make_job_runner(self, runtimeContext):
if runtimeContext.work_api == "containers":
- return partial(ArvadosContainer, self.arvrunner)
+ return partial(ArvadosContainer, self.arvrunner, runtimeContext)
elif runtimeContext.work_api == "jobs":
return partial(ArvadosJob, self.arvrunner)
else:
"$(task.keep)/%s/%s")
def job(self, joborder, output_callback, runtimeContext):
-
- # Workaround for #13365
- builderargs = runtimeContext.copy()
- builderargs.toplevel = True
- builderargs.tmp_outdir_prefix = ""
- builder = self._init_job(joborder, builderargs)
- joborder = builder.job
-
- runtimeContext = runtimeContext.copy()
+ builder = self._init_job(joborder, runtimeContext)
+ runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
if runtimeContext.work_api == "containers":
dockerReq, is_req = self.get_requirement("DockerRequirement")
from cwltool.pack import pack
from cwltool.load_tool import fetch_document
from cwltool.process import shortname
-from cwltool.workflow import Workflow, WorkflowException
+from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import Builder
from cwltool.context import LoadingContext
from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
trim_anonymous_location, remove_redundant_fields, discover_secondary_files)
from .pathmapper import ArvPathMapper, trim_listing
-from .arvtool import ArvadosCommandTool
+from .arvtool import ArvadosCommandTool, set_cluster_target
from .perf import Perf
logger = logging.getLogger('arvados.cwl-runner')
overall_res_req["class"] = "ResourceRequirement"
return cmap(overall_res_req)
+class ArvadosWorkflowStep(WorkflowStep):
+    """cwltool WorkflowStep that evaluates arv:ClusterTarget before running."""
+
+    def __init__(self,
+                 toolpath_object,      # type: Dict[Text, Any]
+                 pos,                  # type: int
+                 loadingContext,       # type: LoadingContext
+                 arvrunner,
+                 *args,
+                 **kwargs
+                 ):  # type: (...) -> None
+        super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *args, **kwargs)
+        # The step's tool object is explicitly labelled as a WorkflowStep.
+        self.tool["class"] = "WorkflowStep"
+        self.arvrunner = arvrunner
+
+    def job(self, joborder, output_callback, runtimeContext):
+        """Apply any ClusterTarget on this step, then defer to WorkflowStep.job."""
+        step_context = runtimeContext.copy()
+        step_context.toplevel = True  # Preserve behavior for #13365
+
+        # Input keys are reduced to their short names before building the
+        # Builder used to evaluate ClusterTarget expressions.
+        short_inputs = dict((shortname(key), value) for key, value in joborder.items())
+        builder = self._init_job(short_inputs, step_context)
+        step_context = set_cluster_target(self.tool, self.arvrunner, builder, step_context)
+        return super(ArvadosWorkflowStep, self).job(joborder, output_callback, step_context)
+
+
class ArvadosWorkflow(Workflow):
"""Wrap cwltool Workflow to override selected methods."""
def __init__(self, arvrunner, toolpath_object, loadingContext):
- super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
self.arvrunner = arvrunner
self.wf_pdh = None
self.dynamic_resource_req = []
self.static_resource_req = []
self.wf_reffiles = []
self.loadingContext = loadingContext
+ super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
+ self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
def job(self, joborder, output_callback, runtimeContext):
+
+ builder = self._init_job(joborder, runtimeContext)
+ runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
+
req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
- if req:
- with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
- if "id" not in self.tool:
- raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
- document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
+ if not req:
+ return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)
- discover_secondary_files(self.tool["inputs"], joborder)
+ # RunInSingleContainer is true
+
+ with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
+ if "id" not in self.tool:
+ raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
+ document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
+
+ discover_secondary_files(self.tool["inputs"], joborder)
+
+ with Perf(metrics, "subworkflow upload_deps"):
+ upload_dependencies(self.arvrunner,
+ os.path.basename(joborder.get("id", "#")),
+ document_loader,
+ joborder,
+ joborder.get("id", "#"),
+ False)
+
+ if self.wf_pdh is None:
+ workflowobj["requirements"] = dedup_reqs(self.requirements)
+ workflowobj["hints"] = dedup_reqs(self.hints)
+
+ packed = pack(document_loader, workflowobj, uri, self.metadata)
+
+ def visit(item):
+ for t in ("hints", "requirements"):
+ if t not in item:
+ continue
+ for req in item[t]:
+ if req["class"] == "ResourceRequirement":
+ dyn = False
+ for k in max_res_pars + sum_res_pars:
+ if k in req:
+ if isinstance(req[k], basestring):
+ if item["id"] == "#main":
+ # only the top-level requirements/hints may contain expressions
+ self.dynamic_resource_req.append(req)
+ dyn = True
+ break
+ else:
+ with SourceLine(req, k, WorkflowException):
+ raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
+ if not dyn:
+ self.static_resource_req.append(req)
+
+ visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
+
+ if self.static_resource_req:
+ self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
- with Perf(metrics, "subworkflow upload_deps"):
upload_dependencies(self.arvrunner,
- os.path.basename(joborder.get("id", "#")),
+ runtimeContext.name,
document_loader,
- joborder,
- joborder.get("id", "#"),
+ packed,
+ uri,
False)
- if self.wf_pdh is None:
- workflowobj["requirements"] = dedup_reqs(self.requirements)
- workflowobj["hints"] = dedup_reqs(self.hints)
-
- packed = pack(document_loader, workflowobj, uri, self.metadata)
-
- builder = Builder(joborder,
- requirements=workflowobj["requirements"],
- hints=workflowobj["hints"],
- resources={})
-
- def visit(item):
- for t in ("hints", "requirements"):
- if t not in item:
- continue
- for req in item[t]:
- if req["class"] == "ResourceRequirement":
- dyn = False
- for k in max_res_pars + sum_res_pars:
- if k in req:
- if isinstance(req[k], basestring):
- if item["id"] == "#main":
- # only the top-level requirements/hints may contain expressions
- self.dynamic_resource_req.append(req)
- dyn = True
- break
- else:
- with SourceLine(req, k, WorkflowException):
- raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
- if not dyn:
- self.static_resource_req.append(req)
-
- visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
-
- if self.static_resource_req:
- self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
-
- upload_dependencies(self.arvrunner,
- runtimeContext.name,
- document_loader,
- packed,
- uri,
- False)
-
- # Discover files/directories referenced by the
- # workflow (mainly "default" values)
- visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)
-
-
- if self.dynamic_resource_req:
- builder = Builder(joborder,
- requirements=self.requirements,
- hints=self.hints,
- resources={})
-
- # Evaluate dynamic resource requirements using current builder
- rs = copy.copy(self.static_resource_req)
- for dyn_rs in self.dynamic_resource_req:
- eval_req = {"class": "ResourceRequirement"}
- for a in max_res_pars + sum_res_pars:
- if a in dyn_rs:
- eval_req[a] = builder.do_eval(dyn_rs[a])
- rs.append(eval_req)
- job_res_reqs = [get_overall_res_req(rs)]
- else:
- job_res_reqs = self.static_resource_req
-
- with Perf(metrics, "subworkflow adjust"):
- joborder_resolved = copy.deepcopy(joborder)
- joborder_keepmount = copy.deepcopy(joborder)
-
- reffiles = []
- visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
-
- mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
- "/keep/%s",
- "/keep/%s/%s")
-
- # For containers API, we need to make sure any extra
- # referenced files (ie referenced by the workflow but
- # not in the inputs) are included in the mounts.
- if self.wf_reffiles:
- runtimeContext = runtimeContext.copy()
- runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)
-
- def keepmount(obj):
- remove_redundant_fields(obj)
- with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
- if "location" not in obj:
- raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
- with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
- if obj["location"].startswith("keep:"):
- obj["location"] = mapper.mapper(obj["location"]).target
- if "listing" in obj:
- del obj["listing"]
- elif obj["location"].startswith("_:"):
- del obj["location"]
- else:
- raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])
-
- visit_class(joborder_keepmount, ("File", "Directory"), keepmount)
-
- def resolved(obj):
- if obj["location"].startswith("keep:"):
- obj["location"] = mapper.mapper(obj["location"]).resolved
-
- visit_class(joborder_resolved, ("File", "Directory"), resolved)
-
- if self.wf_pdh is None:
- adjustFileObjs(packed, keepmount)
- adjustDirObjs(packed, keepmount)
- self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
-
- wf_runner = cmap({
- "class": "CommandLineTool",
- "baseCommand": "cwltool",
- "inputs": self.tool["inputs"],
- "outputs": self.tool["outputs"],
- "stdout": "cwl.output.json",
- "requirements": self.requirements+job_res_reqs+[
- {"class": "InlineJavascriptRequirement"},
- {
- "class": "InitialWorkDirRequirement",
- "listing": [{
- "entryname": "workflow.cwl",
- "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
- }, {
- "entryname": "cwl.input.yml",
- "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
- }]
- }],
- "hints": self.hints,
- "arguments": ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl#main", "cwl.input.yml"],
- "id": "#"
- })
- return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
+ # Discover files/directories referenced by the
+ # workflow (mainly "default" values)
+ visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)
+
+
+ if self.dynamic_resource_req:
+ # Evaluate dynamic resource requirements using current builder
+ rs = copy.copy(self.static_resource_req)
+ for dyn_rs in self.dynamic_resource_req:
+ eval_req = {"class": "ResourceRequirement"}
+ for a in max_res_pars + sum_res_pars:
+ if a in dyn_rs:
+ eval_req[a] = builder.do_eval(dyn_rs[a])
+ rs.append(eval_req)
+ job_res_reqs = [get_overall_res_req(rs)]
else:
- return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)
+ job_res_reqs = self.static_resource_req
+
+ with Perf(metrics, "subworkflow adjust"):
+ joborder_resolved = copy.deepcopy(joborder)
+ joborder_keepmount = copy.deepcopy(joborder)
+
+ reffiles = []
+ visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
+
+ mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
+ "/keep/%s",
+ "/keep/%s/%s")
+
+ # For containers API, we need to make sure any extra
+ # referenced files (ie referenced by the workflow but
+ # not in the inputs) are included in the mounts.
+ if self.wf_reffiles:
+ runtimeContext = runtimeContext.copy()
+ runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)
+
+ def keepmount(obj):
+ remove_redundant_fields(obj)
+ with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
+ if "location" not in obj:
+ raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
+ with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
+ if obj["location"].startswith("keep:"):
+ obj["location"] = mapper.mapper(obj["location"]).target
+ if "listing" in obj:
+ del obj["listing"]
+ elif obj["location"].startswith("_:"):
+ del obj["location"]
+ else:
+ raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])
+
+ visit_class(joborder_keepmount, ("File", "Directory"), keepmount)
+
+ def resolved(obj):
+ if obj["location"].startswith("keep:"):
+ obj["location"] = mapper.mapper(obj["location"]).resolved
+
+ visit_class(joborder_resolved, ("File", "Directory"), resolved)
+
+ if self.wf_pdh is None:
+ adjustFileObjs(packed, keepmount)
+ adjustDirObjs(packed, keepmount)
+ self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
+
+ wf_runner = cmap({
+ "class": "CommandLineTool",
+ "baseCommand": "cwltool",
+ "inputs": self.tool["inputs"],
+ "outputs": self.tool["outputs"],
+ "stdout": "cwl.output.json",
+ "requirements": self.requirements+job_res_reqs+[
+ {"class": "InlineJavascriptRequirement"},
+ {
+ "class": "InitialWorkDirRequirement",
+ "listing": [{
+ "entryname": "workflow.cwl",
+ "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
+ }, {
+ "entryname": "cwl.input.yml",
+ "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
+ }]
+ }],
+ "hints": self.hints,
+ "arguments": ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl#main", "cwl.input.yml"],
+ "id": "#"
+ })
+ return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
+
+    def make_workflow_step(self,
+                           toolpath_object,      # type: Dict[Text, Any]
+                           pos,                  # type: int
+                           loadingContext,       # type: LoadingContext
+                           *args,
+                           **kwargs
+                           ):
+        # (...) -> WorkflowStep
+        """Create an ArvadosWorkflowStep bound to this workflow's arvrunner.
+
+        Presumably called by cwltool's Workflow machinery for each step
+        (which would explain why self.arvrunner is set before the super
+        constructor runs) -- confirm against cwltool.
+        """
+        return ArvadosWorkflowStep(toolpath_object, pos, loadingContext,
+                                   self.arvrunner, *args, **kwargs)
# SPDX-License-Identifier: Apache-2.0
from cwltool.context import LoadingContext, RuntimeContext
+from collections import namedtuple
class ArvLoadingContext(LoadingContext):
def __init__(self, kwargs=None):
self.storage_classes = "default"
self.current_container = None
self.http_timeout = 300
+ self.submit_runner_cluster = None
+ self.cluster_target_id = 0
+ self.always_submit_runner = False
super(ArvRuntimeContext, self).__init__(kwargs)
+
+ if self.submit_request_uuid:
+ self.submit_runner_cluster = self.submit_request_uuid[0:5]
arvargs.output_tags = output_tags
arvargs.thread_count = 1
- runner = arvados_cwl.ArvCwlRunner(api_client=arvados.safeapi.ThreadSafeApiCache(
+ runner = arvados_cwl.ArvCwlExecutor(api_client=arvados.safeapi.ThreadSafeApiCache(
api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}),
arvargs=arvargs)
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import argparse
+import logging
+import os
+import sys
+import threading
+import copy
+import json
+import re
+from functools import partial
+import time
+
+from cwltool.errors import WorkflowException
+import cwltool.workflow
+from schema_salad.sourceline import SourceLine
+import schema_salad.validate as validate
+
+import arvados
+import arvados.config
+from arvados.keep import KeepClient
+from arvados.errors import ApiError
+
+import arvados_cwl.util
+from .arvcontainer import RunnerContainer
+from .arvjob import RunnerJob, RunnerTemplate
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
+from .arvtool import ArvadosCommandTool, validate_cluster_target
+from .arvworkflow import ArvadosWorkflow, upload_workflow
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
+from .perf import Perf
+from .pathmapper import NoFollowPathMapper
+from .task_queue import TaskQueue
+from .context import ArvLoadingContext, ArvRuntimeContext
+from ._version import __version__
+
+from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
+from cwltool.command_line_tool import compute_checksums
+
+logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
+
+DEFAULT_PRIORITY = 500
+
+class RuntimeStatusLoggingHandler(logging.Handler):
+    """
+    Intercepts logging calls and reports them as runtime statuses on runner
+    containers.
+    """
+    def __init__(self, runtime_status_update_func):
+        super(RuntimeStatusLoggingHandler, self).__init__()
+        # Called as runtime_status_update(kind, message[, detail]).
+        self.runtime_status_update = runtime_status_update_func
+
+    def emit(self, record):
+        """Forward WARNING-and-above records to the status callback.
+
+        Records at ERROR or higher are reported as 'error' and WARNING
+        records as 'warning'; lower levels are ignored. For a multi-line
+        message, the first line becomes the status and the rest is passed
+        as detail.
+        """
+        if record.levelno >= logging.ERROR:
+            kind = 'error'
+        elif record.levelno >= logging.WARNING:
+            kind = 'warning'
+        else:
+            return
+
+        try:
+            # Format once; getMessage() re-applies the %-args on every call.
+            log_msg = record.getMessage()
+            if '\n' in log_msg:
+                status, detail = log_msg.split('\n', 1)
+                self.runtime_status_update(
+                    kind,
+                    "%s: %s" % (record.name, status),
+                    detail
+                )
+            else:
+                self.runtime_status_update(
+                    kind,
+                    "%s: %s" % (record.name, log_msg)
+                )
+        except Exception:
+            # logging.Handler contract: a failure while emitting must not
+            # propagate into the code that issued the log call.
+            self.handleError(record)
+
+class ArvCwlExecutor(object):
+ """Execute a CWL tool or workflow, submit work (using either jobs or
+ containers API), wait for them to complete, and report output.
+
+ """
+
+ def __init__(self, api_client,
+ arvargs=None,
+ keep_client=None,
+ num_retries=4,
+ thread_count=4):
+
+ if arvargs is None:
+ arvargs = argparse.Namespace()
+ arvargs.work_api = None
+ arvargs.output_name = None
+ arvargs.output_tags = None
+ arvargs.thread_count = 1
+
+ self.api = api_client
+ self.processes = {}
+ self.workflow_eval_lock = threading.Condition(threading.RLock())
+ self.final_output = None
+ self.final_status = None
+ self.num_retries = num_retries
+ self.uuid = None
+ self.stop_polling = threading.Event()
+ self.poll_api = None
+ self.pipeline = None
+ self.final_output_collection = None
+ self.output_name = arvargs.output_name
+ self.output_tags = arvargs.output_tags
+ self.project_uuid = None
+ self.intermediate_output_ttl = 0
+ self.intermediate_output_collections = []
+ self.trash_intermediate = False
+ self.thread_count = arvargs.thread_count
+ self.poll_interval = 12
+ self.loadingContext = None
+
+ if keep_client is not None:
+ self.keep_client = keep_client
+ else:
+ self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
+
+ self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+
+ self.fetcher_constructor = partial(CollectionFetcher,
+ api_client=self.api,
+ fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
+ num_retries=self.num_retries)
+
+ self.work_api = None
+ expected_api = ["jobs", "containers"]
+ for api in expected_api:
+ try:
+ methods = self.api._rootDesc.get('resources')[api]['methods']
+ if ('httpMethod' in methods['create'] and
+ (arvargs.work_api == api or arvargs.work_api is None)):
+ self.work_api = api
+ break
+ except KeyError:
+ pass
+
+ if not self.work_api:
+ if arvargs.work_api is None:
+ raise Exception("No supported APIs")
+ else:
+ raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
+
+ if self.work_api == "jobs":
+ logger.warn("""
+*******************************
+Using the deprecated 'jobs' API.
+
+To get rid of this warning:
+
+Users: read about migrating at
+http://doc.arvados.org/user/cwl/cwl-style.html#migrate
+and use the option --api=containers
+
+Admins: configure the cluster to disable the 'jobs' API as described at:
+http://doc.arvados.org/install/install-api-server.html#disable_api_methods
+*******************************""")
+
+ self.loadingContext = ArvLoadingContext(vars(arvargs))
+ self.loadingContext.fetcher_constructor = self.fetcher_constructor
+ self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
+ self.loadingContext.construct_tool_object = self.arv_make_tool
+
+ # Add a custom logging handler to the root logger for runtime status reporting
+ # if running inside a container
+ if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
+ root_logger = logging.getLogger('')
+ handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
+ root_logger.addHandler(handler)
+
+ self.runtimeContext = ArvRuntimeContext(vars(arvargs))
+ self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
+ collection_cache=self.collection_cache)
+
+ validate_cluster_target(self, self.runtimeContext)
+
+
+ def arv_make_tool(self, toolpath_object, loadingContext):
+ if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
+ return ArvadosCommandTool(self, toolpath_object, loadingContext)
+ elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
+ return ArvadosWorkflow(self, toolpath_object, loadingContext)
+ else:
+ return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
+
+ def output_callback(self, out, processStatus):
+ with self.workflow_eval_lock:
+ if processStatus == "success":
+ logger.info("Overall process status is %s", processStatus)
+ state = "Complete"
+ else:
+ logger.error("Overall process status is %s", processStatus)
+ state = "Failed"
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": state}).execute(num_retries=self.num_retries)
+ self.final_status = processStatus
+ self.final_output = out
+ self.workflow_eval_lock.notifyAll()
+
+
+ def start_run(self, runnable, runtimeContext):
+ self.task_queue.add(partial(runnable.run, runtimeContext))
+
+ def process_submitted(self, container):
+ with self.workflow_eval_lock:
+ self.processes[container.uuid] = container
+
+ def process_done(self, uuid, record):
+ with self.workflow_eval_lock:
+ j = self.processes[uuid]
+ logger.info("%s %s is %s", self.label(j), uuid, record["state"])
+ self.task_queue.add(partial(j.done, record))
+ del self.processes[uuid]
+
+ def runtime_status_update(self, kind, message, detail=None):
+ """
+ Updates the runtime_status field on the runner container.
+ Called when there's a need to report errors, warnings or just
+ activity statuses, for example in the RuntimeStatusLoggingHandler.
+ """
+ with self.workflow_eval_lock:
+ current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
+ if current is None:
+ return
+ runtime_status = current.get('runtime_status', {})
+ # In case of status being an error, only report the first one.
+ if kind == 'error':
+ if not runtime_status.get('error'):
+ runtime_status.update({
+ 'error': message
+ })
+ if detail is not None:
+ runtime_status.update({
+ 'errorDetail': detail
+ })
+ # Further errors are only mentioned as a count.
+ else:
+ # Get anything before an optional 'and N more' string.
+ try:
+ error_msg = re.match(
+ r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
+ more_failures = re.match(
+ r'.*\(and (\d+) more\)', runtime_status.get('error'))
+ except TypeError:
+ # Ignore tests stubbing errors
+ return
+ if more_failures:
+ failure_qty = int(more_failures.groups()[0])
+ runtime_status.update({
+ 'error': "%s (and %d more)" % (error_msg, failure_qty+1)
+ })
+ else:
+ runtime_status.update({
+ 'error': "%s (and 1 more)" % error_msg
+ })
+ elif kind in ['warning', 'activity']:
+ # Record the last warning/activity status without regard of
+ # previous occurences.
+ runtime_status.update({
+ kind: message
+ })
+ if detail is not None:
+ runtime_status.update({
+ kind+"Detail": detail
+ })
+ else:
+ # Ignore any other status kind
+ return
+ try:
+ self.api.containers().update(uuid=current['uuid'],
+ body={
+ 'runtime_status': runtime_status,
+ }).execute(num_retries=self.num_retries)
+ except Exception as e:
+ logger.info("Couldn't update runtime_status: %s", e)
+
+ def wrapped_callback(self, cb, obj, st):
+ with self.workflow_eval_lock:
+ cb(obj, st)
+ self.workflow_eval_lock.notifyAll()
+
+ def get_wrapped_callback(self, cb):
+ return partial(self.wrapped_callback, cb)
+
+ def on_message(self, event):
+ if event.get("object_uuid") in self.processes and event["event_type"] == "update":
+ uuid = event["object_uuid"]
+ if event["properties"]["new_attributes"]["state"] == "Running":
+ with self.workflow_eval_lock:
+ j = self.processes[uuid]
+ if j.running is False:
+ j.running = True
+ j.update_pipeline_component(event["properties"]["new_attributes"])
+ logger.info("%s %s is Running", self.label(j), uuid)
+ elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
+ self.process_done(uuid, event["properties"]["new_attributes"])
+
+ def label(self, obj):
+ return "[%s %s]" % (self.work_api[0:-1], obj.name)
+
+ def poll_states(self):
+ """Poll status of jobs or containers listed in the processes dict.
+
+ Runs in a separate thread.
+ """
+
+ try:
+ remain_wait = self.poll_interval
+ while True:
+ if remain_wait > 0:
+ self.stop_polling.wait(remain_wait)
+ if self.stop_polling.is_set():
+ break
+ with self.workflow_eval_lock:
+ keys = list(self.processes.keys())
+ if not keys:
+ remain_wait = self.poll_interval
+ continue
+
+ begin_poll = time.time()
+ if self.work_api == "containers":
+ table = self.poll_api.container_requests()
+ elif self.work_api == "jobs":
+ table = self.poll_api.jobs()
+
+ pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
+
+ while keys:
+ page = keys[:pageSize]
+ keys = keys[pageSize:]
+ try:
+ proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
+ except Exception as e:
+ logger.warn("Error checking states on API server: %s", e)
+ remain_wait = self.poll_interval
+ continue
+
+ for p in proc_states["items"]:
+ self.on_message({
+ "object_uuid": p["uuid"],
+ "event_type": "update",
+ "properties": {
+ "new_attributes": p
+ }
+ })
+ finish_poll = time.time()
+ remain_wait = self.poll_interval - (finish_poll - begin_poll)
+ except:
+ logger.exception("Fatal error in state polling thread.")
+ with self.workflow_eval_lock:
+ self.processes.clear()
+ self.workflow_eval_lock.notifyAll()
+ finally:
+ self.stop_polling.set()
+
+ def add_intermediate_output(self, uuid):
+ if uuid:
+ self.intermediate_output_collections.append(uuid)
+
+ def trash_intermediate_output(self):
+ logger.info("Cleaning up intermediate output collections")
+ for i in self.intermediate_output_collections:
+ try:
+ self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
+ except:
+ logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+ if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+ break
+
+ def check_features(self, obj):
+ if isinstance(obj, dict):
+ if obj.get("writable") and self.work_api != "containers":
+ raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
+ if obj.get("class") == "DockerRequirement":
+ if obj.get("dockerOutputDirectory"):
+ if self.work_api != "containers":
+ raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+ "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
+ if not obj.get("dockerOutputDirectory").startswith('/'):
+ raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
+ "Option 'dockerOutputDirectory' must be an absolute path.")
+ if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
+ raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
+ for v in obj.itervalues():
+ self.check_features(v)
+ elif isinstance(obj, list):
+ for i,v in enumerate(obj):
+ with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
+ self.check_features(v)
+
+ def make_output_collection(self, name, storage_classes, tagsString, outputObj):
+ outputObj = copy.deepcopy(outputObj)
+
+ files = []
+ def capture(fileobj):
+ files.append(fileobj)
+
+ adjustDirObjs(outputObj, capture)
+ adjustFileObjs(outputObj, capture)
+
+ generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
+
+ final = arvados.collection.Collection(api_client=self.api,
+ keep_client=self.keep_client,
+ num_retries=self.num_retries)
+
+ for k,v in generatemapper.items():
+ if k.startswith("_:"):
+ if v.type == "Directory":
+ continue
+ if v.type == "CreateFile":
+ with final.open(v.target, "wb") as f:
+ f.write(v.resolved.encode("utf-8"))
+ continue
+
+ if not k.startswith("keep:"):
+ raise Exception("Output source is not in keep or a literal")
+ sp = k.split("/")
+ srccollection = sp[0][5:]
+ try:
+ reader = self.collection_cache.get(srccollection)
+ srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
+ final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
+ except arvados.errors.ArgumentError as e:
+ logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+ raise
+ except IOError as e:
+ logger.warn("While preparing output collection: %s", e)
+
+ def rewrite(fileobj):
+ fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
+ for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
+ if k in fileobj:
+ del fileobj[k]
+
+ adjustDirObjs(outputObj, rewrite)
+ adjustFileObjs(outputObj, rewrite)
+
+ with final.open("cwl.output.json", "w") as f:
+ json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
+
+ final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
+
+ logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
+ final.api_response()["name"],
+ final.manifest_locator())
+
+ final_uuid = final.manifest_locator()
+ tags = tagsString.split(',')
+ for tag in tags:
+ self.api.links().create(body={
+ "head_uuid": final_uuid, "link_class": "tag", "name": tag
+ }).execute(num_retries=self.num_retries)
+
+ def finalcollection(fileobj):
+ fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
+
+ adjustDirObjs(outputObj, finalcollection)
+ adjustFileObjs(outputObj, finalcollection)
+
+ return (outputObj, final)
+
+ def set_crunch_output(self):
+ if self.work_api == "containers":
+ current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
+ if current is None:
+ return
+ try:
+ self.api.containers().update(uuid=current['uuid'],
+ body={
+ 'output': self.final_output_collection.portable_data_hash(),
+ }).execute(num_retries=self.num_retries)
+ self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
+ body={
+ 'is_trashed': True
+ }).execute(num_retries=self.num_retries)
+ except Exception as e:
+ logger.info("Setting container output: %s", e)
+ elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
+ self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
+ body={
+ 'output': self.final_output_collection.portable_data_hash(),
+ 'success': self.final_status == "success",
+ 'progress':1.0
+ }).execute(num_retries=self.num_retries)
+
+ def arv_executor(self, tool, job_order, runtimeContext, logger=None):
+ self.debug = runtimeContext.debug
+
+ tool.visit(self.check_features)
+
+ self.project_uuid = runtimeContext.project_uuid
+ self.pipeline = None
+ self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
+ self.secret_store = runtimeContext.secret_store
+
+ self.trash_intermediate = runtimeContext.trash_intermediate
+ if self.trash_intermediate and self.work_api != "containers":
+ raise Exception("--trash-intermediate is only supported with --api=containers.")
+
+ self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
+ if self.intermediate_output_ttl and self.work_api != "containers":
+ raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
+ if self.intermediate_output_ttl < 0:
+ raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
+
+ if runtimeContext.submit_request_uuid and self.work_api != "containers":
+ raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
+
+ if not runtimeContext.name:
+ runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+
+ # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
+ # Also uploads docker images.
+ merged_map = upload_workflow_deps(self, tool)
+
+ # Reload tool object which may have been updated by
+ # upload_workflow_deps
+ # Don't validate this time because it will just print redundant errors.
+ loadingContext = self.loadingContext.copy()
+ loadingContext.loader = tool.doc_loader
+ loadingContext.avsc_names = tool.doc_schema
+ loadingContext.metadata = tool.metadata
+ loadingContext.do_validate = False
+
+ tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
+ loadingContext)
+
+ # Upload local file references in the job order.
+ job_order = upload_job_order(self, "%s input" % runtimeContext.name,
+ tool, job_order)
+
+ existing_uuid = runtimeContext.update_workflow
+ if existing_uuid or runtimeContext.create_workflow:
+ # Create a pipeline template or workflow record and exit.
+ if self.work_api == "jobs":
+ tmpl = RunnerTemplate(self, tool, job_order,
+ runtimeContext.enable_reuse,
+ uuid=existing_uuid,
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
+ merged_map=merged_map)
+ tmpl.save()
+ # cwltool.main will write our return value to stdout.
+ return (tmpl.uuid, "success")
+ elif self.work_api == "containers":
+ return (upload_workflow(self, tool, job_order,
+ self.project_uuid,
+ uuid=existing_uuid,
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
+ merged_map=merged_map),
+ "success")
+
+ self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
+ self.eval_timeout = runtimeContext.eval_timeout
+
+ runtimeContext = runtimeContext.copy()
+ runtimeContext.use_container = True
+ runtimeContext.tmpdir_prefix = "tmp"
+ runtimeContext.work_api = self.work_api
+
+ if self.work_api == "containers":
+ if self.ignore_docker_for_reuse:
+ raise Exception("--ignore-docker-for-reuse not supported with containers API.")
+ runtimeContext.outdir = "/var/spool/cwl"
+ runtimeContext.docker_outdir = "/var/spool/cwl"
+ runtimeContext.tmpdir = "/tmp"
+ runtimeContext.docker_tmpdir = "/tmp"
+ elif self.work_api == "jobs":
+ if runtimeContext.priority != DEFAULT_PRIORITY:
+ raise Exception("--priority not implemented for jobs API.")
+ runtimeContext.outdir = "$(task.outdir)"
+ runtimeContext.docker_outdir = "$(task.outdir)"
+ runtimeContext.tmpdir = "$(task.tmpdir)"
+
+ if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
+ raise Exception("--priority must be in the range 1..1000.")
+
+ runnerjob = None
+ if runtimeContext.submit:
+ # Submit a runner job to run the workflow for us.
+ if self.work_api == "containers":
+ if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
+ runtimeContext.runnerjob = tool.tool["id"]
+ runnerjob = tool.job(job_order,
+ self.output_callback,
+ runtimeContext).next()
+ else:
+ runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
+ self.output_name,
+ self.output_tags,
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
+ on_error=runtimeContext.on_error,
+ submit_runner_image=runtimeContext.submit_runner_image,
+ intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
+ merged_map=merged_map,
+ priority=runtimeContext.priority,
+ secret_store=self.secret_store)
+ elif self.work_api == "jobs":
+ runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
+ self.output_name,
+ self.output_tags,
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
+ on_error=runtimeContext.on_error,
+ submit_runner_image=runtimeContext.submit_runner_image,
+ merged_map=merged_map)
+ elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
+ # Create pipeline for local run
+ self.pipeline = self.api.pipeline_instances().create(
+ body={
+ "owner_uuid": self.project_uuid,
+ "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
+ "components": {},
+ "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+ logger.info("Pipeline instance %s", self.pipeline["uuid"])
+
+ if runnerjob and not runtimeContext.wait:
+ submitargs = runtimeContext.copy()
+ submitargs.submit = False
+ runnerjob.run(submitargs)
+ return (runnerjob.uuid, "success")
+
+ current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
+ if current_container:
+ logger.info("Running inside container %s", current_container.get("uuid"))
+
+ self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
+ self.polling_thread = threading.Thread(target=self.poll_states)
+ self.polling_thread.start()
+
+ self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
+
+ try:
+ self.workflow_eval_lock.acquire()
+ if runnerjob:
+ jobiter = iter((runnerjob,))
+ else:
+ if runtimeContext.cwl_runner_job is not None:
+ self.uuid = runtimeContext.cwl_runner_job.get('uuid')
+ jobiter = tool.job(job_order,
+ self.output_callback,
+ runtimeContext)
+
+ # Holds the lock while this code runs and releases it when
+ # it is safe to do so in self.workflow_eval_lock.wait(),
+ # at which point on_message can update job state and
+ # process output callbacks.
+
+ loopperf = Perf(metrics, "jobiter")
+ loopperf.__enter__()
+ for runnable in jobiter:
+ loopperf.__exit__()
+
+ if self.stop_polling.is_set():
+ break
+
+ if self.task_queue.error is not None:
+ raise self.task_queue.error
+
+ if runnable:
+ with Perf(metrics, "run"):
+ self.start_run(runnable, runtimeContext)
+ else:
+ if (self.task_queue.in_flight + len(self.processes)) > 0:
+ self.workflow_eval_lock.wait(3)
+ else:
+ logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
+ break
+ loopperf.__enter__()
+ loopperf.__exit__()
+
+ while (self.task_queue.in_flight + len(self.processes)) > 0:
+ if self.task_queue.error is not None:
+ raise self.task_queue.error
+ self.workflow_eval_lock.wait(3)
+
+ except UnsupportedRequirement:
+ raise
+ except:
+ if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+ logger.error("Interrupted, workflow will be cancelled")
+ else:
+ logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ if runnerjob and runnerjob.uuid and self.work_api == "containers":
+ self.api.container_requests().update(uuid=runnerjob.uuid,
+ body={"priority": "0"}).execute(num_retries=self.num_retries)
+ finally:
+ self.workflow_eval_lock.release()
+ self.task_queue.drain()
+ self.stop_polling.set()
+ self.polling_thread.join()
+ self.task_queue.join()
+
+ if self.final_status == "UnsupportedRequirement":
+ raise UnsupportedRequirement("Check log for details.")
+
+ if self.final_output is None:
+ raise WorkflowException("Workflow did not return a result.")
+
+ if runtimeContext.submit and isinstance(runnerjob, Runner):
+ logger.info("Final output collection %s", runnerjob.final_output)
+ else:
+ if self.output_name is None:
+ self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
+ if self.output_tags is None:
+ self.output_tags = ""
+
+ storage_classes = runtimeContext.storage_classes.strip().split(",")
+ self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
+ self.set_crunch_output()
+
+ if runtimeContext.compute_checksum:
+ adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
+ adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
+
+ if self.trash_intermediate and self.final_status == "success":
+ self.trash_intermediate_output()
+
+ return (self.final_output, self.final_status)
return super(CollectionFetcher, self).urljoin(base_url, url)
+ schemes = [u"file", u"http", u"https", u"mailto", u"keep"]
+
+    def supported_schemes(self): # type: () -> List[Text]
+        """Return the URL schemes this fetcher accepts (the class-level ``schemes`` list)."""
+        return self.schemes
+
+
workflow_uuid_pattern = re.compile(r'[a-z0-9]{5}-7fd4e-[a-z0-9]{15}')
pipeline_template_uuid_pattern = re.compile(r'[a-z0-9]{5}-p5p6p-[a-z0-9]{15}')
import os
import urllib
-from arvados_cwl.util import get_current_container, get_intermediate_collection_info
+import arvados_cwl.util
import arvados.commands.run
import arvados.collection
for l in srcobj.get("listing", []):
self.addentry(l, c, ".", remap)
- container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
- info = get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
+ container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+ info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
c.save_new(name=info["name"],
owner_uuid=self.arvrunner.project_uuid,
num_retries=self.arvrunner.num_retries )
self.addentry(srcobj, c, ".", remap)
- container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
- info = get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
+ container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+ info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
c.save_new(name=info["name"],
owner_uuid=self.arvrunner.project_uuid,
import arvados.collection
import ruamel.yaml as yaml
-from .arvdocker import arv_docker_get_image
+import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done
# TODO: can be supported by containers API, but not jobs API.
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+ arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
else:
- arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
+ arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
upload_docker(arvrunner, s.embedded_tool)
v["location"] = merged_map[cur_id].resolved[v["location"]]
if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+ if v.get("class") == "DockerRequirement":
+ v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
for l in v:
visit(v[l], cur_id)
if isinstance(v, list):
"""Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
try:
- arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
+ return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
except Exception as e:
raise Exception("Docker image %s is not available\n%s" % (img, e) )
- return img
+
def upload_workflow_collection(arvrunner, name, packed):
collection = arvados.collection.Collection(api_client=arvrunner.api,
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20180806194258',
- 'schema-salad==2.7.20180719125426',
+ 'cwltool==1.0.20181116032456',
+ 'schema-salad==2.7.20181116024232',
'typing >= 3.6.4',
- # Need to limit ruamel.yaml version to 0.15.26 because of bug
- # https://bitbucket.org/ruamel/yaml/issues/227/regression-parsing-flow-mapping
- 'ruamel.yaml >=0.13.11, <= 0.15.26',
+ 'ruamel.yaml >=0.15.54, <=0.15.77',
'arvados-python-client>=1.1.4.20180607143841',
'setuptools',
'ciso8601 >=1.0.6, <2.0.0',
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+Arvados federated workflow testing
+
+Requires cwltool 1.0.20181109150732 or later
+
+Create main-test.json:
+
+{
+ "acr": "/path/to/arvados-cwl-runner",
+ "arvado_api_host_insecure": false,
+ "arvados_api_hosts": [
+ "c97qk.arvadosapi.com",
+ "4xphq.arvadosapi.com",
+ "9tee4.arvadosapi.com"
+ ],
+ "arvados_api_token": "...",
+ "arvados_cluster_ids": [
+ "c97qk",
+ "4xphq",
+ "9tee4"
+ ]
+}
+
+Alternatively, create a federation of arvbox test clusters; this command writes main-test.json for you:
+
+$ cwltool --enable-ext arvbox-make-federation.cwl --arvbox_base ~/.arvbox/ --in_acr /path/to/arvados-cwl-runner > main-test.json
+
+
+Run tests:
+
+$ cwltool main.cwl main-test.json
+
+
+List test cases:
+
+$ cwltool --print-targets main.cwl
+
+
+Run a specific test case:
+
+$ cwltool -t twostep-remote-copy-to-home main.cwl main-test.json
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+ cwltool: "http://commonwl.org/cwltool#"
+requirements:
+ ScatterFeatureRequirement: {}
+ StepInputExpressionRequirement: {}
+ cwltool:LoadListingRequirement:
+ loadListing: no_listing
+ InlineJavascriptRequirement: {}
+inputs:
+ containers:
+ type: string[]
+ default: [fedbox1, fedbox2, fedbox3]
+ arvbox_base: Directory
+ in_acr: string?
+ insecure:
+ type: boolean
+ default: true
+outputs:
+ arvados_api_token:
+ type: string
+ outputSource: setup-user/test_user_token
+ arvados_api_hosts:
+ type: string[]
+ outputSource: start/container_host
+ arvados_cluster_ids:
+ type: string[]
+ outputSource: start/cluster_id
+ acr:
+ type: string?
+ outputSource: in_acr
+ arvado_api_host_insecure:
+ type: boolean
+ outputSource: insecure
+steps:
+ mkdir:
+ in:
+ containers: containers
+ arvbox_base: arvbox_base
+ out: [arvbox_data]
+ run: arvbox/mkdir.cwl
+ start:
+ in:
+ container_name: containers
+ arvbox_data: mkdir/arvbox_data
+ out: [cluster_id, container_host, arvbox_data_out, superuser_token]
+ scatter: [container_name, arvbox_data]
+ scatterMethod: dotproduct
+ run: arvbox/start.cwl
+ fed-config:
+ in:
+ container_name: containers
+ this_cluster_id: start/cluster_id
+ cluster_ids: start/cluster_id
+ cluster_hosts: start/container_host
+ arvbox_data: start/arvbox_data_out
+ out: []
+ scatter: [container_name, this_cluster_id, arvbox_data]
+ scatterMethod: dotproduct
+ run: arvbox/fed-config.cwl
+ setup-user:
+ in:
+ container_host: {source: start/container_host, valueFrom: "$(self[0])"}
+ superuser_token: {source: start/superuser_token, valueFrom: "$(self[0])"}
+ out: [test_user_uuid, test_user_token]
+ run: arvbox/setup-user.cwl
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+ cwltool: "http://commonwl.org/cwltool#"
+inputs:
+ container_name: string
+ this_cluster_id: string
+ cluster_ids: string[]
+ cluster_hosts: string[]
+ arvbox_data: Directory
+outputs:
+ arvbox_data_out:
+ type: Directory
+ outputBinding:
+ outputEval: $(inputs.arvbox_data)
+requirements:
+ EnvVarRequirement:
+ envDef:
+ ARVBOX_CONTAINER: $(inputs.container_name)
+ ARVBOX_DATA: $(inputs.arvbox_data.path)
+ InitialWorkDirRequirement:
+ listing:
+ - entryname: cluster_config.yml.override
+ entry: >-
+ ${
+ var remoteClusters = {};
+ for (var i = 0; i < inputs.cluster_ids.length; i++) {
+ remoteClusters[inputs.cluster_ids[i]] = {
+ "Host": inputs.cluster_hosts[i],
+ "Proxy": true,
+ "Insecure": true
+ };
+ }
+ var r = {"Clusters": {}};
+ r["Clusters"][inputs.this_cluster_id] = {"RemoteClusters": remoteClusters};
+ return JSON.stringify(r);
+ }
+ - entryname: application.yml.override
+ entry: >-
+ ${
+ var remoteClusters = {};
+ for (var i = 0; i < inputs.cluster_ids.length; i++) {
+ remoteClusters[inputs.cluster_ids[i]] = inputs.cluster_hosts[i];
+ }
+ return JSON.stringify({"development": {"remote_hosts": remoteClusters}});
+ }
+ cwltool:LoadListingRequirement:
+ loadListing: no_listing
+ ShellCommandRequirement: {}
+ InlineJavascriptRequirement: {}
+ cwltool:InplaceUpdateRequirement:
+ inplaceUpdate: true
+arguments:
+ - shellQuote: false
+ valueFrom: |
+ docker cp cluster_config.yml.override $(inputs.container_name):/var/lib/arvados
+ docker cp application.yml.override $(inputs.container_name):/usr/src/arvados/services/api/config
+ arvbox sv restart api
+ arvbox sv restart controller
+ arvbox sv restart keepstore0
+ arvbox sv restart keepstore1
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+ cwltool: "http://commonwl.org/cwltool#"
+inputs:
+ containers:
+ type:
+ type: array
+ items: string
+ inputBinding:
+ position: 3
+ valueFrom: |
+ ${
+ return "base/"+self;
+ }
+ arvbox_base: Directory
+outputs:
+ arvbox_data:
+ type: Directory[]
+ outputBinding:
+ glob: |
+ ${
+ var r = [];
+ for (var i = 0; i < inputs.containers.length; i++) {
+ r.push("base/"+inputs.containers[i]);
+ }
+ return r;
+ }
+requirements:
+ InitialWorkDirRequirement:
+ listing:
+ - entry: $(inputs.arvbox_base)
+ entryname: base
+ writable: true
+ cwltool:LoadListingRequirement:
+ loadListing: no_listing
+ InlineJavascriptRequirement: {}
+ cwltool:InplaceUpdateRequirement:
+ inplaceUpdate: true
+arguments:
+ - mkdir
+ - "-p"
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+ cwltool: "http://commonwl.org/cwltool#"
+requirements:
+ EnvVarRequirement:
+ envDef:
+ ARVADOS_API_HOST: $(inputs.container_host)
+ ARVADOS_API_TOKEN: $(inputs.superuser_token)
+ ARVADOS_API_HOST_INSECURE: "true"
+ cwltool:LoadListingRequirement:
+ loadListing: no_listing
+ InlineJavascriptRequirement: {}
+ cwltool:InplaceUpdateRequirement:
+ inplaceUpdate: true
+ DockerRequirement:
+ dockerPull: arvados/jobs
+inputs:
+ container_host: string
+ superuser_token: string
+ make_user_script:
+ type: File
+ default:
+ class: File
+ location: setup_user.py
+outputs:
+ test_user_uuid: string
+ test_user_token: string
+arguments: [python2, $(inputs.make_user_script)]
\ No newline at end of file
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import arvados
+import arvados.errors
+import time
+import json
+
+while True:
+ try:
+ api = arvados.api()
+ break
+ except arvados.errors.ApiError:
+ time.sleep(2)
+
+existing = api.users().list(filters=[["email", "=", "test@example.com"],
+ ["is_active", "=", True]], limit=1).execute()
+if existing["items"]:
+ u = existing["items"][0]
+else:
+ u = api.users().create(body={
+ 'first_name': 'Test',
+ 'last_name': 'User',
+ 'email': 'test@example.com',
+ 'is_admin': False
+ }).execute()
+ api.users().activate(uuid=u["uuid"]).execute()
+
+tok = api.api_client_authorizations().create(body={
+ "api_client_authorization": {
+ "owner_uuid": u["uuid"]
+ }
+}).execute()
+
+with open("cwl.output.json", "w") as f:
+ json.dump({
+ "test_user_uuid": u["uuid"],
+ "test_user_token": "v2/%s/%s" % (tok["uuid"], tok["api_token"])
+ }, f)
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+ cwltool: "http://commonwl.org/cwltool#"
+inputs:
+ container_name: string
+ arvbox_data: Directory
+outputs:
+ cluster_id:
+ type: string
+ outputBinding:
+ glob: status.txt
+ loadContents: true
+ outputEval: |
+ ${
+ var sp = self[0].contents.split("\n");
+ for (var i = 0; i < sp.length; i++) {
+ if (sp[i].startsWith("Cluster id: ")) {
+ return sp[i].substr(12);
+ }
+ }
+ }
+ container_host:
+ type: string
+ outputBinding:
+ glob: status.txt
+ loadContents: true
+ outputEval: |
+ ${
+ var sp = self[0].contents.split("\n");
+ for (var i = 0; i < sp.length; i++) {
+ if (sp[i].startsWith("Container IP: ")) {
+ return sp[i].substr(14)+":8000";
+ }
+ }
+ }
+ superuser_token:
+ type: string
+ outputBinding:
+ glob: superuser_token.txt
+ loadContents: true
+ outputEval: $(self[0].contents.trim())
+ arvbox_data_out:
+ type: Directory
+ outputBinding:
+ outputEval: $(inputs.arvbox_data)
+requirements:
+ EnvVarRequirement:
+ envDef:
+ ARVBOX_CONTAINER: $(inputs.container_name)
+ ARVBOX_DATA: $(inputs.arvbox_data.path)
+ ShellCommandRequirement: {}
+ InitialWorkDirRequirement:
+ listing:
+ - entry: $(inputs.arvbox_data)
+ entryname: $(inputs.container_name)
+ writable: true
+ cwltool:InplaceUpdateRequirement:
+ inplaceUpdate: true
+ InlineJavascriptRequirement: {}
+arguments:
+ - shellQuote: false
+ valueFrom: |
+ set -e
+ arvbox start dev
+ arvbox status > status.txt
+ arvbox cat /var/lib/arvados/superuser_token > superuser_token.txt
\ No newline at end of file
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+ cwltool: "http://commonwl.org/cwltool#"
+inputs:
+ container_name: string
+outputs: []
+requirements:
+ EnvVarRequirement:
+ envDef:
+ ARVBOX_CONTAINER: $(inputs.container_name)
+arguments: [arvbox, stop]
\ No newline at end of file
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ InlineJavascriptRequirement: {}
+ DockerRequirement:
+ dockerPull: arvados/fed-test:base-case
+inputs:
+ inp:
+ type: File
+ inputBinding: {}
+ runOnCluster: string
+outputs:
+ hash:
+ type: File
+ outputSource: md5sum/hash
+steps:
+ md5sum:
+ in:
+ inp: inp
+ runOnCluster: runOnCluster
+ out: [hash]
+ hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+ run: md5sum.cwl
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+inputs:
+ inp:
+ type: File[]
+ inputBinding: {}
+outputs:
+ joined: stdout
+stdout: joined.txt
+baseCommand: cat
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ InlineJavascriptRequirement: {}
+ DockerRequirement:
+ dockerPull: arvados/fed-test:hint-on-tool
+inputs:
+ inp:
+ type: File
+ inputBinding: {}
+ runOnCluster: string
+outputs:
+ hash:
+ type: File
+ outputSource: md5sum/hash
+steps:
+ md5sum:
+ in:
+ inp: inp
+ runOnCluster: runOnCluster
+ out: [hash]
+ run: md5sum-tool-hint.cwl
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ InlineJavascriptRequirement: {}
+ DockerRequirement:
+ dockerPull: arvados/fed-test:hint-on-wf
+hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+inputs:
+ inp:
+ type: File
+ inputBinding: {}
+ runOnCluster: string
+outputs:
+ hash:
+ type: File
+ outputSource: md5sum/hash
+steps:
+ md5sum:
+ in:
+ inp: inp
+ out: [hash]
+ run: md5sum.cwl
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ InlineJavascriptRequirement: {}
+hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+inputs:
+ inp: File
+ runOnCluster: string
+outputs:
+ hash:
+ type: File
+ outputBinding:
+ glob: out.txt
+stdin: $(inputs.inp.path)
+stdout: out.txt
+arguments: ["md5sum", "-"]
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ InlineJavascriptRequirement: {}
+inputs:
+ inp:
+ type: File
+outputs:
+ hash:
+ type: File
+ outputBinding:
+ glob: out.txt
+stdin: $(inputs.inp.path)
+stdout: out.txt
+arguments: ["md5sum", "-"]
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ InlineJavascriptRequirement: {}
+ DockerRequirement:
+ dockerPull: arvados/fed-test:remote-case
+inputs:
+ inp:
+ type: File
+ inputBinding: {}
+ runOnCluster: string
+outputs:
+ hash:
+ type: File
+ outputSource: md5sum/hash
+steps:
+ md5sum:
+ in:
+ inp: inp
+ runOnCluster: runOnCluster
+ out: [hash]
+ hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+ run: md5sum.cwl
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ InlineJavascriptRequirement: {}
+ ShellCommandRequirement: {}
+inputs:
+ inp:
+ type: File
+outputs:
+ original:
+ type: File
+ outputBinding:
+ glob: $(inputs.inp.basename)
+ revhash:
+ type: stdout
+stdout: rev-$(inputs.inp.basename)
+arguments:
+ - shellQuote: false
+ valueFrom: |
+ ln -s $(inputs.inp.path) $(inputs.inp.basename) &&
+ rev $(inputs.inp.basename)
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ InlineJavascriptRequirement: {}
+inputs:
+ inp:
+ type: File
+outputs:
+ revhash:
+ type: File
+ outputBinding:
+ glob: out.txt
+stdout: out.txt
+arguments: [rev, $(inputs.inp)]
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ InlineJavascriptRequirement: {}
+ DockerRequirement:
+ dockerPull: arvados/fed-test:runner-home-step-remote
+inputs:
+ inp: File
+ runOnCluster: string
+outputs:
+ hash:
+ type: File
+ outputSource: md5sum/hash
+steps:
+ md5sum:
+ in:
+ inp: inp
+ runOnCluster: runOnCluster
+ hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+ out: [hash]
+ run: md5sum.cwl
\ No newline at end of file
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ InlineJavascriptRequirement: {}
+ DockerRequirement:
+ dockerPull: arvados/fed-test:runner-remote-step-home
+inputs:
+ inp: File
+ runOnCluster: string
+outputs:
+ hash:
+ type: File
+ outputSource: md5sum/hash
+steps:
+ md5sum:
+ in:
+ inp: inp
+ runOnCluster: runOnCluster
+ out: [hash]
+ hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+ run: md5sum.cwl
\ No newline at end of file
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ InlineJavascriptRequirement: {}
+ DockerRequirement:
+ dockerPull: arvados/fed-test:scatter-gather
+ ScatterFeatureRequirement: {}
+inputs:
+ shards: File[]
+ clusters: string[]
+outputs:
+ joined:
+ type: File
+ outputSource: cat/joined
+steps:
+ md5sum:
+ in:
+ inp: shards
+ runOnCluster: clusters
+ scatter: [inp, runOnCluster]
+ scatterMethod: dotproduct
+ out: [hash]
+ hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+ run: md5sum.cwl
+ cat:
+ in:
+ inp: md5sum/hash
+ out: [joined]
+ run: cat.cwl
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ InlineJavascriptRequirement: {}
+ DockerRequirement:
+ dockerPull: arvados/fed-test:threestep-remote
+ ScatterFeatureRequirement: {}
+inputs:
+ inp: File
+ clusterA: string
+ clusterB: string
+ clusterC: string
+outputs:
+ revhash:
+ type: File
+ outputSource: revC/revhash
+steps:
+ md5sum:
+ in:
+ inp: inp
+ runOnCluster: clusterA
+ out: [hash]
+ hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+ run: md5sum.cwl
+ revB:
+ in:
+ inp: md5sum/hash
+ runOnCluster: clusterB
+ out: [revhash]
+ hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+ run: rev-input-to-output.cwl
+ revC:
+ in:
+ inp: revB/revhash
+ runOnCluster: clusterC
+ out: [revhash]
+ hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+ run: rev-input-to-output.cwl
\ No newline at end of file
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ InlineJavascriptRequirement: {}
+ DockerRequirement:
+ dockerPull: arvados/fed-test:twostep-both-remote
+inputs:
+ inp:
+ type: File
+ inputBinding: {}
+ md5sumCluster: string
+ revCluster: string
+outputs:
+ hash:
+ type: File
+ outputSource: md5sum/hash
+steps:
+ md5sum:
+ in:
+ inp: inp
+ runOnCluster: md5sumCluster
+ out: [hash]
+ hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+ run: md5sum.cwl
+ rev:
+ in:
+ inp: md5sum/hash
+ runOnCluster: revCluster
+ out: [revhash]
+ hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+ run: rev.cwl
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ InlineJavascriptRequirement: {}
+ DockerRequirement:
+ dockerPull: arvados/fed-test:twostep-home-to-remote
+inputs:
+ inp:
+ type: File
+ inputBinding: {}
+ md5sumCluster: string
+ revCluster: string
+outputs:
+ hash:
+ type: File
+ outputSource: md5sum/hash
+steps:
+ md5sum:
+ in:
+ inp: inp
+ runOnCluster: md5sumCluster
+ out: [hash]
+ hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+ run: md5sum.cwl
+ rev:
+ in:
+ inp: md5sum/hash
+ runOnCluster: revCluster
+ out: [revhash]
+ hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+ run: rev.cwl
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ InlineJavascriptRequirement: {}
+ DockerRequirement:
+ dockerPull: arvados/fed-test:twostep-remote-copy-to-home
+inputs:
+ inp:
+ type: File
+ inputBinding: {}
+ md5sumCluster: string
+ revCluster: string
+outputs:
+ hash:
+ type: File
+ outputSource: md5sum/hash
+steps:
+ md5sum:
+ in:
+ inp: inp
+ runOnCluster: md5sumCluster
+ out: [hash]
+ hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+ run: md5sum.cwl
+ rev:
+ in:
+ inp: md5sum/hash
+ runOnCluster: revCluster
+ out: [revhash]
+ hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+ run: rev-input-to-output.cwl
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ InlineJavascriptRequirement: {}
+ DockerRequirement:
+ dockerPull: arvados/fed-test:twostep-remote-to-home
+inputs:
+ inp:
+ type: File
+ inputBinding: {}
+ md5sumCluster: string
+ revCluster: string
+outputs:
+ hash:
+ type: File
+ outputSource: md5sum/hash
+steps:
+ md5sum:
+ in:
+ inp: inp
+ runOnCluster: md5sumCluster
+ out: [hash]
+ hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+ run: md5sum.cwl
+ rev:
+ in:
+ inp: md5sum/hash
+ runOnCluster: revCluster
+ out: [revhash]
+ hints:
+ arv:ClusterTarget:
+ cluster_id: $(inputs.runOnCluster)
+ run: rev.cwl
--- /dev/null
+Call me base-case. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
--- /dev/null
+Call me hint-on-tool. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
--- /dev/null
+Call me hint-on-wf. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
--- /dev/null
+Call me remote-case. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
--- /dev/null
+Call me runner-home-step-remote. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
--- /dev/null
+Call me runner-remote-step-home. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
--- /dev/null
+Call me scatter-gather-s1. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
--- /dev/null
+Call me scatter-gather-s2. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
--- /dev/null
+Call me scatter-gather-s3. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
--- /dev/null
+Call me threestep-remote. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
--- /dev/null
+Call me twostep-both-remote. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
--- /dev/null
+Call me twostep-home-to-remote. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
--- /dev/null
+Call me twostep-remote-copy-to-home. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
--- /dev/null
+Call me twostep-remote-to-home. Some years ago--never mind how long precisely--having
+little or no money in my purse, and nothing particular to interest me on
+shore, I thought I would sail about a little and see the watery part of
+the world. It is a way I have of driving off the spleen and regulating
+the circulation. Whenever I find myself growing grim about the mouth;
+whenever it is a damp, drizzly November in my soul; whenever I find
+myself involuntarily pausing before coffin warehouses, and bringing up
+the rear of every funeral I meet; and especially whenever my hypos get
+such an upper hand of me, that it requires a strong moral principle to
+prevent me from deliberately stepping into the street, and methodically
+knocking people's hats off--then, I account it high time to get to sea
+as soon as I can. This is my substitute for pistol and ball. With a
+philosophical flourish Cato throws himself upon his sword; I quietly
+take to the ship. There is nothing surprising in this. If they but knew
+it, almost all men in their degree, some time or other, cherish very
+nearly the same feelings towards the ocean with me.
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+requirements:
+ InitialWorkDirRequirement:
+ listing:
+ - entryname: config.json
+ entry: |-
+ ${
+ return JSON.stringify({
+ check_collections: inputs.check_collections
+ });
+ }
+ EnvVarRequirement:
+ envDef:
+ ARVADOS_API_HOST: $(inputs.arvados_api_host)
+ ARVADOS_API_TOKEN: $(inputs.arvados_api_token)
+ ARVADOS_API_HOST_INSECURE: $(""+inputs.arvado_api_host_insecure)
+ InlineJavascriptRequirement: {}
+inputs:
+ arvados_api_token: string
+ arvado_api_host_insecure: boolean
+ arvados_api_host: string
+ check_collections: string[]
+ preparescript:
+ type: File
+ default:
+ class: File
+ location: check_exist.py
+ inputBinding:
+ position: 1
+outputs:
+ success:
+ type: boolean
+ outputBinding:
+ glob: success
+ loadContents: true
+ outputEval: $(self[0].contents=="true")
+baseCommand: python2
\ No newline at end of file
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import arvados
+import json
+
+api = arvados.api()
+
+with open("config.json") as f:
+ config = json.load(f)
+
+success = True
+for c in config["check_collections"]:
+ try:
+ api.collections().get(uuid=c).execute()
+ except Exception as e:
+ print("Checking for %s got exception %s" % (c, e))
+ success = False
+
+with open("success", "w") as f:
+ if success:
+ f.write("true")
+ else:
+ f.write("false")
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+inputs:
+ testcase: string
+outputs:
+ imagename:
+ type: string
+ outputBinding:
+ outputEval: $(inputs.testcase)
+requirements:
+ InitialWorkDirRequirement:
+ listing:
+ - entryname: Dockerfile
+ entry: |-
+ FROM debian@sha256:0a5fcee6f52d5170f557ee2447d7a10a5bdcf715dd7f0250be0b678c556a501b
+ LABEL org.arvados.testcase="$(inputs.testcase)"
+arguments: [docker, build, -t, $(inputs.testcase), "."]
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+requirements:
+ InitialWorkDirRequirement:
+ listing:
+ - entryname: input.json
+ entry: $(JSON.stringify(inputs.obj))
+ - entryname: config.json
+ entry: |-
+ ${
+ return JSON.stringify({
+ arvados_cluster_ids: inputs.arvados_cluster_ids,
+ scrub_images: [inputs.scrub_image],
+ scrub_collections: inputs.scrub_collections
+ });
+ }
+ EnvVarRequirement:
+ envDef:
+ ARVADOS_API_HOST: $(inputs.arvados_api_host)
+ ARVADOS_API_TOKEN: $(inputs.arvados_api_token)
+ ARVADOS_API_HOST_INSECURE: $(""+inputs.arvado_api_host_insecure)
+ InlineJavascriptRequirement: {}
+inputs:
+ arvados_api_token: string
+ arvado_api_host_insecure: boolean
+ arvados_api_host: string
+ arvados_cluster_ids: string[]
+ wf: File
+ obj: Any
+ scrub_image: string
+ scrub_collections: string[]
+ preparescript:
+ type: File
+ default:
+ class: File
+ location: prepare.py
+ inputBinding:
+ position: 1
+outputs:
+ done:
+ type: boolean
+ outputBinding:
+ outputEval: $(true)
+baseCommand: python2
\ No newline at end of file
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import arvados
+import json
+
+api = arvados.api()
+
+with open("config.json") as f:
+ config = json.load(f)
+
+scrub_collections = set(config["scrub_collections"])
+
+for cluster_id in config["arvados_cluster_ids"]:
+ images = []
+ for scrub_image in config["scrub_images"]:
+ sp = scrub_image.split(":")
+ image_name = sp[0]
+ image_tag = sp[1] if len(sp) > 1 else "latest"
+ images.append('{}:{}'.format(image_name, image_tag))
+
+ search_links = api.links().list(
+ filters=[['link_class', '=', 'docker_image_repo+tag'],
+ ['name', 'in', images]],
+ cluster_id=cluster_id).execute()
+
+ head_uuids = [lk["head_uuid"] for lk in search_links["items"]]
+ cols = api.collections().list(filters=[["uuid", "in", head_uuids]],
+ cluster_id=cluster_id).execute()
+ for c in cols["items"]:
+ scrub_collections.add(c["portable_data_hash"])
+ for lk in search_links["items"]:
+ api.links().delete(uuid=lk["uuid"]).execute()
+
+for cluster_id in config["arvados_cluster_ids"]:
+ matches = api.collections().list(filters=[["portable_data_hash", "in", list(scrub_collections)]],
+ select=["uuid", "portable_data_hash"], cluster_id=cluster_id).execute()
+ for m in matches["items"]:
+ api.collections().delete(uuid=m["uuid"]).execute()
+ print("Scrubbed %s (%s)" % (m["uuid"], m["portable_data_hash"]))
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: CommandLineTool
+inputs:
+ acr:
+ type: string?
+ default: arvados-cwl-runner
+ inputBinding:
+ position: 1
+ arvados_api_host: string
+ arvados_api_token: string
+ arvado_api_host_insecure:
+ type: boolean
+ default: false
+ runner_cluster:
+ type: string?
+ inputBinding:
+ prefix: --submit-runner-cluster
+ position: 2
+ wf:
+ type: File
+ inputBinding:
+ position: 3
+ obj: Any
+requirements:
+ InitialWorkDirRequirement:
+ listing:
+ - entryname: input.json
+ entry: $(JSON.stringify(inputs.obj))
+ EnvVarRequirement:
+ envDef:
+ ARVADOS_API_HOST: $(inputs.arvados_api_host)
+ ARVADOS_API_TOKEN: $(inputs.arvados_api_token)
+ ARVADOS_API_HOST_INSECURE: $(""+inputs.arvado_api_host_insecure)
+ InlineJavascriptRequirement: {}
+outputs:
+ out:
+ type: Any
+ outputBinding:
+ glob: output.json
+ loadContents: true
+ #outputEval: $(JSON.parse(self[0].contents))
+ outputEval: $(self[0].contents)
+stdout: output.json
+arguments:
+ - valueFrom: --disable-reuse
+ position: 2
+ - valueFrom: --always-submit-runner
+ position: 2
+ - valueFrom: --api=containers
+ position: 2
+ - valueFrom: input.json
+ position: 4
\ No newline at end of file
--- /dev/null
+#!/usr/bin/env cwl-runner
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+ cwltool: "http://commonwl.org/cwltool#"
+hints:
+ cwltool:Secrets:
+ secrets: [arvados_api_token]
+requirements:
+ StepInputExpressionRequirement: {}
+ InlineJavascriptRequirement: {}
+ SubworkflowFeatureRequirement: {}
+inputs:
+ arvados_api_token: string
+ arvado_api_host_insecure:
+ type: boolean
+ default: false
+ arvados_api_hosts: string[]
+ arvados_cluster_ids: string[]
+ acr: string?
+ wf: File
+ obj: Any
+ scrub_image: string
+ scrub_collections: string[]
+ runner_cluster: string?
+outputs:
+ out:
+ type: Any
+ outputSource: run-acr/out
+ success:
+ type: boolean
+ outputSource: check-result/success
+steps:
+ dockerbuild:
+ in:
+ testcase: scrub_image
+ out: [imagename]
+ run: dockerbuild.cwl
+ prepare:
+ in:
+ arvados_api_token: arvados_api_token
+ arvado_api_host_insecure: arvado_api_host_insecure
+ arvados_api_host: {source: arvados_api_hosts, valueFrom: "$(self[0])"}
+ arvados_cluster_ids: arvados_cluster_ids
+ wf: wf
+ obj: obj
+ scrub_image: scrub_image
+ scrub_collections: scrub_collections
+ out: [done]
+ run: prepare.cwl
+ run-acr:
+ in:
+ prepare: prepare/done
+ image-ready: dockerbuild/imagename
+ arvados_api_token: arvados_api_token
+ arvado_api_host_insecure: arvado_api_host_insecure
+ arvados_api_host: {source: arvados_api_hosts, valueFrom: "$(self[0])"}
+ runner_cluster: runner_cluster
+ acr: acr
+ wf: wf
+ obj: obj
+ out: [out]
+ run: run-acr.cwl
+ check-result:
+ in:
+ acr-done: run-acr/out
+ arvados_api_token: arvados_api_token
+ arvado_api_host_insecure: arvado_api_host_insecure
+ arvados_api_host: {source: arvados_api_hosts, valueFrom: "$(self[0])"}
+ check_collections: scrub_collections
+ out: [success]
+ run: check-exist.cwl
\ No newline at end of file
--- /dev/null
+#!/usr/bin/env cwl-runner
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+ cwltool: "http://commonwl.org/cwltool#"
+hints:
+ cwltool:Secrets:
+ secrets: [arvados_api_token]
+requirements:
+ StepInputExpressionRequirement: {}
+ InlineJavascriptRequirement: {}
+ SubworkflowFeatureRequirement: {}
+inputs:
+ arvados_api_token: string
+ arvado_api_host_insecure:
+ type: boolean
+ default: false
+ arvados_api_hosts: string[]
+ arvados_cluster_ids: string[]
+ acr: string?
+ testcases:
+ type: string[]
+ default:
+ - base-case
+ - runner-home-step-remote
+ - runner-remote-step-home
+outputs:
+ base-case-success:
+ type: Any
+ outputSource: base-case/success
+ runner-home-step-remote-success:
+ type: Any
+ outputSource: runner-home-step-remote/success
+ runner-remote-step-home-success:
+ type: Any
+ outputSource: runner-remote-step-home/success
+ remote-case-success:
+ type: Any
+ outputSource: remote-case/success
+ twostep-home-to-remote-success:
+ type: Any
+ outputSource: twostep-home-to-remote/success
+ twostep-remote-to-home-success:
+ type: Any
+ outputSource: twostep-remote-to-home/success
+ twostep-both-remote-success:
+ type: Any
+ outputSource: twostep-both-remote/success
+ twostep-remote-copy-to-home-success:
+ type: Any
+ outputSource: twostep-remote-copy-to-home/success
+ scatter-gather-success:
+ type: Any
+ outputSource: scatter-gather/success
+ threestep-remote-success:
+ type: Any
+ outputSource: threestep-remote/success
+ hint-on-wf-success:
+ type: Any
+ outputSource: hint-on-wf/success
+ hint-on-tool-success:
+ type: Any
+ outputSource: hint-on-tool/success
+
+steps:
+ base-case:
+ doc: |
+ Base case (no federation), single step workflow with both the
+ runner and step on the same cluster.
+ in:
+ arvados_api_token: arvados_api_token
+ arvado_api_host_insecure: arvado_api_host_insecure
+ arvados_api_hosts: arvados_api_hosts
+ arvados_cluster_ids: arvados_cluster_ids
+ acr: acr
+ wf:
+ default:
+ class: File
+ location: cases/base-case.cwl
+ secondaryFiles:
+ - class: File
+ location: cases/md5sum.cwl
+ obj:
+ default:
+ inp:
+ class: File
+ location: data/base-case-input.txt
+ valueFrom: |-
+ ${
+ self["runOnCluster"] = inputs.arvados_cluster_ids[0];
+ return self;
+ }
+ scrub_image: {default: "arvados/fed-test:base-case"}
+ scrub_collections:
+ default:
+ - 031a4ced0aa99de90fb630568afc6e9b+67 # input collection
+ - eb93a6718eb1a1a8ee9f66ee7d683472+51 # md5sum output collection
+ - f654d4048612135f4a5e7707ec0fcf3e+112 # final output json
+ out: [out, success]
+ run: framework/testcase.cwl
+
+ runner-home-step-remote:
+ doc: |
+ Single step workflow with the runner on the home cluster and the
+ step on the remote cluster. ClusterTarget hint is on the workflow step.
+ in:
+ arvados_api_token: arvados_api_token
+ arvado_api_host_insecure: arvado_api_host_insecure
+ arvados_api_hosts: arvados_api_hosts
+ arvados_cluster_ids: arvados_cluster_ids
+ acr: acr
+ wf:
+ default:
+ class: File
+ location: cases/runner-home-step-remote.cwl
+ secondaryFiles:
+ - class: File
+ location: cases/md5sum.cwl
+ obj:
+ default:
+ inp:
+ class: File
+ location: data/runner-home-step-remote-input.txt
+ valueFrom: |-
+ ${
+ self["runOnCluster"] = inputs.arvados_cluster_ids[1];
+ return self;
+ }
+ runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+ scrub_image: {default: "arvados/fed-test:runner-home-step-remote"}
+ scrub_collections:
+ default:
+ - 3bc373e38751fe13dcbd62778d583242+81 # input collection
+ - 428e6d91e41a3af3ae287b453949e7fd+51 # md5sum output collection
+ - a4b0ddd866525655e8480f83a1ca83c6+112 # runner output json
+ out: [out, success]
+ run: framework/testcase.cwl
+
+ runner-remote-step-home:
+ doc: |
+ Single step workflow with the runner on the remote cluster and the
+ step on the home cluster.
+ in:
+ arvados_api_token: arvados_api_token
+ arvado_api_host_insecure: arvado_api_host_insecure
+ arvados_api_hosts: arvados_api_hosts
+ arvados_cluster_ids: arvados_cluster_ids
+ acr: acr
+ wf:
+ default:
+ class: File
+ location: cases/runner-remote-step-home.cwl
+ secondaryFiles:
+ - class: File
+ location: cases/md5sum.cwl
+ obj:
+ default:
+ inp:
+ class: File
+ location: data/runner-remote-step-home-input.txt
+ valueFrom: |-
+ ${
+ self["runOnCluster"] = inputs.arvados_cluster_ids[0];
+ return self;
+ }
+ runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[1])" }
+ scrub_image: {default: "arvados/fed-test:runner-remote-step-home"}
+ scrub_collections:
+ default:
+ - 25fe10d8e8530329a738de69d9bc8ab5+81 # input collection
+ - 7f052d1a04b851b6f73fba77c7802e1d+51 # md5sum output collection
+ - ecb639201f454b6493757f5117f540df+112 # runner output json
+ out: [out, success]
+ run: framework/testcase.cwl
+
+ remote-case:
+ doc: |
+ Single step workflow with both the runner and the step on the
+ remote cluster.
+ in:
+ arvados_api_token: arvados_api_token
+ arvado_api_host_insecure: arvado_api_host_insecure
+ arvados_api_hosts: arvados_api_hosts
+ arvados_cluster_ids: arvados_cluster_ids
+ acr: acr
+ wf:
+ default:
+ class: File
+ location: cases/remote-case.cwl
+ secondaryFiles:
+ - class: File
+ location: cases/md5sum.cwl
+ obj:
+ default:
+ inp:
+ class: File
+ location: data/remote-case-input.txt
+ valueFrom: |-
+ ${
+ self["runOnCluster"] = inputs.arvados_cluster_ids[1];
+ return self;
+ }
+ runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[1])" }
+ scrub_image: {default: "arvados/fed-test:remote-case"}
+ scrub_collections:
+ default:
+ - fccd49fdef8e452295f718208abafd88+69 # input collection
+ - 58c0e8ea6b148134ef8577ee11307eec+51 # md5sum output collection
+ - 1fd679c5ab64c123b9764024dbf560f0+112 # final output json
+ out: [out, success]
+ run: framework/testcase.cwl
+
+ twostep-home-to-remote:
+ doc: |
+ Two step workflow. The runner is on the home cluster, the first
+ step is on the home cluster, the second step is on the remote
+ cluster.
+ in:
+ arvados_api_token: arvados_api_token
+ arvado_api_host_insecure: arvado_api_host_insecure
+ arvados_api_hosts: arvados_api_hosts
+ arvados_cluster_ids: arvados_cluster_ids
+ acr: acr
+ wf:
+ default:
+ class: File
+ location: cases/twostep-home-to-remote.cwl
+ secondaryFiles:
+ - class: File
+ location: cases/md5sum.cwl
+ - class: File
+ location: cases/rev.cwl
+ obj:
+ default:
+ inp:
+ class: File
+ location: data/twostep-home-to-remote.txt
+ valueFrom: |-
+ ${
+ self["md5sumCluster"] = inputs.arvados_cluster_ids[0];
+ self["revCluster"] = inputs.arvados_cluster_ids[1];
+ return self;
+ }
+ runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+ scrub_image: {default: "arvados/fed-test:twostep-home-to-remote"}
+ scrub_collections:
+ default:
+ - 268a54947fb75115cfe05bb54cc62c30+74 # input collection
+ - 400f03b8c5d2dc3dcb513a21b626ef88+51 # md5sum output collection
+ - 3738166916ca5f6f6ad12bf7e06b4a21+51 # rev output collection
+ - bc37c17a37aa25229e5de1339b27fbcc+112 # runner output json
+ out: [out, success]
+ run: framework/testcase.cwl
+
+ twostep-remote-to-home:
+ doc: |
+ Two step workflow. The runner is on the home cluster, the first
+ step is on the remote cluster, the second step is on the home
+ cluster.
+ in:
+ arvados_api_token: arvados_api_token
+ arvado_api_host_insecure: arvado_api_host_insecure
+ arvados_api_hosts: arvados_api_hosts
+ arvados_cluster_ids: arvados_cluster_ids
+ acr: acr
+ wf:
+ default:
+ class: File
+ location: cases/twostep-remote-to-home.cwl
+ secondaryFiles:
+ - class: File
+ location: cases/md5sum.cwl
+ - class: File
+ location: cases/rev.cwl
+ obj:
+ default:
+ inp:
+ class: File
+ location: data/twostep-remote-to-home.txt
+ valueFrom: |-
+ ${
+ self["md5sumCluster"] = inputs.arvados_cluster_ids[1];
+ self["revCluster"] = inputs.arvados_cluster_ids[0];
+ return self;
+ }
+ runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+ scrub_image: {default: "arvados/fed-test:twostep-remote-to-home"}
+ scrub_collections:
+ default:
+ - cce89b9f7b6e163978144051ce5f071a+74 # input collection
+ - 0c358c3af63644c6343766feff1b7238+51 # md5sum output collection
+ - 33fb7d512bf21f04847eca58cea46e74+51 # rev output collection
+ - 912e04aa3db04aba008cf5cd46c277b2+112 # runner output json
+ out: [out, success]
+ run: framework/testcase.cwl
+
+ twostep-both-remote:
+ doc: |
+ Two step workflow. The runner is on the home cluster, both steps are
+ on the remote cluster.
+ in:
+ arvados_api_token: arvados_api_token
+ arvado_api_host_insecure: arvado_api_host_insecure
+ arvados_api_hosts: arvados_api_hosts
+ arvados_cluster_ids: arvados_cluster_ids
+ acr: acr
+ wf:
+ default:
+ class: File
+ location: cases/twostep-both-remote.cwl
+ secondaryFiles:
+ - class: File
+ location: cases/md5sum.cwl
+ - class: File
+ location: cases/rev.cwl
+ obj:
+ default:
+ inp:
+ class: File
+ location: data/twostep-both-remote.txt
+ valueFrom: |-
+ ${
+ self["md5sumCluster"] = inputs.arvados_cluster_ids[1];
+ self["revCluster"] = inputs.arvados_cluster_ids[1];
+ return self;
+ }
+ runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+ scrub_image: {default: "arvados/fed-test:twostep-both-remote"}
+ scrub_collections:
+ default:
+ - 3c5e39939cf197d304ac1eac20841238+71 # input collection
+ - 3edb99aa607731593969cdab663d65b4+51 # md5sum output collection
+ - a91625b7139e60fe61a88cae42fbee13+51 # rev output collection
+ - ddfa58a81953dad08436d571615dd584+112 # runner output json
+ out: [out, success]
+ run: framework/testcase.cwl
+
+ twostep-remote-copy-to-home:
+ doc: |
+ Two step workflow. The runner is on the home cluster, the first
+ step is on the remote cluster, the second step is on the home
+ cluster, and propagates its input file directly from input to
+ output by symlinking the input file in the output directory.
+ Tests that crunch-run will copy blocks from remote to local
+ when preparing output collection.
+ in:
+ arvados_api_token: arvados_api_token
+ arvado_api_host_insecure: arvado_api_host_insecure
+ arvados_api_hosts: arvados_api_hosts
+ arvados_cluster_ids: arvados_cluster_ids
+ acr: acr
+ wf:
+ default:
+ class: File
+ location: cases/twostep-remote-copy-to-home.cwl
+ secondaryFiles:
+ - class: File
+ location: cases/md5sum.cwl
+ - class: File
+ location: cases/rev-input-to-output.cwl
+ obj:
+ default:
+ inp:
+ class: File
+ location: data/twostep-remote-copy-to-home.txt
+ valueFrom: |-
+ ${
+ self["md5sumCluster"] = inputs.arvados_cluster_ids[1];
+ self["revCluster"] = inputs.arvados_cluster_ids[0];
+ return self;
+ }
+ runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+ scrub_image: {default: "arvados/fed-test:twostep-remote-copy-to-home"}
+ scrub_collections:
+ default:
+ - 538887bc29a3098bf79abdb8536d17bd+79 # input collection
+ - 14da0e0d52d7ab2945427074b275e9ee+51 # md5sum output collection
+ - 2d3a4a840077390a0d7788f169eaba89+112 # rev output collection
+ - 2d3a4a840077390a0d7788f169eaba89+112 # runner output json
+ out: [out, success]
+ run: framework/testcase.cwl
+
+ scatter-gather:
+ doc: ""
+ in:
+ arvados_api_token: arvados_api_token
+ arvado_api_host_insecure: arvado_api_host_insecure
+ arvados_api_hosts: arvados_api_hosts
+ arvados_cluster_ids: arvados_cluster_ids
+ acr: acr
+ wf:
+ default:
+ class: File
+ location: cases/scatter-gather.cwl
+ secondaryFiles:
+ - class: File
+ location: cases/md5sum.cwl
+ - class: File
+ location: cases/cat.cwl
+ obj:
+ default:
+ shards:
+ - class: File
+ location: data/scatter-gather-s1.txt
+ - class: File
+ location: data/scatter-gather-s2.txt
+ - class: File
+ location: data/scatter-gather-s3.txt
+ valueFrom: |-
+ ${
+ self["clusters"] = inputs.arvados_cluster_ids;
+ return self;
+ }
+ runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+ scrub_image: {default: "arvados/fed-test:scatter-gather"}
+ scrub_collections:
+ default:
+ - 99cc18329bce1b4a5fe6c4cf60477668+209 # input collection
+ - 2e570e844e03c7027baad148642d726f+51 # s1 md5sum output collection
+ - 61c88ee7811d0b849b5c06376eb065a6+51 # s2 md5sum output collection
+ - 85aaf18d638045fe609e025d3a319b2a+51 # s3 md5sum output collection
+ - ec44bcba77e65128f1a8f843d881ede4+56 # cat output collection
+ - 89de265942800ae36549109969940363+117 # runner output json
+ out: [out, success]
+ run: framework/testcase.cwl
+
+ threestep-remote:
+ doc: ""
+ in:
+ arvados_api_token: arvados_api_token
+ arvado_api_host_insecure: arvado_api_host_insecure
+ arvados_api_hosts: arvados_api_hosts
+ arvados_cluster_ids: arvados_cluster_ids
+ acr: acr
+ wf:
+ default:
+ class: File
+ location: cases/threestep-remote.cwl
+ secondaryFiles:
+ - class: File
+ location: cases/md5sum.cwl
+ - class: File
+ location: cases/rev-input-to-output.cwl
+ obj:
+ default:
+ inp:
+ class: File
+ location: data/threestep-remote.txt
+ valueFrom: |-
+ ${
+ self["clusterA"] = inputs.arvados_cluster_ids[0];
+ self["clusterB"] = inputs.arvados_cluster_ids[1];
+ self["clusterC"] = inputs.arvados_cluster_ids[2];
+ return self;
+ }
+ runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+ scrub_image: {default: "arvados/fed-test:threestep-remote"}
+ scrub_collections:
+ default:
+ - 9fbf33e62876357fe134f619865cc5a5+68 # input collection
+ - 210c5f2a716f6689b04316acd4928c10+51 # md5sum output collection
+ - 3abea7506269d5ebf61fb17c78bbd2af+105 # revB output
+ - 9e1b3acb28949759ad07e4c9740bbaa5+113 # revC output
+ - 8c86dbec7de7948871b5e168ede417e1+120 # runner output json
+ out: [out, success]
+ run: framework/testcase.cwl
+
+ hint-on-wf:
+ doc: |
+ Single step workflow with the runner on the home cluster and the
+ step on the remote cluster. ClusterTarget hint is at the workflow level.
+ in:
+ arvados_api_token: arvados_api_token
+ arvado_api_host_insecure: arvado_api_host_insecure
+ arvados_api_hosts: arvados_api_hosts
+ arvados_cluster_ids: arvados_cluster_ids
+ acr: acr
+ wf:
+ default:
+ class: File
+ location: cases/hint-on-wf.cwl
+ secondaryFiles:
+ - class: File
+ location: cases/md5sum.cwl
+ obj:
+ default:
+ inp:
+ class: File
+ location: data/hint-on-wf.txt
+ valueFrom: |-
+ ${
+ self["runOnCluster"] = inputs.arvados_cluster_ids[1];
+ return self;
+ }
+ runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+ scrub_image: {default: "arvados/fed-test:hint-on-wf"}
+ scrub_collections:
+ default:
+ - 862433f328041b2525c90b1dc3c462fd+62 # input collection
+ - 9a68b0b9720977faba8a28e75a4398b7+51 # md5sum output collection
+ - 6a601cddb36ee2f766783b1aa9ff8d66+112 # runner output json
+ out: [out, success]
+ run: framework/testcase.cwl
+
+ hint-on-tool:
+ doc: |
+ Single step workflow with the runner on the home cluster and the
+ step on the remote cluster. ClusterTarget hint is at the tool level.
+ in:
+ arvados_api_token: arvados_api_token
+ arvado_api_host_insecure: arvado_api_host_insecure
+ arvados_api_hosts: arvados_api_hosts
+ arvados_cluster_ids: arvados_cluster_ids
+ acr: acr
+ wf:
+ default:
+ class: File
+ location: cases/hint-on-tool.cwl
+ secondaryFiles:
+ - class: File
+ location: cases/md5sum-tool-hint.cwl
+ obj:
+ default:
+ inp:
+ class: File
+ location: data/hint-on-tool.txt
+ valueFrom: |-
+ ${
+ self["runOnCluster"] = inputs.arvados_cluster_ids[1];
+ return self;
+ }
+ runner_cluster: { valueFrom: "$(inputs.arvados_cluster_ids[0])" }
+ scrub_image: {default: "arvados/fed-test:hint-on-tool"}
+ scrub_collections:
+ default:
+ - 6803004a4f8db9f8d1d54f6229851599+64 # input collection
+ - cacb0d56235564b5ff485c5b31215ab5+51 # md5sum output collection
+ - 2b50af43fdd84a9e906be2d54b92cddf+112 # runner output json
+ out: [out, success]
+ run: framework/testcase.cwl
import arvados_cwl
import arvados_cwl.context
+import arvados_cwl.util
from arvados_cwl.arvdocker import arv_docker_clear_cache
import copy
import arvados.config
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
- 'container_image': 'arvados/jobs',
+ 'container_image': '99999999999999999999999999999993+99',
'command': ['ls', '/var/spool/cwl'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 7200,
- 'container_image': 'arvados/jobs',
+ 'container_image': '99999999999999999999999999999993+99',
'command': ['ls'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
- 'container_image': 'arvados/jobs',
+ 'container_image': '99999999999999999999999999999993+99',
'command': ['ls'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
- 'container_image': 'arvados/jobs',
+ 'container_image': '99999999999999999999999999999993+99',
'command': ['ls', '/var/spool/cwl'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
col().open.return_value = []
+ loadingContext, runtimeContext = self.helper(runner)
+
arvjob = arvados_cwl.ArvadosContainer(runner,
+ runtimeContext,
mock.MagicMock(),
{},
None,
arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
runner.add_intermediate_output.assert_called_with("zzzzz-4zz18-zzzzzzzzzzzzzz2")
- @mock.patch("arvados_cwl.get_current_container")
+ @mock.patch("arvados_cwl.util.get_current_container")
@mock.patch("arvados.collection.CollectionReader")
@mock.patch("arvados.collection.Collection")
def test_child_failure(self, col, reader, gcc_mock):
# Set up runner with mocked runtime_status_update()
self.assertFalse(gcc_mock.called)
runtime_status_update = mock.MagicMock()
- arvados_cwl.ArvCwlRunner.runtime_status_update = runtime_status_update
- runner = arvados_cwl.ArvCwlRunner(api)
+ arvados_cwl.ArvCwlExecutor.runtime_status_update = runtime_status_update
+ runner = arvados_cwl.ArvCwlExecutor(api)
self.assertEqual(runner.work_api, 'containers')
- # Make sure ArvCwlRunner thinks it's running inside a container so it
+ # Make sure ArvCwlExecutor thinks it's running inside a container so it
# adds the logging handler that will call runtime_status_update() mock
gcc_mock.return_value = {"uuid" : "zzzzz-dz642-zzzzzzzzzzzzzzz"}
self.assertTrue(gcc_mock.called)
col().open.return_value = []
+ loadingContext, runtimeContext = self.helper(runner)
+
arvjob = arvados_cwl.ArvadosContainer(runner,
+ runtimeContext,
mock.MagicMock(),
{},
None,
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
- 'container_image': 'arvados/jobs',
+ 'container_image': '99999999999999999999999999999994+99',
'command': ['ls', '/var/spool/cwl'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
- 'container_image': 'arvados/jobs',
+ 'container_image': '99999999999999999999999999999993+99',
'command': ['md5sum', 'example.conf'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
import arvados
import arvados_cwl
+import arvados_cwl.executor
import cwltool.process
from arvados.errors import ApiError
from schema_salad.ref_resolver import Loader
api = mock.MagicMock()
api._rootDesc = get_rootDesc()
- runner = arvados_cwl.ArvCwlRunner(api)
+ runner = arvados_cwl.executor.ArvCwlExecutor(api)
self.assertEqual(runner.work_api, 'jobs')
list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
api = mock.MagicMock()
api._rootDesc = get_rootDesc()
- runner = arvados_cwl.ArvCwlRunner(api)
+ runner = arvados_cwl.executor.ArvCwlExecutor(api)
self.assertEqual(runner.work_api, 'jobs')
list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
api = mock.MagicMock()
api._rootDesc = copy.deepcopy(get_rootDesc())
del api._rootDesc.get('resources')['jobs']['methods']['create']
- runner = arvados_cwl.ArvCwlRunner(api)
+ runner = arvados_cwl.executor.ArvCwlExecutor(api)
self.assertEqual(runner.work_api, 'containers')
import arvados
import arvados_cwl
+import arvados_cwl.executor
from .mock_discovery import get_rootDesc
class TestMakeOutput(unittest.TestCase):
@mock.patch("arvados.collection.CollectionReader")
def test_make_output_collection(self, reader, col):
keep_client = mock.MagicMock()
- runner = arvados_cwl.ArvCwlRunner(self.api, keep_client=keep_client)
+ runner = arvados_cwl.executor.ArvCwlExecutor(self.api, keep_client=keep_client)
runner.project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
final = mock.MagicMock()
import arvados.keep
import arvados.collection
import arvados_cwl
+import arvados_cwl.executor
from cwltool.pathmapper import MapperEnt
from .mock_discovery import get_rootDesc
def test_keepref(self):
"""Test direct keep references."""
- arvrunner = arvados_cwl.ArvCwlRunner(self.api)
+ arvrunner = arvados_cwl.executor.ArvCwlExecutor(self.api)
p = ArvPathMapper(arvrunner, [{
"class": "File",
def test_upload(self, statfile, upl):
"""Test pathmapper uploading files."""
- arvrunner = arvados_cwl.ArvCwlRunner(self.api)
+ arvrunner = arvados_cwl.executor.ArvCwlExecutor(self.api)
def statfile_mock(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)", raiseOSError=False):
st = arvados.commands.run.UploadFile("", "tests/hw.py")
@mock.patch("arvados.commands.run.statfile")
def test_statfile(self, statfile, upl):
"""Test pathmapper handling ArvFile references."""
- arvrunner = arvados_cwl.ArvCwlRunner(self.api)
+ arvrunner = arvados_cwl.executor.ArvCwlExecutor(self.api)
# An ArvFile object returned from arvados.commands.run.statfile means the file is located on a
# keep mount, so we can construct a direct reference directly without upload.
@mock.patch("os.stat")
def test_missing_file(self, stat):
"""Test pathmapper handling missing references."""
- arvrunner = arvados_cwl.ArvCwlRunner(self.api)
+ arvrunner = arvados_cwl.executor.ArvCwlExecutor(self.api)
stat.side_effect = OSError(2, "No such file or directory")
import arvados
import arvados.collection
import arvados_cwl
+import arvados_cwl.executor
import arvados_cwl.runner
import arvados.keep
keep_client2.put.side_effect = putstub
stubs.keep_client = keep_client2
- stubs.keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+ stubs.docker_images = {
+ "arvados/jobs:"+arvados_cwl.__version__: [("zzzzz-4zz18-zzzzzzzzzzzzzd3", "")],
+ "debian:8": [("zzzzz-4zz18-zzzzzzzzzzzzzd4", "")],
+ "arvados/jobs:123": [("zzzzz-4zz18-zzzzzzzzzzzzzd5", "")],
+ "arvados/jobs:latest": [("zzzzz-4zz18-zzzzzzzzzzzzzd6", "")],
+ }
+ def kd(a, b, image_name=None, image_tag=None):
+ return stubs.docker_images.get("%s:%s" % (image_name, image_tag), [])
+ stubs.keepdocker.side_effect = kd
+
stubs.fake_user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
stubs.fake_container_uuid = "zzzzz-dz642-zzzzzzzzzzzzzzz"
def collection_createstub(created_collections, body, ensure_unique_name=None):
mt = body["manifest_text"]
- uuid = "zzzzz-4zz18-zzzzzzzzzzzzzz%d" % len(created_collections)
+ uuid = "zzzzz-4zz18-zzzzzzzzzzzzzx%d" % len(created_collections)
pdh = "%s+%i" % (hashlib.md5(mt).hexdigest(), len(mt))
created_collections[uuid] = {
"uuid": uuid,
"uuid": "",
"portable_data_hash": "99999999999999999999999999999994+99",
"manifest_text": ". 99999999999999999999999999999994+99 0:0:expect_arvworkflow.cwl"
+ },
+ "zzzzz-4zz18-zzzzzzzzzzzzzd3": {
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd3",
+ "portable_data_hash": "999999999999999999999999999999d3+99",
+ "manifest_text": ""
+ },
+ "zzzzz-4zz18-zzzzzzzzzzzzzd4": {
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd4",
+ "portable_data_hash": "999999999999999999999999999999d4+99",
+ "manifest_text": ""
+ },
+ "zzzzz-4zz18-zzzzzzzzzzzzzd5": {
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd5",
+ "portable_data_hash": "999999999999999999999999999999d5+99",
+ "manifest_text": ""
+ },
+ "zzzzz-4zz18-zzzzzzzzzzzzzd6": {
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd6",
+ "portable_data_hash": "999999999999999999999999999999d6+99",
+ "manifest_text": ""
}
}
stubs.api.collections().create.side_effect = functools.partial(collection_createstub, created_collections)
}
stubs.expect_job_spec = {
'runtime_constraints': {
- 'docker_image': 'arvados/jobs:'+arvados_cwl.__version__,
+ 'docker_image': '999999999999999999999999999999d3+99',
'min_ram_mb_per_node': 1024
},
'script_parameters': {
}],
'class': 'Directory'
},
- 'cwl:tool': '3fffdeaa75e018172e1b583425f4ebff+60/workflow.cwl#main'
+ 'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main'
},
'repository': 'arvados',
'script_version': 'master',
'owner_uuid': None,
"components": {
"cwl-runner": {
- 'runtime_constraints': {'docker_image': 'arvados/jobs:'+arvados_cwl.__version__, 'min_ram_mb_per_node': 1024},
+ 'runtime_constraints': {'docker_image': '999999999999999999999999999999d3+99', 'min_ram_mb_per_node': 1024},
'script_parameters': {
'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
'x': {"value": {
'size': 0
}
]}},
- 'cwl:tool': '3fffdeaa75e018172e1b583425f4ebff+60/workflow.cwl#main',
+ 'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main',
'arv:debug': True,
'arv:enable_reuse': True,
'arv:on_error': 'continue'
'--enable-reuse', '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
'name': 'submit_wf.cwl',
- 'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
+ 'container_image': '999999999999999999999999999999d3+99',
'output_path': '/var/spool/cwl',
'cwd': '/var/spool/cwl',
'runtime_constraints': {
class TestSubmit(unittest.TestCase):
- @mock.patch("arvados_cwl.runner.arv_docker_get_image")
+ @mock.patch("arvados_cwl.arvdocker.arv_docker_get_image")
@mock.patch("time.sleep")
@stubs
def test_submit(self, stubs, tm, arvdock):
+ def get_image(api_client, dockerRequirement, pull_image, project_uuid):
+ if dockerRequirement["dockerPull"] == 'arvados/jobs:'+arvados_cwl.__version__:
+ return '999999999999999999999999999999d3+99'
+ elif dockerRequirement["dockerPull"] == "debian:8":
+ return '999999999999999999999999999999d4+99'
+ arvdock.side_effect = get_image
+
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=jobs", "--debug",
}), ensure_unique_name=False),
mock.call(body=JsonDiffMatcher({
'manifest_text':
- '. 61df2ed9ee3eb7dd9b799e5ca35305fa+1217 0:1217:workflow.cwl\n',
+ ". 68089141fbf7e020ac90a9d6a575bc8f+1312 0:1312:workflow.cwl\n",
'replication_desired': None,
'name': 'submit_wf.cwl',
}), ensure_unique_name=True) ])
arvdock.assert_has_calls([
mock.call(stubs.api, {"class": "DockerRequirement", "dockerPull": "debian:8"}, True, None),
+ mock.call(stubs.api, {"class": "DockerRequirement", "dockerPull": "debian:8", 'http://arvados.org/cwl#dockerCollectionPDH': '999999999999999999999999999999d4+99'}, True, None),
mock.call(stubs.api, {'dockerPull': 'arvados/jobs:'+arvados_cwl.__version__}, True, None)
])
@mock.patch("arvados_cwl.task_queue.TaskQueue")
@mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
- @mock.patch("arvados_cwl.ArvCwlRunner.make_output_collection", return_value = (None, None))
+ @mock.patch("arvados_cwl.executor.ArvCwlExecutor.make_output_collection", return_value = (None, None))
@stubs
def test_storage_classes_correctly_propagate_to_make_output_collection(self, stubs, make_output, job, tq):
def set_final_output(job_order, output_callback, runtimeContext):
@mock.patch("arvados_cwl.task_queue.TaskQueue")
@mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
- @mock.patch("arvados_cwl.ArvCwlRunner.make_output_collection", return_value = (None, None))
+ @mock.patch("arvados_cwl.executor.ArvCwlExecutor.make_output_collection", return_value = (None, None))
@stubs
def test_default_storage_classes_correctly_propagate_to_make_output_collection(self, stubs, make_output, job, tq):
def set_final_output(job_order, output_callback, runtimeContext):
}, 'state': 'Committed',
'output_path': '/var/spool/cwl',
'name': 'expect_arvworkflow.cwl#main',
- 'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
+ 'container_image': '999999999999999999999999999999d3+99',
'command': ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
'--eval-timeout=20', '--thread-count=4',
'id': '#submit_tool.cwl/x'}
],
'requirements': [
- {'dockerPull': 'debian:8', 'class': 'DockerRequirement'}
+ {
+ 'dockerPull': 'debian:8',
+ 'class': 'DockerRequirement',
+ "http://arvados.org/cwl#dockerCollectionPDH": "999999999999999999999999999999d4+99"
+ }
],
'id': '#submit_tool.cwl',
'outputs': [],
}, 'state': 'Committed',
'output_path': '/var/spool/cwl',
'name': 'a test workflow',
- 'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
+ 'container_image': "999999999999999999999999999999d3+99",
'command': ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
'--eval-timeout=20', '--thread-count=4',
except:
logging.exception("")
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["runtime_constraints"]["docker_image"] = "arvados/jobs:123"
+ stubs.expect_pipeline_instance["components"]["cwl-runner"]["runtime_constraints"]["docker_image"] = "999999999999999999999999999999d5+99"
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
stubs.api.pipeline_instances().create.assert_called_with(
except:
logging.exception("")
- stubs.expect_container_spec["container_image"] = "arvados/jobs:123"
+ stubs.expect_container_spec["container_image"] = "999999999999999999999999999999d5+99"
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
+ def tearDown(self):
+ arvados_cwl.arvdocker.arv_docker_clear_cache()
@mock.patch("arvados.commands.keepdocker.find_one_image_hash")
@mock.patch("cwltool.docker.DockerCommandLineJob.get_image")
@mock.patch("arvados.api")
def test_arvados_jobs_image(self, api, get_image, find_one_image_hash):
+ arvados_cwl.arvdocker.arv_docker_clear_cache()
+
arvrunner = mock.MagicMock()
arvrunner.project_uuid = ""
api.return_value = mock.MagicMock()
"properties": ""
}], "items_available": 1, "offset": 0},)
arvrunner.api.collections().create().execute.return_value = {"uuid": ""}
- self.assertEqual("arvados/jobs:"+arvados_cwl.__version__,
+ arvrunner.api.collections().get().execute.return_value = {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
+ "portable_data_hash": "9999999999999999999999999999999b+99"}
+ self.assertEqual("9999999999999999999999999999999b+99",
arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
+
@stubs
def test_submit_secrets(self, stubs):
capture_stdout = cStringIO.StringIO()
"/var/lib/cwl/workflow.json#main",
"/var/lib/cwl/cwl.input.json"
],
- "container_image": "arvados/jobs:"+arvados_cwl.__version__,
+ "container_image": "999999999999999999999999999999d3+99",
"cwd": "/var/spool/cwl",
"mounts": {
"/var/lib/cwl/cwl.input.json": {
"hints": [
{
"class": "DockerRequirement",
- "dockerPull": "debian:8"
+ "dockerPull": "debian:8",
+ "http://arvados.org/cwl#dockerCollectionPDH": "999999999999999999999999999999d4+99"
},
{
"class": "http://commonwl.org/cwltool#Secrets",
logging.exception("")
stubs.api.container_requests().update.assert_called_with(
- uuid="zzzzz-xvhdp-yyyyyyyyyyyyyyy", body=JsonDiffMatcher(stubs.expect_container_spec))
+ uuid="zzzzz-xvhdp-yyyyyyyyyyyyyyy", body=JsonDiffMatcher(stubs.expect_container_spec), cluster_id="zzzzz")
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
+ @stubs
+ def test_submit_container_cluster_id(self, stubs):
+ capture_stdout = cStringIO.StringIO()
+ try:
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--submit-runner-cluster=zbbbb",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container), cluster_id="zbbbb")
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
"components": {
"inputs_test.cwl": {
'runtime_constraints': {
- 'docker_image': 'arvados/jobs:'+arvados_cwl.__version__,
+ 'docker_image': '999999999999999999999999999999d3+99',
'min_ram_mb_per_node': 1024
},
'script_parameters': {
'cwl:tool':
- '6c5ee1cd606088106d9f28367cde1e41+60/workflow.cwl#main',
+ 'a2de777156fb700f1363b1f2e370adca+60/workflow.cwl#main',
'optionalFloatInput': None,
'fileInput': {
'type': 'File',
params = expect_template[
"components"]["inputs_test.cwl"]["script_parameters"]
params["fileInput"]["value"] = '169f39d466a5438ac4a90e779bf750c7+53/blorp.txt'
- params["cwl:tool"] = '6c5ee1cd606088106d9f28367cde1e41+60/workflow.cwl#main'
+ params["cwl:tool"] = 'a2de777156fb700f1363b1f2e370adca+60/workflow.cwl#main'
params["floatInput"]["value"] = 1.234
params["boolInput"]["value"] = True
"requirements": [
{
"class": "DockerRequirement",
- "dockerPull": "debian:8"
+ "dockerPull": "debian:8",
+ "http://arvados.org/cwl#dockerCollectionPDH": "999999999999999999999999999999d4+99"
}
]
},
requirements:
- class: DockerRequirement
dockerPull: debian:8
+ 'http://arvados.org/cwl#dockerCollectionPDH': 999999999999999999999999999999d4+99
inputs:
- id: '#submit_tool.cwl/x'
type: File
RUN apt-get update -q && apt-get install -qy git python-pip python-virtualenv python-dev libcurl4-gnutls-dev libgnutls28-dev nodejs python-pyasn1-modules
-RUN pip install -U setuptools six
+RUN pip install -U setuptools six requests
ARG sdk
ARG runner
'google-api-python-client >=1.6.2, <1.7',
'httplib2 >=0.9.2',
'pycurl >=7.19.5.1',
- 'ruamel.yaml >=0.13.11, <= 0.15.26',
+ 'ruamel.yaml >=0.15.54, <=0.15.77',
'setuptools',
'ws4py >=0.4.2',
'subprocess32 >=3.5.1',
defaultTrashLifetime: Rails.application.config.default_trash_lifetime,
blobSignatureTtl: Rails.application.config.blob_signature_ttl,
maxRequestSize: Rails.application.config.max_request_size,
+ maxItemsPerResponse: Rails.application.config.max_items_per_response,
dockerImageFormats: Rails.application.config.docker_image_formats,
crunchLogBytesPerEvent: Rails.application.config.crunch_log_bytes_per_event,
crunchLogSecondsBetweenEvents: Rails.application.config.crunch_log_seconds_between_events,