3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
11 import pkg_resources # part of setuptools
13 from cwltool.errors import WorkflowException
15 import cwltool.workflow
20 from .arvcontainer import ArvadosContainer, RunnerContainer
21 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
22 from .arvtool import ArvadosCommandTool
23 from .fsaccess import CollectionFsAccess
25 from cwltool.process import shortname, UnsupportedRequirement
26 from arvados.api import OrderedJsonModel
# Module-level logger for the CWL runner.  Default to INFO so progress
# messages (job state changes, pipeline uuid) are visible without --debug;
# arvExecutor lowers this to WARN when --quiet is given.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """
    def __init__(self, api_client, work_api=None):
        """Set up synchronization primitives and choose the work API.

        NOTE(review): some assignments in this method are elided in this
        view; presumably they store api_client (self.api), the process
        table (self.processes), the upload cache (self.uploaded), and
        num_retries — confirm against the full source.
        """
        # Condition shares the lock so on_message can signal completion to
        # the waiting arvExecutor thread.
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None   # set by output_callback when the workflow finishes
        self.final_status = None   # e.g. "success"; checked by arvExecutor
        self.work_api = work_api

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)
56 def arvMakeTool(self, toolpath_object, **kwargs):
57 if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
58 return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
60 return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
62 def output_callback(self, out, processStatus):
63 if processStatus == "success":
64 logger.info("Overall process status is %s", processStatus)
66 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
67 body={"state": "Complete"}).execute(num_retries=self.num_retries)
69 logger.warn("Overall process status is %s", processStatus)
71 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
72 body={"state": "Failed"}).execute(num_retries=self.num_retries)
73 self.final_status = processStatus
74 self.final_output = out
    def on_message(self, event):
        """Websocket event handler: react to state changes of submitted
        jobs/containers and forward them to the tracked process objects.

        NOTE(review): lines elided in this view presumably take self.lock
        around access to self.processes and notify self.cond when a
        process completes — confirm against the full source.
        """
        if "object_uuid" in event:
            # Only handle updates for work we submitted ourselves.
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    # First transition into Running: log it and push the
                    # new state into the pipeline component.
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    logger.info("Job %s (%s) is Running", j.name, uuid)
                    j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    # Terminal state: let the process object collect its
                    # output / record failure.
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                    j.done(event["properties"]["new_attributes"])
97 def get_uploaded(self):
98 return self.uploaded.copy()
100 def add_uploaded(self, src, pair):
101 self.uploaded[src] = pair
    def arvExecutor(self, tool, job_order, **kwargs):
        """Top-level executor: configure the run, submit or locally run the
        work, wait for completion, and return the final output object.

        NOTE(review): many lines of this method are elided in this view
        (return statements, try/else openers, dict-literal openers, call
        closers).  Structural gaps are marked below; the visible code is
        kept verbatim.

        :raises UnsupportedRequirement: when the workflow needs a feature
            this runner does not support.
        :raises WorkflowException: on failure or missing result.
        """
        self.debug = kwargs.get("debug")

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        # Fall back to the user's home project when --project-uuid not given.
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.fs_access = CollectionFsAccess(kwargs["basedir"], api_client=self.api)

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            # cwltool.main will write our return value to stdout.
            # NOTE(review): the return of the template's uuid appears elided here.

        self.debug = kwargs.get("debug")  # NOTE(review): duplicates the assignment at the top of this method
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"

        # Output/tmp locations differ: fixed container paths for the
        # containers API, crunch task substitution variables for jobs.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                    # NOTE(review): remaining tool.job() arguments and the
                    # branch structure around the next two assignments are
                    # elided; presumably RunnerContainer is the non-
                    # CommandLineTool containers case and RunnerJob the
                    # jobs-API case — confirm.
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
                    runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                # NOTE(review): the "body={" opener appears elided here.
                "owner_uuid": self.project_uuid,
                "name": shortname(tool.tool["id"]),
                "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        # Submit-and-exit mode: hand back the runner's uuid without waiting.
        if runnerjob and not kwargs.get("wait"):
            return runnerjob.uuid

        # Subscribe to state-change events for the kind of work we submit.
        if self.work_api == "containers":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        if self.work_api == "jobs":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        jobiter = iter((runnerjob,))

        if "cwl_runner_job" in kwargs:
            self.uuid = kwargs.get("cwl_runner_job").get('uuid')
        jobiter = tool.job(job_order,
                           self.output_callback,
        # NOTE(review): remaining tool.job() arguments, the call closer,
        # and a try: opener appear elided here.

        # Will continue to hold the lock for the duration of this code
        # except when in cond.wait(), at which point on_message can update
        # job state and process output callbacks.
        for runnable in jobiter:
            runnable.run(**kwargs)
            # NOTE(review): the branch that waits on self.cond when no job
            # is runnable appears elided here.
            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")

        # Drain: wait until every submitted process has reported back.
        while self.processes:
        # NOTE(review): the loop body (cond.wait) appears elided here.

        except UnsupportedRequirement:
        # NOTE(review): the except body (re-raise) and a bare except:
        # opener appear elided here.
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                # Cancel the submitted runner container by dropping its priority.
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        return self.final_output
228 """Print version string of key packages for provenance and debugging."""
230 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
231 arvpkg = pkg_resources.require("arvados-python-client")
232 cwlpkg = pkg_resources.require("cwltool")
234 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
235 "arvados-python-client", arvpkg[0].version,
236 "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command-line parser for the Arvados CWL executor.

    Fixes: the function previously fell off the end without returning,
    so ``main`` received None; unclosed add_argument calls are completed
    (``--eval-timeout`` gets type=float, default=20 per its help text).

    Returns:
        argparse.ArgumentParser: parser for cwl-runner options plus
        Arvados submission/reuse/API switches.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float, default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # --enable-reuse / --disable-reuse share one dest; reuse is on by default.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job reuse")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    # --submit / --local share one dest; submit is the default mode.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    # --wait / --no-wait share one dest; waiting is the default.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
def main(args, stdout, stderr, api_client=None):
    """Command-line entry point: parse args, build an ArvCwlRunner, and
    delegate execution to cwltool.main.

    Fixes: restores the ``try:`` opener implied by the dangling
    ``except Exception`` clause (construction failures are logged and
    reported with exit code 1), and passes the *stdout*/*stderr*
    parameters — previously accepted but unused — through to
    cwltool.main.main.

    :param args: argv-style list of command-line arguments.
    :param stdout: stream for normal output.
    :param stderr: stream for error output.
    :param api_client: pre-built Arvados API client (mainly for tests);
        one is constructed when None.
    :returns: exit code from cwltool.main.main, or 1 on setup failure.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        # Template creation needs no input object; substitute an empty one.
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object)