3 # Implement cwl-runner interface for submitting and running jobs on Arvados.
10 import pkg_resources # part of setuptools
12 from cwltool.errors import WorkflowException
14 import cwltool.workflow
19 from .arvcontainer import ArvadosContainer, RunnerContainer
20 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
21 from .arvtool import ArvadosCommandTool
22 from .fsaccess import CollectionFsAccess
24 from cwltool.process import shortname, UnsupportedRequirement
25 from arvados.api import OrderedJsonModel
# Module-level logger; lives under the shared "arvados" logger hierarchy so
# site-wide handlers apply. Default verbosity is INFO (arvExecutor lowers it
# to WARN when --quiet is given).
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
30 class ArvCwlRunner(object):
31 """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
32 complete, and report output."""
    def __init__(self, api_client, crunch2):
        """Set up shared runner state.

        :param api_client: Arvados API client used for all server calls.
        :param crunch2: True to target the Crunch v2 Containers API,
            False for the Crunch v1 Jobs API.
        """
        # NOTE(review): other methods read self.api, self.jobs, self.uploaded
        # and self.num_retries; those attributes are presumably initialized on
        # lines elided from this excerpt — confirm against the full file.
        self.lock = threading.Lock()
        # The condition variable shares the lock: on_message() (websocket
        # thread) signals arvExecutor() while both coordinate job state.
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.crunch2 = crunch2
    def arvMakeTool(self, toolpath_object, **kwargs):
        """cwltool makeTool hook: wrap CommandLineTools to run on Arvados.

        Anything that is not a CommandLineTool (e.g. a Workflow) falls
        through to cwltool's default tool factory.
        """
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, crunch2=self.crunch2, **kwargs)
        return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
51 def output_callback(self, out, processStatus):
52 if processStatus == "success":
53 logger.info("Overall job status is %s", processStatus)
55 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
56 body={"state": "Complete"}).execute(num_retries=self.num_retries)
59 logger.warn("Overall job status is %s", processStatus)
61 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
62 body={"state": "Failed"}).execute(num_retries=self.num_retries)
63 self.final_output = out
    def on_message(self, event):
        """Arvados event-bus callback: track state changes of submitted jobs.

        Runs on the websocket client thread. On "Running" it updates the
        pipeline component; on terminal states it completes the job so the
        arvExecutor() loop can make progress.
        """
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    # NOTE(review): `j` is not bound in the lines shown here;
                    # presumably `j = self.jobs[uuid]` happens on elided lines
                    # (likely under `with self.lock:`) — confirm in full file.
                    logger.info("Job %s (%s) is Running", j.name, uuid)
                    j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    # NOTE(review): as above, `j` binding and the cond.notify()
                    # that wakes arvExecutor() appear to be on elided lines.
                    logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                    # done() fires the job's output callback with its result.
                    j.done(event["properties"]["new_attributes"])
