# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
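#
# A typical invocation looks like this (the project UUID and file names are
# placeholders, not taken from a real cluster):
#
#   arvados-cwl-runner --api=containers --project-uuid=zzzzz-j7d0g-xxxxxxxxxxxxxxx \
#       my-workflow.cwl my-inputs.yml
#
# main() at the bottom of this file is the command-line entry point; it hands
# control to cwltool.main.main() using the ArvCwlRunner executor defined here.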

import argparse
import copy
import json
import logging
import os
import re
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.collection
import arvados.safeapi
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

DEFAULT_PRIORITY = 500

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for it to complete, and report output.

    """
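
    # How this class is used (see main() at the bottom of this file):
    # cwltool.main.main() drives parsing and validation, then calls
    # arv_executor() with the loaded tool and job order.  arv_executor()
    # uploads dependencies, then either submits a single runner
    # job/container or iterates the workflow locally, while poll_states()
    # and on_message() track remote process state until output_callback()
    # records the final status and output.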

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.in_flight = 0
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        # Select the work API: honor an explicit work_api request, otherwise
        # take the first API family the server's discovery document advertises.
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def start_run(self, runnable, kwargs):
        with self.workflow_eval_lock:
            self.in_flight += 1
        runnable.run(**kwargs)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container
            self.in_flight -= 1

    def process_done(self, uuid):
        with self.workflow_eval_lock:
            if uuid in self.processes:
                del self.processes[uuid]

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.workflow_eval_lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    with self.workflow_eval_lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.workflow_eval_lock.notify()

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)
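
    # poll_states() below feeds on_message() synthetic "update" events built
    # from API list() results.  The event shape (values illustrative) is roughly:
    #   {"object_uuid": "<container request or job uuid>",
    #    "event_type": "update",
    #    "properties": {"new_attributes": {"state": "Running", ...}}}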

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notify()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                break

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")

            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
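
    # After make_output_collection() the locations in the returned output
    # object point into the saved collection, e.g. (hash and name illustrative)
    # "keep:<portable data hash>/output.txt".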

    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])
        self.secret_store = kwargs.get("secret_store")

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps.
        # Don't validate this time because it will just print redundant errors.
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata,
                                  do_validate=False)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"],
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"],
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.eval_timeout = kwargs.get("eval_timeout")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            if kwargs["priority"] != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"
            kwargs["docker_tmpdir"] = "$(task.tmpdir)"

        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
                                                merged_map=merged_map,
                                                priority=kwargs.get("priority"),
                                                secret_store=self.secret_store)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"),
                                      merged_map=merged_map)
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.workflow_eval_lock.acquire()
            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, kwargs)
                else:
                    if (self.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.workflow_eval_lock.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)


def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)
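
# Example of the string returned above (versions are illustrative only):
#   /usr/bin/arvados-cwl-runner 1.1.4, arvados-python-client 1.1.4, cwltool 1.0.20180116213856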


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
                        default=DEFAULT_PRIORITY)

    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
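
# Example command lines this parser accepts (file names are placeholders):
#   arvados-cwl-runner --submit --no-wait my-workflow.cwl my-inputs.yml
#   arvados-cwl-runner --create-workflow my-workflow.cwl
# Anything captured in job_order (argparse.REMAINDER) is interpreted
# downstream by cwltool against the workflow's declared inputs.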

def add_arv_hints():
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])
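
# add_arv_hints() registers the Arvados CWL extension schema and marks the
# extension requirements listed above as supported, so documents that use them
# pass cwltool's validation.  It is also handed to cwltool.main.main() below as
# custom_schema_callback.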

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
            keep_client = api_client.keep
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)
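
# Note: there is no __main__ block here; the arvados-cwl-runner and cwl-runner
# command-line wrappers shipped with this package are expected to call
# main(sys.argv[1:], sys.stdout, sys.stderr).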