# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import copy
import json
import logging
import os
import re
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import cwltool.draft2tool

from schema_salad.sourceline import SourceLine

import arvados
import arvados.collection
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """
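
    # ArvCwlRunner is normally driven by cwltool.main.main() (see main()
    # below), which calls arv_executor() with the parsed command line
    # options passed through as keyword arguments.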

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
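
        # Select the work submission API ("jobs" or "containers") by checking
        # which resources the API server's discovery document actually offers.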
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
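        """Handle an update event for a job or container this runner is tracking."""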
134 if "object_uuid" in event:
135 if event["object_uuid"] in self.processes and event["event_type"] == "update":
136 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
137 uuid = event["object_uuid"]
139 j = self.processes[uuid]
140 logger.info("%s %s is Running", self.label(j), uuid)
142 j.update_pipeline_component(event["properties"]["new_attributes"])
143 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
144 uuid = event["object_uuid"]
147 j = self.processes[uuid]
148 logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
149 with Perf(metrics, "done %s" % j.name):
150 j.done(event["properties"]["new_attributes"])

    def label(self, obj):
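        """Return a log prefix like "[job name]" or "[container name]"."""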
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                break

    def check_features(self, obj):
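        """Recursively raise UnsupportedRequirement for CWL features this runner cannot run."""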
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # TODO: can be supported by containers API, but not jobs API.
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)

    def make_output_collection(self, name, tagsString, outputObj):
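        """Copy the workflow outputs into a new Keep collection, save and tag it,
        and rewrite the output object's locations to point at that collection."""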
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
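        """If running inside a Crunch container or job task, record the final
        output collection on that container or task record."""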
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
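        """Execute the tool or workflow: upload dependencies, submit or run the
        work, wait for it to finish, and return (final_output, final_status)."""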
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"])
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    upload_dependencies(self,
                                        kwargs["name"],
                                        tool.doc_loader,
                                        tool.tool,
                                        tool.tool["id"],
                                        False)
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)
551 """Print version string of key packages for provenance and debugging."""
553 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
554 arvpkg = pkg_resources.require("arvados-python-client")
555 cwlpkg = pkg_resources.require("cwltool")
557 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
558 "arvados-python-client", arvpkg[0].version,
559 "cwltool", cwlpkg[0].version)

def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                        "of CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
                        default=0)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
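
# Register the Arvados-specific CWL extension schema and hints with cwltool so
# that documents using http://arvados.org/cwl# extensions validate cleanly.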
def add_arv_hints():
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput"
    ])

def main(args, stdout, stderr, api_client=None, keep_client=None):
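    """Command line entry point: parse arguments, configure logging and the
    runner, then hand control to cwltool.main.main."""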
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)