12213: Remapping from source to destination collection needs to apply to files as...
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import re
6 import logging
7 import uuid
8 import os
9 import urllib
10
11 import arvados.commands.run
12 import arvados.collection
13
14 from schema_salad.sourceline import SourceLine
15
16 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
17 from cwltool.workflow import WorkflowException
18
# Module-wide logger, registered under the 'arvados.cwl-runner' name.
logger = logging.getLogger("arvados.cwl-runner")
20
def trim_listing(obj):
    """Drop the 'listing' field from an object whose location is in Keep.

    A Directory that is a Keep reference can always be re-enumerated from
    its location, so carrying the fully expanded 'listing' between
    cwl-runner instances (e.g. when submitting a job, or when using the
    RunInSingleContainer feature) is redundant and potentially very
    expensive.  The field is removed in place; objects whose location does
    not start with "keep:" are left untouched.

    """

    location = obj.get("location", "")
    if not location.startswith("keep:"):
        return
    obj.pop("listing", None)
34
35
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    # "keep:<pdh>/<path>": a path inside a collection.
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # "keep:<pdh>" optionally followed by "/<path>": a collection or a path in it.
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
        """Store the mapping configuration, then run the base class constructor.

        arvrunner -- runner object; this class uses its api, keep_client,
            num_retries, project_uuid, fs_access, get_uploaded() and
            add_uploaded().
        referenced_files -- File/Directory objects (dicts with at least a
            "location" key) to map.
        input_basedir -- base directory used to resolve "file:" locations.
        collection_pattern -- %-format string taking one argument (the part
            of a keep reference after "keep:").
        file_pattern -- %-format string taking two arguments (portable data
            hash, path within the collection).
        name -- passed as the name argument to
            arvados.commands.run.uploadfiles.
        single_collection -- when true, uploads and already-uploaded files
            are gathered into one shared collection.
        kwargs -- accepted and ignored.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def _new_collection(self):
        """Return a new, empty Collection bound to the runner's API and Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_if_new(self, c):
        """Save collection c unless a collection with the same portable data hash is already listed."""
        check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
        if not check["items"]:
            c.save_new(owner_uuid=self.arvrunner.project_uuid)

    def visit(self, srcobj, uploadfiles):
        """Record a mapping for srcobj and recurse into its children.

        srcobj -- File/Directory object; "location" and "class" are read,
            plus "secondaryFiles" and "listing" when present.
        uploadfiles -- set that receives a (src, abspath, statresult) tuple
            for every local file that has to be uploaded.

        Raises WorkflowException for an invalid local path, a File literal
        without `contents`, or a Directory literal without `listing`.
        """
        src = srcobj["location"]
        if "#" in src:
            # Drop the fragment part of the location.
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            # Already a keep reference: map it straight through.
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are only validated here; no mapping is recorded
                # for them in this method.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            else:
                # Any other scheme is mapped to itself.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Add obj to collection c underneath path.

        obj -- File/Directory object to add.
        c -- destination collection.
        path -- directory inside c that receives the entry; the entry is
            placed at path + "/" + obj["basename"].
        remap -- list that receives (original location, path inside c)
            tuples for entries copied from an existing collection and for
            Directory objects.  File literals are written to c but are not
            added to remap.

        Raises WorkflowException (via SourceLine) when obj is neither
        already mapped, a Directory, nor a File literal with `contents`.
        """
        if obj["location"] in self._pathmap:
            # Already mapped to a keep location: copy it from its source collection.
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"].encode("utf-8"))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate self._pathmap for referenced_files.

        Steps, in order: reuse mappings the runner already knows about,
        visit every object to find local files needing upload, upload them,
        then build new collections for Directory objects that are still
        unmapped and for File objects that have secondaryFiles or literal
        contents.  basedir is not used by this implementation.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        already_uploaded = self.arvrunner.get_uploaded()
        copied_files = set()
        for k in referenced_files:
            loc = k["location"]
            if loc in already_uploaded:
                v = already_uploaded[loc]
                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)
                if self.single_collection:
                    basename = k["basename"]
                    if basename not in collection:
                        self.addentry({"location": loc, "class": v.type, "basename": basename}, collection, ".", [])
                        copied_files.add((loc, basename, v.type))

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)
            self.arvrunner.add_uploaded(src, self._pathmap[src])

        # Files copied into the shared collection are re-pointed at that
        # collection now that its portable data hash can be computed.
        for loc, basename, cls in copied_files:
            fn = "keep:%s/%s" % (collection.portable_data_hash(), basename)
            self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_if_new(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_if_new(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the collection as a whole, under a fresh
                    # generated "_:" key.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            if remap:
                # remap is only non-empty after addentry() ran against c,
                # so c is bound here.
                pdh = c.portable_data_hash()
                for loc, sub in remap:
                    # subdirs start with "./", strip it off.  The same
                    # relative path is used for both the resolved location
                    # and the target so the two can never disagree.
                    rel = sub[2:] if sub.startswith("./") else sub
                    # NOTE(review): entries for files are also recorded with
                    # type "Directory" here -- confirm that is intended.
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, rel),
                                                   self.file_pattern % (pdh, rel),
                                                   "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a (resolved, target) style pair.

        Falls back, in order, to: the base class lookup; returning a
        "keep:" target unchanged as (target, target); translating a path
        under self.keepdir into a "keep:" reference.  Returns None when
        nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            # +1 skips the separator that follows the keepdir prefix.
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
214
class StagingPathMapper(PathMapper):
    """Path mapper that assigns each object a unique target under a staging directory."""

    # When true, the listing of every Directory is visited; when false only
    # Directory literals ("_:" locations) are descended into.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Targets handed out so far.  Created before the base constructor is
        # invoked -- presumably because that constructor already triggers
        # visit(); confirm against cwltool's PathMapper.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
        """Assign obj a target below stagedir and record it in the path map.

        If the natural target (stagedir joined with the basename) is already
        taken, a numeric suffix "_2", "_3", ... is inserted before the file
        extension until an unused name is found.
        """
        location = obj["location"]
        candidate = os.path.join(stagedir, obj["basename"])
        stem, extension = os.path.splitext(candidate)
        counter = 1
        while candidate in self.targets:
            counter += 1
            candidate = "%s_%i%s" % (stem, counter, extension)
        self.targets.add(candidate)

        kind = obj["class"]
        if kind == "Directory":
            self._pathmap[location] = MapperEnt(location, candidate, "Directory", staged)
            if location.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), candidate, basedir)
            return

        if kind != "File":
            return

        # A file that is already mapped keeps its existing entry.
        if location in self._pathmap:
            return

        if "contents" in obj and location.startswith("_:"):
            # File literal: the entry carries the contents themselves.
            self._pathmap[location] = MapperEnt(obj["contents"], candidate, "CreateFile", staged)
            return

        entry_type = "WritableFile" if copy else "File"
        self._pathmap[location] = MapperEnt(location, candidate, entry_type, staged)
        # Secondary files are staged next to the primary file.
        self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
247
248
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose keep-backed entries resolve through "$(task.keep)"."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Give every file a target in the staging directory, together with
        # any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Rewrite the resolved side of File/Directory entries that point at
        # "keep:" so that it goes through the "$(task.keep)" prefix instead.
        for key, entry in self._pathmap.items():
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
260
261
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper with _follow_dirs switched off."""

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        # Stage every referenced object directly under the staging directory.
        staging_root = self.stagedir
        self.visitlisting(referenced_files, staging_root, basedir)