# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
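#
# Example command line (illustrative; assumes the package's console entry
# point is installed as "arvados-cwl-runner", and that workflow.cwl and
# job.yml are the user's workflow and input object):
#
#   arvados-cwl-runner --api=containers --project-uuid=<project uuid> workflow.cwl job.yml
#
# The full option set is defined in arg_parser() below.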

import argparse
import logging
import os
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools

import arvados
import arvados.config
import arvados.events
from arvados.api import OrderedJsonModel

import cwltool.main
import cwltool.workflow
from cwltool.errors import WorkflowException
from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs
from cwltool.draft2tool import compute_checksums

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow: submit work (using either the jobs
    or containers API), wait for it to complete, and report the output.
    """

    def __init__(self, api_client, work_api=None):
        self.api = api_client
        self.processes = {}
        self.uploaded = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.num_retries = 4
        self.uuid = None
        self.pipeline = None
        self.work_api = work_api

        if self.work_api is None:
            # TODO: autodetect which API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        # Event callback from arvados.events.subscribe(): track state changes
        # of submitted jobs/containers and wake up the thread waiting in
        # arvExecutor() when one of them finishes.
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    with self.cond:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def arvExecutor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid

        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"

        if self.work_api == "containers":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        if self.work_api == "jobs":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        if kwargs.get("submit"):
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    runnable.run(**kwargs)
                elif self.processes:
                    self.cond.wait(1)
                else:
                    logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                    break

            while self.processes:
                self.cond.wait(1)
        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s",
                             sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output
252 """Print version string of key packages for provenance and debugging."""
254 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
255 arvpkg = pkg_resources.require("arvados-python-client")
256 cwlpkg = pkg_resources.require("cwltool")
258 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
259 "arvados-python-client", arvpkg[0].version,
260 "cwltool", cwlpkg[0].version)

def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input. Defaults to the directory of the input object file, or the current directory if inputs are piped or given on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable reuse of previously run jobs/containers (default).")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable reuse of previously run jobs/containers.")

    parser.add_argument("--project-uuid", type=str,
                        help="Project that will own the workflow jobs. If not provided, jobs go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
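
# Example (illustrative): the parser returned by arg_parser() can be exercised
# directly, which is handy for checking how options map onto attribute names.
# The file names below are placeholders.
#
#   args = arg_parser().parse_args(["--api=containers", "wf.cwl", "job.yml"])
#   assert args.work_api == "containers"
#   assert args.workflow == "wf.cwl" and args.job_order == ["job.yml"]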

def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))
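

# Minimal sketch of a direct entry point, assuming the installed package does
# not already wire main() up through a console-script wrapper (it normally
# does, so this is illustrative rather than required).
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))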