Merge branch '13330-cwl-intermediate-collections-cleanup'
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import re
6 import logging
7 import uuid
8 import os
9 import urllib
10
11 from arvados_cwl.util import get_current_container, get_intermediate_collection_info
12 import arvados.commands.run
13 import arvados.collection
14
15 from schema_salad.sourceline import SourceLine
16
17 from arvados.errors import ApiError
18 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
19 from cwltool.workflow import WorkflowException
20
21 from .http import http_to_keep
22
23 logger = logging.getLogger('arvados.cwl-runner')
24
def trim_listing(obj):
    """Drop the 'listing' field from a Directory object that points at Keep.

    A Directory whose location is a Keep reference does not need its
    contents spelled out: carrying the fully enumerated listing between
    cwl-runner instances (for example when submitting a job, or when using
    the RunInSingleContainer feature) is redundant and can be very
    expensive.  The object is modified in place; nothing is returned.

    """

    # Only Keep references are safe to trim; anything else keeps its listing.
    if not obj.get("location", "").startswith("keep:"):
        return

    if "listing" in obj:
        del obj["listing"]
38
39
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Each referenced File/Directory object is recorded in ``self._pathmap``
    as a ``MapperEnt(resolved, target, type, staged)`` keyed by its
    location.  Local ``file:`` references that are not already in Keep are
    uploaded, ``http:``/``https:`` references are resolved through
    ``http_to_keep``, and literals or files with secondaryFiles are packed
    into newly saved intermediate collections.
    """

    # "keep:<32 hex digits>+<size>/<non-empty path>"
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # "keep:<32 hex digits>+<size>" optionally followed by "/<path>"
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        """Record the mapping configuration and build the path map.

        :param arvrunner: runner object; this class reads its ``api``,
            ``keep_client``, ``num_retries``, ``project_uuid``,
            ``fs_access`` and ``intermediate_output_ttl`` attributes.
        :param referenced_files: list of File/Directory objects (dicts with
            at least a ``location`` key).
        :param input_basedir: base directory used to resolve ``file:``
            locations to absolute paths.
        :param collection_pattern: ``%``-format string taking one value
            (a Keep path without the ``keep:`` prefix) that yields the
            target path of a collection or of a file already in Keep.
        :param file_pattern: ``%``-format string taking
            ``(portable_data_hash, path)`` that yields the target path of
            an entry inside a newly created collection.
        :param name: passed through as ``name`` to the upload call.
        :param single_collection: when true, a single Collection object is
            created and handed to the upload call as ``collection``.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # Attributes are set first because the base constructor is expected
        # to call setup() -- TODO confirm against cwltool's PathMapper.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def _new_collection(self):
        """Return an empty Collection bound to the runner's API/Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save collection `c` as a new intermediate collection.

        The name, trash time and properties come from
        get_intermediate_collection_info(); the collection is owned by the
        runner's project.
        """
        container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def visit(self, srcobj, uploadfiles):
        """Classify one File/Directory object and recurse into its children.

        :param srcobj: object with ``location`` and ``class`` keys, and
            optionally ``secondaryFiles``, ``listing`` and ``contents``.
        :param uploadfiles: set that collects ``(src, abspath, statresult)``
            tuples for local files that still need to be uploaded.
        :raises WorkflowException: for an invalid local path, or for a
            literal that lacks ``contents`` (File) or ``listing``
            (Directory).
        """
        src = srcobj["location"]
        # Drop any "#fragment" suffix before looking at the location.
        if "#" in src:
            src = src[:src.index("#")]

        # Keep references by portable data hash map straight through.
        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        # Needs uploading; the mapping is filled in by setup().
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are only validated here; they are materialized
                # into a collection later (see addentry/setup).
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                # Any other scheme is mapped to itself unchanged.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Add `obj` (and its children) to collection `c` under `path`.

        :param obj: File/Directory object to add.
        :param c: destination collection.
        :param path: directory inside `c` that receives the entry; the
            entry itself is placed at ``path + "/" + obj["basename"]``.
        :param remap: list that receives ``(location, path_in_collection)``
            for every entry added.
        :raises WorkflowException: when `obj` is neither already mapped, a
            Directory, nor a literal with ``contents``.
        """
        if obj["location"] in self._pathmap:
            # Already resolved to a Keep location: copy it from its source
            # collection into the new one.
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            # Secondary files are placed next to the primary file.
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            # File literal: write its contents into the collection.
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"].encode("utf-8"))
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate ``self._pathmap`` for every object in `referenced_files`.

        Steps: visit every object, upload the local files collected by
        visit(), then build and save an intermediate collection for each
        unmapped Directory and for each File that has secondaryFiles or is
        a literal with ``contents``.  `basedir` is not used here.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        # After the upload, st.fn is used as the Keep reference of each
        # uploaded path (it is quoted for the resolved field and has its
        # first five characters stripped for the target).
        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            # (location, path inside the new collection) for each entry
            # that addentry() places into collection `c` below.
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_intermediate_collection(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_intermediate_collection(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Extra entry under a fresh "_:" key that maps the whole
                    # collection holding the file and its secondary files.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            # remap is only non-empty when one of the branches above ran,
            # so `c` is always bound here.
            if remap:
                pdh = c.portable_data_hash()
                for loc, sub in remap:
                    # subdirs start with "./", strip it off; the same
                    # stripped path is used for both the Keep reference and
                    # the target.
                    if sub.startswith("./"):
                        sub = sub[2:]
                    # NOTE(review): every remapped entry, File objects
                    # included, is recorded with type "Directory" and
                    # overwrites any entry stored above for the same
                    # location -- confirm this is intended.
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, sub),
                                                   self.file_pattern % (pdh, sub), "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a ``(location, resolved)`` pair.

        Falls back, in order, to: the base-class lookup; ``target`` itself
        when it is already a ``keep:`` reference; a ``keep:`` reference
        derived from a path under ``self.keepdir`` (only when keepdir is
        set; setup() leaves it as None).  Returns None when nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            # Skip the keepdir prefix plus the separator that follows it.
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
216
217
class StagingPathMapper(PathMapper):
    """Path mapper that assigns each object a unique target under a staging dir.

    Target paths already handed out are remembered in ``self.targets`` so
    that two different locations sharing a basename do not collide.
    """

    # When true, the listing of every Directory is visited; when false only
    # the listing of Directory literals ("_:" locations) is.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # The target set must exist before the base constructor runs,
        # because visit() reads and updates it.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
        """Record the staging target for `obj` and visit its children.

        The target is ``stagedir/basename``.  If that path is already taken
        by a different location, a numeric suffix (``_2``, ``_3``, ...) is
        inserted before the extension until an unused path is found.
        """
        location = obj["location"]
        target = os.path.join(stagedir, obj["basename"])

        if target in self.targets and self.reversemap(target)[0] != location:
            stem, extension = os.path.splitext(target)
            counter = 1
            while target in self.targets:
                counter += 1
                target = "%s_%i%s" % (stem, counter, extension)
        self.targets.add(target)

        if obj["class"] == "Directory":
            entry_type = "WritableDirectory" if obj.get("writable") else "Directory"
            self._pathmap[location] = MapperEnt(location, target, entry_type, staged)
            if location.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), target, basedir)
            return

        if obj["class"] != "File":
            return

        # A File location that is already mapped keeps its first mapping.
        if location in self._pathmap:
            return

        if "contents" in obj and location.startswith("_:"):
            # File literal: the resolved field carries the contents.
            self._pathmap[location] = MapperEnt(obj["contents"], target, "CreateFile", staged)
            return

        entry_type = "WritableFile" if (copy or obj.get("writable")) else "File"
        self._pathmap[location] = MapperEnt(location, target, entry_type, staged)
        # Secondary files are staged in the same directory as the primary.
        self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
253                 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
254
255
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed entries resolve under ``$(task.keep)``."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage all referenced files, then rewrite ``keep:`` references."""

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        for key, entry in self._pathmap.items():
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            # Replace the "keep:" prefix with the "$(task.keep)/" prefix.
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
267
268
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal directories."""

    # With this off, visit() only walks the listing of Directory literals
    # (locations starting with "_:").
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage every referenced object under ``self.stagedir``."""
        self.visitlisting(referenced_files, self.stagedir, basedir)