13306: Changes to arvados-cwl-runner code after running futurize --stage2
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 from future import standard_library
2 standard_library.install_aliases()
3 from builtins import str
4 from past.builtins import basestring
5 # Copyright (C) The Arvados Authors. All rights reserved.
6 #
7 # SPDX-License-Identifier: Apache-2.0
8
9 import re
10 import logging
11 import uuid
12 import os
13 import urllib.request, urllib.parse, urllib.error
14
15 import arvados_cwl.util
16 import arvados.commands.run
17 import arvados.collection
18
19 from schema_salad.sourceline import SourceLine
20
21 from arvados.errors import ApiError
22 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
23 from cwltool.workflow import WorkflowException
24
25 from .http import http_to_keep
26
27 logger = logging.getLogger('arvados.cwl-runner')
28
def trim_listing(obj):
    """Drop the 'listing' field from a Directory object that lives in Keep.

    A Directory whose location is a Keep reference does not need its
    fully enumerated listing carried along when it is handed between
    cwl-runner instances (for example when submitting a job or using the
    RunInSingleContainer feature); doing so is redundant and can be very
    expensive.  The object is modified in place and nothing is returned.

    """

    location = obj.get("location", "")
    if not location.startswith("keep:"):
        # Not a Keep reference: the listing has to stay.
        return
    if "listing" in obj:
        del obj["listing"]
