# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
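#
# A typical invocation (a sketch; the flags are defined in arg_parser() below):
#
#   arvados-cwl-runner --api=containers --project-uuid=<UUID> workflow.cwl job.yml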
import argparse
import copy
import json
import logging
import os
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools
from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import schema_salad.schema
from schema_salad.sourceline import SourceLine
import arvados
import arvados.collection
import arvados.errors
import arvados.keep
from arvados.keep import KeepClient
from arvados.errors import ApiError
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__
from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)
arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """
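    # Rough programmatic usage (a sketch, not from the original source;
    # assumes `tool` and `job_order` come from cwltool's load/validate step):
    #
    #   api = arvados.api('v1', model=OrderedJsonModel())
    #   runner = ArvCwlRunner(api, work_api="containers")
    #   (out, status) = runner.arv_executor(tool, job_order,
    #                                       basedir=".", name=None)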

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
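
        # Inspect the API server's discovery document to see which work
        # submission APIs ("jobs", "containers") it offers; an explicit
        # work_api argument restricts the probe to that API.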
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass
        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                keep_client=self.keep_client)
105 if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
106 return ArvadosCommandTool(self, toolpath_object, **kwargs)
107 elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
108 return ArvadosWorkflow(self, toolpath_object, **kwargs)
110 return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()
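
    # work_api is plural ("jobs" or "containers"); trim the trailing "s"
    # to get a singular label for log messages.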
    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()
                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue
                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            with self.lock:
                self.processes.clear()
                self.cond.notify()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("class") == "InitialWorkDirRequirement":
                if self.work_api == "containers":
                    raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "CommandLineTool":
                if self.work_api == "containers":
                    if obj.get("stdin"):
                        raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not supported with --api=containers")
                    if obj.get("stderr"):
                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not supported with --api=containers")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)

    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)
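
        # Map all captured files/directories into one flat namespace so they
        # can be copied into a single output collection.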
        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)
        srccollections = {}
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)
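
        # Write the output object into the collection itself as
        # cwl.output.json, making the collection self-describing.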
        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
282 logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
283 final.api_response()["name"],
284 final.manifest_locator())
        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])
        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            else:
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs.get("name")), "success")
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")
363 if not kwargs["name"]:
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"
        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
379 if kwargs.get("submit"):
380 if self.work_api == "containers":
381 if tool.tool["class"] == "CommandLineTool":
382 kwargs["runnerjob"] = tool.tool["id"]
383 runnerjob = tool.job(job_order,
384 self.output_callback,
387 runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
388 self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
389 name=kwargs.get("name"), on_error=kwargs.get("on_error"))
391 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
392 self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
393 name=kwargs.get("name"), on_error=kwargs.get("on_error"))
395 if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
396 # Create pipeline for local run
397 self.pipeline = self.api.pipeline_instances().create(
399 "owner_uuid": self.project_uuid,
400 "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
402 "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
403 logger.info("Pipeline instance %s", self.pipeline["uuid"])
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")
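
        # Give the polling thread its own API client: the discovery-based
        # client wraps an httplib2 connection, which must not be shared
        # between threads.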
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()
        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)
        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()
                if self.stop_polling.is_set():
                    break
                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                elif self.processes:
                    self.cond.wait(1)
                else:
                    logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                    break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)
        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()
        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")
475 if kwargs.get("submit") and isinstance(runnerjob, Runner):
476 logger.info("Final output collection %s", runnerjob.final_output)
478 if self.output_name is None:
479 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
480 if self.output_tags is None:
481 self.output_tags = ""
482 self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
483 self.set_crunch_output()
485 if kwargs.get("compute_checksum"):
486 adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
487 adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
489 return (self.final_output, self.final_status)
493 """Print version string of key packages for provenance and debugging."""
495 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
496 arvpkg = pkg_resources.require("arvados-python-client")
497 cwlpkg = pkg_resources.require("cwltool")
499 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
500 "arvados-python-client", arvpkg[0].version,
501 "cwltool", cwlpkg[0].version)

def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    parser.add_argument("--version", action="store_true", help="Print version and exit")
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")
535 parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
536 parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
537 parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
538 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
539 help="Ignore Docker image version when deciding whether to reuse past jobs.",
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")
564 parser.add_argument("--api", type=str,
565 default=None, dest="work_api",
566 help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
568 parser.add_argument("--compute-checksum", action="store_true", default=False,
569 help="Compute checksum of contents while collecting outputs",
570 dest="compute_checksum")
572 parser.add_argument("--submit-runner-ram", type=int,
573 help="RAM (in MiB) required for the workflow runner job (default 1024)",
576 parser.add_argument("--name", type=str,
577 help="Name to use for workflow execution instance.",
580 parser.add_argument("--on-error", type=str,
581 help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
582 "Default is 'continue'.", default="continue", choices=("stop", "continue"))
584 parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
585 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return
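
    # Arvados UUIDs carry a five-character object-type infix at offset 5:
    # '-7fd4e-' marks a workflow object (containers API) and '-p5p6p-' a
    # pipeline template (jobs API), so the UUID tells us which API to use.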
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")
    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1
    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None
    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client),
                             logger_handler=arvados.log_handler)