3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
14 from functools import partial
15 import pkg_resources # part of setuptools
17 from cwltool.errors import WorkflowException
19 import cwltool.workflow
25 from .arvcontainer import ArvadosContainer, RunnerContainer
26 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
27 from. runner import Runner, upload_instance
28 from .arvtool import ArvadosCommandTool
29 from .arvworkflow import ArvadosWorkflow, upload_workflow
30 from .fsaccess import CollectionFsAccess
31 from .perf import Perf
32 from .pathmapper import FinalOutputPathMapper
34 from cwltool.pack import pack
35 from cwltool.process import shortname, UnsupportedRequirement, getListing
36 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
37 from cwltool.draft2tool import compute_checksums
38 from arvados.api import OrderedJsonModel
# Module-level loggers: "arvados.cwl-runner" carries user-facing progress
# messages; the separate "metrics" child logger is used with Perf for
# timing measurements (enabled via --metrics in main()).
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    NOTE(review): the remainder of this docstring and its closing quotes
    are elided in this excerpt.
    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
        # Lock/condition pair guards shared job state; the polling thread
        # (poll_states -> on_message) and output callbacks coordinate on it.
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None      # set by output_callback() on completion
        self.final_status = None      # completion status string ("success"/...)
        self.work_api = work_api      # "jobs" or "containers" (validated below)
        self.stop_polling = threading.Event()  # signals poll_states() to stop
        self.final_output_collection = None    # set by make_output_collection()
        self.output_name = output_name
        self.project_uuid = None      # filled in by arv_executor()

        if keep_client is not None:
            self.keep_client = keep_client
        # NOTE(review): an "else:" line is elided here in this excerpt; this
        # assignment is the fallback when no keep_client was supplied.
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)
    def arv_make_tool(self, toolpath_object, **kwargs):
        """cwltool makeTool hook: wrap CommandLineTool and Workflow documents
        in their Arvados-aware subclasses; anything else falls through to
        cwltool's default factory."""
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        # NOTE(review): an "else:" header line is elided here in this excerpt.
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
    def output_callback(self, out, processStatus):
        """Final-output callback handed to tool.job(): record the overall
        status/output and mark the tracking pipeline instance Complete or
        Failed accordingly."""
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            # NOTE(review): a guard line is elided here in this excerpt
            # (presumably checking self.pipeline exists) — confirm upstream.
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        # NOTE(review): "else:" header (and another guard) elided here.
            logger.warn("Overall process status is %s", processStatus)
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
    def on_message(self, event):
        """Event handler for job/container state changes (fed by websocket
        events or synthesized by poll_states): logs transitions for tracked
        processes and dispatches terminal states to the process's done()."""
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    # NOTE(review): a locking line ("with self.lock:") is
                    # elided here in this excerpt.
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        # (a line updating the process's running flag is elided)
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    # NOTE(review): try/lock-acquisition lines elided here.
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        NOTE(review): the docstring's closing quotes and the polling-loop
        header are elided in this excerpt.
            # Wake at least every 15 seconds, or immediately on stop_polling.
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
            # NOTE(review): loop-exit and lock-acquisition lines elided here.
                keys = self.processes.keys()
            # Pick the API table matching the configured work API.
            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()
            # NOTE(review): a "try:" header line is elided here.
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                # Best effort: log and keep polling on API errors.
                logger.warn("Error checking states on API server: %s", e)
            # Synthesize "update" events so polled state changes flow through
            # the same on_message() path as websocket notifications.
            for p in proc_states["items"]:
                    "object_uuid": p["uuid"],
                    "event_type": "update",
