# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import re
import logging
import uuid
import os
import urllib.request, urllib.parse, urllib.error

import arvados_cwl.util
import arvados.commands.run
import arvados.collection

from arvados.errors import ApiError
from arvados._internal.http_to_keep import http_to_keep
from cwltool.pathmapper import PathMapper, MapperEnt
from cwltool.utils import adjustFileObjs, adjustDirObjs
from cwltool.stdfsaccess import abspath
from cwltool.workflow import WorkflowException
from schema_salad.sourceline import SourceLine

logger = logging.getLogger('arvados.cwl-runner')

def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.

    """

    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
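
# Illustrative example (hypothetical values): trim_listing({"class": "Directory",
# "location": "keep:<pdh>/dir", "listing": [...]}) deletes the "listing" key in
# place; objects whose location is "file:..." or "_:..." are left untouched.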

collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
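
# For reference (hypothetical identifiers): "keep:0123456789abcdef0123456789abcdef+1234/a.txt"
# matches collection_pdh_path and collection_pdh_pattern, while
# "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/a.txt" matches collection_uuid_pattern.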

class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids."""
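
    # Typical rendering (hypothetical patterns): with collection_pattern="/keep/%s"
    # and file_pattern="/keep/%s/%s", an input at "keep:<pdh>/input.txt" maps to
    # the container-local path "/keep/<pdh>/input.txt".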

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False,
                 optional_deps=None):
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        self.pdh_to_uuid = {}
        self.optional_deps = optional_deps or []
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        src = srcobj["location"]
        if "#" in src:
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, str) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

                if arvados_cwl.util.collectionUUID in srcobj:
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                try:
                    if self.arvrunner.defer_downloads:
                        # passthrough, we'll download it later.
                        self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
                    else:
                        results = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
                                               varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
                                               prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
                        keepref = "keep:%s/%s" % (results[0], results[1])
                        logger.info("%s is %s", src, keepref)
                        self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                except Exception as e:
                    logger.warning("Download error: %s", e)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            for opt in self.optional_deps:
                if obj["location"] == opt["location"]:
                    return
            raise SourceLine(obj, "location", WorkflowException).makeError("Can't handle '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.
        """
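
        # Illustrative example (hypothetical locations): a File at
        # "keep:<pdh>/dir/a.txt" whose secondaryFile lives in a different
        # collection, e.g. "keep:<other-pdh>/dir/a.idx", returns True because
        # the two cannot be staged side by side from a single collection.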

        loc = srcobj["location"]
        if loc.startswith("_:"):
            return True

        if self.arvrunner.defer_downloads and (loc.startswith("http:") or loc.startswith("https:")):
            return False

        i = loc.rfind("/")
        if i > -1:
            loc_prefix = loc[:i+1]
            if not prefix:
                prefix = loc_prefix
            # quote/unquote to ensure consistent quoting
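            # e.g. both "a file.txt" and "a%20file.txt" in the location end up
            # as "a%20file.txt" after this unquote/quote round trip (illustrative).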
            suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
        else:
            loc_prefix = loc + "/"
            prefix = loc_prefix
            suffix = ""

        if prefix != loc_prefix:
            return True

        if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
            return True

        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

                c.save_new(name=info["name"],
                           owner_uuid=self.arvrunner.project_uuid,
                           ensure_unique_name=True,
                           trash_at=info["trash_at"],
                           properties=info["properties"])

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                self.addentry(srcobj, c, ".", remap)

                container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

                c.save_new(name=info["name"],
                           owner_uuid=self.arvrunner.project_uuid,
                           ensure_unique_name=True,
                           trash_at=info["trash_at"],
                           properties=info["properties"])

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            for loc, sub in remap:
                # subdirs start with "./", strip it off
                if sub.startswith("./"):
                    ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
                else:
                    ab = self.file_pattern % (c.portable_data_hash(), sub)
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
                                               ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None

class StagingPathMapper(PathMapper):
    # Note that StagingPathMapper internally maps files from target to source.
    # Specifically, the 'self._pathmap' dict keys are the target location and the
    # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    # as the file identifier. This makes it possible to map an input file to multiple
    # target directories. The exception is for file literals, which store the contents of
    # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.
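    #
    # Illustrative example (hypothetical paths): staging the same source
    # "keep:<pdh>/a.txt" under two directories yields two entries keyed by
    # target, e.g. "/stage1/a.txt" and "/stage2/a.txt", both with resolved
    # set to "keep:<pdh>/a.txt".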

    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            return tgt in self.targets and ("contents" not in obj) and (self._pathmap[tgt].resolved != loc)
        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt
        # Overridden to maintain the use case of mapping by source (identifier) to
        # target regardless of how the map is structured internally.
        def getMapperEnt(src):
            for k,v in self._pathmap.items():
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v

        if u"#" in src:
            i = src.index(u"#")
            v = getMapperEnt(src[0:i])
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)

class VwdPathMapper(StagingPathMapper):
    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        for path, (ab, tgt, type, staged) in self._pathmap.items():
            if type in ("File", "Directory") and ab.startswith("keep:"):
                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type, staged)
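
# Note (illustrative): VwdPathMapper rewrites "keep:<pdh>/..." sources to
# "$(task.keep)/<pdh>/...", so the path is resolved relative to whatever Keep
# mount point the executing runtime substitutes for $(task.keep).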

class NoFollowPathMapper(StagingPathMapper):
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        self.visitlisting(referenced_files, self.stagedir, basedir)