# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
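#
# For orientation, a typical invocation looks like the following
# (illustrative only; "workflow.cwl" and "job.yml" are hypothetical
# files, and the flags are defined in arg_parser() below):
#
#   arvados-cwl-runner --api=containers --output-name "My run output" workflow.cwl job.yml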

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine

import arvados
import arvados.collection
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """
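
    # A minimal sketch of programmatic use (assumes ARVADOS_API_HOST and
    # ARVADOS_API_TOKEN are configured; arv_executor() also expects the
    # cwltool keyword arguments seen below, e.g. "basedir"):
    #
    #   api = arvados.api('v1', model=OrderedJsonModel())
    #   runner = ArvCwlRunner(api, work_api="containers")
    #   (output, status) = runner.arv_executor(tool, job_order, **kwargs)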

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
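
        # The loop below probes the API server's discovery document:
        # whichever of the "jobs" or "containers" resources advertises a
        # "create" method is considered usable, constrained by the
        # caller's work_api preference if one was given.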
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries,
                                                overrides=kwargs.get("override_tools"))
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()
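
    # For reference, events handled above look roughly like this (a
    # sketch; only the keys read by on_message are shown):
    #
    #   {"object_uuid": "<job or container request uuid>",
    #    "event_type": "update",
    #    "properties": {"new_attributes": {"state": "Running", ...}}}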

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
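
        # The loop below synthesizes dicts in the same shape as websocket
        # "update" events and feeds them to on_message(), so state changes
        # take a single code path regardless of their source.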
        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                break

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # TODO: can be supported by containers API, but not jobs API.
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)
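
    # Illustration: a tool whose InitialWorkDirRequirement contains a
    # listing entry like {"class": "Directory", "writable": true, ...}
    # (hypothetical fragment) fails check_features() above with an
    # UnsupportedRequirement error.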

    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
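
    # Sketch of the copy step above: an output location such as
    # "keep:012345678901234567890123456789ab+100/foo.txt" (hypothetical
    # locator) splits into a source collection (the "keep:" prefix is
    # dropped) and a path within it, which is copied into the new output
    # collection at the path mapper's target.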

    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        override_tools = {}
        upload_workflow_deps(self, tool, override_tools)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata,
                                  override_tools=override_tools)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"])
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"
421 if kwargs.get("submit"):
422 # Submit a runner job to run the workflow for us.
423 if self.work_api == "containers":
424 if tool.tool["class"] == "CommandLineTool":
425 kwargs["runnerjob"] = tool.tool["id"]
426 upload_dependencies(self,
432 runnerjob = tool.job(job_order,
433 self.output_callback,
436 runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
439 submit_runner_ram=kwargs.get("submit_runner_ram"),
440 name=kwargs.get("name"),
441 on_error=kwargs.get("on_error"),
442 submit_runner_image=kwargs.get("submit_runner_image"),
443 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
444 elif self.work_api == "jobs":
445 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
448 submit_runner_ram=kwargs.get("submit_runner_ram"),
449 name=kwargs.get("name"),
450 on_error=kwargs.get("on_error"),
451 submit_runner_image=kwargs.get("submit_runner_image"))
452 elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
453 # Create pipeline for local run
454 self.pipeline = self.api.pipeline_instances().create(
456 "owner_uuid": self.project_uuid,
457 "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
459 "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
460 logger.info("Pipeline instance %s", self.pipeline["uuid"])
462 if runnerjob and not kwargs.get("wait"):
463 runnerjob.run(wait=kwargs.get("wait"))
464 return (runnerjob.uuid, "success")
466 self.poll_api = arvados.api('v1')
467 self.polling_thread = threading.Thread(target=self.poll_states)
468 self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)
553 """Print version string of key packages for provenance and debugging."""
555 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
556 arvpkg = pkg_resources.require("arvados-python-client")
557 cwlpkg = pkg_resources.require("cwltool")
559 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
560 "arvados-python-client", arvpkg[0].version,
561 "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input. Defaults to the directory of the input object file, or the current directory if inputs are piped/provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, they will go to your home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                        "of CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
                        default=0)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
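
# Programmatic use of the parser mirrors the command line (a sketch;
# the file names are hypothetical):
#
#   args = arg_parser().parse_args(["--api=containers", "workflow.cwl", "job.yml"])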


def add_arv_hints():
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())

    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])


def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)
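
# A minimal sketch of invoking this module directly (the packaged
# arvados-cwl-runner entry point does the equivalent):
#
#   if __name__ == "__main__":
#       sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))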