# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
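#
# Example invocation (file names are illustrative, not part of this module):
#
#   arvados-cwl-runner --api=containers workflow.cwl inputs.yml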

import argparse
import copy
import json
import logging
import os
import re
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine

import arvados
import arvados.collection
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
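
        # Probe the API server's discovery document to find out which work
        # submission APIs (jobs, containers) it supports.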
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries,
                                                overrides=kwargs.get("override_tools"))
        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
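
    # Handle a state-change event for a submitted job or container.  Events
    # are delivered by the polling thread (see poll_states below).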
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair
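
    # Track intermediate output collections so they can be cleaned up later
    # (see --trash-intermediate and --intermediate-output-ttl).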
    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                break
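
    # Walk the tool document and raise UnsupportedRequirement for CWL
    # features this runner cannot execute.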
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # TODO: can be supported by containers API, but not jobs API.
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)
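
    # Consolidate the output files and directories of a completed workflow
    # into a single new collection, rewriting "location" references as it
    # goes.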
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)
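
        # Sources beginning with "_:" are literals (directories created on
        # the fly, or file contents supplied inline); anything else must be
        # a "keep:" reference that can be copied from an existing collection.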
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',') if tagsString else []
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
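
    # When this runner is itself executing inside a Crunch container (or job
    # task), record the final output on that container/task record so the
    # platform can display it.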
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)
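
    # Main entry point: called by cwltool.main as the executor.  Uploads
    # dependencies, optionally submits a runner job or container, then
    # iterates over runnable jobs until the workflow completes.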
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        override_tools = {}
        upload_workflow_deps(self, tool, override_tools)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata,
                                  override_tools=override_tools)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"])
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)
551 """Print version string of key packages for provenance and debugging."""
553 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
554 arvpkg = pkg_resources.require("arvados-python-client")
555 cwlpkg = pkg_resources.require("cwltool")
557 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
558 "arvados-python-client", arvpkg[0].version,
559 "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs will go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
                        default=0)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
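

# Register the Arvados-specific extension schema and process requirements
# with cwltool so documents that use them validate instead of erroring out.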
def add_arv_hints():
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])


def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return
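
    # Infer the desired work API from the --update-workflow UUID: the object
    # type infix is '7fd4e' for workflow records (containers API) and
    # 'p5p6p' for pipeline templates (jobs API).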
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
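
    # Force option values that this executor requires regardless of what was
    # parsed from the command line, before handing off to cwltool.main.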
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None
    arvargs.print_supported_versions = False

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)