13306: Fixed formatting for newly introduced imports
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9
10 import re
11 import logging
12 import uuid
13 import os
14 import urllib.request, urllib.parse, urllib.error
15
16 import arvados_cwl.util
17 import arvados.commands.run
18 import arvados.collection
19
20 from schema_salad.sourceline import SourceLine
21
22 from arvados.errors import ApiError
23 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
24 from cwltool.workflow import WorkflowException
25
26 from .http import http_to_keep
27
28 logger = logging.getLogger('arvados.cwl-runner')
29
def trim_listing(obj):
    """Drop the 'listing' field from a Directory object that points into Keep.

    When a Directory object is a Keep reference, passing its fully enumerated
    listing between cwl-runner instances (for example when submitting a job,
    or when using the RunInSingleContainer feature) is redundant and
    potentially very expensive, so the field is removed when it is safe to
    do so.

    ``obj`` is modified in place and nothing is returned.  Objects whose
    "location" does not start with "keep:" (or that have no "listing") are
    left untouched.
    """
    is_keep_ref = obj.get("location", "").startswith("keep:")
    if is_keep_ref and "listing" in obj:
        del obj["listing"]
43
44
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    # "keep:<32 hex digits>+<size>/<path>" -- an entry inside a collection.
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # "keep:<32 hex digits>+<size>", optionally followed by "/<path>".
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        """Create a mapper for ``referenced_files``.

        Args:
            arvrunner: runner object; its ``api``, ``keep_client``,
                ``num_retries``, ``project_uuid``, ``fs_access`` and
                ``intermediate_output_ttl`` attributes are used.
            referenced_files: CWL File/Directory objects (each with a
                "location" key).
            input_basedir: base directory used to resolve "file:" locations.
            collection_pattern: %-format string with one placeholder that
                receives "<portable data hash>[/<path>]" and yields a target
                path.
            file_pattern: %-format string with two placeholders
                (portable data hash, relative path) yielding a target path.
            name: collection name passed to the upload of local files.
            single_collection: if true, one Collection is created up front
                and passed to the upload of local files.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # NOTE(review): the base-class constructor presumably calls setup(),
        # so every attribute above must be assigned before this call.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record a path mapping for ``srcobj`` and, recursively, its children.

        - "keep:" references (matching ``pdh_dirpath``) are mapped directly.
        - "file:" references are stat'ed.  Files not yet in Keep are added to
          ``uploadfiles`` as ``(src, abspath, stat_result)`` tuples for
          setup() to upload; files already reachable via Keep are mapped.
        - "_:" literals are only validated here.
        - "http:"/"https:" references are copied into Keep via http_to_keep().
        - Any other location is mapped to itself.

        secondaryFiles and listing entries are then visited recursively.

        Args:
            srcobj: CWL File or Directory object (dict-like, with "location"
                and "class").
            uploadfiles: set that collects local files still to be uploaded.

        Raises:
            WorkflowException: for an invalid local path, or a File/Directory
                literal missing its "contents"/"listing".
        """
        src = srcobj["location"]
        # Drop any "#fragment" suffix before mapping.
        if "#" in src:
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Stage ``obj`` into collection ``c`` under directory ``path``.

        - Already-mapped locations are copied from their source collection
          (overwriting), followed by their secondaryFiles.
        - Unmapped Directories are recreated from their listing.
        - "_:" File literals are written from their "contents".

        For every staged entry, ``(location, path + "/" + basename)`` is
        appended to ``remap``.

        Raises:
            WorkflowException: if ``obj`` matches none of the cases above.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                # NOTE(review): UTF-8 bytes are written to a handle opened in
                # "w" mode; confirm the Arvados SDK in use accepts bytes here.
                f.write(obj["contents"].encode("utf-8"))
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        Args:
            srcobj: CWL File or Directory object.
            prefix: expected location prefix (including trailing "/") for
                ``srcobj``.  Empty on the top-level call, in which case it is
                derived from ``srcobj``'s own location.

        Returns:
            True if a new collection is needed, otherwise False.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            return True
        if prefix:
            if loc != prefix+srcobj["basename"]:
                return True
        else:
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
            else:
                prefix = loc+"/"
        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return an empty Collection bound to the runner's API/Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save ``c`` as a new intermediate collection in the runner's project."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Build the path map for ``referenced_files``.

        1. visit() every referenced object, collecting local files to upload.
        2. Upload those files (into one pre-created collection when
           ``single_collection`` is set) and map them to their Keep locations.
        3. For each unmapped Directory, and each File that has secondaryFiles
           or is a "_:" literal with contents, stage the pieces into a new
           intermediate collection (when needs_new_collection() says so) and
           map into it.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                ab = self.collection_pattern % pdh
                self._pathmap[srcobj["location"]] = MapperEnt("keep:" + pdh, ab, "Directory", True)
            elif srcobj["class"] == "File" and (
                    srcobj.get("secondaryFiles") or
                    (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                # If all secondary files/directories are located in
                # the same collection as the primary file and the
                # paths and names that are consistent with staging,
                # don't create a new collection.
                if not self.needs_new_collection(srcobj):
                    continue

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                ab = self.file_pattern % (pdh, srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    ab = self.collection_pattern % pdh
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:" + pdh, ab, "Directory", True)

            # remap is only non-empty when one of the branches above created
            # and saved ``c``.
            if remap:
                pdh = c.portable_data_hash()
                for loc, sub in remap:
                    # Entries staged with path "." start with "./"; strip it
                    # once so the resolved keep: path and the target path
                    # always agree.
                    if sub.startswith("./"):
                        sub = sub[2:]
                    # NOTE(review): every remapped entry is typed "Directory",
                    # including Files staged by addentry -- confirm intended.
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, sub),
                                                   self.file_pattern % (pdh, sub),
                                                   "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map ``target`` back to a ``(location, resolved)`` pair.

        Tries the base-class lookup first.  Otherwise a "keep:" target maps
        to itself, and a path under ``self.keepdir`` (when set) is turned
        into a "keep:" reference.  Returns None when nothing matches.
        ``self.keepdir`` is reset to None by setup(); presumably callers set
        it afterwards -- TODO confirm.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
261
262
class StagingPathMapper(PathMapper):
    """Assign each File/Directory a unique target path under a staging dir.

    When a different location would land on an already-used target path,
    the newcomer is renamed to "<stem>_<n><ext>" with n = 2, 3, ...
    """

    # When false, only "_:" literal Directories have their listing visited
    # (see NoFollowPathMapper).
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Target paths handed out so far; used to detect collisions.  Set
        # before the base constructor, which presumably calls setup()/visit().
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Map ``obj`` to a target path inside ``stagedir``.

        - Directories map to "Directory" or "WritableDirectory" entries.
          Their listing is visited when they are "_:" literals or
          ``_follow_dirs`` is true.
        - "_:" File literals with contents map to "CreateFile" entries.
        - Other Files map to "WritableFile" (when ``copy`` is set or the
          object is writable) or "File".  Their secondaryFiles are then
          visited into the same ``stagedir``.

        A File whose location is already mapped is skipped (after its target
        has been reserved).
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        n = 1
        if tgt in self.targets:
            # A target can be reserved without any _pathmap entry pointing at
            # it: an already-mapped File reserves its target and then returns
            # early below.  In that case reversemap() presumably returns None.
            # Treat that as a collision instead of failing on None[0].
            owner = self.reversemap(tgt)
            if owner is None or owner[0] != loc:
                while tgt in self.targets:
                    n += 1
                    tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if loc in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
299
300
class VwdPathMapper(StagingPathMapper):
    """StagingPathMapper that rewrites "keep:" sources to "$(task.keep)/" paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Replace the "keep:" prefix of File/Directory sources with
        # "$(task.keep)/".  Iterate over a snapshot because entries are
        # reassigned inside the loop.
        for path, (resolved, tgt, kind, staged) in list(self._pathmap.items()):
            if kind in ("File", "Directory") and resolved.startswith("keep:"):
                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % resolved[5:], tgt, kind, staged)
312
313
class NoFollowPathMapper(StagingPathMapper):
    """StagingPathMapper that does not descend into non-literal Directories."""

    # Read by StagingPathMapper.visit: with this off, only "_:" literal
    # Directories have their listing visited.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Visit ``referenced_files`` directly under ``self.stagedir``."""
        target_dir = self.stagedir
        self.visitlisting(referenced_files, target_dir, basedir)