18994: Fix for colon characters in filenames
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
10
11 import re
12 import logging
13 import uuid
14 import os
15 import urllib.request, urllib.parse, urllib.error
16
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
20
21 from schema_salad.sourceline import SourceLine
22
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt
25 from cwltool.utils import adjustFileObjs, adjustDirObjs
26 from cwltool.stdfsaccess import abspath
27 from cwltool.workflow import WorkflowException
28
29 from .http import http_to_keep
30
# Logger for this module.  Note it is named 'arvados.cwl-runner' rather than
# after the module (__name__), so its messages go through that shared logger.
logger = logging.getLogger('arvados.cwl-runner')
32
def trim_listing(obj):
    """Drop the 'listing' key from a File/Directory object stored in Keep.

    A Directory whose location is a Keep reference ("keep:...") can be
    re-enumerated from Keep on demand, so carrying a fully expanded
    listing between cwl-runner instances (for example when submitting a
    job, or with the RunInSingleContainer feature) is redundant and can be
    very expensive.  Objects with any other location are left untouched.

    The object is modified in place; nothing is returned.
    """
    location = obj.get("location", "")
    if not location.startswith("keep:"):
        # Not a Keep reference: the listing may be the only copy, keep it.
        return
    if "listing" in obj:
        del obj["listing"]
