11684: Merge branch 'master' into 11684-unsigned-locator-fix
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 import re
2 import logging
3 import uuid
4 import os
5 import urllib
6
7 import arvados.commands.run
8 import arvados.collection
9
10 from schema_salad.sourceline import SourceLine
11
12 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
13 from cwltool.workflow import WorkflowException
14
# Module-level logger, registered under the 'arvados.cwl-runner' logger name.
logger = logging.getLogger('arvados.cwl-runner')
16
def trim_listing(obj):
    """Delete the 'listing' field of a Keep-backed Directory object, in place.

    When a Directory object represents a Keep reference, its fully enumerated
    listing is redundant and potentially very expensive to pass between
    instances of cwl-runner (e.g. when submitting a job, or when using the
    RunInSingleContainer feature).

    Objects whose "location" does not start with "keep:", or that carry no
    'listing' key, are left untouched.

    :param obj: dict-like CWL File/Directory object; modified in place.
    :returns: None
    """
    location = obj.get("location", "")
    if not location.startswith("keep:"):
        return
    if "listing" in obj:
        del obj["listing"]
30
31
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Each referenced File/Directory location is mapped to a ``keep:``
    reference (the "resolved" side) and a container-side path produced from
    ``collection_pattern`` / ``file_pattern`` (the "target" side).  Local
    ``file:`` references are uploaded to Keep, and literal ("_:") File and
    Directory objects, as well as Files carrying secondaryFiles, are packed
    into new collections.
    """

    # keep:<pdh>/<path> -- a path inside a collection.
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # keep:<pdh> optionally followed by /<path> -- a collection or a path in it.
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
        """Build the path map for ``referenced_files``.

        :param arvrunner: runner object providing ``api``, ``keep_client``,
            ``num_retries``, ``project_uuid``, ``fs_access`` and the
            ``get_uploaded``/``add_uploaded`` upload cache.
        :param referenced_files: list of CWL File/Directory objects (dicts
            with at least a "location" key).
        :param input_basedir: base directory used to resolve relative
            ``file:`` locations.
        :param collection_pattern: %-format string applied to
            "<pdh>[/<path>]" to build a container-side path.
        :param file_pattern: %-format string applied to (pdh, relative path).
        :param name: collection name passed to the upload step.
        :param single_collection: if True, newly uploaded and previously
            uploaded files are gathered into a single collection.
        :param kwargs: accepted for compatibility; ignored.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Map ``srcobj`` (and its secondaryFiles/listing) or queue it for upload.

        :param srcobj: CWL File/Directory object.
        :param uploadfiles: set that receives ``(src, abspath, statfile_result)``
            tuples for local files that must be uploaded to Keep.
        :raises WorkflowException: for invalid local paths, or literals
            missing `contents`/`listing`.
        """
        src = srcobj["location"]
        if "#" in src:
            # Drop any URI fragment; mapping is keyed on the bare location.
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            # Already a Keep reference: map straight to the container path.
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are materialized later in setup(); only validate here.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, subdirs):
        """Place ``obj`` into collection ``c`` at ``path``/<basename>.

        Already-mapped objects are copied from their source collection (with
        their secondaryFiles).  Unmapped Directories are recursed into and
        their (location, path) pair is appended to ``subdirs``.  File
        literals with `contents` are written into ``c``.

        :raises WorkflowException: for any other kind of object.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, subdirs)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], subdirs)
            subdirs.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"].encode("utf-8"))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def _new_collection(self):
        """Return an empty Collection bound to the runner's API and Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_if_not_exists(self, c):
        """Save ``c`` as a new collection unless one with the same PDH exists.

        :returns: the portable data hash of ``c`` after the (possible) save.
        """
        check = self.arvrunner.api.collections().list(
            filters=[["portable_data_hash", "=", c.portable_data_hash()]],
            limit=1).execute(num_retries=self.arvrunner.num_retries)
        if not check["items"]:
            c.save_new(owner_uuid=self.arvrunner.project_uuid)
        # Computed once here; callers must not modify ``c`` afterwards.
        return c.portable_data_hash()

    def _map_subdirs(self, subdirs, pdh):
        """Map the (location, "./sub/path") pairs collected by addentry into ``pdh``."""
        for loc, sub in subdirs:
            # subdirs will all start with "./", strip it off
            relpath = sub[2:]
            ab = self.file_pattern % (pdh, relpath)
            self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, relpath), ab, "Directory", True)

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate the path map: reuse earlier uploads, upload local files,
        and pack literals / files with secondaryFiles into new collections."""
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        already_uploaded = self.arvrunner.get_uploaded()
        copied_files = set()
        for k in referenced_files:
            loc = k["location"]
            if loc in already_uploaded:
                v = already_uploaded[loc]
                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)
                if self.single_collection:
                    basename = k["basename"]
                    if basename not in collection:
                        self.addentry({"location": loc, "class": v.type, "basename": basename}, collection, ".", [])
                        copied_files.add((loc, basename, v.type))

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)
            self.arvrunner.add_uploaded(src, self._pathmap[src])

        if copied_files:
            # ``collection`` is not modified inside this loop, so its PDH
            # (which re-serializes the manifest) only needs computing once.
            single_pdh = collection.portable_data_hash()
            for loc, basename, cls in copied_files:
                fn = "keep:%s/%s" % (single_pdh, basename)
                self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)

        for srcobj in referenced_files:
            loc = srcobj["location"]
            subdirs = []
            if srcobj["class"] == "Directory" and loc not in self._pathmap:
                # Directory not backed by Keep (e.g. a literal): build a new
                # collection from its listing.
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", subdirs)
                pdh = self._save_if_not_exists(c)

                ab = self.collection_pattern % pdh
                self._pathmap[loc] = MapperEnt("keep:" + pdh, ab, "Directory", True)
                self._map_subdirs(subdirs, pdh)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                                                (loc.startswith("_:") and "contents" in srcobj)):
                # File literal, or file with secondaryFiles: put the file
                # (and its secondaryFiles) together in one collection.
                c = self._new_collection()
                self.addentry(srcobj, c, ".", subdirs)
                pdh = self._save_if_not_exists(c)

                ab = self.file_pattern % (pdh, srcobj["basename"])
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                               ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also register the whole collection under a fresh
                    # anonymous key (presumably so the directory holding the
                    # secondaryFiles is staged as well -- confirm).
                    ab = self.collection_pattern % pdh
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:" + pdh, ab, "Directory", True)
                self._map_subdirs(subdirs, pdh)

        self.keepdir = None

    def reversemap(self, target):
        """Map a container-side path back to a ``(target, source)`` pair.

        ``keep:`` targets are returned unchanged; paths under ``self.keepdir``
        (when set) become ``keep:`` references; anything else is delegated to
        PathMapper.reversemap.
        """
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)
200
class StagingPathMapper(PathMapper):
    """Map CWL File/Directory objects to target paths under a staging directory.

    Literal files ("_:" location with `contents`) become "CreateFile"
    entries; other files become "File" or, when ``copy`` is set,
    "WritableFile" entries.  Directories become "Directory" entries.
    """

    # When False, only "_:" literal Directories have their listing visited.
    _follow_dirs = True

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        location = obj["location"]
        target = os.path.join(stagedir, obj["basename"])
        kind = obj["class"]

        if kind == "Directory":
            self._pathmap[location] = MapperEnt(location, target, "Directory", staged)
            if location.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), target, basedir)
            return

        # Anything that is neither a Directory nor a File is ignored, and a
        # File that is already mapped is not mapped again.
        if kind != "File" or location in self._pathmap:
            return

        if "contents" in obj and location.startswith("_:"):
            self._pathmap[location] = MapperEnt(obj["contents"], target, "CreateFile", staged)
            return

        entry_type = "WritableFile" if copy else "File"
        self._pathmap[location] = MapperEnt(location, target, entry_type, staged)
        # secondaryFiles are staged alongside the primary file.
        self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
223
224
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed entries resolve to $(task.keep) paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Rewrite "keep:<path>" File/Directory entries to "$(task.keep)/<path>",
        # keeping target, type and staged flag unchanged.
        for path, (resolved, target, entry_type, staged) in list(self._pathmap.items()):
            if entry_type not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            keep_relpath = resolved[5:]
            self._pathmap[path] = MapperEnt("$(task.keep)/%s" % keep_relpath, target, entry_type, staged)
236
237
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that maps Directories without visiting their listing.

    Only "_:" literal Directories still have their listing visited.
    """

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        staging_root = self.stagedir
        self.visitlisting(referenced_files, staging_root, basedir)