19699: Add --varying-url-params
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
10
11 import re
12 import logging
13 import uuid
14 import os
15 import urllib.request, urllib.parse, urllib.error
16
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
20
21 from schema_salad.sourceline import SourceLine
22
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt
25 from cwltool.utils import adjustFileObjs, adjustDirObjs
26 from cwltool.stdfsaccess import abspath
27 from cwltool.workflow import WorkflowException
28
29 from .http import http_to_keep
30
# Module logger.  It is looked up by the fixed name 'arvados.cwl-runner'
# (not __name__), so output from this module goes to that named logger.
logger = logging.getLogger('arvados.cwl-runner')
32
def trim_listing(obj):
    """Strip the 'listing' field from a Directory object that refers to Keep.

    Fully enumerated Directory objects are redundant, and potentially very
    expensive, to pass between cwl-runner instances (e.g. when submitting a
    job, or when using the RunInSingleContainer feature) if the directory is
    a Keep reference.  So when the object's location starts with "keep:",
    its 'listing' entry is removed.  Any other object is left alone.

    :param obj: CWL File/Directory object (dict-like); modified in place.
    :returns: None.
    """
    location = obj.get("location", "")
    if not location.startswith("keep:"):
        # Not a Keep reference: the listing is still needed.
        return
    if "listing" in obj:
        del obj["listing"]
