18994: Remove debug print
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
10
11 import re
12 import logging
13 import uuid
14 import os
15 import urllib.request, urllib.parse, urllib.error
16
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
20
21 from schema_salad.sourceline import SourceLine
22
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt
25 from cwltool.utils import adjustFileObjs, adjustDirObjs
26 from cwltool.stdfsaccess import abspath
27 from cwltool.workflow import WorkflowException
28
29 from .http import http_to_keep
30
# Module-wide logger, shared under the name used by the Arvados CWL runner.
logger = logging.getLogger(name='arvados.cwl-runner')
32
def trim_listing(obj):
    """Drop the 'listing' entry of a Keep-backed object, in place.

    When a Directory object refers to Keep, passing its fully enumerated
    listing between instances of cwl-runner (e.g. when submitting a job, or
    when using the RunInSingleContainer feature) is redundant and potentially
    very expensive.  The 'listing' key is therefore removed whenever the
    object's location is a "keep:" reference.  Objects with any other
    location (or no location) are left untouched.

    :param obj: CWL object (dict); modified in place.
    :returns: None
    """
    if not obj.get("location", "").startswith("keep:"):
        return
    obj.pop("listing", None)
46
# Regex fragments shared by the Keep reference patterns below.
_KEEP_PDH = r'[0-9a-f]{32}\+\d+'
_KEEP_COLLECTION_UUID = r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}'

# "keep:<pdh>/<path>": a path inside a collection identified by portable
# data hash; the path part must be non-empty.
collection_pdh_path = re.compile(r'^keep:' + _KEEP_PDH + r'/.+$')

# "keep:<pdh>[/<path>]": group(1) is the portable data hash, group(2) the
# optional "/path" suffix.
# NOTE(review): unlike collection_uuid_pattern this is not anchored with "$",
# so text trailing the hash (e.g. "keep:<pdh>junk") still matches.
collection_pdh_pattern = re.compile(r'^keep:(' + _KEEP_PDH + r')(/.*)?')

# "keep:<uuid>[/<path>]": group(1) is the collection uuid, group(2) the
# optional "/path" suffix.
collection_uuid_pattern = re.compile(r'^keep:(' + _KEEP_COLLECTION_UUID + r')(/.*)?$')
50
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Each referenced File/Directory is mapped to a Keep reference ("keep:...")
    and a target path built with ``collection_pattern`` / ``file_pattern``:

    - existing "keep:" references are mapped directly;
    - local "file:" references are either uploaded or, if statfile reports
      them as already in Keep, mapped to that Keep location;
    - "http:"/"https:" URLs are converted with http_to_keep;
    - File objects whose staged layout differs from their Keep layout (see
      needs_new_collection) and Directory objects that are not yet mapped
      (e.g. directory literals) are copied into new intermediate collections.
    """

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False):
        """
        :param arvrunner: runner providing ``api``, ``keep_client``,
            ``num_retries``, ``project_uuid``, ``fs_access`` and
            ``intermediate_output_ttl``.
        :param referenced_files: CWL File/Directory objects to map.
        :param input_basedir: base directory for resolving "file:" locations.
        :param collection_pattern: %-format string applied to the part of a
            keep reference after "keep:" (PDH plus optional path).
        :param file_pattern: %-format string applied to (PDH, path in
            collection).
        :param name: name passed to the upload of local files.
        :param single_collection: if True, one Collection object is created
            up front and passed to the upload of local files.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # Portable data hash -> collection UUID, for references that carried one.
        self.pdh_to_uuid = {}
        # Everything setup()/visit() need is assigned above, because the base
        # constructor is expected to call setup() (cwltool's PathMapper does)
        # -- NOTE(review): confirm for the pinned cwltool version.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record a mapping for ``srcobj``, then recurse into its
        secondaryFiles and listing.

        Local files that must be uploaded are added to ``uploadfiles`` as
        ``(src, abspath, statfile_result)`` tuples; setup() maps them after
        the upload.

        :raises WorkflowException: for an invalid keep reference, an invalid
            local input path, or a File/Directory literal missing its
            ``contents``/``listing``.
        """
        src = srcobj["location"]
        if "#" in src:
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
                if arvados_cwl.util.collectionUUID in srcobj:
                    # Remember the collection UUID this PDH was resolved from.
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are only validated here; setup() materializes them.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                try:
                    keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                except Exception as e:
                    # Best effort: the URL is left unmapped and the failure is
                    # reported, naming the URL so the warning is actionable.
                    logger.warning("Unable to copy %s to Keep: %s", src, e)
                else:
                    logger.info("%s is %s", src, keepref)
                    self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Place ``obj`` in collection ``c`` at ``path + "/" + basename``.

        - Already-mapped objects are copied from their source collection,
          followed by their secondaryFiles at the same ``path``.
        - Unmapped Directories are rebuilt from their listing entries.
        - File literals ("_:" with ``contents``) are written as new files.

        The ``(location, path in c)`` pair of each placed object is appended
        to ``remap``.

        :raises WorkflowException: (built via SourceLine.makeError) for any
            other object.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.  Literals ("_:" locations) and unmapped Files
        always need one.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            return True
        if not prefix:
            i = loc.rfind("/")
            if i > -1:
                prefix = loc[:i+1]
            else:
                prefix = loc+"/"

        if loc != prefix+srcobj["basename"]:
            return True

        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return an empty Collection bound to the runner's API and Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save ``c`` as a new intermediate collection; return its PDH.

        Name, trash time and properties come from
        arvados_cwl.util.get_intermediate_collection_info for the current
        container; the collection is owned by the runner's project.
        """
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])
        return c.portable_data_hash()

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate the path map for ``referenced_files``.

        Visits every object, uploads the local files collected by visit(),
        then builds intermediate collections for unmapped Directories and for
        Files that need restaging.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                pdh = self._save_intermediate_collection(c)
                ab = self.collection_pattern % pdh
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+pdh, ab, "Directory", True)
            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                pdh = self._save_intermediate_collection(c)
                ab = self.file_pattern % (pdh, srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the whole new collection under a fresh
                    # placeholder key.
                    ab = self.collection_pattern % pdh
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+pdh, ab, "Directory", True)

            # remap is only non-empty when one of the branches above ran, so
            # pdh refers to the collection built in this iteration.
            for loc, sub in remap:
                # addentry() paths are rooted at ".", so strip the leading
                # "./" and use the same relative path for the keep reference
                # and the target.
                if sub.startswith("./"):
                    sub = sub[2:]
                ab = self.file_pattern % (pdh, sub)
                # NOTE(review): every remapped entry is typed "Directory",
                # including Files and file literals -- confirm this is intended.
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, sub),
                                               ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a ``(location, resolved)`` pair.

        Tries the base class first.  Otherwise a "keep:" target maps to
        itself.  A path under ``self.keepdir`` maps to the corresponding
        "keep:" reference; setup() sets keepdir to None, so this presumably
        relies on callers assigning it (verify).  Returns None when nothing
        applies.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
