2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: Apache-2.0
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
18 from functools import partial
19 import pkg_resources # part of setuptools
21 from cwltool.errors import WorkflowException
23 import cwltool.workflow
24 import cwltool.process
26 from schema_salad.sourceline import SourceLine
30 from arvados.keep import KeepClient
31 from arvados.errors import ApiError
33 from .arvcontainer import ArvadosContainer, RunnerContainer
34 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
35 from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
36 from .arvtool import ArvadosCommandTool
37 from .arvworkflow import ArvadosWorkflow, upload_workflow
38 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
39 from .perf import Perf
40 from .pathmapper import NoFollowPathMapper
41 from ._version import __version__
43 from cwltool.pack import pack
44 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
45 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
46 from cwltool.draft2tool import compute_checksums
47 from arvados.api import OrderedJsonModel
# Module-level loggers: the main runner logger plus a separate channel for
# timing metrics (used via Perf below).  Default verbosity is INFO; main()
# may raise/lower it based on --debug/--quiet.
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

# Default log line format (timestamped variant).
# NOTE(review): the remainder of this setFormatter call is elided from this excerpt.
arvados.log_handler.setFormatter(logging.Formatter(
    '%(asctime)s %(name)s %(levelname)s: %(message)s',
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    NOTE(review): this class is reproduced from an abridged excerpt --
    several original source lines are elided; the gaps are flagged with
    NOTE(review) comments below and should be restored from the full file.
    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        # Condition variable guarding shared run state (final output/status,
        # process table) between the polling thread and the executor loop.
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        # Set to tell the polling thread to shut down (or that it has died).
        self.stop_polling = threading.Event()
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0          # seconds; 0 means "never trash"
        self.intermediate_output_collections = []
        self.trash_intermediate = False

        if keep_client is not None:
            self.keep_client = keep_client
        # NOTE(review): an "else:" header appears to be elided here; this
        # fallback presumably runs only when no keep_client was supplied.
        self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        # Probe which work submission API ("jobs" or "containers") the API
        # server actually advertises, honoring an explicit work_api request.
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            # NOTE(review): a "try:" line appears to be elided here.
            methods = self.api._rootDesc.get('resources')[api]['methods']
            if ('httpMethod' in methods['create'] and
                (work_api == api or work_api is None)):
                # NOTE(review): the if-body is elided; presumably sets
                # self.work_api and stops probing.
        if not self.work_api:
            # NOTE(review): surrounding branch lines elided; presumably the
            # first raise fires when no API was requested and none found,
            # the second when the requested API is unavailable.
            raise Exception("No supported APIs")
            raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        """Factory for cwltool: wrap a parsed CWL document in the matching
        Arvados-specific tool class."""
        kwargs["work_api"] = self.work_api
        # Fetch file references through Keep collections rather than plain HTTP.
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries,
                                                overrides=kwargs.get("override_tools"))
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        # Anything else falls back to cwltool's stock factory.
        return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        """Record the overall workflow result; when running a local pipeline
        instance (jobs API), also update its state on the API server."""
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            # NOTE(review): guard/branch lines are elided around these two
            # pipeline_instances updates (likely "if self.pipeline:" and an
            # "else:" for the failure path with logger.warn).
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
            logger.warn("Overall process status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        """Event handler (websocket or poll): propagate job/container state
        transitions to the corresponding entries in self.processes."""
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    # NOTE(review): line(s) elided here (likely "with self.lock:"
                    # and marking the process as running).
                    j = self.processes[uuid]
                    logger.info("%s %s is Running", self.label(j), uuid)
                    j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    # NOTE(review): line(s) elided here as well.
                    j = self.processes[uuid]
                    logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                    with Perf(metrics, "done %s" % j.name):
                        j.done(event["properties"]["new_attributes"])

    def label(self, obj):
        # e.g. "[job foo]" / "[container foo]" -- work_api minus its trailing "s".
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.

        NOTE(review): the enclosing while-loop and try scaffolding of this
        method are elided from this excerpt; the body below is the visible
        interior.
        """
        self.stop_polling.wait(15)
        if self.stop_polling.is_set():
            # NOTE(review): body elided (presumably exits the loop).
        keys = self.processes.keys()
        # NOTE(review): line(s) elided here.
        if self.work_api == "containers":
            table = self.poll_api.container_requests()
        elif self.work_api == "jobs":
            table = self.poll_api.jobs()
        # NOTE(review): a "try:" line appears to be elided here.
        proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
        except Exception as e:
            logger.warn("Error checking states on API server: %s", e)
        # Feed each returned record back through the normal event handler.
        for p in proc_states["items"]:
            # NOTE(review): the "self.on_message({" call header is elided;
            # the two dict entries below belong to it.
            "object_uuid": p["uuid"],
            "event_type": "update",
        # NOTE(review): except-handler scaffolding elided around the lines below.
        logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
        self.processes.clear()
        # Waking the main loop / marking the poller dead.
        self.stop_polling.set()

    def get_uploaded(self):
        # Shallow copy so callers cannot mutate the internal upload map.
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        # Record that local path `src` has been uploaded as `pair`.
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        """Remember an intermediate output collection for later cleanup."""
        # NOTE(review): a guard line appears to be elided here.
        self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        """Delete all recorded intermediate output collections (best effort)."""
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            # NOTE(review): a "try:" line appears to be elided here.
            self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            # Best-effort: log and keep going on failure...
            logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                # ...but do not swallow an interrupt.
                # NOTE(review): body elided (presumably re-raises or breaks).

    def check_features(self, obj):
        """Recursively walk a CWL document fragment and raise
        UnsupportedRequirement for features this runner cannot execute."""
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # TODO: can be supported by containers API, but not jobs API.
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            # itervalues() is Python 2 only.
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                # SourceLine context attributes errors to the offending list item.
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)

    def make_output_collection(self, name, tagsString, outputObj):
        """Copy the workflow's output files into a new Keep collection,
        rewrite the output object's locations to point into it, and return
        (rewritten outputObj, collection)."""
        # Deep-copy so the caller's object is not mutated during rewriting.
        outputObj = copy.deepcopy(outputObj)

        # NOTE(review): a "files = []" initializer appears to be elided here.
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            # "_:" prefix marks literals (created content), not Keep references.
            if k.startswith("_:"):
                if v.type == "Directory":
                    # NOTE(review): body elided (presumably "continue").
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    # NOTE(review): line(s) elided here.

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            # NOTE(review): an "sp = k.split(...)" line appears to be elided here.
            srccollection = sp[0][5:]   # strip the leading "keep:" prefix
            # NOTE(review): a "try:" line appears to be elided here.
            reader = self.collection_cache.get(srccollection)
            srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
            final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
        except arvados.errors.ArgumentError as e:
            logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
            # NOTE(review): surrounding handler lines elided (this warn likely
            # belongs to a separate except clause).
            logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            # Point each output at its new location inside `final`.
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                # NOTE(review): loop body elided (presumably drops these keys).

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        # Persist the machine-readable output object alongside the files.
        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        # Apply user-requested tags (comma-separated) as link records.
        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        # NOTE(review): the "for tag in tags:" loop header appears to be elided.
        self.api.links().create(body={
            "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        """When this runner itself executes inside a container (containers
        API) or a job task (jobs API), record the final output collection on
        our own container / job task record."""
        if self.work_api == "containers":
            # NOTE(review): a "try:" line appears to be elided here.
            current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                # NOTE(review): line(s) elided here (presumably an early return).
            self.api.containers().update(uuid=current['uuid'],
                # NOTE(review): the "body={" line of this call is elided.
                                         'output': self.final_output_collection.portable_data_hash(),
                                         }).execute(num_retries=self.num_retries)
            self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                # NOTE(review): the body of this update call is elided.
                                          }).execute(num_retries=self.num_retries)
            except Exception as e:
                # Best effort only -- failing to record output is not fatal.
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                # NOTE(review): the "body={" line of this call is elided.
                                        'output': self.final_output_collection.portable_data_hash(),
                                        'success': self.final_status == "success",
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        """Main entry point (handed to cwltool.main as `executor`): validate
        the tool, upload dependencies, optionally register or submit the
        workflow, run it to completion, and return (output, status)."""
        self.debug = kwargs.get("debug")

        # Fail fast on CWL features this runner cannot execute.
        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")

        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        # --trash-intermediate / --intermediate-output-ttl only make sense
        # with the containers API.
        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        # Default run name: tool label, or the workflow file's basename.
        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        # NOTE(review): an "override_tools = {}" initializer appears to be elided.
        upload_workflow_deps(self, tool, override_tools)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata,
                                  override_tools=override_tools)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
            # NOTE(review): remaining arguments of this call are elided.

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                # NOTE(review): remaining arguments and intervening lines elided.
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                # NOTE(review): intervening arguments elided.
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),
                # NOTE(review): second tuple element elided (presumably "success").

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        # Options passed down to every job/container run via **kwargs.
        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        # API-specific in-container paths vs. crunch-job substitution vars.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        # NOTE(review): a "runnerjob = None" initializer appears to be elided.
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                # NOTE(review): remaining arguments and an "else:" branch
                # header appear to be elided here.
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                # NOTE(review): intervening arguments elided.
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                # NOTE(review): intervening arguments elided.
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                # NOTE(review): the "body={" line of this call is elided.
                "owner_uuid": self.project_uuid,
                "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        # --no-wait: submit the runner and return immediately.
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        # Separate API client for the polling thread (clients aren't shared
        # across threads).
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        # NOTE(review): an "if runnerjob:" guard appears to be elided; when a
        # runner was created the iterator is just that single item, otherwise
        # tool.job() produces the per-step job iterator below.
        jobiter = iter((runnerjob,))
        if "cwl_runner_job" in kwargs:
            self.uuid = kwargs.get("cwl_runner_job").get('uuid')
        jobiter = tool.job(job_order,
                           self.output_callback,
        # NOTE(review): remaining arguments and the surrounding try block are elided.

        # Will continue to hold the lock for the duration of this code
        # except when in cond.wait(), at which point on_message can update
        # job state and process output callbacks.

        loopperf = Perf(metrics, "jobiter")
        for runnable in jobiter:
            # NOTE(review): line(s) elided inside this loop.
            if self.stop_polling.is_set():
                # NOTE(review): body elided (polling thread has stopped).
            with Perf(metrics, "run"):
                runnable.run(**kwargs)
            # NOTE(review): branch lines elided around the deadlock check.
            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
        # Wait for all outstanding processes to finish.
        while self.processes:
            # NOTE(review): body elided (presumably self.cond.wait()).
        except UnsupportedRequirement:
            # NOTE(review): handler body elided (presumably re-raises).
        # NOTE(review): generic except handler scaffolding elided below.
        if sys.exc_info()[0] is KeyboardInterrupt:
            logger.error("Interrupted, marking pipeline as failed")
        logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
        self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                             body={"state": "Failed"}).execute(num_retries=self.num_retries)
        if runnerjob and runnerjob.uuid and self.work_api == "containers":
            # Cancel the runner container request by dropping its priority.
            self.api.container_requests().update(uuid=runnerjob.uuid,
                                                 body={"priority": "0"}).execute(num_retries=self.num_retries)
        # NOTE(review): a "finally:" line appears to be elided here.
        self.stop_polling.set()
        self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        # NOTE(review): an "else:" branch header appears to be elided here;
        # the collection-building path below runs for local (non-runner) runs.
        if self.output_name is None:
            self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
        if self.output_tags is None:
            self.output_tags = ""
        self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
        self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)
550 """Print version string of key packages for provenance and debugging."""
552 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
553 arvpkg = pkg_resources.require("arvados-python-client")
554 cwlpkg = pkg_resources.require("cwltool")
556 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
557 "arvados-python-client", arvpkg[0].version,
558 "cwltool", cwlpkg[0].version)
def arg_parser(): # type: () -> argparse.ArgumentParser
    """Build the command-line argument parser for arv-cwl-runner.

    NOTE(review): several add_argument calls below are truncated in this
    excerpt (missing default= and the closing parenthesis), and the final
    "return parser" statement is elided.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        # NOTE(review): remaining arguments and ")" elided.

    # Mutually exclusive "print something and exit" modes.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    # Logging verbosity (pick one).
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Job/container reuse toggle (both flags share dest="enable_reuse").
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        # NOTE(review): remaining arguments and ")" elided.

    # Submission mode: submit to Arvados, run locally, or register a
    # workflow/template record.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        # NOTE(review): remaining arguments and ")" elided.

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        # NOTE(review): remaining arguments and ")" elided.

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        # NOTE(review): remaining arguments and ")" elided.

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                        "of CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
                        # NOTE(review): remaining arguments and ")" elided.

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    # Positional arguments: the workflow document, then everything else is
    # the input (job order) for the workflow.
    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    # NOTE(review): the "return parser" statement is elided from this excerpt.
# NOTE(review): the function header is elided from this excerpt; based on the
# custom_schema_callback reference at the bottom of the file this body belongs
# to "def add_arv_hints():".
    # Relax cwltool's tool-identifier whitelist to accept any id.
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
    # Register the Arvados CWL extension schema (shipped with this package)
    # so documents may use the http://arvados.org/cwl namespace.
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    # Teach cwltool about Arvados-specific process requirements so they do
    # not fail validation as unsupported requirements.
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    # NOTE(review): the closing "])" of this extend call is elided.
def main(args, stdout, stderr, api_client=None, keep_client=None):
    """Command-line entry point: parse arguments, construct an ArvCwlRunner,
    and delegate execution to cwltool.main with Arvados-specific hooks.

    NOTE(review): several guard/branch lines of this function are elided in
    this excerpt; gaps are flagged below.  Note also the Python 2 print
    statement -- this file is Python 2 code.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    # NOTE(review): an "if arvargs.version:" style guard appears to be elided
    # around this print (and a subsequent return).
    print versionstring()

    if arvargs.update_workflow:
        # Infer the target API from the UUID type infix: "-7fd4e-" marks a
        # containers-API object; "-p5p6p-" presumably selects the jobs API
        # (branch body elided).
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            # NOTE(review): branch body and any else-branch elided here.
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            # NOTE(review): line(s) elided (presumably an error return).
        arvargs.work_api = want_api

    # Registering a workflow doesn't require an input object.
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    # NOTE(review): a "try:" line appears to be elided before this block.
    if api_client is None:
        api_client=arvados.api('v1', model=OrderedJsonModel())
    if keep_client is None:
        keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
    runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                          num_retries=4, output_name=arvargs.output_name,
                          output_tags=arvargs.output_tags)
    except Exception as e:
        # NOTE(review): handler body elided (presumably logs and returns).

    # NOTE(review): the guards around the log-level settings below are elided;
    # the DEBUG pair presumably corresponds to --debug and the WARN group to
    # --quiet.
    logger.setLevel(logging.DEBUG)
    logging.getLogger('arvados').setLevel(logging.DEBUG)
    logger.setLevel(logging.WARN)
    logging.getLogger('arvados').setLevel(logging.WARN)
    logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    # NOTE(review): guard elided (presumably "if arvargs.metrics:").
    metrics.setLevel(logging.DEBUG)
    logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    # NOTE(review): an "else:" header appears to be elided before this line.
    arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Options cwltool expects but that arv-cwl-runner fixes/overrides.
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None
    arvargs.print_supported_versions = False

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    # Hand control to cwltool with Arvados-specific executor, tool factory,
    # Keep-backed file access/fetching, and schema hints.
    return cwltool.main.main(args=arvargs,
                             # NOTE(review): some keyword arguments of this
                             # call (e.g. stdout/stderr) are elided.
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)