13547: Merge branch 'master' into 13547-respect-insecure-flag-when-talking-ssl-to...
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import re
6 import logging
7 import uuid
8 import os
9 import urllib
10
11 import arvados.commands.run
12 import arvados.collection
13
14 from schema_salad.sourceline import SourceLine
15
16 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
17 from cwltool.workflow import WorkflowException
18
19 from .http import http_to_keep
20
# Module-level logger shared by the arvados-cwl-runner path mapping code.
logger = logging.getLogger("arvados.cwl-runner")
22
def trim_listing(obj):
    """Drop the 'listing' field from a Directory object that points into Keep.

    A Directory whose location is a Keep reference can be re-enumerated from
    Keep on demand, so shipping its full listing between cwl-runner instances
    (e.g. when submitting a job, or with the RunInSingleContainer feature) is
    redundant and possibly very expensive.  The object is modified in place;
    objects with any other location are left untouched.
    """
    location = obj.get("location", "")
    if not location.startswith("keep:"):
        return
    if "listing" in obj:
        obj.pop("listing")
36
37
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Every referenced location ends up mapped to a ``keep:`` reference plus a
    container-side target path:

    * ``keep:`` references are mapped directly;
    * local ``file:`` references are uploaded to Keep (or recognised as
      already living on a Keep mount);
    * ``http:``/``https:`` references are fetched into Keep via http_to_keep;
    * synthetic Directory listings and File literals / Files with
      secondaryFiles are assembled into new collections.
    """

    # keep:<pdh>/<path> -- a path strictly inside a collection.
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # keep:<pdh>[/<path>] -- a whole collection or a path inside it.
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        """Create the mapper.

        :param arvrunner: runner object; this class reads its ``api``,
            ``keep_client``, ``num_retries``, ``project_uuid`` and
            ``fs_access`` attributes.
        :param referenced_files: File/Directory objects, each with a
            ``location`` key.
        :param input_basedir: base directory used to resolve ``file:`` refs.
        :param collection_pattern: ``%`` format string applied to the part of
            a keep reference after ``keep:`` to build a target path.
        :param file_pattern: ``%`` format string applied to
            ``(portable_data_hash, path)`` to build a target path.
        :param name: name passed to uploadfiles for uploaded local files.
        :param single_collection: when true, one shared Collection is created
            and handed to uploadfiles (presumably so all local uploads land in
            a single collection -- confirm in arvados.commands.run).
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # NOTE(review): the base constructor presumably calls setup(); all
        # attributes it needs are assigned above.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record a pathmap entry for ``srcobj`` and, recursively, for its
        secondaryFiles and listing.

        Local files that must be uploaded are not mapped here; instead a
        ``(src, absolute_path, statfile_result)`` tuple is added to the
        ``uploadfiles`` set for setup() to handle.
        """
        src = srcobj["location"]
        # Drop any URI fragment; mappings are keyed on the bare location.
        if "#" in src:
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are materialised later in setup(); only validate here.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Place ``obj`` at ``path/<basename>`` inside collection ``c``.

        Already-mapped objects are copied from their source collection (along
        with their secondaryFiles); Directories are built recursively from
        their listing; ``_:`` File literals are written from ``contents``.
        Each placed object appends ``(location, path_in_collection)`` to
        ``remap``.

        :raises WorkflowException: for objects none of the above applies to.
        """
        loc = obj["location"]
        if loc in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[loc].resolved)
            if srcpath == "":
                # Whole collection: copy from its root.
                srcpath = "."
            dst = path + "/" + obj["basename"]
            c.copy(srcpath, dst, source_collection=src, overwrite=True)
            remap.append((loc, dst))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            dst = path + "/" + obj["basename"]
            for l in obj.get("listing", []):
                self.addentry(l, c, dst, remap)
            remap.append((loc, dst))
        elif loc.startswith("_:") and "contents" in obj:
            dst = path + "/" + obj["basename"]
            with c.open(dst, "w") as f:
                f.write(obj["contents"].encode("utf-8"))
            remap.append((loc, dst))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % loc)

    def _new_collection(self):
        """Return a new, empty Collection bound to the runner's clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_if_new(self, c):
        """Save ``c`` to the runner's project unless a collection with the
        same portable data hash is already visible; return the PDH."""
        check = self.arvrunner.api.collections().list(
            filters=[["portable_data_hash", "=", c.portable_data_hash()]],
            limit=1).execute(num_retries=self.arvrunner.num_retries)
        if not check["items"]:
            c.save_new(owner_uuid=self.arvrunner.project_uuid)
        return c.portable_data_hash()

    def _apply_remap(self, remap, pdh):
        """Point each ``(location, path_in_collection)`` pair at collection ``pdh``."""
        for loc, sub in remap:
            # addentry() records paths relative to "." ("./name"); strip that
            # prefix once so the keep: reference and the target path agree.
            rel = sub[2:] if sub.startswith("./") else sub
            # NOTE(review): every remapped entry is typed "Directory", even
            # files; preserved as-is since callers may depend on it.
            self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, rel),
                                           self.file_pattern % (pdh, rel),
                                           "Directory", True)

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Build the pathmap for ``referenced_files``.

        Visits every reference, uploads local files, then assembles new
        collections for synthetic Directories and for Files that are
        literals or carry secondaryFiles.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                # Synthetic directory: build a collection from its listing.
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                pdh = self._save_if_new(c)
                ab = self.collection_pattern % pdh
                self._pathmap[srcobj["location"]] = MapperEnt("keep:" + pdh, ab, "Directory", True)
                # Applied last: entries in remap override the above.
                self._apply_remap(remap, pdh)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                    (srcobj["location"].startswith("_:") and "contents" in srcobj)):
                # File literal, or File that must sit next to its
                # secondaryFiles: stage them together in one collection.
                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                pdh = self._save_if_new(c)
                ab = self.file_pattern % (pdh, srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the whole collection under a unique synthetic key.
                    ab = self.collection_pattern % pdh
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:" + pdh, ab, "Directory", True)
                # Applied last: entries in remap override the above.
                self._apply_remap(remap, pdh)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a ``(location, resolved)`` pair.

        Falls back to treating ``keep:`` targets as self-mapping, and to
        translating paths under ``self.keepdir`` (when set) into ``keep:``
        references; returns None otherwise.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
204
class StagingPathMapper(PathMapper):
    """Map File/Directory objects to unique target paths under a staging dir.

    Each object is placed at ``stagedir/<basename>``.  When that target is
    already claimed for a different location, a numeric suffix is inserted
    before the extension (``name_2.ext``, ``name_3.ext``, ...).
    """

    # When True, visit() descends into every Directory's listing; when False
    # only literal ("_:") directories are descended into.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Target paths already claimed by visit().  Created before the base
        # constructor runs, since it may invoke setup()/visit().
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign ``obj`` a unique target under ``stagedir`` and record it.

        Directories become "WritableDirectory" or "Directory" entries, and
        their listing is visited when they are literals or ``_follow_dirs``
        is set.  Files already in the pathmap are skipped; ``_:`` literals
        with ``contents`` become "CreateFile" entries, other files become
        "WritableFile" (when ``copy`` or ``writable``) or "File", and their
        secondaryFiles are visited in the same ``stagedir``.
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        n = 1
        if tgt in self.targets:
            existing = self.reversemap(tgt)
            # reversemap() may return None when the target was claimed but no
            # pathmap entry points at it (e.g. a File already mapped elsewhere
            # claimed it before returning early below, or a Directory was
            # re-mapped to a new target).  Treat that as a collision rather
            # than failing with TypeError on None[0].
            if existing is None or existing[0] != loc:
                while tgt in self.targets:
                    n += 1
                    tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if loc in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
241
242
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed entries resolve under ``$(task.keep)``."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Give each referenced object its own target under the staging dir,
        # together with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Rewrite "keep:<ref>" resolutions of File/Directory entries to the
        # "$(task.keep)/<ref>" form; everything else is left as-is.
        keep_prefix = "keep:"
        for key, ent in list(self._pathmap.items()):
            resolved, target, kind, staged = ent
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith(keep_prefix):
                continue
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % resolved[len(keep_prefix):],
                                           target, kind, staged)
254
255
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal Directory listings."""

    # Only literal ("_:") directories have their listing visited.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        stage_root = self.stagedir
        self.visitlisting(referenced_files, stage_root, basedir)