7593: Add version hint to arvados-python-client. Add get_uploaded() and
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 import argparse
4 import arvados
5 import arvados.events
6 import arvados.commands.keepdocker
7 import arvados.commands.run
8 import cwltool.draft2tool
9 import cwltool.workflow
10 import cwltool.main
11 import threading
12 import cwltool.docker
13 import fnmatch
14 import logging
15 import re
16 import os
17
18 from cwltool.process import get_feature
19
# Module-level logger for the CWL runner; default to INFO so job state
# transitions (submitted/running/complete) are visible on the console.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
22
def arv_docker_get_image(api_client, dockerRequirement, pull_image):
    """Ensure the Docker image named by dockerRequirement is stored in Keep.

    Fills in "dockerImageId" from "dockerPull" when absent, checks whether
    the image is already available in Arvados, and if not pulls it locally
    and uploads it with arv-keepdocker.

    Returns the Docker image id string from the (possibly updated)
    dockerRequirement.
    """
    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    # NOTE(review): splitting on the first ":" assumes a plain "repo[:tag]"
    # reference; a registry host with a port ("host:5000/img") would be
    # parsed incorrectly -- confirm callers never pass one.
    sp = dockerRequirement["dockerImageId"].split(":")
    image_name = sp[0]
    image_tag = sp[1] if len(sp) > 1 else None

    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        # Not in Keep yet: pull the image locally (side effect of
        # get_image; its return value is unused), then upload it to Keep
        # via the arv-keepdocker command line entry point.
        cwltool.docker.get_image(dockerRequirement, pull_image)
        args = [image_name]
        if image_tag:
            args.append(image_tag)
        arvados.commands.keepdocker.main(args)

    return dockerRequirement["dockerImageId"]
