19744: Report steps with low utilization at end of workflow
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
10
11 import re
12 import logging
13 import uuid
14 import os
15 import urllib.request, urllib.parse, urllib.error
16
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
20
21 from schema_salad.sourceline import SourceLine
22
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt
25 from cwltool.utils import adjustFileObjs, adjustDirObjs
26 from cwltool.stdfsaccess import abspath
27 from cwltool.workflow import WorkflowException
28
29 from arvados.http_to_keep import http_to_keep
30
# Every log message emitted by this module goes to the "arvados.cwl-runner"
# logger.
logger = logging.getLogger("arvados.cwl-runner")
32
def trim_listing(obj):
    """Strip the 'listing' field from a Directory object that points into Keep.

    For objects whose location is a ``keep:`` reference, a fully enumerated
    listing is redundant and potentially very expensive to pass between
    instances of cwl-runner (e.g. when submitting a job, or when using the
    RunInSingleContainer feature), so it is deleted in place.  Objects with
    any other location, or without a listing, are left untouched.
    """
    location = obj.get("location", "")
    if not location.startswith("keep:"):
        return
    if "listing" in obj:
        del obj["listing"]
46
# Patterns recognizing Keep references in CWL "location" fields.
#
# "keep:<portable data hash>/<path>": a path inside a collection (the path
# part is mandatory).
collection_pdh_path = re.compile(r"^keep:[0-9a-f]{32}\+\d+/.+$")
# "keep:<portable data hash>[/<path>]": group 1 is the PDH, group 2 the
# optional "/path".
# NOTE(review): unlike collection_uuid_pattern this has no trailing "$", so
# with .match() text trailing the PDH (e.g. "keep:<hash>+123abc") is still
# accepted -- confirm whether that is intended.
collection_pdh_pattern = re.compile(r"^keep:([0-9a-f]{32}\+\d+)(/.*)?")
# "keep:<collection uuid>[/<path>]": group 1 is the collection UUID.
collection_uuid_pattern = re.compile(r"^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$")
50
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Each referenced File/Directory is mapped to a Keep reference:

    * ``keep:`` references (by portable data hash) are mapped directly;
    * local ``file:`` references are uploaded to Keep, unless they already
      resolve to a Keep mount;
    * ``http:``/``https:`` URLs are downloaded into Keep, or passed through
      unchanged when the runner defers downloads;
    * objects that cannot be expressed as paths inside an existing collection
      (literals, renamed files, synthetic directories) are copied into a new
      intermediate collection.
    """

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False,
                 optional_deps=None):
        """Build the mapping for *referenced_files*.

        :param arvrunner: runner providing ``api``, ``keep_client``,
            ``num_retries``, ``project_uuid``, ``fs_access`` and the
            download / intermediate-collection settings used below.
        :param referenced_files: CWL File/Directory objects to map.
        :param input_basedir: base directory for resolving ``file:`` locations.
        :param collection_pattern: %-format string applied to
            ``"<pdh>[/<path>]"`` to produce a target path.
        :param file_pattern: %-format string applied to ``(pdh, path)`` to
            produce the target path of a file in a newly created collection.
        :param name: collection name used when uploading local files.
        :param single_collection: upload all local files into one collection.
        :param optional_deps: objects whose locations may be left unmapped
            without raising an error.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # portable data hash -> collection UUID recorded on the input object
        self.pdh_to_uuid = {}
        self.optional_deps = optional_deps or []
        # All attributes above must be set first: the base constructor
        # presumably drives setup() (cwltool's PathMapper does).
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record a mapping for *srcobj* and, recursively, its secondaryFiles and listing.

        Local files that must be uploaded are not mapped here; instead a
        ``(src, abspath, UploadFile)`` tuple is added to the *uploadfiles* set
        and setup() fills in the mapping after the upload.

        :raises WorkflowException: for malformed ``keep:`` references, invalid
            local paths, and File/Directory literals missing
            ``contents``/``listing``.
        """
        src = srcobj["location"]
        # Map the location without any "#fragment".
        if "#" in src:
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

                if arvados_cwl.util.collectionUUID in srcobj:
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are only validated here; they are not added to
                # the path map by visit().
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                try:
                    if self.arvrunner.defer_downloads:
                        # passthrough, we'll download it later.
                        self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
                    else:
                        results = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
                                               varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
                                               prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
                        keepref = "keep:%s/%s" % (results[0], results[1])
                        logger.info("%s is %s", src, keepref)
                        self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                except Exception as e:
                    # Best effort: the URL is left unmapped and only a
                    # warning is logged.
                    logger.warning("Download error: %s", e)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Copy or create *obj* inside collection *c* under directory *path*.

        Mapped objects are copied from their source collection, Directories
        not in the path map are rebuilt from their listing, and File literals
        are written from their ``contents``.  Each handled object is appended
        to *remap* as ``(original location, path inside c)``.

        :raises WorkflowException: if *obj* cannot be handled and is not one
            of ``self.optional_deps``.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            for opt in self.optional_deps:
                if obj["location"] == opt["location"]:
                    return
            raise SourceLine(obj, "location", WorkflowException).makeError("Can't handle '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.
        """

        loc = srcobj["location"]
        if loc.startswith("_:"):
            return True

        if self.arvrunner.defer_downloads and (loc.startswith("http:") or loc.startswith("https:")):
            return False

        i = loc.rfind("/")
        if i > -1:
            loc_prefix = loc[:i+1]
            if not prefix:
                prefix = loc_prefix
            # quote/unquote to ensure consistent quoting
            suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
        else:
            # no '/' found
            loc_prefix = loc+"/"
            prefix = loc+"/"
            suffix = ""

        if prefix != loc_prefix:
            return True

        if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
            return True

        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return an empty, unsaved Collection using the runner's API/Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save *c* as a new intermediate collection in the runner's project."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def _apply_remap(self, remap, pdh):
        """Point each ``(location, subpath)`` in *remap* at its copy in collection *pdh*."""
        for loc, sub in remap:
            # subdirs start with "./", strip it off.  The stripped path is
            # used for both the resolved reference and the target so the two
            # always agree.
            rel = sub[2:] if sub.startswith("./") else sub
            ab = self.file_pattern % (pdh, rel)
            # NOTE(review): every remapped entry is typed "Directory", even
            # plain files and file literals (this also replaces the "File"
            # entry setup() just created) -- confirm that is intended.
            self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, rel),
                                           ab, "Directory", True)

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate the path map, uploading files and creating collections as needed.

        1. visit() every referenced object, collecting local files to upload;
        2. upload them (into one collection when ``single_collection`` is set)
           and map each uploaded source to its Keep location;
        3. for each top-level Directory not yet mapped, and each File for
           which needs_new_collection() is true, build and save a new
           intermediate collection and point the affected locations into it.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)
                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                ab = self.collection_pattern % pdh
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+pdh, ab, "Directory", True)
                self._apply_remap(remap, pdh)
            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)
                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                ab = self.file_pattern % (pdh, srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the whole new collection under a fresh unique
                    # key, presumably so the secondary files copied next to
                    # the primary file are staged as well.
                    ab = self.collection_pattern % pdh
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+pdh, ab, "Directory", True)
                self._apply_remap(remap, pdh)

        self.keepdir = None

    def reversemap(self, target):
        """Map *target* back to a ``(location, location)`` pair.

        Uses the base-class mapping first; otherwise a ``keep:`` target is
        returned as-is, and a path under ``self.keepdir`` (when set) is turned
        into a ``keep:`` reference.  Returns None when nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
290         else:
291             return None
292
293
class StagingPathMapper(PathMapper):
    """Path mapper that gives every File/Directory a unique staging target.

    Note that StagingPathMapper internally maps files from target to source.
    Specifically, the ``self._pathmap`` dict keys are the target location and
    the values are ``MapperEnt`` named tuples from which we use the
    ``resolved`` attribute as the file identifier.  This makes it possible to
    map an input file to multiple target directories.  The exception is file
    literals, which store the contents of the file in ``MapperEnt.resolved``
    and are therefore still mapped from source to target.
    """

    # When False, only directory literals ("_:" locations) have their listing
    # visited; other directories are mapped without descending into them.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Targets handed out so far.  Created before the base constructor
        # runs, since it presumably calls setup()/visit() (cwltool's
        # PathMapper does), and visit() reads and updates this set.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Map *obj* to a unique target below ``obj["dirname"]`` or *stagedir*.

        If the natural target is already taken by a different source (or, for
        objects carrying ``contents``, taken at all), ``_N`` is inserted before
        the extension until an unused target is found.  Directories recurse
        into their listing (always for literals, otherwise only when
        ``_follow_dirs`` is set); non-literal Files recurse into their
        secondaryFiles.  A File whose target is already mapped is skipped.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            # Taken by something other than this same source.  A target
            # claimed by a file literal is in self.targets but is not a key
            # of self._pathmap (literals are keyed by location), so treat it
            # as occupied rather than raising KeyError.
            if tgt not in self.targets or "contents" in obj:
                return False
            existing = self._pathmap.get(tgt)
            return existing is None or existing.resolved != loc

        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt
        """Return the MapperEnt for source identifier *src*, or None if unmapped.

        Overridden to maintain the use case of mapping by source (identifier)
        to target regardless of how the map is structured internally.  If
        *src* carries a ``#fragment``, the part before ``#`` is looked up and
        the fragment is appended to the returned target.
        """
        def getMapperEnt(src):
            for k, v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v
            return None

        if u"#" in src:
            i = src.index(u"#")
            # Resolve the location without its fragment, then re-attach the
            # fragment to the target.
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
357         return getMapperEnt(src)
358
359
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed File/Directory entries resolve under ``$(task.keep)``."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        # First give every referenced object (together with its secondary
        # files) its own staging target.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Then rewrite plain File/Directory entries that point into Keep so
        # that their resolved location is relative to $(task.keep).  Only
        # existing keys are reassigned, so a snapshot of the items suffices.
        for target_key, ent in list(viewitems(self._pathmap)):
            if ent.type not in ("File", "Directory"):
                continue
            if not ent.resolved.startswith("keep:"):
                continue
            self._pathmap[target_key] = MapperEnt("$(task.keep)/%s" % ent.resolved[5:],
                                                  ent.target, ent.type, ent.staged)
371
372
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal directories."""

    # Directory listings are only visited for "_:" literal directories.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        # Stage each referenced object directly under the staging directory.
        stagedir = self.stagedir
        self.visitlisting(referenced_files, stagedir, basedir)