# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
import argparse
import copy
import json
import logging
import os
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import schema_salad
from schema_salad.sourceline import SourceLine

import arvados
import arvados.collection
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
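
        # Use the caller-supplied KeepClient if one was given; otherwise
        # build one sharing this runner's API client and retry policy.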
        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
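
        # Determine which work API to use by probing the API server's
        # discovery document: pick the first of "jobs" or "containers"
        # that the server actually supports, unless the caller requested
        # one explicitly via work_api.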
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
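
    # Event callback: translates job/container state changes into updates
    # on the tracked processes and notifies the main loop waiting on
    # self.cond when a process reaches a terminal state.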
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        # Singularize the API name: "jobs" -> "Job", "containers" -> "Container".
                        txt = self.work_api[0].upper() + self.work_api[1:-1]
                        logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair
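
    # Walk the tool document recursively and reject CWL features that the
    # selected work API cannot support, pointing at the offending source
    # line where possible.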
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("class") == "InitialWorkDirRequirement":
                if self.work_api == "containers":
                    raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "CommandLineTool":
                if self.work_api == "containers":
                    if obj.get("stdin"):
                        raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not supported with --api=containers")
                    if obj.get("stderr"):
                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not supported with --api=containers")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)
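
    # Gather the output files named in outputObj into a new Keep
    # collection: file literals are written directly, everything else is
    # copied from its source collection; then the locations in outputObj
    # are rewritten to point into the new collection.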
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
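
    # When running inside a Crunch container or job task, record the
    # final output collection on that record so it is visible through
    # the Arvados API and UI.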
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)
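
    # Main entry point invoked by cwltool.main: validate the tool against
    # the selected work API, optionally register it as a workflow or
    # pipeline template, then either submit a runner job/container or
    # drive execution locally while a polling thread tracks state.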
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return tmpl.uuid
            else:
                return upload_workflow(self, tool, job_order,
                                       self.project_uuid,
                                       uuid=existing_uuid,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
                                       name=kwargs.get("name"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if not kwargs["name"]:
            del kwargs["name"]

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
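
        # Decide what actually runs: in submit mode a single runner
        # job or container wraps the whole workflow on the cluster;
        # otherwise the tool's own job iterator drives execution from
        # this process.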
375 if kwargs.get("submit"):
376 if self.work_api == "containers":
377 if tool.tool["class"] == "CommandLineTool":
378 kwargs["runnerjob"] = tool.tool["id"]
379 runnerjob = tool.job(job_order,
380 self.output_callback,
383 runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
384 self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
385 name=kwargs.get("name"))
387 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
388 self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
389 name=kwargs.get("name"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid
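
        # Start the background thread that polls job/container states
        # every 15 seconds and feeds the results to on_message.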
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output
492 """Print version string of key packages for provenance and debugging."""
494 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
495 arvpkg = pkg_resources.require("arvados-python-client")
496 cwlpkg = pkg_resources.require("cwltool")
498 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
499 "arvados-python-client", arvpkg[0].version,
500 "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser


def add_arv_hints():
    # Register the Arvados CWL extension schema so documents using
    # http://arvados.org/cwl# hints validate against it.
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}
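

# Command-line entry point: parse arguments, create API and Keep clients,
# construct an ArvCwlRunner, and delegate to cwltool.main with the
# Arvados-specific executor and tool factory plugged in.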
def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
        # Infer the target API from the UUID type: '7fd4e' is a workflow
        # (containers API), 'p5p6p' is a pipeline template (jobs API).
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client))