# Copyright (C) The Arvados Authors. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
18 from functools import partial
19 import pkg_resources # part of setuptools
25 from cwltool.errors import WorkflowException
27 import cwltool.workflow
28 import cwltool.process
29 from schema_salad.sourceline import SourceLine
30 import schema_salad.validate as validate
31 import cwltool.argparser
35 from arvados.keep import KeepClient
36 from arvados.errors import ApiError
37 import arvados.commands._util as arv_cmd
39 from .arvcontainer import ArvadosContainer, RunnerContainer
40 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
41 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
42 from .arvtool import ArvadosCommandTool
43 from .arvworkflow import ArvadosWorkflow, upload_workflow
44 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
45 from .perf import Perf
46 from .pathmapper import NoFollowPathMapper
47 from .task_queue import TaskQueue
48 from .context import ArvLoadingContext, ArvRuntimeContext
49 from .util import get_current_container
50 from ._version import __version__
52 from cwltool.pack import pack
53 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
54 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
55 from cwltool.command_line_tool import compute_checksums
57 from arvados.api import OrderedJsonModel
# Module-level loggers: `logger` for general runner messages, `metrics` for
# timing output (enabled separately via --metrics in main()).
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

# Default log line format; main() replaces it based on --log-timestamps.
# NOTE(review): the date-format argument / closing paren of this call is
# missing from this excerpt — confirm against the full file.
arvados.log_handler.setFormatter(logging.Formatter(
    '%(asctime)s %(name)s %(levelname)s: %(message)s',

# Default container request priority (valid range 1..1000, see --priority).
DEFAULT_PRIORITY = 500
class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and report them as runtime statuses on runner
    containers.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        # Callback (ArvCwlRunner.runtime_status_update) used to push log
        # records into the runner container's runtime_status field.
        self.runtime_status_update = runtime_status_update_func

    def emit(self, record):
        # Map the record's level to a runtime_status "kind" and forward it via
        # the callback.
        # NOTE(review): sampled excerpt — the per-level `kind` assignments and
        # the branch structure around the two update calls are missing below;
        # confirm against the full file.
        if record.levelno == logging.ERROR:
        elif record.levelno == logging.WARNING:
        elif record.levelno == logging.INFO:
        log_msg = record.getMessage()
        # If the logged message is multi-line, include it as a detail
        self.runtime_status_update(
            # [kind argument missing from excerpt]
            "%s from %s (please see details)" % (kind, record.name),
            # [detail argument / closing paren missing from excerpt]
        self.runtime_status_update(
            # [kind argument missing from excerpt]
            "%s: %s" % (record.name, record.getMessage())
            # [closing paren missing from excerpt]
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """
    def __init__(self, api_client,
                 # [remaining keyword parameters (arvargs, keep_client,
                 #  num_retries, ...) are missing from this excerpt]
        # NOTE(review): sampled excerpt — several branch headers and
        # assignments are missing below; indentation is reconstructed.
        # Fallback argument namespace when the caller supplied none.
        arvargs = argparse.Namespace()
        arvargs.work_api = None
        arvargs.output_name = None
        arvargs.output_tags = None
        arvargs.thread_count = 1

        self.api = api_client
        # Condition variable guarding all shared workflow state below
        # (processes, final_output/final_status, ...).
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        # Event used to tell the polling thread to shut down.
        self.stop_polling = threading.Event()
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12  # seconds between API state polls
        self.loadingContext = None

        if keep_client is not None:
            self.keep_client = keep_client
        # [else: missing from excerpt]
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           # [api_client argument line missing from excerpt]
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

        # Pick a supported work API from the API server's discovery document,
        # honoring an explicit --api selection when given.
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            # [try: missing from excerpt]
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    # [self.work_api assignment and except clause missing from excerpt]

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            # [else: missing from excerpt]
            raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            # [logger.warn(""" opening of this deprecation banner is missing
            #  from this excerpt]
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')
            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)
198 def arv_make_tool(self, toolpath_object, loadingContext):
199 if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
200 return ArvadosCommandTool(self, toolpath_object, loadingContext)
201 elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
202 return ArvadosWorkflow(self, toolpath_object, loadingContext)
204 return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
    def output_callback(self, out, processStatus):
        """Record the workflow's final status and output, then wake any thread
        waiting on workflow_eval_lock (arv_executor)."""
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                # [pipeline "state" assignment / else: branch header missing
                #  from this excerpt]
                logger.error("Overall process status is %s", processStatus)
                # Mirror the final state onto the pipeline instance record
                # (jobs API local-run mode).
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()
222 def start_run(self, runnable, runtimeContext):
223 self.task_queue.add(partial(runnable.run, runtimeContext))
225 def process_submitted(self, container):
226 with self.workflow_eval_lock:
227 self.processes[container.uuid] = container
229 def process_done(self, uuid, record):
230 with self.workflow_eval_lock:
231 j = self.processes[uuid]
232 logger.info("%s %s is %s", self.label(j), uuid, record["state"])
233 self.task_queue.add(partial(j.done, record))
234 del self.processes[uuid]
    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called from a failing child container: records the first child error
        or updates the error count on subsequent error statuses.
        Also called from other parts that need to report errors, warnings or
        activity statuses (e.g. RuntimeStatusLoggingHandler).

        NOTE(review): sampled excerpt — several branch headers, dict entries
        and closings are missing below; confirm against the full file.
        """
        with self.workflow_eval_lock:
            current = get_current_container(self.api, self.num_retries, logger)
            # [guard for "not running inside a container" missing from excerpt]
            runtime_status = current.get('runtime_status', {})
            # In case of status being an error, only report the first one.
            # [if kind == 'error': missing from excerpt]
                if not runtime_status.get('error'):
                    runtime_status.update({
                        # ['error': message entry missing from excerpt]
                        'errorDetail': detail or "No error logs available"
                    # [closing brace missing from excerpt]
                # Further errors are only mentioned as a count.
                # [else: missing from excerpt]
                    # Get anything before an optional 'and N more' string.
                    error_msg = re.match(
                        r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                    more_failures = re.match(
                        r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    # Ignore tests stubbing errors
                    # [condition lines missing from excerpt]
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        # [closing / else: missing from excerpt]
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status without regard of
                # previous occurences.
                runtime_status.update({
                    # [kind: message entry missing from excerpt]
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
            # [else: missing from excerpt]
                # Ignore any other status kind
            # [try: missing from excerpt]
                self.api.containers().update(uuid=current['uuid'],
                    # [body= opening missing from excerpt]
                    'runtime_status': runtime_status,
                    }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)
    def wrapped_callback(self, cb, obj, st):
        """Run callback `cb` while holding the workflow lock, then wake
        waiters.

        NOTE(review): the line invoking cb(obj, st) is missing from this
        excerpt — confirm against the full file.
        """
        with self.workflow_eval_lock:
            self.workflow_eval_lock.notifyAll()
302 def get_wrapped_callback(self, cb):
303 return partial(self.wrapped_callback, cb)
    def on_message(self, event):
        """Event handler (websocket/poll) for child state transitions: mark
        children Running, and hand terminal states to process_done()."""
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        # [line setting j.running missing from this excerpt]
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])
318 def label(self, obj):
319 return "[%s %s]" % (self.work_api[0:-1], obj.name)
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.

        NOTE(review): sampled excerpt — the enclosing try/while loop headers
        and several break/continue lines are missing below; indentation is
        reconstructed.
        """
        remain_wait = self.poll_interval
        # [try: / while True: missing from excerpt]
                self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    # [break missing from excerpt]
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                # [empty-keys guard missing from excerpt]
                    remain_wait = self.poll_interval
                    # [continue missing from excerpt]

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                # [try: missing from excerpt]
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    remain_wait = self.poll_interval
                    # [continue missing from excerpt]

                # Convert each polled record into a synthetic "update" event
                # and feed it through on_message().
                for p in proc_states["items"]:
                    # [self.on_message({ opening missing from excerpt]
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    # ["properties": {...} entry and closing missing from excerpt]

                finish_poll = time.time()
                # Subtract time spent polling so the loop period stays steady.
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        # [except Exception: missing from excerpt]
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        # [finally: missing from excerpt]
            self.stop_polling.set()
    def add_intermediate_output(self, uuid):
        """Remember an intermediate output collection for later TTL/trash
        handling.

        NOTE(review): a guard line (likely `if uuid:`) is missing from this
        excerpt — confirm against the full file.
        """
        self.intermediate_output_collections.append(uuid)
    def trash_intermediate_output(self):
        """Best-effort deletion of the intermediate output collections
        recorded by add_intermediate_output(); failures are logged rather
        than fatal."""
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            # [try: missing from this excerpt]
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            # [except clause header missing from this excerpt]
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
                # Never swallow interpreter-shutdown signals.
                if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                    # [re-raise/break missing from this excerpt]
385 def check_features(self, obj):
386 if isinstance(obj, dict):
387 if obj.get("writable") and self.work_api != "containers":
388 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
389 if obj.get("class") == "DockerRequirement":
390 if obj.get("dockerOutputDirectory"):
391 if self.work_api != "containers":
392 raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
393 "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
394 if not obj.get("dockerOutputDirectory").startswith('/'):
395 raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
396 "Option 'dockerOutputDirectory' must be an absolute path.")
397 if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
398 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
399 for v in obj.itervalues():
400 self.check_features(v)
401 elif isinstance(obj, list):
402 for i,v in enumerate(obj):
403 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
404 self.check_features(v)
    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        """Copy the workflow's output files into a new collection named
        `name`, rewrite `outputObj` locations to point into it, tag and save
        it, and return (rewritten outputObj, collection).

        NOTE(review): sampled excerpt — several lines (the `files`
        accumulator definition, `sp = k.split(...)`, try/except headers,
        the tag loop header) are missing below; confirm against the full
        file.
        """
        outputObj = copy.deepcopy(outputObj)

        # Collect every File/Directory object referenced by the output.
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            # "_:" keys are literals created in-line rather than keep refs.
            if k.startswith("_:"):
                if v.type == "Directory":
                    # [handling/continue missing from excerpt]
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    # [continue missing from excerpt]

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            # [sp = ... split of the keep reference missing from excerpt]
            srccollection = sp[0][5:]
            # [try: missing from excerpt]
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                # [re-raise / second except clause missing from excerpt]
                logger.warn("While preparing output collection: %s", e)

        # Point output locations at the new collection and strip fields that
        # no longer apply.
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                # [key removal lines missing from excerpt]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        # [for tag in tags: loop header missing from excerpt]
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
    def set_crunch_output(self):
        """Record the final output collection on this runner's own container
        (containers API) or job task (jobs API) so it is visible on the
        Arvados record.

        NOTE(review): sampled excerpt — guard/try headers and parts of the
        update bodies are missing below.
        """
        if self.work_api == "containers":
            current = get_current_container(self.api, self.num_retries, logger)
            # [not-in-a-container guard and try: missing from excerpt]
                self.api.containers().update(uuid=current['uuid'],
                    # [body= opening missing from excerpt]
                    'output': self.final_output_collection.portable_data_hash(),
                    }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                    # [body= contents missing from excerpt]
                    }).execute(num_retries=self.num_retries)
            except Exception as e:
                # Best-effort: failing to annotate the record is not fatal.
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                # [body= opening missing from excerpt]
                'output': self.final_output_collection.portable_data_hash(),
                'success': self.final_status == "success",
                # [remaining entries missing from excerpt]
                }).execute(num_retries=self.num_retries)
    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
        """Top-level execution driver invoked by cwltool.main: validates
        features, uploads dependencies, either submits a runner job/container
        or runs the workflow locally, waits for completion, and returns
        (final_output, final_status).

        NOTE(review): sampled excerpt — a number of argument lines, branch
        headers (else/try) and closings are missing below; indentation is
        reconstructed.
        """
        self.debug = runtimeContext.debug

        # Reject CWL features the selected work API cannot support.
        tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not runtimeContext.name:
            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        # Don't validate this time because it will just print redundant errors.
        loadingContext = self.loadingContext.copy()
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.do_validate = False
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  # [loadingContext argument / closing paren
                                  #  missing from excerpt]

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     # [remaining arguments missing from excerpt]

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      runtimeContext.enable_reuse,
                                      # [uuid= argument missing from excerpt]
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      merged_map=merged_map)
                # [tmpl.save() call missing from excerpt]
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        # [project/uuid arguments missing from excerpt]
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map),
                        # [status element of this tuple missing from excerpt]

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"
        elif self.work_api == "jobs":
            if runtimeContext.priority != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            runtimeContext.outdir = "$(task.outdir)"
            runtimeContext.docker_outdir = "$(task.outdir)"
            runtimeContext.tmpdir = "$(task.tmpdir)"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
                    runtimeContext.runnerjob = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         runtimeContext).next()
                # [else: introducing the RunnerContainer branch missing from
                #  excerpt]
                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
                                                # [output name/tags arguments
                                                #  missing from excerpt]
                                                submit_runner_ram=runtimeContext.submit_runner_ram,
                                                name=runtimeContext.name,
                                                on_error=runtimeContext.on_error,
                                                submit_runner_image=runtimeContext.submit_runner_image,
                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                merged_map=merged_map,
                                                priority=runtimeContext.priority,
                                                secret_store=self.secret_store)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
                                      # [output name/tags arguments missing
                                      #  from excerpt]
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      on_error=runtimeContext.on_error,
                                      submit_runner_image=runtimeContext.submit_runner_image,
                                      merged_map=merged_map)
        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                # [body={ opening missing from excerpt]
                "owner_uuid": self.project_uuid,
                "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                # ["components" entry missing from excerpt]
                "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        # --no-wait: submit the runner and return immediately.
        if runnerjob and not runtimeContext.wait:
            submitargs = runtimeContext.copy()
            submitargs.submit = False
            runnerjob.run(submitargs)
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        # [if runnerjob: guard missing from excerpt]
            jobiter = iter((runnerjob,))
        # [else: missing from excerpt]
            if runtimeContext.cwl_runner_job is not None:
                self.uuid = runtimeContext.cwl_runner_job.get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               # [runtimeContext argument missing from excerpt]

        # [try: missing from excerpt]
            self.workflow_eval_lock.acquire()
            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            # [loopperf.__enter__() call missing from excerpt]
            for runnable in jobiter:
                # [loopperf.__exit__() call missing from excerpt]

                if self.stop_polling.is_set():
                    # [abort handling missing from excerpt]

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                # [if runnable: guard missing from excerpt]
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                # [else: missing from excerpt]
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    # [else: missing from excerpt]
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        # [break and loopperf re-entry missing from excerpt]

            # Drain remaining in-flight work before reporting final status.
            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            # [re-raise / status handling missing from excerpt]
        # [except: missing from excerpt]
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            # [else: missing from excerpt]
            logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            # [if self.pipeline: guard missing from excerpt]
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        # [finally: missing from excerpt]
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        # [else: missing from excerpt]
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = runtimeContext.storage_classes.strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)
740 """Print version string of key packages for provenance and debugging."""
742 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
743 arvpkg = pkg_resources.require("arvados-python-client")
744 cwlpkg = pkg_resources.require("cwltool")
746 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
747 "arvados-python-client", arvpkg[0].version,
748 "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    # Build the arvados-cwl-runner command line interface.
    # NOTE(review): sampled excerpt — several add_argument calls below are
    # missing their default=/metavar=/closing-paren continuation lines, and
    # the final `return parser` is missing; confirm against the full file.
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        # [type=/default= lines missing from excerpt]

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        # [default= line / closing paren missing from excerpt]

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        # [default= line / closing paren missing from excerpt]

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        # [default= line / closing paren missing from excerpt]

    parser.add_argument("--submit-request-uuid", type=str,
                        # [default= line missing from excerpt]
                        help="Update and commit supplied container request instead of creating a new one (containers API only).")

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        # [default= line / closing paren missing from excerpt]

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                        "of CWL spec.", default=False)
    parser.add_argument('--storage-classes', default="default", type=str,
                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
                        # [default= line / closing paren missing from excerpt]

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
                        default=DEFAULT_PRIORITY)

    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    parser.add_argument("--thread-count", type=int,
                        default=4, help="Number of threads to use for job submit and output collection.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
    # [return parser missing from excerpt]
def add_arv_hints():
    # NOTE(review): the `def add_arv_hints():` header and the closing `])`
    # were lost in this excerpt; the name is grounded by its use below
    # (`custom_schema_callback=add_arv_hints`).
    """Register Arvados CWL extensions with cwltool: relax the docker image
    name check, load the arv-cwl-schema extension schema, and declare the
    extra process requirements cwltool should accept."""
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])
def exit_signal_handler(sigcode, frame):
    """Signal handler installed by arv_cmd: log the signal and shut down.

    NOTE(review): the exit call that presumably follows the log line is
    missing from this excerpt — confirm against the full file.
    """
    logger.error("Caught signal {}, exiting.".format(sigcode))