46
# "keep:<pdh>/<path>" with a non-empty path after the portable data hash
# (32 hex digits, "+", size digits); anchored at both ends.
collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# "keep:<pdh>" optionally followed by "/<path>".  Group 1 is the PDH, group 2
# the optional "/<path>" suffix.
# NOTE(review): unlike the other two patterns this one has no trailing "$", so
# .match() also accepts strings that merely start with a PDH (e.g.
# "keep:<pdh>junk").  Confirm whether that leniency is intended before adding
# an anchor, since other modules may rely on it.
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
# "keep:<collection uuid>" (5-char cluster id, "-4zz18-", 15 chars) optionally
# followed by "/<path>".  Group 1 is the UUID, group 2 the optional suffix.
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
50
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Each referenced File/Directory ``location`` is mapped to a ``keep:``
    reference (``MapperEnt.resolved``) and a container-side path
    (``MapperEnt.target``) built from ``collection_pattern`` /
    ``file_pattern``:

    * ``keep:`` references are mapped directly (PDH form) or passed through
      (collection UUID form);
    * local ``file:`` references are uploaded, unless they already live on a
      Keep mount;
    * ``http(s):`` references are copied into Keep with ``http_to_keep``, or
      passed through when ``arvrunner.defer_downloads`` is set;
    * files/directories that are not already laid out as CWL expects are
      copied into a new intermediate collection.
    """

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False,
                 optional_deps=None):
        """
        :param arvrunner: runner object.  This class reads ``api``,
            ``keep_client``, ``num_retries``, ``project_uuid``, ``fs_access``,
            ``defer_downloads``, ``varying_url_params`` and
            ``intermediate_output_ttl`` from it.
        :param referenced_files: CWL File/Directory objects to map.
        :param input_basedir: base directory for resolving ``file:`` paths.
        :param collection_pattern: %-format string applied to a single
            "<pdh>[/path]" string to produce a target path.
        :param file_pattern: %-format string applied to ``(pdh, path)``.
        :param name: collection name passed to the local-file upload.
        :param single_collection: if true, upload all local files into one
            collection.
        :param optional_deps: objects whose ``location`` ``addentry`` may
            silently skip instead of raising.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        self.pdh_to_uuid = {}
        self.optional_deps = optional_deps or []
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Map *srcobj*, then recurse into its secondaryFiles and listing.

        Local files that must be uploaded are not mapped here.  Instead they
        are added to *uploadfiles* as ``(src, abspath, statfile_result)``
        tuples, and ``setup`` maps them after the upload.

        :raises WorkflowException: for malformed ``keep:`` references, invalid
            local paths, or file/directory literals missing their content.
        """
        src = srcobj["location"]
        if "#" in src:
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

                # Remember which collection UUID this PDH came from, if known.
                if arvados_cwl.util.collectionUUID in srcobj:
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                try:
                    if self.arvrunner.defer_downloads:
                        # passthrough, we'll download it later.
                        self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
                    else:
                        keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src, varying_url_params=self.arvrunner.varying_url_params)
                        logger.info("%s is %s", src, keepref)
                        self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                except Exception as e:
                    # Best effort: a failed download is only logged here, and
                    # src stays unmapped.
                    logger.warning(str(e))
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Copy or create *obj* inside collection *c* under directory *path*.

        Each placed object's ``(location, path/basename)`` is appended to
        *remap*.

        :raises WorkflowException: if *obj* is neither mapped, a Directory,
            nor a file literal with contents, and is not in
            ``self.optional_deps``.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            for opt in self.optional_deps:
                if obj["location"] == opt["location"]:
                    return
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.
        """

        loc = srcobj["location"]
        if loc.startswith("_:"):
            return True

        if self.arvrunner.defer_downloads and (loc.startswith("http:") or loc.startswith("https:")):
            return False

        i = loc.rfind("/")
        if i > -1:
            loc_prefix = loc[:i+1]
            if not prefix:
                prefix = loc_prefix
            # quote/unquote to ensure consistent quoting
            suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
        else:
            # no '/' found
            loc_prefix = loc+"/"
            prefix = loc+"/"
            suffix = ""

        if prefix != loc_prefix:
            return True

        if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
            return True

        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return a new, empty Collection bound to the runner's API/Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save *c* as a new intermediate collection and return its PDH.

        The name, trash_at and properties come from
        ``arvados_cwl.util.get_intermediate_collection_info`` for the current
        container.  The collection is owned by ``arvrunner.project_uuid``.
        """
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])
        return c.portable_data_hash()

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Build ``self._pathmap`` for *referenced_files*.

        Steps:
        1. Visit every reference.
        2. Upload local files and map them.
        3. Stage Directories without a mapping, and Files that
           ``needs_new_collection`` rejects, into new intermediate
           collections.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            pdh = None
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                pdh = self._save_intermediate_collection(c)
                self._pathmap[srcobj["location"]] = MapperEnt("keep:" + pdh, self.collection_pattern % pdh, "Directory", True)
            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                pdh = self._save_intermediate_collection(c)
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              self.file_pattern % (pdh, srcobj["basename"]),
                                                              "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the whole new collection, under a fresh
                    # anonymous key.
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:" + pdh, self.collection_pattern % pdh, "Directory", True)

            # remap is only non-empty when one of the branches above created
            # and saved a collection, so pdh is set whenever this loop runs.
            # NOTE(review): every remapped entry is typed "Directory", even
            # files (this can include srcobj itself in the File branch).
            # Preserved as-is; confirm intent.
            for loc, sub in remap:
                # subdirs start with "./", strip it off
                relpath = sub[2:] if sub.startswith("./") else sub
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, relpath),
                                               self.file_pattern % (pdh, relpath),
                                               "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map *target* back to a ``(location, location)`` pair, or None.

        The base-class lookup is tried first.  Otherwise ``keep:`` targets
        map to themselves, and paths under ``self.keepdir`` (when set) are
        turned back into ``keep:`` references.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
289
290
class StagingPathMapper(PathMapper):
    # Note that StagingPathMapper internally maps files from target to source.
    # Specifically, the 'self._pathmap' dict keys are the target location and the
    # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    # as the file identifier. This makes it possible to map an input file to multiple
    # target directories. The exception is for file literals, which store the contents of
    # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.

    # When true, visit() descends into every Directory's listing; when false,
    # only into literal ("_:") directories.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Every target path handed out so far, including those of file
        # literals (which are keyed by source, not target, in _pathmap).
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
        """Assign *obj* a unique target under *stagedir* and record it.

        If the target name is already taken by a different source (or *obj*
        is a file literal whose target is taken), a numeric suffix is added
        before the extension ("name_2.ext", "name_3.ext", ...).
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            if tgt not in self.targets or "contents" in obj:
                return False
            # A target claimed by a file literal is in self.targets but not a
            # key of self._pathmap (literals are keyed by source location), so
            # a missing entry still means "occupied by something else".
            existing = self._pathmap.get(tgt)
            return existing is None or existing.resolved != loc
        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt.
        # Overridden to maintain the use case of mapping by source (identifier) to
        # target regardless of how the map is structured interally.
        """Return the MapperEnt for source *src*, or None if it is not mapped.

        A "#fragment" suffix on *src* is stripped for the lookup and
        re-appended to the returned target.
        """
        def getMapperEnt(src):
            for k, v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v
            return None

        if u"#" in src:
            i = src.index(u"#")
            # Look up the location without its fragment (the original code
            # searched for the fragment itself, which never matched).
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
354         return getMapperEnt(src)
355
356
357 class VwdPathMapper(StagingPathMapper):
358     def setup(self, referenced_files, basedir):
359         # type: (List[Any], unicode) -> None
360
361         # Go through each file and set the target to its own directory along
362         # with any secondary files.
363         self.visitlisting(referenced_files, self.stagedir, basedir)
364
365         for path, (ab, tgt, type, staged) in viewitems(self._pathmap):
366             if type in ("File", "Directory") and ab.startswith("keep:"):
367                 self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type, staged)
368
369
370 class NoFollowPathMapper(StagingPathMapper):
371     _follow_dirs = False
372     def setup(self, referenced_files, basedir):
373         # type: (List[Any], unicode) -> None
374         self.visitlisting(referenced_files, self.stagedir, basedir)