Remove trailing white-space
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import re
6 import logging
7 import uuid
8 import os
9 import urllib
10 import datetime
11
12 import arvados.commands.run
13 import arvados.collection
14
15 from schema_salad.sourceline import SourceLine
16
17 from arvados.errors import ApiError
18 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
19 from cwltool.workflow import WorkflowException
20
21 from .http import http_to_keep
22
23 logger = logging.getLogger('arvados.cwl-runner')
24
def trim_listing(obj):
    """Drop the 'listing' field of a Directory object that lives in Keep.

    A Directory whose location is a Keep reference does not need a fully
    enumerated 'listing': carrying it between cwl-runner instances (for
    example when submitting a job or using RunInSingleContainer) is
    redundant and can be very expensive.  The object is modified in place
    and nothing is returned.

    """

    location = obj.get("location", "")
    if not location.startswith("keep:"):
        # Not a Keep reference: leave the object untouched.
        return

    if "listing" in obj:
        del obj["listing"]
38
39
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    `visit` classifies every referenced File/Directory object by the scheme
    of its "location"; `setup` uploads local files that are not yet in Keep
    and writes file/directory literals into newly saved intermediate
    collections.  Results are stored in ``self._pathmap`` as ``MapperEnt``
    tuples keyed by the object's location.
    """

    # "keep:<32 hex digits>+<digits>/<non-empty path>": a path inside a collection.
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # Same prefix, but the "/<path>" part is optional, so a bare collection
    # reference matches as well.
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
        """Store the mapping configuration, then run the base constructor.

        arvrunner -- runner object; this class reads its `api`,
            `keep_client`, `num_retries`, `project_uuid`, `fs_access` and
            `intermediate_output_ttl` attributes.
        referenced_files -- list of File/Directory objects, each having a
            "location" key.
        input_basedir -- base directory used to resolve "file:" locations.
        collection_pattern -- %-format string taking one value (the part of
            a keep reference after "keep:").
        file_pattern -- %-format string taking two values (portable data
            hash, path within the collection).
        name -- passed to `arvados.commands.run.uploadfiles` as `name`.
        single_collection -- when true, a Collection object is created in
            `setup` and handed to `uploadfiles` (presumably so all uploads
            land in one collection -- confirm against uploadfiles).
        **kwargs -- accepted for call compatibility and ignored.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # NOTE(review): the base constructor is expected to invoke setup();
        # every attribute above must therefore be assigned before this call.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record how `srcobj` (and its secondaryFiles/listing) maps to Keep.

        srcobj -- File/Directory object with "location" and "class".
        uploadfiles -- set that collects ``(src, abspath, statresult)``
            tuples for local files that still have to be uploaded.

        Raises WorkflowException for invalid local paths and for literals
        ("_:" locations) lacking `contents` (File) or `listing` (Directory).
        """
        src = srcobj["location"]
        if "#" in src:
            # Drop the fragment identifier.
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            # Already a keep reference; src[5:] strips the "keep:" prefix.
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are only validated here; they are materialized
                # later by setup()/addentry().
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                # Unknown scheme: map the location to itself.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Add `obj` to collection `c` under `path`, recursing as needed.

        obj -- File/Directory object to add.
        c -- destination collection (copied into / written to).
        path -- directory inside `c` that receives the entry.
        remap -- list extended with ``(location, path_in_collection)``
            for every entry that was added.

        Raises WorkflowException (via SourceLine) when `obj` is neither
        already mapped, a Directory, nor a File literal with `contents`.
        """
        if obj["location"] in self._pathmap:
            # Already in Keep: copy from the source collection.
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            dest = path + "/" + obj["basename"]
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            dest = path + "/" + obj["basename"]
            for l in obj.get("listing", []):
                self.addentry(l, c, dest, remap)
            remap.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            dest = path + "/" + obj["basename"]
            with c.open(dest, "w") as f:
                f.write(obj["contents"].encode("utf-8"))
            remap.append((obj["location"], dest))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def _new_collection(self):
        """Return a new, unsaved Collection using the runner's API and Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save `c` as a new intermediate collection owned by the runner's project."""
        info = self._get_intermediate_collection_info()
        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate ``self._pathmap`` for all `referenced_files`.

        Steps: visit every object, upload the local files collected by
        `visit`, then build intermediate collections for Directory objects
        that are still unmapped and for File objects that have
        secondaryFiles or are literals with `contents`.  `basedir` is not
        used by this implementation.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        # Presumably uploadfiles() rewrites each st.fn to a "keep:..."
        # reference; st.fn[5:] strips that prefix -- confirm against
        # arvados.commands.run.
        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_intermediate_collection(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_intermediate_collection(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also register the containing collection under a
                    # freshly generated literal location.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            if remap:
                # remap is only non-empty when one of the branches above
                # created `c`.
                pdh = c.portable_data_hash()
                for loc, sub in remap:
                    # Entries are added below ".", so paths start with
                    # "./"; strip it off.  The same relative path is used
                    # for both the resolved location and the target.
                    relpath = sub[2:] if sub.startswith("./") else sub
                    # NOTE(review): every remapped entry, files included,
                    # is recorded with type "Directory" -- confirm intended.
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, relpath),
                                                   self.file_pattern % (pdh, relpath),
                                                   "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map `target` back to a ``(location, location)`` pair, or None.

        Falls back to the target itself for "keep:" references, and to a
        "keep:" reference derived from `self.keepdir` when that is set and
        prefixes `target`.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            # +1 skips the path separator following keepdir.
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None

    def _get_intermediate_collection_info(self):
        """Return name, trash_at and properties for an intermediate collection.

        `trash_at` is None unless the runner's `intermediate_output_ttl` is
        positive, in which case it is that many seconds from now.  The
        "container" property is the uuid of the current container, or None
        when it cannot be determined.
        """
        trash_time = None
        if self.arvrunner.intermediate_output_ttl > 0:
            # Use UTC: the datetime is naive and the API interprets
            # timestamps as UTC, so local time would skew the expiry by
            # the local UTC offset.
            trash_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)

        current_container_uuid = None
        try:
            current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
            current_container_uuid = current_container['uuid']
        except ApiError as e:
            # Status code 404 just means we're not running in a container.
            if e.resp.status != 404:
                logger.info("Getting current container: %s", e)
        props = {"type": "Intermediate",
                 "container": current_container_uuid}

        return {"name" : "Intermediate collection",
                "trash_at" : trash_time,
                "properties" : props}