def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    """Command line entry point: parse arguments, construct the ArvCwlRunner,
    configure logging, and delegate execution to cwltool.main.main().

    NOTE(review): sampled excerpt — several early-return lines, branch
    headers (if/else/try) and keyword arguments are missing below;
    indentation is reconstructed.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if len(arvargs.storage_classes.strip().split(',')) > 1:
        logger.error("Multiple storage classes are not supported currently.")
        # [error return missing from excerpt]

    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    if arvargs.update_workflow:
        # Infer which API the UUID belongs to from its type infix:
        # -7fd4e- is a workflow (containers API), -p5p6p- a pipeline
        # template (jobs API).
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            # [want_api assignment and else branch missing from excerpt]
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            # [error return missing from excerpt]
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    # [try: missing from excerpt]
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
            keep_client = api_client.keep
            # Make an API object now so errors are reported early.
            api_client.users().current().execute()
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
    except Exception as e:
        # [error logging / return missing from excerpt]

    # [if arvargs.debug: missing from excerpt]
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    # [if arvargs.quiet: missing from excerpt]
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    # [if arvargs.metrics: missing from excerpt]
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    # [else: missing from excerpt]
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Fill in defaults for any cwltool arguments our parser doesn't define.
    for key, val in cwltool.argparser.get_default_args().items():
        if not hasattr(arvargs, key):
            setattr(arvargs, key, val)

    runtimeContext = ArvRuntimeContext(vars(arvargs))
    runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                            collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             # [stdout=/stderr= keyword arguments missing
                             #  from excerpt]
                             executor=runner.arv_executor,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints,
                             loadingContext=runner.loadingContext,
                             runtimeContext=runtimeContext)