#!/usr/bin/env python
+# Implement cwl-runner interface for submitting and running work on Arvados, using
+# either the Crunch jobs API or Crunch containers API.
+
import argparse
-import arvados
-import arvados.events
-import arvados.commands.keepdocker
-import arvados.commands.run
-import arvados.collection
-import arvados.util
-import cwltool.draft2tool
-import cwltool.workflow
-import cwltool.main
-from cwltool.process import shortname
-import threading
-import cwltool.docker
-import fnmatch
import logging
-import re
import os
import sys
+import threading
+import hashlib
+from functools import partial
+import pkg_resources # part of setuptools
+
+from cwltool.errors import WorkflowException
+import cwltool.main
+import cwltool.workflow
-from cwltool.process import get_feature
+import arvados
+import arvados.config
+
+from .arvcontainer import ArvadosContainer, RunnerContainer
+from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
+from .arvtool import ArvadosCommandTool
+from .fsaccess import CollectionFsAccess
+from .arvworkflow import make_workflow
+from .perf import Perf
+
+from cwltool.process import shortname, UnsupportedRequirement
+from cwltool.pathmapper import adjustFileObjs
+from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
-crunchrunner_pdh = "83db29f08544e1c319572a6bd971088a+140"
-crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"
-certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt"
-
-tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
-outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
-keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
-
-
-def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
- if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
- dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
-
- sp = dockerRequirement["dockerImageId"].split(":")
- image_name = sp[0]
- image_tag = sp[1] if len(sp) > 1 else None
-
- images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
- image_name=image_name,
- image_tag=image_tag)
-
- if not images:
- imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
- args = ["--project-uuid="+project_uuid, image_name]
- if image_tag:
- args.append(image_tag)
- logger.info("Uploading Docker image %s", ":".join(args[1:]))
- arvados.commands.keepdocker.main(args)
-
- return dockerRequirement["dockerImageId"]
-
-
-class CollectionFsAccess(cwltool.process.StdFsAccess):
- def __init__(self, basedir):
- self.collections = {}
- self.basedir = basedir
-
- def get_collection(self, path):
- p = path.split("/")
- if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
- pdh = p[0][5:]
- if pdh not in self.collections:
- self.collections[pdh] = arvados.collection.CollectionReader(pdh)
- return (self.collections[pdh], "/".join(p[1:]))
- else:
- return (None, path)
-
- def _match(self, collection, patternsegments, parent):
- if not patternsegments:
- return []
-
- if not isinstance(collection, arvados.collection.RichCollectionBase):
- return []
-
- ret = []
- # iterate over the files and subcollections in 'collection'
- for filename in collection:
- if patternsegments[0] == '.':
- # Pattern contains something like "./foo" so just shift
- # past the "./"
- ret.extend(self._match(collection, patternsegments[1:], parent))
- elif fnmatch.fnmatch(filename, patternsegments[0]):
- cur = os.path.join(parent, filename)
- if len(patternsegments) == 1:
- ret.append(cur)
- else:
- ret.extend(self._match(collection[filename], patternsegments[1:], cur))
- return ret
-
- def glob(self, pattern):
- collection, rest = self.get_collection(pattern)
- patternsegments = rest.split("/")
- return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
-
- def open(self, fn, mode):
- collection, rest = self.get_collection(fn)
- if collection:
- return collection.open(rest, mode)
- else:
- return open(self._abs(fn), mode)
-
- def exists(self, fn):
- collection, rest = self.get_collection(fn)
- if collection:
- return collection.exists(rest)
- else:
- return os.path.exists(self._abs(fn))
-
-class ArvadosJob(object):
- def __init__(self, runner):
- self.arvrunner = runner
- self.running = False
-
- def run(self, dry_run=False, pull_image=True, **kwargs):
- script_parameters = {
- "command": self.command_line
- }
- runtime_constraints = {}
-
- if self.generatefiles:
- vwd = arvados.collection.Collection()
- script_parameters["task.vwd"] = {}
- for t in self.generatefiles:
- if isinstance(self.generatefiles[t], dict):
- src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
- vwd.copy(rest, t, source_collection=src)
- else:
- with vwd.open(t, "w") as f:
- f.write(self.generatefiles[t])
- vwd.save_new()
- for t in self.generatefiles:
- script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
-
- script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
- if self.environment:
- script_parameters["task.env"].update(self.environment)
-
- if self.stdin:
- script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
-
- if self.stdout:
- script_parameters["task.stdout"] = self.stdout
-
- (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
- if docker_req and kwargs.get("use_container") is not False:
- runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
-
- try:
- response = self.arvrunner.api.jobs().create(body={
- "owner_uuid": self.arvrunner.project_uuid,
- "script": "crunchrunner",
- "repository": "arvados",
- "script_version": "master",
- "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
- "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
- "runtime_constraints": runtime_constraints
- }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
-
- self.arvrunner.jobs[response["uuid"]] = self
-
- self.arvrunner.pipeline["components"][self.name] = {"job": response}
- self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
- body={
- "components": self.arvrunner.pipeline["components"]
- }).execute(num_retries=self.arvrunner.num_retries)
-
- logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
-
- if response["state"] in ("Complete", "Failed", "Cancelled"):
- self.done(response)
- except Exception as e:
- logger.error("Got error %s" % str(e))
- self.output_callback({}, "permanentFail")
-
- def update_pipeline_component(self, record):
- self.arvrunner.pipeline["components"][self.name] = {"job": record}
- self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
- body={
- "components": self.arvrunner.pipeline["components"]
- }).execute(num_retries=self.arvrunner.num_retries)
-
- def done(self, record):
- try:
- self.update_pipeline_component(record)
- except:
- pass
-
- try:
- if record["state"] == "Complete":
- processStatus = "success"
- else:
- processStatus = "permanentFail"
-
- try:
- outputs = {}
- if record["output"]:
- logc = arvados.collection.Collection(record["log"])
- log = logc.open(logc.keys()[0])
- tmpdir = None
- outdir = None
- keepdir = None
- for l in log.readlines():
- g = tmpdirre.match(l)
- if g:
- tmpdir = g.group(1)
- g = outdirre.match(l)
- if g:
- outdir = g.group(1)
- g = keepre.match(l)
- if g:
- keepdir = g.group(1)
- if tmpdir and outdir and keepdir:
- break
-
- self.builder.outdir = outdir
- self.builder.pathmapper.keepdir = keepdir
- outputs = self.collect_outputs("keep:" + record["output"])
- except Exception as e:
- logger.exception("Got exception while collecting job outputs:")
- processStatus = "permanentFail"
-
- self.output_callback(outputs, processStatus)
- finally:
- del self.arvrunner.jobs[record["uuid"]]
-
-
-class ArvPathMapper(cwltool.pathmapper.PathMapper):
- def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
- self._pathmap = arvrunner.get_uploaded()
- uploadfiles = []
-
- pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
-
- for src in referenced_files:
- if isinstance(src, basestring) and pdh_path.match(src):
- self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
- if src not in self._pathmap:
- ab = cwltool.pathmapper.abspath(src, basedir)
- st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
- if kwargs.get("conformance_test"):
- self._pathmap[src] = (src, ab)
- elif isinstance(st, arvados.commands.run.UploadFile):
- uploadfiles.append((src, ab, st))
- elif isinstance(st, arvados.commands.run.ArvFile):
- self._pathmap[src] = (ab, st.fn)
- else:
- raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
-
- if uploadfiles:
- arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
- arvrunner.api,
- dry_run=kwargs.get("dry_run"),
- num_retries=3,
- fnPattern="$(task.keep)/%s/%s",
- project=arvrunner.project_uuid)
-
- for src, ab, st in uploadfiles:
- arvrunner.add_uploaded(src, (ab, st.fn))
- self._pathmap[src] = (ab, st.fn)
-
- self.keepdir = None
-
- def reversemap(self, target):
- if target.startswith("keep:"):
- return (target, target)
- elif self.keepdir and target.startswith(self.keepdir):
- return (target, "keep:" + target[len(self.keepdir)+1:])
- else:
- return super(ArvPathMapper, self).reversemap(target)
-
-
-class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
- def __init__(self, arvrunner, toolpath_object, **kwargs):
- super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
- self.arvrunner = arvrunner
-
- def makeJobRunner(self):
- return ArvadosJob(self.arvrunner)
-
- def makePathMapper(self, reffiles, input_basedir, **kwargs):
- return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
+class ArvCwlRunner(object):
+ """Execute a CWL tool or workflow, submit work (using either jobs or
+ containers API), wait for them to complete, and report output.
+ """
-class ArvCwlRunner(object):
- def __init__(self, api_client):
+ def __init__(self, api_client, work_api=None):
self.api = api_client
- self.jobs = {}
+ self.processes = {}
self.lock = threading.Lock()
self.cond = threading.Condition(self.lock)
self.final_output = None
+ self.final_status = None
self.uploaded = {}
self.num_retries = 4
+ self.uuid = None
+ self.work_api = work_api
+ self.stop_polling = threading.Event()
+ self.poll_api = None
+
+ if self.work_api is None:
+ # todo: autodetect API to use.
+ self.work_api = "jobs"
+
+ if self.work_api not in ("containers", "jobs"):
+ raise Exception("Unsupported API '%s'" % self.work_api)
def arvMakeTool(self, toolpath_object, **kwargs):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
+ kwargs["work_api"] = self.work_api
return ArvadosCommandTool(self, toolpath_object, **kwargs)
else:
return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
def output_callback(self, out, processStatus):
if processStatus == "success":
- logger.info("Overall job status is %s", processStatus)
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Complete"}).execute(num_retries=self.num_retries)
-
+ logger.info("Overall process status is %s", processStatus)
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Complete"}).execute(num_retries=self.num_retries)
else:
- logger.warn("Overall job status is %s", processStatus)
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ logger.warn("Overall process status is %s", processStatus)
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ self.final_status = processStatus
self.final_output = out
-
def on_message(self, event):
if "object_uuid" in event:
- if event["object_uuid"] in self.jobs and event["event_type"] == "update":
- if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
+ if event["object_uuid"] in self.processes and event["event_type"] == "update":
+ if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
uuid = event["object_uuid"]
with self.lock:
- j = self.jobs[uuid]
+ j = self.processes[uuid]
logger.info("Job %s (%s) is Running", j.name, uuid)
j.running = True
j.update_pipeline_component(event["properties"]["new_attributes"])
uuid = event["object_uuid"]
try:
self.cond.acquire()
- j = self.jobs[uuid]
+ j = self.processes[uuid]
logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
- j.done(event["properties"]["new_attributes"])
+ with Perf(logger, "done %s" % j.name):
+ j.done(event["properties"]["new_attributes"])
self.cond.notify()
finally:
self.cond.release()
+ def poll_states(self):
+ """Poll status of jobs or containers listed in the processes dict.
+
+ Runs in a separate thread.
+ """
+
+ while True:
+ self.stop_polling.wait(15)
+ if self.stop_polling.is_set():
+ break
+ with self.lock:
+ keys = self.processes.keys()
+ if not keys:
+ continue
+
+ if self.work_api == "containers":
+ table = self.poll_api.containers()
+ elif self.work_api == "jobs":
+ table = self.poll_api.jobs()
+
+ try:
+ proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+ except Exception as e:
+ logger.warn("Error checking states on API server: %s", e)
+ continue
+
+ for p in proc_states["items"]:
+ self.on_message({
+ "object_uuid": p["uuid"],
+ "event_type": "update",
+ "properties": {
+ "new_attributes": p
+ }
+ })
+
def get_uploaded(self):
return self.uploaded.copy()
def add_uploaded(self, src, pair):
self.uploaded[src] = pair
- def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
- events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
-
- try:
- self.api.collections().get(uuid=crunchrunner_pdh).execute()
- except arvados.errors.ApiError as e:
- import httplib2
- h = httplib2.Http(ca_certs=arvados.util.ca_certs_path())
- resp, content = h.request(crunchrunner_download, "GET")
- resp2, content2 = h.request(certs_download, "GET")
- with arvados.collection.Collection() as col:
- with col.open("crunchrunner", "w") as f:
- f.write(content)
- with col.open("ca-certificates.crt", "w") as f:
- f.write(content2)
+ def check_writable(self, obj):
+ if isinstance(obj, dict):
+ if obj.get("writable"):
+ raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
+ for v in obj.itervalues():
+ self.check_writable(v)
+ if isinstance(obj, list):
+ for v in obj:
+ self.check_writable(v)
- col.save_new("crunchrunner binary", ensure_unique_name=True)
+ def arvExecutor(self, tool, job_order, **kwargs):
+ self.debug = kwargs.get("debug")
- self.fs_access = CollectionFsAccess(input_basedir)
+ tool.visit(self.check_writable)
- kwargs["fs_access"] = self.fs_access
- if args:
- kwargs["enable_reuse"] = args.enable_reuse
+ if kwargs.get("quiet"):
+ logger.setLevel(logging.WARN)
+ logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
- kwargs["outdir"] = "$(task.outdir)"
- kwargs["tmpdir"] = "$(task.tmpdir)"
+ if self.debug:
+ logger.setLevel(logging.DEBUG)
useruuid = self.api.users().current().execute()["uuid"]
- self.project_uuid = args.project_uuid if args.project_uuid else useruuid
+ self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
+ self.pipeline = None
+ make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
+ self.fs_access = make_fs_access(kwargs["basedir"])
+
+ if kwargs.get("create_template"):
+ tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
+ tmpl.save()
+ # cwltool.main will write our return value to stdout.
+ return tmpl.uuid
+
+ if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
+ return make_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
+
+ self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
+
+ kwargs["make_fs_access"] = make_fs_access
+ kwargs["enable_reuse"] = kwargs.get("enable_reuse")
+ kwargs["use_container"] = True
+ kwargs["tmpdir_prefix"] = "tmp"
+ kwargs["on_error"] = "continue"
+ kwargs["compute_checksum"] = kwargs.get("compute_checksum")
+
+ if self.work_api == "containers":
+ kwargs["outdir"] = "/var/spool/cwl"
+ kwargs["docker_outdir"] = "/var/spool/cwl"
+ kwargs["tmpdir"] = "/tmp"
+ kwargs["docker_tmpdir"] = "/tmp"
+ elif self.work_api == "jobs":
+ kwargs["outdir"] = "$(task.outdir)"
+ kwargs["docker_outdir"] = "$(task.outdir)"
+ kwargs["tmpdir"] = "$(task.tmpdir)"
+
+ runnerjob = None
+ if kwargs.get("submit"):
+ if self.work_api == "containers":
+ if tool.tool["class"] == "CommandLineTool":
+ runnerjob = tool.job(job_order,
+ self.output_callback,
+ **kwargs).next()
+ else:
+ runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
+ else:
+ runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
- if kwargs.get("conformance_test"):
- return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
- else:
+ if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
+ # Create pipeline for local run
self.pipeline = self.api.pipeline_instances().create(
body={
"owner_uuid": self.project_uuid,
"name": shortname(tool.tool["id"]),
"components": {},
"state": "RunningOnClient"}).execute(num_retries=self.num_retries)
-
logger.info("Pipeline instance %s", self.pipeline["uuid"])
+ if runnerjob and not kwargs.get("wait"):
+ runnerjob.run()
+ return runnerjob.uuid
+
+ self.poll_api = arvados.api('v1')
+ self.polling_thread = threading.Thread(target=self.poll_states)
+ self.polling_thread.start()
+
+ if runnerjob:
+ jobiter = iter((runnerjob,))
+ else:
+ if "cwl_runner_job" in kwargs:
+ self.uuid = kwargs.get("cwl_runner_job").get('uuid')
jobiter = tool.job(job_order,
- input_basedir,
self.output_callback,
- docker_outdir="$(task.outdir)",
**kwargs)
- try:
- self.cond.acquire()
- # Will continue to hold the lock for the duration of this code
- # except when in cond.wait(), at which point on_message can update
- # job state and process output callbacks.
-
- for runnable in jobiter:
- if runnable:
+ try:
+ self.cond.acquire()
+ # Will continue to hold the lock for the duration of this code
+ # except when in cond.wait(), at which point on_message can update
+ # job state and process output callbacks.
+
+ for runnable in jobiter:
+ if runnable:
+ with Perf(logger, "run"):
runnable.run(**kwargs)
+ else:
+ if self.processes:
+ self.cond.wait(1)
else:
- if self.jobs:
- self.cond.wait(1)
- else:
- logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
- break
+ logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+ break
- while self.jobs:
- self.cond.wait(1)
+ while self.processes:
+ self.cond.wait(1)
- events.close()
-
- if self.final_output is None:
- raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
-
- # create final output collection
- except:
- if sys.exc_info()[0] is KeyboardInterrupt:
- logger.error("Interrupted, marking pipeline as failed")
- else:
- logger.exception("Caught unhandled exception, marking pipeline as failed")
+ except UnsupportedRequirement:
+ raise
+ except:
+ if sys.exc_info()[0] is KeyboardInterrupt:
+ logger.error("Interrupted, marking pipeline as failed")
+ else:
+ logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+ if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
- finally:
- self.cond.release()
+ if runnerjob and runnerjob.uuid and self.work_api == "containers":
+ self.api.container_requests().update(uuid=runnerjob.uuid,
+ body={"priority": "0"}).execute(num_retries=self.num_retries)
+ finally:
+ self.cond.release()
+ self.stop_polling.set()
+ self.polling_thread.join()
- return self.final_output
+ if self.final_status == "UnsupportedRequirement":
+ raise UnsupportedRequirement("Check log for details.")
+ if self.final_status != "success":
+ raise WorkflowException("Workflow failed.")
+
+ if self.final_output is None:
+ raise WorkflowException("Workflow did not return a result.")
+
+ if kwargs.get("compute_checksum"):
+ adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
+
+ return self.final_output
+
+
+def versionstring():
+ """Print version string of key packages for provenance and debugging."""
+
+ arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
+ arvpkg = pkg_resources.require("arvados-python-client")
+ cwlpkg = pkg_resources.require("cwltool")
+
+ return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
+ "arvados-python-client", arvpkg[0].version,
+ "cwltool", cwlpkg[0].version)
+
+
+def arg_parser(): # type: () -> argparse.ArgumentParser
+ parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
+
+ parser.add_argument("--basedir", type=str,
+ help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
+ parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
+ help="Output directory, default current directory")
+
+ parser.add_argument("--eval-timeout",
+ help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
+ type=float,
+ default=20)
+ parser.add_argument("--version", action="store_true", help="Print version and exit")
+
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--verbose", action="store_true", help="Default logging")
+ exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
+ exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
+
+ parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
-def main(args, stdout, stderr, api_client=None):
- args.insert(0, "--leave-outputs")
- parser = cwltool.main.arg_parser()
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--enable-reuse", action="store_true",
default=True, dest="enable_reuse",
exgroup.add_argument("--disable-reuse", action="store_false",
default=True, dest="enable_reuse",
help="")
- parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
+
+ parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
+ parser.add_argument("--ignore-docker-for-reuse", action="store_true",
+ help="Ignore Docker image version when deciding whether to reuse past jobs.",
+ default=False)
+
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
+ default=True, dest="submit")
+ exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
+ default=True, dest="submit")
+ exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
+ exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
+ exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")
+
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
+ default=True, dest="wait")
+ exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
+ default=True, dest="wait")
+
+ parser.add_argument("--api", type=str,
+ default=None, dest="work_api",
+ help="Select work submission API, one of 'jobs' or 'containers'.")
+
+ parser.add_argument("--compute-checksum", action="store_true", default=False,
+ help="Compute checksum of contents while collecting outputs",
+ dest="compute_checksum")
+
+ parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
+ parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
+
+ return parser
+
+
+def main(args, stdout, stderr, api_client=None):
+ parser = arg_parser()
+
+ job_order_object = None
+ arvargs = parser.parse_args(args)
+ if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
+ job_order_object = ({}, "")
try:
- runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
+ if api_client is None:
+ api_client=arvados.api('v1', model=OrderedJsonModel())
+ runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
except Exception as e:
logger.error(e)
return 1
- return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)
+ arvargs.conformance_test = None
+ arvargs.use_container = True
+
+ return cwltool.main.main(args=arvargs,
+ stdout=stdout,
+ stderr=stderr,
+ executor=runner.arvExecutor,
+ makeTool=runner.arvMakeTool,
+ versionfunc=versionstring,
+ job_order_object=job_order_object,
+ make_fs_access=partial(CollectionFsAccess, api_client=api_client))