Update intermediate collection name, properties and trash_at attribute
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import re
6 import logging
7 import uuid
8 import os
9 import urllib
10 import datetime
11
12 import arvados.commands.run
13 import arvados.collection
14
15 from schema_salad.sourceline import SourceLine
16
17 from arvados.errors import ApiError
18 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
19 from cwltool.workflow import WorkflowException
20
21 from .http import http_to_keep
22
23 logger = logging.getLogger('arvados.cwl-runner')
24
def trim_listing(obj):
    """Drop the 'listing' field of a Directory object that refers to Keep.

    A Directory whose location is a Keep reference does not need its fully
    enumerated listing carried along; passing it between instances of
    cwl-runner (for example when submitting a job, or when using the
    RunInSingleContainer feature) is redundant and can be very expensive.
    The field is therefore removed, in place, whenever the location starts
    with "keep:".  Objects with any other location are left untouched.

    """

    location = obj.get("location", "")
    if not location.startswith("keep:"):
        return
    if "listing" in obj:
        del obj["listing"]
38
39
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    ``collection_pattern`` is a %-format string taking a single value (the
    part of a ``keep:`` reference after the prefix).  ``file_pattern`` is a
    %-format string taking two values (a portable data hash and a path inside
    that collection).
    """

    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
        """Record the mapping configuration, then initialize the base mapper.

        Extra keyword arguments are accepted and ignored.
        """
        # These attributes are assigned before calling the base constructor;
        # presumably the base constructor invokes setup(), which reads them
        # -- TODO confirm against cwltool's PathMapper.
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Map one File/Directory object and, recursively, its children.

        Locations that are already Keep references are mapped directly.
        ``file:`` locations are stat'ed: files needing upload are added to the
        ``uploadfiles`` set as ``(src, abspath, statresult)`` tuples, files
        already on Keep are mapped immediately.  ``_:`` literals are only
        validated here.  ``http:``/``https:`` locations are fetched into Keep.
        Any other location is mapped to itself.

        Raises WorkflowException for invalid local paths and for literals
        missing their ``contents``/``listing``.
        """
        src = srcobj["location"]
        if "#" in src:
            # Drop the fragment part of the location.
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are not mapped here, only checked for completeness.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Add ``obj`` to collection ``c`` under ``path``.

        Already-mapped objects are copied from their source collection,
        unmapped Directory objects are descended into, and file literals are
        written out from their ``contents``.  Each object placed is recorded
        in ``remap`` as a ``(location, path_in_collection)`` pair.

        Raises WorkflowException for objects that fit none of those cases.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # An empty source path is replaced by "." for the copy.
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"].encode("utf-8"))
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def _save_intermediate_collection(self, c):
        """Save ``c`` as a new intermediate collection in the runner's project."""
        trash_time, props = self.__get_collection_attributes()
        c.save_new(name="Intermediate collection",
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=trash_time,
                   properties=props)

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Build the path map for ``referenced_files``.

        Visits every object, uploads the local files that were collected,
        then builds intermediate collections for Directory objects that are
        still unmapped and for File objects that are literals or carry
        secondary files.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_intermediate_collection(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                self.addentry(srcobj, c, ".", remap)

                self._save_intermediate_collection(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also register the collection as a whole, under a
                    # freshly generated literal location.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            if remap:
                # remap is only non-empty when one of the branches above
                # created and saved the collection c.
                pdh = c.portable_data_hash()
                for loc, sub in remap:
                    # subdirs start with "./", strip it off.  The same
                    # relative path is used for both the resolved Keep
                    # location and the target.
                    rel = sub[2:] if sub.startswith("./") else sub
                    ab = self.file_pattern % (pdh, rel)
                    # NOTE(review): every remapped entry is typed "Directory",
                    # including files -- confirm this is intended.
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, rel),
                                                   ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a ``(location, resolved)`` pair.

        Falls back to treating ``keep:`` targets, and targets under
        ``self.keepdir`` when it is set, as their own Keep references.
        Returns None when nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None

    def __get_collection_attributes(self):
        """Return ``(trash_time, properties)`` for a new intermediate collection.

        ``trash_time`` is None unless the runner's ``intermediate_output_ttl``
        is positive, in which case it is that many seconds from now as a naive
        UTC datetime.  ``properties`` records the collection type and the UUID
        of the current container (None when it could not be determined).
        """
        trash_time = None
        if self.arvrunner.intermediate_output_ttl > 0:
            # Use UTC rather than local time: a naive local timestamp would
            # move the expiry by the local UTC offset when the API
            # interprets it as UTC.
            trash_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)

        current_container_uuid = None
        try:
            current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
            current_container_uuid = current_container['uuid']
        except ApiError as e:
            # Status code 404 just means we're not running in a container.
            if e.resp.status != 404:
                logger.info("Getting current container: %s", e)
        properties = {"type": "Intermediate",
                      "container": current_container_uuid}

        return (trash_time, properties)
232
233
class StagingPathMapper(PathMapper):
    """Path mapper that gives every object a distinct target under a staging directory.

    Targets already handed out are tracked in ``self.targets``; when a
    different location would land on a taken target, a numeric suffix
    (``_2``, ``_3``, ...) is inserted before the file extension.
    """

    # When False, only literal directories (location starting with "_:")
    # have their listing visited.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # self.targets is created before calling the base constructor;
        # presumably the base constructor triggers setup()/visit(), which
        # use it -- TODO confirm against cwltool's PathMapper.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign a staging target to ``obj`` and record it in the path map.

        Directories are mapped as "Directory" or "WritableDirectory" and may
        have their listing visited.  Files are mapped as "CreateFile"
        (literals with contents), "WritableFile" (``copy`` or writable) or
        "File", and have their secondary files visited in the same stagedir.
        A File whose location is already mapped is left as is.
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        n = 1
        if tgt in self.targets:
            # reversemap() may find no entry for a reserved target (a target
            # can be reserved without a matching map entry, see the early
            # return for Files below).  Treat that as a conflict instead of
            # failing on subscripting None.
            owner = self.reversemap(tgt)
            if not owner or owner[0] != loc:
                while tgt in self.targets:
                    n += 1
                    tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if loc in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                # For file literals the contents themselves are stored as the
                # first field of the entry.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
270
271
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites Keep references to "$(task.keep)/" paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage all referenced files, then rewrite resolved Keep locations."""

        # Visit every file so that it, together with its secondary files,
        # receives a target under the staging directory.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Replace the "keep:" prefix of File and Directory entries with
        # "$(task.keep)/"; all other fields of the entry are carried over.
        for path, entry in self._pathmap.items():
            resolved, tgt, kind, staged = entry
            if kind in ("File", "Directory") and resolved.startswith("keep:"):
                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % resolved[5:], tgt, kind, staged)
283
284
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal directories.

    With ``_follow_dirs`` set to False, StagingPathMapper.visit only visits
    the listing of a Directory whose location starts with "_:".
    """
    _follow_dirs = False
    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        # Visit every referenced object, staging it under self.stagedir.
        self.visitlisting(referenced_files, self.stagedir, basedir)