20462: Refactor "common prefix" function & fix fencepost errors
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
10
11 import re
12 import logging
13 import uuid
14 import os
15 import urllib.request, urllib.parse, urllib.error
16
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
20
21 from schema_salad.sourceline import SourceLine
22
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt
25 from cwltool.utils import adjustFileObjs, adjustDirObjs
26 from cwltool.stdfsaccess import abspath
27 from cwltool.workflow import WorkflowException
28
29 from arvados.http_to_keep import http_to_keep
30
# Module-level logger; everything in this file logs under the
# 'arvados.cwl-runner' logger name.
logger = logging.getLogger('arvados.cwl-runner')
32
def trim_listing(obj):
    """Drop the 'listing' field from an object whose location is in Keep.

    A Directory that is a Keep reference can be re-enumerated from its
    location, so carrying a fully expanded 'listing' between instances of
    cwl-runner (e.g. when submitting a job, or when using the
    RunInSingleContainer feature) is redundant and potentially very
    expensive.  The object is modified in place; nothing is returned.

    Objects whose "location" does not start with "keep:" (or that have no
    "location" at all) are left untouched.
    """

    location = obj.get("location", "")
    if not location.startswith("keep:"):
        return
    # pop() with a default is a no-op when there is no listing to remove.
    obj.pop("listing", None)