86 def get_uploaded(self):
87 return self.uploaded.copy()
89 def add_uploaded(self, src, pair):
90 self.uploaded[src] = pair
    def arvExecutor(self, tool, job_order, **kwargs):
        """Top-level executor hook for cwltool: run or submit the workflow.

        NOTE(review): this excerpt elides many original lines; several
        branches below are visibly missing their if/else/try headers.
        Comments mark where code appears to be missing — confirm against
        the full file before relying on this block.
        """
        self.debug = kwargs.get("debug")

        if kwargs.get("quiet"):
            # Quiet mode: warnings/errors only, here and for arv-run's logger.
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        # New objects are owned by the requested project, defaulting to the
        # current user's home project.
        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            # cwltool.main will write our return value to stdout.
            # NOTE(review): elided lines presumably save the template and
            # return its uuid here.

        if kwargs.get("submit"):
            # NOTE(review): these two assignments look like alternatives; the
            # crunch2-vs-crunch1 branch headers are elided in this excerpt.
            runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.crunch2:
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                # NOTE(review): the `body={` opener appears to be elided here.
                "owner_uuid": self.project_uuid,
                "name": shortname(tool.tool["id"]),
                "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if kwargs.get("submit") and not kwargs.get("wait"):
            # Fire-and-forget submission: hand back the runner job's uuid.
            return runnerjob.uuid

        # Subscribe to state-change events so on_message() keeps our job
        # bookkeeping current.  NOTE(review): the crunch2/crunch1 branch
        # headers around these two alternative calls are elided.
        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.debug = kwargs.get("debug")
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.fs_access = CollectionFsAccess(kwargs["basedir"])

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")

        # Output/scratch paths differ by API: fixed container paths vs.
        # crunch1 task-substitution variables.  NOTE(review): the if/else
        # headers selecting between these pairs are elided.
        kwargs["outdir"] = "/var/spool/cwl"
        kwargs["tmpdir"] = "/tmp"
        kwargs["outdir"] = "$(task.outdir)"
        kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("submit"):
            # When submitting, the only "job" run locally is the runner itself.
            jobiter = iter((runnerjob,))
        if "cwl_runner_job" in kwargs:
            # We ARE the runner job: remember our own uuid.
            self.uuid = kwargs.get("cwl_runner_job").get('uuid')
        jobiter = tool.job(job_order,
                           self.output_callback,
                           docker_outdir="$(task.outdir)",
                           # NOTE(review): remaining tool.job() arguments and
                           # the closing paren are elided in this excerpt.

        # Will continue to hold the lock for the duration of this code
        # except when in cond.wait(), at which point on_message can update
        # job state and process output callbacks.
        for runnable in jobiter:
            runnable.run(**kwargs)
            # NOTE(review): elided lines presumably cond.wait() when nothing
            # is runnable but jobs remain pending.
            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")

        except UnsupportedRequirement:
            # NOTE(review): the matching `try:` and the re-raise, plus the
            # broad `except:` header for the branch below, are elided.
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and self.crunch2:
                # Cancel the runner container request by zeroing its priority.
                self.api.container_requests().update(uuid=runnerjob.uuid,
                    body={"priority": "0"}).execute(num_retries=self.num_retries)

        if self.final_output is None:
            raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

        return self.final_output
def versionstring():
    # NOTE(review): the `def` header was missing from this excerpt; restored
    # because main() passes `versionfunc=versionstring` below.
    """Return a version string of key packages for provenance and debugging."""
    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build and return the command line parser for arvados-cwl-runner.

    Restored in this excerpt: the trailing `return parser` (main() consumes
    the parser) and the continuation arguments of several add_argument calls
    that were left dangling. Reconstructed values are flagged inline —
    confirm against the full file.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--conformance-test", action="store_true")
    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        # NOTE(review): type/default elided in excerpt;
                        # reconstructed from the help text ("default 20s").
                        type=float, default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job reuse")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--crunch1", action="store_false",
                         default=False, dest="crunch2",
                         help="Use Crunch v1 Jobs API")
    exgroup.add_argument("--crunch2", action="store_true",
                         default=False, dest="crunch2",
                         help="Use Crunch v2 Containers API")

    # Positional workflow document plus everything after it as the job order.
    parser.add_argument("workflow", type=str, nargs="?", default=None)
    parser.add_argument("job_order", nargs=argparse.REMAINDER)

    return parser
def main(args, stdout, stderr, api_client=None):
    """Command line entry point: parse arguments, construct the runner, and
    delegate execution to cwltool.main.main.

    :param args: argv-style list of command line arguments.
    :param stdout: stream for normal output (forwarded to cwltool).
    :param stderr: stream for diagnostics (forwarded to cwltool).
    :param api_client: optional pre-built Arvados API client (tests inject
        one); a default client is created when None.
    :returns: cwltool's exit code, or 1 if runner setup fails.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        # Creating a template needs no real inputs; give cwltool an empty
        # job order so it skips input loading.
        job_order_object = ({}, "")

    # Restored in this excerpt: the `except` below had no matching `try:`.
    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, crunch2=arvargs.crunch2)
    except Exception as e:
        # Report setup/connection failure and exit nonzero instead of
        # crashing with a traceback.
        logger.error(e)
        return 1

    # NOTE(review): stdout/stderr were accepted but unused in the excerpt;
    # forwarding them to cwltool.main.main — confirm against the full file.
    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object)