43
44
class CollectionFsAccess(cwltool.process.StdFsAccess):
    """File access layer that understands "keep:" collection references.

    Paths of the form "keep:<portable data hash>/..." are resolved through
    cached arvados CollectionReader objects; any other path falls back to
    the local-filesystem behavior inherited from StdFsAccess.
    """

    def __init__(self, basedir):
        # Cache of portable data hash -> CollectionReader, so repeated
        # references to the same collection reuse one reader.
        self.collections = {}
        self.basedir = basedir

    def get_collection(self, path):
        """Split *path* into (collection, remainder).

        Returns (CollectionReader, "rest/of/path") when *path* is a
        "keep:" locator, or (None, path) for an ordinary filesystem path.
        """
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        """Recursively match glob *patternsegments* inside *collection*.

        Returns a list of "keep:..." paths for each matching file.
        """
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            # Reached a plain file (or nothing) while the pattern still
            # expects a directory level: no match.
            return []

        if patternsegments[0] == '.':
            # Pattern contains something like "./foo": just shift past the
            # "./".  This must happen once, outside the per-file loop --
            # the original code extended the result once per directory
            # entry, duplicating every match N times.
            return self._match(collection, patternsegments[1:], parent)

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        """Expand a "keep:" glob *pattern* into matching keep paths."""
        collection, rest = self.get_collection(pattern)
        if collection is None:
            # Non-keep pattern: previously this crashed with
            # AttributeError on collection.manifest_locator(); there is
            # nothing to match outside a collection here.
            return []
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        """Open *fn* either inside a Keep collection or on the local fs."""
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        """Return True if *fn* exists, in Keep or on the local filesystem."""
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))
100
class ArvadosJob(object):
    """Submit and track a single Crunch job for one CWL command line tool.

    Instances are created by ArvadosCommandTool.makeJobRunner(); the cwltool
    framework fills in attributes such as command_line, generatefiles,
    environment, stdin, stdout, pathmapper and output_callback before
    run() is invoked.
    """

    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
        """Create a crunchrunner job for this tool and register it with the runner.

        On submission failure, reports "permanentFail" through
        output_callback instead of raising.
        """
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            # Build a "virtual working directory" collection holding the
            # files cwltool wants staged next to the command.
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    # Reference to an existing file in Keep: copy it into
                    # the working directory collection.
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    # Literal file contents supplied inline.
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)

        try:
            response = self.arvrunner.api.jobs().create(body={
                "script": "crunchrunner",
                "repository": kwargs["repository"],
                "script_version": "master",
                "script_parameters": {"tasks": [script_parameters]},
                "runtime_constraints": runtime_constraints
            }, find_or_create=kwargs.get("enable_reuse", True)).execute()

            self.arvrunner.jobs[response["uuid"]] = self

            logger.info("Job %s is %s", response["uuid"], response["state"])

            # Job reuse may return an already-finished job; deliver its
            # outcome immediately since no state-change event will arrive.
            if response["state"] in ("Complete", "Failed", "Cancelled"):
                self.done(response)
        except Exception as e:
            # logger.exception records the traceback; lazy %-args instead
            # of eager string interpolation.
            logger.exception("Got error %s", e)
            self.output_callback({}, "permanentFail")


    def done(self, record):
        """Collect outputs for a finished job *record* and report the result.

        Always deregisters the job from the runner, even if output
        collection or the callback raises.
        """
        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}
            try:
                outputs = self.collect_outputs("keep:" + record["output"])
            except Exception:
                logger.exception("Got exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            # pop() instead of del so cleanup itself cannot raise KeyError
            # for an unknown or already-removed job.
            self.arvrunner.jobs.pop(record["uuid"], None)
177
178
class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Map CWL input file references onto $(task.keep) paths.

    Files already in Keep are referenced directly; local files are uploaded
    (once per runner -- previously uploaded files are reused) and then
    mapped to their Keep locations.
    """

    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
        # Seed the map with everything this runner has already uploaded.
        self._pathmap = arvrunner.get_uploaded()
        to_upload = []

        keep_file_pattern = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and keep_file_pattern.match(src):
                # Already a Keep reference; expose it under $(task.keep).
                self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
            if src in self._pathmap:
                continue
            local_path = cwltool.pathmapper.abspath(src, basedir)
            st = arvados.commands.run.statfile("", local_path)
            if kwargs.get("conformance_test"):
                self._pathmap[src] = (src, local_path)
            elif isinstance(st, arvados.commands.run.UploadFile):
                to_upload.append((src, local_path, st))
            elif isinstance(st, arvados.commands.run.ArvFile):
                self._pathmap[src] = (local_path, st.fn)
            else:
                raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if to_upload:
            # Batch-upload everything new; uploadfiles rewrites each
            # ArvFile's fn to the $(task.keep) location.
            arvados.commands.run.uploadfiles([item[2] for item in to_upload],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern="$(task.keep)/%s/%s")

        for src, local_path, st in to_upload:
            pair = (local_path, st.fn)
            # Remember the upload so later mappers can reuse it.
            arvrunner.add_uploaded(src, pair)
            self._pathmap[src] = pair
211
212
213
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
    """CommandLineTool specialized to execute as an Arvados Crunch job.

    Output and temporary directories are expressed as crunchrunner
    $(task.*) substitution variables rather than local paths.
    """

    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object,
                                                 outdir="$(task.outdir)",
                                                 tmpdir="$(task.tmpdir)",
                                                 **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        """Return a new ArvadosJob bound to this tool's runner."""
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        """Return a path mapper that stages *reffiles* into Keep."""
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
224
225
class ArvCwlRunner(object):
    """Execute a CWL workflow by running each step as an Arvados Crunch job.

    The main thread (arvExecutor) submits runnable jobs; a websocket event
    subscription delivers job state changes on a separate thread via
    on_message.  Shared state (self.jobs) is guarded by self.lock, and job
    completion is signalled to the main thread through self.cond (a
    Condition built on the same lock).
    """

    def __init__(self, api_client):
        self.api = api_client
        self.jobs = {}  # job uuid -> ArvadosJob, in-flight jobs only
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}  # src path -> (abspath, keep path); see ArvPathMapper

    def arvMakeTool(self, toolpath_object, **kwargs):
        """Tool factory for cwltool: ArvadosCommandTool for CommandLineTools."""
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        """Record the workflow's final output when the last step reports in."""
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
        else:
            logger.warn("Overall job status is %s", processStatus)
        self.final_output = out

    def on_message(self, event):
        """Websocket callback: route job state changes to the affected job.

        Runs on the event-client thread, not the main thread.
        """
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    logger.info("Job %s is Running", event["object_uuid"])
                    with self.lock:
                        self.jobs[event["object_uuid"]].running = True
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    logger.info("Job %s is %s", event["object_uuid"], event["properties"]["new_attributes"]["state"])
                    try:
                        self.cond.acquire()
                        self.jobs[event["object_uuid"]].done(event["properties"]["new_attributes"])
                        # Wake the main thread waiting in arvExecutor.
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        """Return a copy of the uploaded-files map (safe for callers to mutate)."""
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        """Record that *src* was uploaded as *pair* (abspath, keep path)."""
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
        """Run *tool* to completion and return the workflow's final output.

        Raises WorkflowException if the workflow finishes without producing
        a result.
        """
        # Subscribe with the runner's own API client.  (Previously a fresh
        # arvados.api('v1') was created here, ignoring the client passed to
        # __init__.)
        events = arvados.events.subscribe(self.api, [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        try:
            self.fs_access = CollectionFsAccess(input_basedir)

            kwargs["fs_access"] = self.fs_access
            kwargs["enable_reuse"] = args.enable_reuse
            kwargs["repository"] = args.repository

            if kwargs.get("conformance_test"):
                return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)

            jobiter = tool.job(job_order,
                               input_basedir,
                               self.output_callback,
                               **kwargs)

            for runnable in jobiter:
                if runnable:
                    with self.lock:
                        runnable.run(**kwargs)
                else:
                    if self.jobs:
                        # Nothing runnable yet but jobs are still in
                        # flight: wait for an event to change state.
                        try:
                            self.cond.acquire()
                            self.cond.wait()
                        finally:
                            self.cond.release()
                    else:
                        logger.error("Workflow cannot make any more progress.")
                        break

            # Drain: wait for all submitted jobs to finish.
            while self.jobs:
                try:
                    self.cond.acquire()
                    self.cond.wait()
                finally:
                    self.cond.release()

            if self.final_output is None:
                raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

            return self.final_output
        finally:
            # Always shut down the event subscription.  (Previously it
            # leaked on the conformance-test path and on any exception.)
            events.close()
315
316
def main(args, stdout, stderr, api_client=None):
    """Command line entry point for arvados-cwl-runner.

    Wires an ArvCwlRunner into cwltool's main() with extra arguments for
    job reuse and the crunchrunner repository.  Returns cwltool.main.main's
    exit status.
    """
    # Honor a caller-supplied api_client; previously this parameter was
    # accepted but ignored and a fresh client was always created.
    if api_client is None:
        api_client = arvados.api('v1')
    runner = ArvCwlRunner(api_client=api_client)

    # Keep outputs in Keep rather than copying them to the local output dir.
    args.insert(0, "--leave-outputs")
    parser = cwltool.main.arg_parser()
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=False, dest="enable_reuse",
                        help="")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=False, dest="enable_reuse",
                        help="")

    parser.add_argument('--repository', type=str, default="peter/crunchrunner", help="Repository containing the 'crunchrunner' program.")

    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)