#!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
import re
from functools import partial
import pkg_resources # part of setuptools
+import Queue
+import time
+import signal
+import thread
from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
-import schema_salad
from schema_salad.sourceline import SourceLine
+import schema_salad.validate as validate
import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError
+import arvados.commands._util as arv_cmd
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
+from .task_queue import TaskQueue
from ._version import __version__
from cwltool.pack import pack
-from cwltool.process import shortname, UnsupportedRequirement, getListing, use_custom_schema
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
-from cwltool.draft2tool import compute_checksums
+from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
+from cwltool.command_line_tool import compute_checksums
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
'%(asctime)s %(name)s %(levelname)s: %(message)s',
'%Y-%m-%d %H:%M:%S'))
+DEFAULT_PRIORITY = 500
+
class ArvCwlRunner(object):
"""Execute a CWL tool or workflow, submit work (using either jobs or
containers API), wait for them to complete, and report output.
"""
- def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
+ def __init__(self, api_client, work_api=None, keep_client=None,
+ output_name=None, output_tags=None, num_retries=4,
+ thread_count=4):
self.api = api_client
self.processes = {}
- self.lock = threading.Lock()
- self.cond = threading.Condition(self.lock)
+ self.workflow_eval_lock = threading.Condition(threading.RLock())
self.final_output = None
self.final_status = None
self.uploaded = {}
self.output_name = output_name
self.output_tags = output_tags
self.project_uuid = None
+ self.intermediate_output_ttl = 0
+ self.intermediate_output_collections = []
+ self.trash_intermediate = False
+ self.thread_count = thread_count
+ self.poll_interval = 12
if keep_client is not None:
self.keep_client = keep_client
else:
self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
+ self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+
+ self.fetcher_constructor = partial(CollectionFetcher,
+ api_client=self.api,
+ fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
+ num_retries=self.num_retries)
+
self.work_api = None
expected_api = ["jobs", "containers"]
for api in expected_api:
def arv_make_tool(self, toolpath_object, **kwargs):
+        # Tool factory handed to cwltool: injects Arvados-specific kwargs
+        # (work_api, the shared collection fetcher, and a keep-reference
+        # resolver), then dispatches on the document "class" to the
+        # Arvados tool implementations.
        kwargs["work_api"] = self.work_api
-        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
-                                                api_client=self.api,
-                                                keep_client=self.keep_client)
+        kwargs["fetcher_constructor"] = self.fetcher_constructor
+        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
def output_callback(self, out, processStatus):
+        # Final-output callback: records the overall workflow status and
+        # output object and, when a pipeline instance exists (jobs API),
+        # marks it Complete or Failed.  Runs under workflow_eval_lock and
+        # notifies so threads blocked in workflow_eval_lock.wait() observe
+        # the final state.
-        if processStatus == "success":
-            logger.info("Overall process status is %s", processStatus)
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
-        else:
-            logger.warn("Overall process status is %s", processStatus)
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
-        self.final_status = processStatus
-        self.final_output = out
+        with self.workflow_eval_lock:
+            if processStatus == "success":
+                logger.info("Overall process status is %s", processStatus)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Complete"}).execute(num_retries=self.num_retries)
+            else:
+                logger.warn("Overall process status is %s", processStatus)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            self.final_status = processStatus
+            self.final_output = out
+            self.workflow_eval_lock.notifyAll()
+
+
+    def start_run(self, runnable, kwargs):
+        # Dispatch a job/container run() onto the background TaskQueue
+        # rather than running it inline on the workflow-eval thread.
+        self.task_queue.add(partial(runnable.run, **kwargs))
+
+    def process_submitted(self, container):
+        # Start tracking a newly submitted process so the poll thread /
+        # on_message can follow its state transitions by uuid.
+        with self.workflow_eval_lock:
+            self.processes[container.uuid] = container
+
+    def process_done(self, uuid, record):
+        # A tracked process reached a terminal state: log it, queue its
+        # done() handling on the TaskQueue, and stop tracking it.
+        with self.workflow_eval_lock:
+            j = self.processes[uuid]
+            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
+            self.task_queue.add(partial(j.done, record))
+            del self.processes[uuid]
+
+    def wrapped_callback(self, cb, obj, st):
+        # Invoke callback cb(obj, st) while holding workflow_eval_lock,
+        # then wake any thread blocked in workflow_eval_lock.wait().
+        with self.workflow_eval_lock:
+            cb(obj, st)
+            self.workflow_eval_lock.notifyAll()
+
+    def get_wrapped_callback(self, cb):
+        # Return a version of cb that always runs under workflow_eval_lock
+        # and notifies waiters afterwards.
+        return partial(self.wrapped_callback, cb)
def on_message(self, event):
+        # Handle a state-change event for a tracked process: on the first
+        # "Running" transition update the pipeline component, and hand
+        # terminal states ("Complete"/"Failed"/"Cancelled"/"Final") to
+        # process_done().  Events for unknown uuids are ignored.
-        if "object_uuid" in event:
-            if event["object_uuid"] in self.processes and event["event_type"] == "update":
-                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
-                    uuid = event["object_uuid"]
-                    with self.lock:
-                        j = self.processes[uuid]
-                        logger.info("%s %s is Running", self.label(j), uuid)
+        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
+            uuid = event["object_uuid"]
+            if event["properties"]["new_attributes"]["state"] == "Running":
+                with self.workflow_eval_lock:
+                    j = self.processes[uuid]
+                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
-        elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
-            uuid = event["object_uuid"]
-            try:
-                self.cond.acquire()
-                j = self.processes[uuid]
-                logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
-                with Perf(metrics, "done %s" % j.name):
-                    j.done(event["properties"]["new_attributes"])
-                self.cond.notify()
-            finally:
-                self.cond.release()
+                        logger.info("%s %s is Running", self.label(j), uuid)
+            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
+                self.process_done(uuid, event["properties"]["new_attributes"])
def label(self, obj):
+        # Log prefix like "[container name]" / "[job name]": work_api with
+        # its trailing "s" stripped, plus the process name.
        return "[%s %s]" % (self.work_api[0:-1], obj.name)
