18994: Add test case, correctly re-stage files when basename is changed.
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
10
11 import re
12 import logging
13 import uuid
14 import os
15 import urllib.request, urllib.parse, urllib.error
16
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
20
21 from schema_salad.sourceline import SourceLine
22
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt
25 from cwltool.utils import adjustFileObjs, adjustDirObjs
26 from cwltool.stdfsaccess import abspath
27 from cwltool.workflow import WorkflowException
28
29 from .http import http_to_keep
30
# Module logger; all messages go to the shared "arvados.cwl-runner" channel.
logger = logging.getLogger("arvados.cwl-runner")
32
def trim_listing(obj):
    """Strip the 'listing' field from a Keep-backed object, in place.

    Passing fully enumerated Directory objects that live in Keep between
    cwl-runner instances (e.g. when submitting a job, or with the
    RunInSingleContainer feature) is redundant and potentially very
    expensive.  When ``obj["location"]`` starts with "keep:" and ``obj``
    has a "listing" key, that key is deleted; otherwise ``obj`` is left
    unchanged.  Always returns None.
    """
    if not obj.get("location", "").startswith("keep:"):
        return
    if "listing" not in obj:
        return
    del obj["listing"]
46
# Patterns recognising Keep references.
#
# collection_pdh_path: "keep:<32 hex digits>+<digits>/<non-empty path>",
#   i.e. a path inside a collection addressed by portable data hash.
# collection_pdh_pattern: a portable-data-hash reference with an optional
#   "/path" suffix; group 1 is the hash, group 2 the suffix (or None).
#   NOTE: not anchored with "$", so trailing text after the hash is accepted.
# collection_uuid_pattern: "keep:<5 chars>-4zz18-<15 chars>" with an optional
#   "/path" suffix; group 1 is the collection UUID, group 2 the suffix.
collection_pdh_path = re.compile(r"^keep:[0-9a-f]{32}\+\d+/.+$")
collection_pdh_pattern = re.compile(r"^keep:([0-9a-f]{32}\+\d+)(/.*)?")
collection_uuid_pattern = re.compile(r"^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$")
50
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Each referenced File/Directory gets a ``MapperEnt`` in ``self._pathmap``
    keyed by its source location.  "keep:" references are mapped directly,
    local "file:" references are uploaded (or mapped to an existing Keep
    path), http(s) URLs are fetched into Keep, and objects whose layout
    cannot be expressed as a path in an existing collection are copied into
    a new intermediate collection.
    """

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        """
        :param arvrunner: runner object; this class reads its ``api``,
            ``keep_client``, ``num_retries``, ``project_uuid``, ``fs_access``
            and ``intermediate_output_ttl`` attributes.
        :param referenced_files: CWL File/Directory objects to map.
        :param input_basedir: base directory for resolving relative local paths.
        :param collection_pattern: %-format string applied to a collection
            PDH, optionally followed by a path inside it.
        :param file_pattern: %-format string applied to ``(pdh, path)``.
        :param name: name passed to the upload of local files.
        :param single_collection: when true, one Collection object is created
            and handed to the upload of local files.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # PDH -> collection UUID, filled in from inputs that carry the
        # arvados_cwl.util.collectionUUID annotation.
        self.pdh_to_uuid = {}
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Map ``srcobj`` and, recursively, its secondaryFiles and listing.

        - "keep:<pdh>..." references are mapped directly via
          ``collection_pattern``; invalid keep references raise.
        - "file:" references are stat'ed: files that must be uploaded are
          added to ``uploadfiles`` as ``(src, abspath, UploadFile)``; files
          for which statfile returns an ArvFile (e.g. already on a Keep
          mount) are mapped to that Keep path.
        - "_:" literals are only validated here (staged later in setup).
        - http(s) URLs are downloaded into Keep; failures are logged and the
          URL is left unmapped.
        - Anything else is mapped to itself.

        :raises WorkflowException: on invalid keep references, invalid local
            paths, or literals missing ``contents``/``listing``.
        """
        src = srcobj["location"]
        if "#" in src:
            # Map the location without its fragment.
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
                if arvados_cwl.util.collectionUUID in srcobj:
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                try:
                    keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                    logger.info("%s is %s", src, keepref)
                    self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                except Exception as e:
                    # Best effort: a failed download leaves the URL unmapped.
                    logger.warning(str(e))
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Place ``obj`` at ``path + "/" + basename`` inside collection ``c``.

        Already-mapped objects are copied from their source collection
        (followed by their secondaryFiles, placed in the same ``path``);
        unmapped Directories are rebuilt entry by entry from their listing;
        file literals ("_:" with ``contents``) are written out.  Each placed
        object appends ``(location, new_path)`` to ``remap``.

        :raises WorkflowException: if ``obj`` fits none of these cases.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # Presumably an empty path means the collection root.
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        :param prefix: expected location prefix ("<dir>/") for ``srcobj``;
            when empty it is derived from ``srcobj``'s own location.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals always have to be materialized somewhere.
            return True
        if not prefix:
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
            else:
                prefix = loc+"/"

        # The basename differs from the name in the source location (e.g. it
        # was renamed), so the file cannot be used in place.
        if loc != prefix+srcobj["basename"]:
            return True

        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return a new, unsaved Collection using this runner's API/Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save ``c`` as a new intermediate collection in the runner's project."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate ``self._pathmap`` for ``referenced_files``.

        1. Visit every object, mapping what can be mapped directly and
           collecting local files that need uploading.
        2. Upload those local files and map them.
        3. Copy into a fresh intermediate collection every Directory that is
           still unmapped, and every File for which needs_new_collection()
           is true, then remap the copied entries to the new collection.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                # Build the directory's contents at the root of a new collection.
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)
                self._save_intermediate_collection(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)
                self._save_intermediate_collection(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the whole new collection under a unique
                    # literal key, presumably so the directory holding the
                    # file and its secondaryFiles is staged too.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            if remap:
                # remap is only filled in the branches above, so c is the
                # collection built for this srcobj.
                pdh = c.portable_data_hash()
                for loc, sub in remap:
                    # addentry() is called with path ".", so entries start with
                    # "./"; strip it once so the keep: reference and the target
                    # use the same relative path.
                    if sub.startswith("./"):
                        sub = sub[2:]
                    # NOTE(review): every remapped entry is typed "Directory",
                    # even when it is a File; kept for compatibility.
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, sub),
                                                   self.file_pattern % (pdh, sub),
                                                   "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a ``(location, resolved)`` pair.

        Falls back to the base mapping first, then treats "keep:" targets as
        themselves, then (when ``self.keepdir`` is set) converts paths under
        ``keepdir`` to "keep:" references.  Returns None otherwise.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
267
268
class StagingPathMapper(PathMapper):
    """Path mapper that assigns each input a target inside a staging directory.

    Note that StagingPathMapper internally maps files from target to source.
    Specifically, the 'self._pathmap' dict keys are the target location and the
    values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    as the file identifier. This makes it possible to map an input file to multiple
    target directories. The exception is for file literals, which store the contents
    of the file in 'MapperEnt.resolved' and are therefore still mapped from source
    to target.
    """

    # When True every directory listing is visited; when False only literal
    # ("_:") directories have their listings visited.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Every target path handed out so far, used to detect collisions.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
        """Assign a staging target to ``obj`` (and its listing/secondaryFiles).

        The target is ``<obj["dirname"] or stagedir>/<basename>``.  If the
        target is already used by a different source, or ``obj`` carries
        ``contents`` and the target is in use, a numeric suffix is inserted
        before the extension ("name_2.txt", "name_3.txt", ...) until an
        unused target is found.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            if tgt not in self.targets or "contents" in obj:
                return False
            # File literals are keyed by source location, not target, so a
            # target taken by a literal has no target-keyed entry; treat that
            # as a collision rather than raising KeyError.
            ent = self._pathmap.get(tgt)
            return ent is None or ent.resolved != loc

        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                # Literal: keyed by source, contents stored in 'resolved'.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt
        """Return the MapperEnt for source location ``src``, or None if unmapped.

        Overridden to maintain the use case of mapping by source (identifier)
        to target regardless of how the map is structured internally.  A
        "#fragment" on ``src`` is stripped for the lookup and appended to the
        returned target.
        """
        def getMapperEnt(src):
            for k, v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v
            return None

        if u"#" in src:
            i = src.index(u"#")
            # Look up the location *before* the fragment.
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
333
334
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed entries resolve under $(task.keep)."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage referenced_files, then rewrite "keep:X" to "$(task.keep)/X".

        Only entries of type "File" or "Directory" whose resolved location
        starts with "keep:" are rewritten; all other entries are unchanged.
        """
        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Iterate over a snapshot because entries are reassigned in the loop.
        for path, (ab, tgt, kind, staged) in list(viewitems(self._pathmap)):
            if kind in ("File", "Directory") and ab.startswith("keep:"):
                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, kind, staged)
344             if type in ("File", "Directory") and ab.startswith("keep:"):
345                 self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type, staged)
346
347
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal directory listings."""

    # With _follow_dirs False, only "_:" (literal) directories have their
    # listings visited during staging.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Assign staging targets for referenced_files under self.stagedir."""
        stage_root = self.stagedir
        self.visitlisting(referenced_files, stage_root, basedir)
351         # type: (List[Any], unicode) -> None
352         self.visitlisting(referenced_files, self.stagedir, basedir)