3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
14 from functools import partial
15 import pkg_resources # part of setuptools
17 from cwltool.errors import WorkflowException
19 import cwltool.workflow
24 from arvados.errors import ApiError
26 from .arvcontainer import ArvadosContainer, RunnerContainer
27 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
28 from. runner import Runner, upload_instance
29 from .arvtool import ArvadosCommandTool
30 from .arvworkflow import ArvadosWorkflow, upload_workflow
31 from .fsaccess import CollectionFsAccess
32 from .perf import Perf
33 from .pathmapper import FinalOutputPathMapper
34 from ._version import __version__
36 from cwltool.pack import pack
37 from cwltool.process import shortname, UnsupportedRequirement, getListing
38 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
39 from cwltool.draft2tool import compute_checksums
40 from arvados.api import OrderedJsonModel
# Module-level loggers: `logger` for general runner output, `metrics`
# for timing data emitted via the Perf context manager.
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None):
        # NOTE(review): several source lines are elided in this view
        # (else/try branches); structure reconstructed approximately.
        # cond/lock are shared with the polling thread: on_message() notifies
        # cond when a watched process changes state.
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None    # output object of the top-level workflow, set by output_callback
        self.final_status = None    # overall process status string, set by output_callback
        self.stop_polling = threading.Event()  # signals poll_states() to exit
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        # Use the caller-supplied Keep client when given; otherwise build one
        # bound to our API client (fallback branch elided in this view).
        if keep_client is not None:
            self.keep_client = keep_client
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        # Probe the API server's discovery document to choose a work API
        # ("jobs" or "containers"), honoring an explicit work_api request.
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            methods = self.api._rootDesc.get('resources')[api]['methods']
            if ('httpMethod' in methods['create'] and
                (work_api == api or work_api is None)):
        # Neither API usable at all:
        raise Exception("No supported APIs")
        # Requested API not available on this server:
        raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
    def arv_make_tool(self, toolpath_object, **kwargs):
        """Tool factory: map CWL document classes onto Arvados-aware tool objects."""
        # Every tool object needs to know which work API is active.
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        # Fallback: delegate to cwltool's default factory
        # (NOTE(review): an `else:` line appears elided in this view).
        return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
    def output_callback(self, out, processStatus):
        """Completion callback for the top-level workflow: record final
        status/output and mark the pipeline instance Complete or Failed.

        NOTE(review): the guard/else lines around the pipeline updates are
        elided in this view; structure reconstructed approximately.
        """
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
        # Failure path (the `else:` line is elided in this view):
        logger.warn("Overall process status is %s", processStatus)
        self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                             body={"state": "Failed"}).execute(num_retries=self.num_retries)
        # These fields are read by arv_executor after the run loop finishes.
        self.final_status = processStatus
        self.final_output = out
    def on_message(self, event):
        """Handle a state-change event for a watched job/container.

        Called both from the websocket/poll thread (via poll_states) and
        translates state transitions into log lines and done() callbacks.
        NOTE(review): a few lines (e.g. lock acquisition) are elided in
        this view.
        """
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    logger.info("Job %s (%s) is Running", j.name, uuid)
                    j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    # Label for logging: capitalize work_api and strip its
                    # trailing "s" ("containers" -> "Container").
                    txt = self.work_api[0].upper() + self.work_api[1:-1]
                    logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                    with Perf(metrics, "done %s" % j.name):
                        j.done(event["properties"]["new_attributes"])
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.

        NOTE(review): many source lines are elided in this view (the outer
        while/try, break/continue, and the synthesized-event dict); the
        structure below is reconstructed approximately.
        """
        # Wake every 15 seconds, or immediately when stop_polling is set.
        self.stop_polling.wait(15)
        if self.stop_polling.is_set():
        keys = self.processes.keys()
        # Pick the table matching the active work API.
        if self.work_api == "containers":
            table = self.poll_api.container_requests()
        elif self.work_api == "jobs":
            table = self.poll_api.jobs()
        # Fetch current state of every watched uuid in one list call.
        proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
        except Exception as e:
            logger.warn("Error checking states on API server: %s", e)
        # Re-deliver each row as a synthetic "update" event to on_message.
        for p in proc_states["items"]:
            "object_uuid": p["uuid"],
            "event_type": "update",
        # Fatal error path: log, drop all watched processes, and stop.
        logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
        self.processes.clear()
        self.stop_polling.set()
184 def get_uploaded(self):
185 return self.uploaded.copy()
187 def add_uploaded(self, src, pair):
188 self.uploaded[src] = pair
    def check_writable(self, obj):
        """Recursively reject InitialWorkDir entries marked ``writable: true``,
        which is not supported when running on Arvados.

        Raises:
            UnsupportedRequirement: if any dict in the tree has a truthy
                "writable" key.
        """
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            # NOTE(review): the `for v in obj:` line appears elided in this view.
            self.check_writable(v)
    def make_output_collection(self, name, tagsString, outputObj):
        """Copy all workflow outputs into a single new collection.

        Creates a collection named `name` owned by self.project_uuid,
        copies/writes every output file into it, tags it with the
        comma-separated `tagsString`, and rewrites the locations in
        `outputObj` to point into the new collection.

        Returns:
            (outputObj, final): the rewritten output object and the new
            arvados.collection.Collection.

        NOTE(review): several source lines are elided in this view (the
        `files`/`srccollections` initializations, try statements, splits);
        structure reconstructed approximately.
        """
        # Deep-copy so the caller's object is only mutated via the explicit
        # rewrite passes below.
        outputObj = copy.deepcopy(outputObj)

        # Collect every File/Directory object reachable from the output.
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        # Map each captured object to a flat target path in the new collection.
        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                # Literal (non-keep) outputs: directories need no data;
                # CreateFile literals are written straight into the collection.
                if v.type == "Directory":
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            # k is "keep:<pdh>/<path...>"; sp is its "/"-split form, so
            # sp[0][5:] strips the "keep:" prefix to get the source PDH.
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                # Cache one CollectionReader per source collection.
                srccollections[srccollection] = arvados.collection.CollectionReader(
                    keep_client=self.keep_client,
                    num_retries=self.num_retries)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
            reader = srccollections[srccollection]
            srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
            final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            # Best-effort: log and continue on copy problems.
            logger.warn("While preparing output collection: %s", e)

        # First rewrite pass: point locations at targets inside the new
        # collection and drop attributes invalidated by the move.
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        # Persist the rewritten output object inside the collection itself.
        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        # Apply each comma-separated tag as a "tag" link on the collection.
        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        self.api.links().create(body={
            "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        # Second rewrite pass: make locations absolute keep: URIs.
        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
    def set_crunch_output(self):
        """When this runner itself executes inside a crunch container or job,
        record the final output collection on that container/job so the
        workflow output is visible through the Arvados API/UI.

        NOTE(review): the try statements and ``body={`` lines are elided in
        this view; structure reconstructed approximately.
        """
        if self.work_api == "containers":
            current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
            self.api.containers().update(uuid=current['uuid'],
                'output': self.final_output_collection.portable_data_hash(),
                }).execute(num_retries=self.num_retries)
            except Exception as e:
                # Best-effort: failure to record output is logged, not fatal.
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            # Jobs API: TASK_UUID identifies the crunch task we run inside.
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                'output': self.final_output_collection.portable_data_hash(),
                'success': self.final_status == "success",
                }).execute(num_retries=self.num_retries)
    def arv_executor(self, tool, job_order, **kwargs):
        """Main execution entry point (passed to cwltool.main as executor).

        Depending on kwargs this either stores a workflow definition
        (--create/--update-workflow), submits a runner job/container
        (--submit), or drives the workflow locally, waiting for all
        processes and returning the final output object.

        NOTE(review): many source lines are elided in this view (try/else
        branches, the run loop's lock handling, body dict openers);
        structure reconstructed approximately.
        """
        self.debug = kwargs.get("debug")

        # Reject InitialWorkDir 'writable: true' before doing any work.
        tool.visit(self.check_writable)

        self.project_uuid = kwargs.get("project_uuid")
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        # --create-workflow / --update-workflow: store the definition on the
        # API server instead of running it.
        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))
                # cwltool.main will write our return value to stdout.
                return upload_workflow(self, tool, job_order,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
                                       name=kwargs.get("name"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        # Normalize kwargs handed down to every tool job.
        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        # Work-API-specific output/tmp directory conventions.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        # --submit: wrap the whole run in a runner job/container.
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"))
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                "owner_uuid": self.project_uuid,
                "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        # --no-wait: start the runner and return its uuid immediately.
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        # Background thread that polls job/container states.
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        # Either iterate over just the runner job, or over the tool's own
        # job generator when running the steps ourselves.
        jobiter = iter((runnerjob,))
        if "cwl_runner_job" in kwargs:
            self.uuid = kwargs.get("cwl_runner_job").get('uuid')
        jobiter = tool.job(job_order,
                           self.output_callback,

        # Will continue to hold the lock for the duration of this code
        # except when in cond.wait(), at which point on_message can update
        # job state and process output callbacks.

        loopperf = Perf(metrics, "jobiter")
        for runnable in jobiter:
            if self.stop_polling.is_set():
            with Perf(metrics, "run"):
                runnable.run(**kwargs)
            # No runnable job and nothing pending -> deadlock.
            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")

        # Wait for all submitted processes to finish.
        while self.processes:
        except UnsupportedRequirement:
        if sys.exc_info()[0] is KeyboardInterrupt:
            logger.error("Interrupted, marking pipeline as failed")
        logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
        self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                             body={"state": "Failed"}).execute(num_retries=self.num_retries)
        if runnerjob and runnerjob.uuid and self.work_api == "containers":
            # Cancel the runner container request by dropping its priority.
            self.api.container_requests().update(uuid=runnerjob.uuid,
                                                 body={"priority": "0"}).execute(num_retries=self.num_retries)

        # Shut down the polling thread before examining results.
        self.stop_polling.set()
        self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        # Local-run path: gather outputs into a named collection
        # (an `else:` line appears elided in this view).
        if self.output_name is None:
            self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
        if self.output_tags is None:
            self.output_tags = ""
        self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
        self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        # --compute-checksum: annotate outputs with listings and checksums.
        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output
    # NOTE(review): the enclosing `def versionstring():` line is elided in
    # this view; the body below builds a provenance string from the installed
    # package versions.
    """Print version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)
def arg_parser(): # type: () -> argparse.ArgumentParser
    """Build the argparse parser for the arvados-cwl-runner command line.

    NOTE(review): several continuation lines (defaults/help strings) are
    elided in this view, leaving some add_argument calls visibly unclosed.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    # Mutually exclusive logging verbosity flags.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Job-reuse toggle: both flags share dest="enable_reuse" (default True).
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",

    # Run mode: submit to Arvados (default), run locally, or only
    # create/update the stored workflow definition.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                        dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    # Whether to wait for the submitted runner to complete.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",

    # Positional arguments: the workflow document and its input object(s).
    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
    # NOTE(review): the enclosing `def add_arv_hints():` line and the
    # schema-cache dict initialization are elided in this view.
    # Load the Arvados CWL extension schema shipped with this package and
    # register its names with cwltool's v1.0 schema so that Arvados-specific
    # hints validate.
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
            document_loader.idx["http://arvados.org/cwl#"+n] = {}
def main(args, stdout, stderr, api_client=None, keep_client=None):
    """Command-line entry point: parse arguments, construct the runner, and
    delegate execution to cwltool.main with Arvados-specific hooks.

    NOTE(review): several source lines are elided in this view (the
    --version guard, try/except bodies, verbosity branch conditions);
    structure reconstructed approximately.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    # --version handling (guard line elided in this view).
    print versionstring()

    # Infer the work API from an --update-workflow UUID's type infix:
    # "-7fd4e-" marks a workflow record (containers API), "-p5p6p-" a
    # pipeline template (jobs API); an explicit conflicting --api is an error.
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
        arvargs.work_api = want_api

    # create/update-workflow with no inputs: use an empty job order.
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    if api_client is None:
        api_client=arvados.api('v1', model=OrderedJsonModel())
    runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name, output_tags=arvargs.output_tags)
    except Exception as e:

    # Logging verbosity (branch conditions for --debug/--quiet/--metrics
    # are elided in this view).
    logger.setLevel(logging.DEBUG)
    logger.setLevel(logging.WARN)
    logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
    metrics.setLevel(logging.DEBUG)
    logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    # Fixed settings passed through to cwltool.
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True

    return cwltool.main.main(args=arvargs,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))