46
# "keep:<pdh>/<path>": a non-empty path inside a collection addressed by
# portable data hash (32 lowercase hex digits, "+", then digits).
collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# "keep:<pdh>" optionally followed by "/<path>".  Group 1 is the portable
# data hash, group 2 is the path including its leading "/" (or None).
# NOTE(review): unlike collection_uuid_pattern this is not anchored with "$",
# so with re.match any trailing text after the digits that does not start
# with "/" is also accepted (group 2 is then None) -- confirm intended.
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
# "keep:<collection uuid>" optionally followed by "/<path>".  Group 1 is the
# collection UUID (5 chars, "-4zz18-", 15 chars), group 2 the path (or None).
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
50
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Each referenced File/Directory is mapped to a Keep reference
    ("keep:<pdh>/<path>") as its resolved location and to a container path
    built from ``collection_pattern`` / ``file_pattern`` as its target.
    Local ``file:`` references are uploaded, ``http:``/``https:``
    references are resolved through ``http_to_keep``, and objects that
    cannot be expressed as a path inside an existing collection are
    staged into a new intermediate collection.
    """

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False,
                 optional_deps=None):
        """
        :param arvrunner: runner object; this class reads its ``api``,
            ``keep_client``, ``num_retries``, ``project_uuid``,
            ``fs_access`` and ``intermediate_output_ttl`` attributes.
        :param referenced_files: CWL File/Directory objects to map.
        :param input_basedir: base directory for resolving local paths.
        :param collection_pattern: %-format string taking one value,
            "<pdh>" or "<pdh>/<path>", producing a container path.
        :param file_pattern: %-format string taking ``(pdh, path)``.
        :param name: collection name passed to the upload of local files.
        :param single_collection: if true, one Collection object is created
            and passed to the upload of local files.
        :param optional_deps: File/Directory objects that ``addentry`` may
            silently skip when it cannot handle them.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        self.pdh_to_uuid = {}
        self.optional_deps = optional_deps or []
        # Attributes above must be set before the base initializer runs
        # (presumably it calls self.setup() -- confirm against cwltool).
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Map ``srcobj`` and, recursively, its secondaryFiles and listing.

        - ``keep:<pdh>...`` references are mapped directly; a
          ``keep:<collection uuid>...`` reference is mapped to itself.
        - ``file:`` references are stat'ed; files already reachable in Keep
          are mapped, others are added to ``uploadfiles`` as
          ``(src, abspath, UploadFile)`` tuples for ``setup`` to upload.
        - ``_:`` literals are only validated here.
        - ``http:``/``https:`` references are mapped to the Keep reference
          returned by ``http_to_keep``; failures are logged as warnings
          and leave the reference unmapped.
        - Anything else is mapped to itself.

        :raises WorkflowException: on an invalid keep reference, an invalid
            local path, or a literal missing ``contents``/``listing``.
        """
        src = srcobj["location"]
        if "#" in src:
            # Map the file itself, not a fragment within it.
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

                if arvados_cwl.util.collectionUUID in srcobj:
                    # Remember which collection UUID this PDH was taken from.
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                try:
                    keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                    logger.info("%s is %s", src, keepref)
                    self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                except Exception as e:
                    # Best effort: leave the reference unmapped.
                    logger.warning(str(e))
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Add ``obj`` to collection ``c`` under directory ``path``.

        - Already-mapped objects are copied from their source collection,
          followed by their secondaryFiles (placed beside them).
        - Unmapped Directories are rebuilt entry by entry from ``listing``.
        - File literals (``_:`` with ``contents``) are written out.
        Each added object appends ``(location, new_path)`` to ``remap``.
        Objects that fit none of these cases are skipped if listed in
        ``optional_deps``.

        :raises WorkflowException: (built via ``SourceLine.makeError``) for
            any other object.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # Whole collection: copy its root.
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            for opt in self.optional_deps:
                if obj["location"] == opt["location"]:
                    return
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        :param srcobj: CWL File/Directory object.
        :param prefix: location prefix (ending in "/") under which
            ``srcobj`` is expected; empty for the top-level call, in which
            case it is derived from ``srcobj["location"]``.
        :returns: True if a new collection is needed.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            return True

        if not prefix:
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
                # Normalise percent-encoding so the comparison with the
                # quoted basename below is not sensitive to how the
                # location was encoded.
                suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
            else:
                prefix = loc+"/"
                # No path component: bind suffix so the basename check is
                # well-defined (an empty suffix only matches an empty
                # basename).
                suffix = ""
        else:
            suffix = loc[len(prefix):]

        if "basename" in srcobj and prefix+suffix != prefix+urllib.parse.quote(srcobj["basename"], "/+@"):
            return True

        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return a new, empty Collection using the runner's clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save ``c`` as a new intermediate collection in the runner's project."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate ``self._pathmap`` for ``referenced_files``.

        1. Visit every object (see ``visit``), collecting local files to upload.
        2. Upload those files and map them.
        3. For each top-level Directory not yet mapped, and each top-level
           File for which ``needs_new_collection`` is true, build and save
           a new intermediate collection and map the object, and every
           nested entry recorded by ``addentry``, into it.

        ``basedir`` is not used here; ``self.input_basedir`` is used instead.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            # st.fn is presumably filled in by uploadfiles() as
            # "keep:<pdh>/<name>" (fnPattern above) -- confirm.
            # NOTE(review): the target path is percent-quoted here, while
            # visit() maps targets of files already in Keep unquoted.
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_intermediate_collection(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_intermediate_collection(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the whole new collection under a unique
                    # throwaway key (presumably so the collection holding
                    # the secondaryFiles is made available too -- confirm).
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            # remap is only non-empty when one of the branches above ran,
            # so 'c' is the collection that was just saved.
            if remap:
                pdh = c.portable_data_hash()
                for loc, sub in remap:
                    # Entries added under "." start with "./"; strip it so
                    # the Keep reference and the container path agree.
                    if sub.startswith("./"):
                        sub = sub[2:]
                    ab = self.file_pattern % (pdh, sub)
                    # NOTE(review): every remapped entry, including Files
                    # (and the top-level File above), is typed "Directory".
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, sub),
                                                   ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a container path back to a source location.

        Returns whatever the base class returns if it finds a mapping;
        otherwise ``(target, target)`` for ``keep:`` strings,
        ``(kp, kp)`` with ``kp = "keep:<rest>"`` for paths under
        ``self.keepdir`` (when set), and None if nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
276
277
class StagingPathMapper(PathMapper):
    """Map File/Directory objects to target paths under a staging directory.

    Note that StagingPathMapper internally maps files from target to source.
    Specifically, the ``self._pathmap`` dict keys are the target location
    and the values are ``MapperEnt`` named tuples from which the
    ``resolved`` attribute is used as the file identifier.  This makes it
    possible to map an input file to multiple target directories.  The
    exception is file literals, which store the contents of the file in
    ``MapperEnt.resolved`` and are therefore still keyed by source location.
    """

    # When true, every Directory's listing is staged recursively; when
    # false, only directory literals ("_:" locations) are descended into.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Every target path claimed so far, used to detect name collisions.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Claim a target path for ``obj`` and record it in ``self._pathmap``.

        The target is ``<obj dirname or stagedir>/<basename>``.  If that
        path is already claimed by a different source (or ``obj`` is a
        literal and the path is claimed at all), "_2", "_3", ... is
        inserted before the extension until an unclaimed path is found.
        Directories are descended into when they are literals or when
        ``_follow_dirs`` is true; Files stage their secondaryFiles in the
        same directory.  ``copy`` (or ``obj["writable"]``) makes a File
        writable.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            # Claimed by a different source.  A target claimed by a file
            # literal is in self.targets but not keyed by target in
            # self._pathmap (literals are keyed by location), so a missing
            # entry also means "taken by something else".
            if tgt not in self.targets or "contents" in obj:
                return False
            ent = self._pathmap.get(tgt)
            return ent is None or ent.resolved != loc

        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                # Same source already staged at this target.
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt
        """Return the MapperEnt for source location ``src``, or None.

        Overridden to keep mapping by source (identifier) to target
        regardless of how the map is structured internally.  For a
        ``src`` containing "#", the part before "#" is looked up and the
        fragment is appended to the target.
        """
        def getMapperEnt(src):
            for k, v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v
            return None

        if u"#" in src:
            i = src.index(u"#")
            # Look up the file itself (before "#"), not the fragment.
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
342
343
class VwdPathMapper(StagingPathMapper):
    """Staging path mapper that rewrites Keep-backed sources.

    After staging, the source of each read-only File/Directory entry whose
    source is a "keep:..." reference is rewritten to "$(task.keep)/...".
    This is presumably expanded at run time to the task's Keep mount point
    (confirm).
    """

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Iterate over a snapshot since entries are reassigned in the loop;
        # 'kind' avoids shadowing the builtin 'type'.
        for path, (ab, tgt, kind, staged) in list(viewitems(self._pathmap)):
            if kind in ("File", "Directory") and ab.startswith("keep:"):
                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, kind, staged)
355
356
class NoFollowPathMapper(StagingPathMapper):
    """Staging path mapper that does not descend into Directory listings.

    Only directory literals ("_:" locations) have their listing staged;
    every other Directory is staged as a single entry.
    """

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage ``referenced_files`` directly under ``self.stagedir``."""
        root = self.stagedir
        self.visitlisting(referenced_files, root, basedir)