3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
14 from functools import partial
15 import pkg_resources # part of setuptools
17 from cwltool.errors import WorkflowException
19 import cwltool.workflow
25 from .arvcontainer import ArvadosContainer, RunnerContainer
26 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
27 from. runner import Runner, upload_instance
28 from .arvtool import ArvadosCommandTool
29 from .arvworkflow import ArvadosWorkflow, upload_workflow
30 from .fsaccess import CollectionFsAccess
31 from .perf import Perf
32 from .pathmapper import FinalOutputPathMapper
34 from cwltool.pack import pack
35 from cwltool.process import shortname, UnsupportedRequirement, getListing
36 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
37 from cwltool.draft2tool import compute_checksums
38 from arvados.api import OrderedJsonModel
# Module-level loggers: `logger` for user-facing progress messages, `metrics`
# for timing instrumentation consumed via Perf (see .perf import above).
# INFO is the baseline; main() adjusts levels from --debug/--quiet/--metrics.
40 logger = logging.getLogger('arvados.cwl-runner')
41 metrics = logging.getLogger('arvados.cwl-runner.metrics')
42 logger.setLevel(logging.INFO)
# NOTE(review): this extract carries the original file's own line numbers;
# gaps in that numbering are lines elided from this view.  Comments below
# describe only what the visible lines demonstrate.
45 class ArvCwlRunner(object):
46 """Execute a CWL tool or workflow, submit work (using either jobs or
47 containers API), wait for them to complete, and report output.
# __init__: wires up synchronization primitives and output bookkeeping.
# `cond` wraps `lock` so on_message() can signal the executor's wait loop.
# Several attribute initializations (e.g. self.api, around elided lines
# 52-63) are not visible here — do not assume this is the full setup.
51 def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
54 self.lock = threading.Lock()
55 self.cond = threading.Condition(self.lock)
56 self.final_output = None
57 self.final_status = None
61 self.stop_polling = threading.Event()
64 self.final_output_collection = None
65 self.output_name = output_name
66 self.project_uuid = None
# Reuse the caller's KeepClient when supplied; the fallback construction on
# line 71 presumably sits under an `else:` (elided line 70) — verify.
68 if keep_client is not None:
69 self.keep_client = keep_client
71 self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
# Probe the API discovery document for a usable work API ("jobs" or
# "containers").  The success path that records the selection (elided lines
# 78-83) is not visible; the raises below appear to be the no-match and
# bad-request error paths respectively.
73 for api in ["jobs", "containers"]:
75 methods = self.api._rootDesc.get('resources')[api]['methods']
76 if ('httpMethod' in methods['create'] and
77 (work_api == api or work_api is None)):
84 raise Exception("No supported APIs")
86 raise Exception("Unsupported API '%s'" % work_api)
def arv_make_tool(self, toolpath_object, **kwargs):
    """Factory hook handed to cwltool.

    Maps CWL "CommandLineTool" and "Workflow" documents onto their
    Arvados-aware implementations; any other document class falls through
    to cwltool's default factory.
    """
    kwargs["work_api"] = self.work_api
    doc_class = toolpath_object["class"] if "class" in toolpath_object else None
    if doc_class == "CommandLineTool":
        return ArvadosCommandTool(self, toolpath_object, **kwargs)
    if doc_class == "Workflow":
        return ArvadosWorkflow(self, toolpath_object, **kwargs)
    return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
# Completion callback handed to cwltool: records the final status and output
# object, and mirrors success/failure into the pipeline instance record.
# The guards around the API updates (presumably `if self.pipeline:` on elided
# lines 100/105, and the `else:` on elided line 103) are not visible — the
# indentation structure here is partial.
97 def output_callback(self, out, processStatus):
98 if processStatus == "success":
99 logger.info("Overall process status is %s", processStatus)
101 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
102 body={"state": "Complete"}).execute(num_retries=self.num_retries)
104 logger.warn("Overall process status is %s", processStatus)
106 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
107 body={"state": "Failed"}).execute(num_retries=self.num_retries)
# Stash results for arv_executor to pick up after its wait loop finishes.
108 self.final_status = processStatus
109 self.final_output = out
# Event handler for job/container state changes (fed by the websocket event
# stream and/or synthesized events from poll_states).  Only reacts to
# "update" events for uuids tracked in self.processes.
111 def on_message(self, event):
112 if "object_uuid" in event:
113 if event["object_uuid"] in self.processes and event["event_type"] == "update":
# Transition into Running: log once (guarded by the .running flag) and push
# the new state into the pipeline component.  The line that sets
# j.running = True (elided, around 116/119) is not visible — assumed, verify.
114 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
115 uuid = event["object_uuid"]
117 j = self.processes[uuid]
118 logger.info("Job %s (%s) is Running", j.name, uuid)
120 j.update_pipeline_component(event["properties"]["new_attributes"])
# Terminal states: finalize via j.done(), timed with the metrics logger.
# Cleanup/locking around these lines (elided 123-124, 129-132, likely the
# lock acquisition, dict removal and cond.notify) is not visible here.
121 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
122 uuid = event["object_uuid"]
125 j = self.processes[uuid]
126 logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
127 with Perf(metrics, "done %s" % j.name):
128 j.done(event["properties"]["new_attributes"])
# Background safety net: periodically re-reads job/container states from the
# API server so missed websocket events do not strand the run.  The
# enclosing loop header (elided, around line 139) and the `try:` before the
# list() call (elided line 153) are not visible in this extract.
133 def poll_states(self):
134 """Poll status of jobs or containers listed in the processes dict.
136 Runs in a separate thread.
# Sleep up to 15s between polls; stop_polling doubles as the shutdown signal
# set by arv_executor's finally-style cleanup.
140 self.stop_polling.wait(15)
141 if self.stop_polling.is_set():
144 keys = self.processes.keys()
# Pick the resource table matching the selected work API.
148 if self.work_api == "containers":
149 table = self.poll_api.containers()
150 elif self.work_api == "jobs":
151 table = self.poll_api.jobs()
154 proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
# Best-effort: polling errors are logged and the loop presumably continues
# (the continue/retry line is elided).
155 except Exception as e:
156 logger.warn("Error checking states on API server: %s", e)
# Synthesize an "update" event per returned record; the dict construction
# and the self.on_message(...) dispatch around these lines (elided 160,
# 163-167) are only partially visible.
159 for p in proc_states["items"]:
161 "object_uuid": p["uuid"],
162 "event_type": "update",
168 def get_uploaded(self):
169 return self.uploaded.copy()
171 def add_uploaded(self, src, pair):
172 self.uploaded[src] = pair
174 def check_writable(self, obj):
175 if isinstance(obj, dict):
176 if obj.get("writable"):
177 raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
178 for v in obj.itervalues():
179 self.check_writable(v)
180 if isinstance(obj, list):
182 self.check_writable(v)
# Build a new Keep collection containing the workflow's final outputs by
# copying each output file/directory out of its source collection, then
# rewrite the output object's locations to point at the new collection.
# Several setup lines are elided from this extract (e.g. the `files` list
# and `srccollections` dict initialization, and the split that produces
# `sp` around line 202) — do not assume the visible lines are contiguous.
184 def make_output_collection(self, name, outputObj):
# Deep-copy so rewriting locations below cannot corrupt the caller's object.
185 outputObj = copy.deepcopy(outputObj)
# Collect every File/Directory entry in the output object.
188 def capture(fileobj):
189 files.append(fileobj)
191 adjustDirObjs(outputObj, capture)
192 adjustFileObjs(outputObj, capture)
# Map source locations to flat targets in the new collection.
194 generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
196 final = arvados.collection.Collection(api_client=self.api,
197 keep_client=self.keep_client,
198 num_retries=self.num_retries)
# Copy each mapped item; source collection readers are cached per
# portable data hash (sp[0][5:] strips a "keep:" prefix — presumably).
201 for k,v in generatemapper.items():
203 srccollection = sp[0][5:]
204 if srccollection not in srccollections:
205 srccollections[srccollection] = arvados.collection.CollectionReader(
208 keep_client=self.keep_client,
209 num_retries=self.num_retries)
210 reader = srccollections[srccollection]
211 # "." copies the whole source collection when no subpath was given.
212 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
213 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
# Copy failures are logged, not fatal (the except header is elided).
215 logger.warn("While preparing output collection: %s", e)
# Rewrite locations to the new collection and drop stale metadata keys
# (the pop of basename/size/listing on elided lines 220-222 — verify).
217 def rewrite(fileobj):
218 fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
219 for k in ("basename", "size", "listing"):
223 adjustDirObjs(outputObj, rewrite)
224 adjustFileObjs(outputObj, rewrite)
# Persist the rewritten output object inside the collection itself.
226 with final.open("cwl.output.json", "w") as f:
227 json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
229 final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
231 logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
232 final.api_response()["name"],
233 final.manifest_locator())
235 self.final_output_collection = final
# Main executor handed to cwltool.main: validates the tool, dispatches to
# template/workflow-creation short-circuits, configures per-API kwargs,
# runs or submits the work, waits for completion, and returns the final
# output object.  Many lines are elided from this extract (try/except and
# `with` headers, `else:` branches, the runnerjob default) — the visible
# lines do not form contiguous control flow.
237 def arv_executor(self, tool, job_order, **kwargs):
238 self.debug = kwargs.get("debug")
# Reject unsupported writable InitialWorkDir requirements up front.
240 tool.visit(self.check_writable)
# Default the owning project to the current user's home project.
242 useruuid = self.api.users().current().execute()["uuid"]
243 self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
245 make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
247 keep_client=self.keep_client)
248 self.fs_access = make_fs_access(kwargs["basedir"])
# --create-template: register a pipeline template and return its uuid
# (the return, elided around line 254, is written to stdout by cwltool.main).
250 if kwargs.get("create_template"):
251 tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
253 # cwltool.main will write our return value to stdout.
# --create-workflow / --update-workflow: register and return immediately.
256 if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
257 return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
259 self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
261 kwargs["make_fs_access"] = make_fs_access
262 kwargs["enable_reuse"] = kwargs.get("enable_reuse")
263 kwargs["use_container"] = True
264 kwargs["tmpdir_prefix"] = "tmp"
265 kwargs["on_error"] = "continue"
266 kwargs["compute_checksum"] = kwargs.get("compute_checksum")
# Containers run in fixed paths inside the container; jobs use Crunch
# task-level substitution variables.
268 if self.work_api == "containers":
269 kwargs["outdir"] = "/var/spool/cwl"
270 kwargs["docker_outdir"] = "/var/spool/cwl"
271 kwargs["tmpdir"] = "/tmp"
272 kwargs["docker_tmpdir"] = "/tmp"
273 elif self.work_api == "jobs":
274 kwargs["outdir"] = "$(task.outdir)"
275 kwargs["docker_outdir"] = "$(task.outdir)"
276 kwargs["tmpdir"] = "$(task.tmpdir)"
278 upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
# --submit: wrap the whole workflow in a runner job/container.  The `else:`
# branches pairing lines 288/290 with the conditions above are elided.
281 if kwargs.get("submit"):
282 if self.work_api == "containers":
283 if tool.tool["class"] == "CommandLineTool":
284 runnerjob = tool.job(job_order,
285 self.output_callback,
288 runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
290 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
292 if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
293 # Create pipeline for local run
294 self.pipeline = self.api.pipeline_instances().create(
296 "owner_uuid": self.project_uuid,
297 "name": shortname(tool.tool["id"]),
299 "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
300 logger.info("Pipeline instance %s", self.pipeline["uuid"])
# --no-wait: fire and forget, returning the submitted job's uuid.
302 if runnerjob and not kwargs.get("wait"):
303 runnerjob.run(wait=kwargs.get("wait"))
304 return runnerjob.uuid
# Start the background state-polling thread (see poll_states).
306 self.poll_api = arvados.api('v1')
307 self.polling_thread = threading.Thread(target=self.poll_states)
308 self.polling_thread.start()
311 jobiter = iter((runnerjob,))
313 if "cwl_runner_job" in kwargs:
314 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
315 jobiter = tool.job(job_order,
316 self.output_callback,
# Main wait loop.  The `with self.cond:` / try headers around these lines
# are elided; the comment below is the original author's.
321 # Will continue to hold the lock for the duration of this code
322 # except when in cond.wait(), at which point on_message can update
323 # job state and process output callbacks.
325 loopperf = Perf(metrics, "jobiter")
327 for runnable in jobiter:
330 with Perf(metrics, "run"):
331 runnable.run(**kwargs)
# jobiter yielded nothing runnable while nothing is pending -> deadlock.
336 logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
341 while self.processes:
# Error handling: mark the pipeline failed and cancel the runner container
# request if one was submitted (the except headers at 344/346-ish and the
# surrounding cleanup are only partially visible).
344 except UnsupportedRequirement:
347 if sys.exc_info()[0] is KeyboardInterrupt:
348 logger.error("Interrupted, marking pipeline as failed")
350 logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
352 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
353 body={"state": "Failed"}).execute(num_retries=self.num_retries)
354 if runnerjob and runnerjob.uuid and self.work_api == "containers":
355 self.api.container_requests().update(uuid=runnerjob.uuid,
356 body={"priority": "0"}).execute(num_retries=self.num_retries)
# Shut down the polling thread before reporting results.
359 self.stop_polling.set()
360 self.polling_thread.join()
362 if self.final_status == "UnsupportedRequirement":
363 raise UnsupportedRequirement("Check log for details.")
365 if self.final_status != "success":
366 raise WorkflowException("Workflow failed.")
368 if self.final_output is None:
369 raise WorkflowException("Workflow did not return a result.")
371 if kwargs.get("submit") and isinstance(runnerjob, Runner):
372 logger.info("Final output collection %s", runnerjob.final_output)
# Local run: gather outputs into a named collection (see
# make_output_collection); the `else:` pairing is elided.
374 if self.output_name is None:
375 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
376 self.make_output_collection(self.output_name, self.final_output)
378 if kwargs.get("compute_checksum"):
379 adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
380 adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
382 return self.final_output
386 """Print version string of key packages for provenance and debugging."""
388 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
389 arvpkg = pkg_resources.require("arvados-python-client")
390 cwlpkg = pkg_resources.require("cwltool")
392 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
393 "arvados-python-client", arvpkg[0].version,
394 "cwltool", cwlpkg[0].version)
# Build the command-line parser.  Several elided lines carry argument
# defaults and help text (e.g. the --eval-timeout type/default around lines
# 407-408 and the reuse/ignore-docker help strings) — do not infer defaults
# for those options from this extract.
397 def arg_parser():  # type: () -> argparse.ArgumentParser
398 parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
400 parser.add_argument("--basedir", type=str,
401 help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
402 parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
403 help="Output directory, default current directory")
405 parser.add_argument("--eval-timeout",
406 help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
409 parser.add_argument("--version", action="store_true", help="Print version and exit")
# Logging verbosity: mutually exclusive --verbose/--quiet/--debug.
411 exgroup = parser.add_mutually_exclusive_group()
412 exgroup.add_argument("--verbose", action="store_true", help="Default logging")
413 exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
414 exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
416 parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
418 parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
# Job-reuse toggle: both flags share dest="enable_reuse", default True.
420 exgroup = parser.add_mutually_exclusive_group()
421 exgroup.add_argument("--enable-reuse", action="store_true",
422 default=True, dest="enable_reuse",
424 exgroup.add_argument("--disable-reuse", action="store_false",
425 default=True, dest="enable_reuse",
428 parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
429 parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
430 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
431 help="Ignore Docker image version when deciding whether to reuse past jobs.",
# Run mode: submit to Arvados (default) vs. local execution, or the
# template/workflow registration short-circuits handled in arv_executor.
434 exgroup = parser.add_mutually_exclusive_group()
435 exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
436 default=True, dest="submit")
437 exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
438 default=True, dest="submit")
439 exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
440 exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
441 exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")
# --wait/--no-wait share dest="wait", default True.
443 exgroup = parser.add_mutually_exclusive_group()
444 exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
445 default=True, dest="wait")
446 exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
447 default=True, dest="wait")
449 parser.add_argument("--api", type=str,
450 default=None, dest="work_api",
451 help="Select work submission API, one of 'jobs' or 'containers'.")
453 parser.add_argument("--compute-checksum", action="store_true", default=False,
454 help="Compute checksum of contents while collecting outputs",
455 dest="compute_checksum")
457 parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
458 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
# NOTE(review): the header of the enclosing function (and the `cache = {}`
# initialization it presumably performs, around elided lines 460-463) is not
# visible in this extract.  These lines load the Arvados CWL extension
# schema shipped with the package and merge its names into cwltool's CWL
# v1.0 name set so documents using the extensions validate.
464 res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
465 cache["http://arvados.org/cwl"] = res.read()
467 document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
468 _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
# Register each extension name under the arvados.org/cwl namespace unless
# cwltool already knows it, and give the loader an (empty) index entry so
# references to the extension resolve.
469 for n in extnames.names:
470 if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
471 cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
472 document_loader.idx["http://arvados.org/cwl#"+n] = {}
# CLI entry point: parse arguments, construct the ArvCwlRunner, adjust log
# levels, and delegate execution to cwltool.main with our executor and tool
# factory plugged in.  Several branch and except bodies are elided from
# this extract (e.g. the try around runner construction and the
# --version/--debug/--quiet dispatch structure).
474 def main(args, stdout, stderr, api_client=None, keep_client=None):
475 parser = arg_parser()
477 job_order_object = None
478 arvargs = parser.parse_args(args)
# Template/workflow registration does not require an input object; supply
# an empty one so cwltool.main does not try to read stdin.
479 if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
480 job_order_object = ({}, "")
# Build the API client lazily so tests can inject their own; the except
# body that reports construction failure (after line 488) is elided.
485 if api_client is None:
486 api_client=arvados.api('v1', model=OrderedJsonModel())
487 runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
488 except Exception as e:
# Log-level adjustments driven by --debug/--quiet/--metrics (the guarding
# if-statements around these lines are elided).
493 logger.setLevel(logging.DEBUG)
496 logger.setLevel(logging.WARN)
497 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
500 metrics.setLevel(logging.DEBUG)
501 logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
# Flags cwltool.main expects on the namespace but we do not expose as CLI
# options.
503 arvargs.conformance_test = None
504 arvargs.use_container = True
506 return cwltool.main.main(args=arvargs,
509 executor=runner.arv_executor,
510 makeTool=runner.arv_make_tool,
511 versionfunc=versionstring,
512 job_order_object=job_order_object,
513 make_fs_access=partial(CollectionFsAccess, api_client=api_client))