3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
14 from functools import partial
15 import pkg_resources # part of setuptools
17 from cwltool.errors import WorkflowException
19 import cwltool.workflow
24 from arvados.errors import ApiError
26 from .arvcontainer import ArvadosContainer, RunnerContainer
27 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
28 from. runner import Runner, upload_instance
29 from .arvtool import ArvadosCommandTool
30 from .arvworkflow import ArvadosWorkflow, upload_workflow
31 from .fsaccess import CollectionFsAccess
32 from .perf import Perf
33 from .pathmapper import FinalOutputPathMapper
34 from ._version import __version__
36 from cwltool.pack import pack
37 from cwltool.process import shortname, UnsupportedRequirement, getListing
38 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
39 from cwltool.draft2tool import compute_checksums
40 from arvados.api import OrderedJsonModel
# Module-level loggers: `logger` carries general runner messages, `metrics`
# carries timing/performance data emitted via the Perf context manager.
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None):
        # NOTE(review): several initialization lines (e.g. self.api,
        # self.processes, self.uploaded, self.num_retries) appear elided from
        # this view; those attributes are read by other methods of this class.
        self.lock = threading.Lock()
        # Condition used by arv_executor to wait for on_message state updates.
        self.cond = threading.Condition(self.lock)
        self.final_output = None          # set by output_callback
        self.final_status = None          # e.g. "success"; set by output_callback
        self.stop_polling = threading.Event()  # signals poll_states thread to exit
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None          # filled in by arv_executor from kwargs

        if keep_client is not None:
            self.keep_client = keep_client
        # NOTE(review): an `else:` appears elided here; as displayed this line
        # would unconditionally overwrite the caller-supplied keep_client.
        self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        # Probe the API server's discovery document for a usable submission
        # API, honoring an explicit work_api request when one was given.
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            # NOTE(review): a try/except around this probe appears elided.
            methods = self.api._rootDesc.get('resources')[api]['methods']
            if ('httpMethod' in methods['create'] and
                (work_api == api or work_api is None)):
                # NOTE(review): branch body (presumably setting self.work_api
                # and breaking out of the loop) is elided from this view.
        # NOTE(review): the guarding conditions for the two raises below
        # (no API selected / unknown work_api value) are elided.
        raise Exception("No supported APIs")
        raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
    def arv_make_tool(self, toolpath_object, **kwargs):
        """Tool factory handed to cwltool: build Arvados-specific Process types.

        Returns an ArvadosCommandTool for CommandLineTool, an ArvadosWorkflow
        for Workflow, and otherwise defers to cwltool's default factory.
        """
        # Propagate the selected submission API so tools build the right
        # kind of runtime request.
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        # NOTE(review): an `else:` line appears elided here; behavior is
        # unchanged since both branches above return.
        return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
    def output_callback(self, out, processStatus):
        """Record the overall workflow result and, when running under a
        pipeline instance, push the terminal state to the API server."""
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            # NOTE(review): a guard (likely `if self.pipeline:`) appears
            # elided before this API update.
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
        # NOTE(review): the failure branch header (`else:`) is elided from
        # this view; the two statements below are its body.
        logger.warn("Overall process status is %s", processStatus)
        self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                             body={"state": "Failed"}).execute(num_retries=self.num_retries)
        # These are read by arv_executor after all processes finish.
        self.final_status = processStatus
        self.final_output = out
    def on_message(self, event):
        """Event handler for job/container state changes.

        Receives websocket (or synthesized poll) events and dispatches them
        to the tracked process objects in self.processes.
        """
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    # NOTE(review): lock acquisition (`with self.lock:`) and
                    # the `j.running = True` update appear elided here.
                    j = self.processes[uuid]
                    logger.info("Job %s (%s) is Running", j.name, uuid)
                    j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    # NOTE(review): lock acquisition appears elided here too.
                    j = self.processes[uuid]
                    # Humanize the API name for logging:
                    # "containers" -> "Container", "jobs" -> "Job".
                    txt = self.work_api[0].upper() + self.work_api[1:-1]
                    logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                    with Perf(metrics, "done %s" % j.name):
                        # j.done() collects output and triggers callbacks.
                        j.done(event["properties"]["new_attributes"])
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        # NOTE(review): the enclosing `while True:`/`try:` structure appears
        # elided from this view; `# elided:` comments mark missing fragments.
        self.stop_polling.wait(15)   # poll interval; wakes early when stop is set
        if self.stop_polling.is_set():
            # elided: break/return out of the polling loop
        keys = self.processes.keys()
        # Pick the table matching the selected work submission API.
        if self.work_api == "containers":
            table = self.poll_api.container_requests()
        elif self.work_api == "jobs":
            table = self.poll_api.jobs()
        # elided: try: around the list call
        proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
        except Exception as e:
            # Transient API errors are logged; polling continues.
            logger.warn("Error checking states on API server: %s", e)
        # Feed each polled record through the same handler used for
        # websocket events, synthesizing an "update" event.
        for p in proc_states["items"]:
            # elided: self.on_message({...}) call wrapping this event dict
            "object_uuid": p["uuid"],
            "event_type": "update",
        # elided: except clause of the outer try
        logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
        # On fatal error, drop all tracked processes so the main thread's
        # `while self.processes` wait can terminate.
        self.processes.clear()
        self.stop_polling.set()
