# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
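#
# A minimal invocation sketch (the project UUID and file names below are
# hypothetical):
#
#     arvados-cwl-runner --api=containers \
#         --project-uuid=zzzzz-j7d0g-0123456789abcde \
#         workflow.cwl job-inputs.yml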

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re

from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process

from schema_salad.sourceline import SourceLine

import arvados
import arvados.collection
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)
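
# Default log format, installed at import time; main() re-installs a
# timestamped or plain formatter based on --log-timestamps / --no-log-timestamps.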
arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """
    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
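
        # Work out which work submission API this cluster supports by
        # inspecting the API server's discovery document, honoring an
        # explicit --api request if one was given.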
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries,
                                                overrides=kwargs.get("override_tools"))
116 if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
117 return ArvadosCommandTool(self, toolpath_object, **kwargs)
118 elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
119 return ArvadosWorkflow(self, toolpath_object, **kwargs)
121 return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
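
    # Called by the state-polling thread with job/container state updates;
    # "Running" transitions update the pipeline component, terminal states
    # trigger the process's done() callback.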
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()
                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue
                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            with self.lock:
                self.processes.clear()
                self.cond.notify()
        finally:
            self.stop_polling.set()
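
    # Cache of files already uploaded to Keep, keyed on local source path.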
    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                break

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # TODO: can be supported by containers API, but not jobs API.
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)
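
    # Assemble the final output collection: copy every output file or
    # directory out of its source Keep collection (or write literals) into a
    # single new collection, then rewrite the CWL output object to point at it.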
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
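
    # When this runner is itself executing inside a crunch container or job
    # (i.e., it was submitted with --submit), record the final output on our
    # own container or job task record.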
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
334 elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
335 self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
337 'output': self.final_output_collection.portable_data_hash(),
338 'success': self.final_status == "success",
340 }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")

        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        override_tools = {}
        upload_workflow_deps(self, tool, override_tools)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata,
                                  override_tools=override_tools)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"])
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
397 elif self.work_api == "containers":
398 return (upload_workflow(self, tool, job_order,
401 submit_runner_ram=kwargs.get("submit_runner_ram"),
402 name=kwargs["name"]),

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")
413 if self.work_api == "containers":
414 kwargs["outdir"] = "/var/spool/cwl"
415 kwargs["docker_outdir"] = "/var/spool/cwl"
416 kwargs["tmpdir"] = "/tmp"
417 kwargs["docker_tmpdir"] = "/tmp"
418 elif self.work_api == "jobs":
419 kwargs["outdir"] = "$(task.outdir)"
420 kwargs["docker_outdir"] = "$(task.outdir)"
421 kwargs["tmpdir"] = "$(task.tmpdir)"
424 if kwargs.get("submit"):
425 # Submit a runner job to run the workflow for us.
426 if self.work_api == "containers":
427 if tool.tool["class"] == "CommandLineTool":
428 kwargs["runnerjob"] = tool.tool["id"]
429 upload_dependencies(self,
435 runnerjob = tool.job(job_order,
436 self.output_callback,
439 runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
442 submit_runner_ram=kwargs.get("submit_runner_ram"),
443 name=kwargs.get("name"),
444 on_error=kwargs.get("on_error"),
445 submit_runner_image=kwargs.get("submit_runner_image"),
446 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
447 elif self.work_api == "jobs":
448 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
451 submit_runner_ram=kwargs.get("submit_runner_ram"),
452 name=kwargs.get("name"),
453 on_error=kwargs.get("on_error"),
454 submit_runner_image=kwargs.get("submit_runner_image"))
455 elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
456 # Create pipeline for local run
457 self.pipeline = self.api.pipeline_instances().create(
459 "owner_uuid": self.project_uuid,
460 "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
462 "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
463 logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")
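
        # Track job/container state by polling the API server from a
        # background thread.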
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()
                if self.stop_polling.is_set():
                    break
                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)
        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")
535 if kwargs.get("submit") and isinstance(runnerjob, Runner):
536 logger.info("Final output collection %s", runnerjob.final_output)
538 if self.output_name is None:
539 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
540 if self.output_tags is None:
541 self.output_tags = ""
542 self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
543 self.set_crunch_output()
545 if kwargs.get("compute_checksum"):
546 adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
547 adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
549 if self.trash_intermediate and self.final_status == "success":
550 self.trash_intermediate_output()
552 return (self.final_output, self.final_status)
556 """Print version string of key packages for provenance and debugging."""
558 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
559 arvpkg = pkg_resources.require("arvados-python-client")
560 cwlpkg = pkg_resources.require("cwltool")
562 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
563 "arvados-python-client", arvpkg[0].version,
564 "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")
575 parser.add_argument("--eval-timeout",
576 help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
606 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
607 help="Ignore Docker image version when deciding whether to reuse past jobs.",

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")
641 parser.add_argument("--submit-runner-ram", type=int,
642 help="RAM (in MiB) required for the workflow runner job (default 1024)",
645 parser.add_argument("--submit-runner-image", type=str,
646 help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
649 parser.add_argument("--name", type=str,
650 help="Name to use for workflow execution instance.",
653 parser.add_argument("--on-error", type=str,
654 help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
655 "Default is 'continue'.", default="continue", choices=("stop", "continue"))
657 parser.add_argument("--enable-dev", action="store_true",
658 help="Enable loading and running development versions "
659 "of CWL spec.", default=False)
661 parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
662 help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")
673 parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
674 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")


def add_arv_hints():
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])


def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return
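
    # The UUID infix identifies the object type: '-7fd4e-' is a workflow
    # (containers API), '-p5p6p-' a pipeline template (jobs API).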
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)