Merge branch '17152-collection-versions-fixes'
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
10
11 import re
12 import logging
13 import uuid
14 import os
15 import urllib.request, urllib.parse, urllib.error
16
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
20
21 from schema_salad.sourceline import SourceLine
22
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt
25 from cwltool.utils import adjustFileObjs, adjustDirObjs
26 from cwltool.stdfsaccess import abspath
27 from cwltool.workflow import WorkflowException
28
29 from .http import http_to_keep
30
# Module logger; messages from this module are emitted under the
# 'arvados.cwl-runner' logger name.
logger = logging.getLogger('arvados.cwl-runner')
32
def trim_listing(obj):
    """Drop the 'listing' field from a Directory object that points into Keep.

    When a Directory object is a Keep reference, passing its fully
    enumerated listing between instances of cwl-runner (for example when
    submitting a job, or when using the RunInSingleContainer feature) is
    redundant and potentially very expensive, so the listing is deleted
    whenever the object's location starts with "keep:".  Objects with any
    other (or no) location are left untouched.

    The object is modified in place; nothing is returned.
    """
    is_keep_ref = obj.get("location", "").startswith("keep:")
    if not is_keep_ref:
        return
    if "listing" in obj:
        del obj["listing"]
46
# Keep reference by collection UUID: "keep:<cluster>-4zz18-<15 chars>[/path]".
# Group 1 is the collection UUID, group 2 the optional "/path" suffix.
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
# Keep reference by portable data hash: "keep:<md5 hex>+<size>[/path]".
# Group 1 is the portable data hash, group 2 the optional "/path" suffix.
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
# Portable-data-hash reference that names a non-empty path inside the collection.
collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
50
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Every referenced File/Directory location is mapped to a Keep reference:
    existing "keep:" references are used as-is, local "file:" paths are
    uploaded (or resolved when they already live on a keep mount), http(s)
    URLs are converted to Keep references via http_to_keep(), and literals
    or Files with secondaryFiles are copied into new intermediate
    collections when they cannot be used in place.
    """

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        """Record the mapping configuration and build the path map.

        :param arvrunner: runner object providing api, keep_client,
            num_retries, project_uuid, fs_access and intermediate_output_ttl.
        :param referenced_files: CWL File/Directory objects to map.
        :param input_basedir: directory used to resolve relative local paths.
        :param collection_pattern: %-format string applied to a collection
            path ("pdh" or "pdh/subpath") to get the container-local path.
        :param file_pattern: %-format string applied to (pdh, relative path)
            to get the container-local path of an entry in a collection.
        :param name: name passed to the upload of local files.
        :param single_collection: if true, local files are uploaded into a
            single Collection created in setup().
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # portable data hash -> collection UUID, recorded from inputs that
        # carry the arvados_cwl.util.collectionUUID field.
        self.pdh_to_uuid = {}
        # NOTE(review): the cwltool PathMapper constructor presumably calls
        # self.setup() to populate self._pathmap — confirm against cwltool.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record a mapping for srcobj, then recurse into its secondaryFiles and listing.

        Keep references are mapped immediately.  Local files that must be
        uploaded are added to the ``uploadfiles`` set as
        (src, absolute path, UploadFile) tuples so setup() can upload them
        in bulk.

        :raises WorkflowException: for malformed keep: references, invalid
            local paths, or literals missing `contents`/`listing`.
        """
        src = srcobj["location"]
        # Mapping is by location; drop any "#fragment" suffix.
        if "#" in src:
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
                if arvados_cwl.util.collectionUUID in srcobj:
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are not mapped here; setup() stages them into a
                # new collection.  Only validate that they are complete.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                # Anything else (including keep: references by collection
                # UUID, which are validated above but not mapped) maps to itself.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Write obj into collection c under directory `path`.

        Already-mapped entries are copied from their source collection
        (together with their secondaryFiles); unmapped Directories are
        rebuilt from their listing; file literals are written from their
        `contents`.  For each entry written, an (original location, path in
        c) pair is appended to `remap`.

        :raises WorkflowException: for any other kind of entry.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # Reference to a whole collection: copy its root.
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        :param srcobj: CWL File/Directory object to check.
        :param prefix: location prefix (ending in "/") that srcobj is
            expected to live under; empty for the top-level object, whose
            own parent location then becomes the prefix for its children.
        :returns: True if a new collection is required.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals have no existing storage location.
            return True
        if prefix:
            if loc != prefix+srcobj["basename"]:
                return True
        else:
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
            else:
                prefix = loc+"/"
        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return a new, unsaved Collection bound to the runner's API and Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save collection c as a new intermediate collection in the runner's project."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate self._pathmap for referenced_files.

        Visits every referenced object, uploads local files in bulk, then
        creates intermediate collections for Directory objects that are not
        already mapped and for Files whose secondaryFiles or literal
        contents must be staged into a new collection.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        # After the upload, st.fn presumably holds the "keep:pdh/path"
        # reference built from fnPattern above.
        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                # Directory with no direct mapping (e.g. a literal): build a
                # new collection from its listing.
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)
                self._save_intermediate_collection(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                # If all secondary files/directories are located in the
                # same collection as the primary file, and their paths and
                # names are consistent with staging, don't create a new
                # collection.
                if not self.needs_new_collection(srcobj):
                    continue

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)
                self._save_intermediate_collection(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also record the whole new collection as a Directory
                    # under a fresh, unique "_:" key.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            for loc, sub in remap:
                # addentry() is called with path ".", so entries start with
                # "./"; strip it and use the same relative path for both the
                # keep: reference and the container-local path.
                if sub.startswith("./"):
                    sub = sub[2:]
                ab = self.file_pattern % (c.portable_data_hash(), sub)
                # NOTE(review): every remapped entry is typed "Directory",
                # including File entries (this also replaces the "File"
                # entry recorded for the primary file above).  Kept as-is
                # because consumers may depend on it — confirm before changing.
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub),
                                               ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a (location, location) pair.

        The base class mapping is tried first.  Otherwise "keep:" targets
        are returned unchanged, and paths under self.keepdir (when set;
        setup() leaves it None) are turned back into "keep:" references.
        Returns None if nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
271
272
class StagingPathMapper(PathMapper):
    """Path mapper that assigns each input a unique target path under a staging directory.

    Note that StagingPathMapper internally maps files from target to source.
    Specifically, the 'self._pathmap' dict keys are the target location and
    the values are 'MapperEnt' named tuples from which we use the 'resolved'
    attribute as the file identifier. This makes it possible to map an input
    file to multiple target directories. The exception is for file literals,
    which store the contents of the file in 'MapperEnt.resolved' and are
    therefore still mapped from source to target.
    """

    # When False, visit() only descends into the listing of literal ("_:")
    # directories.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Every target path claimed so far, used to detect name collisions.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign obj a target path under stagedir (or obj["dirname"]) and record it.

        If the target is already claimed by a different source, or obj is a
        literal whose target is taken, a numeric suffix ("name_2.ext",
        "name_3.ext", ...) is appended until the path is free.  Directories
        are recursed into (always for literals, otherwise only when
        _follow_dirs is set).  A File whose target is already mapped is not
        recorded again; non-literal Files also visit their secondaryFiles.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            # True if tgt is already claimed by something other than this
            # source.  File literals are keyed by source location, not by
            # target, so a target claimed by a literal is absent from
            # self._pathmap: treat that as a conflict instead of raising
            # KeyError.
            existing = self._pathmap.get(tgt)
            return (tgt in self.targets and "contents" not in obj
                    and (existing is None or existing.resolved != loc))

        def literalTargetExists():
            # A literal always gets a fresh target if its target is taken.
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literal: keyed by source, contents kept in 'resolved'.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt
        """Return the MapperEnt for source location `src`, or None if unmapped.

        Overridden to maintain the use case of mapping by source (identifier)
        to target regardless of how the map is structured internally.  A
        "#fragment" suffix on `src` is looked up by the part before the "#"
        and carried over to the returned target.
        """
        def getMapperEnt(src):
            for k,v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v

        if u"#" in src:
            i = src.index(u"#")
            # Look up the location without its fragment (the part before "#").
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
337
338
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed entries are rewritten as $(task.keep) paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage every referenced file (with its secondary files) under
        self.stagedir, then rewrite the source of each File/Directory entry
        that starts with "keep:" as "$(task.keep)/<rest of the reference>"."""
        self.visitlisting(referenced_files, self.stagedir, basedir)

        rewritten = {}
        for key, ent in viewitems(self._pathmap):
            if ent.type in ("File", "Directory") and ent.resolved.startswith("keep:"):
                rewritten[key] = MapperEnt("$(task.keep)/%s" % ent.resolved[5:],
                                           ent.target, ent.type, ent.staged)
        # Only existing keys are reassigned, so key order is unchanged.
        self._pathmap.update(rewritten)
350
351
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal directories.

    With _follow_dirs set to False, StagingPathMapper.visit only recurses
    into the listing of literal ("_:") directories.
    """

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage every referenced object directly under self.stagedir."""
        stage_root = self.stagedir
        self.visitlisting(referenced_files, stage_root, basedir)