3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
11 import pkg_resources # part of setuptools
13 from cwltool.errors import WorkflowException
15 import cwltool.workflow
20 from .arvcontainer import ArvadosContainer, RunnerContainer
21 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
22 from .arvtool import ArvadosCommandTool
23 from .fsaccess import CollectionFsAccess
25 from cwltool.process import shortname, UnsupportedRequirement
26 from arvados.api import OrderedJsonModel
# Module-level logger; defaults to INFO, lowered to WARN when --quiet is
# passed (see arvExecutor).
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    def __init__(self, api_client, work_api=None):
        # api_client: Arvados API client object.
        # work_api: "jobs" or "containers"; defaults to "jobs" when None.
        # Condition variable pairs with self.lock so the websocket callback
        # (on_message) and arvExecutor can coordinate on shared job state.
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        # Final workflow result and status, filled in by output_callback().
        self.final_output = None
        self.final_status = None
        self.work_api = work_api
        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"
        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)
    def arvMakeTool(self, toolpath_object, **kwargs):
        # Tool factory handed to cwltool: CommandLineTools are wrapped so they
        # run on Arvados; anything else falls back to cwltool's default factory.
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
        return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
    def output_callback(self, out, processStatus):
        # Final-output callback handed to cwltool: record the overall status
        # and output, and mirror success/failure onto the pipeline instance.
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                body={"state": "Complete"}).execute(num_retries=self.num_retries)
            logger.warn("Overall process status is %s", processStatus)  # NOTE(review): logger.warn is deprecated; prefer logger.warning
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
    def on_message(self, event):
        # Websocket event callback: track state transitions of jobs/containers
        # this runner submitted (keyed by uuid in self.processes).
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                # Transition into "Running": log it once and refresh the
                # pipeline component record.
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    logger.info("Job %s (%s) is Running", j.name, uuid)
                    j.update_pipeline_component(event["properties"]["new_attributes"])
                # Terminal states: hand the final attributes to the job object.
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                    j.done(event["properties"]["new_attributes"])
97 def get_uploaded(self):
98 return self.uploaded.copy()
100 def add_uploaded(self, src, pair):
101 self.uploaded[src] = pair
    def arvExecutor(self, tool, job_order, **kwargs):
        """Execute *tool* with *job_order* via the jobs or containers API and
        return the workflow's final output object."""
        self.debug = kwargs.get("debug")
        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid  # default to user's home project
        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            # cwltool.main will write our return value to stdout.
        self.debug = kwargs.get("debug")  # NOTE(review): already set above — appears redundant
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.fs_access = CollectionFsAccess(kwargs["basedir"])
        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"  # fixed paths inside the container
            kwargs["tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"  # crunch task substitution variables
            kwargs["tmpdir"] = "$(task.tmpdir)"
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])
        if runnerjob and not kwargs.get("wait"):
            # --no-wait: report the submitted runner's uuid and return.
            return runnerjob.uuid
        if self.work_api == "containers":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        if self.work_api == "jobs":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
            jobiter = iter((runnerjob,))
        if "cwl_runner_job" in kwargs:
            self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               docker_outdir="$(task.outdir)",
        # Will continue to hold the lock for the duration of this code
        # except when in cond.wait(), at which point on_message can update
        # job state and process output callbacks.
        for runnable in jobiter:
                runnable.run(**kwargs)
                logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
            while self.processes:
        except UnsupportedRequirement:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                # Cancel the submitted runner container by dropping its priority to 0.
                self.api.container_requests().update(uuid=runnerjob.uuid,
                    body={"priority": "0"}).execute(num_retries=self.num_retries)
        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")
        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")
        return self.final_output
    """Print version string of key packages for provenance and debugging."""
    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")  # require() returns a list of distributions; [0] is the named one
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")
    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the argparse parser for the arvados-cwl-runner command line."""
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")
    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
    parser.add_argument("--version", action="store_true", help="Print version and exit")
    # Logging verbosity: exactly one of --verbose/--quiet/--debug.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
    # Job reuse: both flags share dest="enable_reuse" (default True).
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
    # Submission mode: --submit (default) vs --local vs --create-template.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
    # Wait behavior for submitted runner jobs; both flags share dest="wait".
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")
    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")
    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
def main(args, stdout, stderr, api_client=None):
    """Command-line entry point: parse arguments, build an ArvCwlRunner, and
    delegate execution to cwltool.main with our executor and tool factory.

    api_client may be injected (e.g. by tests); when None a default Arvados
    v1 client with ordered JSON responses is created.
    """
    parser = arg_parser()
    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        # Template creation does not need an input object; pass an empty one.
        job_order_object = ({}, "")
        if api_client is None:
            api_client=arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
    arvargs.conformance_test = None
    arvargs.use_container = True
    return cwltool.main.main(args=arvargs,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object)