266
267
class StagingPathMapper(PathMapper):
    """Path mapper that assigns each input a unique staging target.

    Note that StagingPathMapper internally maps files from target to source.
    Specifically, the 'self._pathmap' dict keys are the target location and
    the values are 'MapperEnt' named tuples from which we use the 'resolved'
    attribute as the file identifier.  This makes it possible to map an input
    file to multiple target directories.  The exception is for file literals,
    which store the contents of the file in 'MapperEnt.resolved' and are
    therefore still mapped from source to target.
    """

    # When True the listing of every Directory is visited; when False only
    # directory literals ("_:" locations) have their listing visited.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Every target path claimed so far; used to detect name collisions.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign ``obj`` a unique target and record it in the path map.

        The natural target is ``obj["dirname"]`` (or ``stagedir``) joined
        with the basename.  If that target is already claimed by a different
        source, or ``obj`` carries ``contents`` and the target is claimed at
        all, a numeric suffix is inserted before the extension
        ("name_2.ext", "name_3.ext", ...) until the target is unused.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            # A claimed target conflicts unless it already maps this same
            # source.  File literals are keyed by source location rather than
            # by target, so a claimed target absent from _pathmap belongs to a
            # literal and always conflicts (indexing it directly raised
            # KeyError).
            if tgt not in self.targets or "contents" in obj:
                return False
            existing = self._pathmap.get(tgt)
            return existing is None or existing.resolved != loc

        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literal: keyed by source, contents kept in 'resolved'.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt
        """Return the entry for source ``src``, or None if it is not mapped.

        Overridden to keep mapping by source (identifier) to target even
        though the map is keyed by target internally.  A "#fragment" suffix
        is stripped for the lookup and appended to the returned target.
        """
        def getMapperEnt(src):
            # Linear scan: entries are keyed by target, except file literals
            # (CreateFile), which are keyed by source.
            for k, v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v
            return None

        if u"#" in src:
            i = src.index(u"#")
            # Look up the location without its fragment, then re-attach the
            # fragment to the target.
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
332
333
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites Keep inputs to "$(task.keep)/..." paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage each referenced file (and its secondary files) under
        ``self.stagedir``, then rewrite plain Keep File/Directory entries."""
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Only plain File/Directory entries whose source is "keep:..." are
        # rewritten; "$(task.keep)" is presumably expanded later to the
        # task's Keep mount -- verify against the consumer.
        for key, ent in list(viewitems(self._pathmap)):
            if ent.type not in ("File", "Directory"):
                continue
            if not ent.resolved.startswith("keep:"):
                continue
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % ent.resolved[5:],
                                           ent.target, ent.type, ent.staged)
345
346
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that only descends into directory literals."""

    # With this off, StagingPathMapper.visit() only visits the listing of
    # Directories whose location starts with "_:".
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage ``referenced_files`` under ``self.stagedir`` with no rewriting."""
        self.visitlisting(referenced_files, self.stagedir, basedir)