46
# "keep:" reference made of 32 lowercase hex digits, '+', decimal digits, and
# a non-empty "/path" part.  Anchored at both ends.
collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# Same hash form with an optional "/path" part.  Group 1 is the
# "<hash>+<digits>" identifier, group 2 the path (if any).  Note this pattern
# is not anchored at the end, so trailing text after the hash is accepted.
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
# "keep:" reference by an identifier of the form xxxxx-4zz18-xxxxxxxxxxxxxxx
# (presumably a collection UUID, going by the name) with an optional "/path"
# part.  Group 1 is the identifier, group 2 the path.
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
50
51 class ArvPathMapper(PathMapper):
52     """Convert container-local paths to and from Keep collection ids."""
53
54     def __init__(self, arvrunner, referenced_files, input_basedir,
55                  collection_pattern, file_pattern, name=None, single_collection=False,
56                  optional_deps=None):
57         self.arvrunner = arvrunner
58         self.input_basedir = input_basedir
59         self.collection_pattern = collection_pattern
60         self.file_pattern = file_pattern
61         self.name = name
62         self.referenced_files = [r["location"] for r in referenced_files]
63         self.single_collection = single_collection
64         self.pdh_to_uuid = {}
65         self.optional_deps = optional_deps or []
66         super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
67
68     def visit(self, srcobj, uploadfiles):
69         src = srcobj["location"]
70         if "#" in src:
71             src = src[:src.index("#")]
72
73         debug = logger.isEnabledFor(logging.DEBUG)
74
75         if isinstance(src, basestring) and src.startswith("keep:"):
76             if collection_pdh_pattern.match(src):
77                 self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
78
79                 if arvados_cwl.util.collectionUUID in srcobj:
80                     self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
81             elif not collection_uuid_pattern.match(src):
82                 with SourceLine(srcobj, "location", WorkflowException, debug):
83                     raise WorkflowException("Invalid keep reference '%s'" % src)
84
85         if src not in self._pathmap:
86             if src.startswith("file:"):
87                 # Local FS ref, may need to be uploaded or may be on keep
88                 # mount.
89                 ab = abspath(src, self.input_basedir)
90                 st = arvados.commands.run.statfile("", ab,
91                                                    fnPattern="keep:%s/%s",
92                                                    dirPattern="keep:%s/%s",
93                                                    raiseOSError=True)
94                 with SourceLine(srcobj, "location", WorkflowException, debug):
95                     if isinstance(st, arvados.commands.run.UploadFile):
96                         uploadfiles.add((src, ab, st))
97                     elif isinstance(st, arvados.commands.run.ArvFile):
98                         self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
99                     else:
100                         raise WorkflowException("Input file path '%s' is invalid" % st)
101             elif src.startswith("_:"):
102                 if srcobj["class"] == "File" and "contents" not in srcobj:
103                     raise WorkflowException("File literal '%s' is missing `contents`" % src)
104                 if srcobj["class"] == "Directory" and "listing" not in srcobj:
105                     raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
106             elif src.startswith("http:") or src.startswith("https:"):
107                 try:
108                     if self.arvrunner.defer_downloads:
109                         # passthrough, we'll download it later.
110                         self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
111                     else:
112                         keepref = "keep:%s/%s" % http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
113                                                               varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
114                                                               prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
115                         logger.info("%s is %s", src, keepref)
116                         self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
117                 except Exception as e:
118                     logger.warning(str(e))
119             else:
120                 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
121
122         with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
123             for l in srcobj.get("secondaryFiles", []):
124                 self.visit(l, uploadfiles)
125         with SourceLine(srcobj, "listing", WorkflowException, debug):
126             for l in srcobj.get("listing", []):
127                 self.visit(l, uploadfiles)
128
129     def addentry(self, obj, c, path, remap):
130         if obj["location"] in self._pathmap:
131             src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
132             if srcpath == "":
133                 srcpath = "."
134             c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
135             remap.append((obj["location"], path + "/" + obj["basename"]))
136             for l in obj.get("secondaryFiles", []):
137                 self.addentry(l, c, path, remap)
138         elif obj["class"] == "Directory":
139             for l in obj.get("listing", []):
140                 self.addentry(l, c, path + "/" + obj["basename"], remap)
141             remap.append((obj["location"], path + "/" + obj["basename"]))
142         elif obj["location"].startswith("_:") and "contents" in obj:
143             with c.open(path + "/" + obj["basename"], "w") as f:
144                 f.write(obj["contents"])
145             remap.append((obj["location"], path + "/" + obj["basename"]))
146         else:
147             for opt in self.optional_deps:
148                 if obj["location"] == opt["location"]:
149                     return
150             raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
151
152     def needs_new_collection(self, srcobj, prefix=""):
153         """Check if files need to be staged into a new collection.
154
155         If all the files are in the same collection and in the same
156         paths they would be staged to, return False.  Otherwise, a new
157         collection is needed with files copied/created in the
158         appropriate places.
159         """
160
161         loc = srcobj["location"]
162         if loc.startswith("_:"):
163             return True
164
165         if self.arvrunner.defer_downloads and (loc.startswith("http:") or loc.startswith("https:")):
166             return False
167
168         i = loc.rfind("/")
169         if i > -1:
170             loc_prefix = loc[:i+1]
171             if not prefix:
172                 prefix = loc_prefix
173             # quote/unquote to ensure consistent quoting
174             suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
175         else:
176             # no '/' found
177             loc_prefix = loc+"/"
178             prefix = loc+"/"
179             suffix = ""
180
181         if prefix != loc_prefix:
182             return True
183
184         if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
185             return True
186
187         if srcobj["class"] == "File" and loc not in self._pathmap:
188             return True
189         for s in srcobj.get("secondaryFiles", []):
190             if self.needs_new_collection(s, prefix):
191                 return True
192         if srcobj.get("listing"):
193             prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
194             for l in srcobj["listing"]:
195                 if self.needs_new_collection(l, prefix):
196                     return True
197         return False
198
199     def setup(self, referenced_files, basedir):
200         # type: (List[Any], unicode) -> None
201         uploadfiles = set()
202
203         collection = None
204         if self.single_collection:
205             collection = arvados.collection.Collection(api_client=self.arvrunner.api,
206                                                        keep_client=self.arvrunner.keep_client,
207                                                        num_retries=self.arvrunner.num_retries)
208
209         for srcobj in referenced_files:
210             self.visit(srcobj, uploadfiles)
211
212         arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
213                                          self.arvrunner.api,
214                                          dry_run=False,
215                                          num_retries=self.arvrunner.num_retries,
216                                          fnPattern="keep:%s/%s",
217                                          name=self.name,
218                                          project=self.arvrunner.project_uuid,
219                                          collection=collection,
220                                          packed=False)
221
222         for src, ab, st in uploadfiles:
223             self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
224                                            "Directory" if os.path.isdir(ab) else "File", True)
225
226         for srcobj in referenced_files:
227             remap = []
228             if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
229                 c = arvados.collection.Collection(api_client=self.arvrunner.api,
230                                                   keep_client=self.arvrunner.keep_client,
231                                                   num_retries=self.arvrunner.num_retries)
232                 for l in srcobj.get("listing", []):
233                     self.addentry(l, c, ".", remap)
234
235                 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
236                 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
237
238                 c.save_new(name=info["name"],
239                            owner_uuid=self.arvrunner.project_uuid,
240                            ensure_unique_name=True,
241                            trash_at=info["trash_at"],
242                            properties=info["properties"])
243
244                 ab = self.collection_pattern % c.portable_data_hash()
245                 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
246             elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
247                 c = arvados.collection.Collection(api_client=self.arvrunner.api,
248                                                   keep_client=self.arvrunner.keep_client,
249                                                   num_retries=self.arvrunner.num_retries)
250                 self.addentry(srcobj, c, ".", remap)
251
252                 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
253                 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
254
255                 c.save_new(name=info["name"],
256                            owner_uuid=self.arvrunner.project_uuid,
257                            ensure_unique_name=True,
258                            trash_at=info["trash_at"],
259                            properties=info["properties"])
260
261                 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
262                 self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
263                                                               ab, "File", True)
264                 if srcobj.get("secondaryFiles"):
265                     ab = self.collection_pattern % c.portable_data_hash()
266                     self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
267
268             if remap:
269                 for loc, sub in remap:
270                     # subdirs start with "./", strip it off
271                     if sub.startswith("./"):
272                         ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
273                     else:
274                         ab = self.file_pattern % (c.portable_data_hash(), sub)
275                     self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
276                                                    ab, "Directory", True)
277
278         self.keepdir = None
279
280     def reversemap(self, target):
281         p = super(ArvPathMapper, self).reversemap(target)
282         if p:
283             return p
284         elif target.startswith("keep:"):
285             return (target, target)
286         elif self.keepdir and target.startswith(self.keepdir):
287             kp = "keep:" + target[len(self.keepdir)+1:]
288             return (kp, kp)
289         else:
290             return None
291
292
293 class StagingPathMapper(PathMapper):
294     # Note that StagingPathMapper internally maps files from target to source.
295     # Specifically, the 'self._pathmap' dict keys are the target location and the
296     # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
297     # as the file identifier. This makes it possible to map an input file to multiple
298     # target directories. The exception is for file literals, which store the contents of
299     # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.
300
301     _follow_dirs = True
302
303     def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
304         self.targets = set()
305         super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)
306
307     def visit(self, obj, stagedir, basedir, copy=False, staged=False):
308         # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
309         loc = obj["location"]
310         stagedir = obj.get("dirname") or stagedir
311         tgt = os.path.join(stagedir, obj["basename"])
312         basetgt, baseext = os.path.splitext(tgt)
313
314         def targetExists():
315             return tgt in self.targets and ("contents" not in obj) and (self._pathmap[tgt].resolved != loc)
316         def literalTargetExists():
317             return tgt in self.targets and "contents" in obj
318
319         n = 1
320         if targetExists() or literalTargetExists():
321             while tgt in self.targets:
322                 n += 1
323                 tgt = "%s_%i%s" % (basetgt, n, baseext)
324         self.targets.add(tgt)
325         if obj["class"] == "Directory":
326             if obj.get("writable"):
327                 self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
328             else:
329                 self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
330             if loc.startswith("_:") or self._follow_dirs:
331                 self.visitlisting(obj.get("listing", []), tgt, basedir)
332         elif obj["class"] == "File":
333             if tgt in self._pathmap:
334                 return
335             if "contents" in obj and loc.startswith("_:"):
336                 self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
337             else:
338                 if copy or obj.get("writable"):
339                     self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
340                 else:
341                     self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
342                 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
343
344     def mapper(self, src):  # type: (Text) -> MapperEnt.
345         # Overridden to maintain the use case of mapping by source (identifier) to
346         # target regardless of how the map is structured interally.
347         def getMapperEnt(src):
348             for k,v in viewitems(self._pathmap):
349                 if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
350                     return v
351
352         if u"#" in src:
353             i = src.index(u"#")
354             v = getMapperEnt(src[i:])
355             return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
356         return getMapperEnt(src)
357
358
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites Keep references to "$(task.keep)/..." paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage all referenced files, then rewrite Keep-backed entries.

        Each file is given its target under self.stagedir along with any
        secondary files.  Afterwards every "File" or "Directory" entry whose
        resolved value starts with "keep:" has that prefix replaced by
        "$(task.keep)/".
        """
        self.visitlisting(referenced_files, self.stagedir, basedir)

        rewritten = {}
        for path, entry in viewitems(self._pathmap):
            ab, tgt, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not ab.startswith("keep:"):
                continue
            rewritten[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, kind, staged)
        # Only existing keys are replaced, so the map keeps its size and order.
        self._pathmap.update(rewritten)
370
371
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not expand the listing of non-literal directories."""

    # With this off, StagingPathMapper.visit() only descends into
    # directories whose location starts with "_:".
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Assign staging targets under self.stagedir to all referenced files."""
        stagedir = self.stagedir
        self.visitlisting(referenced_files, stagedir, basedir)