3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
14 from functools import partial
15 import pkg_resources # part of setuptools
17 from cwltool.errors import WorkflowException
19 import cwltool.workflow
25 from .arvcontainer import ArvadosContainer, RunnerContainer
26 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
27 from. runner import Runner
28 from .arvtool import ArvadosCommandTool
29 from .arvworkflow import ArvadosWorkflow, upload_workflow
30 from .fsaccess import CollectionFsAccess
31 from .perf import Perf
32 from .pathmapper import FinalOutputPathMapper
34 from cwltool.pack import pack
35 from cwltool.process import shortname, UnsupportedRequirement
36 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
37 from cwltool.draft2tool import compute_checksums
38 from arvados.api import OrderedJsonModel
# Module-level loggers: "arvados.cwl-runner" for normal progress/error output,
# and a separate "…​.metrics" child used by the Perf timing helper.
40 logger = logging.getLogger('arvados.cwl-runner')
41 metrics = logging.getLogger('arvados.cwl-runner.metrics')
# Default to INFO; main() later adjusts this based on --debug/--quiet flags.
42 logger.setLevel(logging.INFO)
45 class ArvCwlRunner(object):
46 """Execute a CWL tool or workflow, submit work (using either jobs or
47 containers API), wait for them to complete, and report output.
# NOTE(review): this excerpt elides several original lines (52-53, 58-60,
# 63-64, 69, 71, 75), including the assignments that presumably set
# self.api, self.processes, self.uploaded and self.num_retries -- confirm
# against the full file.
51 def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
# Lock/condition pair: arv_executor holds the lock and releases it only in
# cond.wait(), letting the event thread update job state via on_message().
54 self.lock = threading.Lock()
55 self.cond = threading.Condition(self.lock)
# Populated by output_callback() when the top-level workflow finishes.
56 self.final_output = None
57 self.final_status = None
61 self.work_api = work_api
# Signals the poll_states() background thread to shut down.
62 self.stop_polling = threading.Event()
65 self.final_output_collection = None
66 self.output_name = output_name
# Use a caller-supplied KeepClient when given; otherwise build one from the
# API client (the elided line 69 is presumably the matching "else:").
67 if keep_client is not None:
68 self.keep_client = keep_client
70 self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
# Default to the legacy "jobs" API when none was requested.
72 if self.work_api is None:
73 # todo: autodetect API to use.
74 self.work_api = "jobs"
# Fail fast on an unknown work submission API.
76 if self.work_api not in ("containers", "jobs"):
77 raise Exception("Unsupported API '%s'" % self.work_api)
def arv_make_tool(self, toolpath_object, **kwargs):
    """Tool factory handed to cwltool.

    Routes CWL "CommandLineTool" and "Workflow" process objects to their
    Arvados-aware implementations; anything else falls through to
    cwltool's default factory.  The selected work API is threaded through
    via kwargs so the tool classes know how to submit work.
    """
    kwargs["work_api"] = self.work_api
    process_class = toolpath_object.get("class")
    if process_class == "CommandLineTool":
        return ArvadosCommandTool(self, toolpath_object, **kwargs)
    if process_class == "Workflow":
        return ArvadosWorkflow(self, toolpath_object, **kwargs)
    return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
88 def output_callback(self, out, processStatus):
# Completion callback for the top-level workflow: records the final status
# and output object, and updates the local pipeline-instance record.
# NOTE(review): the guard lines around the API updates (orig. 91, 94, 96 --
# presumably "if self.pipeline:" / "else:") are elided from this excerpt.
89 if processStatus == "success":
90 logger.info("Overall process status is %s", processStatus)
92 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
93 body={"state": "Complete"}).execute(num_retries=self.num_retries)
# logger.warn is a deprecated alias of logger.warning -- consider updating.
95 logger.warn("Overall process status is %s", processStatus)
97 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
98 body={"state": "Failed"}).execute(num_retries=self.num_retries)
# arv_executor reads these after its wait loop finishes.
99 self.final_status = processStatus
100 self.final_output = out
102 def on_message(self, event):
# Event callback: reacts to "update" events for jobs/containers tracked in
# self.processes.  NOTE(review): this excerpt elides lines 107, 110,
# 114-115 and 120-123, which presumably include "with self.lock:" blocks
# and a cond.notify() -- confirm against the full file.
103 if "object_uuid" in event:
104 if event["object_uuid"] in self.processes and event["event_type"] == "update":
# First transition into "Running": log once and push the new state into
# the pipeline component record.
105 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
106 uuid = event["object_uuid"]
108 j = self.processes[uuid]
109 logger.info("Job %s (%s) is Running", j.name, uuid)
111 j.update_pipeline_component(event["properties"]["new_attributes"])
# Terminal states: hand the final record to the process object's done()
# handler, timed under the "done <name>" Perf metric.
112 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
113 uuid = event["object_uuid"]
116 j = self.processes[uuid]
117 logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
118 with Perf(metrics, "done %s" % j.name):
119 j.done(event["properties"]["new_attributes"])
124 def poll_states(self):
125 """Poll status of jobs or containers listed in the processes dict.
127 Runs in a separate thread.
# NOTE(review): the surrounding loop/try structure (orig. lines 128-130,
# 133-138, 143-144, 148-149, 151, 154-157) is elided here; presumably a
# "while True:" with a try/except and a synthesized on_message() call per
# returned item -- confirm against the full file.
# Sleep up to 15s between polls, waking early if shutdown was requested.
131 self.stop_polling.wait(15)
132 if self.stop_polling.is_set():
# Snapshot of the UUIDs currently being tracked.
135 keys = self.processes.keys()
# Choose the API table matching the configured work API.
139 if self.work_api == "containers":
140 table = self.poll_api.containers()
141 elif self.work_api == "jobs":
142 table = self.poll_api.jobs()
145 proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
# Polling is best-effort: log and keep going on API errors.
# (logger.warn is a deprecated alias of logger.warning.)
146 except Exception as e:
147 logger.warn("Error checking states on API server: %s", e)
# Convert each polled record into a synthetic "update" event dict.
150 for p in proc_states["items"]:
152 "object_uuid": p["uuid"],
153 "event_type": "update",
159 def get_uploaded(self):
160 return self.uploaded.copy()
162 def add_uploaded(self, src, pair):
163 self.uploaded[src] = pair
165 def check_writable(self, obj):
# Recursively reject CWL InitialWorkDir entries marked "writable: true",
# which this runner does not support.  Uses dict.itervalues(), i.e. this
# module targets Python 2.
166 if isinstance(obj, dict):
167 if obj.get("writable"):
168 raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
169 for v in obj.itervalues():
170 self.check_writable(v)
# NOTE(review): the list branch's loop header (orig. line 172, presumably
# "for v in obj:") is elided from this excerpt.
171 if isinstance(obj, list):
173 self.check_writable(v)
175 def make_output_collection(self, name, outputObj):
# Assemble the workflow's final outputs into a single new Arvados
# collection, then rewrite the output object's file locations to point at
# it.  NOTE(review): several lines are elided from this excerpt (e.g. the
# "files = []" init, the keep: location split, the try/except around
# final.copy, and "del fileobj[k]" in rewrite) -- confirm against the full
# file before relying on details.
# Work on a copy so the caller's output object is not mutated until the
# final rewrite step below.
176 outputObj = copy.deepcopy(outputObj)
# Collect every File/Directory object referenced by the outputs.
179 def capture(fileobj):
180 files.append(fileobj)
182 adjustDirObjs(outputObj, capture)
183 adjustFileObjs(outputObj, capture)
# Map all captured paths into a flat target namespace.
185 generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
187 final = arvados.collection.Collection(api_client=self.api,
188 keep_client=self.keep_client,
189 num_retries=self.num_retries)
# Copy each mapped file out of its source collection, caching one
# CollectionReader per source collection.
192 for k,v in generatemapper.items():
# sp[0][5:] strips a 5-char prefix -- presumably "keep:" -- to get the
# source collection identifier.
194 srccollection = sp[0][5:]
195 if srccollection not in srccollections:
196 srccollections[srccollection] = arvados.collection.CollectionReader(
199 keep_client=self.keep_client,
200 num_retries=self.num_retries)
201 reader = srccollections[srccollection]
203 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
204 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
# Best-effort: a failed copy is logged, not fatal.
# (logger.warn is a deprecated alias of logger.warning.)
206 logger.warn("While preparing output collection: %s", e)
# Rewrite locations in the output object to targets inside the new
# collection, dropping now-stale metadata keys.
208 def rewrite(fileobj):
209 fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
210 for k in ("basename", "size", "listing"):
214 adjustDirObjs(outputObj, rewrite)
215 adjustFileObjs(outputObj, rewrite)
# Persist the rewritten output object inside the collection itself.
217 with final.open("cwl.output.json", "w") as f:
218 json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
220 final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
222 logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
223 final.api_response()["name"],
224 final.manifest_locator())
226 self.final_output_collection = final
228 def arv_executor(self, tool, job_order, **kwargs):
# Main executor handed to cwltool.main: sets up the run, optionally
# submits a runner job/container, iterates runnable jobs, waits for
# completion, and returns the final output object.
# NOTE(review): this excerpt elides many original lines (230, 232, 235,
# 237, 240, 243, 245-246, 249, 251, 258, 268-269, 275-276, 278, 280, 284,
# 287, 290, 292, 294, 298-299, 301, 306-309, 313, 315, 317-318, 321-324,
# 326-329, 331-332, 334-335, 338, 340, 346-347, 350, 353, 356, 359, 362,
# 366, 369) -- locking, try/except structure, and several keyword
# arguments are missing here; confirm against the full file.
229 self.debug = kwargs.get("debug")
# Reject unsupported "writable" InitialWorkDir entries up front.
231 tool.visit(self.check_writable)
# Work is owned by the requested project, defaulting to the user's home.
233 useruuid = self.api.users().current().execute()["uuid"]
234 self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
236 make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
238 keep_client=self.keep_client)
239 self.fs_access = make_fs_access(kwargs["basedir"])
# --create-template: register a pipeline template instead of running.
241 if kwargs.get("create_template"):
242 tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
244 # cwltool.main will write our return value to stdout.
# --create-workflow / --update-workflow: register/replace a workflow record.
247 if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
248 return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
250 self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
# Common kwargs passed down to every tool invocation.
252 kwargs["make_fs_access"] = make_fs_access
253 kwargs["enable_reuse"] = kwargs.get("enable_reuse")
254 kwargs["use_container"] = True
255 kwargs["tmpdir_prefix"] = "tmp"
256 kwargs["on_error"] = "continue"
257 kwargs["compute_checksum"] = kwargs.get("compute_checksum")
# Output/tmp directory conventions differ between the two work APIs:
# fixed container paths vs. Crunch task substitution variables.
259 if self.work_api == "containers":
260 kwargs["outdir"] = "/var/spool/cwl"
261 kwargs["docker_outdir"] = "/var/spool/cwl"
262 kwargs["tmpdir"] = "/tmp"
263 kwargs["docker_tmpdir"] = "/tmp"
264 elif self.work_api == "jobs":
265 kwargs["outdir"] = "$(task.outdir)"
266 kwargs["docker_outdir"] = "$(task.outdir)"
267 kwargs["tmpdir"] = "$(task.tmpdir)"
# --submit: wrap the whole workflow in a runner job/container instead of
# orchestrating from this process.
270 if kwargs.get("submit"):
271 if self.work_api == "containers":
272 if tool.tool["class"] == "CommandLineTool":
273 runnerjob = tool.job(job_order,
274 self.output_callback,
277 runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
279 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
# Local (non-submit) runs on the jobs API get a pipeline instance record
# so progress is visible in Workbench.
281 if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
282 # Create pipeline for local run
283 self.pipeline = self.api.pipeline_instances().create(
285 "owner_uuid": self.project_uuid,
286 "name": shortname(tool.tool["id"]),
288 "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
289 logger.info("Pipeline instance %s", self.pipeline["uuid"])
# --no-wait: submit and return the runner's UUID immediately.
291 if runnerjob and not kwargs.get("wait"):
293 return runnerjob.uuid
# Start the background state-polling thread.
295 self.poll_api = arvados.api('v1')
296 self.polling_thread = threading.Thread(target=self.poll_states)
297 self.polling_thread.start()
300 jobiter = iter((runnerjob,))
302 if "cwl_runner_job" in kwargs:
303 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
304 jobiter = tool.job(job_order,
305 self.output_callback,
310 # Will continue to hold the lock for the duration of this code
311 # except when in cond.wait(), at which point on_message can update
312 # job state and process output callbacks.
314 loopperf = Perf(metrics, "jobiter")
316 for runnable in jobiter:
319 with Perf(metrics, "run"):
320 runnable.run(**kwargs)
# jobiter yielded nothing runnable and nothing is pending: deadlock.
325 logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
# Wait (via cond.wait in elided lines) until all tracked processes finish.
330 while self.processes:
333 except UnsupportedRequirement:
336 if sys.exc_info()[0] is KeyboardInterrupt:
337 logger.error("Interrupted, marking pipeline as failed")
339 logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
341 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
342 body={"state": "Failed"}).execute(num_retries=self.num_retries)
# Cancel the runner container by dropping its priority.
# NOTE(review): priority is sent as the string "0"; the containers API
# field is presumably an integer -- confirm the server coerces this.
343 if runnerjob and runnerjob.uuid and self.work_api == "containers":
344 self.api.container_requests().update(uuid=runnerjob.uuid,
345 body={"priority": "0"}).execute(num_retries=self.num_retries)
# Shut down the polling thread regardless of outcome.
348 self.stop_polling.set()
349 self.polling_thread.join()
# Translate the recorded final status into the caller-visible result.
351 if self.final_status == "UnsupportedRequirement":
352 raise UnsupportedRequirement("Check log for details.")
354 if self.final_status != "success":
355 raise WorkflowException("Workflow failed.")
357 if self.final_output is None:
358 raise WorkflowException("Workflow did not return a result.")
360 if kwargs.get("submit") and isinstance(runnerjob, Runner):
361 logger.info("Final output collection %s", runnerjob.final_output)
# Persist outputs of a local run into a named output collection.
363 if self.output_name is None:
364 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
365 self.make_output_collection(self.output_name, self.final_output)
367 if kwargs.get("compute_checksum"):
368 adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
370 return self.final_output
# NOTE(review): the enclosing "def versionstring():" header (orig. line 373)
# is elided from this excerpt; these are the body lines of that function.
374 """Print version string of key packages for provenance and debugging."""
# pkg_resources.require() returns the distribution(s) satisfying each
# requirement; [0].version is the installed version string.
376 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
377 arvpkg = pkg_resources.require("arvados-python-client")
378 cwlpkg = pkg_resources.require("cwltool")
# sys.argv[0] doubles as the program name in the reported string.
380 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
381 "arvados-python-client", arvpkg[0].version,
382 "cwltool", cwlpkg[0].version)
385 def arg_parser():  # type: () -> argparse.ArgumentParser
# Build the command-line interface.  NOTE(review): several original lines
# are elided from this excerpt (387, 392, 395-396, 398, 403, 405, 407, 411,
# 414-415, 420-421, 430, 436, 440, 444, and the trailing "return parser")
# -- confirm against the full file.
386 parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
388 parser.add_argument("--basedir", type=str,
389 help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
390 parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
391 help="Output directory, default current directory")
393 parser.add_argument("--eval-timeout",
394 help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
397 parser.add_argument("--version", action="store_true", help="Print version and exit")
# Verbosity flags are mutually exclusive.
399 exgroup = parser.add_mutually_exclusive_group()
400 exgroup.add_argument("--verbose", action="store_true", help="Default logging")
401 exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
402 exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
404 parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
406 parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
# Job-reuse toggle: both flags share dest "enable_reuse", default True.
408 exgroup = parser.add_mutually_exclusive_group()
409 exgroup.add_argument("--enable-reuse", action="store_true",
410 default=True, dest="enable_reuse",
412 exgroup.add_argument("--disable-reuse", action="store_false",
413 default=True, dest="enable_reuse",
416 parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
417 parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
418 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
419 help="Ignore Docker image version when deciding whether to reuse past jobs.",
# Run-mode group: submit (default) / local / template / workflow create/update.
422 exgroup = parser.add_mutually_exclusive_group()
423 exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
424 default=True, dest="submit")
425 exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
426 default=True, dest="submit")
427 exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
428 exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
429 exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")
# Wait behavior after submitting a runner job; shared dest "wait".
431 exgroup = parser.add_mutually_exclusive_group()
432 exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
433 default=True, dest="wait")
434 exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
435 default=True, dest="wait")
437 parser.add_argument("--api", type=str,
438 default=None, dest="work_api",
439 help="Select work submission API, one of 'jobs' or 'containers'.")
441 parser.add_argument("--compute-checksum", action="store_true", default=False,
442 help="Compute checksum of contents while collecting outputs",
443 dest="compute_checksum")
# Positional arguments: the workflow document and its input object.
445 parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
446 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
# NOTE(review): the enclosing function header (orig. ~line 450, presumably
# "def add_arv_hints():") and the "cache = {}" initialization are elided from
# this excerpt; these body lines register the Arvados CWL extension schema so
# http://arvados.org/cwl# hints validate against the core CWL v1.0 names.
452 res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
453 cache["http://arvados.org/cwl"] = res.read()
454 document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
456 _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
# Merge every extension name into the core CWL name registry (skipping
# names already present) and give the loader an empty index entry so
# references to them resolve.
457 for n in extnames.names:
458 if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
459 cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
460 document_loader.idx["http://arvados.org/cwl#"+n] = {}
462 def main(args, stdout, stderr, api_client=None, keep_client=None):
# Program entry point: parse arguments, construct the runner, configure
# logging levels, and delegate execution to cwltool.main with Arvados
# hooks.  NOTE(review): this excerpt elides the try/except structure and
# several lines (464, 469-472, 477-480, 482-483, 486-487, 490, 493,
# 495-496) -- error handling for parse/construction failures is missing
# here; confirm against the full file.
463 parser = arg_parser()
465 job_order_object = None
466 arvargs = parser.parse_args(args)
# Template/workflow creation does not need a real input object; use an
# empty one so cwltool.main does not prompt for inputs.
467 if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
468 job_order_object = ({}, "")
# Build the API client lazily; OrderedJsonModel preserves key order in
# returned records.
473 if api_client is None:
474 api_client=arvados.api('v1', model=OrderedJsonModel())
475 runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
476 except Exception as e:
# Map --debug/--quiet onto logger levels (default INFO set at module load).
481 logger.setLevel(logging.DEBUG)
484 logger.setLevel(logging.WARN)
485 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
488 metrics.setLevel(logging.DEBUG)
489 logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
# Fixed settings this wrapper always imposes on cwltool.
491 arvargs.conformance_test = None
492 arvargs.use_container = True
# Hand off to cwltool.main with Arvados-specific executor, tool factory,
# version reporter and collection-backed filesystem access.
494 return cwltool.main.main(args=arvargs,
497 executor=runner.arv_executor,
498 makeTool=runner.arv_make_tool,
499 versionfunc=versionstring,
500 job_order_object=job_order_object,
501 make_fs_access=partial(CollectionFsAccess, api_client=api_client))