Merge branch 'main' into 15397-remove-obsolete-apis
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import re
6 import logging
7 import uuid
8 import os
9 import urllib.request, urllib.parse, urllib.error
10
11 import arvados_cwl.util
12 import arvados.commands.run
13 import arvados.collection
14
15 from schema_salad.sourceline import SourceLine
16
17 from arvados.errors import ApiError
18 from cwltool.pathmapper import PathMapper, MapperEnt
19 from cwltool.utils import adjustFileObjs, adjustDirObjs
20 from cwltool.stdfsaccess import abspath
21 from cwltool.workflow import WorkflowException
22
23 from arvados.http_to_keep import http_to_keep
24
25 logger = logging.getLogger('arvados.cwl-runner')
26
def trim_listing(obj):
    """Drop the 'listing' field from a Directory object that lives in Keep.

    A Directory whose location is a Keep reference does not need its
    contents enumerated inline, and passing a fully enumerated Directory
    between instances of cwl-runner (e.g. when submitting a job, or when
    using the RunInSingleContainer feature) is redundant and potentially
    very expensive.  The object is modified in place; objects whose
    location is not a "keep:" reference are left untouched.

    """

    is_keep_ref = obj.get("location", "").startswith("keep:")
    if is_keep_ref and "listing" in obj:
        del obj["listing"]
40
# A path *inside* a collection addressed by portable data hash:
# "keep:<32 hex digits>+<size>/<non-empty path>".
collection_pdh_path = re.compile(
    r'^keep:[0-9a-f]{32}\+\d+/.+$'
)

# A collection addressed by portable data hash, with an optional path.
# Group 1 is the portable data hash, group 2 the path (if any).
collection_pdh_pattern = re.compile(
    r'^keep:([0-9a-f]{32}\+\d+)(/.*)?'
)

# A collection addressed by UUID, with an optional path.
# Group 1 is the collection UUID, group 2 the path (if any).
collection_uuid_pattern = re.compile(
    r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$'
)
44
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Each referenced File/Directory object is visited and recorded in
    ``self._pathmap`` as a ``MapperEnt`` whose ``resolved`` field is a
    "keep:" reference (or, for deferred downloads, the original URL) and
    whose ``target`` field is built from ``collection_pattern`` /
    ``file_pattern``.
    """

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False,
                 optional_deps=None):
        """Record the mapper configuration and hand off to PathMapper.

        :param arvrunner: runner object; this class reads its ``api``,
            ``keep_client``, ``num_retries``, ``project_uuid``,
            ``fs_access``, ``defer_downloads``, ``intermediate_output_ttl``
            and ``toplevel_runtimeContext`` attributes.
        :param referenced_files: list of File/Directory objects (dicts with
            at least a "location" key).
        :param input_basedir: base directory used to resolve "file:" refs.
        :param collection_pattern: ``%``-format string taking one value
            (a collection reference with the "keep:" prefix removed).
        :param file_pattern: ``%``-format string taking two values
            (portable data hash, path within the collection).
        :param name: name passed to the upload helper for new collections.
        :param single_collection: when true, uploads go into one shared
            Collection object created in setup().
        :param optional_deps: File/Directory objects that addentry() may
            silently skip when they cannot be staged.
        """
        # NOTE(review): these attributes are assigned before the base
        # initializer runs because setup()/visit() read them; presumably
        # cwltool's PathMapper.__init__ invokes setup() -- confirm before
        # reordering.
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        self.pdh_to_uuid = {}
        self.optional_deps = optional_deps or []
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record a mapping for `srcobj` and, recursively, its children.

        :param srcobj: File or Directory object.
        :param uploadfiles: set that collects ``(src, abspath, statresult)``
            tuples for local files that still have to be uploaded.
        :raises WorkflowException: for malformed keep references, invalid
            local paths, or File/Directory literals missing their payload.
        """
        src = srcobj["location"]
        if "#" in src:
            # Drop any "#fragment" before using the location as a key.
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, str) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

                if arvados_cwl.util.collectionUUID in srcobj:
                    # Remember which collection UUID this portable data
                    # hash was obtained from.
                    pdh = src.split("/", 1)[0][5:]
                    self.pdh_to_uuid[pdh] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are not mapped here; only validate that they
                # carry the data needed to materialize them later.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                try:
                    if self.arvrunner.defer_downloads:
                        # passthrough, we'll download it later.
                        self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
                    else:
                        results = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
                                               varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
                                               prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
                        keepref = "keep:%s/%s" % (results[0], results[1])
                        logger.info("%s is %s", src, keepref)
                        self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                except Exception as e:
                    # Deliberately best-effort: a failed download is only
                    # logged and leaves `src` unmapped.
                    logger.warning("Download error: %s", e)
            else:
                # Any other scheme is passed through unchanged.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Stage `obj` into collection `c` underneath `path`.

        :param obj: File or Directory object to stage.
        :param c: destination collection (needs ``copy`` and ``open``).
        :param path: directory inside `c` that receives the entry.
        :param remap: list extended with ``(location, path_in_c)`` pairs
            for every entry that was staged.
        :raises WorkflowException: if `obj` cannot be staged and is not
            listed in ``self.optional_deps``.
        """
        location = obj["location"]
        if location in self._pathmap:
            # Already mapped to an existing collection: copy it across.
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[location].resolved)
            if srcpath == "":
                srcpath = "."
            target = path + "/" + obj["basename"]
            c.copy(srcpath, target, source_collection=src, overwrite=True)
            remap.append((location, target))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((location, path + "/" + obj["basename"]))
        elif location.startswith("_:") and "contents" in obj:
            # File literal: write its contents straight into the collection.
            target = path + "/" + obj["basename"]
            with c.open(target, "w") as f:
                f.write(obj["contents"])
            remap.append((location, target))
        else:
            if any(location == opt["location"] for opt in self.optional_deps):
                # Optional dependency that could not be resolved: skip it.
                return
            raise SourceLine(obj, "location", WorkflowException).makeError("Can't handle '%s'" % location)

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        :param srcobj: File or Directory object to examine.
        :param prefix: location prefix (ending in "/") that `srcobj` must
            share; empty on the initial call, in which case the prefix of
            `srcobj` itself is adopted.
        :returns: True if a new collection is required, False otherwise.
        """

        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals always have to be materialized somewhere.
            return True

        if self.arvrunner.defer_downloads and loc.startswith(("http:", "https:")):
            return False

        i = loc.rfind("/")
        if i > -1:
            loc_prefix = loc[:i+1]
            if not prefix:
                prefix = loc_prefix
            # quote/unquote to ensure consistent quoting
            suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
        else:
            # no '/' found
            loc_prefix = loc+"/"
            prefix = loc+"/"
            suffix = ""

        if prefix != loc_prefix:
            return True

        if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
            # The entry would be staged under a different name.
            return True

        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return an empty, unsaved Collection using the runner's clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate(self, c):
        """Save `c` as a new intermediate collection in the runner's project."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def setup(self, referenced_files, basedir):
        """Build the path map for `referenced_files`.

        Visits every object, uploads local files that are not yet in
        Keep, then creates new collections for Directory objects that are
        still unmapped and for File objects for which
        needs_new_collection() is true.

        :param referenced_files: list of File/Directory objects.
        :param basedir: unused here; part of the setup() signature.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        # Record where each uploaded local file ended up.
        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_intermediate(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_intermediate(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the collection as a whole, under a fresh
                    # synthetic "_:" key.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            # `remap` is only non-empty when one of the branches above ran,
            # so `c` is always bound here.
            for loc, sub in remap:
                # subdirs start with "./", strip it off.  The same stripped
                # path is used for both the resolved reference and the
                # target so the two can never disagree.
                relpath = sub[2:] if sub.startswith("./") else sub
                ab = self.file_pattern % (c.portable_data_hash(), relpath)
                # NOTE(review): every remapped entry is recorded with type
                # "Directory", including files -- kept as in the original.
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), relpath),
                                               ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map `target` back to a ``(resolved, target)`` style pair.

        Falls back to treating "keep:" targets as their own source, and
        to rewriting paths under ``self.keepdir`` (when set) as "keep:"
        references.  Returns None when nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        if target.startswith("keep:"):
            return (target, target)
        if self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        return None
