# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import schema_salad

import arvados

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client, work_api=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)
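
    # Illustrative construction (a sketch, not part of the module): a caller
    # that wants the containers API explicitly, rather than the current
    # fallback to "jobs", would do
    #
    #     runner = ArvCwlRunner(arvados.api('v1'), work_api="containers")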

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
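
    # arv_make_tool is handed to cwltool as its tool factory (via the
    # makeTool argument in main() below): Arvados-specific subclasses take
    # over CommandLineTool and Workflow execution, and anything else falls
    # through to cwltool's stock behavior.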

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
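
    # output_callback serves as the completion hook for the overall process:
    # it records the final status and output object for arv_executor to pick
    # up, and for local pipeline-instance runs it also pushes the terminal
    # state ("Complete" or "Failed") back to the API server.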

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(logger, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()
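
    # A minimal update event, as synthesized by poll_states() below (the
    # uuid is an illustrative placeholder):
    #
    #     {"object_uuid": "zzzzz-8i9sb-xxxxxxxxxxxxxxx",
    #      "event_type": "update",
    #      "properties": {"new_attributes": {"state": "Complete", ...}}}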

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        while True:
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
                break
            with self.lock:
                keys = self.processes.keys()
            if not keys:
                continue

            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()

            try:
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.warn("Error checking states on API server: %s", e)
                continue

            for p in proc_states["items"]:
                self.on_message({
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    "properties": {"new_attributes": p}
                })

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)
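
    # For example, a tool whose InitialWorkDirRequirement stages an entry
    # with "writable: true" is rejected up front by check_writable rather
    # than failing mid-run; only read-only staging is supported here.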

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    with Perf(logger, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.processes.clear()
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output
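
# A minimal sketch of driving ArvCwlRunner directly rather than through
# cwltool.main (illustrative assumptions: `tool` is a tool object already
# produced by arv_make_tool and `job_order` is a parsed input object; the
# supported entry point is main() below):
#
#     runner = ArvCwlRunner(arvados.api('v1', model=OrderedJsonModel()))
#     output = runner.arv_executor(tool, job_order, basedir=os.getcwd(),
#                                  submit=False, wait=True)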
295 """Print version string of key packages for provenance and debugging."""
297 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
298 arvpkg = pkg_resources.require("arvados-python-client")
299 cwlpkg = pkg_resources.require("cwltool")
301 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
302 "arvados-python-client", arvpkg[0].version,
303 "cwltool", cwlpkg[0].version)

def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID",
                        help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
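
# Typical invocations, assuming the console script is installed as
# arvados-cwl-runner (examples only; the UUID is a placeholder):
#
#     arvados-cwl-runner --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx workflow.cwl job.yml
#     arvados-cwl-runner --api containers --no-wait workflow.cwl job.yml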

def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["https://w3id.org/cwl/arv-cwl-schema.yml"] = res.read()

    _, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("https://w3id.org/cwl/arv-cwl-schema.yml", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
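
# add_arv_hints() merges the names from arv-cwl-schema.yml into cwltool's
# v1.0 schema, so workflows can validate Arvados-specific hints declared
# under the http://arvados.org/cwl# namespace, e.g. (sketch):
#
#     $namespaces:
#       arv: "http://arvados.org/cwl#"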

def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client=arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))
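
# This module is normally invoked through a console-script wrapper rather
# than run directly; a hand-rolled equivalent would be (a sketch, not part
# of this file):
#
#     if __name__ == "__main__":
#         sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))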