# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import copy
import json
import logging
import os
import sys
import threading

from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException

import cwltool.main
import cwltool.workflow
import cwltool.process

import schema_salad.schema

import arvados
import arvados.collection
import arvados.keep

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from .pathmapper import FinalOutputPathMapper

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)
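
# This module provides the ArvCwlRunner class plus the cwltool-compatible
# main() entry point used by the arvados-cwl-runner command line tool.
# Illustrative invocation (a sketch, not part of this module; assumes the
# package is installed and an Arvados API token is configured):
#
#   import sys
#   import arvados_cwl
#   sys.exit(arvados_cwl.main(sys.argv[1:], sys.stdout, sys.stderr))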


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """
    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.work_api = work_api
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)
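
    # Illustrative sketch (not in the original source): constructing the runner
    # directly, e.g. to embed it in another Python program. Assumes a valid
    # Arvados API token is configured in the environment.
    #
    #   api = arvados.api('v1', model=OrderedJsonModel())
    #   runner = ArvCwlRunner(api, work_api="containers", output_name="My run output")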

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
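
    # Final-output callback passed to cwltool: records the overall process
    # status and output object, and (for local jobs-API runs) marks the
    # pipeline instance Complete or Failed.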
    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
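
    # Event handler for job/container state changes (poll_states() below feeds
    # it synthesized "update" events). Logs state transitions for tracked
    # processes and calls their done() handler when they reach a final state.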
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        while True:
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
                break

            keys = self.processes.keys()

            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()

            try:
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.warn("Error checking states on API server: %s", e)
                continue

            for p in proc_states["items"]:
                self.on_message({
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    "properties": {
                        "new_attributes": p
                    }
                })
    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)
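
    # Copy the files referenced by the final CWL output object into a new Keep
    # collection, save it in the target project, and rewrite the output
    # object's locations to point at the new collection.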
    def make_output_collection(self, name, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k,v in generatemapper.items():
            sp = v.resolved.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                srccollections[srccollection] = arvados.collection.CollectionReader(
                    srccollection,
                    api_client=self.api,
                    keep_client=self.keep_client,
                    num_retries=self.num_retries)
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "size", "listing"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        self.final_output_collection = final
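
    # Top-level executor invoked by cwltool.main: depending on the command
    # line it either creates a pipeline template or registered workflow and
    # returns, submits a runner job/container, or runs the workflow locally,
    # then polls until everything completes and assembles the final output
    # collection.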
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid

        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()
                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            self.make_output_collection(self.output_name, self.final_output)

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output
379 """Print version string of key packages for provenance and debugging."""
381 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
382 arvpkg = pkg_resources.require("arvados-python-client")
383 cwlpkg = pkg_resources.require("cwltool")
385 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
386 "arvados-python-client", arvpkg[0].version,
387 "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default).")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse.")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for the collection that stores the final output.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
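

# Register the Arvados-specific CWL extension schemas (the
# http://arvados.org/cwl namespace, shipped as arv-cwl-schema.yml) with
# cwltool's schema names so documents using those hints validate cleanly.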
def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()

    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}
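

# cwl-runner compatible entry point: parse the command line, build an
# ArvCwlRunner, then hand control to cwltool.main.main with the Arvados
# executor and tool factory plugged in.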
def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))
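

# Illustrative sketch (not part of the original module): invoking main()
# programmatically, e.g. from a test harness. File names are placeholders and
# an Arvados API token is assumed to be configured.
#
#   import StringIO
#   captured = StringIO.StringIO()
#   rc = main(["--local", "--api=jobs", "my-workflow.cwl", "my-inputs.yml"],
#             captured, sys.stderr)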