"""
try:
+ remain_wait = self.poll_interval
while True:
- self.stop_polling.wait(15)
+ if remain_wait > 0:
+ self.stop_polling.wait(remain_wait)
if self.stop_polling.is_set():
break
- with self.lock:
- keys = self.processes.keys()
+ with self.workflow_eval_lock:
+ keys = list(self.processes.keys())
if not keys:
+ remain_wait = self.poll_interval
continue
+ begin_poll = time.time()
if self.work_api == "containers":
table = self.poll_api.container_requests()
elif self.work_api == "jobs":
proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
except Exception as e:
logger.warn("Error checking states on API server: %s", e)
+ remain_wait = self.poll_interval
continue
for p in proc_states["items"]:
"new_attributes": p
}
})
+ finish_poll = time.time()
+ remain_wait = self.poll_interval - (finish_poll - begin_poll)
except:
- logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
- self.cond.acquire()
- self.processes.clear()
- self.cond.notify()
- self.cond.release()
+ logger.exception("Fatal error in state polling thread.")
+ with self.workflow_eval_lock:
+ self.processes.clear()
+ self.workflow_eval_lock.notifyAll()
finally:
self.stop_polling.set()
def add_uploaded(self, src, pair):
+        # Cache the upload result for source path src (pair is presumably
+        # the keep-reference mapping — confirm against callers).
        self.uploaded[src] = pair
+    def add_intermediate_output(self, uuid):
+        # Remember an intermediate output collection uuid (falsy values are
+        # ignored) so it can be trashed later by trash_intermediate_output().
+        if uuid:
+            self.intermediate_output_collections.append(uuid)
+
+    def trash_intermediate_output(self):
+        # Best-effort deletion of all recorded intermediate output
+        # collections.  Individual failures are logged and skipped; only
+        # KeyboardInterrupt/SystemExit aborts the loop early.
+        logger.info("Cleaning up intermediate output collections")
+        for i in self.intermediate_output_collections:
+            try:
+                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
+            except:
+                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+                if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+                    break
+
def check_features(self, obj):
+        # Recursively walk a tool/workflow document and raise
+        # UnsupportedRequirement (or a ValidationException for a relative
+        # dockerOutputDirectory) for features the selected work_api cannot
+        # support, with source-line context attached via SourceLine.
        if isinstance(obj, dict):
-            if obj.get("writable"):
-                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
-            if obj.get("class") == "CommandLineTool":
-                if self.work_api == "containers":
-                    if obj.get("stdin"):
-                        raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
-                    if obj.get("stderr"):
-                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
+            if obj.get("writable") and self.work_api != "containers":
+                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
-                    # TODO: can be supported by containers API, but not jobs API.
-                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
-                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
+                    if self.work_api != "containers":
+                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
+                    if not obj.get("dockerOutputDirectory").startswith('/'):
+                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
+                            "Option 'dockerOutputDirectory' must be an absolute path.")
+            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
+                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
-                with SourceLine(obj, i, UnsupportedRequirement):
+                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)
def make_output_collection(self, name, tagsString, outputObj):
keep_client=self.keep_client,
num_retries=self.num_retries)
- srccollections = {}
for k,v in generatemapper.items():
if k.startswith("_:"):
if v.type == "Directory":
raise Exception("Output source is not in keep or a literal")
sp = k.split("/")
srccollection = sp[0][5:]
- if srccollection not in srccollections:
- try:
- srccollections[srccollection] = arvados.collection.CollectionReader(
- srccollection,
- api_client=self.api,
- keep_client=self.keep_client,
- num_retries=self.num_retries)
- except arvados.errors.ArgumentError as e:
- logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
- raise
- reader = srccollections[srccollection]
try:
+ reader = self.collection_cache.get(srccollection)
srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
+ except arvados.errors.ArgumentError as e:
+ logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+ raise
except IOError as e:
logger.warn("While preparing output collection: %s", e)
def rewrite(fileobj):
fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
- for k in ("basename", "listing", "contents"):
+ for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
if k in fileobj:
del fileobj[k]
self.project_uuid = kwargs.get("project_uuid")
self.pipeline = None
make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
- api_client=self.api,
- keep_client=self.keep_client)
+ collection_cache=self.collection_cache)
self.fs_access = make_fs_access(kwargs["basedir"])
+ self.secret_store = kwargs.get("secret_store")
+
+ self.trash_intermediate = kwargs["trash_intermediate"]
+ if self.trash_intermediate and self.work_api != "containers":
+ raise Exception("--trash-intermediate is only supported with --api=containers.")
+
+ self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
+ if self.intermediate_output_ttl and self.work_api != "containers":
+ raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
+ if self.intermediate_output_ttl < 0:
+ raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
if not kwargs.get("name"):
kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
- upload_workflow_deps(self, tool)
+ merged_map = upload_workflow_deps(self, tool)
# Reload tool object which may have been updated by
# upload_workflow_deps
+ # Don't validate this time because it will just print redundant errors.
tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
makeTool=self.arv_make_tool,
loader=tool.doc_loader,
avsc_names=tool.doc_schema,
- metadata=tool.metadata)
+ metadata=tool.metadata,
+ do_validate=False)
# Upload local file references in the job order.
job_order = upload_job_order(self, "%s input" % kwargs["name"],
kwargs.get("enable_reuse"),
uuid=existing_uuid,
submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs["name"])
+ name=kwargs["name"],
+ merged_map=merged_map)
tmpl.save()
# cwltool.main will write our return value to stdout.
return (tmpl.uuid, "success")
self.project_uuid,
uuid=existing_uuid,
submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs["name"]),
+ name=kwargs["name"],
+ merged_map=merged_map),
"success")
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
+ self.eval_timeout = kwargs.get("eval_timeout")
kwargs["make_fs_access"] = make_fs_access
kwargs["enable_reuse"] = kwargs.get("enable_reuse")
kwargs["compute_checksum"] = kwargs.get("compute_checksum")
if self.work_api == "containers":
+ if self.ignore_docker_for_reuse:
+ raise Exception("--ignore-docker-for-reuse not supported with containers API.")
kwargs["outdir"] = "/var/spool/cwl"
kwargs["docker_outdir"] = "/var/spool/cwl"
kwargs["tmpdir"] = "/tmp"
kwargs["docker_tmpdir"] = "/tmp"
elif self.work_api == "jobs":
+ if kwargs["priority"] != DEFAULT_PRIORITY:
+ raise Exception("--priority not implemented for jobs API.")
kwargs["outdir"] = "$(task.outdir)"
kwargs["docker_outdir"] = "$(task.outdir)"
kwargs["tmpdir"] = "$(task.tmpdir)"
+ if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
+ raise Exception("--priority must be in the range 1..1000.")
+
runnerjob = None
if kwargs.get("submit"):
# Submit a runner job to run the workflow for us.
if self.work_api == "containers":
- if tool.tool["class"] == "CommandLineTool":
+ if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
kwargs["runnerjob"] = tool.tool["id"]
- upload_dependencies(self,
- kwargs["name"],
- tool.doc_loader,
- tool.tool,
- tool.tool["id"],
- False)
runnerjob = tool.job(job_order,
self.output_callback,
**kwargs).next()
submit_runner_ram=kwargs.get("submit_runner_ram"),
name=kwargs.get("name"),
on_error=kwargs.get("on_error"),
- submit_runner_image=kwargs.get("submit_runner_image"))
+ submit_runner_image=kwargs.get("submit_runner_image"),
+ intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
+ merged_map=merged_map,
+ priority=kwargs.get("priority"),
+ secret_store=self.secret_store)
elif self.work_api == "jobs":
runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
self.output_name,
submit_runner_ram=kwargs.get("submit_runner_ram"),
name=kwargs.get("name"),
on_error=kwargs.get("on_error"),
- submit_runner_image=kwargs.get("submit_runner_image"))
-
- if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
+ submit_runner_image=kwargs.get("submit_runner_image"),
+ merged_map=merged_map)
+ elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
# Create pipeline for local run
self.pipeline = self.api.pipeline_instances().create(
body={
logger.info("Pipeline instance %s", self.pipeline["uuid"])
if runnerjob and not kwargs.get("wait"):
- runnerjob.run(wait=kwargs.get("wait"))
+ submitargs = kwargs.copy()
+ submitargs['submit'] = False
+ runnerjob.run(**submitargs)
return (runnerjob.uuid, "success")
self.poll_api = arvados.api('v1')
self.polling_thread = threading.Thread(target=self.poll_states)
self.polling_thread.start()
+ self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
+
if runnerjob:
jobiter = iter((runnerjob,))
else:
**kwargs)
try:
- self.cond.acquire()
- # Will continue to hold the lock for the duration of this code
- # except when in cond.wait(), at which point on_message can update
- # job state and process output callbacks.
+ self.workflow_eval_lock.acquire()
+ # Holds the lock while this code runs and releases it when
+ # it is safe to do so in self.workflow_eval_lock.wait(),
+ # at which point on_message can update job state and
+ # process output callbacks.
loopperf = Perf(metrics, "jobiter")
loopperf.__enter__()
if self.stop_polling.is_set():
break
+ if self.task_queue.error is not None:
+ raise self.task_queue.error
+
if runnable:
with Perf(metrics, "run"):
- runnable.run(**kwargs)
+ self.start_run(runnable, kwargs)
else:
- if self.processes:
- self.cond.wait(1)
+ if (self.task_queue.in_flight + len(self.processes)) > 0:
+ self.workflow_eval_lock.wait(3)
else:
- logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+ logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
break
loopperf.__enter__()
loopperf.__exit__()
- while self.processes:
- self.cond.wait(1)
+ while (self.task_queue.in_flight + len(self.processes)) > 0:
+ if self.task_queue.error is not None:
+ raise self.task_queue.error
+ self.workflow_eval_lock.wait(3)
except UnsupportedRequirement:
raise
except:
- if sys.exc_info()[0] is KeyboardInterrupt:
- logger.error("Interrupted, marking pipeline as failed")
+ if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+ logger.error("Interrupted, workflow will be cancelled")
else:
logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
if self.pipeline:
self.api.container_requests().update(uuid=runnerjob.uuid,
body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
- self.cond.release()
+ self.workflow_eval_lock.release()
+ self.task_queue.drain()
self.stop_polling.set()
self.polling_thread.join()
+ self.task_queue.join()
if self.final_status == "UnsupportedRequirement":
raise UnsupportedRequirement("Check log for details.")
self.set_crunch_output()
if kwargs.get("compute_checksum"):
- adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
+ adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
+ if self.trash_intermediate and self.final_status == "success":
+ self.trash_intermediate_output()
+
return (self.final_output, self.final_status)
arvpkg = pkg_resources.require("arvados-python-client")
cwlpkg = pkg_resources.require("cwltool")
- return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
+ return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
"arvados-python-client", arvpkg[0].version,
"cwltool", cwlpkg[0].version)
help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
type=float,
default=20)
- parser.add_argument("--version", action="store_true", help="Print version and exit")
+
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--print-dot", action="store_true",
+ help="Print workflow visualization in graphviz format and exit")
+ exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
+ exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--verbose", action="store_true", help="Default logging")
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--enable-reuse", action="store_true",
default=True, dest="enable_reuse",
- help="")
+ help="Enable job or container reuse (default)")
exgroup.add_argument("--disable-reuse", action="store_false",
default=True, dest="enable_reuse",
- help="")
+ help="Disable job or container reuse")
parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
parser.add_argument("--api", type=str,
default=None, dest="work_api",
- help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
+ choices=("jobs", "containers"),
+ help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")
parser.add_argument("--compute-checksum", action="store_true", default=False,
help="Compute checksum of contents while collecting outputs",
help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
"Default is 'continue'.", default="continue", choices=("stop", "continue"))
- parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
+ parser.add_argument("--enable-dev", action="store_true",
+ help="Enable loading and running development versions "
+ "of CWL spec.", default=False)
+
+ parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
+ help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
+ default=0)
+
+ parser.add_argument("--priority", type=int,
+ help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
+ default=DEFAULT_PRIORITY)
+
+ parser.add_argument("--disable-validate", dest="do_validate",
+ action="store_false", default=True,
+ help=argparse.SUPPRESS)
+
+ parser.add_argument("--disable-js-validation",
+ action="store_true", default=False,
+ help=argparse.SUPPRESS)
+
+ parser.add_argument("--thread-count", type=int,
+ default=4, help="Number of threads to use for job submit and output collection.")
+
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--trash-intermediate", action="store_true",
+ default=False, dest="trash_intermediate",
+ help="Immediately trash intermediate outputs on workflow success.")
+ exgroup.add_argument("--no-trash-intermediate", action="store_false",
+ default=False, dest="trash_intermediate",
+ help="Do not trash intermediate outputs (default).")
+
+ parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
return parser
def add_arv_hints():
+    # Register the Arvados CWL extensions with cwltool: relax the Docker
+    # image-name accept list to match anything, install the bundled
+    # arv-cwl-schema extension schema for v1.0 documents, and whitelist the
+    # Arvados/cwltool requirement classes so validation accepts them.
-    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
+    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
+    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
-
-def main(args, stdout, stderr, api_client=None, keep_client=None):
+    cwltool.process.supportedProcessRequirements.extend([
+        "http://arvados.org/cwl#RunInSingleContainer",
+        "http://arvados.org/cwl#OutputDirType",
+        "http://arvados.org/cwl#RuntimeConstraints",
+        "http://arvados.org/cwl#PartitionRequirement",
+        "http://arvados.org/cwl#APIRequirement",
+        "http://commonwl.org/cwltool#LoadListingRequirement",
+        "http://arvados.org/cwl#IntermediateOutput",
+        "http://arvados.org/cwl#ReuseRequirement"
+    ])
+
+def exit_signal_handler(sigcode, frame):
+    # Signal handler: log the caught signal and exit with a negative exit
+    # code so signal-driven termination is distinguishable to the caller.
+    logger.error("Caught signal {}, exiting.".format(sigcode))
+    sys.exit(-sigcode)
+
+def main(args, stdout, stderr, api_client=None, keep_client=None,
+ install_sig_handlers=True):
parser = arg_parser()
job_order_object = None
arvargs = parser.parse_args(args)
- if arvargs.version:
- print versionstring()
- return
+ if install_sig_handlers:
+ arv_cmd.install_signal_handlers()
if arvargs.update_workflow:
if arvargs.update_workflow.find('-7fd4e-') == 5:
try:
if api_client is None:
- api_client=arvados.api('v1', model=OrderedJsonModel())
+ api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
+ keep_client = api_client.keep
if keep_client is None:
keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
num_retries=4, output_name=arvargs.output_name,
- output_tags=arvargs.output_tags)
+ output_tags=arvargs.output_tags,
+ thread_count=arvargs.thread_count)
except Exception as e:
logger.error(e)
return 1
arvargs.conformance_test = None
arvargs.use_container = True
arvargs.relax_path_checks = True
- arvargs.validate = None
+ arvargs.print_supported_versions = False
+
+ make_fs_access = partial(CollectionFsAccess,
+ collection_cache=runner.collection_cache)
return cwltool.main.main(args=arvargs,
stdout=stdout,
makeTool=runner.arv_make_tool,
versionfunc=versionstring,
job_order_object=job_order_object,
- make_fs_access=partial(CollectionFsAccess,
- api_client=api_client,
- keep_client=keep_client),
+ make_fs_access=make_fs_access,
fetcher_constructor=partial(CollectionFetcher,
api_client=api_client,
- keep_client=keep_client,
+ fs_access=make_fs_access(""),
num_retries=runner.num_retries),
resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
- logger_handler=arvados.log_handler)
+ logger_handler=arvados.log_handler,
+ custom_schema_callback=add_arv_hints)