14807: Merge branch 'master'
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
10
11 import re
12 import logging
13 import uuid
14 import os
15 import urllib.request, urllib.parse, urllib.error
16
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
20
21 from schema_salad.sourceline import SourceLine
22
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
25 from cwltool.workflow import WorkflowException
26
27 from .http import http_to_keep
28
# Module-level logger for this file: used for debug-level checks, for the
# message reporting which Keep reference an http(s) input was stored as, and
# passed to the arvados_cwl.util container lookup helper.
logger = logging.getLogger('arvados.cwl-runner')
30
def trim_listing(obj):
    """Drop the 'listing' field from a Directory object that refers to Keep.

    Passing fully enumerated Directory objects between cwl-runner instances
    (for example when submitting a job, or when using the
    RunInSingleContainer feature) is redundant for Keep references and can
    be very expensive.  So when the object's location starts with "keep:",
    its 'listing' entry (if any) is deleted.

    The object is modified in place; nothing is returned.
    """
    if not obj.get("location", "").startswith("keep:"):
        # Not a Keep reference: the listing must be kept.
        return
    if "listing" in obj:
        del obj["listing"]
44
45
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Each referenced File/Directory location is mapped to a Keep reference:

    * ``keep:`` references are used as-is;
    * local ``file:`` paths are either recognized as already being in Keep
      or queued for upload to Keep;
    * ``http:``/``https:`` URLs are stored in Keep via ``http_to_keep``;
    * any other location is mapped to itself;
    * Directory objects that end up without a mapping, File literals, and
      Files whose secondaryFiles are not already laid out as they would be
      staged are copied into new intermediate collections.
    """

    # keep:<pdh>/<path> -- a path inside a collection (path part required).
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # keep:<pdh>[/<path>] -- a whole collection, or a path inside one.
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        """Create the mapper.

        :param arvrunner: runner object supplying ``api``, ``keep_client``,
            ``num_retries``, ``project_uuid``, ``fs_access`` and
            ``intermediate_output_ttl``.
        :param referenced_files: File/Directory objects (dicts with a
            ``location``) to map.
        :param input_basedir: base directory for resolving local ``file:``
            locations.
        :param collection_pattern: ``%`` format string applied to
            ``"<pdh>[/path]"`` to build a target path.
        :param file_pattern: ``%`` format string applied to ``(pdh, path)``
            to build a target path.
        :param name: collection name passed to the local-file upload.
        :param single_collection: if true, one Collection object is passed
            to the local-file upload (presumably so all uploads share it).
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # portable data hash -> collection UUID supplied with the input object.
        self.pdh_to_uuid = {}
        # Attributes are assigned before the base constructor runs, which
        # presumably calls setup() (it reads them) -- confirm against cwltool.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record a mapping for ``srcobj``, then recurse into its secondaryFiles and listing.

        Local files that must be uploaded are added to ``uploadfiles`` as
        ``(location, absolute path, statfile result)`` tuples; setup() maps
        them after the upload.

        :raises WorkflowException: for invalid local paths, and for File or
            Directory literals missing ``contents``/``listing``.
        """
        src = srcobj["location"]
        if "#" in src:
            # Drop any fragment identifier.
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
            if arvados_cwl.util.collectionUUID in srcobj:
                # Record the collection UUID supplied with this object,
                # keyed by its portable data hash.
                self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals get their collection in setup(); only validate here.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Place ``obj`` into collection ``c`` under directory ``path``.

        Already-mapped objects are copied from their source collection
        (together with their secondaryFiles), unmapped Directories are
        rebuilt from their listing, and File literals are written from
        ``contents``.  For every object placed, a pair
        ``(original location, path within c)`` is appended to ``remap``.

        :raises WorkflowException: if the object cannot be placed.
        """
        if obj["location"] in self._pathmap:
            dest = path + "/" + obj["basename"]
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # Reference to a whole collection: copy its root.
                srcpath = "."
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            for l in obj.get("secondaryFiles", []):
                # Secondary files sit next to the primary, not inside it.
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            dest = path + "/" + obj["basename"]
            for l in obj.get("listing", []):
                self.addentry(l, c, dest, remap)
            remap.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            dest = path + "/" + obj["basename"]
            with c.open(dest, "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], dest))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        :param prefix: expected location prefix for ``srcobj``; empty for
            the top-level object, in which case it is derived from the
            object's own location.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            return True
        if prefix:
            if loc != prefix+srcobj["basename"]:
                return True
        else:
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
            else:
                prefix = loc+"/"
        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return a new, unsaved Collection using the runner's API/Keep clients and retry count."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save ``c`` as a new intermediate collection in the runner's project.

        Name, trash time and properties come from
        ``get_intermediate_collection_info`` for the current container.
        """
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Resolve every referenced object to a Keep location.

        Visits all objects, uploads local files that are not already in
        Keep, then builds intermediate collections for Directory objects
        that still lack a mapping and for Files (with secondaryFiles, or
        File literals) whose layout does not already match staging.
        """
        uploadfiles = set()

        collection = self._new_collection() if self.single_collection else None

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            pdh = None
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)
                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+pdh, self.collection_pattern % pdh, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                                                (srcobj["location"].startswith("_:") and "contents" in srcobj)):
                # If all secondary files/directories are located in
                # the same collection as the primary file and the
                # paths and names that are consistent with staging,
                # don't create a new collection.
                if not self.needs_new_collection(srcobj):
                    continue

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)
                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              self.file_pattern % (pdh, srcobj["basename"]),
                                                              "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the whole new collection under a fresh literal key.
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+pdh, self.collection_pattern % pdh, "Directory", True)

            # remap is only non-empty when one of the branches above built a
            # collection, so pdh is set here whenever the loop body runs.
            # NOTE(review): every remapped entry -- including files, and the
            # primary File itself in the File branch -- is recorded with type
            # "Directory"; preserved as-is pending confirmation.
            for loc, sub in remap:
                # addentry() paths start with "./"; strip it once so the
                # Keep reference and the target path agree.
                if sub.startswith("./"):
                    sub = sub[2:]
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, sub),
                                               self.file_pattern % (pdh, sub),
                                               "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a location pair.

        Returns the base-class reverse mapping if there is one.  Otherwise a
        ``keep:`` target maps to itself, and a path under ``self.keepdir``
        (when set) is turned back into a ``keep:`` reference.  Returns None
        if nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
