# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
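#
# Typical command-line usage (sketch; the console entry point and its flags
# are defined in arg_parser() below):
#
#   arvados-cwl-runner --api=containers my-workflow.cwl my-inputs.yml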
import argparse
import copy
import json
import logging
import os
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools
from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import schema_salad
from schema_salad.sourceline import SourceLine
import arvados
import arvados.collection
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__
from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.
    """
    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()
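    # on_message() consumes websocket-style event dicts; poll_states() below
    # synthesizes the same shape from list() results. A representative event
    # (sketch; the UUID and attributes are hypothetical):
    #
    #   {"object_uuid": "zzzzz-dz642-xxxxxxxxxxxxxxx",
    #    "event_type": "update",
    #    "properties": {"new_attributes": {"state": "Running"}}}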
    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()
    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("class") == "InitialWorkDirRequirement":
                if self.work_api == "containers":
                    raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "CommandLineTool":
                if self.work_api == "containers":
                    if obj.get("stdin"):
                        raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not supported with --api=containers")
                    if obj.get("stderr"):
                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not supported with --api=containers")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)
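    # For example, with work_api == "containers" a tool fragment like this
    # (hypothetical) fails check_features with an UnsupportedRequirement
    # pointing at the "stdin" field:
    #
    #   {"class": "CommandLineTool", "stdin": "input.txt"}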
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
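    # Net effect of make_output_collection (sketch; hashes hypothetical):
    # an output entry such as
    #   {"location": "keep:9999999999999999999999999999999+99/foo.txt"}
    # is copied into the new collection and rewritten to
    #   {"location": "keep:<portable data hash of final>/foo.txt"}
    # with a cwl.output.json manifest saved inside the collection itself.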
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            else:
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs.get("name")), "success")
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if not kwargs["name"]:
            del kwargs["name"]

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))
        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()
        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)
        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()
        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return (self.final_output, self.final_status)
501 """Print version string of key packages for provenance and debugging."""
503 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
504 arvpkg = pkg_resources.require("arvados-python-client")
505 cwlpkg = pkg_resources.require("cwltool")
507 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
508 "arvados-python-client", arvpkg[0].version,
509 "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}
def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")
    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1
    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client),
                             logger_handler=arvados.log_handler)
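# Direct-execution sketch; the packaged console script calls main() itself, so
# this guard is illustrative only:
#
#   if __name__ == "__main__":
#       sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))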