12347: Don't reuse runner job/container if arv:enableReuse is false.
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import re
6 import logging
7 import uuid
8 import os
9 import urllib
10
11 import arvados.commands.run
12 import arvados.collection
13
14 from schema_salad.sourceline import SourceLine
15
16 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
17 from cwltool.workflow import WorkflowException
18
19 logger = logging.getLogger('arvados.cwl-runner')
20
def trim_listing(obj):
    """Strip the 'listing' key from a Directory object that points into Keep.

    Passing a fully enumerated Keep directory between cwl-runner instances
    (for example when submitting a job, or when using the
    RunInSingleContainer feature) is redundant and can be very expensive, so
    the listing is dropped whenever the object's location is a "keep:"
    reference.  The object is modified in place and nothing is returned.
    """
    location = obj.get("location", "")
    if not location.startswith("keep:"):
        # Not a Keep reference: the listing may be the only description of
        # the directory's contents, so leave it alone.
        return
    if "listing" in obj:
        del obj["listing"]
34
35
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    # "keep:<pdh>/<path>" -- a path inside a collection.
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # "keep:<pdh>" optionally followed by "/<path>" -- a collection or a path in it.
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
        """Create the mapper and immediately map *referenced_files*.

        arvrunner: runner object providing api, keep_client, num_retries,
            project_uuid, fs_access and the get_uploaded()/add_uploaded() cache.
        referenced_files: CWL File/Directory objects (dicts with "location").
        input_basedir: base directory used to resolve relative "file:" refs.
        collection_pattern: %-format string taking one argument (a keep path
            without the "keep:" prefix) that yields the container path.
        file_pattern: %-format string taking (pdh, relative path).
        name: collection name passed to uploadfiles().
        single_collection: when true, uploads and previously uploaded files are
            gathered into one shared collection.
        kwargs: accepted for compatibility and ignored.
        """
        # These attributes must be set before the base-class constructor runs,
        # because it invokes setup(), which reads them.
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record a mapping for *srcobj*, then recurse into its secondaryFiles and listing.

        Keep references and other non-local locations are mapped directly.
        Local "file:" references that are not on a Keep mount are queued in
        *uploadfiles* as (src, abspath, UploadFile) tuples for setup() to
        upload.  File/Directory literals ("_:") are only validated here.
        """
        src = srcobj["location"]
        if "#" in src:
            # Drop any fragment identifier; mappings are keyed on the bare location.
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException):
                    # UploadFile is checked first: it is tested before the
                    # more general ArvFile case.
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        # statfile() is given a dirPattern, so a directory on a
                        # Keep mount can end up here too; record the real type
                        # (same test as the upload path in setup()).
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]),
                                                       "Directory" if os.path.isdir(ab) else "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, subdirs):
        """Place *obj* at path/basename inside collection *c*.

        - Already-mapped objects are copied (overwriting) from their resolved
          Keep location, along with their secondaryFiles.
        - Unmapped Directory objects are rebuilt from their listing, and
          (location, relative path) is appended to *subdirs*.
        - "_:" File literals with "contents" are written as UTF-8.
        - Anything else raises a WorkflowException via SourceLine.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # Empty sub-path: copy the collection root.
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, subdirs)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], subdirs)
            subdirs.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"].encode("utf-8"))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def _new_collection(self):
        """Return a new Collection object bound to the runner's API/Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_if_missing(self, c):
        """Save *c* in the runner's project unless a collection with its PDH already exists."""
        check = self.arvrunner.api.collections().list(
            filters=[["portable_data_hash", "=", c.portable_data_hash()]],
            limit=1).execute(num_retries=self.arvrunner.num_retries)
        if not check["items"]:
            c.save_new(owner_uuid=self.arvrunner.project_uuid)

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Build self._pathmap for *referenced_files*.

        1. Reuse mappings for locations the runner has already uploaded (in
           single_collection mode they are also copied into the shared
           collection).
        2. visit() every reference, collecting local files that need upload.
        3. Upload them (into the shared collection when single_collection).
        4. Create collections for Directory objects that are still unmapped,
           and for File objects with secondaryFiles or literal contents.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        already_uploaded = self.arvrunner.get_uploaded()
        copied_files = set()
        for k in referenced_files:
            loc = k["location"]
            if loc in already_uploaded:
                v = already_uploaded[loc]
                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)
                if self.single_collection:
                    basename = k["basename"]
                    if basename not in collection:
                        self.addentry({"location": loc, "class": v.type, "basename": basename}, collection, ".", [])
                        copied_files.add((loc, basename, v.type))

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)
            self.arvrunner.add_uploaded(src, self._pathmap[src])

        # copied_files is only populated in single_collection mode, so
        # `collection` is not None here when the loop body runs.  The PDH is
        # read after the upload so it reflects the final contents.
        for loc, basename, cls in copied_files:
            fn = "keep:%s/%s" % (collection.portable_data_hash(), basename)
            self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)

        for srcobj in referenced_files:
            subdirs = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", subdirs)

                self._save_if_missing(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                c = self._new_collection()
                self.addentry(srcobj, c, ".", subdirs)

                self._save_if_missing(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the whole collection (under a fresh literal key)
                    # so the secondary files sitting next to the primary are
                    # available as a Directory.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            # subdirs is only filled by the two branches above, both of which bind `c`.
            if subdirs:
                for loc, sub in subdirs:
                    # subdirs will all start with "./", strip it off
                    ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
                                                   ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map *target* back to a (location, location) pair, or return None.

        The base-class mapping is tried first.  "keep:" targets map to
        themselves.  When self.keepdir is set (setup() leaves it None), paths
        beneath it are rewritten as "keep:" references.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
208
class StagingPathMapper(PathMapper):
    """PathMapper that gives every referenced object a unique target path
    under a staging directory.

    When two objects would land on the same target, a numeric suffix is
    inserted before the extension ("name_2.ext", "name_3.ext", ...).
    """

    # When False, the listings of non-literal Directory objects are not visited.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Target paths already handed out.  Must exist before the base-class
        # constructor runs, because it calls setup(), which ends up in visit().
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Map *obj* to a unique path under *stagedir*, recursing as needed.

        copy: map non-literal Files as "WritableFile" instead of "File".
        staged: value stored in the MapperEnt's staged field.
        """
        loc = obj["location"]

        if obj["class"] == "File" and loc in self._pathmap:
            # Already mapped (e.g. the same file referenced twice).  Return
            # before reserving a target name, otherwise the unused name would
            # push later files with the same basename to a higher suffix.
            return

        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        n = 1
        while tgt in self.targets:
            n += 1
            tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)

        if obj["class"] == "Directory":
            self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            # Directory literals are always expanded; others only when following dirs.
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if "contents" in obj and loc.startswith("_:"):
                # File literal: the entry carries the contents to be written.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                # Secondary files are staged alongside the primary file.
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
241
242
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed File/Directory sources are expressed
    relative to "$(task.keep)" rather than as "keep:" references."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Assign a staging target under self.stagedir to every referenced object.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Rewrite "keep:<path>" sources of File/Directory entries as
        # "$(task.keep)/<path>"; targets, types and staged flags are unchanged.
        rewritten = {}
        for key, ent in self._pathmap.items():
            resolved, target, kind, is_staged = ent
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            rewritten[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, is_staged)
        self._pathmap.update(rewritten)
254
255
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into the listings of non-literal
    Directory objects (Directory literals are still expanded)."""

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        # Every top-level reference is staged directly under self.stagedir.
        stagedir = self.stagedir
        self.visitlisting(referenced_files, stagedir, basedir)