18994: Remove debug prints
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
10
11 import re
12 import logging
13 import uuid
14 import os
15 import urllib.request, urllib.parse, urllib.error
16
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
20
21 from schema_salad.sourceline import SourceLine
22
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt
25 from cwltool.utils import adjustFileObjs, adjustDirObjs
26 from cwltool.stdfsaccess import abspath
27 from cwltool.workflow import WorkflowException
28
29 from .http import http_to_keep
30
# Module logger shared by the path mappers below (warnings for failed HTTP
# downloads, info messages for resolved HTTP inputs).
logger = logging.getLogger('arvados.cwl-runner')
32
def trim_listing(obj):
    """Strip the 'listing' field from a Keep-backed Directory object, in place.

    When a Directory object refers to Keep, a fully enumerated listing is
    redundant and potentially very expensive to pass between cwl-runner
    instances (e.g. when submitting a job, or when using the
    RunInSingleContainer feature), so it is dropped whenever that is safe.

    Objects whose location is not a ``keep:`` reference, or which have no
    listing, are left untouched.  Always returns None.
    """
    is_keep_ref = obj.get("location", "").startswith("keep:")
    if not is_keep_ref:
        return
    # pop() with a default is a no-op when there is no listing to remove.
    obj.pop("listing", None)
46
# "keep:<pdh>/<path>": a path inside a collection addressed by portable data
# hash (32 lowercase hex digits followed by "+<digits>"); the path part is
# required and must be non-empty.
collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# "keep:<pdh>[/<path>]": group 1 is the portable data hash, group 2 the
# optional "/path" suffix (None when absent).
# NOTE(review): unlike the other two patterns this one has no trailing "$";
# since it is used with .match(), input such as "keep:<pdh>xyz" also matches.
# Confirm whether that is intended before tightening it.
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
# "keep:<collection uuid>[/<path>]": group 1 is a UUID of the form
# xxxxx-4zz18-xxxxxxxxxxxxxxx, group 2 the optional "/path" suffix.
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
50
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Every referenced File/Directory location (``keep:``, ``file:``,
    ``http(s):``, ``_:`` literals, anything else) is resolved to a Keep
    reference.  Local files are uploaded.  When files are not already laid
    out in one collection exactly the way they will be staged, a new
    intermediate collection is created with the files copied into place.
    """

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False,
                 optional_deps=None):
        """
        :param arvrunner: runner object; this class reads its ``api``,
            ``keep_client``, ``num_retries``, ``project_uuid``, ``fs_access``
            and ``intermediate_output_ttl`` attributes.
        :param referenced_files: CWL File/Directory objects (dicts with at
            least a ``location`` key) to map.
        :param input_basedir: base directory for resolving ``file:`` locations.
        :param collection_pattern: %-format string applied to a single
            ``<pdh>[/path]`` value to build the mapped target.
        :param file_pattern: %-format string applied to ``(pdh, path)``.
        :param name: name passed to the upload of local files.
        :param single_collection: upload all local files into one collection.
        :param optional_deps: objects that :meth:`addentry` may skip silently
            when it does not know how to stage them.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # Portable data hash -> collection UUID, recorded from keep: inputs
        # that carry the arvados_cwl.util.collectionUUID annotation.
        self.pdh_to_uuid = {}
        self.optional_deps = optional_deps or []
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Resolve one File/Directory object, and its children, to Keep.

        Adds entries to ``self._pathmap`` for ``keep:`` references, ``file:``
        paths that are already on a Keep mount, successfully downloaded
        ``http(s):`` URLs, and any other scheme (mapped to itself).  Local
        files that must be uploaded are added to *uploadfiles* as
        ``(src, abspath, statfile result)`` tuples instead.  Recurses into
        ``secondaryFiles`` and ``listing``.

        :raises WorkflowException: for an invalid keep reference, an invalid
            local path, or a ``_:`` literal without contents/listing.
        """
        src = srcobj["location"]
        if "#" in src:
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

                if arvados_cwl.util.collectionUUID in srcobj:
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                # Best effort: a failed download is logged and the URL is
                # simply left unmapped.
                try:
                    keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                    logger.info("%s is %s", src, keepref)
                    self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                except Exception as e:
                    logger.warning(str(e))
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Stage *obj* into collection *c* under directory *path*.

        - Objects already in ``self._pathmap`` are copied from their source
          collection, together with their secondaryFiles.
        - Directories that are not mapped are staged entry by entry.
        - ``_:`` File literals are written from their ``contents``.

        Each staged object is recorded in *remap* as
        ``(original location, path inside c)``.  An object that fits none of
        these cases is skipped if it is listed in ``optional_deps``.

        :raises WorkflowException: (built by ``SourceLine.makeError``) when
            the object cannot be staged and is not optional.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            for opt in self.optional_deps:
                if obj["location"] == opt["location"]:
                    return
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        :param srcobj: CWL File or Directory object.
        :param prefix: location prefix (ending in "/") that *srcobj* must
            live directly under.  Empty for the top-level call, where the
            prefix is derived from the object's own location.
        :returns: True if a new collection is required.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals have no backing location, so they must be written out.
            return True

        if prefix:
            # Nested object (secondary file or listing entry): it must sit
            # exactly at <prefix><basename>.  Compare unquoted forms so that
            # percent-encoding differences do not cause false mismatches.
            # (Previously this path read an unbound `suffix` and crashed.)
            if urllib.parse.unquote(loc) != urllib.parse.unquote(prefix) + srcobj["basename"]:
                return True
        else:
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
                suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
            else:
                prefix = loc+"/"
                suffix = ""
            # The last path component must already match the staged basename.
            if suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
                return True

        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            # Secondary files must sit alongside the primary file.
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            # Listing entries must sit inside this directory.  Quote the
            # basename so the prefix stays in the same (quoted) form as
            # locations.
            prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj["basename"], "/+@"))
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return a new, empty, unsaved Collection using the runner's clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save *c* as a new intermediate collection and return its PDH."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])
        return c.portable_data_hash()

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate ``self._pathmap`` for *referenced_files*.

        Visits every object, uploads local files, then builds new
        intermediate collections for unmapped Directories and for Files
        whose layout does not already match how they will be staged.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                pdh = self._save_intermediate_collection(c)

                ab = self.collection_pattern % pdh
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+pdh, ab, "Directory", True)
            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                pdh = self._save_intermediate_collection(c)

                ab = self.file_pattern % (pdh, srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the whole new collection under a synthetic
                    # key so the directory holding the secondary files is
                    # staged too.
                    ab = self.collection_pattern % pdh
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+pdh, ab, "Directory", True)

            # remap is only non-empty when one of the branches above ran, so
            # pdh refers to the collection just saved.
            for loc, sub in remap:
                # addentry paths start with "./"; strip it off so the keep:
                # reference and the target use the same relative path.
                rel = sub[2:] if sub.startswith("./") else sub
                # NOTE(review): every remapped entry, files included, is typed
                # "Directory" (this also overwrites the "File" entry set above).
                # Existing behavior, preserved; confirm whether intended.
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, rel),
                                               self.file_pattern % (pdh, rel), "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a ``(location, location)`` pair.

        Falls back to treating ``keep:`` targets as their own location, and
        paths under ``self.keepdir`` (when set) as ``keep:`` references.
        Returns None when nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
273         else:
274             return None
275
276
class StagingPathMapper(PathMapper):
    # Note that StagingPathMapper internally maps files from target to source.
    # Specifically, the 'self._pathmap' dict keys are the target location and the
    # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    # as the file identifier. This makes it possible to map an input file to multiple
    # target directories. The exception is for file literals, which store the contents of
    # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.

    # When False, only the listings of Directory literals ("_:" locations)
    # are visited.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Every target path already claimed, including those of file
        # literals (which are keyed by source location in _pathmap).
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign a target path under *stagedir* to *obj* and record it.

        If the target is already taken by a different source, or by a file
        literal, a numbered variant ("name_2.ext", "name_3.ext", ...) is used.
        Directories recurse into their listing (see ``_follow_dirs``); Files
        recurse into their secondaryFiles.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            # A non-literal collides when the target is claimed by a different
            # source.  A claimed target that is not a key of _pathmap belongs to
            # a file literal (literals are keyed by location), which is also a
            # collision; indexing _pathmap directly raised KeyError here.
            if tgt not in self.targets or "contents" in obj:
                return False
            ent = self._pathmap.get(tgt)
            return ent is None or ent.resolved != loc

        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt.
        """Return the MapperEnt for source identifier *src*, or None.

        Overridden to maintain the use case of mapping by source (identifier)
        to target regardless of how the map is structured internally.  A
        "#fragment" on *src* is looked up without the fragment and then
        appended to the returned target.
        """
        def getMapperEnt(src):
            for k, v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v
            return None

        if u"#" in src:
            i = src.index(u"#")
            # Look up the part before the fragment; the fragment itself is
            # never a key or resolved value.
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
339             return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
340         return getMapperEnt(src)
341
342
class VwdPathMapper(StagingPathMapper):
    """StagingPathMapper that rewrites Keep references to ``$(task.keep)`` paths.

    After the usual staging walk, every File/Directory entry whose resolved
    location starts with ``keep:`` is re-pointed at
    ``$(task.keep)/<rest of the reference>``.  Target, type and staged flag
    are kept as they were.
    """

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Give each file a target in its own directory, together with any
        # secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Only values of existing keys are replaced, so walking a snapshot
        # of the items yields the same result as walking the live dict.
        for key, ent in list(viewitems(self._pathmap)):
            resolved, target, kind, staged = ent
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
352             if type in ("File", "Directory") and ab.startswith("keep:"):
353                 self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type, staged)
354
355
class NoFollowPathMapper(StagingPathMapper):
    """StagingPathMapper that does not descend into Directory listings.

    With ``_follow_dirs`` disabled, only the listings of Directory literals
    (``_:`` locations) are visited.
    """

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        root = self.stagedir
        self.visitlisting(referenced_files, root, basedir)