# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
import time
from functools import partial
import pkg_resources  # part of setuptools
from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate
import cwltool.argparser
import arvados
import arvados.collection
import arvados.safeapi
from arvados.keep import KeepClient
from arvados.errors import ApiError
import arvados.commands._util as arv_cmd
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__
from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums

from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

DEFAULT_PRIORITY = 500
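# The containers API accepts priorities in the range 1..1000 (validated in
# arv_executor below); 500 is the midpoint default.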
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client, arvargs=None, keep_client=None, num_retries=4):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)
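        # The CollectionCache is shared by every CollectionFetcher and
        # CollectionFsAccess built from it, so repeated lookups of the same
        # Keep collection reuse one cached copy.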
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
        if self.work_api == "jobs":
            logger.warn("""
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")
        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool
    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        else:
            return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()
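    # Work items are queued on the TaskQueue rather than run inline, so the
    # evaluation loop in arv_executor() is not blocked by API round trips.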
    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext))

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record))
            del self.processes[uuid]
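    # Shape of the runtime_status field maintained below (illustrative):
    #   {"error": "...first error... (and 2 more)", "errorDetail": "...",
    #    "warning": "...most recent warning...", "warningDetail": "..."}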
    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called from a failing child container: records the first child error
        or updates the error count on subsequent error statuses.
        Also called from other parts that need to report errors, warnings or
        just activity statuses.
        """
        with self.workflow_eval_lock:
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            runtime_status = current.get('runtime_status', {})
            # In case of status being an error, only report the first one.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message,
                        'errorDetail': detail or "No error logs available"
                    })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
                    error_msg = re.match(
                        r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                    more_failures = re.match(
                        r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    if more_failures:
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status without regard to
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)
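    # Wraps a cwltool output callback so it runs under the workflow
    # evaluation lock and wakes up any thread waiting in arv_executor().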
    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)
    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])
    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)
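    # poll_states() periodically lists the watched container requests or
    # jobs and feeds the results through on_message() as synthetic "update"
    # events, so state changes are handled even without websocket
    # notifications.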
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    remain_wait = self.poll_interval
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })

                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()
    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                break
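    # check_features() walks a tool document and rejects constructs that the
    # selected work API cannot honor, reporting the offending source line.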
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)
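    # Builds the final output collection: copies every file referenced by
    # the CWL output object out of its source collection into one new
    # collection, then rewrites the "location" fields to point at it.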
    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
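    # When this runner is itself executing inside a container (or, with the
    # jobs API, a job task), record the final output collection on that
    # container or task record.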
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)
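    # Main entry point: invoked by cwltool.main.main() (see main() below).
    # Uploads dependencies, optionally registers or submits the workflow,
    # then drives the evaluation loop until a final status is reached.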
    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not runtimeContext.name:
            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps.
        # Don't validate this time because it will just print redundant errors.
        loadingContext = self.loadingContext.copy()
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.do_validate = False
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  loadingContext)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     tool, job_order)
        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      runtimeContext.enable_reuse,
                                      uuid=existing_uuid,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map),
                        "success")
        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"
        elif self.work_api == "jobs":
            if runtimeContext.priority != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            runtimeContext.outdir = "$(task.outdir)"
            runtimeContext.docker_outdir = "$(task.outdir)"
            runtimeContext.tmpdir = "$(task.tmpdir)"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")
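        # Three ways to run from here: (1) submit a runner container/job
        # that re-executes this workflow on the cluster, (2) when waiting on
        # a bare CommandLineTool with the containers API, run it directly as
        # a single container, or (3) run the workflow engine locally,
        # submitting individual steps as they become runnable.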
        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
                    runtimeContext.runnerjob = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         runtimeContext).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=runtimeContext.submit_runner_ram,
                                                name=runtimeContext.name,
                                                on_error=runtimeContext.on_error,
                                                submit_runner_image=runtimeContext.submit_runner_image,
                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                merged_map=merged_map,
                                                priority=runtimeContext.priority,
                                                secret_store=self.secret_store)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      on_error=runtimeContext.on_error,
                                      submit_runner_image=runtimeContext.submit_runner_image,
                                      merged_map=merged_map)
        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not runtimeContext.wait:
            submitargs = runtimeContext.copy()
            submitargs.submit = False
            runnerjob.run(submitargs)
            return (runnerjob.uuid, "success")
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if runtimeContext.cwl_runner_job is not None:
                self.uuid = runtimeContext.cwl_runner_job.get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               runtimeContext)
        try:
            self.workflow_eval_lock.acquire()
            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()
        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = runtimeContext.storage_classes.strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)
705 """Print version string of key packages for provenance and debugging."""
707 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
708 arvpkg = pkg_resources.require("arvados-python-client")
709 cwlpkg = pkg_resources.require("cwltool")
711 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
712 "arvados-python-client", arvpkg[0].version,
713 "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped/provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID",
                        help="Project that will own the workflow jobs. If not provided, outputs go to the home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")
    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=None)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--submit-request-uuid", type=str,
                        default=None,
                        help="Update and commit supplied container request instead of creating a new one (containers API only).")

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)

    parser.add_argument('--storage-classes', default="default", type=str,
                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
                        default=DEFAULT_PRIORITY)
    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    parser.add_argument("--thread-count", type=int,
                        default=4, help="Number of threads to use for job submit and output collection.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")
    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
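# add_arv_hints() is passed to cwltool as the custom_schema_callback (see
# main() below). It relaxes cwltool's filename checks, registers the Arvados
# CWL extension schema, and whitelists the Arvados-specific requirements so
# validation accepts them.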
def add_arv_hints():
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])
def exit_signal_handler(sigcode, frame):
    logger.error("Caught signal {}, exiting.".format(sigcode))
    sys.exit(-sigcode)
def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if len(arvargs.storage_classes.strip().split(',')) > 1:
        logger.error("Multiple storage classes are not supported currently.")
        return 1

    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()
    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
            keep_client = api_client.keep
            # Make an API object now so errors are reported early.
            api_client.users().current().execute()
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    for key, val in cwltool.argparser.get_default_args().items():
        if not hasattr(arvargs, key):
            setattr(arvargs, key, val)

    runtimeContext = ArvRuntimeContext(vars(arvargs))
    runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                            collection_cache=runner.collection_cache)
    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints,
                             loadingContext=runner.loadingContext,
                             runtimeContext=runtimeContext)
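
# For illustration only: the installed arvados-cwl-runner console script does
# the equivalent of the following (the real entry point is declared in
# setup.py, so this guard is not part of the packaged module).
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))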