286
287
class StagingPathMapper(PathMapper):
    """Path mapper that stages inputs under a staging directory.

    Target names that collide with an already-claimed target get a
    numeric ``_N`` suffix inserted before the file extension.
    """

    # Note that StagingPathMapper internally maps files from target to source.
    # Specifically, the 'self._pathmap' dict keys are the target location and the
    # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    # as the file identifier. This makes it possible to map an input file to multiple
    # target directories. The exception is for file literals, which store the contents of
    # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.

    # When true, visit() descends into the listing of every Directory;
    # when false, only into Directory literals ("_:" locations).
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # The set of claimed targets must exist before the base
        # initializer runs, since visit() reads and updates it.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        """Assign a staging target to `obj` and record it in the path map.

        :param obj: File or Directory object.
        :param stagedir: directory to stage into; overridden by the
            object's own "dirname" when that is set.
        :param basedir: passed through to visitlisting().
        :param copy: when true, files are recorded as "WritableFile".
        :param staged: stored in the ``staged`` field of each MapperEnt.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        if tgt in self.targets:
            if "contents" in obj:
                # A literal never shares a target with an earlier entry.
                conflict = True
            else:
                # File literals are keyed by location rather than target,
                # so a claimed target can be absent from the map; that
                # means a literal already owns the name.  (Indexing the
                # map directly raised KeyError in this situation.)
                existing = self._pathmap.get(tgt)
                conflict = existing is None or existing.resolved != loc
            if conflict:
                # Pick the first free "<base>_<n><ext>" name, starting at 2.
                n = 1
                while tgt in self.targets:
                    n += 1
                    tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)

        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                # Same source already mapped to this target; nothing to do.
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literal: keyed by location, contents kept in 'resolved'.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):
        """Return the MapperEnt for source identifier `src`, or None.

        Overridden to maintain the use case of mapping by source
        (identifier) to target regardless of how the map is structured
        internally.  If `src` carries a "#fragment", the part before the
        "#" is looked up and the fragment is appended to the target of
        the returned entry.
        """
        def getMapperEnt(ident):
            for k, v in self._pathmap.items():
                if v.type == "CreateFile":
                    # Literals are keyed by their source location.
                    if k == ident:
                        return v
                elif v.resolved == ident:
                    return v
            return None

        if "#" in src:
            i = src.index("#")
            # Look up the identifier *before* the fragment; looking up the
            # fragment itself can never match an entry.
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
352
353
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites "keep:" sources as "$(task.keep)/" paths."""

    def setup(self, referenced_files, basedir):
        """Stage `referenced_files`, then rewrite Keep-backed entries."""

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Only values of existing keys are replaced, so the dict does not
        # change size while it is being iterated.
        for path, entry in self._pathmap.items():
            resolved, tgt, kind, staged = entry
            if kind not in ("File", "Directory") or not resolved.startswith("keep:"):
                continue
            keep_path = "$(task.keep)/%s" % resolved[5:]
            self._pathmap[path] = MapperEnt(keep_path, tgt, kind, staged)
365
366
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper with ``_follow_dirs`` switched off."""

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        """Stage every entry of `referenced_files` under ``self.stagedir``."""
        listing = referenced_files
        self.visitlisting(listing, self.stagedir, basedir)