Merge branch 'master' into 7478-anm-spot-instances
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import re
6 import logging
7 import uuid
8 import os
9 import urllib
10
11 import arvados.commands.run
12 import arvados.collection
13
14 from schema_salad.sourceline import SourceLine
15
16 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
17 from cwltool.workflow import WorkflowException
18
19 from .http import http_to_keep
20
# Module logger; messages are emitted on the 'arvados.cwl-runner' logger.
logger = logging.getLogger('arvados.cwl-runner')
22
def trim_listing(obj):
    """Drop the 'listing' field from a Directory object that points into Keep.

    A Directory whose location is a Keep reference can be re-enumerated from
    Keep, so shipping its fully expanded listing between cwl-runner instances
    (e.g. when submitting a job, or with the RunInSingleContainer feature) is
    redundant and potentially very expensive.  The object is modified in
    place; objects whose location is not a "keep:" reference are left as-is.
    """
    if not obj.get("location", "").startswith("keep:"):
        # Not a Keep reference: the listing may be the only description of
        # the directory's contents, so keep it.
        return
    if "listing" in obj:
        del obj["listing"]
36
37
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Each referenced File/Directory is mapped to a Keep reference of the form
    "keep:<portable data hash>[/<path>]".  Local "file:" paths are uploaded,
    HTTP(S) URLs are converted to Keep references via ``http_to_keep``, and
    unmapped Directories, File literals and Files with secondaryFiles are
    assembled into new collections so that they can be addressed as a unit.
    """

    # A "keep:" reference to a path inside a collection (a path is required).
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # A "keep:" reference to a whole collection, optionally with a path.
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
        """
        :param arvrunner: runner object; this class reads its ``api``,
            ``keep_client``, ``num_retries``, ``project_uuid`` and
            ``fs_access`` attributes and calls ``get_uploaded``/``add_uploaded``.
        :param referenced_files: CWL File/Directory objects to map.
        :param input_basedir: base directory used to resolve "file:" paths.
        :param collection_pattern: %-format string applied to
            "<pdh>[/<path>]" to build a target path.
        :param file_pattern: %-format string applied to ``(<pdh>, <path>)``
            to build a target path.
        :param name: name passed to the upload of local files.
        :param single_collection: if true, local files are uploaded into, and
            previously uploaded files copied into, one shared collection.

        Extra keyword arguments are accepted and ignored.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def _new_collection(self):
        """Return a new, empty collection bound to the runner's API/Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_if_new(self, c):
        """Save collection ``c`` into the runner's project unless a collection
        with the same portable data hash is already visible to the API."""
        check = self.arvrunner.api.collections().list(
            filters=[["portable_data_hash", "=", c.portable_data_hash()]],
            limit=1).execute(num_retries=self.arvrunner.num_retries)
        if not check["items"]:
            c.save_new(owner_uuid=self.arvrunner.project_uuid)

    def visit(self, srcobj, uploadfiles):
        """Record a mapping for ``srcobj``, then recurse into its
        secondaryFiles and listing.

        Local files that must be uploaded are not mapped here; instead a
        ``(src, abspath, UploadFile)`` tuple is added to ``uploadfiles`` so
        ``setup`` can upload them in one batch.

        Raises WorkflowException for an invalid local path, a File literal
        without ``contents`` or a Directory literal without ``listing``.
        """
        src = srcobj["location"]
        # Ignore any "#fragment" suffix.
        if "#" in src:
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            # Already a Keep reference: map it straight through.
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are materialized later by setup(); just validate.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                # Unknown scheme: pass the location through unchanged.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Place ``obj`` into collection ``c`` at ``path + "/" + basename``.

        - Objects already in ``self._pathmap`` are copied from their resolved
          Keep location (overwriting any existing entry), followed by their
          secondaryFiles.
        - Other Directories are recreated entry by entry from ``listing``.
        - File literals ("_:" location with ``contents``) are written out as
          UTF-8.

        For every object placed, a ``(location, path/basename)`` pair is
        appended to ``remap``.  Anything else raises WorkflowException.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"].encode("utf-8"))
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate ``self._pathmap`` for ``referenced_files``.

        1. Reuse the runner's mappings for already-uploaded locations (and,
           with ``single_collection``, copy them into the shared collection).
        2. Visit every object, mapping Keep/HTTP/other references and
           collecting the local files that need uploading.
        3. Upload those local files in one batch and record them with the
           runner.
        4. Build new collections for still-unmapped Directories and for Files
           that have secondaryFiles or are literals, and map their contents.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        already_uploaded = self.arvrunner.get_uploaded()
        copied_files = set()
        for k in referenced_files:
            loc = k["location"]
            if loc in already_uploaded:
                v = already_uploaded[loc]
                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)
                if self.single_collection:
                    basename = k["basename"]
                    if basename not in collection:
                        self.addentry({"location": loc, "class": v.type, "basename": basename}, collection, ".", [])
                        copied_files.add((loc, basename, v.type))

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)
            self.arvrunner.add_uploaded(src, self._pathmap[src])

        # Point copied files at their location inside the shared collection.
        for loc, basename, cls in copied_files:
            fn = "keep:%s/%s" % (collection.portable_data_hash(), basename)
            self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_if_new(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_if_new(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the whole collection so it is reachable as a unit.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            # remap is only filled by the branches above, which define c.
            for loc, sub in remap:
                # Entries were added under ".", so paths start with "./";
                # strip it once and use the same subpath for both the
                # resolved location and the target.
                if sub.startswith("./"):
                    sub = sub[2:]
                ab = self.file_pattern % (c.portable_data_hash(), sub)
                # NOTE(review): every remapped entry is typed "Directory",
                # including Files (this overrides the "File" entry set above
                # for the primary file).  remap does not record the class, so
                # this is left unchanged — confirm whether it is intended.
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub),
                                               ab, "Directory", True)

        # Consulted by reversemap(); no keep mount directory is known here.
        self.keepdir = None

    def reversemap(self, target):
        """Map ``target`` back to a pair, or return None.

        The base-class lookup is tried first.  Otherwise a "keep:" target is
        returned as ``(target, target)``, and a path under ``self.keepdir``
        is converted to a "keep:" reference returned as ``(kp, kp)``.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
