3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
14 from functools import partial
15 import pkg_resources # part of setuptools
17 from cwltool.errors import WorkflowException
19 import cwltool.workflow
20 import cwltool.process
22 from schema_salad.sourceline import SourceLine
26 from arvados.keep import KeepClient
27 from arvados.errors import ApiError
29 from .arvcontainer import ArvadosContainer, RunnerContainer
30 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
31 from. runner import Runner, upload_instance
32 from .arvtool import ArvadosCommandTool
33 from .arvworkflow import ArvadosWorkflow, upload_workflow
34 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
35 from .perf import Perf
36 from .pathmapper import FinalOutputPathMapper
37 from ._version import __version__
39 from cwltool.pack import pack
40 from cwltool.process import shortname, UnsupportedRequirement, getListing
41 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
42 from cwltool.draft2tool import compute_checksums
43 from arvados.api import OrderedJsonModel
# Module-level loggers: general runner output plus a separate channel for
# timing metrics (enabled via --metrics).
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

# Default log format; main() replaces this based on --log-timestamps.
# NOTE(review): the closing arguments of this Formatter call are elided from
# this excerpt.
arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        # NOTE(review): several initialization lines are elided from this
        # excerpt (presumably storing api_client as self.api, plus the
        # processes/uploaded maps used by other methods — confirm against
        # the full file).
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)  # notified as tracked processes change state
        self.final_output = None      # set by output_callback
        self.final_status = None      # set by output_callback
        self.num_retries = num_retries
        self.stop_polling = threading.Event()  # signals poll_states to exit
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        # Use the caller-supplied Keep client when given; otherwise build one.
        if keep_client is not None:
            self.keep_client = keep_client
            # NOTE(review): an `else:` line is elided here in this excerpt;
            # the next assignment is the fallback branch.
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        # Choose the work submission API ("jobs" or "containers"): accept the
        # requested work_api if the server's discovery document supports it,
        # or the first supported one when work_api is None.
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            # NOTE(review): a try/except around the discovery lookup and the
            # lines recording the selected API are elided in this excerpt.
            methods = self.api._rootDesc.get('resources')[api]['methods']
            if ('httpMethod' in methods['create'] and
                (work_api == api or work_api is None)):

        # NOTE(review): the guard conditions distinguishing the two failure
        # modes below are elided in this excerpt.
        raise Exception("No supported APIs")
        raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
    def arv_make_tool(self, toolpath_object, **kwargs):
        """Tool factory handed to cwltool.

        Returns the Arvados-specific wrapper for CommandLineTool or Workflow
        documents, and falls back to cwltool's default factory for anything
        else (e.g. ExpressionTool).
        """
        kwargs["work_api"] = self.work_api
        # NOTE(review): an api_client argument to this partial appears to be
        # elided from this excerpt.
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        # [an `else:` line is elided from this excerpt]
        return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
    def output_callback(self, out, processStatus):
        """Final-output callback given to cwltool jobs.

        Records the overall status and output object, and updates the local
        pipeline instance (jobs API) to Complete or Failed.
        """
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            # NOTE(review): a guard line (likely `if self.pipeline:`) is
            # elided before this update — confirm against the full file.
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
        # NOTE(review): the `else:` header is elided; the lines below handle
        # the non-success case.
        logger.warn("Overall process status is %s", processStatus)
        self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                             body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
    def on_message(self, event):
        """Event handler for job/container state-change events.

        Receives both websocket events and synthetic events generated by
        poll_states, and dispatches to the tracked process object.
        """
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    # NOTE(review): a locking line (likely `with self.lock:`)
                    # is elided around the accesses below.
                    j = self.processes[uuid]
                    logger.info("%s %s is Running", self.label(j), uuid)
                    j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    # [one or more lines elided from this excerpt]
                    j = self.processes[uuid]
                    logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                    with Perf(metrics, "done %s" % j.name):
                        # Let the process object finish up (fires its output
                        # callback with the final attributes).
                        j.done(event["properties"]["new_attributes"])
148 def label(self, obj):
149 return "[%s %s]" % (self.work_api[0:-1], obj.name)
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        # NOTE(review): the enclosing loop and try scaffolding of this method
        # are elided from this excerpt.
        self.stop_polling.wait(15)  # poll in 15s batches; wakes early on shutdown
        if self.stop_polling.is_set():
        keys = self.processes.keys()
        # Pick the listing endpoint matching the active work API.
        if self.work_api == "containers":
            table = self.poll_api.container_requests()
        elif self.work_api == "jobs":
            table = self.poll_api.jobs()
        # [a `try:` line is elided before this request]
        proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
        except Exception as e:
            # Transient API errors: log and keep polling.
            logger.warn("Error checking states on API server: %s", e)
        # Feed each polled record through the same handler the websocket
        # client would use.  NOTE(review): the surrounding on_message({...})
        # call is partially elided in this excerpt.
        for p in proc_states["items"]:
            "object_uuid": p["uuid"],
            "event_type": "update",
        # [outer exception handler; header elided]
        logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
        self.processes.clear()
        # Wake the main loop so it notices the polling thread died.
        self.stop_polling.set()
