Merge branch '15296-cwl-cancel-procs' closes #15296
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
10
11 import re
12 import logging
13 import uuid
14 import os
15 import urllib.request, urllib.parse, urllib.error
16
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
20
21 from schema_salad.sourceline import SourceLine
22
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
25 from cwltool.workflow import WorkflowException
26
27 from .http import http_to_keep
28
29 logger = logging.getLogger('arvados.cwl-runner')
30
def trim_listing(obj):
    """Drop the 'listing' field of an object whose location is a Keep reference.

    When a Directory object refers to Keep, its fully enumerated listing is
    redundant, and passing it between instances of cwl-runner (e.g. when
    submitting a job, or when using the RunInSingleContainer feature) can be
    very expensive.  Objects with any other location, or with no location,
    are left untouched.  The object is modified in place; nothing is returned.
    """
    location = obj.get("location", "")
    if not location.startswith("keep:"):
        return
    # pop() with a default is a no-op when there is no listing to remove.
    obj.pop("listing", None)
44
# Matches a path inside a collection addressed by portable data hash:
# "keep:<32 hex digits>+<size>/<at least one character>".
collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# Matches "keep:<pdh>" with an optional "/path" suffix; group 1 is the
# portable data hash, group 2 the "/path" part (or None).
# NOTE(review): unlike the other two patterns this one has no trailing "$",
# so with .match() any text after "<size>" is accepted (e.g.
# "keep:<pdh>+12abc") -- confirm whether that leniency is intended.
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
# Matches "keep:<collection uuid>" (uuids with the "4zz18" infix) with an
# optional "/path" suffix; group 1 is the uuid, group 2 the "/path" part
# (or None).
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
48
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    ``setup`` maps every referenced File/Directory object to a ``keep:``
    location:

    * ``keep:<pdh>...`` references are mapped as-is;
    * local ``file:`` paths are uploaded, unless statfile reports that they
      already resolve to Keep (e.g. they live on a keep mount);
    * ``http:``/``https:`` URLs are resolved to a Keep reference via
      ``http_to_keep``;
    * Directory literals, File literals and Files whose secondaryFiles are
      not laid out as they would be staged are copied into new intermediate
      collections.

    Targets are formatted with ``collection_pattern`` (one ``%s`` for a
    collection or collection path) or ``file_pattern`` (``%s`` for the
    portable data hash and ``%s`` for the path inside it).
    """

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        # State is set before delegating to the base constructor, which
        # presumably triggers setup()/visit() and therefore needs it.
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # portable data hash -> collection uuid, filled in by visit().
        self.pdh_to_uuid = {}
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Map ``srcobj`` (a File or Directory object) and its children.

        Keep references, remote URLs and other locations are recorded in
        ``self._pathmap`` directly.  Local files that must be uploaded are
        added to ``uploadfiles`` as ``(location, abspath, statfile result)``
        tuples and are mapped later by ``setup``.  secondaryFiles and listing
        entries are visited recursively.

        :raises WorkflowException: on an invalid ``keep:`` reference, an
            invalid local path, or a File/Directory literal missing its
            ``contents``/``listing``.
        """
        src = srcobj["location"]
        if "#" in src:
            # Only the resource itself is mapped, not a fragment within it.
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
                if arvados_cwl.util.collectionUUID in srcobj:
                    # Record the collection uuid attached to this object,
                    # keyed by its portable data hash.
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are materialized later by setup()/addentry();
                # here they are only validated.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Place ``obj`` inside collection ``c`` under directory ``path``.

        * Objects already in ``self._pathmap`` are copied from their source
          collection, followed by their secondaryFiles.
        * Directories not in the map are rebuilt entry by entry from their
          listing.
        * File literals (``_:`` location with ``contents``) are written out.

        Each placed object appends ``(original_location, new_path)`` to
        ``remap`` so the caller can point it at the saved collection.

        :raises WorkflowException: for any other kind of object.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            dest = path + "/" + obj["basename"]
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            dest = path + "/" + obj["basename"]
            for l in obj.get("listing", []):
                self.addentry(l, c, dest, remap)
            remap.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            dest = path + "/" + obj["basename"]
            with c.open(dest, "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], dest))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        ``prefix`` is the expected location prefix for ``srcobj``; it is
        derived from the top-level object's own location on the first call.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            return True
        if prefix:
            if loc != prefix+srcobj["basename"]:
                return True
        else:
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
            else:
                prefix = loc+"/"
        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return an empty Collection bound to the runner's API and Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save ``c`` as a new intermediate collection and return its portable data hash."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])
        return c.portable_data_hash()

    def _remap_entries(self, remap, pdh):
        """Point every ``(location, path)`` pair produced by addentry() at collection ``pdh``."""
        for loc, sub in remap:
            # addentry() builds paths under ".", so strip the leading "./";
            # the same relative path is used for resolved and target.
            rel = sub[2:] if sub.startswith("./") else sub
            # NOTE(review): every remapped entry is typed "Directory", even
            # plain files (including the primary File of a File entry).  This
            # is kept as-is; confirm whether downstream code relies on it.
            self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, rel),
                                           self.file_pattern % (pdh, rel),
                                           "Directory", True)

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Build ``self._pathmap`` for ``referenced_files``.

        1. Visit every object, mapping what can be mapped directly and
           collecting local files that must be uploaded.
        2. Upload those files (into one shared collection when
           ``single_collection`` is set) and map them.
        3. For Directory objects still unmapped, and for Files with
           secondaryFiles or literal contents (unless needs_new_collection()
           says the existing layout is usable as-is), build and save a new
           intermediate collection and map into it.
        """
        uploadfiles = set()

        collection = self._new_collection() if self.single_collection else None

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                pdh = self._save_intermediate_collection(c)
                self._pathmap[srcobj["location"]] = MapperEnt("keep:" + pdh, self.collection_pattern % pdh,
                                                              "Directory", True)
                self._remap_entries(remap, pdh)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                # If all secondary files/directories are located in
                # the same collection as the primary file and the
                # paths and names are consistent with staging,
                # don't create a new collection.
                if not self.needs_new_collection(srcobj):
                    continue

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                pdh = self._save_intermediate_collection(c)
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              self.file_pattern % (pdh, srcobj["basename"]),
                                                              "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the new collection as a whole under a fresh
                    # anonymous key (presumably so it is made available along
                    # with the primary file).
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:" + pdh, self.collection_pattern % pdh,
                                                                        "Directory", True)
                # Applied after the primary entry so the remapped entries
                # take precedence, matching the original ordering.
                self._remap_entries(remap, pdh)

        self.keepdir = None

    def reversemap(self, target):
        """Map ``target`` back to a ``(location, location)`` pair.

        Falls back to treating ``keep:`` targets as their own location, and
        to translating paths under ``self.keepdir`` (when set) into ``keep:``
        references.  Returns None when nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