221
class StagingPathMapper(PathMapper):
    """Map File/Directory objects to unique target paths under a staging dir.

    Targets are named after each object's ``basename``.  When a different
    location would land on an already-used target, a numeric suffix is
    inserted before the extension ("name_2.ext", "name_3.ext", ...).
    """
    # When true, every Directory's listing is traversed; when false, only
    # literal ("_:") Directories have their listings traversed.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Every target path handed out so far, used to detect name collisions.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign ``obj`` a unique target under ``stagedir`` and record it.

        Directories map to "Directory" (or "WritableDirectory" when
        ``writable``); File literals map to "CreateFile" with their contents;
        other Files map to "File" (or "WritableFile" when ``copy`` or
        ``writable``), and their secondaryFiles are visited in ``stagedir``.
        A File whose location is already mapped is not re-mapped.
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        n = 1
        if tgt in self.targets:
            # reversemap() returns None when a target is reserved but no
            # _pathmap entry points at it (e.g. a File that was already
            # mapped elsewhere still reserves its new target before the early
            # return below, and a re-visited Directory leaves its old target
            # behind).  Treat that as a collision instead of indexing None.
            owner = self.reversemap(tgt)
            if owner is None or owner[0] != loc:
                while tgt in self.targets:
                    n += 1
                    tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if loc in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
258
259
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed sources are rewritten to
    "$(task.keep)/..." paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Give each referenced object (and its secondary files) a target
        # under self.stagedir.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Plain File/Directory entries whose source is a "keep:" reference
        # are re-pointed at the same path below $(task.keep).
        for key, (resolved, target, kind, staged) in self._pathmap.items():
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            keep_relative = resolved[5:]
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % keep_relative, target, kind, staged)
270                 self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type, staged)
271
272
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal Directories.

    With ``_follow_dirs`` false, the inherited ``visit`` only traverses the
    ``listing`` of Directories whose location starts with "_:".
    """

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        # Hand all referenced objects to visitlisting, targeting self.stagedir.
        target_dir = self.stagedir
        self.visitlisting(referenced_files, target_dir, basedir)