42
43
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Each referenced File/Directory object is recorded in ``self._pathmap``
    as a ``MapperEnt`` whose resolved value is a ``keep:`` reference and
    whose target is built from ``collection_pattern`` / ``file_pattern``.
    """

    # "keep:<32 hex digits>+<size>/<path>" -- a path inside a collection.
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # Same as above, but the "/<path>" part is optional.
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        """Store the mapping configuration and build the path map.

        :param arvrunner: runner object; this class reads its ``api``,
            ``keep_client``, ``num_retries``, ``project_uuid``,
            ``fs_access`` and ``intermediate_output_ttl`` attributes.
        :param referenced_files: list of File/Directory objects (dicts
            with at least a ``location`` key).
        :param input_basedir: base directory used to resolve ``file:``
            locations.
        :param collection_pattern: ``%``-format string taking one value
            (a collection reference, optionally followed by a path).
        :param file_pattern: ``%``-format string taking two values
            (portable data hash, path within the collection).
        :param name: passed as ``name`` when uploading local files.
        :param single_collection: when true, a Collection object is
            created in ``setup`` and handed to the uploader.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # NOTE(review): the attributes above are assigned first because the
        # base constructor is presumed to call setup(), which reads them --
        # confirm against cwltool's PathMapper.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record a mapping for ``srcobj`` and, recursively, its children.

        :param srcobj: File or Directory object with a ``location``.
        :param uploadfiles: set that collects ``(src, abspath, statresult)``
            tuples for local files that still have to be uploaded.
        :raises WorkflowException: for an invalid local path or for a
            ``_:`` literal missing ``contents`` (File) / ``listing``
            (Directory).
        """
        src = srcobj["location"]
        if "#" in src:
            # Ignore any fragment identifier.
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            # Already a Keep reference; src[5:] drops the "keep:" prefix.
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals get no mapping here; they are only validated.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                # Any other scheme is mapped to itself.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Add ``obj`` (and its children) to collection ``c`` under ``path``.

        :param obj: File or Directory object to stage.
        :param c: destination collection; its ``copy`` and ``open``
            methods are used.
        :param path: directory inside ``c`` that receives the entry.
        :param remap: list that receives ``(location, path_in_collection)``
            pairs for everything that was added.
        :raises WorkflowException: when ``obj`` is not already mapped, is
            not a Directory and is not a ``_:`` literal with ``contents``.
        """
        if obj["location"] in self._pathmap:
            # Already in Keep: copy it over from its source collection.
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            dest = path + "/" + obj["basename"]
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            dest = path + "/" + obj["basename"]
            for l in obj.get("listing", []):
                self.addentry(l, c, dest, remap)
            remap.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            dest = path + "/" + obj["basename"]
            # The contents are encoded to UTF-8 bytes before writing, so the
            # file is opened in binary mode: a text-mode handle rejects
            # bytes on Python 3.  Assumes Collection.open follows the
            # built-in open() mode convention.
            with c.open(dest, "wb") as f:
                f.write(obj["contents"].encode("utf-8"))
            remap.append((obj["location"], dest))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        :param srcobj: File or Directory object to examine.
        :param prefix: location prefix that ``srcobj`` is expected to
            live under; empty for the top-level call, in which case it is
            derived from ``srcobj``'s own location.
        :returns: True when a new collection is required.
        """

        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals never exist in a collection yet.
            return True
        if prefix:
            if loc != prefix+srcobj["basename"]:
                return True
        else:
            # Top-level call: everything up to and including the last "/"
            # becomes the prefix for the children.
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
            else:
                prefix = loc+"/"
        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _save_intermediate_collection(self, c):
        """Save ``c`` as a new intermediate collection in the runner's project."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def _new_collection(self):
        """Return an empty Collection bound to the runner's API/Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Build ``self._pathmap`` for ``referenced_files``.

        Visits every object, uploads the local files that were found,
        and creates new collections for Directory objects that are not
        mapped yet and for File objects that have secondary files or
        literal contents and cannot be used in place.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            # st.fn[5:] drops the "keep:" prefix.
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                ab = self.collection_pattern % pdh
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+pdh, ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                # If all secondary files/directories are located in
                # the same collection as the primary file and the
                # paths and names that are consistent with staging,
                # don't create a new collection.
                if not self.needs_new_collection(srcobj):
                    continue

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                ab = self.file_pattern % (pdh, srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also register the enclosing collection as a Directory
                    # under a freshly generated "_:" key.
                    ab = self.collection_pattern % pdh
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+pdh, ab, "Directory", True)

            # remap is only non-empty when one of the branches above created
            # ``c``, so ``c`` is always bound here.
            if remap:
                pdh = c.portable_data_hash()
                for loc, sub in remap:
                    # subdirs start with "./", strip it off.  The same
                    # relative path is used for both the keep reference and
                    # the target so the two can never disagree.
                    relpath = sub[2:] if sub.startswith("./") else sub
                    ab = self.file_pattern % (pdh, relpath)
                    # NOTE(review): every remapped entry is recorded with type
                    # "Directory", including files -- confirm this is intended.
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, relpath),
                                                   ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map ``target`` back to a ``(location, resolved)`` pair.

        Falls back to treating ``keep:`` targets, and targets under
        ``self.keepdir`` when it is set, as Keep references.  Returns None
        when nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            # +1 skips the separator that follows the keepdir prefix.
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
260
261
class StagingPathMapper(PathMapper):
    """Path mapper that assigns each object a unique target under a staging dir.

    ``self.targets`` holds every target path handed out so far, so two
    different objects sharing a basename get distinct targets.
    """

    # When true, the listing of every Directory is visited; when false,
    # only the listing of "_:" literal directories is.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Must exist before the base constructor runs, since visit() uses it.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign a staging target to ``obj`` and record it in the path map.

        :param obj: File or Directory object with ``location`` and
            ``basename``.
        :param stagedir: directory the target path is built under.
        :param basedir: passed through to ``visitlisting`` for children.
        :param copy: when true, a File is recorded as "WritableFile".
        :param staged: stored as the ``staged`` field of the entry.
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        n = 1
        if tgt in self.targets:
            # reversemap() may return None: a File visit reserves its target
            # and then returns early without recording a mapping for it, so
            # a reserved target is not guaranteed to have an owner.  Treat
            # an unowned reserved target as a collision as well.
            owner = self.reversemap(tgt)
            if owner is None or owner[0] != loc:
                # Append _2, _3, ... before the extension until unused.
                while tgt in self.targets:
                    n += 1
                    tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if loc in self._pathmap:
                # Already mapped; keep the first mapping.
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literal: the entry carries the contents themselves.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                # Secondary files are staged next to the primary file.
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
298
299
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites Keep references to ``$(task.keep)`` paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Work on a snapshot of the items because entries are reassigned
        # inside the loop.
        for key, entry in list(self._pathmap.items()):
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            # resolved[5:] drops the "keep:" prefix.
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
311
312
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper with ``_follow_dirs`` switched off."""

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        stagedir = self.stagedir
        self.visitlisting(referenced_files, stagedir, basedir)