22235: differentiated frozen manageable projects
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import re
6 import logging
7 import uuid
8 import os
9 import urllib.request, urllib.parse, urllib.error
10
11 import arvados_cwl.util
12 import arvados.commands.run
13 import arvados.collection
14
15 from arvados.errors import ApiError
16 from arvados._internal.http_to_keep import http_to_keep
17 from cwltool.pathmapper import PathMapper, MapperEnt
18 from cwltool.utils import adjustFileObjs, adjustDirObjs
19 from cwltool.stdfsaccess import abspath
20 from cwltool.workflow import WorkflowException
21 from schema_salad.sourceline import SourceLine
22
# Module-level logger; everything in this file logs through the
# 'arvados.cwl-runner' logger name.
23 logger = logging.getLogger('arvados.cwl-runner')
24
def trim_listing(obj):
    """Drop the 'listing' field from a Directory object that lives in Keep.

    A Directory whose location is a ``keep:`` reference can be enumerated
    again from Keep, so carrying a fully expanded 'listing' between
    instances of cwl-runner (e.g. when submitting a job, or when using the
    RunInSingleContainer feature) is redundant and potentially very
    expensive.  The object is modified in place; nothing is returned.

    Objects without a 'location', or whose location is not a ``keep:``
    reference, are left untouched.
    """

    location = obj.get("location", "")
    if not location.startswith("keep:"):
        return

    if "listing" in obj:
        del obj["listing"]
