18323: Revise in line with the cwltool version of this feature
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
10
11 import re
12 import logging
13 import uuid
14 import os
15 import urllib.request, urllib.parse, urllib.error
16
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
20
21 from schema_salad.sourceline import SourceLine
22
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt
25 from cwltool.utils import adjustFileObjs, adjustDirObjs
26 from cwltool.stdfsaccess import abspath
27 from cwltool.workflow import WorkflowException
28
29 from .http import http_to_keep
30
# Module logger.  It uses the fixed name 'arvados.cwl-runner' rather than
# __name__ (presumably shared with the other arvados_cwl modules -- confirm).
31 logger = logging.getLogger('arvados.cwl-runner')
32
def trim_listing(obj):
    """Drop the 'listing' field from a Directory object that lives in Keep.

    A Directory whose location is a ``keep:`` reference can always be
    re-enumerated from Keep, so shipping its fully expanded listing between
    cwl-runner instances (e.g. when submitting a job, or when using the
    RunInSingleContainer feature) is redundant and potentially very
    expensive.  The object is modified in place; nothing is returned.

    :param obj: a CWL File/Directory mapping.
    """
    is_keep_reference = obj.get("location", "").startswith("keep:")
    if is_keep_reference and "listing" in obj:
        obj.pop("listing")
46
# "keep:<pdh>/<path>" where the path after the portable data hash is required
# (at least one character).
47 collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# "keep:<pdh>" with an optional "/<path>"; group(1) is the portable data hash
# ("<32 hex>+<size>") and group(2) the "/path" suffix or None.
# NOTE(review): unlike the other two patterns this one has no trailing "$", so
# re.match() also accepts extra characters after "+<size>" (e.g. "keep:<pdh>+12x").
# Confirm whether that is intended before anchoring it; code outside this module
# may rely on the current behavior.
48 collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
# "keep:<collection uuid>" with an optional "/<path>"; group(1) is the UUID
# (the "4zz18" infix identifies a collection UUID).
49 collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
50
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    References are resolved as follows (see ``visit`` and ``setup``):

    * ``keep:`` references to a portable data hash are mapped through
      ``collection_pattern``; ``keep:`` collection-UUID references are
      mapped to themselves.
    * Local ``file:`` references are uploaded to Keep unless they are
      already on a Keep mount.
    * ``http:``/``https:`` references are copied into Keep with
      ``http_to_keep``.
    * Directories without a mapping, File literals, and Files whose
      secondaryFiles are not already laid out the way they will be staged
      are assembled into new intermediate collections.
    """

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        """
        :param arvrunner: runner object; this class reads its ``api``,
            ``keep_client``, ``num_retries``, ``project_uuid``,
            ``fs_access`` and ``intermediate_output_ttl`` attributes.
        :param referenced_files: CWL File/Directory objects to map.
        :param input_basedir: base directory used to resolve local paths.
        :param collection_pattern: %-format string applied to
            "<pdh>[/path]" to produce a target path.
        :param file_pattern: %-format string applied to (pdh, path) to
            produce a target path.
        :param name: collection name passed to the local-file upload.
        :param single_collection: if True, a single pre-created Collection
            is passed to the local-file upload.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # portable data hash -> collection UUID, recorded when an input
        # object carries arvados_cwl.util.collectionUUID.
        self.pdh_to_uuid = {}
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record a mapping for ``srcobj`` and, recursively, its
        secondaryFiles and listing.

        Local ``file:`` references that are not on a Keep mount are added to
        ``uploadfiles`` as ``(src, abspath, UploadFile)`` tuples; ``setup``
        maps them once they have been uploaded.

        :raises WorkflowException: for malformed ``keep:`` references,
            invalid local paths, or File/Directory literals that lack
            ``contents``/``listing``.
        """
        src = srcobj["location"]
        # Map on the location without any "#fragment".
        if "#" in src:
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
                if arvados_cwl.util.collectionUUID in srcobj:
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are validated here and materialized in setup().
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                # Best effort: a failed download is logged and the
                # reference is left unmapped.
                try:
                    keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                    logger.info("%s is %s", src, keepref)
                    self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                except Exception as e:
                    logger.warning(str(e))
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Copy or create ``obj`` at ``path/<basename>`` in collection ``c``.

        Objects that already have a Keep mapping are copied from their source
        collection (together with their secondaryFiles); Directories without
        a mapping are recursed into; File literals are written from
        ``contents``.  Every placed object appends
        ``(original location, path in c)`` to ``remap``.

        :raises WorkflowException: if ``obj`` matches none of those cases.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        :param prefix: expected location prefix ("keep:<pdh>/dir/") for
            ``srcobj``; empty for the top-level call, in which case it is
            derived from ``srcobj``'s own location.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            return True
        if prefix:
            if loc != prefix+srcobj["basename"]:
                return True
        else:
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
            else:
                prefix = loc+"/"
        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return a new, unsaved Collection bound to the runner's API/Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save ``c`` as a new intermediate collection in the runner's project.

        Name, trash_at and properties come from
        ``arvados_cwl.util.get_intermediate_collection_info`` for the
        current container.
        """
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def _apply_remap(self, c, remap):
        """Point each ``(location, path in c)`` pair of ``remap`` at saved collection ``c``."""
        pdh = c.portable_data_hash()
        for loc, sub in remap:
            # addentry() paths start with "./"; strip it so that the Keep
            # reference and the target use the same relative path.
            rel = sub[2:] if sub.startswith("./") else sub
            # NOTE(review): every entry is typed "Directory", including Files
            # (this overwrites the "File" entry setup() creates for a primary
            # file).  Kept as-is; confirm whether callers rely on it.
            self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, rel),
                                           self.file_pattern % (pdh, rel),
                                           "Directory", True)

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Build ``self._pathmap`` for ``referenced_files``.

        ``basedir`` is unused here; local paths are resolved against
        ``self.input_basedir`` in ``visit``.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        # Pass 1: map what can be mapped directly, collect local uploads.
        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        # Pass 2: build intermediate collections where staging requires one.
        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                # Directory without a Keep mapping (e.g. a literal): build a
                # collection from its listing.
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)
                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:" + pdh, self.collection_pattern % pdh,
                                                              "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                                                (srcobj["location"].startswith("_:") and "contents" in srcobj)):
                # If all secondary files/directories are located in the same
                # collection as the primary file and their paths and names
                # are consistent with staging, don't create a new collection.
                if not self.needs_new_collection(srcobj):
                    continue

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)
                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              self.file_pattern % (pdh, srcobj["basename"]),
                                                              "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the whole collection under a fresh anonymous
                    # key (presumably so it gets staged as a unit).
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:" + pdh, self.collection_pattern % pdh,
                                                                        "Directory", True)

            if remap:
                self._apply_remap(c, remap)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a ``(location, location)`` pair.

        Falls back to treating ``keep:`` targets, and paths under
        ``self.keepdir`` when it is set, as Keep references; returns None
        otherwise.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