265
266
class StagingPathMapper(PathMapper):
    """Map File/Directory objects to unique target paths under a staging directory.

    A target path already claimed by a different location is renamed by
    inserting ``_<n>`` (n = 2, 3, ...) before its extension.
    """

    # When True, every Directory's listing is visited; when False, only
    # literal ("_:") Directories have their listings visited.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Every target path claimed so far; used to detect name collisions.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign ``obj`` a unique target under ``stagedir`` and record the mapping.

        :param copy: if true, a File (not a literal) is mapped as
            "WritableFile".
        :param staged: value stored as the ``staged`` flag of the new
            mapping entries.
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        n = 1
        if tgt in self.targets:
            owner = self.reversemap(tgt)
            # A target can be claimed without any _pathmap entry pointing at
            # it (e.g. a File that was already mapped elsewhere claims a name
            # below before returning early); reversemap() then returns None.
            # Treat that as a collision instead of failing on None[0].
            if owner is None or owner[0] != loc:
                while tgt in self.targets:
                    n += 1
                    tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if loc in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literal: the mapping carries the contents to create.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                # Secondary files are staged alongside the primary file.
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
303
304
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites ``keep:`` File/Directory entries to ``$(task.keep)/...`` paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Visit all referenced objects, then rewrite Keep-resolved entries."""

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Iterate over a snapshot so entries can be replaced while looping.
        for path, (ab, tgt, kind, staged) in list(viewitems(self._pathmap)):
            if kind in ("File", "Directory") and ab.startswith("keep:"):
                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, kind, staged)
316
317
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that only visits the listings of literal ("_:") Directories."""

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Visit every referenced object, staging it under ``self.stagedir``."""
        staging_root = self.stagedir
        self.visitlisting(referenced_files, staging_root, basedir)