# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
import argparse
import copy
import json
import logging
import os
import re
import sys
import threading

from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.collection
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

DEFAULT_PRIORITY = 500

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """
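
    # Note on concurrency (a summary based on how the code below uses these
    # primitives): the main loop in arv_executor() holds self.cond while it
    # iterates over runnable jobs, releasing it only inside cond.wait(), at
    # which point on_message() (driven by the polling thread) can take the same
    # condition to update job state and fire output callbacks.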
    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
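
    # on_message() below handles state-change events for submitted jobs or
    # containers. In this file those events are synthesized by the
    # poll_states() thread, which feeds each polled record through this
    # callback to update the corresponding process object.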
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                keys = self.processes.keys()

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {"new_attributes": p}
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            with self.cond:
                self.processes.clear()
                self.cond.notify()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                break

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)
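
    # make_output_collection() below walks the CWL output object, copies each
    # Keep-backed file or directory into a new output collection (literal
    # "CreateFile" entries are written directly), saves cwl.output.json into
    # it, and rewrites the File/Directory locations to point at that collection.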
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
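
    # set_crunch_output() below applies when arvados-cwl-runner is itself
    # running inside a Crunch container (or, with the jobs API, inside a job
    # task identified by $TASK_UUID): it records the final output collection
    # on that container or task so the parent request reports the workflow
    # output.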
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)
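
    # arv_executor() below is the entry point cwltool calls to run a workflow:
    # it checks feature support, uploads dependencies and the job order to
    # Keep, optionally just registers a workflow or pipeline template and
    # exits, and otherwise either submits a runner job/container or iterates
    # the workflow locally, polling submitted work until it finishes and
    # packaging the final output.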
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"],
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"],
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.eval_timeout = kwargs.get("eval_timeout")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            if kwargs["priority"] != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
                                                merged_map=merged_map,
                                                priority=kwargs.get("priority"),
                                                secret_store=kwargs.get("secret_store"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"),
                                      merged_map=merged_map)
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)
        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)
567 """Print version string of key packages for provenance and debugging."""
569 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
570 arvpkg = pkg_resources.require("arvados-python-client")
571 cwlpkg = pkg_resources.require("cwltool")
573 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
574 "arvados-python-client", arvpkg[0].version,
575 "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
                        default=DEFAULT_PRIORITY)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser


def add_arv_hints():
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])
def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client=arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)