19699: Add option to defer downloads
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
10
11 import re
12 import logging
13 import uuid
14 import os
15 import urllib.request, urllib.parse, urllib.error
16
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
20
21 from schema_salad.sourceline import SourceLine
22
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt
25 from cwltool.utils import adjustFileObjs, adjustDirObjs
26 from cwltool.stdfsaccess import abspath
27 from cwltool.workflow import WorkflowException
28
29 from .http import http_to_keep
30
31 logger = logging.getLogger('arvados.cwl-runner')
32
def trim_listing(obj):
    """Drop the 'listing' field from a Directory object that is a Keep reference.

    When a Directory object refers to Keep, passing its fully enumerated
    listing between cwl-runner instances (e.g. when submitting a job, or
    when using the RunInSingleContainer feature) is redundant and can be
    very expensive, so the field is removed where that is safe.

    The object is modified in place. Objects whose "location" does not
    start with "keep:" (or that have no "location") are left untouched.
    """
    if not obj.get("location", "").startswith("keep:"):
        return
    # pop() with a default is a no-op when there is no listing to remove.
    obj.pop("listing", None)
46
# Matches a keep: reference to a non-empty path inside a collection that is
# identified by portable data hash ("keep:<32 hex digits>+<size>/<path>").
collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# Matches a keep: reference by portable data hash with an optional "/..." path.
# Group 1 is the portable data hash, group 2 the path (or None).
# NOTE(review): unlike the two neighbouring patterns this one has no trailing
# "$", so text after the hash that does not begin with "/" still matches —
# confirm whether that is intended before relying on it for validation.
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
# Matches a keep: reference by collection UUID ("xxxxx-4zz18-" followed by 15
# lowercase alphanumerics) with an optional "/..." path.
# Group 1 is the UUID, group 2 the path (or None).
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
50
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Every referenced File/Directory location is mapped to a keep: reference:
    local file: paths are uploaded (or resolved directly when they already
    live on a Keep mount), http(s) URLs are copied into Keep (or passed
    through unchanged when the runner defers downloads), and inputs that
    cannot be served from a single existing collection at the expected
    paths are staged into a new intermediate collection.
    """

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False,
                 optional_deps=None):
        """Build the mapping for *referenced_files*.

        :param arvrunner: runner object supplying the clients and settings this
            mapper reads (api, keep_client, num_retries, project_uuid,
            fs_access, intermediate_output_ttl, defer_downloads).
        :param referenced_files: CWL File/Directory objects to map.
        :param input_basedir: base directory used to resolve file: locations.
        :param collection_pattern: %-format string applied to a
            "<pdh>[/path]" string to produce the mapped target path.
        :param file_pattern: %-format string applied to (pdh, path) to produce
            the target path of an entry in a newly created collection.
        :param name: collection name passed through to the local file upload.
        :param single_collection: if true, local file uploads share one
            Collection object.
        :param optional_deps: File/Directory objects that addentry() may skip
            instead of raising when it cannot stage them.
        """
        # These attributes must exist before PathMapper.__init__ runs, because
        # the base constructor calls self.setup().
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        self.pdh_to_uuid = {}
        self.optional_deps = optional_deps or []
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Map *srcobj* (and, recursively, its secondaryFiles and listing).

        Local files that must be uploaded are not mapped here; instead a
        (src, abspath, UploadFile) tuple is added to *uploadfiles* for setup()
        to upload in bulk.

        :raises WorkflowException: for malformed keep: references, invalid
            local paths, or file/directory literals missing their content.
        """
        src = srcobj["location"]
        if "#" in src:
            src = src[:src.index("#")]

        # Include tracebacks in SourceLine errors only when debugging.
        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

                if arvados_cwl.util.collectionUUID in srcobj:
                    # Remember which collection UUID this portable data hash came from.
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)
            # keep: references by collection UUID fall through to the
            # pass-through mapping below.

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    # UploadFile is checked first; it is handled differently
                    # from a plain ArvFile (already in Keep).
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are staged later by setup()/addentry(); here we only
                # validate that they carry their content.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                if getattr(self.arvrunner, "defer_downloads", False):
                    # passthrough, we'll download it later.
                    self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
                else:
                    try:
                        keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                    except Exception as e:
                        # Best effort: leave the URL unmapped and only warn.
                        logger.warning(str(e))
                    else:
                        logger.info("%s is %s", src, keepref)
                        self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Place *obj* into collection *c* under the directory *path*.

        Already-mapped objects are copied from their source collection,
        Directory objects are recreated from their listing, and file literals
        are written from their "contents". Each (original location, path in
        *c*) pair is appended to *remap* so setup() can repoint it.

        :raises WorkflowException: if *obj* cannot be staged and is not one of
            self.optional_deps.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            # Secondary files are placed next to the primary file.
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            if any(obj["location"] == opt["location"] for opt in self.optional_deps):
                return
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        :param prefix: expected location prefix (up to and including the last
            "/") shared by *srcobj* and its secondaryFiles/listing; derived
            from *srcobj* itself on the top-level call.
        """

        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals never exist in a collection yet.
            return True

        i = loc.rfind("/")
        if i > -1:
            loc_prefix = loc[:i+1]
            if not prefix:
                prefix = loc_prefix
            # quote/unquote to ensure consistent quoting
            suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
        else:
            # no '/' found
            loc_prefix = loc+"/"
            prefix = loc+"/"
            suffix = ""

        if prefix != loc_prefix:
            return True

        # The file would be staged under its basename; a mismatch needs a copy.
        if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
            return True

        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return a new, empty Collection bound to the runner's API and Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save *c* as a new intermediate collection in the runner's project."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate self._pathmap for *referenced_files*.

        1. Visit every object, mapping what can be mapped directly and
           collecting local files that need uploading.
        2. Upload those local files and map them.
        3. Stage each top-level Directory that is still unmapped, and each
           top-level File for which needs_new_collection() is true, into a
           new intermediate collection, and repoint the staged entries at it.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)
                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                ab = self.collection_pattern % pdh
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+pdh, ab, "Directory", True)
            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)
                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                ab = self.file_pattern % (pdh, srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the whole collection, under a fresh anonymous
                    # key, so the secondary files travel with the primary.
                    ab = self.collection_pattern % pdh
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+pdh, ab, "Directory", True)

            # remap is only non-empty when one of the branches above created
            # and saved collection `c`.
            if remap:
                pdh = c.portable_data_hash()
                for loc, sub in remap:
                    # addentry() builds paths relative to "."; strip the
                    # leading "./" so the target and the keep: reference
                    # name the same path.
                    if sub.startswith("./"):
                        sub = sub[2:]
                    ab = self.file_pattern % (pdh, sub)
                    # NOTE(review): every remapped entry is typed "Directory",
                    # even when loc is a File (including the top-level File
                    # mapped just above); preserved from the original — confirm.
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, sub),
                                                   ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a (location, location) pair.

        Falls back to treating keep: targets as their own location, and paths
        under self.keepdir (when set) as keep: references; returns None if
        nothing applies.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
286
287
class StagingPathMapper(PathMapper):
    # Note that StagingPathMapper internally maps files from target to source.
    # Specifically, the 'self._pathmap' dict keys are the target location and the
    # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    # as the file identifier. This makes it possible to map an input file to multiple
    # target directories. The exception is for file literals, which store the contents of
    # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.

    # When true, Directory listings are staged recursively; listings of
    # directory literals ("_:" locations) are always staged.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Every target path handed out so far, including those of file
        # literals (which are not keyed by target in self._pathmap).
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign *obj* a unique target under *stagedir* and record it in self._pathmap.

        If the natural target (stagedir/basename, or dirname/basename when
        obj has a "dirname") is already taken by a different source, or *obj*
        is a file literal and the target is taken at all, a counter is
        inserted before the extension: name_2.ext, name_3.ext, ...
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            if tgt not in self.targets or "contents" in obj:
                return False
            # A target taken by a file literal is keyed by the literal's
            # source location, so it is absent from _pathmap; that is still
            # a collision.
            existing = self._pathmap.get(tgt)
            return existing is None or existing.resolved != loc

        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                # Same source already staged at this target.
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                # Secondary files are staged in the same directory as the primary.
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt
        """Return the MapperEnt for source location *src*, or None if unmapped.

        Overridden to maintain the use case of mapping by source (identifier)
        to target regardless of how the map is structured internally. A
        "#fragment" suffix on *src* is looked up without the fragment and then
        appended to the returned target.
        """
        def getMapperEnt(key):
            for k, v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == key) or (v.type == "CreateFile" and k == key):
                    return v
            return None

        if u"#" in src:
            i = src.index(u"#")
            # Look up the location part; the fragment is only carried over
            # to the target.
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
350             return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
351         return getMapperEnt(src)
352
353
class VwdPathMapper(StagingPathMapper):
    """Staging path mapper whose read-only keep: sources are rewritten to $(task.keep) paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        # Give every referenced file (together with its secondary files) its
        # own target under the staging directory.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Only plain File/Directory entries whose source is a keep: reference
        # are rewritten; writable entries and file literals keep their source.
        for target_key, ent in list(viewitems(self._pathmap)):
            if ent.type not in ("File", "Directory"):
                continue
            if not ent.resolved.startswith("keep:"):
                continue
            keep_relative = ent.resolved[5:]
            self._pathmap[target_key] = MapperEnt("$(task.keep)/%s" % keep_relative,
                                                  ent.target, ent.type, ent.staged)
363             if type in ("File", "Directory") and ab.startswith("keep:"):
364                 self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type, staged)
365
366
class NoFollowPathMapper(StagingPathMapper):
    """Staging path mapper that does not recurse into Directory listings.

    Listings of directory literals ("_:" locations) are still staged, because
    StagingPathMapper.visit checks the literal prefix before _follow_dirs.
    """

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        target_root = self.stagedir
        self.visitlisting(referenced_files, target_root, basedir)