# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
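#
# A typical command-line invocation (file names here are placeholders):
#
#   arvados-cwl-runner --api=containers workflow.cwl inputs.yml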

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re

from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.collection
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """
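
    # A background polling thread (poll_states) feeds job/container state
    # changes into on_message(), which coordinates with the submission loop
    # in arv_executor() through self.lock and self.cond.
    #
    # Construction sketch (hypothetical client code, assuming
    # ARVADOS_API_HOST and ARVADOS_API_TOKEN are configured):
    #
    #   api = arvados.api('v1', model=OrderedJsonModel())
    #   runner = ArvCwlRunner(api, work_api="containers")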
    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
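
    # Invoked by cwltool when the overall process finishes: record the
    # final status and output, and mark the pipeline instance record (if
    # any) Complete or Failed.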
    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
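
    # Handle state-change events for submitted jobs/containers (delivered
    # by the polling thread) and notify the waiting submission loop.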
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair
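
    # Intermediate output collections are tracked so they can be trashed
    # once the workflow succeeds (see --trash-intermediate).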
    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                break
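
    # Walk the tool document and reject features that the selected work
    # API does not support (several are containers-only).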
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)
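
    # Build the final output collection in three passes: capture all File
    # and Directory objects from the CWL output object, copy (or create)
    # each one in a new Keep collection, then rewrite the "location"
    # fields to point into that collection.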
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
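
    # When this process is itself running as a Crunch container or job
    # task, record the final output collection on that container or task
    # so it becomes the output of the overall run.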
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)
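
    # Drive a complete run: upload dependencies, optionally register or
    # submit the workflow, then iterate over runnable jobs while the
    # polling thread reports state changes.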
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"],
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"],
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.eval_timeout = kwargs.get("eval_timeout")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise validate.ValidationException("--ignore-docker-for-reuse not supported with containers API.")
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"
        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
                                                merged_map=merged_map)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"),
                                      merged_map=merged_map)
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)
557 """Print version string of key packages for provenance and debugging."""
559 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
560 arvpkg = pkg_resources.require("arvados-python-client")
561 cwlpkg = pkg_resources.require("cwltool")
563 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
564 "arvados-python-client", arvpkg[0].version,
565 "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, defaults to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs will go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=None)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
                        default=0)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
def add_arv_hints():
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])
def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)