3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
14 from functools import partial
15 import pkg_resources # part of setuptools
17 from cwltool.errors import WorkflowException
19 import cwltool.workflow
24 from arvados.errors import ApiError
26 from .arvcontainer import ArvadosContainer, RunnerContainer
27 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
28 from. runner import Runner, upload_instance
29 from .arvtool import ArvadosCommandTool
30 from .arvworkflow import ArvadosWorkflow, upload_workflow
31 from .fsaccess import CollectionFsAccess
32 from .perf import Perf
33 from .pathmapper import FinalOutputPathMapper
34 from ._version import __version__
36 from cwltool.pack import pack
37 from cwltool.process import shortname, UnsupportedRequirement, getListing
38 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
39 from cwltool.draft2tool import compute_checksums
40 from arvados.api import OrderedJsonModel
# Module-level loggers: `logger` for general progress messages, `metrics`
# for timing data emitted via the Perf helper.
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None):
        # Lock/condition shared between the submitting thread and the
        # polling thread; on_message signals completion via the condition.
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None        # output object set by output_callback
        self.final_status = None        # process status set by output_callback
        self.stop_polling = threading.Event()   # tells poll_states() to exit
        self.final_output_collection = None
        self.output_name = output_name  # name for the final output collection
        self.output_tags = output_tags  # comma-separated tags for the output
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
            # NOTE(review): this fallback construction presumably belongs to
            # an elided `else:` branch — confirm against the full source.
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        # Choose the work submission API by probing what the API server's
        # discovery document actually advertises.
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            methods = self.api._rootDesc.get('resources')[api]['methods']
            if ('httpMethod' in methods['create'] and
                (work_api == api or work_api is None)):
        # NOTE(review): lines selecting self.work_api appear elided here;
        # the two raises below are the no-usable-API error paths.
        raise Exception("No supported APIs")
        raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
    def arv_make_tool(self, toolpath_object, **kwargs):
        """Tool factory: map CWL document classes onto Arvados-aware tools.

        CommandLineTool and Workflow documents get the Arvados
        implementations; any other class falls through to cwltool's
        default factory.
        """
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
    def output_callback(self, out, processStatus):
        """Record the workflow's final status and output object, and mark
        the pipeline instance Complete/Failed on the API server."""
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
        # NOTE(review): the failure branch below presumably sits under an
        # elided `else:` — confirm against the full source.
        logger.warn("Overall process status is %s", processStatus)
        self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                             body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
    def on_message(self, event):
        """Event handler for job/container state-change notifications.

        Propagates "Running" and terminal-state transitions to the
        corresponding tracked process object in self.processes.
        """
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    logger.info("Job %s (%s) is Running", j.name, uuid)
                    j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    # Singularize and capitalize work_api for the log line:
                    # "jobs" -> "Job", "containers" -> "Container".
                    txt = self.work_api[0].upper() + self.work_api[1:-1]
                    logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                    with Perf(metrics, "done %s" % j.name):
                        j.done(event["properties"]["new_attributes"])
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        # NOTE(review): the polling loop and try scaffolding are elided in
        # this excerpt; comments describe the visible statements only.
        # Wake every 15 seconds (or immediately when told to stop).
        self.stop_polling.wait(15)
        if self.stop_polling.is_set():
        keys = self.processes.keys()
        # Choose the table to poll based on the selected work API.
        if self.work_api == "containers":
            table = self.poll_api.container_requests()
        elif self.work_api == "jobs":
            table = self.poll_api.jobs()
        proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
        except Exception as e:
            # Transient API errors are logged and polling continues.
            logger.warn("Error checking states on API server: %s", e)
        # Feed each polled record through the same handler the websocket
        # notifications use.
        for p in proc_states["items"]:
                "object_uuid": p["uuid"],
                "event_type": "update",
        logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
        self.processes.clear()
        # Always mark polling stopped so waiters are released.
        self.stop_polling.set()