269
270
class StagingPathMapper(PathMapper):
    """Path mapper that assigns each input a staging target under ``stagedir``.

    Note that StagingPathMapper internally maps files from target to source.
    Specifically, the 'self._pathmap' dict keys are the target location and
    the values are 'MapperEnt' named tuples from which we use the 'resolved'
    attribute as the file identifier.  This makes it possible to map an input
    file to multiple target directories.  The exception is for file literals,
    which store the contents of the file in 'MapperEnt.resolved' and are
    therefore still mapped from source to target.
    """

    # When False, visit() only descends into Directory literals ("_:").
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Every staging target handed out so far; used to detect name clashes.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign ``obj`` a target under ``stagedir`` and record it.

        When ``stagedir/basename`` is already used by a different source (or
        ``obj`` is a literal and the name is taken), a numeric suffix is
        inserted before the extension (``name_2.ext``, ``name_3.ext``, ...)
        until a free name is found.  Directories are recursed into when they
        are literals or ``_follow_dirs`` is set; Files also stage their
        secondaryFiles next to them.
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            # The target is taken by a different source.  File literals are
            # keyed by source location rather than target, so a target in
            # self.targets with no _pathmap entry was claimed by something
            # else: treat it as a clash instead of raising KeyError.
            if tgt not in self.targets or "contents" in obj:
                return False
            existing = self._pathmap.get(tgt)
            return existing is None or existing.resolved != loc
        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            # Pick the first free "<name>_<n><ext>", starting from n=2.
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                # Same source already staged at this target.
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt
        """Return the MapperEnt for source location ``src``.

        Overridden to keep mapping by source (identifier) to target even
        though ``_pathmap`` is keyed by target: this searches for an entry
        whose ``resolved`` equals ``src`` (or, for file literals, whose key
        equals ``src``).  A ``#fragment`` is stripped for the lookup and
        re-appended to the returned target.  Returns None when nothing
        matches.
        """
        def getMapperEnt(src):
            for k, v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v
            return None

        if u"#" in src:
            i = src.index(u"#")
            # Look up the resource without its fragment, then carry the
            # fragment over to the target.
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
334
335
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed sources point at the task's Keep path.

    Every referenced object is staged under ``stagedir``; afterwards each
    File/Directory entry whose source is a ``keep:`` reference has that
    source rewritten to ``$(task.keep)/<rest of reference>``.
    """

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Give each file (and its secondary files) a target under stagedir.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Only existing keys are reassigned, so iterating the live view is safe.
        for key, ent in viewitems(self._pathmap):
            if ent.type not in ("File", "Directory"):
                continue
            if not ent.resolved.startswith("keep:"):
                continue
            keep_path = ent.resolved[5:]
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % keep_path, ent.target, ent.type, ent.staged)
347
348
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper with ``_follow_dirs`` disabled.

    With ``_follow_dirs`` False, StagingPathMapper.visit only descends into
    the listings of Directory literals (``_:`` locations).
    """

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        target_root = self.stagedir
        self.visitlisting(referenced_files, target_root, basedir)