Merge branch '14870-retry-logs' refs #14870
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
10
11 import re
12 import logging
13 import uuid
14 import os
15 import urllib.request, urllib.parse, urllib.error
16
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
20
21 from schema_salad.sourceline import SourceLine
22
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
25 from cwltool.workflow import WorkflowException
26
27 from .http import http_to_keep
28
# Module-level logger; all messages from this module are emitted under the
# 'arvados.cwl-runner' logger name.
logger = logging.getLogger('arvados.cwl-runner')
30
def trim_listing(obj):
    """Delete the 'listing' field of an object whose location is in Keep.

    When a Directory object refers to Keep, passing its fully enumerated
    listing between instances of cwl-runner (for example when submitting a
    job, or when using the RunInSingleContainer feature) is redundant and
    potentially very expensive, so the field is removed in place.  Objects
    whose location does not start with "keep:", or that have no 'listing',
    are left untouched.

    :param obj: a CWL File/Directory object (dict-like); modified in place.
    :returns: None.
    """
    location = obj.get("location", "")
    if not location.startswith("keep:"):
        return
    if "listing" in obj:
        del obj["listing"]
44
45
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    # "keep:<pdh>/<path>": something inside a collection.
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # "keep:<pdh>" optionally followed by "/<path>": a collection or anything in it.
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        """Create the mapper and build the path map.

        :param arvrunner: runner object providing ``api``, ``keep_client``,
            ``num_retries``, ``project_uuid``, ``fs_access`` and
            ``intermediate_output_ttl``.
        :param referenced_files: CWL File/Directory objects to map.
        :param input_basedir: base directory for resolving relative local paths.
        :param collection_pattern: %-format string applied to a collection
            reference (PDH, optionally with a sub-path) to get its target path.
        :param file_pattern: %-format string applied to ``(pdh, path)`` to get
            a file's target path.
        :param name: name passed to the upload of local files.
        :param single_collection: if true, upload all local files into one
            collection.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # The cwltool base constructor is expected to invoke setup() — confirm
        # against the installed cwltool version.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record a mapping for ``srcobj`` and, recursively, its secondaryFiles and listing.

        * ``keep:`` references are mapped directly.
        * Local ``file:`` paths already on a Keep mount are mapped to Keep.
          Otherwise ``(src, abspath, UploadFile)`` is added to
          ``uploadfiles`` so that setup() can upload them.
        * ``_:`` literals are validated but not mapped here.
        * http(s) URLs are fetched into Keep with http_to_keep().
        * Anything else is mapped to itself.

        Raises WorkflowException for an invalid local path, or for a
        File/Directory literal that is missing ``contents``/``listing``.
        Errors from statfile (called with raiseOSError=True) are not caught
        here.
        """
        src = srcobj["location"]
        if "#" in src:
            # Ignore any fragment identifier.
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Stage ``obj`` into collection ``c`` under directory ``path``.

        * Objects already in the path map are copied from their source
          collection, together with their secondaryFiles.
        * Directories are recreated from their listing.
        * File literals are written from ``contents``.

        Each staged object's ``(location, new_path)`` pair is appended to
        ``remap``.  Raises WorkflowException for anything else.
        """
        if obj["location"] in self._pathmap:
            dest = path + "/" + obj["basename"]
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # The reference is to the whole collection.
                srcpath = "."
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            # Secondary files are staged next to the primary, not inside it.
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            dest = path + "/" + obj["basename"]
            for l in obj.get("listing", []):
                self.addentry(l, c, dest, remap)
            remap.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            dest = path + "/" + obj["basename"]
            with c.open(dest, "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], dest))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.
        """

        loc = srcobj["location"]
        if loc.startswith("_:"):
            return True
        if prefix:
            if loc != prefix+srcobj["basename"]:
                return True
        else:
            # Top-level call: expected siblings live in the same directory.
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
            else:
                prefix = loc+"/"
        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return a new, empty Collection bound to the runner's API and Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save ``c`` as a new intermediate collection in the runner's project.

        The name, trash time and properties come from
        arvados_cwl.util.get_intermediate_collection_info().
        """
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Build the path map for ``referenced_files``.

        The steps are:

        1. Visit every object.
        2. Upload local files that are not already in Keep.
        3. Stage Directory objects that are not yet mapped, and Files with
           secondaryFiles or literal contents, into new intermediate
           collections.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                ab = self.collection_pattern % pdh
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+pdh, ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                                                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                # If all secondary files/directories are located in
                # the same collection as the primary file and the
                # paths and names that are consistent with staging,
                # don't create a new collection.
                if not self.needs_new_collection(srcobj):
                    continue

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                ab = self.file_pattern % (pdh, srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    ab = self.collection_pattern % pdh
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+pdh, ab, "Directory", True)

            if remap:
                # remap is only non-empty when `c` was created and saved in
                # this iteration.
                pdh = c.portable_data_hash()
                for loc, sub in remap:
                    # addentry() was called with path ".", so entries start
                    # with "./".  Strip it once so that the resolved
                    # reference and the target path agree.
                    if sub.startswith("./"):
                        sub = sub[2:]
                    # NOTE(review): every remapped entry, including plain
                    # files, is typed "Directory" here (as before); confirm
                    # that downstream consumers rely on this.
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, sub),
                                                   self.file_pattern % (pdh, sub),
                                                   "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a ``(location, resolved)`` pair.

        The base-class lookup is tried first.  ``keep:`` targets map to
        themselves.  Paths under ``self.keepdir`` (when set; setup() resets
        it to None) are converted to ``keep:`` references.  Returns None
        otherwise.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
262
263
class StagingPathMapper(PathMapper):
    """Map files and directories to unique target paths under a staging directory.

    When two different locations would land on the same target path, the
    later one is renamed by inserting ``_<n>`` (n = 2, 3, ...) before the
    extension.
    """

    # When True, every Directory's listing is walked.  When False, only
    # literal ("_:") directories are walked.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Every target path handed out so far; must exist before the base
        # constructor runs, since it is used by visit().
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign ``obj`` a unique target under ``stagedir`` and record it.

        Directories map to "Directory" or "WritableDirectory".  Files map to
        "File", "WritableFile" (when ``copy`` or ``writable``), or
        "CreateFile" for literals with ``contents``.  A File whose location
        is already mapped is not remapped.
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        if tgt in self.targets:
            # The target is taken.  Keep it only if it already belongs to
            # this same location.  reversemap() returns None when the target
            # was reserved without being mapped (see the early return for
            # already-mapped Files below), so treat that as a conflict
            # instead of crashing on None[0].
            owner = self.reversemap(tgt)
            if owner is None or owner[0] != loc:
                n = 1
                while tgt in self.targets:
                    n += 1
                    tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if loc in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                # Secondary files are staged alongside the primary file.
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
300
301
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites Keep sources to ``$(task.keep)`` paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage ``referenced_files`` under ``self.stagedir``.

        Afterwards, replace each File/Directory source of the form
        ``keep:<ref>`` with ``$(task.keep)/<ref>``.
        """

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Iterate over a snapshot so the path map is not written to while a
        # live view of it is being iterated.
        for path, (ab, tgt, kind, staged) in list(viewitems(self._pathmap)):
            if kind in ("File", "Directory") and ab.startswith("keep:"):
                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, kind, staged)
313
314
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not walk into non-literal directories.

    Only Directory literals (locations starting with "_:") have their
    listings visited.  Other directories are mapped as a single entry.
    """

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        target_root = self.stagedir
        self.visitlisting(referenced_files, target_root, basedir)