184 def get_uploaded(self):
185 return self.uploaded.copy()
187 def add_uploaded(self, src, pair):
188 self.uploaded[src] = pair
190 def check_writable(self, obj):
191 if isinstance(obj, dict):
192 if obj.get("writable"):
193 raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
194 for v in obj.itervalues():
195 self.check_writable(v)
196 if isinstance(obj, list):
198 self.check_writable(v)
    def make_output_collection(self, name, tagsString, outputObj):
        """Copy the workflow's output files into a new Keep collection.

        Returns a tuple (outputObj rewritten to reference the new collection,
        the saved Collection object).
        """
        # Deep-copy so rewriting locations does not disturb the caller's copy.
        outputObj = copy.deepcopy(outputObj)

        # NOTE(review): initialization of `files` (a list) appears elided here.
        def capture(fileobj):
            files.append(fileobj)

        # Collect every File/Directory object referenced by the output.
        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        # Map each captured object onto a flat target path in the collection.
        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        # NOTE(review): initialization of `srccollections` (a reader cache
        # keyed by source collection) appears elided here.
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                # Literal (non-keep) outputs: Directory entries need nothing
                # copied; CreateFile literals are written directly.
                if v.type == "Directory":
                    # elided: continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    # elided: continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            # elided: sp = k.split("/")
            srccollection = sp[0][5:]   # strip the "keep:" prefix
            if srccollection not in srccollections:
                # elided: try: around CollectionReader construction
                srccollections[srccollection] = arvados.collection.CollectionReader(
                    # elided: srccollection / api_client arguments
                    keep_client=self.keep_client,
                    num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    # elided: re-raise
            reader = srccollections[srccollection]
            # elided: try: around the copy below
            # "." means the whole source collection when no subpath is given.
            srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
            final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            # elided: except IOError as e:
            # Best-effort: copy problems are logged, not fatal.
            logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            # Point the object at its path inside the new collection and drop
            # fields that are stale after the move.
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                # elided: if k in fileobj: del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        # Store the rewritten output description inside the collection itself.
        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        # Apply user-requested tags as "tag" link objects on the collection.
        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        # elided: loop header (`for tag in tags:`), likely guarded by a
        # non-empty tagsString check
        self.api.links().create(body={
            "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            # Make locations absolute keep: references for the caller.
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
    def set_crunch_output(self):
        """When running inside Crunch, record the final output collection on
        the current container (containers API) or job task (jobs API)."""
        if self.work_api == "containers":
            # elided: try: around the current() lookup
            current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                # elided: return
            # elided: try: around the update below
            self.api.containers().update(uuid=current['uuid'],
                # elided: body={ dict header
                'output': self.final_output_collection.portable_data_hash(),
                }).execute(num_retries=self.num_retries)
            except Exception as e:
                # Best-effort: failure to record output is logged, not fatal.
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            # TASK_UUID is set by Crunch when executing inside a job task.
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                # elided: body={ dict header
                'output': self.final_output_collection.portable_data_hash(),
                'success': self.final_status == "success",
                # elided: remaining body fields (e.g. 'progress')
                }).execute(num_retries=self.num_retries)
    def arv_executor(self, tool, job_order, **kwargs):
        """Main entry point handed to cwltool.main: run or submit the workflow.

        Depending on kwargs this either stores/updates a workflow or pipeline
        template, submits a runner job/container, or executes the workflow,
        waiting for completion and returning the final output object.
        """
        self.debug = kwargs.get("debug")

        # Reject unsupported writable InitialWorkDir entries up front.
        tool.visit(self.check_writable)

        self.project_uuid = kwargs.get("project_uuid")
        # elided: self.pipeline initialization
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 # elided: api_client argument
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        # --create-workflow / --update-workflow: store the workflow and exit.
        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                # Jobs API stores a pipeline template.
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      # elided: uuid=existing_uuid argument
                                      submit_runner_ram=kwargs.get("submit_runner_ram"))
                # elided: tmpl.save() and return of its uuid
                # cwltool.main will write our return value to stdout.
            # elided: else: (containers API stores a workflow object)
            return upload_workflow(self, tool, job_order,
                                   # elided: project/uuid arguments
                                   submit_runner_ram=kwargs.get("submit_runner_ram"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        # Normalize execution options passed down to cwltool.
        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        # API-specific in-container paths / substitution variables.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        # Upload the tool and job order dependencies to Keep.
        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        # elided: runnerjob = None initialization
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    # A single tool runs directly as one container request.
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         # elided: **kwargs).next()
                # elided: else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"))
            # elided: else: (jobs API submission path)
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                # elided: body={ dict header
                "owner_uuid": self.project_uuid,
                "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                # elided: "components" field
                "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        # --no-wait: submit the runner and return its UUID immediately.
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        # Start background polling of job/container states as a fallback for
        # (or complement to) websocket events.
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        # elided: if runnerjob: branch header — a submitted runner is the only
        # job to iterate; otherwise iterate the tool's own jobs.
        jobiter = iter((runnerjob,))
        # elided: else:
        if "cwl_runner_job" in kwargs:
            self.uuid = kwargs.get("cwl_runner_job").get('uuid')
        jobiter = tool.job(job_order,
                           self.output_callback,
                           # elided: **kwargs)

        # elided: try: / with self.cond: wrappers
        # Will continue to hold the lock for the duration of this code
        # except when in cond.wait(), at which point on_message can update
        # job state and process output callbacks.
        loopperf = Perf(metrics, "jobiter")
        # elided: loopperf.__enter__()
        for runnable in jobiter:
            # elided: loopperf.__exit__()
            if self.stop_polling.is_set():
                # elided: break — polling thread died, abort
            # elided: if runnable: / try: around the run
            with Perf(metrics, "run"):
                runnable.run(**kwargs)
            # elided: else branch — nothing runnable; wait on self.cond or,
            # if nothing is pending either, report deadlock:
            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
            # elided: break / loopperf.__enter__()

        # Wait for all outstanding submitted processes to finish.
        while self.processes:
            # elided: self.cond.wait(...)
        except UnsupportedRequirement:
            # elided: raise (propagated to caller)
        # elided: except: (generic failure handling follows)
        if sys.exc_info()[0] is KeyboardInterrupt:
            logger.error("Interrupted, marking pipeline as failed")
        # elided: else:
        logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
        # elided: if self.pipeline: guard
        self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                             body={"state": "Failed"}).execute(num_retries=self.num_retries)
        if runnerjob and runnerjob.uuid and self.work_api == "containers":
            # Cancel the runner container request on failure by zeroing
            # its priority.
            self.api.container_requests().update(uuid=runnerjob.uuid,
                                                 body={"priority": "0"}).execute(num_retries=self.num_retries)
        # elided: finally:
        self.stop_polling.set()
        self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            # The runner already packaged the output collection.
            logger.info("Final output collection %s", runnerjob.final_output)
        # elided: else: (local run packages outputs itself below)
        if self.output_name is None:
            self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
        if self.output_tags is None:
            self.output_tags = ""
        self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
        self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            # Fill in listings and file checksums for the returned object.
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output
470 """Print version string of key packages for provenance and debugging."""
472 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
473 arvpkg = pkg_resources.require("arvados-python-client")
474 cwlpkg = pkg_resources.require("cwltool")
476 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
477 "arvados-python-client", arvpkg[0].version,
478 "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command-line parser for arvados-cwl-runner.

    NOTE(review): several add_argument continuation lines and the trailing
    `return parser` appear elided from this view; `# elided:` comments mark
    the missing fragments.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        # elided: type/default arguments and closing paren
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    # Logging verbosity options are mutually exclusive.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Job reuse toggle; both flags share dest="enable_reuse" (default True).
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        # elided: help string and closing paren
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        # elided: help string and closing paren

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        # elided: default argument and closing paren

    # Submission mode: submit/local are mutually exclusive with the
    # create/update-workflow actions.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    # Wait/no-wait after submitting the runner (default: wait).
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        # elided: default argument and closing paren

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        # elided: default argument and closing paren

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
    # elided: return parser
    # NOTE(review): the enclosing function definition (likely
    # `def add_arv_hints():`) and the `cache` initialization appear elided
    # from this view; these statements are its body.
    # Load the Arvados CWL extension schema and merge its names into
    # cwltool's v1.0 schema so Arvados-specific hints validate.
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        # Register the extension name in the loader index so references to
        # it resolve without fetching.
        document_loader.idx["http://arvados.org/cwl#"+n] = {}
def main(args, stdout, stderr, api_client=None, keep_client=None):
    """Command-line entry point: parse arguments, construct an ArvCwlRunner,
    and delegate execution to cwltool.main with Arvados-specific hooks."""
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    # elided: guard (likely `if arvargs.version:`) and return after printing
    print versionstring()

    # Infer the required work API from the UUID type given to
    # --update-workflow ('-7fd4e-' = workflow/containers,
    # '-p5p6p-' = pipeline template/jobs).
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            # elided: want_api = 'jobs' and an else: want_api = None branch
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            # elided: error return
        arvargs.work_api = want_api

    # Storing a workflow needs no input object; supply an empty one.
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    # elided: add_arv_hints() call and try: around runner construction
    if api_client is None:
        api_client=arvados.api('v1', model=OrderedJsonModel())
    runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name, output_tags=arvargs.output_tags)
    except Exception as e:
        # elided: error logging and return

    # Map verbosity flags onto logger levels.
    # elided: if arvargs.debug: guard
    logger.setLevel(logging.DEBUG)
    # elided: if arvargs.quiet: guard
    logger.setLevel(logging.WARN)
    logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
    # elided: if arvargs.metrics: guard
    metrics.setLevel(logging.DEBUG)
    logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    # Options cwltool.main expects but which this runner always fixes.
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True

    return cwltool.main.main(args=arvargs,
                             # elided: stdout/stderr handler arguments
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))