#!/usr/bin/env python
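"""arvados-cwl-runner: run Common Workflow Language (CWL) tool and workflow
descriptions on Arvados, submitting each tool step as a crunch job and
tracking the run as a pipeline instance."""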

import fnmatch
import logging
import os
import re
import threading

import arvados
import arvados.collection
import arvados.commands.keepdocker
import arvados.commands.run
import arvados.errors
import arvados.events
import arvados.util

import cwltool.docker
import cwltool.draft2tool
import cwltool.main
import cwltool.pathmapper
import cwltool.process
import cwltool.workflow
from cwltool.process import get_feature, shortname

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

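# Portable data hash of the Keep collection holding the crunchrunner binary,
# and a fallback URL to download it from if that collection is not already
# present on the cluster (see ArvCwlRunner.arvExecutor).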
crunchrunner_pdh = "e9b79ec72c692982d59f3a438fb49df2+66"
crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"

def arv_docker_get_image(api_client, dockerRequirement, pull_image):
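    """Make sure the Docker image named by a CWL DockerRequirement is
    available in Keep, uploading it with arv-keepdocker if necessary, and
    return the image id to use as a runtime constraint."""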
    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    sp = dockerRequirement["dockerImageId"].split(":")
    image_name = sp[0]
    image_tag = sp[1] if len(sp) > 1 else None

    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        # Not in Keep yet: pull the image into the local Docker cache, then
        # upload it with arv-keepdocker.
        cwltool.docker.get_image(dockerRequirement, pull_image)
        args = [image_name]
        if image_tag:
            args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(args))
        arvados.commands.keepdocker.main(args)

    return dockerRequirement["dockerImageId"]


class CollectionFsAccess(cwltool.process.StdFsAccess):
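    """File system access layer that resolves "keep:" references to Arvados
    collections, falling back to the local file system for other paths."""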
    def __init__(self, basedir):
        self.collections = {}
        self.basedir = basedir

    def get_collection(self, path):
        # Split off a leading "keep:<portable data hash>" prefix, if any, and
        # return (collection, path-within-collection); (None, path) otherwise.
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        ret = []
        # Iterate over the files and subcollections in 'collection'.
        for filename in collection:
            if patternsegments[0] == '.':
                # Pattern contains something like "./foo" so just shift
                # past the "./"
                ret.extend(self._match(collection, patternsegments[1:], parent))
            elif fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        # Outputs are always collected from Keep, so 'pattern' is expected to
        # be a "keep:" reference (get_collection() returns None otherwise).
        collection, rest = self.get_collection(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))

class ArvadosJob(object):
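    """Submit and monitor a single crunch job that runs one CWL
    CommandLineTool invocation via the crunchrunner script."""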
    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
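        """Translate this job's command line, environment, and requirements
        into crunchrunner script_parameters and submit it as an Arvados job."""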
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            # Build a "virtual working directory" collection holding the
            # files the tool expects to find in its working directory.
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    # Reference to a file already in Keep: copy it into the vwd.
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    # Literal file contents: write them as a new file in the vwd.
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)

        try:
            response = self.arvrunner.api.jobs().create(body={
                "script": "crunchrunner",
                "repository": "arvados",
                "script_version": "8488-cwl-crunchrunner-collection",  # "master"
                "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh + "/crunchrunner"},
                "runtime_constraints": runtime_constraints
            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.jobs[response["uuid"]] = self

            self.arvrunner.pipeline["components"][self.name] = {"job": response}
            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                     body={
                                                                                         "components": self.arvrunner.pipeline["components"]
                                                                                     }).execute(num_retries=self.arvrunner.num_retries)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            if response["state"] in ("Complete", "Failed", "Cancelled"):
                # A reused job is already finished; report its result now
                # instead of waiting for events.
                self.done(response)
        except Exception as e:
            logger.error("Got error %s", e)
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
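        """Record this job's latest state in the pipeline instance so it is
        visible in Workbench."""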
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                 body={
                                                                                    "components": self.arvrunner.pipeline["components"]
                                                                                 }).execute(num_retries=self.arvrunner.num_retries)

    def done(self, record):
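        """Handle a job that has reached a final state: collect its outputs
        from Keep and pass them to the output callback."""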
        try:
            self.update_pipeline_component(record)
        except Exception:
            # Updating the pipeline instance is best-effort; a failure here
            # should not prevent the job result from being reported.
            pass

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}
            try:
                if record["output"]:
                    outputs = self.collect_outputs("keep:" + record["output"])
            except Exception:
                logger.exception("Got exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]


class ArvPathMapper(cwltool.pathmapper.PathMapper):
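    """Map CWL input file references to Keep locations, uploading local
    files to Keep as needed."""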
    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
        self._pathmap = arvrunner.get_uploaded()
        uploadfiles = []

        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                # Already in Keep: rewrite to the runtime mount point.
                self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, basedir)
                st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
                if kwargs.get("conformance_test"):
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    # Local file that still needs to be uploaded to Keep.
                    uploadfiles.append((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    # Local path that is already backed by Keep.
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern="$(task.keep)/%s/%s")

        for src, ab, st in uploadfiles:
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)


class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
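    """CommandLineTool subclass that runs its work as ArvadosJobs and maps
    file paths through ArvPathMapper."""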
    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, outdir="$(task.outdir)", tmpdir="$(task.tmpdir)", **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)


class ArvCwlRunner(object):
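    """Execute a CWL workflow on Arvados: submit one crunch job per tool
    step, track job state via the event stream, and report the final
    workflow output."""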
    def __init__(self, api_client):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}
        self.num_retries = 4

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warning("Overall job status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out

    def on_message(self, event):
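        """Event stream callback: update job state as "update" events arrive,
        and wake the main thread when a job reaches a final state."""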
281         if "object_uuid" in event:
282                 if event["object_uuid"] in self.jobs and event["event_type"] == "update":
283                     if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
284                         uuid = event["object_uuid"]
285                         with self.lock:
286                             j = self.jobs[uuid]
287                             logger.info("Job %s (%s) is Running", j.name, uuid)
288                             j.running = True
289                             j.update_pipeline_component(event["properties"]["new_attributes"])
290                     elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
291                         uuid = event["object_uuid"]
292                         try:
293                             self.cond.acquire()
294                             j = self.jobs[uuid]
295                             logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
296                             j.done(event["properties"]["new_attributes"])
297                             self.cond.notify()
298                         finally:
299                             self.cond.release()
300
    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
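        """Main entry point: subscribe to job events, make sure the
        crunchrunner binary is available in Keep, create a pipeline instance
        for tracking, then iterate over runnable jobs until the workflow
        finishes."""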
        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        try:
            self.api.collections().get(uuid=crunchrunner_pdh).execute()
        except arvados.errors.ApiError:
            # The crunchrunner binary isn't in Keep yet: download it and save
            # it as a new collection.
            import httplib2
            h = httplib2.Http(ca_certs=arvados.util.ca_certs_path())
            resp, content = h.request(crunchrunner_download, "GET")
            with arvados.collection.Collection() as col:
                with col.open("crunchrunner", "w") as f:
                    f.write(content)
                col.save_new("crunchrunner binary")

        self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]),
                                                                   "components": {},
                                                                   "state": "RunningOnClient"}).execute(num_retries=self.num_retries)

        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
        else:
            jobiter = tool.job(job_order,
                               input_basedir,
                               self.output_callback,
                               **kwargs)

            for runnable in jobiter:
                if runnable:
                    with self.lock:
                        runnable.run(**kwargs)
                else:
                    if self.jobs:
                        # Nothing is runnable yet; wait for a pending job to
                        # change state.
                        with self.cond:
                            self.cond.wait()
                    else:
                        logger.error("Workflow cannot make any more progress.")
                        break

            # Wait for the remaining in-flight jobs to finish.
            while self.jobs:
                with self.cond:
                    self.cond.wait()

            events.close()

            if self.final_output is None:
                raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

            return self.final_output


def main(args, stdout, stderr, api_client=None):
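    """Command line entry point: extend the cwltool argument parser with
    Arvados-specific options and hand execution off to cwltool.main."""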
    args.insert(0, "--leave-outputs")
    parser = cwltool.main.arg_parser()
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=False, dest="enable_reuse",
                        help="Reuse past jobs with the same inputs where possible.")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=False, dest="enable_reuse",
                        help="Always submit new jobs, even when an equivalent past job exists (default).")

    try:
        runner = ArvCwlRunner(api_client=api_client or arvados.api('v1'))
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)