14327: Don't create new collection if source has expected secondaryFiles
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import re
6 import logging
7 import uuid
8 import os
9 import urllib
10
11 import arvados_cwl.util
12 import arvados.commands.run
13 import arvados.collection
14
15 from schema_salad.sourceline import SourceLine
16
17 from arvados.errors import ApiError
18 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
19 from cwltool.workflow import WorkflowException
20
21 from .http import http_to_keep
22
23 logger = logging.getLogger('arvados.cwl-runner')
24
def trim_listing(obj):
    """Drop the 'listing' field from a Directory object that is a Keep reference.

    When a Directory object represents a Keep reference, it is redundant and
    potentially very expensive to pass the fully enumerated listing between
    instances of cwl-runner (e.g. when submitting a job, or when using the
    RunInSingleContainer feature), so the 'listing' field is deleted when
    it is safe to do so.

    The object is modified in place; nothing is returned.  Objects whose
    "location" is missing or does not start with "keep:" are left untouched.

    """

    location = obj.get("location", "")
    if not location.startswith("keep:"):
        return
    if "listing" not in obj:
        return
    del obj["listing"]
38
39
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Every referenced location is recorded in ``self._pathmap`` as a
    ``MapperEnt`` whose target is built from ``collection_pattern`` or
    ``file_pattern``.  Local files are uploaded, http(s) references are
    fetched through ``http_to_keep``, and literals or mixed-source
    Directory/File objects are staged into newly saved collections.
    """

    # "keep:" + 32 hex digits + "+" + decimal number, followed by "/<path>".
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # Same prefix, with the "/<path>" part optional.
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        """Record the mapper configuration, then run the base constructor.

        arvrunner -- runner object; this class reads its ``api``,
            ``keep_client``, ``num_retries``, ``project_uuid``,
            ``fs_access`` and ``intermediate_output_ttl`` attributes.
        referenced_files -- list of File/Directory objects (dicts with at
            least a "location" key).
        input_basedir -- base directory used to resolve "file:" locations.
        collection_pattern -- %-format string taking one value (the part of
            a keep reference after "keep:", or a portable data hash).
        file_pattern -- %-format string taking (portable data hash, path).
        name -- name passed to the upload helper for uploaded files.
        single_collection -- when true, a single Collection object is passed
            to the upload helper.
        """
        # Attributes are assigned before calling the base constructor;
        # presumably the base constructor invokes setup(), which reads
        # them -- TODO confirm against cwltool's PathMapper.
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Classify one File/Directory object and recurse into its children.

        srcobj -- File or Directory object to examine.
        uploadfiles -- set that collects ``(location, abspath, statresult)``
            tuples for local files that still have to be uploaded.

        Raises WorkflowException for an invalid local path or for a literal
        ("_:") object that lacks `contents` (File) or `listing` (Directory).
        """
        src = srcobj["location"]
        if "#" in src:
            # Drop the fragment identifier.
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            # Already a keep reference: map it directly (src[5:] strips "keep:").
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are only validated here; they get a mapping later
                # in setup() / addentry().
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                # Any other scheme is mapped to itself.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Stage `obj` into collection `c` underneath `path`.

        obj -- File or Directory object to add.
        c -- destination collection (must support ``copy`` and ``open``).
        path -- directory inside `c` that receives the object, e.g. ".".
        remap -- list that receives ``(location, path_in_collection)`` for
            every object staged.

        Raises WorkflowException (via SourceLine) when `obj` is neither
        already mapped, a Directory, nor a File literal with `contents`.
        """
        if obj["location"] in self._pathmap:
            # Already resolved to a keep reference: copy from its collection.
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            dest = path + "/" + obj["basename"]
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            dest = path + "/" + obj["basename"]
            for l in obj.get("listing", []):
                self.addentry(l, c, dest, remap)
            remap.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            dest = path + "/" + obj["basename"]
            with c.open(dest, "w") as f:
                f.write(obj["contents"].encode("utf-8"))
            remap.append((obj["location"], dest))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Tell whether `srcobj` has to be staged into a new collection.

        Returns True when the object is a literal ("_:" location), when a
        nested object's location differs from ``prefix + basename``, when a
        File's location is not in the path map, or when any secondaryFiles
        or listing entry (checked recursively) needs a new collection.
        Returns False otherwise, i.e. the objects can be used where they are.

        srcobj -- File or Directory object to check.
        prefix -- location prefix the object is expected to live under;
            when empty it is derived from the object's own location.

        NOTE(review): when `prefix` is empty the object's own basename is
        not compared against its location, so a top-level object whose
        basename differs from the last component of its location is not
        detected here.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            return True
        if prefix:
            if loc != prefix+srcobj["basename"]:
                return True
        else:
            # Derive the prefix from everything up to and including the last
            # "/" of the location, or the location itself plus "/" if it has
            # no slash.
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
            else:
                prefix = loc+"/"
        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        if srcobj.get("secondaryFiles"):
            for s in srcobj["secondaryFiles"]:
                if self.needs_new_collection(s, prefix):
                    return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return a new, empty Collection bound to the runner's API clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save `c` as a new intermediate collection; return its portable data hash."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])
        return c.portable_data_hash()

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate the path map for `referenced_files`.

        Steps: visit every object, upload the local files that were
        collected, then stage unmapped Directory objects and File objects
        with secondaryFiles or literal contents into new collections.
        `basedir` is not used by this implementation.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        # The upload call above is expected to have filled in st.fn with a
        # "keep:" reference for each uploaded file (st.fn[5:] strips "keep:").
        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            pdh = None
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                pdh = self._save_intermediate_collection(c)

                ab = self.collection_pattern % pdh
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+pdh, ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                # Skip staging when the file and its secondaryFiles can be
                # used where they already are.
                if not self.needs_new_collection(srcobj):
                    continue

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                pdh = self._save_intermediate_collection(c)

                ab = self.file_pattern % (pdh, srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also register the whole collection as a Directory
                    # under a freshly generated "_:" key.
                    ab = self.collection_pattern % pdh
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+pdh, ab, "Directory", True)

            # remap is only non-empty when one of the branches above staged
            # entries, in which case pdh has been set.
            for loc, sub in remap:
                # subdirs start with "./", strip it off.  The same stripped
                # path is used for both the resolved reference and the
                # target so the two can never disagree.
                rel = sub[2:] if sub.startswith("./") else sub
                # NOTE(review): every remapped entry is recorded with type
                # "Directory", including files, and this overwrites the
                # "File" entry stored above for the top-level file.
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, rel),
                                               self.file_pattern % (pdh, rel),
                                               "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a ``(location, resolved)`` pair.

        Falls back to treating "keep:" targets as their own mapping, and
        targets under ``self.keepdir`` (when set) as keep references.
        Returns None when nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            # +1 skips the separator following keepdir.
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
245
246
class StagingPathMapper(PathMapper):
    """Path mapper that assigns each object a unique target under a staging directory."""

    # When true, the listing of every Directory is visited; when false only
    # literal ("_:") directories are descended into.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        """Initialise the set of claimed targets, then run the base constructor."""
        # Must exist before the base constructor runs, since visit() uses it.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
        """Record a mapping for `obj` with a target inside `stagedir`.

        If the natural target (stagedir + basename) is already claimed by a
        different location, a numeric suffix ("_2", "_3", ...) is inserted
        before the extension until an unclaimed target is found.
        """
        location = obj["location"]
        target = os.path.join(stagedir, obj["basename"])
        stem, ext = os.path.splitext(target)

        if target in self.targets and (self.reversemap(target)[0] != location):
            counter = 1
            while target in self.targets:
                counter += 1
                target = "%s_%i%s" % (stem, counter, ext)
        self.targets.add(target)

        objclass = obj["class"]

        if objclass == "Directory":
            dirtype = "WritableDirectory" if obj.get("writable") else "Directory"
            self._pathmap[location] = MapperEnt(location, target, dirtype, staged)
            if location.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), target, basedir)
            return

        if objclass != "File":
            return

        # A File that is already mapped keeps its existing entry.
        if location in self._pathmap:
            return

        if "contents" in obj and location.startswith("_:"):
            # File literal: the "resolved" slot carries the contents.
            self._pathmap[location] = MapperEnt(obj["contents"], target, "CreateFile", staged)
            return

        filetype = "WritableFile" if (copy or obj.get("writable")) else "File"
        self._pathmap[location] = MapperEnt(location, target, filetype, staged)
        # Secondary files are staged next to the primary file.
        self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
283
284
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites keep references to "$(task.keep)/..." paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Map all referenced files, then rewrite resolved "keep:" references."""

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        for key, entry in self._pathmap.items():
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            # resolved[5:] drops the leading "keep:".
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
296
297
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal Directory listings."""

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Map every referenced object into the staging directory."""
        staging_root = self.stagedir
        self.visitlisting(referenced_files, staging_root, basedir)