161 def get_uploaded(self):
162 return self.uploaded.copy()
164 def add_uploaded(self, src, pair):
165 self.uploaded[src] = pair
    def check_writable(self, obj):
        """Recursively scan a CWL document for InitialWorkDir entries marked
        writable and reject them, since this runner does not support them.

        Raises: UnsupportedRequirement if any dict has a truthy "writable".
        """
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():   # Python 2 dict-value iteration
                self.check_writable(v)
        if isinstance(obj, list):
            # NOTE(review): the "for v in ..." header is elided in this excerpt.
                self.check_writable(v)
    def make_output_collection(self, name, outputObj):
        """Copy all File/Directory outputs into one new collection called
        *name*, rewrite their locations to point at it, embed a
        cwl.output.json manifest, and save it; the resulting collection is
        kept in self.final_output_collection."""
        # Deep-copy so the caller's output object is not mutated in place.
        outputObj = copy.deepcopy(outputObj)

        # Collect every File/Directory object reachable from the output.
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        # Map all captured paths into one flat target namespace.
        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        # Copy each mapped entry out of its source collection, caching one
        # CollectionReader per source collection.
        for k,v in generatemapper.items():
            # NOTE(review): the "sp = ..." split of the source path is elided
            # in this excerpt; sp[0][5:] presumably strips a "keep:" prefix.
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                srccollections[srccollection] = arvados.collection.CollectionReader(
                    keep_client=self.keep_client,
                    num_retries=self.num_retries)
            reader = srccollections[srccollection]
            # NOTE(review): a "try:" header line is elided here.
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
                # Best effort: a failed copy is logged, not fatal.
                logger.warn("While preparing output collection: %s", e)

        # Point the output object at the new collection and drop metadata
        # that no longer applies to the rewritten locations.
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "size", "listing"):

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        # Embed the rewritten output object as a manifest in the collection.
        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        self.final_output_collection = final
    def set_crunch_output(self):
        """When running inside a Crunch container or job task, record the
        final output collection on the enclosing container / job task record
        so it shows up as the work unit's output."""
        if self.work_api == "containers":
            # NOTE(review): a "try:" header line is elided in this excerpt.
                current = self.api.containers().current().execute(num_retries=self.num_retries)
                self.api.containers().update(uuid=current['uuid'],
                    'output': self.final_output_collection.portable_data_hash(),
                    }).execute(num_retries=self.num_retries)
            except Exception as e:
                # Best effort: failure to record output is logged, not fatal.
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                'output': self.final_output_collection.portable_data_hash(),
                'success': self.final_status == "success",
                }).execute(num_retries=self.num_retries)
    def arv_executor(self, tool, job_order, **kwargs):
        """Main entry point (passed to cwltool.main as the executor): run
        *tool* with *job_order*, either submitting a runner job/container to
        Arvados or executing steps from here, and return the final output
        object.

        NOTE(review): this excerpt elides a number of structural lines
        (try/except/finally skeleton, else branches, some continuation
        lines); elisions are flagged inline below.
        """
        self.debug = kwargs.get("debug")

        # Refuse workflows that need writable InitialWorkDir entries.
        tool.visit(self.check_writable)

        # Default the owning project to the current user's home project.
        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        # --create-template: register a pipeline template instead of running.
        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            # cwltool.main will write our return value to stdout.

        # --create/--update-workflow: register a workflow record, don't run.
        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        # Fixed execution settings handed down to every tool invocation.
        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        # Work/temp directory conventions differ between the two APIs:
        # fixed paths for containers, runtime-substituted for jobs.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        # --submit: wrap the whole run in a runner job/container on Arvados.
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                    # NOTE(review): continuation/else lines elided here.
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
                # NOTE(review): jobs-API branch header elided here.
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)

        # Local run with the jobs API: make a pipeline instance to track it.
        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                "owner_uuid": self.project_uuid,
                "name": shortname(tool.tool["id"]),
                "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        # --no-wait: submit and return the runner's uuid immediately.
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        # Start the background state-polling thread.
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        # Either iterate over just the runner job, or over the tool's own
        # job generator for a step-by-step run.
            jobiter = iter((runnerjob,))
        if "cwl_runner_job" in kwargs:
            self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,

            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            for runnable in jobiter:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                        # NOTE(review): surrounding conditionals (nothing
                        # runnable / nothing pending) are elided here.
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")

            # Wait for all in-flight processes to finish.
            while self.processes:

        except UnsupportedRequirement:
        # NOTE(review): re-raise and generic except headers elided here.
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
                logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                # Cancel the submitted runner container by zeroing priority.
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        # NOTE(review): "finally:" header and lock release elided here;
        # cleanup always stops the polling thread.
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        # NOTE(review): else-branch header elided; locally-gathered outputs
        # are packaged into a named collection below.
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            self.make_output_collection(self.output_name, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        # --compute-checksum: fill in listings and checksums on the output.
        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output
    """Print version string of key packages for provenance and debugging."""
    # NOTE(review): the "def versionstring():" header line is elided in this
    # excerpt; the lines below are the function body.
    # pkg_resources.require() returns the distribution chain for a package;
    # element [0] is the requested distribution itself.
    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command-line parser for arvados-cwl-runner.

    NOTE(review): several add_argument continuation lines (type/default/help
    keywords) are elided in this excerpt; elided spots are visible where a
    call has no closing parenthesis.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    # Logging verbosity: at most one of --verbose / --quiet / --debug.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Job reuse: both flags share dest="enable_reuse" (default True).
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",

    # Run mode: submit to Arvados (default), run locally, or just register
    # a template/workflow.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")

    # Whether to wait for a submitted runner to finish (default: wait).
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    # Positionals: the workflow document, then everything else is treated as
    # the input object / tool arguments (argparse.REMAINDER).
    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
    # NOTE(review): the enclosing "def ..." header (and the initialization of
    # the local `cache` dict) is elided in this excerpt.
    # Load the Arvados CWL extension schema bundled with this package and
    # merge its names into cwltool's v1.0 schema so arvados extensions
    # validate during document loading.
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        # Register each extension name under the arvados.org/cwl namespace,
        # skipping any the core schema already defines.
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}
def main(args, stdout, stderr, api_client=None, keep_client=None):
    """Command-line entry point: parse arguments, construct an ArvCwlRunner,
    and hand control to cwltool.main with the Arvados executor and tool
    factory plugged in.

    NOTE(review): several structural lines (a "try:" header, the if-guards
    for the logging-level blocks, and some keyword arguments of the final
    call) are elided in this excerpt.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    # Registration-only modes may legitimately run without an input object.
    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

        if api_client is None:
            api_client=arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
    except Exception as e:

        # --debug: verbose runner logging.
        logger.setLevel(logging.DEBUG)

        # --quiet: warnings and errors only.
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        # --metrics: enable timing output from the metrics loggers.
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    # cwltool.main expects these attributes on its parsed-args namespace.
    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))