1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
15 import urllib.request, urllib.parse, urllib.error
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
21 from schema_salad.sourceline import SourceLine
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt
25 from cwltool.utils import adjustFileObjs, adjustDirObjs
26 from cwltool.stdfsaccess import abspath
27 from cwltool.workflow import WorkflowException
29 from arvados.http_to_keep import http_to_keep
31 logger = logging.getLogger('arvados.cwl-runner')
33 def trim_listing(obj):
34 """Remove 'listing' field from Directory objects that are keep references.
36 When Directory objects represent Keep references, it is redundant and
37 potentially very expensive to pass fully enumerated Directory objects
38 between instances of cwl-runner (e.g. submitting a job, or using the
39 RunInSingleContainer feature), so delete the 'listing' field when it is
44 if obj.get("location", "").startswith("keep:") and "listing" in obj:
47 collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
48 collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
49 collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
51 class ArvPathMapper(PathMapper):
52 """Convert container-local paths to and from Keep collection ids."""
54 def __init__(self, arvrunner, referenced_files, input_basedir,
55 collection_pattern, file_pattern, name=None, single_collection=False,
57 self.arvrunner = arvrunner
58 self.input_basedir = input_basedir
59 self.collection_pattern = collection_pattern
60 self.file_pattern = file_pattern
62 self.referenced_files = [r["location"] for r in referenced_files]
63 self.single_collection = single_collection
65 self.optional_deps = optional_deps or []
66 super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
68 def visit(self, srcobj, uploadfiles):
69 src = srcobj["location"]
71 src = src[:src.index("#")]
73 debug = logger.isEnabledFor(logging.DEBUG)
75 if isinstance(src, basestring) and src.startswith("keep:"):
76 if collection_pdh_pattern.match(src):
77 self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
79 if arvados_cwl.util.collectionUUID in srcobj:
80 self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
81 elif not collection_uuid_pattern.match(src):
82 with SourceLine(srcobj, "location", WorkflowException, debug):
83 raise WorkflowException("Invalid keep reference '%s'" % src)
85 if src not in self._pathmap:
86 if src.startswith("file:"):
87 # Local FS ref, may need to be uploaded or may be on keep
89 ab = abspath(src, self.input_basedir)
90 st = arvados.commands.run.statfile("", ab,
91 fnPattern="keep:%s/%s",
92 dirPattern="keep:%s/%s",
94 with SourceLine(srcobj, "location", WorkflowException, debug):
95 if isinstance(st, arvados.commands.run.UploadFile):
96 uploadfiles.add((src, ab, st))
97 elif isinstance(st, arvados.commands.run.ArvFile):
98 self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
100 raise WorkflowException("Input file path '%s' is invalid" % st)
101 elif src.startswith("_:"):
102 if srcobj["class"] == "File" and "contents" not in srcobj:
103 raise WorkflowException("File literal '%s' is missing `contents`" % src)
104 if srcobj["class"] == "Directory" and "listing" not in srcobj:
105 raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
106 elif src.startswith("http:") or src.startswith("https:"):
108 if self.arvrunner.defer_downloads:
109 # passthrough, we'll download it later.
110 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
112 keepref = "keep:%s/%s" % http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
113 varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
114 prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
115 logger.info("%s is %s", src, keepref)
116 self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
117 except Exception as e:
118 logger.warning(str(e))
120 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
122 with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
123 for l in srcobj.get("secondaryFiles", []):
124 self.visit(l, uploadfiles)
125 with SourceLine(srcobj, "listing", WorkflowException, debug):
126 for l in srcobj.get("listing", []):
127 self.visit(l, uploadfiles)
129 def addentry(self, obj, c, path, remap):
130 if obj["location"] in self._pathmap:
131 src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
134 c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
135 remap.append((obj["location"], path + "/" + obj["basename"]))
136 for l in obj.get("secondaryFiles", []):
137 self.addentry(l, c, path, remap)
138 elif obj["class"] == "Directory":
139 for l in obj.get("listing", []):
140 self.addentry(l, c, path + "/" + obj["basename"], remap)
141 remap.append((obj["location"], path + "/" + obj["basename"]))
142 elif obj["location"].startswith("_:") and "contents" in obj:
143 with c.open(path + "/" + obj["basename"], "w") as f:
144 f.write(obj["contents"])
145 remap.append((obj["location"], path + "/" + obj["basename"]))
147 for opt in self.optional_deps:
148 if obj["location"] == opt["location"]:
150 raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
152 def needs_new_collection(self, srcobj, prefix=""):
153 """Check if files need to be staged into a new collection.
155 If all the files are in the same collection and in the same
156 paths they would be staged to, return False. Otherwise, a new
157 collection is needed with files copied/created in the
161 loc = srcobj["location"]
162 if loc.startswith("_:"):
165 if self.arvrunner.defer_downloads and (loc.startswith("http:") or loc.startswith("https:")):
170 loc_prefix = loc[:i+1]
173 # quote/unquote to ensure consistent quoting
174 suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
181 if prefix != loc_prefix:
184 if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
187 if srcobj["class"] == "File" and loc not in self._pathmap:
189 for s in srcobj.get("secondaryFiles", []):
190 if self.needs_new_collection(s, prefix):
192 if srcobj.get("listing"):
193 prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
194 for l in srcobj["listing"]:
195 if self.needs_new_collection(l, prefix):
199 def setup(self, referenced_files, basedir):
200 # type: (List[Any], unicode) -> None
204 if self.single_collection:
205 collection = arvados.collection.Collection(api_client=self.arvrunner.api,
206 keep_client=self.arvrunner.keep_client,
207 num_retries=self.arvrunner.num_retries)
209 for srcobj in referenced_files:
210 self.visit(srcobj, uploadfiles)
212 arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
215 num_retries=self.arvrunner.num_retries,
216 fnPattern="keep:%s/%s",
218 project=self.arvrunner.project_uuid,
219 collection=collection,
222 for src, ab, st in uploadfiles:
223 self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
224 "Directory" if os.path.isdir(ab) else "File", True)
226 for srcobj in referenced_files:
228 if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
229 c = arvados.collection.Collection(api_client=self.arvrunner.api,
230 keep_client=self.arvrunner.keep_client,
231 num_retries=self.arvrunner.num_retries)
232 for l in srcobj.get("listing", []):
233 self.addentry(l, c, ".", remap)
235 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
236 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
238 c.save_new(name=info["name"],
239 owner_uuid=self.arvrunner.project_uuid,
240 ensure_unique_name=True,
241 trash_at=info["trash_at"],
242 properties=info["properties"])
244 ab = self.collection_pattern % c.portable_data_hash()
245 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
246 elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
247 c = arvados.collection.Collection(api_client=self.arvrunner.api,
248 keep_client=self.arvrunner.keep_client,
249 num_retries=self.arvrunner.num_retries)
250 self.addentry(srcobj, c, ".", remap)
252 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
253 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
255 c.save_new(name=info["name"],
256 owner_uuid=self.arvrunner.project_uuid,
257 ensure_unique_name=True,
258 trash_at=info["trash_at"],
259 properties=info["properties"])
261 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
262 self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
264 if srcobj.get("secondaryFiles"):
265 ab = self.collection_pattern % c.portable_data_hash()
266 self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
269 for loc, sub in remap:
270 # subdirs start with "./", strip it off
271 if sub.startswith("./"):
272 ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
274 ab = self.file_pattern % (c.portable_data_hash(), sub)
275 self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
276 ab, "Directory", True)
280 def reversemap(self, target):
281 p = super(ArvPathMapper, self).reversemap(target)
284 elif target.startswith("keep:"):
285 return (target, target)
286 elif self.keepdir and target.startswith(self.keepdir):
287 kp = "keep:" + target[len(self.keepdir)+1:]
293 class StagingPathMapper(PathMapper):
294 # Note that StagingPathMapper internally maps files from target to source.
295 # Specifically, the 'self._pathmap' dict keys are the target location and the
296 # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
297 # as the file identifier. This makes it possible to map an input file to multiple
298 # target directories. The exception is for file literals, which store the contents of
299 # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.
303 def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
305 super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)
307 def visit(self, obj, stagedir, basedir, copy=False, staged=False):
308 # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
309 loc = obj["location"]
310 stagedir = obj.get("dirname") or stagedir
311 tgt = os.path.join(stagedir, obj["basename"])
312 basetgt, baseext = os.path.splitext(tgt)
315 return tgt in self.targets and ("contents" not in obj) and (self._pathmap[tgt].resolved != loc)
316 def literalTargetExists():
317 return tgt in self.targets and "contents" in obj
320 if targetExists() or literalTargetExists():
321 while tgt in self.targets:
323 tgt = "%s_%i%s" % (basetgt, n, baseext)
324 self.targets.add(tgt)
325 if obj["class"] == "Directory":
326 if obj.get("writable"):
327 self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
329 self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
330 if loc.startswith("_:") or self._follow_dirs:
331 self.visitlisting(obj.get("listing", []), tgt, basedir)
332 elif obj["class"] == "File":
333 if tgt in self._pathmap:
335 if "contents" in obj and loc.startswith("_:"):
336 self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
338 if copy or obj.get("writable"):
339 self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
341 self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
342 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
344 def mapper(self, src): # type: (Text) -> MapperEnt.
345 # Overridden to maintain the use case of mapping by source (identifier) to
346 # target regardless of how the map is structured internally.
347 def getMapperEnt(src):
348 for k,v in viewitems(self._pathmap):
349 if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
354 v = getMapperEnt(src[i:])
355 return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
356 return getMapperEnt(src)
class VwdPathMapper(StagingPathMapper):
    """Staging path mapper that rewrites Keep references as ``$(task.keep)/...``."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate the path map, then re-point Keep-backed entries.

        Every object in ``referenced_files`` is first passed through
        ``visitlisting`` with ``self.stagedir`` as the staging directory.
        Afterwards, each entry of type "File" or "Directory" whose resolved
        value starts with "keep:" is replaced by an entry whose resolved
        value is "$(task.keep)/" followed by the part after the "keep:"
        prefix; target, type and staged flag are carried over unchanged.
        """
        # Give each file (along with its secondary files) a target under
        # the staging directory.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Only values of existing keys are reassigned below, so the mapping
        # does not change size while it is being iterated.
        for key, entry in viewitems(self._pathmap):
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
372 class NoFollowPathMapper(StagingPathMapper):
374 def setup(self, referenced_files, basedir):
375 # type: (List[Any], unicode) -> None
376 self.visitlisting(referenced_files, self.stagedir, basedir)