3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
12 from functools import partial
13 import pkg_resources # part of setuptools
15 from cwltool.errors import WorkflowException
17 import cwltool.workflow
23 from .arvcontainer import ArvadosContainer, RunnerContainer
24 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
25 from .arvtool import ArvadosCommandTool
26 from .fsaccess import CollectionFsAccess
28 from cwltool.process import shortname, UnsupportedRequirement
29 from cwltool.pathmapper import adjustFileObjs
30 from cwltool.draft2tool import compute_checksums
31 from arvados.api import OrderedJsonModel
# Module-level logger shared by ArvCwlRunner and main().  Verbosity defaults
# to INFO; arvExecutor raises it to WARN when --quiet is given.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """

    def __init__(self, api_client, work_api=None):
        """Set up shared runner state.

        :param api_client: Arvados API client used for all server calls.
        :param work_api: "jobs", "containers", or None to use the default
            ("jobs" for now; autodetection is still a TODO).
        :raises Exception: if work_api is neither "jobs" nor "containers".
        """
        self.api = api_client
        # uuid -> in-flight job/container wrapper; mutated from the websocket
        # callback (on_message) and read by arvExecutor, so access is
        # coordinated through self.lock / self.cond.
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        # Cache of files already uploaded to Keep: src -> (collection, name) pair.
        self.uploaded = {}
        # NOTE(review): retry count applied to every API .execute() call --
        # confirm the desired value.
        self.num_retries = 4
        self.uuid = None
        # Pipeline instance record for local (non-submit) jobs-API runs; stays
        # None otherwise.  Checked by output_callback and the error path.
        self.pipeline = None
        self.work_api = work_api

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arvMakeTool(self, toolpath_object, **kwargs):
        """Tool factory for cwltool: run CommandLineTools on Arvados,
        delegate everything else (workflows, expression tools) to cwltool."""
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
        return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        """Record the workflow's final output/status and, for local jobs-API
        runs, flip the pipeline instance to Complete or Failed."""
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                    body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            # BUG FIX: the failure-path update must live on an else branch;
            # as written, a successful run fell through and was also marked
            # Failed.  Also: logger.warn is a deprecated alias of warning.
            logger.warning("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                    body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        """Websocket event handler: track state changes of submitted work.

        Runs on the event-client thread, so all access to self.processes is
        done under the shared lock, and completion wakes arvExecutor's
        cond.wait() loop via notify().
        """
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        # Mark running so repeated "Running" events don't
                        # re-log / re-update the pipeline component.
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    with self.cond:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        # Wake arvExecutor, which waits on self.cond for
                        # processes to finish.
                        self.cond.notify()

    def get_uploaded(self):
        """Return a snapshot copy of the uploaded-files cache."""
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        """Record that local path ``src`` was uploaded as ``pair``."""
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, **kwargs):
        """Main entry point used by cwltool.main: run ``tool`` with
        ``job_order`` on Arvados and return the final output object.

        Depending on kwargs this either creates a pipeline template
        (--create-template), submits a runner job/container (--submit), or
        runs the workflow steps directly, polling via websocket events.
        :raises UnsupportedRequirement: if the workflow needs an unsupported feature.
        :raises WorkflowException: if the workflow fails or produces no result.
        """
        self.debug = kwargs.get("debug")

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        # Work is owned by --project-uuid if given, else the user's home project.
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        self.debug = kwargs.get("debug")
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        # Output/tmp locations differ between the two APIs: containers mount
        # fixed paths, jobs substitute $(task.*) at run time.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    # A bare CommandLineTool submits as a single container.
                    runnerjob = next(tool.job(job_order,
                                              self.output_callback,
                                              **kwargs))
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            # Fire-and-forget: submit the runner and hand back its uuid.
            runnerjob.run()
            return runnerjob.uuid

        arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"

        if self.work_api == "containers":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        if self.work_api == "jobs":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.processes:
                self.cond.wait(1)

            events.close()
        except UnsupportedRequirement:
            raise
        except (KeyboardInterrupt, Exception):
            # BUG FIX: this handler had no matching try in the visible
            # source.  KeyboardInterrupt is caught explicitly (it is not an
            # Exception subclass) so Ctrl-C also marks the run failed.
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                    body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                # Cancel the submitted container request by dropping its priority.
                self.api.container_requests().update(uuid=runnerjob.uuid,
                    body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output
240 """Print version string of key packages for provenance and debugging."""
242 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
243 arvpkg = pkg_resources.require("arvados-python-client")
244 cwlpkg = pkg_resources.require("cwltool")
246 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
247 "arvados-python-client", arvpkg[0].version,
248 "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the argument parser for the arvados-cwl-runner command line."""
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    # The help text promises a 20 second default; supply the matching
    # type/default (they were missing from the visible source).
    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # --enable-reuse / --disable-reuse share one destination; reuse defaults on.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job reuse")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    # BUG FIX: the visible source fell off the end without returning, so
    # main()'s parser.parse_args() would have blown up on None.
    return parser
def main(args, stdout, stderr, api_client=None):
    """Command line entry point.

    Parses ``args``, builds an ArvCwlRunner, and hands control to
    cwltool.main.main with our Arvados-specific executor/tool factory.
    Returns cwltool's exit status, or 1 if runner setup fails.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        # Template creation doesn't need a concrete input object.
        job_order_object = ({}, "")

    # BUG FIX: the except handler below had no matching try in the visible
    # source; API-client construction is the part that can fail here.
    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    arvargs.conformance_test = None
    arvargs.use_container = True

    # BUG FIX: forward the stdout/stderr parameters (they were accepted but
    # never used, so output went to the process-wide streams instead).
    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))