274
275
class StagingPathMapper(PathMapper):
    """Path mapper that lays inputs out under a staging directory.

    Note that StagingPathMapper internally maps files from target to source.
    Specifically, the 'self._pathmap' dict keys are the target location and the
    values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    as the file identifier. This makes it possible to map an input file to multiple
    target directories. The exception is for file literals, which store the contents of
    the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.
    """

    # When False, visit() only descends into the listing of Directory
    # literals ("_:" locations).
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Target paths claimed so far; used to detect and resolve collisions.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Claim a target path for ``obj`` and record it in ``self._pathmap``.

        The target is ``obj["dirname"]`` (or ``stagedir``) joined with
        ``obj["basename"]``.  If that target is already claimed -- by a
        different source, or at all when ``obj`` carries ``contents`` -- a
        numeric suffix is inserted before the extension ("name_2.ext",
        "name_3.ext", ...) until a free target is found.  Directories recurse
        into their listing (always for literals, otherwise only when
        ``_follow_dirs`` is set); Files recurse into their secondaryFiles.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            if tgt not in self.targets or "contents" in obj:
                return False
            # File literals are keyed by location, not target, so a claimed
            # target with no _pathmap entry belongs to a literal: that is a
            # collision too (indexing _pathmap directly raised KeyError here).
            existing = self._pathmap.get(tgt)
            return existing is None or existing.resolved != loc

        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            # Same source already mapped to this target: nothing to do.
            if tgt in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> Optional[MapperEnt]
        """Return the MapperEnt for source location ``src``, or None.

        Overridden to maintain the use case of mapping by source (identifier)
        to target regardless of how the map is structured internally.  A
        "#fragment" on ``src`` is looked up without the fragment and then
        re-appended to the returned target.
        """
        def getMapperEnt(src):
            for k, v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v
            return None

        if u"#" in src:
            i = src.index(u"#")
            # Look up the location *before* the fragment (previously src[i:],
            # the fragment itself, which never matches a location).
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
340
341
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep inputs resolve under ``$(task.keep)``.

    After all inputs are visited, every "File" or "Directory" entry whose
    resolved location is a ``keep:`` reference is rewritten to the same
    path below the ``$(task.keep)`` prefix.
    """

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        # Claim a target for each input; StagingPathMapper.visit also walks
        # secondaryFiles and directory listings.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Iterate over a snapshot so entries can be replaced in the loop.
        for key, entry in list(viewitems(self._pathmap)):
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            keep_relative = resolved[5:]
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % keep_relative, target, kind, staged)
353
354
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal Directories.

    With ``_follow_dirs`` disabled, ``StagingPathMapper.visit`` only walks
    the listing of Directory literals ("_:" locations).
    """

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Claim targets for ``referenced_files`` below ``self.stagedir``
        (or each object's own ``dirname``, when it has one)."""
        stage_root = self.stagedir
        self.visitlisting(referenced_files, stage_root, basedir)