# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import copy
import json
import logging
import os
import sys
import threading
from functools import partial

import pkg_resources  # part of setuptools

import arvados
import arvados.collection
import arvados.keep

import schema_salad.schema

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.process
import cwltool.workflow

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from .pathmapper import FinalOutputPathMapper

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)
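
# The usual entry point is main() at the bottom of this module.  A minimal
# usage sketch (the console-script name "arvados-cwl-runner" and the example
# project UUID and file names are illustrative assumptions, not taken from
# this module):
#
#   $ arvados-cwl-runner --api=containers --project-uuid=zzzzz-j7d0g-0123456789abcde \
#         workflow.cwl inputs.yml
#
# or, from Python:
#
#   import sys
#   sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))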


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.
    """
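
    # A minimal programmatic sketch of how this class is used (an assumption
    # based on how main() below wires things together; it presumes Arvados API
    # credentials are already configured in the environment):
    #
    #   api = arvados.api('v1', model=OrderedJsonModel())
    #   runner = ArvCwlRunner(api, work_api="containers", output_name="my output")
    #   arvargs = arg_parser().parse_args(["workflow.cwl", "inputs.yml"])
    #   cwltool.main.main(args=arvargs,
    #                     executor=runner.arv_executor,
    #                     makeTool=runner.arv_make_tool,
    #                     versionfunc=versionstring)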

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        # Wake up arv_executor, which waits on self.cond.
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        while True:
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
                break
            with self.lock:
                keys = self.processes.keys()
            if not keys:
                continue
            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()
            try:
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.warn("Error checking states on API server: %s", e)
                continue
            for p in proc_states["items"]:
                self.on_message({
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    "properties": {"new_attributes": p}
                })

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def make_output_collection(self, name, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k,v in generatemapper.items():
            # k is a "keep:..." reference; strip the "keep:" prefix to get the
            # source collection identifier.
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                srccollections[srccollection] = arvados.collection.CollectionReader(
                    srccollection,
                    api_client=self.api,
                    keep_client=self.keep_client,
                    num_retries=self.num_retries)
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "size", "listing"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        self.final_output_collection = final

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.
            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()
                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            self.make_output_collection(self.output_name, self.final_output)

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Print version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, defaults to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID",
                        help="Project that will own the workflow jobs. If not provided, work goes to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser


def add_arv_hints():
    # Register the Arvados CWL extension schema with the cwltool document loader.
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}


def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))