1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
9 import urllib.request, urllib.parse, urllib.error
11 import arvados_cwl.util
12 import arvados.commands.run
13 import arvados.collection
15 from schema_salad.sourceline import SourceLine
17 from arvados.errors import ApiError
18 from cwltool.pathmapper import PathMapper, MapperEnt
19 from cwltool.utils import adjustFileObjs, adjustDirObjs
20 from cwltool.stdfsaccess import abspath
21 from cwltool.workflow import WorkflowException
23 from arvados.http_to_keep import http_to_keep
# Module-level logger; every log call in this module goes through the
# 'arvados.cwl-runner' logger name.
logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when the
    object's location is a "keep:" reference.

    The object is modified in place and nothing is returned.  Objects whose
    "location" is missing or does not start with "keep:" are left untouched.
    """
    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
# "keep:" + 32 lowercase hex digits + "+" + decimal digits, followed by "/"
# and a non-empty path.  Anchored at both ends.
collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# Same "keep:<32 hex>+<digits>" prefix, captured as group 1, with an optional
# "/..." path captured as group 2.  Note there is no trailing "$" anchor, so
# match() only requires the string to *start* with such a reference.
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
# "keep:" + a "xxxxx-4zz18-<15 chars>" identifier (lowercase letters/digits),
# captured as group 1, with an optional "/..." path as group 2.  Anchored at
# both ends.
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False,
                 optional_deps=None):
        """Record configuration, then hand off to the PathMapper constructor.

        arvrunner -- runner object; this class reads its api, keep_client,
            num_retries, project_uuid, fs_access, defer_downloads,
            intermediate_output_ttl and toplevel_runtimeContext attributes.
        referenced_files -- list of File/Directory objects (dicts with at
            least a "location" key).
        input_basedir -- base directory used to resolve "file:" locations.
        collection_pattern -- %-format string with one slot, filled with a
            collection reference to produce a target path.
        file_pattern -- %-format string with two slots, filled with
            (portable data hash, path inside the collection).
        name -- passed as `name` to arvados.commands.run.uploadfiles().
        single_collection -- when true, uploads go into one new Collection.
        optional_deps -- objects whose "location" may be silently skipped by
            addentry() when it cannot otherwise be handled.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # Maps portable data hash -> collection UUID, filled in by visit().
        self.pdh_to_uuid = {}
        self.optional_deps = optional_deps or []
        # NOTE(review): the attributes above are set before calling the base
        # constructor, presumably because PathMapper.__init__ invokes
        # setup() -- confirm against cwltool.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Register `srcobj` (and, recursively, its children) in the path map.

        srcobj -- File/Directory object with "location" and "class" keys.
        uploadfiles -- set collecting (src, abspath, statfile result) tuples
            for local files that still have to be uploaded.

        Raises WorkflowException for malformed keep references, invalid local
        paths, and file/directory literals lacking `contents`/`listing`.
        """
        src = srcobj["location"]
        if "#" in src:
            # Drop the fragment; only the base location is mapped.
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, str) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

                if arvados_cwl.util.collectionUUID in srcobj:
                    # Key is the bare portable data hash: text between
                    # "keep:" and the first "/".
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are not mapped here, only validated.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                try:
                    if self.arvrunner.defer_downloads:
                        # passthrough, we'll download it later.
                        self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
                    else:
                        results = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
                                               varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
                                               prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
                        keepref = "keep:%s/%s" % (results[0], results[1])
                        logger.info("%s is %s", src, keepref)
                        self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                except Exception as e:
                    # Best effort: a failed download is logged and the
                    # location is simply left unmapped.
                    logger.warning("Download error: %s", e)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Place `obj` into collection `c` under `path` and record the move.

        obj -- File/Directory object.
        c -- collection being assembled.
        path -- directory inside `c` that receives the entry.
        remap -- list that receives (original location, path in `c`) tuples.

        Raises a WorkflowException (via SourceLine.makeError) when the object
        cannot be handled and is not listed in self.optional_deps.
        """
        dest = path + "/" + obj["basename"]
        if obj["location"] in self._pathmap:
            # Already in Keep: copy from its source collection.
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, dest, remap)
            remap.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            # File literal: write its contents straight into the collection.
            with c.open(dest, "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], dest))
        else:
            for opt in self.optional_deps:
                if obj["location"] == opt["location"]:
                    return
            raise SourceLine(obj, "location", WorkflowException).makeError("Can't handle '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        srcobj -- File/Directory object to inspect (children are checked
            recursively).
        prefix -- location prefix every entry is expected to share; when
            empty it is taken from `srcobj` itself.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals always have to be written somewhere.
            return True

        if self.arvrunner.defer_downloads and loc.startswith(("http:", "https:")):
            return False

        i = loc.rfind("/")
        if i > -1:
            loc_prefix = loc[:i+1]
            if not prefix:
                prefix = loc_prefix
            # quote/unquote to ensure consistent quoting
            suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
        else:
            # no '/' found
            loc_prefix = loc+"/"
            prefix = loc+"/"
            suffix = ""

        if prefix != loc_prefix:
            return True

        if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
            return True

        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return an empty Collection using the runner's API and Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate(self, c):
        """Save `c` as a new intermediate collection owned by the runner's project."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate the path map for `referenced_files`.

        Visits every object, uploads the local files collected by visit(),
        then builds and saves a new collection for each Directory that is
        not yet mapped and each File for which needs_new_collection() is
        true, remapping the entries that were placed into it.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_intermediate(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_intermediate(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also register the containing collection itself under
                    # a freshly generated "_:" identifier.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            if remap:
                # remap is only non-empty when one of the branches above
                # ran, so `c` is bound here.
                pdh = c.portable_data_hash()
                for loc, sub in remap:
                    # subdirs start with "./", strip it off (and use the
                    # same stripped path for both the resolved reference
                    # and the target).
                    rel = sub[2:] if sub.startswith("./") else sub
                    ab = self.file_pattern % (pdh, rel)
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, rel),
                                                   ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a (source, resolved) pair.

        Tries the inherited reverse mapping first.  Otherwise a "keep:"
        target maps to itself, and a target under self.keepdir is rewritten
        to a "keep:" reference.  Returns None when nothing applies.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        if target.startswith("keep:"):
            return (target, target)
        if self.keepdir and target.startswith(self.keepdir):
            # +1 skips the path separator following the keepdir prefix.
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        return None
class StagingPathMapper(PathMapper):
    # Note that StagingPathMapper internally maps files from target to source.
    # Specifically, the 'self._pathmap' dict keys are the target location and the
    # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    # as the file identifier. This makes it possible to map an input file to multiple
    # target directories. The exception is for file literals, which store the contents of
    # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.

    # When true, visit() descends into the listing of every Directory, not
    # just directory literals.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        """Initialise the set of claimed targets, then run the base constructor."""
        # Every target path handed out so far; used to avoid collisions.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
        """Assign a staging target to `obj` and record it in the path map.

        obj -- File/Directory object with "location", "basename" and "class".
        stagedir -- directory to stage into; overridden by obj["dirname"]
            when that is set.
        basedir -- passed through to visitlisting() for children.
        copy -- when true, Files are recorded as "WritableFile".
        staged -- stored as the `staged` field of each MapperEnt.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        # The target is considered taken when it was already handed out and
        # either this object is a literal, or the earlier claimant resolved
        # to a different location.
        taken = tgt in self.targets and (
            "contents" in obj or self._pathmap[tgt].resolved != loc)

        n = 1
        if taken:
            # Append _2, _3, ... before the extension until the name is free.
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literal: keyed by source location, contents stored
                # in `resolved`.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
            self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt.
        """Return the MapperEnt for source identifier `src`, or None if unknown.

        When `src` carries a "#fragment", the entry for the part before the
        "#" is looked up and the fragment is appended to its target.
        """
        # Overridden to maintain the use case of mapping by source (identifier) to
        # target regardless of how the map is structured internally.
        def getMapperEnt(src):
            for k, v in self._pathmap.items():
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v
            return None

        if "#" in src:
            i = src.index("#")
            # Look up the base identifier (text before "#"), not the
            # fragment itself.
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites "keep:" sources to "$(task.keep)/" paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Only existing keys are reassigned, so the dict keeps its size
        # while being iterated.
        for key, entry in self._pathmap.items():
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if resolved.startswith("keep:"):
                self._pathmap[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal directories."""

    # Read by StagingPathMapper.visit(): with this false, only directory
    # literals ("_:" locations) have their listing visited.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        self.visitlisting(referenced_files, self.stagedir, basedir)