195 def get_uploaded(self):
196 return self.uploaded.copy()
198 def add_uploaded(self, src, pair):
199 self.uploaded[src] = pair
    def check_features(self, obj):
        """Recursively scan a tool document for CWL features the selected
        work API cannot support, raising UnsupportedRequirement when found.

        Called via tool.visit() before execution so unsupported workflows
        fail fast with a clear message.
        """
        if isinstance(obj, dict):
            if obj.get("class") == "InitialWorkDirRequirement":
                if self.work_api == "containers":
                    raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
                if obj.get("writable"):
                    raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "CommandLineTool":
                if self.work_api == "containers":
                    # NOTE(review): a check of obj.get("stdin") is elided
                    # before this raise in this excerpt.
                    raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
                    if obj.get("stderr"):
                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
            # Recurse into all values (Python 2 dict iteration).
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                # SourceLine attaches the list index to any error raised
                # while checking this element.
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)
    def make_output_collection(self, name, tagsString, outputObj):
        """Gather the workflow's output files into one new Keep collection.

        Copies every file/directory referenced by *outputObj* (plus literal
        CreateFile contents) into a freshly saved collection, optionally
        tags it, and rewrites the output object's locations to point into
        that collection.  Returns (rewritten outputObj, Collection).
        """
        # Work on a copy so the caller's structure is not mutated.
        outputObj = copy.deepcopy(outputObj)

        # Collect every File/Directory object out of the output structure.
        # NOTE(review): the `files = []` accumulator line is elided from this
        # excerpt.
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        # Map each captured location to a target path inside one flat
        # output collection.
        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        # [a source-collection cache initialization line is elided here]
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                # Literal entries not backed by Keep: directories carry no
                # data; CreateFile literals are written straight into the
                # new collection.  [continue lines elided from excerpt]
                if v.type == "Directory":
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            # [sp = k.split(...) and a try: line are elided from this excerpt]
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                # One CollectionReader per distinct source collection.
                # NOTE(review): positional/api_client arguments to this call
                # are elided from this excerpt.
                srccollections[srccollection] = arvados.collection.CollectionReader(
                    keep_client=self.keep_client,
                    num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
            reader = srccollections[srccollection]
            # Copy the referenced path into the final collection without
            # clobbering anything already copied there.
            srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
            final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
        # [exception handler header elided from this excerpt]
        logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            # Point each output object at its path inside the new collection
            # and drop fields that no longer apply.
            # [loop body elided from this excerpt]
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        # Attach requested tags to the new collection as tag links.
        # [a `for tag in tags:` style line is elided before the create call]
        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        self.api.links().create(body={
            "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
    def set_crunch_output(self):
        """When running inside Crunch, record the final output collection on
        the current container (containers API) or job task (jobs API)."""
        if self.work_api == "containers":
            # [a try: line is elided before this request]
            current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                # [a return/early-exit line is elided here]
            # NOTE(review): a `body={` line is elided from the update call
            # below in this excerpt.
            self.api.containers().update(uuid=current['uuid'],
                                         'output': self.final_output_collection.portable_data_hash(),
                                         }).execute(num_retries=self.num_retries)
            except Exception as e:
                # Best-effort: failure to record output is logged, not fatal.
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            # NOTE(review): a `body={` line is elided from the update call
            # below in this excerpt.
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        'output': self.final_output_collection.portable_data_hash(),
                                        'success': self.final_status == "success",
                                        }).execute(num_retries=self.num_retries)
    def arv_executor(self, tool, job_order, **kwargs):
        """Main execution entry point handed to cwltool.main.

        Depending on flags this either registers the workflow
        (--create/--update-workflow), submits a runner job/container
        (--submit), or runs the workflow locally, then waits for completion
        and returns the final output object.
        """
        self.debug = kwargs.get("debug")

        # Fail fast on CWL features the selected work API cannot support.
        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        # [one or more lines elided from this excerpt]
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        # Register (rather than run) the workflow when creating/updating.
        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))
                # [template save/return lines elided from this excerpt]
                # cwltool.main will write our return value to stdout.
                # [containers-API branch header elided]
                return upload_workflow(self, tool, job_order,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
                                       name=kwargs.get("name"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        # Fixed cwltool settings for Arvados execution.
        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if not kwargs["name"]:
            # [body elided from this excerpt]

        # Output/tmp paths differ per API: fixed container paths for the
        # containers API, crunch task substitution strings for the jobs API.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        # Build the runner process for --submit mode.
        # [runnerjob initialization line elided from this excerpt]
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         # [call continuation elided]
                # [else: branch header elided]
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"), on_error=kwargs.get("on_error"))
            # [jobs-API branch header elided]
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"), on_error=kwargs.get("on_error"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            # NOTE(review): a `body={` line is elided from this create call.
            self.pipeline = self.api.pipeline_instances().create(
                "owner_uuid": self.project_uuid,
                "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        # --no-wait: submit the runner and return its UUID immediately.
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        # Background thread that polls job/container states and feeds
        # synthetic update events into on_message.
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        # [an `if runnerjob:` line is elided before this assignment]
            jobiter = iter((runnerjob,))
        # [else: branch header elided]
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               # [call continuation elided]

        # [try:/lock-acquire lines elided from this excerpt]
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            for runnable in jobiter:
                # [perf bookkeeping lines elided]
                if self.stop_polling.is_set():
                    # [loop-exit line elided]
                # [an `if runnable:` line is elided here]
                with Perf(metrics, "run"):
                    runnable.run(**kwargs)
                # [else-branch lines (waiting on pending jobs) elided]
                    logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")

            # Wait for all outstanding processes to finish.
            while self.processes:
                # [cond.wait line elided]

        except UnsupportedRequirement:
            # [re-raise line elided]
        # [generic exception handler header elided]
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            # [else: branch header elided]
            logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            # [an `if self.pipeline:` style guard is elided here]
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                # Cancel the submitted runner container by dropping priority.
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        # [finally:/lock-release lines elided]
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        # [else: branch header elided — local run builds the collection here]
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output
    # NOTE(review): the `def versionstring():` header for this function is
    # elided from this excerpt; the body below builds the version banner.
    """Print version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    # Build the argparse parser for the arvados-cwl-runner command line.
    # NOTE(review): several continuation lines of individual add_argument
    # calls are elided from this excerpt (marked below).
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        # [call continuation elided]
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    # --verbose/--quiet/--debug control the logging level (see main()).
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Job reuse on/off; both flags share the enable_reuse destination.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        # [help text elided]
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        # [help text elided]

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        # [call continuation elided]

    # Execution mode: submit to Arvados, run locally, or register a
    # workflow/template.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                        dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        # [call continuation elided]

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        # [call continuation elided]

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    # Positional arguments: the workflow document plus its input object
    # (everything remaining on the command line).
    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    # [a `return parser` line is elided from this excerpt]
    # NOTE(review): the function header and setup lines (including the
    # `cache` dict initialization) for this block are elided from this
    # excerpt.  The body loads the Arvados CWL extension schema shipped with
    # the package and registers its names alongside the core CWL v1.0 names.
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        # Merge each extension name into the CWL name space if not present.
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        # Pre-populate the loader index for the extension URI — presumably so
        # the loader does not try to resolve it remotely (confirm upstream).
        document_loader.idx["http://arvados.org/cwl#"+n] = {}
def main(args, stdout, stderr, api_client=None, keep_client=None):
    """CLI entry point: parse arguments, construct the ArvCwlRunner, and hand
    control to cwltool.main with the Arvados-specific hooks installed."""
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    # [a `--version` guard line is elided before this print]
    print versionstring()
    # [early-return line elided]

    # Infer the work API from the UUID type when updating an existing
    # workflow: '-7fd4e-' UUIDs are workflow records (containers API),
    # '-p5p6p-' UUIDs are pipeline templates (jobs API).
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
        # [jobs/else branch lines elided from this excerpt]
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            # [error-return line elided]
        arvargs.work_api = want_api

    # Registering a workflow needs no input object; supply an empty one.
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    # [a try: line is elided before the client construction below]
    if api_client is None:
        # OrderedJsonModel preserves key order in API responses.
        api_client=arvados.api('v1', model=OrderedJsonModel())
    if keep_client is None:
        keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
    runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                          num_retries=4, output_name=arvargs.output_name,
                          output_tags=arvargs.output_tags)
    except Exception as e:
        # [error logging/return lines elided from this excerpt]

    # Logging verbosity: the guards for --debug/--quiet/--metrics are elided
    # from this excerpt; the bodies below correspond to those flags.
        logger.setLevel(logging.DEBUG)

        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    # [else: branch header elided]
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Fixed cwltool settings for Arvados execution.
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    # Delegate to cwltool with Arvados-specific executor, tool factory,
    # filesystem access, document fetcher, and identifier resolver.
    return cwltool.main.main(args=arvargs,
                             # [stdout/stderr arguments elided]
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client),
                             logger_handler=arvados.log_handler)