Merge branch '8784-dir-listings'
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import re
6 import logging
7 import uuid
8 import os
9 import urllib
10
11 import arvados.commands.run
12 import arvados.collection
13
14 from schema_salad.sourceline import SourceLine
15
16 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
17 from cwltool.workflow import WorkflowException
18
# Module-level logger, obtained under the 'arvados.cwl-runner' logger name.
logger = logging.getLogger('arvados.cwl-runner')
20
def trim_listing(obj):
    """Strip the 'listing' field from a Directory object that points into Keep.

    A Directory whose location is a Keep reference does not need its fully
    enumerated listing, and carrying that listing between cwl-runner
    instances (e.g. when submitting a job, or when using the
    RunInSingleContainer feature) is redundant and potentially very
    expensive.  The field is therefore deleted whenever it is safe to do so.

    The object is modified in place; nothing is returned.  Objects whose
    "location" does not start with "keep:" (or that have no "location" at
    all) are left untouched.

    """

    location = obj.get("location", "")
    if not location.startswith("keep:"):
        return
    if "listing" in obj:
        del obj["listing"]
34
35
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Every referenced File/Directory location is recorded in ``self._pathmap``
    as a ``MapperEnt``.  For Keep-backed inputs the target path is produced
    from ``collection_pattern`` or ``file_pattern``; other non-local,
    non-literal locations are mapped to themselves.
    """

    # "keep:<32 hex digits>+<size>/<path>": a path inside a collection.
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # "keep:<32 hex digits>+<size>" optionally followed by "/<path>".
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
        """Store the mapper configuration and delegate to the base class.

        arvrunner          -- runner object; this class uses its api,
                              keep_client, num_retries, project_uuid,
                              fs_access, get_uploaded() and add_uploaded().
        referenced_files   -- objects that each carry a "location" key.
        input_basedir      -- base directory used to resolve "file:" locations.
        collection_pattern -- %-format string taking one value (the Keep path).
        file_pattern       -- %-format string taking (portable data hash, path).
        name               -- passed as ``name`` to the upload helper.
        single_collection  -- when true, uploads and already-uploaded inputs
                              are gathered into one shared collection.
        **kwargs           -- accepted and ignored.
        """
        self.arvrunner = arvrunner
        self.name = name
        self.single_collection = single_collection
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.referenced_files = [ref["location"] for ref in referenced_files]
        # Attributes are assigned before delegating to the base constructor.
        # NOTE(review): presumably the base constructor invokes setup(),
        # which reads them -- confirm against cwltool's PathMapper.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record a mapping for ``srcobj`` and recurse into its children.

        Local "file:" references that need uploading are added to the
        ``uploadfiles`` set as (location, absolute path, stat result) tuples
        rather than being mapped immediately.  Raises WorkflowException for
        invalid local paths and for "_:" literals missing their payload.
        """
        src = srcobj["location"]
        if "#" in src:
            # Only the part before the fragment identifier is mapped.
            src = src.split("#", 1)[0]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            # Already a Keep reference: the target is the unquoted path with
            # the "keep:" prefix removed.
            keep_target = self.collection_pattern % urllib.unquote(src[5:])
            self._pathmap[src] = MapperEnt(src, keep_target, srcobj["class"], True)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                local_path = abspath(src, self.input_basedir)
                stat_result = arvados.commands.run.statfile(
                    "", local_path,
                    fnPattern="keep:%s/%s",
                    dirPattern="keep:%s/%s",
                    raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException):
                    # NOTE(review): UploadFile is deliberately tested before
                    # ArvFile; the order matters if one subclasses the other.
                    if isinstance(stat_result, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, local_path, stat_result))
                    elif isinstance(stat_result, arvados.commands.run.ArvFile):
                        keep_target = self.collection_pattern % urllib.unquote(stat_result.fn[5:])
                        self._pathmap[src] = MapperEnt(stat_result.fn, keep_target, "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % stat_result)
            elif src.startswith("_:"):
                # Literals are only validated here; they get mapped in setup().
                literal_class = srcobj["class"]
                if literal_class == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if literal_class == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            else:
                # Any other location is mapped to itself.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException):
            for secondary in srcobj.get("secondaryFiles", []):
                self.visit(secondary, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException):
            for member in srcobj.get("listing", []):
                self.visit(member, uploadfiles)

    def addentry(self, obj, c, path, subdirs):
        """Add ``obj`` to collection ``c`` underneath ``path``.

        Already-mapped objects are copied from their source collection,
        unmapped Directory objects are expanded recursively (and appended to
        ``subdirs`` as (location, path) pairs), and "_:" file literals have
        their contents written out.  Anything else raises WorkflowException.
        """
        if obj["location"] in self._pathmap:
            resolved = self._pathmap[obj["location"]].resolved
            source, source_path = self.arvrunner.fs_access.get_collection(resolved)
            # An empty path is replaced by "." before copying.
            source_path = "." if source_path == "" else source_path
            c.copy(source_path, path + "/" + obj["basename"], source_collection=source, overwrite=True)
            for secondary in obj.get("secondaryFiles", []):
                self.addentry(secondary, c, path, subdirs)
            return

        if obj["class"] == "Directory":
            dir_path = path + "/" + obj["basename"]
            for member in obj.get("listing", []):
                self.addentry(member, c, dir_path, subdirs)
            subdirs.append((obj["location"], dir_path))
            return

        if obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as literal_file:
                literal_file.write(obj["contents"].encode("utf-8"))
            return

        raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate ``self._pathmap`` for every referenced file.

        Steps, in order: reuse mappings the runner already knows about,
        visit all objects to find what needs uploading, upload, record the
        uploaded and copied entries, then build new collections for
        Directory objects that are still unmapped and for File objects that
        have secondary files or literal contents.
        """
        uploadfiles = set()

        # Shared destination collection, only used in single-collection mode.
        collection = None
        if self.single_collection:
            collection = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)

        already_uploaded = self.arvrunner.get_uploaded()
        copied_files = set()
        for ref in referenced_files:
            loc = ref["location"]
            if loc not in already_uploaded:
                continue
            previous = already_uploaded[loc]
            keep_target = self.collection_pattern % urllib.unquote(previous.resolved[5:])
            self._pathmap[loc] = MapperEnt(previous.resolved, keep_target, previous.type, True)
            if not self.single_collection:
                continue
            basename = ref["basename"]
            if basename not in collection:
                # Copy the earlier upload into the shared collection; its
                # mapping is rewritten further down, after the upload step.
                self.addentry({"location": loc, "class": previous.type, "basename": basename}, collection, ".", [])
                copied_files.add((loc, basename, previous.type))

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([st for _, _, st in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection)

        for src, local_path, st in uploadfiles:
            entry_class = "Directory" if os.path.isdir(local_path) else "File"
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"),
                                           self.collection_pattern % st.fn[5:],
                                           entry_class, True)
            # Register the mapping with the runner (read back via get_uploaded()).
            self.arvrunner.add_uploaded(src, self._pathmap[src])

        for loc, basename, cls in copied_files:
            fn = "keep:%s/%s" % (collection.portable_data_hash(), basename)
            self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)

        for srcobj in referenced_files:
            subdirs = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                # Unmapped directory: assemble its listing into a new collection.
                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                for member in srcobj.get("listing", []):
                    self.addentry(member, c, ".", subdirs)

                # Save only when the API reports no collection with this hash.
                existing = self.arvrunner.api.collections().list(
                    filters=[["portable_data_hash", "=", c.portable_data_hash()]],
                    limit=1).execute(num_retries=self.arvrunner.num_retries)
                if not existing["items"]:
                    c.save_new(owner_uuid=self.arvrunner.project_uuid)

                target = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), target, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):
                # File with secondary files, or a file literal: place it
                # (and whatever addentry pulls in) into a new collection.
                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                self.addentry(srcobj, c, ".", subdirs)

                # Save only when the API reports no collection with this hash.
                existing = self.arvrunner.api.collections().list(
                    filters=[["portable_data_hash", "=", c.portable_data_hash()]],
                    limit=1).execute(num_retries=self.arvrunner.num_retries)
                if not existing["items"]:
                    c.save_new(owner_uuid=self.arvrunner.project_uuid)

                target = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              target, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Additionally map the whole collection as a Directory
                    # under a freshly generated "_:" key.
                    target = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), target, "Directory", True)

            # subdirs is only filled by the addentry calls above, so `c` is
            # always bound whenever this loop has something to iterate over.
            for loc, sub in subdirs:
                # subdirs will all start with "./", strip it off
                target = self.file_pattern % (c.portable_data_hash(), sub[2:])
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
                                               target, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map ``target`` back to a (target, source location) pair.

        "keep:" targets map to themselves, targets under ``self.keepdir``
        are rewritten to "keep:" references, and everything else is handled
        by the base class.
        """
        if target.startswith("keep:"):
            return (target, target)
        if self.keepdir and target.startswith(self.keepdir):
            # Drop the keepdir prefix plus the one character that follows it.
            relative = target[len(self.keepdir)+1:]
            return (target, "keep:" + relative)
        return super(ArvPathMapper, self).reversemap(target)
204
class StagingPathMapper(PathMapper):
    """Path mapper that assigns each object a target under a staging directory."""

    # When true, the listing of every Directory is visited; when false only
    # the listings of "_:" (literal) directories are.
    _follow_dirs = True

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Map ``obj`` to ``stagedir``/basename and visit its children.

        Directories are always (re)mapped.  A File whose location is already
        mapped is skipped.  "_:" files with "contents" become "CreateFile"
        entries whose resolved value is the contents itself; other files
        become "WritableFile" when ``copy`` is true and "File" otherwise.
        Objects of any other class are ignored.
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        kind = obj["class"]

        if kind == "Directory":
            self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                # Children are staged inside this directory's target.
                self.visitlisting(obj.get("listing", []), tgt, basedir)
            return

        if kind != "File":
            return
        if loc in self._pathmap:
            return

        if "contents" in obj and loc.startswith("_:"):
            self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            return

        entry_type = "WritableFile" if copy else "File"
        self._pathmap[loc] = MapperEnt(loc, tgt, entry_type, staged)
        # Secondary files are staged next to the primary file.
        self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
227
228
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites "keep:" sources to "$(task.keep)/" paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage all referenced files, then rewrite Keep-backed entries."""

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        for path, entry in self._pathmap.items():
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory") or not resolved.startswith("keep:"):
                continue
            # Replace the "keep:" prefix with the "$(task.keep)/" placeholder.
            self._pathmap[path] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
240
241
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal directories."""

    # Disables listing traversal for directories whose location is not "_:".
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage every referenced file under ``self.stagedir``."""
        staging_root = self.stagedir
        self.visitlisting(referenced_files, staging_root, basedir)