# Implement cwl-runner interface for submitting and running jobs on Arvados.

import argparse
import logging
import os
import sys
import threading

import pkg_resources  # part of setuptools

import arvados
import arvados.events

import cwltool.main
import cwltool.workflow
from cwltool.errors import WorkflowException
from cwltool.process import shortname, UnsupportedRequirement

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
# Assumed location: CollectionFsAccess is expected in this package's fsaccess module.
from .fsaccess import CollectionFsAccess

from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
    complete, and report output."""
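
    # Typical use, mirroring main() below (a minimal sketch, not the only way
    # to drive this class):
    #
    #   runner = ArvCwlRunner(arvados.api('v1'), crunch2=False)
    #   cwltool.main.main(args=..., executor=runner.arvExecutor,
    #                     makeTool=runner.arvMakeTool, versionfunc=versionstring)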

    def __init__(self, api_client, crunch2):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.crunch2 = crunch2

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, crunch2=self.crunch2, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out
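
    # Event callback registered with arvados.events.subscribe() in arvExecutor:
    # for each job/container update it marks the corresponding job as running
    # (and refreshes its pipeline component), or fires its done() handler and
    # wakes the waiting executor once the record reaches a final state.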
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair
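
    # Executor hook handed to cwltool.main.main(): depending on the options it
    # creates a pipeline template, submits a runner job/container and returns
    # its UUID, or iterates over the workflow's jobs locally, dispatching each
    # one to Arvados and waiting on the event stream until output is ready.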
    def arvExecutor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("submit"):
            if self.crunch2:
                runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.crunch2:
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if kwargs.get("submit") and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        # Follow container events (crunch2) or job events (crunch1) over the websocket.
        if self.crunch2:
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        else:
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.debug = kwargs.get("debug")
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.fs_access = CollectionFsAccess(kwargs["basedir"])

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")

        # Output/temp paths: fixed container paths under crunch2, task
        # substitutions under crunch1.
        if self.crunch2:
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
        else:
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("submit"):
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               docker_outdir="$(task.outdir)",
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    runnable.run(**kwargs)
                else:
                    if self.jobs:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.jobs:
                self.cond.wait(1)

            events.close()
        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()

        if self.final_output is None:
            raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

        return self.final_output
197 """Print version string of key packages for provenance and debugging."""
199 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
200 arvpkg = pkg_resources.require("arvados-python-client")
201 cwlpkg = pkg_resources.require("cwltool")
203 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
204 "arvados-python-client", arvpkg[0].version,
205 "cwltool", cwlpkg[0].version)

def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--conformance-test", action="store_true")
    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped/provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory (default: current directory)")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job reuse")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs. If not provided, jobs go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--crunch1", action="store_false",
                         default=False, dest="crunch2",
                         help="Use Crunch v1 Jobs API")
    exgroup.add_argument("--crunch2", action="store_true",
                         default=False, dest="crunch2",
                         help="Use Crunch v2 Containers API")

    parser.add_argument("workflow", type=str, nargs="?", default=None)
    parser.add_argument("job_order", nargs=argparse.REMAINDER)

    return parser

def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, crunch2=arvargs.crunch2)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object)
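
# A minimal sketch of how main() is typically invoked from a console-script
# entry point (the real wrapper script is not shown here and may differ):
#
#   if __name__ == "__main__":
#       sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))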