# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
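
# A typical invocation looks like the following (the project UUID and file
# names are hypothetical, shown for illustration only; the flags are defined
# in arg_parser() below):
#
#   arvados-cwl-runner --api=containers --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx \
#       workflow.cwl job.yml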

import argparse
import copy
import json
import logging
import os
import re
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.collection
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))
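
# Default value for the --priority option (range 1..1000, containers API
# only); it is passed through to the workflow runner container request.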
DEFAULT_PRIORITY = 500

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
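
        # Pick the work API: honor an explicit work_api argument, otherwise
        # use the first of "jobs"/"containers" advertised by the API server's
        # discovery document.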
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
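
    # Handle a state-change event for a tracked job or container, either
    # delivered by the websocket event stream or synthesized by poll_states()
    # below.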
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                break
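
    # Recursively validate that the tool document only uses features supported
    # by the selected work API; "writable" InitialWorkDir entries,
    # dockerOutputDirectory, and cwltool Secrets require --api=containers.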
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)
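
    # Assemble the workflow's output files into a single new collection:
    # literal outputs are written directly, keep-backed outputs are copied in,
    # then the CWL output object is rewritten to point at the new collection.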
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
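
    # When this runner is itself executing inside a crunch container or job,
    # record the final output on our own container or job-task record.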
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)
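
    # Main entry point invoked by cwltool.main: upload dependencies, then
    # either create a workflow/template record, submit a runner job or
    # container, or run the workflow locally, and wait for a final status.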
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])
        self.secret_store = kwargs.get("secret_store")

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"],
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"],
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.eval_timeout = kwargs.get("eval_timeout")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            if kwargs["priority"] != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
                                                merged_map=merged_map,
                                                priority=kwargs.get("priority"),
                                                secret_store=self.secret_store)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"),
                                      merged_map=merged_map)
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")
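
        # From here on we monitor the run ourselves: poll job/container states
        # in a background thread and drive the cwltool job iterator under
        # self.cond, which on_message() uses to signal state changes.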
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)
569 """Print version string of key packages for provenance and debugging."""
571 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
572 arvpkg = pkg_resources.require("arvados-python-client")
573 cwlpkg = pkg_resources.require("cwltool")
575 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
576 "arvados-python-client", arvpkg[0].version,
577 "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
619 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
620 help="Ignore Docker image version when deciding whether to reuse past jobs.",

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")
654 parser.add_argument("--submit-runner-ram", type=int,
655 help="RAM (in MiB) required for the workflow runner job (default 1024)",
658 parser.add_argument("--submit-runner-image", type=str,
659 help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
662 parser.add_argument("--name", type=str,
663 help="Name to use for workflow execution instance.",
666 parser.add_argument("--on-error", type=str,
667 help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
668 "Default is 'continue'.", default="continue", choices=("stop", "continue"))
670 parser.add_argument("--enable-dev", action="store_true",
671 help="Enable loading and running development versions "
672 "of CWL spec.", default=False)
674 parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
675 help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
678 parser.add_argument("--priority", type=int,
679 help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
680 default=DEFAULT_PRIORITY)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")
690 parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
691 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
696 cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
697 cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
698 res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
699 use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
701 cwltool.process.supportedProcessRequirements.extend([
702 "http://arvados.org/cwl#RunInSingleContainer",
703 "http://arvados.org/cwl#OutputDirType",
704 "http://arvados.org/cwl#RuntimeConstraints",
705 "http://arvados.org/cwl#PartitionRequirement",
706 "http://arvados.org/cwl#APIRequirement",
707 "http://commonwl.org/cwltool#LoadListingRequirement",
708 "http://arvados.org/cwl#IntermediateOutput",
709 "http://arvados.org/cwl#ReuseRequirement"
def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
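
    # Infer the required API from the --update-workflow UUID: the
    # five-character infix after the site prefix identifies the record type
    # ("7fd4e" = workflow, containers API; "p5p6p" = pipeline template, jobs
    # API).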
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)