Merge branch '11545-stress-test' closes #11545
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import re
6 import logging
7 import uuid
8 import os
9 import urllib
10
11 import arvados.commands.run
12 import arvados.collection
13
14 from schema_salad.sourceline import SourceLine
15
16 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
17 from cwltool.workflow import WorkflowException
18
# Module logger; logging.getLogger returns the same object to every caller
# that asks for this name.
logger = logging.getLogger("arvados.cwl-runner")
20
def trim_listing(obj):
    """Strip the 'listing' field from a Directory object that points into Keep.

    Passing fully enumerated Directory objects between cwl-runner instances
    (e.g. when submitting a job, or with the RunInSingleContainer feature) is
    redundant and potentially very expensive when the Directory is a Keep
    reference ("keep:..." location), so the listing is dropped in that case.
    Objects with any other location, or with no location at all, are left
    untouched.

    The object is modified in place; the function returns None.
    """
    location = obj.get("location", "")
    if not location.startswith("keep:"):
        return
    if "listing" not in obj:
        return
    del obj["listing"]
34
35
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Every referenced File/Directory ends up mapped to a ``keep:`` reference:

    * ``keep:`` locations are mapped as-is;
    * local ``file:`` locations are uploaded to Keep, unless statfile()
      reports they already live inside a Keep collection (e.g. on a Keep
      mount);
    * Directories still unmapped after uploading, Files that have
      secondaryFiles, and File literals (``_:`` locations with ``contents``)
      are assembled into new collections, which are saved unless an
      identical collection already exists;
    * any other location is mapped to itself.

    Container-side target paths come from formatting ``collection_pattern``
    with ``<pdh>[/<path>]`` or ``file_pattern`` with ``(<pdh>, <path>)``.
    """

    # Matches "keep:<pdh>/<path>": a path inside a collection.
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # Matches "keep:<pdh>" optionally followed by "/<path>".
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
        """Create the mapper for ``referenced_files``.

        :arvrunner: runner object supplying the API and Keep clients,
          ``fs_access``, ``num_retries``, ``project_uuid`` and the
          get_uploaded()/add_uploaded() cache used by setup().
        :referenced_files: list of CWL File/Directory objects.
        :input_basedir: base directory passed to abspath() when resolving
          ``file:`` locations.
        :collection_pattern: %-format string for the target path of
          ``<pdh>[/<path>]``.
        :file_pattern: %-format string for the target path of a
          ``(pdh, path)`` pair.
        :name: collection name passed to uploadfiles().
        :single_collection: if true, uploads and previously uploaded files
          are gathered into one shared collection.

        Extra keyword arguments are accepted and ignored.  The attributes
        are assigned before the base constructor runs because it is
        expected to call setup() — NOTE(review): confirm against
        cwltool's PathMapper.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record a mapping for ``srcobj``, then recurse into its
        secondaryFiles and listing.

        ``keep:`` locations are mapped directly.  ``file:`` locations are
        stat'ed: those that must be uploaded are added to the
        ``uploadfiles`` set as ``(src, abspath, UploadFile)`` tuples (setup()
        maps them after uploading), while those already inside a Keep
        collection are mapped immediately.  ``_:`` literals are only
        validated here.  Any other location is mapped to itself.

        Raises WorkflowException for an unusable local path or an
        incomplete File/Directory literal.
        """
        src = srcobj["location"]
        # Drop any "#fragment" before mapping.
        if "#" in src:
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, subdirs):
        """Place ``obj`` into collection ``c`` at ``path + "/" + basename``.

        * Already-mapped locations are copied from their source collection
          (obtained via fs_access.get_collection() on the resolved location;
          an empty in-collection path is copied as "."), together with
          their secondaryFiles.
        * Unmapped Directories are recursed into; each one is appended to
          ``subdirs`` as a ``(location, path)`` pair so that setup() can map
          it once the collection's hash is known.
        * File literals (``_:`` location with ``contents``) are written as
          UTF-8.

        Raises WorkflowException for anything else.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, subdirs)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], subdirs)
            subdirs.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"].encode("utf-8"))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def _new_collection(self):
        """Return a new, unsaved Collection bound to the runner's API/Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_if_new(self, c):
        """Save ``c`` into the runner's project unless the API server already
        lists a collection with the same portable data hash."""
        check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
        if not check["items"]:
            c.save_new(owner_uuid=self.arvrunner.project_uuid)

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate ``self._pathmap`` for every object in ``referenced_files``.

        Steps, in order:

        1. Reuse mappings for locations the runner already uploaded (with
           ``single_collection``, also copy them into the shared collection).
        2. visit() every object, collecting local files that need uploading.
        3. Upload those files and record their Keep locations.
        4. Re-point files copied in step 1 at the shared collection.
        5. Build collections for still-unmapped Directories and for Files
           that have secondaryFiles or are literals.

        ``basedir`` is not used here; visit() resolves paths against
        ``self.input_basedir``.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        already_uploaded = self.arvrunner.get_uploaded()
        copied_files = set()
        for k in referenced_files:
            loc = k["location"]
            if loc in already_uploaded:
                v = already_uploaded[loc]
                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)
                if self.single_collection:
                    basename = k["basename"]
                    # A basename already present in the shared collection is
                    # not copied; that file keeps its original mapping.
                    if basename not in collection:
                        self.addentry({"location": loc, "class": v.type, "basename": basename}, collection, ".", [])
                        copied_files.add((loc, basename, v.type))

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection)

        # After uploadfiles(), st.fn is expected to be "keep:<pdh>/<path>"
        # (see fnPattern above).
        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)
            # Cache the mapping so step 1 of later setup() calls can reuse it.
            self.arvrunner.add_uploaded(src, self._pathmap[src])

        if copied_files:
            # All copied files live in the shared collection, which does not
            # change inside this loop, so its hash is computed once.
            pdh = collection.portable_data_hash()
            for loc, basename, cls in copied_files:
                fn = "keep:%s/%s" % (pdh, basename)
                self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)

        for srcobj in referenced_files:
            subdirs = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", subdirs)

                self._save_if_new(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                c = self._new_collection()
                self.addentry(srcobj, c, ".", subdirs)

                self._save_if_new(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also register the whole collection under a fresh
                    # synthetic "_:" key, presumably so the secondaryFiles'
                    # collection appears as a mapped Directory as well.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            # Nested Directories recorded by addentry() map to their path
            # inside the collection just built (no-op when subdirs is empty).
            for loc, sub in subdirs:
                # subdirs will all start with "./", strip it off
                ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
                                               ab, "Directory", True)

        # setup() itself never assigns a Keep directory; reversemap() only
        # uses it when something else sets it.
        self.keepdir = None

    def reversemap(self, target):
        """Map a container path back to a ``(target, location)`` pair.

        ``keep:`` references map to themselves.  Paths under
        ``self.keepdir`` (when set) become ``keep:`` references relative to
        it.  Everything else is delegated to the base class.
        """
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)
204
class StagingPathMapper(PathMapper):
    """Assign each File/Directory a unique target path under a staging directory.

    Target names come from the object's ``basename``.  On a collision, a
    numeric suffix is inserted before the extension (``a.txt``, ``a_2.txt``,
    ``a_3.txt``, ...).
    """

    # When true, visit() descends into the listing of every Directory; when
    # false, only Directory literals ("_:" locations) are descended into.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Target paths handed out so far.  Created before the base
        # constructor runs, which presumably calls setup()/visit().
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Reserve a unique target for ``obj`` under ``stagedir`` and map it.

        * Directories map to themselves at the target.  Their listing is
          visited, with the target as the new stagedir, when the Directory
          is a literal or ``_follow_dirs`` is set.
        * File literals with ``contents`` become "CreateFile" entries.
        * Other Files become "WritableFile" when ``copy`` is true and "File"
          otherwise.  Their secondaryFiles are visited in the same
          ``stagedir``.

        ``staged`` is stored on each entry created here.  Nested visits are
        made without passing ``copy``/``staged``.  A File whose location is
        already mapped is skipped without reserving a target name.
        """
        loc = obj["location"]
        if obj["class"] == "File" and loc in self._pathmap:
            # Already mapped (e.g. referenced twice): don't burn a target
            # name that nothing will use.
            return
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        n = 1
        while tgt in self.targets:
            n += 1
            tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
237
238
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed entries resolve through $(task.keep)."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Visit every referenced object directly under ``self.stagedir``
        (secondaryFiles land next to their primary File).  Then rewrite the
        source of each File/Directory entry whose source is ``keep:<x>`` to
        ``$(task.keep)/<x>``, keeping target, type and staged flag.
        """
        self.visitlisting(referenced_files, self.stagedir, basedir)

        rewrites = {}
        for key, ent in self._pathmap.items():
            resolved, target, kind, staged = ent
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            rewrites[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
        # Only existing keys are replaced, so the mapping's key set is unchanged.
        self._pathmap.update(rewrites)
250
251
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal Directories."""

    # Only Directory literals ("_:" locations) have their listings visited.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Visit every referenced object directly under ``self.stagedir``."""
        staging_root = self.stagedir
        self.visitlisting(referenced_files, staging_root, basedir)