38
# Patterns recognising the different shapes of "keep:" references.
#
# collection_pdh_path: 32 hex digits, "+", a decimal size, then "/" and a
# non-empty path, i.e. a reference to something *inside* a collection.
39 collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# collection_pdh_pattern: captures the "<32 hex digits>+<size>" part (group 1)
# and an optional "/..." sub-path (group 2).
# NOTE(review): unlike the other two patterns this one has no trailing "$",
# so text following the hash is not rejected -- confirm this is intentional.
40 collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
# collection_uuid_pattern: captures a "<5 chars>-4zz18-<15 chars>" identifier
# (group 1) and an optional "/..." sub-path (group 2).
41 collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
42
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Every referenced File/Directory object is visited and recorded in
    ``self._pathmap`` as a ``MapperEnt`` whose ``resolved`` field is a
    ``keep:`` reference (or a passthrough location) and whose ``target``
    is built from ``collection_pattern`` / ``file_pattern``.
    """

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False,
                 optional_deps=None):
        """Record the mapping configuration and build the path map.

        Args:
            arvrunner: runner object supplying ``api``, ``keep_client``,
                ``num_retries``, ``project_uuid``, ``fs_access``,
                ``defer_downloads``, ``intermediate_output_ttl`` and
                ``toplevel_runtimeContext``.
            referenced_files: list of File/Directory objects (dicts with at
                least "location" and "class").
            input_basedir: base directory used to resolve ``file:`` locations.
            collection_pattern: %-format string taking one value (the part of
                a keep reference after "keep:").
            file_pattern: %-format string taking two values (portable data
                hash, path inside the collection).
            name: passed as the ``name`` argument when uploading local files.
            single_collection: when true, a single Collection object is
                created and passed to the uploader.
            optional_deps: objects whose locations may be silently skipped
                by ``addentry`` when they cannot be handled.
        """
        # All state must be in place before the base constructor runs: it
        # receives referenced_files and presumably drives setup()/visit()
        # (TODO confirm against cwltool.pathmapper.PathMapper).
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        self.pdh_to_uuid = {}
        self.optional_deps = optional_deps or []
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def _new_collection(self):
        """Return an empty Collection bound to the runner's API/Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate(self, c):
        """Save collection `c` as a new intermediate collection in the project."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def visit(self, srcobj, uploadfiles):
        """Classify one File/Directory object and record it in the path map.

        Local files that need uploading are added to the `uploadfiles` set
        as ``(location, absolute path, statfile result)`` tuples instead of
        being mapped immediately.  Secondary files and directory listings
        are visited recursively.

        Raises:
            WorkflowException: for an invalid keep reference, an invalid
                local input path, or a file/directory literal that lacks
                `contents`/`listing`.
        """
        src = srcobj["location"]
        if "#" in src:
            # Ignore any fragment identifier when mapping.
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, str) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

                if arvados_cwl.util.collectionUUID in srcobj:
                    # Remember which collection UUID this hash came from;
                    # the key is the reference up to the first "/" with
                    # the "keep:" prefix removed.
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are not mapped here, only validated.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                try:
                    if self.arvrunner.defer_downloads:
                        # passthrough, we'll download it later.
                        self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
                    else:
                        results = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
                                               varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
                                               prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
                        keepref = "keep:%s/%s" % (results[0], results[1])
                        logger.info("%s is %s", src, keepref)
                        self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                except Exception as e:
                    # Best effort: a failed download is only logged, which
                    # leaves `src` unmapped.
                    logger.warning("Download error: %s", e)
            else:
                # Any other scheme is passed through unchanged.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Stage `obj` into collection `c` underneath `path`.

        Already-mapped objects are copied from their source collection,
        unmapped Directories are recursed into, and file literals are
        written out from their `contents`.  Each staged object appends a
        ``(location, path inside c)`` pair to `remap`.

        Raises:
            WorkflowException: when the object cannot be handled and its
                location is not one of the optional dependencies.
        """
        dest = path + "/" + obj["basename"]

        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, dest, remap)
            remap.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(dest, "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], dest))
        else:
            # Optional dependencies are allowed to be missing.
            if any(obj["location"] == opt["location"] for opt in self.optional_deps):
                return
            raise SourceLine(obj, "location", WorkflowException).makeError("Can't handle '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        `prefix` is the location prefix (up to and including the final
        "/") that nested objects are expected to share; it is filled in
        from `srcobj` itself when empty.
        """

        loc = srcobj["location"]
        if loc.startswith("_:"):
            return True

        if self.arvrunner.defer_downloads and loc.startswith(("http:", "https:")):
            return False

        i = loc.rfind("/")
        if i > -1:
            loc_prefix = loc[:i+1]
            if not prefix:
                prefix = loc_prefix
            # quote/unquote to ensure consistent quoting
            suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
        else:
            # no '/' found
            loc_prefix = loc+"/"
            prefix = loc+"/"
            suffix = ""

        if prefix != loc_prefix:
            return True

        if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
            return True

        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def setup(self, referenced_files, basedir):
        """Build the path map for `referenced_files`.

        Steps: visit every object, upload the local files collected by
        visit(), map the uploaded files, then create intermediate
        collections for unmapped Directories and for Files that
        needs_new_collection() says must be restaged.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            c = None
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_intermediate(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_intermediate(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also register the new collection as a whole under a
                    # fresh literal id.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            # remap is only populated by addentry(), so `c` is always a
            # saved collection by the time this loop has anything to do.
            for loc, sub in remap:
                # subdirs start with "./", strip it off -- and use the
                # same stripped path for both the keep reference and the
                # target so the two can never disagree.
                rel = sub[2:] if sub.startswith("./") else sub
                ab = self.file_pattern % (c.portable_data_hash(), rel)
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), rel),
                                               ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a ``(source, resolved)`` style pair.

        Falls back to treating ``keep:`` targets, and targets under
        ``self.keepdir`` when that is set, as their own keep reference.
        Returns None when nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
284
285
class StagingPathMapper(PathMapper):
    """Path mapper that assigns each input a location under a staging dir."""

    # Note that StagingPathMapper internally maps files from target to source.
    # Specifically, the 'self._pathmap' dict keys are the target location and the
    # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    # as the file identifier. This makes it possible to map an input file to multiple
    # target directories. The exception is for file literals, which store the contents of
    # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.

    # When true, visit() descends into the listing of every Directory;
    # otherwise only into directory literals ("_:" locations).
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # The set of claimed targets must exist before the base constructor
        # runs, since visit() reads and updates it.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        """Assign a staging target to `obj` and record it in the path map.

        The target is ``<dirname or stagedir>/<basename>``.  If that target
        is already claimed by a different object, "_2", "_3", ... is
        inserted before the extension until an unused name is found.
        Directory listings and secondary files are visited via
        visitlisting().
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            if tgt not in self.targets or "contents" in obj:
                return False
            # File literals are keyed by source location, not by target, so
            # a claimed target may have no entry under `tgt`.  That means
            # it belongs to a different object: treat it as a collision
            # rather than failing with KeyError.
            ent = self._pathmap.get(tgt)
            return ent is None or ent.resolved != loc

        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                # Same object already staged at this target.
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literal: keyed by source, contents kept in 'resolved'.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):
        """Return the MapperEnt for source identifier `src`, or None.

        Overridden to maintain the use case of mapping by source
        (identifier) to target regardless of how the map is structured
        internally.  If `src` carries a "#fragment", the entry is looked up
        by the part before the "#" and the fragment is appended to the
        returned target.
        """
        def getMapperEnt(ident):
            for k, v in self._pathmap.items():
                if (v.type != "CreateFile" and v.resolved == ident) or (v.type == "CreateFile" and k == ident):
                    return v
            return None

        if "#" in src:
            i = src.index("#")
            # Look up the base identifier (text before '#'), not the
            # fragment itself.
            v = getMapperEnt(src[:i])
            if v is None:
                # Unknown source: same result as the fragment-less lookup.
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
350
351
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites keep references to "$(task.keep)/..."."""

    def setup(self, referenced_files, basedir):
        """Stage every referenced object, then rewrite keep-backed entries."""

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Only values of existing keys are replaced below, so the dict keeps
        # its size while it is being iterated.
        for key, entry in self._pathmap.items():
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory") or not resolved.startswith("keep:"):
                continue
            # Swap the "keep:" prefix for the "$(task.keep)/" prefix.
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
363
364
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal directories."""

    # With this flag off, StagingPathMapper.visit() only walks the listing
    # of directories whose location starts with "_:".
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        """Stage every referenced object under this mapper's stagedir."""
        staging_root = self.stagedir
        self.visitlisting(referenced_files, staging_root, basedir)