2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: Apache-2.0
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
18 from functools import partial
19 import pkg_resources # part of setuptools
25 from cwltool.errors import WorkflowException
27 import cwltool.workflow
28 import cwltool.process
29 from schema_salad.sourceline import SourceLine
30 import schema_salad.validate as validate
31 import cwltool.argparser
35 from arvados.keep import KeepClient
36 from arvados.errors import ApiError
37 import arvados.commands._util as arv_cmd
39 from .arvcontainer import ArvadosContainer, RunnerContainer
40 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
41 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
42 from .arvtool import ArvadosCommandTool
43 from .arvworkflow import ArvadosWorkflow, upload_workflow
44 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
45 from .perf import Perf
46 from .pathmapper import NoFollowPathMapper
47 from .task_queue import TaskQueue
48 from .context import ArvLoadingContext, ArvRuntimeContext
49 from .util import get_current_container
50 from ._version import __version__
52 from cwltool.pack import pack
53 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
54 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
55 from cwltool.command_line_tool import compute_checksums
57 from arvados.api import OrderedJsonModel
# Module-level loggers: one for normal runner output, one for timing metrics.
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

# Route Arvados SDK log output through a timestamped formatter.
# NOTE(review): the Formatter(...) call appears truncated in this excerpt
# (date-format argument and closing parentheses not visible) -- confirm
# against the full file.
arvados.log_handler.setFormatter(logging.Formatter(
    '%(asctime)s %(name)s %(levelname)s: %(message)s',

# Default priority assigned to submitted work when --priority is not given.
DEFAULT_PRIORITY = 500
class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and report them as runtime statuses on runner
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        # Callback with signature (kind, message, detail=None) used to
        # publish status updates (see ArvCwlRunner.runtime_status_update).
        self.runtime_status_update = runtime_status_update_func

    def emit(self, record):
        # NOTE(review): several interior lines of this method (the status
        # 'kind' assignments and their guard conditions) are not visible in
        # this excerpt; indentation below is reconstructed -- confirm
        # against the full file.
        if record.levelno == logging.ERROR:
        elif record.levelno == logging.WARNING:
        log_msg = record.getMessage()
        # If the logged message is multi-line, use its first line as status
        # and the rest as detail.
        status, detail = log_msg.split('\n', 1)
        self.runtime_status_update(
            "%s: %s" % (record.name, status),
        self.runtime_status_update(
            "%s: %s" % (record.name, record.getMessage())
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """
    def __init__(self, api_client,
        # NOTE(review): the remainder of this signature (arvargs, keep_client,
        # num_retries, ...) is not visible in this excerpt -- confirm against
        # the full file.

        # Fallback argument namespace; presumably used when no arvargs is
        # supplied -- verify against the full signature.
        arvargs = argparse.Namespace()
        arvargs.work_api = None
        arvargs.output_name = None
        arvargs.output_tags = None
        arvargs.thread_count = 1

        self.api = api_client
        # Single condition/lock guarding all shared workflow state
        # (processes dict, final output/status) across threads.
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        # Event used to stop the background polling thread.
        self.stop_polling = threading.Event()
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12  # seconds between state polls
        self.loadingContext = None

        if keep_client is not None:
            self.keep_client = keep_client
        # NOTE(review): an 'else:' line appears elided here in this excerpt.
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

        # Pick the work API: honor an explicit --api choice, otherwise the
        # first of "jobs"/"containers" the server actually supports.
        # NOTE(review): the body that records the selected API is elided in
        # this excerpt.
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            methods = self.api._rootDesc.get('resources')[api]['methods']
            if ('httpMethod' in methods['create'] and
                (arvargs.work_api == api or arvargs.work_api is None)):

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        # Deprecation warning text for the legacy jobs API.
        # NOTE(review): the opening of the call/string this text belongs to is
        # elided in this excerpt.
        if self.work_api == "jobs":
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")

        # cwltool loading context wired to resolve/fetch documents from Keep.
        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')
            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)
198 def arv_make_tool(self, toolpath_object, loadingContext):
199 if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
200 return ArvadosCommandTool(self, toolpath_object, loadingContext)
201 elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
202 return ArvadosWorkflow(self, toolpath_object, loadingContext)
204 return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
    def output_callback(self, out, processStatus):
        """Record the workflow's final output/status and wake waiting threads.

        NOTE(review): the 'state' assignments and the else/guard lines around
        the two log calls are not visible in this excerpt; the control flow
        below is incomplete -- confirm against the full file.
        """
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
            logger.error("Overall process status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            # Wake anyone blocked in workflow_eval_lock.wait().
            self.workflow_eval_lock.notifyAll()
222 def start_run(self, runnable, runtimeContext):
223 self.task_queue.add(partial(runnable.run, runtimeContext))
225 def process_submitted(self, container):
226 with self.workflow_eval_lock:
227 self.processes[container.uuid] = container
229 def process_done(self, uuid, record):
230 with self.workflow_eval_lock:
231 j = self.processes[uuid]
232 logger.info("%s %s is %s", self.label(j), uuid, record["state"])
233 self.task_queue.add(partial(j.done, record))
234 del self.processes[uuid]
    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called from a failing child container: records the first child error
        or updates the error count on subsequent error statuses.
        Also called from other parts that need to report errors, warnings or just
        activity statuses.

        NOTE(review): several guard/branch lines of this method are not
        visible in this excerpt; indentation below is reconstructed --
        confirm against the full file.
        """
        with self.workflow_eval_lock:
            current = get_current_container(self.api, self.num_retries, logger)
            runtime_status = current.get('runtime_status', {})
            # In case of status being an error, only report the first one.
            if not runtime_status.get('error'):
                runtime_status.update({
                if detail is not None:
                    runtime_status.update({
                        'errorDetail': detail
            # Further errors are only mentioned as a count.
                # Get anything before an optional 'and N more' string.
                error_msg = re.match(
                    r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                more_failures = re.match(
                    r'.*\(and (\d+) more\)', runtime_status.get('error'))
                # Ignore tests stubbing errors
                    failure_qty = int(more_failures.groups()[0])
                    runtime_status.update({
                        'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                    runtime_status.update({
                        'error': "%s (and 1 more)" % error_msg
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status without regard of
                # previous occurences.
                runtime_status.update({
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
            # Ignore any other status kind
            # Best-effort write back to the API server; failures are logged
            # but never fatal.
                self.api.containers().update(uuid=current['uuid'],
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)
    def wrapped_callback(self, cb, obj, st):
        """Run a state-change callback under the eval lock, then wake waiters.

        NOTE(review): the actual invocation of cb(obj, st) is not visible in
        this excerpt -- confirm against the full file.
        """
        with self.workflow_eval_lock:
            self.workflow_eval_lock.notifyAll()
305 def get_wrapped_callback(self, cb):
306 return partial(self.wrapped_callback, cb)
    def on_message(self, event):
        """Handle a state-change event for a tracked child process.

        NOTE(review): at least one line (presumably setting j.running before
        updating the pipeline component) is not visible in this excerpt.
        """
        # Only react to updates for processes we are tracking.
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])
321 def label(self, obj):
322 return "[%s %s]" % (self.work_api[0:-1], obj.name)
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.

        NOTE(review): the loop/try scaffolding (while, break/continue lines,
        outer try/except/finally keywords) is partly elided in this excerpt;
        indentation below is reconstructed -- confirm against the full file.
        """
            remain_wait = self.poll_interval
                self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                # Snapshot the tracked uuids under the lock.
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                    remain_wait = self.poll_interval

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    remain_wait = self.poll_interval

                # Feed each polled record through the same path as websocket
                # events.
                for p in proc_states["items"]:
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                finish_poll = time.time()
                # Subtract time spent polling from the next sleep interval.
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
            logger.exception("Fatal error in state polling thread.")
            # On fatal error, drop all tracked processes and wake waiters so
            # the main thread can notice and shut down.
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
            self.stop_polling.set()
    def add_intermediate_output(self, uuid):
        # Remember intermediate collections so they can be trashed later.
        # NOTE(review): a guard line (likely checking intermediate_output_ttl)
        # is not visible in this excerpt -- confirm against the full file.
        self.intermediate_output_collections.append(uuid)
    def trash_intermediate_output(self):
        """Delete all recorded intermediate output collections (best effort).

        NOTE(review): the try/except lines wrapping the delete call are not
        visible in this excerpt -- confirm against the full file.
        """
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
                # Deletion failures are logged, not fatal ...
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            # ... except process-exit exceptions, which must propagate.
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
388 def check_features(self, obj):
389 if isinstance(obj, dict):
390 if obj.get("writable") and self.work_api != "containers":
391 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
392 if obj.get("class") == "DockerRequirement":
393 if obj.get("dockerOutputDirectory"):
394 if self.work_api != "containers":
395 raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
396 "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
397 if not obj.get("dockerOutputDirectory").startswith('/'):
398 raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
399 "Option 'dockerOutputDirectory' must be an absolute path.")
400 if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
401 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
402 for v in obj.itervalues():
403 self.check_features(v)
404 elif isinstance(obj, list):
405 for i,v in enumerate(obj):
406 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
407 self.check_features(v)
    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        """Assemble the workflow's final outputs into one new collection,
        rewrite file locations to point at it, and return (outputObj, final).

        NOTE(review): several interior lines (continue statements, the
        'sp = ...' split, try/except keywords, loop headers) are not visible
        in this excerpt; indentation below is reconstructed -- confirm
        against the full file.
        """
        outputObj = copy.deepcopy(outputObj)

        # Collect every File/Directory object referenced by the output.
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            # "_:" keys are literals created in-flight, not Keep references.
            if k.startswith("_:"):
                if v.type == "Directory":
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            srccollection = sp[0][5:]
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                logger.warn("While preparing output collection: %s", e)

        # Point output locations at their mapped targets and drop metadata
        # that no longer applies after the move.
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        # Attach any requested tags as link records on the collection.
        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        # Rewrite locations to fully-qualified keep: references.
        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
    def set_crunch_output(self):
        """Record the final output collection on the enclosing container or
        job task, so Crunch reports it as this step's output.

        NOTE(review): guard lines and the body-dict openings of the update
        calls are not visible in this excerpt -- confirm against the full
        file.
        """
        if self.work_api == "containers":
            current = get_current_container(self.api, self.num_retries, logger)
                self.api.containers().update(uuid=current['uuid'],
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                # Best effort: failing to record output is logged, not fatal.
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                        }).execute(num_retries=self.num_retries)
    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
        """Main entry point invoked by cwltool: validate options, optionally
        create/submit a runner, drive the job iterator to completion, and
        return (final_output, final_status).

        NOTE(review): this method is reproduced from a line-sampled excerpt;
        try/except/finally keywords, else branches and several continuation
        lines are elided, and indentation is reconstructed -- confirm
        against the full file before relying on control flow.
        """
        self.debug = runtimeContext.debug

        # Reject CWL features the selected API cannot run.
        tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        # Options below are containers-API-only; fail fast otherwise.
        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not runtimeContext.name:
            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        # Don't validate this time because it will just print redundant errors.
        loadingContext = self.loadingContext.copy()
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.do_validate = False

        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      runtimeContext.enable_reuse,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      merged_map=merged_map)
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map),

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        # Per-API runtime context adjustments (work dirs, priority rules).
        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"
        elif self.work_api == "jobs":
            if runtimeContext.priority != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            runtimeContext.outdir = "$(task.outdir)"
            runtimeContext.docker_outdir = "$(task.outdir)"
            runtimeContext.tmpdir = "$(task.tmpdir)"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
                    runtimeContext.runnerjob = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         runtimeContext).next()
                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
                                                submit_runner_ram=runtimeContext.submit_runner_ram,
                                                name=runtimeContext.name,
                                                on_error=runtimeContext.on_error,
                                                submit_runner_image=runtimeContext.submit_runner_image,
                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                merged_map=merged_map,
                                                priority=runtimeContext.priority,
                                                secret_store=self.secret_store)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      on_error=runtimeContext.on_error,
                                      submit_runner_image=runtimeContext.submit_runner_image,
                                      merged_map=merged_map)
        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                    "owner_uuid": self.project_uuid,
                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        # Fire-and-forget submission: run the runner and return immediately.
        if runnerjob and not runtimeContext.wait:
            submitargs = runtimeContext.copy()
            submitargs.submit = False
            runnerjob.run(submitargs)
            return (runnerjob.uuid, "success")

        # Background state polling plus a bounded task queue for submissions.
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

            jobiter = iter((runnerjob,))
            if runtimeContext.cwl_runner_job is not None:
                self.uuid = runtimeContext.cwl_runner_job.get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,

            self.workflow_eval_lock.acquire()
            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            for runnable in jobiter:

                if self.stop_polling.is_set():

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")

            # Drain: wait for all in-flight and tracked processes to finish.
            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = runtimeContext.storage_classes.strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)
def versionstring():
    """Print version string of key packages for provenance and debugging."""
    # NOTE(review): the `def` header of this function is absent from the
    # excerpt being edited (dropped by line sampling); it is restored here.
    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the arvados-cwl-runner command line parser.

    NOTE(review): several add_argument continuation lines (defaults,
    metavars) and the final `return parser` are not visible in this
    excerpt -- confirm against the full file.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,

    parser.add_argument("--submit-request-uuid", type=str,
                        help="Update and commit supplied container request instead of creating a new one (containers API only).")

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)
    parser.add_argument('--storage-classes', default="default", type=str,
                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
                        default=DEFAULT_PRIORITY)

    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    parser.add_argument("--thread-count", type=int,
                        default=4, help="Number of threads to use for job submit and output collection.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                        default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                        default=False, dest="trash_intermediate",
                        help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
def add_arv_hints():
    """Register Arvados CWL schema extensions with cwltool and relax its
    Docker image name acceptance checks."""
    # NOTE(review): the `def` header and the closing `])` of this function
    # are absent from the excerpt being edited (dropped by line sampling);
    # they are restored here.
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])
def exit_signal_handler(sigcode, frame):
    """Signal handler: log the received signal and terminate the process.

    NOTE(review): the terminating sys.exit(...) line is not visible in this
    excerpt; confirm the exit code against the full file.
    """
    logger.error("Caught signal {}, exiting.".format(sigcode))
def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    """Command line entry point: parse arguments, construct an ArvCwlRunner,
    configure logging, and hand control to cwltool.main.main.

    NOTE(review): try/except keywords, else branches and some early-return
    lines are not visible in this excerpt; indentation below is
    reconstructed -- confirm against the full file.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if len(arvargs.storage_classes.strip().split(',')) > 1:
        logger.error("Multiple storage classes are not supported currently.")

    # Fixed cwltool options this runner always sets.
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    # Infer the work API from the uuid's type infix when updating an
    # existing record ('-7fd4e-' = workflow, '-p5p6p-' = pipeline template).
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
            keep_client = api_client.keep
            # Make an API object now so errors are reported early.
            api_client.users().current().execute()
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
    except Exception as e:

        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Fill in any cwltool defaults this parser doesn't define.
    for key, val in cwltool.argparser.get_default_args().items():
        if not hasattr(arvargs, key):
            setattr(arvargs, key, val)

    runtimeContext = ArvRuntimeContext(vars(arvargs))
    runtimeContext.make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             executor=runner.arv_executor,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints,
                             loadingContext=runner.loadingContext,
                             runtimeContext=runtimeContext)