Merge branch '12708-report-storage-classes'
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import re
6 import logging
7 import uuid
8 import os
9 import urllib
10
11 import arvados.commands.run
12 import arvados.collection
13
14 from schema_salad.sourceline import SourceLine
15
16 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
17 from cwltool.workflow import WorkflowException
18
# All path-mapping diagnostics go to the shared 'arvados.cwl-runner' logger.
logger = logging.getLogger('arvados.cwl-runner')

def trim_listing(obj):
    """Strip the 'listing' field from a Directory object that lives in Keep.

    A Directory whose location is a "keep:" reference can be re-enumerated
    from Keep on demand, so carrying a fully expanded listing between
    cwl-runner instances (e.g. when submitting a job, or with the
    RunInSingleContainer feature) is redundant and potentially very
    expensive. The object is modified in place; nothing is returned.
    Objects without a "keep:" location are left untouched.
    """
    location = obj.get("location", "")
    if not location.startswith("keep:"):
        return
    # pop() with a default is a no-op when 'listing' is absent.
    obj.pop("listing", None)
34
35
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Every referenced File/Directory gets a MapperEnt whose resolved value is
    a "keep:" reference and whose target is built from collection_pattern or
    file_pattern. Local "file:" inputs are uploaded to Keep during setup(),
    and literal ("_:") Directories / Files with contents or secondaryFiles
    are materialized into new collections.
    """

    # "keep:<pdh>/<path>" -- a path inside a collection (path part required).
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # "keep:<pdh>" optionally followed by "/<path>" -- a whole collection or
    # a path inside one.
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
        """Build the path map for referenced_files.

        :param arvrunner: runner object supplying api, keep_client,
            num_retries, project_uuid, fs_access and the
            get_uploaded()/add_uploaded() upload cache.
        :param referenced_files: CWL File/Directory objects (dicts with at
            least a "location" key).
        :param input_basedir: directory used to resolve relative "file:"
            locations.
        :param collection_pattern: %-format string applied to
            "<pdh>[/<path>]" to produce the container-side path.
        :param file_pattern: %-format string applied to (pdh, path) to
            produce the container-side path of an entry inside a collection.
        :param name: name passed along when uploading local files.
        :param single_collection: if true, uploads and previously uploaded
            inputs are gathered into one shared collection.
        :param kwargs: accepted and ignored.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # Attributes are assigned before the base constructor because it
        # presumably calls self.setup(), which reads them (setup() also uses
        # self._pathmap without creating it) -- confirm against cwltool.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Add a pathmap entry for srcobj, then recurse into its secondaryFiles and listing.

        Any "#fragment" is stripped from the location first. Then:

        - "keep:<pdh>..." locations map straight through collection_pattern.
        - "file:" locations are stat'ed: inputs that statfile() reports as
          already in a collection map to that keep: reference; other local
          paths are added to uploadfiles as (src, absolute path, UploadFile)
          tuples for setup() to upload.
        - "_:" literals are only validated here (a File needs "contents", a
          Directory needs "listing"); setup() maps them later.
        - Any other location maps to itself.

        :raises WorkflowException: for an invalid local path or an
            incomplete literal.
        """
        src = srcobj["location"]
        if "#" in src:
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Copy obj into collection c at path/<basename> and record it in remap.

        - Already-mapped locations are copied (overwriting) from their source
          collection, along with their secondaryFiles.
        - Unmapped Directories are rebuilt by recursing into their listing.
        - "_:" literals with "contents" are written as UTF-8 files.

        Each placed object appends (location, "<path>/<basename>") to remap.

        :raises WorkflowException: if obj matches none of the cases above.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"].encode("utf-8"))
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def _new_collection(self):
        """Return an empty Collection bound to the runner's API client, Keep client and retry count."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_if_new(self, c):
        """Save c in the runner's project unless a collection with the same PDH is already listed by the API."""
        check = self.arvrunner.api.collections().list(
            filters=[["portable_data_hash", "=", c.portable_data_hash()]],
            limit=1).execute(num_retries=self.arvrunner.num_retries)
        if not check["items"]:
            c.save_new(owner_uuid=self.arvrunner.project_uuid)

    def _remap_entries(self, remap, pdh):
        """Point each (location, relative path) pair from addentry() at its copy inside collection pdh."""
        for loc, sub in remap:
            # addentry() is always rooted at ".", so paths start with "./";
            # strip it once so the keep: reference and the target agree.
            if sub.startswith("./"):
                sub = sub[2:]
            ab = self.file_pattern % (pdh, sub)
            # NOTE(review): every remapped entry (Files included) is typed
            # "Directory"; kept as-is because callers may depend on it.
            self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, sub), ab, "Directory", True)

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate self._pathmap for every referenced File/Directory.

        1. Reuse entries from the runner's upload cache (copying them into
           the shared collection in single_collection mode).
        2. visit() everything and upload the local files it collected.
        3. Map uploads (and record them in the upload cache) and any files
           copied into the shared collection.
        4. Materialize unmapped Directories, and Files that carry
           secondaryFiles or literal contents, into new collections (saved
           only if no collection with that PDH is already listed).
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        already_uploaded = self.arvrunner.get_uploaded()
        copied_files = set()
        for k in referenced_files:
            loc = k["location"]
            if loc in already_uploaded:
                v = already_uploaded[loc]
                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)
                if self.single_collection:
                    basename = k["basename"]
                    if basename not in collection:
                        self.addentry({"location": loc, "class": v.type, "basename": basename}, collection, ".", [])
                        copied_files.add((loc, basename, v.type))

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)
            self.arvrunner.add_uploaded(src, self._pathmap[src])

        for loc, basename, cls in copied_files:
            fn = "keep:%s/%s" % (collection.portable_data_hash(), basename)
            self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_if_new(c)
                pdh = c.portable_data_hash()

                ab = self.collection_pattern % pdh
                self._pathmap[srcobj["location"]] = MapperEnt("keep:" + pdh, ab, "Directory", True)
                self._remap_entries(remap, pdh)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_if_new(c)
                pdh = c.portable_data_hash()

                ab = self.file_pattern % (pdh, srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the whole collection under a fresh anonymous
                    # key so it is mounted alongside the primary file.
                    ab = self.collection_pattern % pdh
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:" + pdh, ab, "Directory", True)
                # remap includes srcobj itself, so this runs last, as before.
                self._remap_entries(remap, pdh)

        self.keepdir = None

    def reversemap(self, target):
        """Map a container-side target back to a (location, resolved) pair.

        Falls back, in order, to: the base-class mapping; any "keep:" target
        mapping to itself; and a path under self.keepdir translated to a
        "keep:" reference (setup() leaves keepdir as None, so that branch
        only applies if keepdir is set elsewhere). Returns None otherwise.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
215
class StagingPathMapper(PathMapper):
    """Path mapper that lays inputs out under a staging directory.

    Each object is placed at <stagedir>/<basename>; if that target is already
    taken by a different location, a numeric suffix is inserted before the
    extension (name_2.ext, name_3.ext, ...).
    """

    # When True, recurse into every Directory's listing; when False, only
    # into literal ("_:") Directories.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Targets already handed out; created before the base constructor,
        # which presumably drives visit() via setup() -- confirm in cwltool.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign obj a unique target under stagedir and record a MapperEnt for it.

        Directories map as "WritableDirectory"/"Directory"; Files as
        "CreateFile" (literal contents), "WritableFile" (copy or writable)
        or "File", and their secondaryFiles are visited into the same
        stagedir. A File whose location is already mapped is skipped, but
        its target name is still reserved.
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        n = 1
        if tgt in self.targets:
            # A reserved target with no resolvable mapping (reversemap() ->
            # None, e.g. reserved by an already-mapped File) is treated as a
            # collision instead of crashing on None[0].
            prev = self.reversemap(tgt)
            if prev is None or prev[0] != loc:
                while tgt in self.targets:
                    n += 1
                    tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if loc in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
252
253
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed entries resolve under $(task.keep)."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        # First give every referenced object (and its secondary files) a
        # target under the staging directory.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Then rewrite "keep:<path>" sources of plain File/Directory entries
        # to "$(task.keep)/<path>", leaving target/type/staged unchanged.
        for path, entry in list(self._pathmap.items()):
            ab, tgt, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not ab.startswith("keep:"):
                continue
            self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, kind, staged)
264                 self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type, staged)
265
266
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal Directory listings."""

    # Only literal ("_:") directories have their listings visited.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        stage_root = self.stagedir
        self.visitlisting(referenced_files, stage_root, basedir)