234
235
class StagingPathMapper(PathMapper):
    """Path mapper that assigns each object a unique target under a staging directory.

    Targets handed out so far are tracked in ``self.targets`` so that two
    different locations sharing a basename get distinct targets.
    """

    # When true, the listing of every Directory is visited; when false only
    # Directory literals ("_:" locations) are descended into.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        """Initialize the target set, then run the base constructor."""
        # Must exist before the base constructor runs, since visit() uses it.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Map `obj` to a target below `stagedir` and record it in the path map.

        obj -- File/Directory object with "location", "basename", "class".
        stagedir -- directory the target path is joined onto.
        basedir -- passed through to visitlisting().
        copy -- when true, Files are recorded as "WritableFile".
        staged -- stored as the `staged` field of the created MapperEnt.
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        n = 1
        if tgt in self.targets:
            # A target can be reserved without a matching path map entry
            # (a File that was already mapped returns below after its
            # target has been added), so reversemap() may yield None; treat
            # that as a clash too instead of failing on None[0].
            owner = self.reversemap(tgt)
            if owner is None or owner[0] != loc:
                # Claimed by another location: append _2, _3, ... before
                # the extension until the name is free.
                while tgt in self.targets:
                    n += 1
                    tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if loc in self._pathmap:
                # First mapping of a File wins.
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literal: the "resolved" field carries the contents.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                # Secondary files are staged next to the primary file.
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
272
273
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites "keep:" sources to "$(task.keep)/" paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Stage every referenced object (and whatever visitlisting reaches
        # from it) under this mapper's staging directory.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Second pass: File/Directory entries resolved to a keep reference
        # get their resolved path re-rooted at "$(task.keep)/".
        for key, entry in self._pathmap.items():
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            # resolved[5:] drops the "keep:" prefix.
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
285
286
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal directories."""

    # Read by StagingPathMapper.visit: with this off, only Directory
    # literals ("_:" locations) have their listing visited.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage all `referenced_files` under this mapper's staging directory."""
        staging_root = self.stagedir
        self.visitlisting(referenced_files, staging_root, basedir)