14327: Add comments
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import re
6 import logging
7 import uuid
8 import os
9 import urllib
10
11 import arvados_cwl.util
12 import arvados.commands.run
13 import arvados.collection
14
15 from schema_salad.sourceline import SourceLine
16
17 from arvados.errors import ApiError
18 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
19 from cwltool.workflow import WorkflowException
20
21 from .http import http_to_keep
22
23 logger = logging.getLogger('arvados.cwl-runner')
24
def trim_listing(obj):
    """Drop the 'listing' field from a Directory object that lives in Keep.

    A Directory whose location is a Keep reference can be re-enumerated
    from the collection itself, so carrying a fully expanded 'listing'
    between cwl-runner instances (for example when submitting a job or
    using the RunInSingleContainer feature) is redundant and can be very
    expensive.  The object is modified in place; nothing is returned.

    Objects without a "location", with a non-"keep:" location, or without
    a "listing" key are left untouched.
    """

    location = obj.get("location", "")
    if not location.startswith("keep:"):
        return
    if "listing" not in obj:
        return
    del obj["listing"]
38
39
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    The mapper walks the referenced File/Directory objects, uploads local
    files to Keep when needed, stages literals and mixed-source inputs
    into new intermediate collections, and records a MapperEnt for every
    location in ``self._pathmap``.
    """

    # "keep:<pdh>/<path>" -- a path inside a collection.
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # "keep:<pdh>" optionally followed by "/<path>" -- a collection or a
    # path inside it.
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        """Record configuration, then hand off to the PathMapper constructor.

        :param arvrunner: runner object; this class reads its ``api``,
            ``keep_client``, ``num_retries``, ``project_uuid``,
            ``fs_access`` and ``intermediate_output_ttl`` attributes.
        :param referenced_files: list of File/Directory objects (dicts with
            at least a "location" key).
        :param input_basedir: base directory used to resolve "file:"
            locations.
        :param collection_pattern: %-format string taking one value (the
            part of a keep reference after "keep:").
        :param file_pattern: %-format string taking two values
            (portable data hash, path within the collection).
        :param name: name passed to the upload helper for uploaded files.
        :param single_collection: when true, a single Collection object is
            created and passed to the upload helper.
        """
        # These attributes are assigned before calling the base
        # constructor -- presumably because it invokes setup(), which
        # reads them; confirm against cwltool's PathMapper.
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def _new_collection(self):
        """Return a new, empty Collection bound to the runner's API clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save collection `c` as a new intermediate collection.

        The name, trash time and properties come from
        get_intermediate_collection_info() for the current container; the
        collection is owned by the runner's project.
        """
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def visit(self, srcobj, uploadfiles):
        """Classify one File/Directory object and recurse into its children.

        Keep references, http(s) URLs and other already-resolvable
        locations are entered into ``self._pathmap`` immediately.  Local
        files that must be uploaded are added to the `uploadfiles` set as
        ``(src, absolute_path, statfile_result)`` tuples.  Literals
        ("_:" locations) are only validated here.

        :raises WorkflowException: for an invalid local path, a File
            literal without `contents`, or a Directory literal without
            `listing`.
        """
        src = srcobj["location"]
        if "#" in src:
            # Drop any fragment identifier.
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals get no pathmap entry here; setup() stages them
                # into a collection later.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                # Any other scheme is mapped to itself.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Stage `obj` into collection `c` under directory `path`.

        Each staged object appends ``(original location, path in c)`` to
        the `remap` list so the caller can rewrite the path map.

        :raises WorkflowException: when `obj` is not already mapped, is
            not a Directory, and is not a literal with `contents`.
        """
        if obj["location"] in self._pathmap:
            # Already resolvable: copy it from its source collection.
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            dest = path + "/" + obj["basename"]
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            # Secondary files are staged alongside the primary file.
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            # File literal: write its contents into the collection.
            dest = path + "/" + obj["basename"]
            with c.open(dest, "w") as f:
                f.write(obj["contents"].encode("utf-8"))
            remap.append((obj["location"], dest))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        :param srcobj: File or Directory object to check.
        :param prefix: location prefix (ending in "/") that `srcobj` must
            live under; empty for the top-level call, in which case it is
            derived from `srcobj`'s own location.
        """

        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals always have to be written somewhere.
            return True
        if prefix:
            if loc != prefix+srcobj["basename"]:
                return True
        else:
            # Top-level object: everything else must share its parent.
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
            else:
                prefix = loc+"/"
        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate ``self._pathmap`` for every referenced object.

        Steps: classify all objects with visit(); upload the local files
        it collected; then, for Directory objects that are not yet mapped
        and for File objects that carry secondary files or literal
        contents, stage everything into a new intermediate collection and
        remap the staged locations to it.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            # remap is only filled when a new collection `c` is created in
            # this iteration.
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+pdh, self.collection_pattern % pdh,
                                                              "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                # If all secondary files/directories are located in
                # the same collection as the primary file and the
                # paths and names that are consistent with staging,
                # don't create a new collection.
                if not self.needs_new_collection(srcobj):
                    continue

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              self.file_pattern % (pdh, srcobj["basename"]),
                                                              "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also register the collection itself under a fresh
                    # synthetic "_:" key.
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+pdh,
                                                                            self.collection_pattern % pdh,
                                                                            "Directory", True)

            if remap:
                pdh = c.portable_data_hash()
                for loc, sub in remap:
                    # subdirs start with "./", strip it off.  The same
                    # stripped path is used for both the resolved keep
                    # location and the target so the two always agree.
                    if sub.startswith("./"):
                        sub = sub[2:]
                    # NOTE(review): every remapped entry is recorded with
                    # type "Directory", including files -- confirm intent.
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, sub),
                                                   self.file_pattern % (pdh, sub),
                                                   "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a ``(location, resolved)`` pair.

        Falls back from the base-class lookup to treating "keep:" targets
        as their own source, then to translating paths under
        ``self.keepdir`` (when set) into "keep:" references.  Returns None
        when nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            # +1 skips the path separator following keepdir.
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
256
257
class StagingPathMapper(PathMapper):
    """Path mapper that assigns each object a unique target under a staging dir.

    Subclasses set `_follow_dirs` to control whether the listings of
    non-literal Directory objects are visited.
    """

    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # The set of targets handed out so far; it is created before the
        # base constructor runs.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Record a staging target for `obj` and visit its children.

        The target is `stagedir`/basename.  If that target is already
        taken by a different location, "_2", "_3", ... is inserted before
        the extension until an unused name is found.
        """
        location = obj["location"]
        target = os.path.join(stagedir, obj["basename"])

        if target in self.targets and (self.reversemap(target)[0] != location):
            stem, extension = os.path.splitext(target)
            counter = 1
            while target in self.targets:
                counter += 1
                target = "%s_%i%s" % (stem, counter, extension)
        self.targets.add(target)

        if obj["class"] == "Directory":
            dir_type = "WritableDirectory" if obj.get("writable") else "Directory"
            self._pathmap[location] = MapperEnt(location, target, dir_type, staged)
            # Literal directories are always expanded; others only when
            # the subclass follows directories.
            if location.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), target, basedir)
        elif obj["class"] == "File":
            if location in self._pathmap:
                # Already mapped; keep the existing entry.
                return
            if "contents" in obj and location.startswith("_:"):
                # File literal: the contents take the place of the
                # resolved location.
                self._pathmap[location] = MapperEnt(obj["contents"], target, "CreateFile", staged)
                return
            file_type = "WritableFile" if (copy or obj.get("writable")) else "File"
            self._pathmap[location] = MapperEnt(location, target, file_type, staged)
            # Secondary files are staged next to the primary file.
            self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
294
295
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites Keep sources to "$(task.keep)/..." paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Visit every referenced object so that each one, together with
        # its secondary files, gets a target under the staging directory.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Rewrite File/Directory entries whose resolved location is a
        # "keep:" reference; all other entries are left as they are.
        for location, entry in list(self._pathmap.items()):
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            self._pathmap[location] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
307
308
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper with `_follow_dirs` switched off.

    With this flag false, StagingPathMapper.visit only walks the listing
    of Directory objects whose location starts with "_:".
    """

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Visit every referenced object, staging it under self.stagedir."""
        self.visitlisting(referenced_files, self.stagedir, basedir)