184 def get_uploaded(self):
185 return self.uploaded.copy()
187 def add_uploaded(self, src, pair):
188 self.uploaded[src] = pair
    def check_writable(self, obj):
        """Recursively reject InitialWorkDir entries marked writable.

        Raises UnsupportedRequirement if any dict in *obj* has a truthy
        "writable" key; recurses into dict values and list items.
        """
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            # NOTE(review): the `for v in obj:` line appears elided here.
                self.check_writable(v)
    def make_output_collection(self, name, tagsString, outputObj):
        """Copy the workflow's output files into one new Keep collection.

        Copies each referenced file/directory out of its source collection,
        writes cwl.output.json into the new collection, saves it under
        *name* in self.project_uuid, applies the comma-separated tags in
        *tagsString*, and returns (rewritten outputObj, Collection).
        """
        # Work on a copy so callers' output object is not mutated in place.
        outputObj = copy.deepcopy(outputObj)

        # Collect every File/Directory object referenced by the output.
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        # Map each source location to a flat path in the new collection.
        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                # Literal (non-Keep) entries: directories need no copy,
                # CreateFile literals are written directly.
                if v.type == "Directory":
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            # Cache one CollectionReader per distinct source collection.
            # NOTE(review): `sp` is presumably the "/"-split of k; its
            # assignment is elided in this excerpt.
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
            reader = srccollections[srccollection]
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
                logger.warn("While preparing output collection: %s", e)

        # Rewrite output locations to their targets in the new collection.
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        # Attach each requested tag as a link on the new collection.
        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        self.api.links().create(body={
            "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        # Point locations at the published collection by PDH.
        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
    def set_crunch_output(self):
        """When running inside Crunch, record the final output collection
        on our own container record (containers API) or job task record
        (jobs API, via the TASK_UUID environment variable)."""
        if self.work_api == "containers":
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                self.api.containers().update(uuid=current['uuid'],
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                # Best-effort: failure to record output is logged, not fatal.
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                        }).execute(num_retries=self.num_retries)
    def arv_executor(self, tool, job_order, **kwargs):
        """Main execution entry point used by cwltool.main.

        Depending on kwargs, either creates/updates a stored workflow or
        pipeline template, submits a runner job/container, or runs the
        workflow locally; waits for completion and returns the final
        output object.
        """
        self.debug = kwargs.get("debug")

        # Refuse to run workflows using writable InitialWorkDir entries.
        tool.visit(self.check_writable)

        self.project_uuid = kwargs.get("project_uuid")
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        # --create-workflow / --update-workflow: store the workflow object
        # instead of executing it.
        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      submit_runner_ram=kwargs.get("submit_runner_ram"))
                # cwltool.main will write our return value to stdout.
                return upload_workflow(self, tool, job_order,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        # Execution-wide cwltool settings.
        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        # API-specific working directories: fixed paths for containers,
        # Crunch substitution variables for jobs.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        # --submit: wrap the workflow in a runner job/container.
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        # --no-wait: submit and return the runner's UUID immediately.
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        # Start background state polling (complements websocket events).
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

            jobiter = iter((runnerjob,))
        if "cwl_runner_job" in kwargs:
            self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,

            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            for runnable in jobiter:
                if self.stop_polling.is_set():
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                    logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")

            # Wait for all in-flight work to finish.
            while self.processes:
        except UnsupportedRequirement:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
            # Cancel the runner container by dropping its priority to 0.
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        # When we submitted a runner, gather its outputs into a single
        # named output collection.
        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output
468 """Print version string of key packages for provenance and debugging."""
470 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
471 arvpkg = pkg_resources.require("arvados-python-client")
472 cwlpkg = pkg_resources.require("cwltool")
474 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
475 "arvados-python-client", arvpkg[0].version,
476 "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the argparse parser for the arvados-cwl-runner command line."""
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    # Verbosity options are mutually exclusive.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Reuse of past results defaults to on; both flags share one dest.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",

    # Submit/local/create/update modes are mutually exclusive.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
    # Load the bundled Arvados extension schema and register its names in
    # cwltool's CWL v1.0 name space so http://arvados.org/cwl# hints
    # validate.
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}
def main(args, stdout, stderr, api_client=None, keep_client=None):
    """Command-line entry point: parse arguments, construct an
    ArvCwlRunner, and delegate execution to cwltool.main."""
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
        # Elsewhere guarded by --version (guard elided in this excerpt).
        print versionstring()

    # Infer the work API from the UUID's type infix when updating an
    # existing workflow: '-7fd4e-' is a workflow object (containers API);
    # '-p5p6p-' is a pipeline template (presumably jobs API — the
    # assignment for that branch is elided here).
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
        arvargs.work_api = want_api

    # Creating/updating a workflow needs no input object.
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

        if api_client is None:
            api_client=arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name, output_tags=arvargs.output_tags)
    except Exception as e:

        # Log-level configuration for --debug/--quiet/--metrics (the
        # selecting conditionals are elided in this excerpt).
        logger.setLevel(logging.DEBUG)

        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True

    return cwltool.main.main(args=arvargs,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))