3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
14 from functools import partial
15 import pkg_resources # part of setuptools
17 from cwltool.errors import WorkflowException
19 import cwltool.workflow
24 from arvados.errors import ApiError
26 from .arvcontainer import ArvadosContainer, RunnerContainer
27 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
28 from. runner import Runner, upload_instance
29 from .arvtool import ArvadosCommandTool
30 from .arvworkflow import ArvadosWorkflow, upload_workflow
31 from .fsaccess import CollectionFsAccess
32 from .perf import Perf
33 from .pathmapper import FinalOutputPathMapper
34 from ._version import __version__
36 from cwltool.pack import pack
37 from cwltool.process import shortname, UnsupportedRequirement, getListing
38 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
39 from cwltool.draft2tool import compute_checksums
40 from arvados.api import OrderedJsonModel
# Module-level loggers: 'arvados.cwl-runner' for general messages, plus a
# dedicated 'metrics' child logger consumed by the Perf timing helpers.
42 logger = logging.getLogger('arvados.cwl-runner')
43 metrics = logging.getLogger('arvados.cwl-runner.metrics')
44 logger.setLevel(logging.INFO)
# NOTE(review): this source is a line-sampled listing (the leading integer on
# each line is the original file's line number). Lines missing between
# consecutive numbers are elided, so several statements below (else branches,
# try/except headers, loop headers, closing docstring quotes) are not visible
# here. Comments marked TODO/presumably flag structure hidden by the elision.
47 class ArvCwlRunner(object):
48 """Execute a CWL tool or workflow, submit work (using either jobs or
49 containers API), wait for them to complete, and report output.
# __init__: set up thread synchronization, output bookkeeping, the Keep
# client, and select the work API ("jobs" or "containers").
53 def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None):
56 self.lock = threading.Lock()
57 self.cond = threading.Condition(self.lock)
58 self.final_output = None
59 self.final_status = None
63 self.stop_polling = threading.Event()
66 self.final_output_collection = None
67 self.output_name = output_name
68 self.output_tags = output_tags
69 self.project_uuid = None
71 if keep_client is not None:
72 self.keep_client = keep_client
# presumably an elided `else:` (orig line 73) guards this fallback — TODO confirm
74 self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
# Probe the API server's discovery document for a usable work API.
76 for api in ["jobs", "containers"]:
78 methods = self.api._rootDesc.get('resources')[api]['methods']
79 if ('httpMethod' in methods['create'] and
80 (work_api == api or work_api is None)):
# Elided lines 81-86 presumably set self.work_api on a match; the raises
# below are the no-match fallbacks.
87 raise Exception("No supported APIs")
89 raise Exception("Unsupported API '%s'" % work_api)
# Factory handed to cwltool: wrap CommandLineTool / Workflow nodes in
# Arvados-aware subclasses; anything else falls through to cwltool's default.
91 def arv_make_tool(self, toolpath_object, **kwargs):
92 kwargs["work_api"] = self.work_api
93 if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
94 return ArvadosCommandTool(self, toolpath_object, **kwargs)
95 elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
96 return ArvadosWorkflow(self, toolpath_object, **kwargs)
98 return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
# Completion callback: record final status/output and, for a local pipeline
# instance, mark it Complete or Failed on the API server.
100 def output_callback(self, out, processStatus):
101 if processStatus == "success":
102 logger.info("Overall process status is %s", processStatus)
104 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
105 body={"state": "Complete"}).execute(num_retries=self.num_retries)
107 logger.warn("Overall process status is %s", processStatus)
109 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
110 body={"state": "Failed"}).execute(num_retries=self.num_retries)
111 self.final_status = processStatus
112 self.final_output = out
# Event callback (from websocket or poll_states): reacts to state changes of
# tracked jobs/containers and fires their done() handlers.
114 def on_message(self, event):
115 if "object_uuid" in event:
116 if event["object_uuid"] in self.processes and event["event_type"] == "update":
117 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
118 uuid = event["object_uuid"]
120 j = self.processes[uuid]
121 logger.info("Job %s (%s) is Running", j.name, uuid)
123 j.update_pipeline_component(event["properties"]["new_attributes"])
124 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
125 uuid = event["object_uuid"]
128 j = self.processes[uuid]
# "jobs" -> "Job", "containers" -> "Container": capitalize first letter
# and drop the trailing 's' for log output.
129 txt = self.work_api[0].upper() + self.work_api[1:-1]
130 logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
131 with Perf(metrics, "done %s" % j.name):
132 j.done(event["properties"]["new_attributes"])
137 def poll_states(self):
138 """Poll status of jobs or containers listed in the processes dict.
140 Runs in a separate thread.
# Wake every 15s (or immediately when stop_polling is set) and batch-query
# the states of all tracked processes.
145 self.stop_polling.wait(15)
146 if self.stop_polling.is_set():
149 keys = self.processes.keys()
153 if self.work_api == "containers":
154 table = self.poll_api.container_requests()
155 elif self.work_api == "jobs":
156 table = self.poll_api.jobs()
159 proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
160 except Exception as e:
# Transient API errors are logged; presumably the loop retries on the
# next cycle (continuation elided).
161 logger.warn("Error checking states on API server: %s", e)
# Synthesize events in the same shape the websocket client delivers, so
# on_message handles both sources uniformly.
164 for p in proc_states["items"]:
166 "object_uuid": p["uuid"],
167 "event_type": "update",
173 logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
# On a fatal polling error, abandon tracked processes and stop polling so
# the main thread can unblock.
175 self.processes.clear()
179 self.stop_polling.set()
181 def get_uploaded(self):
182 return self.uploaded.copy()
184 def add_uploaded(self, src, pair):
185 self.uploaded[src] = pair
# Recursively reject InitialWorkDir entries marked writable, which this
# runner does not support.
187 def check_writable(self, obj):
188 if isinstance(obj, dict):
189 if obj.get("writable"):
190 raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
# NOTE(review): Python 2 idiom (itervalues); the list case's loop header
# (orig line 194) is elided below.
191 for v in obj.itervalues():
192 self.check_writable(v)
193 if isinstance(obj, list):
195 self.check_writable(v)
# Build a Keep collection holding the workflow's final outputs: copy/create
# each mapped output, write cwl.output.json, save, tag, and rewrite output
# locations to point at the new collection. Returns (outputObj, collection).
197 def make_output_collection(self, name, tagsString, outputObj):
198 outputObj = copy.deepcopy(outputObj)
201 def capture(fileobj):
202 files.append(fileobj)
204 adjustDirObjs(outputObj, capture)
205 adjustFileObjs(outputObj, capture)
207 generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
209 final = arvados.collection.Collection(api_client=self.api,
210 keep_client=self.keep_client,
211 num_retries=self.num_retries)
214 for k,v in generatemapper.items():
# "_:" prefix marks literal (non-keep) outputs: CreateFile literals are
# written directly into the new collection.
215 if k.startswith("_:"):
216 if v.type == "Directory":
218 if v.type == "CreateFile":
219 with final.open(v.target, "wb") as f:
220 f.write(v.resolved.encode("utf-8"))
223 if not k.startswith("keep:"):
224 raise Exception("Output source is not in keep or a literal")
# Cache one CollectionReader per source collection identifier
# (sp presumably comes from splitting k on "/" in an elided line).
226 srccollection = sp[0][5:]
227 if srccollection not in srccollections:
229 srccollections[srccollection] = arvados.collection.CollectionReader(
232 keep_client=self.keep_client,
233 num_retries=self.num_retries)
234 except arvados.errors.ArgumentError as e:
235 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
237 reader = srccollections[srccollection]
239 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
240 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
242 logger.warn("While preparing output collection: %s", e)
# Rewrite output locations to collection-relative targets; drop fields
# listed below (deletion statement elided at orig lines 247-248).
244 def rewrite(fileobj):
245 fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
246 for k in ("basename", "listing", "contents"):
250 adjustDirObjs(outputObj, rewrite)
251 adjustFileObjs(outputObj, rewrite)
253 with final.open("cwl.output.json", "w") as f:
254 json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
256 final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
258 logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
259 final.api_response()["name"],
260 final.manifest_locator())
# Attach one tag link per comma-separated entry in tagsString (the loop
# over `tags` is elided at orig line 264).
262 final_uuid = final.manifest_locator()
263 tags = tagsString.split(',')
265 self.api.links().create(body={
266 "head_uuid": final_uuid, "link_class": "tag", "name": tag
267 }).execute(num_retries=self.num_retries)
269 def finalcollection(fileobj):
270 fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
272 adjustDirObjs(outputObj, finalcollection)
273 adjustFileObjs(outputObj, finalcollection)
275 return (outputObj, final)
# When this runner itself executes inside a Crunch container or job task,
# record the final output collection on that container/task record.
277 def set_crunch_output(self):
278 if self.work_api == "containers":
280 current = self.api.containers().current().execute(num_retries=self.num_retries)
281 except ApiError as e:
282 # Status code 404 just means we're not running in a container.
283 if e.resp.status != 404:
284 logger.info("Getting current container: %s", e)
287 self.api.containers().update(uuid=current['uuid'],
289 'output': self.final_output_collection.portable_data_hash(),
290 }).execute(num_retries=self.num_retries)
291 except Exception as e:
292 logger.info("Setting container output: %s", e)
293 elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
294 self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
296 'output': self.final_output_collection.portable_data_hash(),
297 'success': self.final_status == "success",
299 }).execute(num_retries=self.num_retries)
# Main executor entry point handed to cwltool.main: handles template/
# workflow creation, submission, local execution, polling, and final
# output collection.
301 def arv_executor(self, tool, job_order, **kwargs):
302 self.debug = kwargs.get("debug")
304 tool.visit(self.check_writable)
306 self.project_uuid = kwargs.get("project_uuid")
308 make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
310 keep_client=self.keep_client)
311 self.fs_access = make_fs_access(kwargs["basedir"])
# --create-workflow / --update-workflow: register a pipeline template
# (jobs API) or workflow object (containers API) instead of running.
313 existing_uuid = kwargs.get("update_workflow")
314 if existing_uuid or kwargs.get("create_workflow"):
315 if self.work_api == "jobs":
316 tmpl = RunnerTemplate(self, tool, job_order,
317 kwargs.get("enable_reuse"),
320 # cwltool.main will write our return value to stdout.
323 return upload_workflow(self, tool, job_order,
327 self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
# Fixed execution settings passed through to cwltool.
329 kwargs["make_fs_access"] = make_fs_access
330 kwargs["enable_reuse"] = kwargs.get("enable_reuse")
331 kwargs["use_container"] = True
332 kwargs["tmpdir_prefix"] = "tmp"
333 kwargs["on_error"] = "continue"
334 kwargs["compute_checksum"] = kwargs.get("compute_checksum")
# API-specific working-directory conventions.
336 if self.work_api == "containers":
337 kwargs["outdir"] = "/var/spool/cwl"
338 kwargs["docker_outdir"] = "/var/spool/cwl"
339 kwargs["tmpdir"] = "/tmp"
340 kwargs["docker_tmpdir"] = "/tmp"
341 elif self.work_api == "jobs":
342 kwargs["outdir"] = "$(task.outdir)"
343 kwargs["docker_outdir"] = "$(task.outdir)"
344 kwargs["tmpdir"] = "$(task.tmpdir)"
346 upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
# --submit: wrap the whole workflow in a runner job/container that
# re-executes this program on the cluster.
349 if kwargs.get("submit"):
350 if self.work_api == "containers":
351 if tool.tool["class"] == "CommandLineTool":
352 runnerjob = tool.job(job_order,
353 self.output_callback,
356 runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)
358 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)
360 if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
361 # Create pipeline for local run
362 self.pipeline = self.api.pipeline_instances().create(
364 "owner_uuid": self.project_uuid,
365 "name": shortname(tool.tool["id"]),
367 "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
368 logger.info("Pipeline instance %s", self.pipeline["uuid"])
# Fire-and-forget submission: start the runner and return its UUID
# without waiting for completion.
370 if runnerjob and not kwargs.get("wait"):
371 runnerjob.run(wait=kwargs.get("wait"))
372 return runnerjob.uuid
# Start the background state-polling thread for the blocking run.
374 self.poll_api = arvados.api('v1')
375 self.polling_thread = threading.Thread(target=self.poll_states)
376 self.polling_thread.start()
379 jobiter = iter((runnerjob,))
381 if "cwl_runner_job" in kwargs:
382 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
383 jobiter = tool.job(job_order,
384 self.output_callback,
389 # Will continue to hold the lock for the duration of this code
390 # except when in cond.wait(), at which point on_message can update
391 # job state and process output callbacks.
393 loopperf = Perf(metrics, "jobiter")
395 for runnable in jobiter:
398 if self.stop_polling.is_set():
402 with Perf(metrics, "run"):
403 runnable.run(**kwargs)
408 logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
# Wait for all outstanding processes to finish (cond.wait call elided).
413 while self.processes:
416 except UnsupportedRequirement:
419 if sys.exc_info()[0] is KeyboardInterrupt:
420 logger.error("Interrupted, marking pipeline as failed")
422 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
424 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
425 body={"state": "Failed"}).execute(num_retries=self.num_retries)
# Cancel the runner container request on failure by dropping its priority.
426 if runnerjob and runnerjob.uuid and self.work_api == "containers":
427 self.api.container_requests().update(uuid=runnerjob.uuid,
428 body={"priority": "0"}).execute(num_retries=self.num_retries)
431 self.stop_polling.set()
432 self.polling_thread.join()
434 if self.final_status == "UnsupportedRequirement":
435 raise UnsupportedRequirement("Check log for details.")
437 if self.final_output is None:
438 raise WorkflowException("Workflow did not return a result.")
440 if kwargs.get("submit") and isinstance(runnerjob, Runner):
441 logger.info("Final output collection %s", runnerjob.final_output)
# Local run: gather outputs into a named, optionally tagged collection.
443 if self.output_name is None:
444 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
445 if self.output_tags is None:
446 self.output_tags = ""
447 self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
448 self.set_crunch_output()
450 if self.final_status != "success":
451 raise WorkflowException("Workflow failed.")
453 if kwargs.get("compute_checksum"):
454 adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
455 adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
457 return self.final_output
# NOTE(review): the enclosing `def versionstring():` line (orig ~460) is
# elided from this sampled listing; what follows is the function body.
461 """Print version string of key packages for provenance and debugging."""
463 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
464 arvpkg = pkg_resources.require("arvados-python-client")
465 cwlpkg = pkg_resources.require("cwltool")
# Format: "<prog> <module version> <installed version>, <pkg> <ver>, ..."
467 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
468 "arvados-python-client", arvpkg[0].version,
469 "cwltool", cwlpkg[0].version)
# Build the argparse parser for the arv-cwl-runner command line.
# NOTE(review): several add_argument calls below are missing trailing lines
# (help= strings, defaults) that fall in the elided gaps of this listing.
472 def arg_parser(): # type: () -> argparse.ArgumentParser
473 parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
475 parser.add_argument("--basedir", type=str,
476 help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
477 parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
478 help="Output directory, default current directory")
480 parser.add_argument("--eval-timeout",
481 help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
484 parser.add_argument("--version", action="store_true", help="Print version and exit")
# Logging verbosity options are mutually exclusive.
486 exgroup = parser.add_mutually_exclusive_group()
487 exgroup.add_argument("--verbose", action="store_true", help="Default logging")
488 exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
489 exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
491 parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
493 parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
# Job reuse defaults to on; --disable-reuse turns it off.
495 exgroup = parser.add_mutually_exclusive_group()
496 exgroup.add_argument("--enable-reuse", action="store_true",
497 default=True, dest="enable_reuse",
499 exgroup.add_argument("--disable-reuse", action="store_false",
500 default=True, dest="enable_reuse",
503 parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
504 parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
505 parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
506 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
507 help="Ignore Docker image version when deciding whether to reuse past jobs.",
# Run mode: submit to Arvados (default), run locally, or create/update a
# workflow object instead of executing.
510 exgroup = parser.add_mutually_exclusive_group()
511 exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
512 default=True, dest="submit")
513 exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
514 default=True, dest="submit")
515 exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
516 dest="create_workflow")
517 exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
518 exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
# Wait-for-completion defaults to on; --no-wait submits and exits.
520 exgroup = parser.add_mutually_exclusive_group()
521 exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
522 default=True, dest="wait")
523 exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
524 default=True, dest="wait")
526 parser.add_argument("--api", type=str,
527 default=None, dest="work_api",
528 help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
530 parser.add_argument("--compute-checksum", action="store_true", default=False,
531 help="Compute checksum of contents while collecting outputs",
532 dest="compute_checksum")
# Positional arguments: the workflow document and its input object.
534 parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
535 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
# NOTE(review): the enclosing function def (orig ~539-540, presumably the
# schema-hints setup) is elided from this sampled listing. This body loads
# the bundled arvados.org CWL extension schema and merges its names into
# cwltool's v1.0 schema so Arvados-specific hints validate.
541 res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
542 cache["http://arvados.org/cwl"] = res.read()
544 document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
545 _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
# Copy each extension name into the CWL namespace unless already present,
# and register an (empty) index entry so the loader resolves the URI.
546 for n in extnames.names:
547 if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
548 cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
549 document_loader.idx["http://arvados.org/cwl#"+n] = {}
# CLI entry point: parse arguments, infer the work API from an
# --update-workflow UUID, construct the ArvCwlRunner, configure logging,
# and delegate execution to cwltool.main with Arvados-specific hooks.
551 def main(args, stdout, stderr, api_client=None, keep_client=None):
552 parser = arg_parser()
554 job_order_object = None
555 arvargs = parser.parse_args(args)
# Infer API from the UUID's type infix: '-7fd4e-' is a containers-API
# workflow object; '-p5p6p-' presumably selects 'jobs' (branch body at
# orig line 561 is elided — TODO confirm).
557 if arvargs.update_workflow:
558 if arvargs.update_workflow.find('-7fd4e-') == 5:
559 want_api = 'containers'
560 elif arvargs.update_workflow.find('-p5p6p-') == 5:
564 if want_api and arvargs.work_api and want_api != arvargs.work_api:
565 logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
566 arvargs.update_workflow, want_api, arvargs.work_api))
568 arvargs.work_api = want_api
570 if arvargs.work_api not in ("jobs", "containers"):
571 logger.error("Unknown --api '%s' expected one of 'jobs' or 'containers'", arvargs.work_api)
# Creating/updating a workflow does not require an input object.
574 if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
575 job_order_object = ({}, "")
580 if api_client is None:
581 api_client=arvados.api('v1', model=OrderedJsonModel())
582 runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name, output_tags=arvargs.output_tags)
583 except Exception as e:
# Logging level selection: the conditionals choosing among --debug,
# --quiet, and --metrics fall in elided lines between the statements below.
588 logger.setLevel(logging.DEBUG)
591 logger.setLevel(logging.WARN)
592 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
595 metrics.setLevel(logging.DEBUG)
596 logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
# Fixed cwltool settings for the Arvados environment.
598 arvargs.conformance_test = None
599 arvargs.use_container = True
600 arvargs.relax_path_checks = True
# Hand off to cwltool with the Arvados executor, tool factory, version
# function, and Keep-backed filesystem access.
602 return cwltool.main.main(args=arvargs,
605 executor=runner.arv_executor,
606 makeTool=runner.arv_make_tool,
607 versionfunc=versionstring,
608 job_order_object=job_order_object,
609 make_fs_access=partial(CollectionFsAccess, api_client=api_client))