18994: Fixing quoting WIP
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
10
11 import re
12 import logging
13 import uuid
14 import os
15 import urllib.request, urllib.parse, urllib.error
16
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
20
21 from schema_salad.sourceline import SourceLine
22
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt
25 from cwltool.utils import adjustFileObjs, adjustDirObjs
26 from cwltool.stdfsaccess import abspath
27 from cwltool.workflow import WorkflowException
28
29 from .http import http_to_keep
30
# Module-level logger, obtained under the name 'arvados.cwl-runner'.
logger = logging.getLogger('arvados.cwl-runner')
32
def trim_listing(obj):
    """Drop the 'listing' field of a Directory object that refers to Keep.

    A Directory whose location is a Keep reference can be re-enumerated
    from Keep, so carrying its fully expanded 'listing' between cwl-runner
    instances (e.g. when submitting a job, or when using the
    RunInSingleContainer feature) is redundant and potentially very
    expensive.  The field is removed in place when it is safe to do so.

    Objects without a "location", or whose location does not start with
    "keep:", are left untouched.
    """

    location = obj.get("location", "")
    if not location.startswith("keep:"):
        return
    if "listing" in obj:
        del obj["listing"]
46
# Matches a "keep:" reference made of 32 hex digits, "+", decimal digits,
# followed by "/" and a non-empty path.
collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# Matches a "keep:" reference made of 32 hex digits, "+", decimal digits
# (group 1), optionally followed by a "/..." path (group 2).
# NOTE(review): unlike the two sibling patterns this one has no trailing "$",
# so text after the digits that does not start with "/" is still accepted by
# match() -- confirm that this is intended.
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
# Matches a "keep:" reference of the form xxxxx-4zz18-xxxxxxxxxxxxxxx
# (group 1), optionally followed by a "/..." path (group 2).
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
50
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    The mapper walks the referenced File/Directory objects, uploads local
    files that are not yet in Keep, stages literals and scattered files into
    new collections where necessary, and records for every source location a
    MapperEnt whose 'resolved' field is a "keep:" reference and whose
    'target' field is built from collection_pattern / file_pattern.
    """

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False,
                 optional_deps=None):
        """Set up mapper state and hand the file list to the base class.

        arvrunner -- runner object; its api, keep_client, num_retries,
            project_uuid, fs_access and intermediate_output_ttl attributes
            are used by this class.
        referenced_files -- list of File/Directory objects, each with a
            "location" key.
        input_basedir -- base directory used to resolve "file:" locations.
        collection_pattern -- "%"-format string taking one argument (the
            part of a keep reference after "keep:").
        file_pattern -- "%"-format string taking (portable data hash, path).
        name -- name passed to the upload step.
        single_collection -- when true, all uploads go into one collection.
        optional_deps -- objects whose "location" may be silently skipped by
            addentry() when nothing is known about them.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        self.pdh_to_uuid = {}
        self.optional_deps = optional_deps or []
        # All attributes are assigned before delegating to the base class,
        # presumably because its __init__ invokes setup() -- TODO confirm.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record a mapping for srcobj and, recursively, for its children.

        srcobj -- File/Directory object with "location" and "class" keys.
        uploadfiles -- set that collects (location, absolute path, stat
            result) tuples for local files that still have to be uploaded.

        Raises WorkflowException for malformed keep references, invalid
        local paths and literals that lack their content.
        """
        src = srcobj["location"]
        if "#" in src:
            # Only the part before the fragment identifies the file.
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                # The target is a filesystem path, so percent-escapes in the
                # reference are decoded.
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

                if arvados_cwl.util.collectionUUID in srcobj:
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        logger.debug("Scheduling upload of %s (%s)", src, ab)
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are not mapped here; only check they are complete.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                # Best effort: a failed download is logged and the location
                # is simply left unmapped.
                try:
                    keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                    logger.info("%s is %s", src, keepref)
                    self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                except Exception as e:
                    logger.warning(str(e))
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Stage obj (and its children) into collection c underneath path.

        obj -- File/Directory object to stage.
        c -- destination collection.
        path -- directory inside c that receives the entry.
        remap -- list that is extended with (location, path in c) pairs for
            every entry that was staged.

        Raises WorkflowException when obj is neither mapped, a Directory, a
        file literal, nor listed in optional_deps.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            # Unknown entries are tolerated only when declared optional.
            for opt in self.optional_deps:
                if obj["location"] == opt["location"]:
                    return
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        srcobj -- File/Directory object to examine.
        prefix -- location prefix (ending in "/") that srcobj is expected
            to live under; empty on the outermost call, in which case it is
            derived from srcobj's own location.
        """

        loc = srcobj["location"]
        if loc.startswith("_:"):
            return True

        # Split the location on every call (not just the outermost one) so
        # that nested secondaryFiles/listing entries can be compared too.
        i = loc.rfind("/")
        if i > -1:
            loc_prefix = loc[:i+1]
            # unquote then quote so differently-escaped spellings of the
            # same name compare equal
            suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
        else:
            loc_prefix = loc+"/"
            suffix = ""

        if not prefix:
            prefix = loc_prefix

        if prefix != loc_prefix:
            logger.debug("%s is not located under %s, needs new collection", loc, prefix)
            return True

        if suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
            logger.debug("%s does not match basename %s, needs new collection", loc, srcobj["basename"])
            return True

        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return an empty Collection bound to the runner's API and Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save c as a new collection using the intermediate-collection name, trash time and properties."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Build the path map for referenced_files.

        Visits every object, uploads the local files that were collected,
        then stages unmapped Directories and scattered Files into newly
        saved collections.  basedir is accepted for interface compatibility
        and is not used here.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+pdh, self.collection_pattern % pdh, "Directory", True)
            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              self.file_pattern % (pdh, srcobj["basename"]),
                                                              "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also expose the whole collection under a fresh
                    # literal id.
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+pdh, self.collection_pattern % pdh, "Directory", True)

            # remap is only non-empty when one of the branches above created
            # and saved collection c.
            for loc, sub in remap:
                # subdirs start with "./", strip it off
                if sub.startswith("./"):
                    sub = sub[2:]
                pdh = c.portable_data_hash()
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, sub),
                                               self.file_pattern % (pdh, sub),
                                               "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a (resolved, target) pair.

        Falls back to treating "keep:" targets as mapping to themselves and,
        when self.keepdir is set, to translating paths under keepdir into
        "keep:" references.  Returns None when nothing applies.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            # +1 skips the separator that follows keepdir
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
285
286
class StagingPathMapper(PathMapper):
    """Path mapper that stages files under a staging directory.

    Note that StagingPathMapper internally maps files from target to source.
    Specifically, the 'self._pathmap' dict keys are the target location and the
    values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    as the file identifier. This makes it possible to map an input file to multiple
    target directories. The exception is for file literals, which store the contents of
    the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.
    """

    # When true, visit() descends into the listing of every Directory;
    # otherwise only into directory literals ("_:" locations).
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        """Initialise the set of claimed targets, then delegate to the base class."""
        # Must exist before the base class runs, presumably because its
        # __init__ ends up calling visit() -- TODO confirm.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign a unique target path to obj and record it in the path map.

        obj -- File/Directory object with "location", "basename" and "class".
        stagedir -- directory the target is placed in, unless obj carries
            its own "dirname".
        basedir -- passed through to visitlisting() for children.
        copy -- when true, Files are recorded as "WritableFile".
        staged -- value stored in the 'staged' field of each MapperEnt.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            # A non-literal collides when the target is already claimed by a
            # different source.
            if tgt not in self.targets or "contents" in obj:
                return False
            # Targets claimed by file literals are keyed by location, so
            # there may be no entry under tgt; that is a collision too.
            return tgt not in self._pathmap or self._pathmap[tgt].resolved != loc
        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            # Append _2, _3, ... before the extension until the name is free.
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                # Same source already staged at this target.
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literal: keyed by location, contents kept in 'resolved'.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> Optional[MapperEnt]
        """Return the MapperEnt for source identifier src, or None if unknown.

        Overridden to maintain the use case of mapping by source
        (identifier) to target regardless of how the map is structured
        internally.  When src carries a "#fragment", the entry for the part
        before the "#" is looked up and the fragment is appended to its
        target.
        """
        def getMapperEnt(src):
            for k,v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v

        if u"#" in src:
            i = src.index(u"#")
            # Look up the base identifier; the fragment itself is never a key.
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
351
352
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites Keep sources to "$(task.keep)/" paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Give every file a target in the staging directory, together with
        # any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Only values of existing keys are replaced below, so the map does
        # not change size while it is being iterated.
        for key, entry in viewitems(self._pathmap):
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            keep_path = "$(task.keep)/%s" % resolved[5:]
            self._pathmap[key] = MapperEnt(keep_path, target, kind, staged)
364
365
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper with directory following switched off."""

    # Overrides the StagingPathMapper default of True.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        staging_root = self.stagedir
        self.visitlisting(referenced_files, staging_root, basedir)