Merge branch '14723-cwl-multiple-file-targets'
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
10
11 import re
12 import logging
13 import uuid
14 import os
15 import urllib.request, urllib.parse, urllib.error
16
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
20
21 from schema_salad.sourceline import SourceLine
22
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
25 from cwltool.workflow import WorkflowException
26
27 from .http import http_to_keep
28
29 logger = logging.getLogger('arvados.cwl-runner')
30
def trim_listing(obj):
    """Drop the 'listing' field of a Directory object that refers to Keep.

    A Directory whose location is a Keep reference can be enumerated
    again from the reference itself, so carrying a fully expanded
    'listing' between cwl-runner instances (for example when submitting
    a job, or with the RunInSingleContainer feature) is redundant and
    may be very expensive.  The field is removed in place when the
    location starts with "keep:"; any other object is left untouched.

    """

    location = obj.get("location", "")
    if not location.startswith("keep:"):
        return
    if "listing" in obj:
        del obj["listing"]
44
45
46 class ArvPathMapper(PathMapper):
47     """Convert container-local paths to and from Keep collection ids."""
48
49     pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
50     pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')
51
52     def __init__(self, arvrunner, referenced_files, input_basedir,
53                  collection_pattern, file_pattern, name=None, single_collection=False):
54         self.arvrunner = arvrunner
55         self.input_basedir = input_basedir
56         self.collection_pattern = collection_pattern
57         self.file_pattern = file_pattern
58         self.name = name
59         self.referenced_files = [r["location"] for r in referenced_files]
60         self.single_collection = single_collection
61         self.pdh_to_uuid = {}
62         super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
63
64     def visit(self, srcobj, uploadfiles):
65         src = srcobj["location"]
66         if "#" in src:
67             src = src[:src.index("#")]
68
69         if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
70             self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
71             if arvados_cwl.util.collectionUUID in srcobj:
72                 self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
73
74         debug = logger.isEnabledFor(logging.DEBUG)
75
76         if src not in self._pathmap:
77             if src.startswith("file:"):
78                 # Local FS ref, may need to be uploaded or may be on keep
79                 # mount.
80                 ab = abspath(src, self.input_basedir)
81                 st = arvados.commands.run.statfile("", ab,
82                                                    fnPattern="keep:%s/%s",
83                                                    dirPattern="keep:%s/%s",
84                                                    raiseOSError=True)
85                 with SourceLine(srcobj, "location", WorkflowException, debug):
86                     if isinstance(st, arvados.commands.run.UploadFile):
87                         uploadfiles.add((src, ab, st))
88                     elif isinstance(st, arvados.commands.run.ArvFile):
89                         self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
90                     else:
91                         raise WorkflowException("Input file path '%s' is invalid" % st)
92             elif src.startswith("_:"):
93                 if srcobj["class"] == "File" and "contents" not in srcobj:
94                     raise WorkflowException("File literal '%s' is missing `contents`" % src)
95                 if srcobj["class"] == "Directory" and "listing" not in srcobj:
96                     raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
97             elif src.startswith("http:") or src.startswith("https:"):
98                 keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
99                 logger.info("%s is %s", src, keepref)
100                 self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
101             else:
102                 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
103
104         with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
105             for l in srcobj.get("secondaryFiles", []):
106                 self.visit(l, uploadfiles)
107         with SourceLine(srcobj, "listing", WorkflowException, debug):
108             for l in srcobj.get("listing", []):
109                 self.visit(l, uploadfiles)
110
111     def addentry(self, obj, c, path, remap):
112         if obj["location"] in self._pathmap:
113             src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
114             if srcpath == "":
115                 srcpath = "."
116             c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
117             remap.append((obj["location"], path + "/" + obj["basename"]))
118             for l in obj.get("secondaryFiles", []):
119                 self.addentry(l, c, path, remap)
120         elif obj["class"] == "Directory":
121             for l in obj.get("listing", []):
122                 self.addentry(l, c, path + "/" + obj["basename"], remap)
123             remap.append((obj["location"], path + "/" + obj["basename"]))
124         elif obj["location"].startswith("_:") and "contents" in obj:
125             with c.open(path + "/" + obj["basename"], "w") as f:
126                 f.write(obj["contents"])
127             remap.append((obj["location"], path + "/" + obj["basename"]))
128         else:
129             raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
130
131     def needs_new_collection(self, srcobj, prefix=""):
132         """Check if files need to be staged into a new collection.
133
134         If all the files are in the same collection and in the same
135         paths they would be staged to, return False.  Otherwise, a new
136         collection is needed with files copied/created in the
137         appropriate places.
138         """
139
140         loc = srcobj["location"]
141         if loc.startswith("_:"):
142             return True
143         if prefix:
144             if loc != prefix+srcobj["basename"]:
145                 return True
146         else:
147             i = loc.rfind("/")
148             if i > -1:
149                 prefix = loc[:i+1]
150             else:
151                 prefix = loc+"/"
152         if srcobj["class"] == "File" and loc not in self._pathmap:
153             return True
154         for s in srcobj.get("secondaryFiles", []):
155             if self.needs_new_collection(s, prefix):
156                 return True
157         if srcobj.get("listing"):
158             prefix = "%s%s/" % (prefix, srcobj["basename"])
159             for l in srcobj["listing"]:
160                 if self.needs_new_collection(l, prefix):
161                     return True
162         return False
163
164     def setup(self, referenced_files, basedir):
165         # type: (List[Any], unicode) -> None
166         uploadfiles = set()
167
168         collection = None
169         if self.single_collection:
170             collection = arvados.collection.Collection(api_client=self.arvrunner.api,
171                                                        keep_client=self.arvrunner.keep_client,
172                                                        num_retries=self.arvrunner.num_retries)
173
174         for srcobj in referenced_files:
175             self.visit(srcobj, uploadfiles)
176
177         arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
178                                          self.arvrunner.api,
179                                          dry_run=False,
180                                          num_retries=self.arvrunner.num_retries,
181                                          fnPattern="keep:%s/%s",
182                                          name=self.name,
183                                          project=self.arvrunner.project_uuid,
184                                          collection=collection,
185                                          packed=False)
186
187         for src, ab, st in uploadfiles:
188             self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
189                                            "Directory" if os.path.isdir(ab) else "File", True)
190
191         for srcobj in referenced_files:
192             remap = []
193             if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
194                 c = arvados.collection.Collection(api_client=self.arvrunner.api,
195                                                   keep_client=self.arvrunner.keep_client,
196                                                   num_retries=self.arvrunner.num_retries)
197                 for l in srcobj.get("listing", []):
198                     self.addentry(l, c, ".", remap)
199
200                 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
201                 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
202
203                 c.save_new(name=info["name"],
204                            owner_uuid=self.arvrunner.project_uuid,
205                            ensure_unique_name=True,
206                            trash_at=info["trash_at"],
207                            properties=info["properties"])
208
209                 ab = self.collection_pattern % c.portable_data_hash()
210                 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
211             elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
212                 (srcobj["location"].startswith("_:") and "contents" in srcobj)):
213
214                 # If all secondary files/directories are located in
215                 # the same collection as the primary file and the
216                 # paths and names that are consistent with staging,
217                 # don't create a new collection.
218                 if not self.needs_new_collection(srcobj):
219                     continue
220
221                 c = arvados.collection.Collection(api_client=self.arvrunner.api,
222                                                   keep_client=self.arvrunner.keep_client,
223                                                   num_retries=self.arvrunner.num_retries                                                  )
224                 self.addentry(srcobj, c, ".", remap)
225
226                 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
227                 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
228
229                 c.save_new(name=info["name"],
230                            owner_uuid=self.arvrunner.project_uuid,
231                            ensure_unique_name=True,
232                            trash_at=info["trash_at"],
233                            properties=info["properties"])
234
235                 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
236                 self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
237                                                               ab, "File", True)
238                 if srcobj.get("secondaryFiles"):
239                     ab = self.collection_pattern % c.portable_data_hash()
240                     self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
241
242             if remap:
243                 for loc, sub in remap:
244                     # subdirs start with "./", strip it off
245                     if sub.startswith("./"):
246                         ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
247                     else:
248                         ab = self.file_pattern % (c.portable_data_hash(), sub)
249                     self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
250                                                    ab, "Directory", True)
251
252         self.keepdir = None
253
254     def reversemap(self, target):
255         p = super(ArvPathMapper, self).reversemap(target)
256         if p:
257             return p
258         elif target.startswith("keep:"):
259             return (target, target)
260         elif self.keepdir and target.startswith(self.keepdir):
261             kp = "keep:" + target[len(self.keepdir)+1:]
262             return (kp, kp)
263         else:
264             return None
265
266
267 class StagingPathMapper(PathMapper):
268     # Note that StagingPathMapper internally maps files from target to source.
269     # Specifically, the 'self._pathmap' dict keys are the target location and the
270     # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
271     # as the file identifier. This makes it possible to map an input file to multiple
272     # target directories. The exception is for file literals, which store the contents of
273     # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.
274
275     _follow_dirs = True
276
277     def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
278         self.targets = set()
279         super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)
280
281     def visit(self, obj, stagedir, basedir, copy=False, staged=False):
282         # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
283         loc = obj["location"]
284         tgt = os.path.join(stagedir, obj["basename"])
285         basetgt, baseext = os.path.splitext(tgt)
286
287         def targetExists():
288             return tgt in self.targets and ("contents" not in obj) and (self._pathmap[tgt].resolved != loc)
289         def literalTargetExists():
290             return tgt in self.targets and "contents" in obj
291
292         n = 1
293         if targetExists() or literalTargetExists():
294             while tgt in self.targets:
295                 n += 1
296                 tgt = "%s_%i%s" % (basetgt, n, baseext)
297         self.targets.add(tgt)
298         if obj["class"] == "Directory":
299             if obj.get("writable"):
300                 self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
301             else:
302                 self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
303             if loc.startswith("_:") or self._follow_dirs:
304                 self.visitlisting(obj.get("listing", []), tgt, basedir)
305         elif obj["class"] == "File":
306             if tgt in self._pathmap:
307                 return
308             if "contents" in obj and loc.startswith("_:"):
309                 self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
310             else:
311                 if copy or obj.get("writable"):
312                     self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
313                 else:
314                     self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
315                 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
316
317     def mapper(self, src):  # type: (Text) -> MapperEnt.
318         # Overridden to maintain the use case of mapping by source (identifier) to
319         # target regardless of how the map is structured interally.
320         def getMapperEnt(src):
321             for k,v in viewitems(self._pathmap):
322                 if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
323                     return v
324
325         if u"#" in src:
326             i = src.index(u"#")
327             v = getMapperEnt(src[i:])
328             return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
329         return getMapperEnt(src)
330
331
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites Keep sources as "$(task.keep)/..." paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Existing keys are reassigned in place; no key is added or
        # removed while iterating.
        for key, entry in viewitems(self._pathmap):
            resolved, target, kind, staged = entry
            if kind in ("File", "Directory") and resolved.startswith("keep:"):
                task_path = "$(task.keep)/%s" % resolved[5:]
                self._pathmap[key] = MapperEnt(task_path, target, kind, staged)
343
344
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not expand non-literal directory listings."""

    # Read by StagingPathMapper.visit(): with this off, only directories
    # whose location starts with "_:" have their listing visited.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        self.visitlisting(
            referenced_files,
            self.stagedir,
            basedir)