Merge branch '18947-githttpd'
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
10
11 import re
12 import logging
13 import uuid
14 import os
15 import urllib.request, urllib.parse, urllib.error
16
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
20
21 from schema_salad.sourceline import SourceLine
22
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt
25 from cwltool.utils import adjustFileObjs, adjustDirObjs
26 from cwltool.stdfsaccess import abspath
27 from cwltool.workflow import WorkflowException
28
29 from .http import http_to_keep
30
31 logger = logging.getLogger('arvados.cwl-runner')
32
33 def trim_listing(obj):
34     """Remove 'listing' field from Directory objects that are keep references.
35
36     When Directory objects represent Keep references, it is redundant and
37     potentially very expensive to pass fully enumerated Directory objects
38     between instances of cwl-runner (e.g. a submitting a job, or using the
39     RunInSingleContainer feature), so delete the 'listing' field when it is
40     safe to do so.
41
42     """
43
44     if obj.get("location", "").startswith("keep:") and "listing" in obj:
45         del obj["listing"]
46
47 collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
48 collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
49 collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
50
51 class ArvPathMapper(PathMapper):
52     """Convert container-local paths to and from Keep collection ids."""
53
54     def __init__(self, arvrunner, referenced_files, input_basedir,
55                  collection_pattern, file_pattern, name=None, single_collection=False,
56                  optional_deps=None):
57         self.arvrunner = arvrunner
58         self.input_basedir = input_basedir
59         self.collection_pattern = collection_pattern
60         self.file_pattern = file_pattern
61         self.name = name
62         self.referenced_files = [r["location"] for r in referenced_files]
63         self.single_collection = single_collection
64         self.pdh_to_uuid = {}
65         self.optional_deps = optional_deps or []
66         super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
67
68     def visit(self, srcobj, uploadfiles):
69         src = srcobj["location"]
70         if "#" in src:
71             src = src[:src.index("#")]
72
73         debug = logger.isEnabledFor(logging.DEBUG)
74
75         if isinstance(src, basestring) and src.startswith("keep:"):
76             if collection_pdh_pattern.match(src):
77                 self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
78
79                 if arvados_cwl.util.collectionUUID in srcobj:
80                     self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
81             elif not collection_uuid_pattern.match(src):
82                 with SourceLine(srcobj, "location", WorkflowException, debug):
83                     raise WorkflowException("Invalid keep reference '%s'" % src)
84
85         if src not in self._pathmap:
86             if src.startswith("file:"):
87                 # Local FS ref, may need to be uploaded or may be on keep
88                 # mount.
89                 ab = abspath(src, self.input_basedir)
90                 st = arvados.commands.run.statfile("", ab,
91                                                    fnPattern="keep:%s/%s",
92                                                    dirPattern="keep:%s/%s",
93                                                    raiseOSError=True)
94                 with SourceLine(srcobj, "location", WorkflowException, debug):
95                     if isinstance(st, arvados.commands.run.UploadFile):
96                         uploadfiles.add((src, ab, st))
97                     elif isinstance(st, arvados.commands.run.ArvFile):
98                         self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
99                     else:
100                         raise WorkflowException("Input file path '%s' is invalid" % st)
101             elif src.startswith("_:"):
102                 if srcobj["class"] == "File" and "contents" not in srcobj:
103                     raise WorkflowException("File literal '%s' is missing `contents`" % src)
104                 if srcobj["class"] == "Directory" and "listing" not in srcobj:
105                     raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
106             elif src.startswith("http:") or src.startswith("https:"):
107                 try:
108                     keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
109                     logger.info("%s is %s", src, keepref)
110                     self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
111                 except Exception as e:
112                     logger.warning(str(e))
113             else:
114                 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
115
116         with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
117             for l in srcobj.get("secondaryFiles", []):
118                 self.visit(l, uploadfiles)
119         with SourceLine(srcobj, "listing", WorkflowException, debug):
120             for l in srcobj.get("listing", []):
121                 self.visit(l, uploadfiles)
122
123     def addentry(self, obj, c, path, remap):
124         if obj["location"] in self._pathmap:
125             src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
126             if srcpath == "":
127                 srcpath = "."
128             c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
129             remap.append((obj["location"], path + "/" + obj["basename"]))
130             for l in obj.get("secondaryFiles", []):
131                 self.addentry(l, c, path, remap)
132         elif obj["class"] == "Directory":
133             for l in obj.get("listing", []):
134                 self.addentry(l, c, path + "/" + obj["basename"], remap)
135             remap.append((obj["location"], path + "/" + obj["basename"]))
136         elif obj["location"].startswith("_:") and "contents" in obj:
137             with c.open(path + "/" + obj["basename"], "w") as f:
138                 f.write(obj["contents"])
139             remap.append((obj["location"], path + "/" + obj["basename"]))
140         else:
141             for opt in self.optional_deps:
142                 if obj["location"] == opt["location"]:
143                     return
144             raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
145
146     def needs_new_collection(self, srcobj, prefix=""):
147         """Check if files need to be staged into a new collection.
148
149         If all the files are in the same collection and in the same
150         paths they would be staged to, return False.  Otherwise, a new
151         collection is needed with files copied/created in the
152         appropriate places.
153         """
154
155         loc = srcobj["location"]
156         if loc.startswith("_:"):
157             return True
158
159         i = loc.rfind("/")
160         if i > -1:
161             loc_prefix = loc[:i+1]
162             if not prefix:
163                 prefix = loc_prefix
164             # quote/unquote to ensure consistent quoting
165             suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
166         else:
167             # no '/' found
168             loc_prefix = loc+"/"
169             prefix = loc+"/"
170             suffix = ""
171
172         if prefix != loc_prefix:
173             return True
174
175         if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
176             return True
177
178         if srcobj["class"] == "File" and loc not in self._pathmap:
179             return True
180         for s in srcobj.get("secondaryFiles", []):
181             if self.needs_new_collection(s, prefix):
182                 return True
183         if srcobj.get("listing"):
184             prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
185             for l in srcobj["listing"]:
186                 if self.needs_new_collection(l, prefix):
187                     return True
188         return False
189
190     def setup(self, referenced_files, basedir):
191         # type: (List[Any], unicode) -> None
192         uploadfiles = set()
193
194         collection = None
195         if self.single_collection:
196             collection = arvados.collection.Collection(api_client=self.arvrunner.api,
197                                                        keep_client=self.arvrunner.keep_client,
198                                                        num_retries=self.arvrunner.num_retries)
199
200         for srcobj in referenced_files:
201             self.visit(srcobj, uploadfiles)
202
203         arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
204                                          self.arvrunner.api,
205                                          dry_run=False,
206                                          num_retries=self.arvrunner.num_retries,
207                                          fnPattern="keep:%s/%s",
208                                          name=self.name,
209                                          project=self.arvrunner.project_uuid,
210                                          collection=collection,
211                                          packed=False)
212
213         for src, ab, st in uploadfiles:
214             self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
215                                            "Directory" if os.path.isdir(ab) else "File", True)
216
217         for srcobj in referenced_files:
218             remap = []
219             if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
220                 c = arvados.collection.Collection(api_client=self.arvrunner.api,
221                                                   keep_client=self.arvrunner.keep_client,
222                                                   num_retries=self.arvrunner.num_retries)
223                 for l in srcobj.get("listing", []):
224                     self.addentry(l, c, ".", remap)
225
226                 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
227                 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
228
229                 c.save_new(name=info["name"],
230                            owner_uuid=self.arvrunner.project_uuid,
231                            ensure_unique_name=True,
232                            trash_at=info["trash_at"],
233                            properties=info["properties"])
234
235                 ab = self.collection_pattern % c.portable_data_hash()
236                 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
237             elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
238                 c = arvados.collection.Collection(api_client=self.arvrunner.api,
239                                                   keep_client=self.arvrunner.keep_client,
240                                                   num_retries=self.arvrunner.num_retries)
241                 self.addentry(srcobj, c, ".", remap)
242
243                 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
244                 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
245
246                 c.save_new(name=info["name"],
247                            owner_uuid=self.arvrunner.project_uuid,
248                            ensure_unique_name=True,
249                            trash_at=info["trash_at"],
250                            properties=info["properties"])
251
252                 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
253                 self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
254                                                               ab, "File", True)
255                 if srcobj.get("secondaryFiles"):
256                     ab = self.collection_pattern % c.portable_data_hash()
257                     self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
258
259             if remap:
260                 for loc, sub in remap:
261                     # subdirs start with "./", strip it off
262                     if sub.startswith("./"):
263                         ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
264                     else:
265                         ab = self.file_pattern % (c.portable_data_hash(), sub)
266                     self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
267                                                    ab, "Directory", True)
268
269         self.keepdir = None
270
271     def reversemap(self, target):
272         p = super(ArvPathMapper, self).reversemap(target)
273         if p:
274             return p
275         elif target.startswith("keep:"):
276             return (target, target)
277         elif self.keepdir and target.startswith(self.keepdir):
278             kp = "keep:" + target[len(self.keepdir)+1:]
279             return (kp, kp)
280         else:
281             return None
282
283
284 class StagingPathMapper(PathMapper):
285     # Note that StagingPathMapper internally maps files from target to source.
286     # Specifically, the 'self._pathmap' dict keys are the target location and the
287     # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
288     # as the file identifier. This makes it possible to map an input file to multiple
289     # target directories. The exception is for file literals, which store the contents of
290     # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.
291
292     _follow_dirs = True
293
294     def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
295         self.targets = set()
296         super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)
297
298     def visit(self, obj, stagedir, basedir, copy=False, staged=False):
299         # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
300         loc = obj["location"]
301         stagedir = obj.get("dirname") or stagedir
302         tgt = os.path.join(stagedir, obj["basename"])
303         basetgt, baseext = os.path.splitext(tgt)
304
305         def targetExists():
306             return tgt in self.targets and ("contents" not in obj) and (self._pathmap[tgt].resolved != loc)
307         def literalTargetExists():
308             return tgt in self.targets and "contents" in obj
309
310         n = 1
311         if targetExists() or literalTargetExists():
312             while tgt in self.targets:
313                 n += 1
314                 tgt = "%s_%i%s" % (basetgt, n, baseext)
315         self.targets.add(tgt)
316         if obj["class"] == "Directory":
317             if obj.get("writable"):
318                 self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
319             else:
320                 self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
321             if loc.startswith("_:") or self._follow_dirs:
322                 self.visitlisting(obj.get("listing", []), tgt, basedir)
323         elif obj["class"] == "File":
324             if tgt in self._pathmap:
325                 return
326             if "contents" in obj and loc.startswith("_:"):
327                 self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
328             else:
329                 if copy or obj.get("writable"):
330                     self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
331                 else:
332                     self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
333                 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
334
335     def mapper(self, src):  # type: (Text) -> MapperEnt.
336         # Overridden to maintain the use case of mapping by source (identifier) to
337         # target regardless of how the map is structured interally.
338         def getMapperEnt(src):
339             for k,v in viewitems(self._pathmap):
340                 if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
341                     return v
342
343         if u"#" in src:
344             i = src.index(u"#")
345             v = getMapperEnt(src[i:])
346             return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
347         return getMapperEnt(src)
348
349
350 class VwdPathMapper(StagingPathMapper):
351     def setup(self, referenced_files, basedir):
352         # type: (List[Any], unicode) -> None
353
354         # Go through each file and set the target to its own directory along
355         # with any secondary files.
356         self.visitlisting(referenced_files, self.stagedir, basedir)
357
358         for path, (ab, tgt, type, staged) in viewitems(self._pathmap):
359             if type in ("File", "Directory") and ab.startswith("keep:"):
360                 self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type, staged)
361
362
363 class NoFollowPathMapper(StagingPathMapper):
364     _follow_dirs = False
365     def setup(self, referenced_files, basedir):
366         # type: (List[Any], unicode) -> None
367         self.visitlisting(referenced_files, self.stagedir, basedir)