# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement the cwl-runner interface for submitting and running work on
# Arvados, using either the Crunch jobs API or the Crunch containers API.
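#
# A minimal usage sketch (the console-script name below is the conventional
# entry point for this package, not defined in this file):
#
#   arvados-cwl-runner --api=containers workflow.cwl job.yml
#
# "workflow" and "job_order" are the positional arguments declared in
# arg_parser() below; --api chooses between the jobs and containers APIs.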
import argparse
import copy
import json
import logging
import os
import re
import sys
import threading
import time
from functools import partial
import pkg_resources  # part of setuptools
import Queue

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.collection
import arvados.safeapi
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

DEFAULT_PRIORITY = 500
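
# Default priority for submitted container requests. arv_executor() below
# requires --priority to fall in the range 1..1000 and rejects a non-default
# value when using the jobs API.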
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """
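    # How this class is typically driven (see main() at the bottom of this
    # file): cwltool.main.main() is given arv_executor as the executor and
    # arv_make_tool as the tool factory, so cwltool loads the document and
    # then calls back into this class. A rough sketch, assuming an existing
    # api_client:
    #
    #   runner = ArvCwlRunner(api_client, work_api="containers")
    #   (output, status) = runner.arv_executor(tool, job_order, **kwargs)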
    def __init__(self, api_client, work_api=None, keep_client=None,
                 output_name=None, output_tags=None, num_retries=4,
                 thread_count=4):
        self.api = api_client
        self.processes = {}
        self.in_flight = 0
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.task_queue = Queue.Queue()
        self.task_queue_threads = []
        self.thread_count = thread_count
        self.poll_interval = 12

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Complete"}).execute(num_retries=self.num_retries)
            else:
                logger.warn("Overall process status is %s", processStatus)
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
    def task_queue_func(self):
        while True:
            task = self.task_queue.get()
            if task is None:
                return
            task()

    def task_queue_add(self, task):
        if self.thread_count > 1:
            self.task_queue.put(task)
        else:
            task()
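
    # Threading model for the two methods above: when thread_count > 1,
    # arv_executor() starts thread_count workers running task_queue_func(),
    # which block on task_queue until shutdown pushes a None sentinel per
    # worker; with thread_count == 1, task_queue_add() runs the task inline
    # on the calling thread instead.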
    def start_run(self, runnable, kwargs):
        with self.workflow_eval_lock:
            self.in_flight += 1
        self.task_queue_add(partial(runnable.run, **kwargs))

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container
            self.in_flight -= 1
    def process_done(self, uuid):
        with self.workflow_eval_lock:
            if uuid in self.processes:
                del self.processes[uuid]
    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notify_all()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.workflow_eval_lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    with self.workflow_eval_lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                    def done_cb():
                        j.done(event["properties"]["new_attributes"])
                        with self.workflow_eval_lock:
                            self.workflow_eval_lock.notify()
                    self.task_queue_add(done_cb)
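
    # on_message() handles both real websocket events and the synthetic
    # events that poll_states() builds from list() results; in either case
    # the payload has this shape:
    #
    #   {"object_uuid": "<uuid of the job or container request>",
    #    "event_type": "update",
    #    "properties": {"new_attributes": {"state": "Running", ...}}}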
    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    remain_wait = self.poll_interval
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notify()
        finally:
            self.stop_polling.set()
    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair
    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
                if sys.exc_info()[0] is KeyboardInterrupt:
                    break
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)
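
    # For example, check_features() rejects a tool like this hypothetical
    # snippet when running with --api=jobs (it is fine with --api=containers):
    #
    #   requirements:
    #     DockerRequirement:
    #       dockerPull: ubuntu:16.04
    #       dockerOutputDirectory: /out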
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
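
    # Sketch of the two rewriting passes above (hashes are illustrative):
    # a file recorded at "keep:<src-pdh>/dir/a.txt" is copied into the new
    # collection, rewrite() maps its location to the in-collection path
    # "dir/a.txt", and after save_new() succeeds finalcollection() prefixes
    # the saved collection: "keep:<final-pdh>/dir/a.txt".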
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)
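
    # Note on the is_trashed update above: when running inside a container,
    # the output is recorded on the container record itself, so the temporary
    # collection created here is trashed. The assumption (not enforced in
    # this file) is that Arvados turns the container's output attribute into
    # the permanent output collection.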
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])
        self.secret_store = kwargs.get("secret_store")

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload the tool object, which may have been updated by
        # upload_workflow_deps.
        # Don't validate this time because it will just print redundant errors.
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata,
                                  do_validate=False)
        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)
        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"],
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"],
                                        merged_map=merged_map),
                        "success")
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.eval_timeout = kwargs.get("eval_timeout")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")
        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            if kwargs["priority"] != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
                                                merged_map=merged_map,
                                                priority=kwargs.get("priority"),
                                                secret_store=self.secret_store)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"),
                                      merged_map=merged_map)
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(**kwargs)
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        for r in xrange(0, self.thread_count):
            t = threading.Thread(target=self.task_queue_func)
            self.task_queue_threads.append(t)
            t.start()
        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)
        self.workflow_eval_lock.acquire()
        # Hold the lock while this code runs; it is released when it is
        # safe to do so in self.workflow_eval_lock.wait(), at which point
        # on_message can update job state and process output callbacks.
        try:
            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, kwargs)
                else:
                    if (self.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while (self.in_flight + len(self.processes)) > 0:
                self.workflow_eval_lock.wait(3)
        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            try:
                # Drain the task queue.
                while not self.task_queue.empty():
                    self.task_queue.get()
            except Queue.Empty:
                pass
            self.stop_polling.set()
            self.polling_thread.join()
            for t in self.task_queue_threads:
                self.task_queue.put(None)
            for t in self.task_queue_threads:
                t.join()
        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")
        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)
637 """Print version string of key packages for provenance and debugging."""
639 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
640 arvpkg = pkg_resources.require("arvados-python-client")
641 cwlpkg = pkg_resources.require("cwltool")
643 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
644 "arvados-python-client", arvpkg[0].version,
645 "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input. Defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")
656 parser.add_argument("--eval-timeout",
657 help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")
    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs will go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for the collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection, separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")
    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=None)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of the CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
                        default=DEFAULT_PRIORITY)
    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    parser.add_argument("--thread-count", type=int,
                        default=4, help="Number of threads to use for job submit and output collection.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")
    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser


def add_arv_hints():
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])
def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
            keep_client = api_client.keep
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1
    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)
    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)
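
# A minimal sketch of invoking this module's entry point directly (an
# assumption for illustration; the installed console script generated from
# setup.py does effectively this):
#
#   import sys
#   import arvados_cwl
#   sys.exit(arvados_cwl.main(sys.argv[1:], sys.stdout, sys.stderr))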