# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
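
# Typical command-line usage (a sketch; the arvados-cwl-runner entry point
# calls main() at the bottom of this file, and the file names here are
# hypothetical):
#
#     arvados-cwl-runner --api=containers my-workflow.cwl my-inputs.yml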

import argparse
import copy
import json
import logging
import os
import re
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

DEFAULT_PRIORITY = 500
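# DEFAULT_PRIORITY sits mid-range of the 1..1000 priority scale enforced in
# arv_executor() below; --priority overrides it per run (containers API only).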


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        # Select the work API: use the one requested, or else the first one
        # the API server's discovery document says it supports.
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        # Strip the trailing "s" from "jobs"/"containers" for log messages.
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()
                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue
                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            with self.cond:
                self.processes.clear()
                self.cond.notify()
        finally:
            self.stop_polling.set()
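
    # poll_states() synthesizes dicts shaped like Arvados event messages
    # ("object_uuid" / "event_type" / "properties") so that on_message() can
    # consume polled state exactly as it would consume pushed events.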

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                break

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)
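
    # For illustration, a hypothetical tool requirement that check_features
    # rejects under --api=jobs but accepts under --api=containers:
    #
    #     requirements:
    #       DockerRequirement:
    #         dockerPull: ubuntu:16.04
    #         dockerOutputDirectory: /var/spool/cwl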

    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',') if tagsString else []
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
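
    # After the rewrite/finalcollection passes above, every File/Directory
    # "location" in outputObj points into the new output collection, e.g.
    # (hypothetical hash):
    #
    #     "keep:a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4+99/out.txt"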

    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  # Assumed: the runner's own copy is trashed because
                                                  # the container's output record supersedes it.
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])
        self.secret_store = kwargs.get("secret_store")

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps.
        # Don't validate this time because it will just print redundant errors.
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata,
                                  do_validate=False)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"],
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"],
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.eval_timeout = kwargs.get("eval_timeout")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            if kwargs["priority"] != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"
            kwargs["docker_tmpdir"] = "$(task.tmpdir)"

        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
                                                merged_map=merged_map,
                                                priority=kwargs.get("priority"),
                                                secret_store=self.secret_store)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"),
                                      merged_map=merged_map)
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)
571 """Print version string of key packages for provenance and debugging."""
573 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
574 arvpkg = pkg_resources.require("arvados-python-client")
575 cwlpkg = pkg_resources.require("cwltool")
577 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
578 "arvados-python-client", arvpkg[0].version,
579 "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped/provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                             "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
                        default=DEFAULT_PRIORITY)

    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser


def add_arv_hints():
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])


def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)
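
# A minimal programmatic invocation sketch (assumes ARVADOS_API_HOST and
# ARVADOS_API_TOKEN are set in the environment; file names are hypothetical):
#
#     import sys
#     from arvados_cwl import main
#     rc = main(["--api=containers", "workflow.cwl", "inputs.yml"],
#               sys.stdout, sys.stderr)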