1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from builtins import str
from past.builtins import basestring
from future.utils import viewitems

import logging
import os
import re
import urllib.request, urllib.parse, urllib.error
import uuid

import arvados_cwl.util
import arvados.commands.run
import arvados.collection

from schema_salad.sourceline import SourceLine

from arvados.errors import ApiError
from cwltool.pathmapper import PathMapper, MapperEnt
from cwltool.utils import adjustFileObjs, adjustDirObjs
from cwltool.stdfsaccess import abspath
from cwltool.workflow import WorkflowException

from arvados.http_to_keep import http_to_keep
# Logger used by every path mapper defined in this module.
logger = logging.getLogger(name='arvados.cwl-runner')
def trim_listing(obj):
    """Strip the 'listing' key from a Keep-backed Directory object, in place.

    Passing fully enumerated Directory objects that point at Keep between
    instances of cwl-runner (e.g. when submitting a job, or with the
    RunInSingleContainer feature) is redundant and potentially very
    expensive.  So when ``obj["location"]`` starts with ``keep:`` and a
    'listing' key is present, that key is removed.  Any other object is left
    exactly as it was.  Returns None.
    """
    points_into_keep = obj.get("location", "").startswith("keep:")
    if not points_into_keep:
        return
    if "listing" in obj:
        obj.pop("listing")
# "keep:<pdh>/<path>": a portable data hash (32 lowercase hex digits, "+",
# then digits) followed by a non-empty path.
collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# "keep:<pdh>" optionally followed by "/<path>"; group 1 is the hash and
# group 2 the path (or None).  NOTE(review): unlike the other two patterns
# this one has no trailing "$", so match() only validates the prefix --
# confirm whether that leniency is intended before relying on it.
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
# "keep:<collection uuid>" (5 chars, "-4zz18-", 15 chars) optionally followed
# by "/<path>"; group 1 is the uuid and group 2 the path (or None).
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False,
                 optional_deps=None):
        """Map ``referenced_files`` to Keep-backed, container-local paths.

        :param arvrunner: runner object providing ``api``, ``keep_client``,
            ``num_retries``, ``project_uuid``, ``fs_access`` and the
            download / intermediate-output settings read by this class.
        :param referenced_files: CWL File/Directory objects to be mapped.
        :param input_basedir: directory that relative local paths resolve against.
        :param collection_pattern: %-format string applied to "<pdh>[/<path>]"
            to produce a container-local path.
        :param file_pattern: %-format string applied to (pdh, path) to produce
            a container-local file path.
        :param name: passed as ``name`` when uploading local files.
        :param single_collection: when true, local files are uploaded into one
            shared collection.
        :param optional_deps: File/Directory objects that ``addentry`` skips
            instead of raising when it cannot stage them.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # Filled in by visit(): portable data hash -> collection uuid, for keep
        # references annotated with arvados_cwl.util.collectionUUID.
        self.pdh_to_uuid = {}
        self.optional_deps = optional_deps or []
        # NOTE(review): setup() is not called from this class, so presumably the
        # base constructor runs it; keep the attribute assignments above this call.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record a mapping for ``srcobj`` and, recursively, its children.

        ``keep:`` references are mapped directly.  Local ``file:`` paths are
        either recognised as already living in Keep or queued in
        ``uploadfiles`` as (src, absolute path, statfile result) tuples.
        ``http:``/``https:`` URLs are fetched into Keep with http_to_keep, or
        passed through unchanged when downloads are deferred.  Any other
        non-literal location maps to itself.  secondaryFiles and listing
        entries are visited recursively.

        :raises WorkflowException: for an invalid ``keep:`` reference, an
            unusable local path, or a File/Directory literal missing its
            ``contents``/``listing``.
        """
        src = srcobj["location"]
        if "#" in src:
            # Mappings are keyed on the location without its fragment.
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

                if arvados_cwl.util.collectionUUID in srcobj:
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                try:
                    if self.arvrunner.defer_downloads:
                        # passthrough, we'll download it later.
                        self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
                    else:
                        results = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
                                               varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
                                               prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
                        keepref = "keep:%s/%s" % (results[0], results[1])
                        logger.info("%s is %s", src, keepref)
                        self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                except Exception as e:
                    # Best effort: on failure src is left unmapped.
                    logger.warning("Download error: %s", e)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Stage ``obj`` into collection ``c`` under directory ``path``.

        Mapped objects are copied from their source collection, followed by
        their secondaryFiles.  Directories are recursed into.  File literals
        are written from their ``contents``.  Every staged location is
        appended to ``remap`` as (location, path within ``c``).  Objects
        listed in ``optional_deps`` that fit none of these cases are skipped.

        :raises WorkflowException: if ``obj`` cannot be staged and is not an
            optional dependency.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # Whole-collection reference: copy from the collection root.
                srcpath = "."
            dest = path + "/" + obj["basename"]
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            dest = path + "/" + obj["basename"]
            for l in obj.get("listing", []):
                self.addentry(l, c, dest, remap)
            remap.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            dest = path + "/" + obj["basename"]
            with c.open(dest, "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], dest))
        else:
            for opt in self.optional_deps:
                if obj["location"] == opt["location"]:
                    return
            raise SourceLine(obj, "location", WorkflowException).makeError("Can't handle '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        :param srcobj: File/Directory object to check, including its
            secondaryFiles and listing (recursively).
        :param prefix: expected "<directory>/" prefix of the location; empty
            on the top-level call, where it is taken from srcobj's location.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals ("_:") always have to be written into a new collection.
            return True

        if self.arvrunner.defer_downloads and (loc.startswith("http:") or loc.startswith("https:")):
            return False

        i = loc.rfind("/")
        if i > -1:
            loc_prefix = loc[:i+1]
            if not prefix:
                prefix = loc_prefix
            # quote/unquote to ensure consistent quoting
            suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
        else:
            # no '/' found
            loc_prefix = loc+"/"
            prefix = loc+"/"
            suffix = ""

        if prefix != loc_prefix:
            return True

        if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
            return True

        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return an empty Collection bound to the runner's API and Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate(self, c):
        """Save ``c`` as a new intermediate collection in the runner's project."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Upload local inputs and build new collections where needed.

        1. Visit every referenced object, collecting local files to upload.
        2. Upload them (into one collection when ``single_collection``) and map each.
        3. For each unmapped Directory, and each File whose pieces are not
           already laid out correctly in Keep, copy/write the pieces into a
           fresh intermediate collection and map everything into it.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)
                self._save_intermediate(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)
                self._save_intermediate(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            if remap:
                pdh = c.portable_data_hash()
                for loc, sub in remap:
                    # addentry() was rooted at ".", so entries normally start
                    # with "./".  Strip it once and use the same relative path
                    # for both the keep reference and the container path (the
                    # keep reference previously used sub[2:] unconditionally).
                    relpath = sub[2:] if sub.startswith("./") else sub
                    # NOTE(review): every remapped entry, files included, is
                    # typed "Directory"; kept as-is pending confirmation of how
                    # callers use MapperEnt.type.
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, relpath),
                                                   self.file_pattern % (pdh, relpath),
                                                   "Directory", True)

        # Consulted by reversemap(); nothing in this class sets another value.
        self.keepdir = None

    def reversemap(self, target):
        """Map a container-side ``target`` back to a (location, path) pair.

        Tries the base-class lookup first.  A ``keep:`` reference then maps to
        itself, and a path under ``self.keepdir`` maps to the matching
        ``keep:`` reference.  Returns None when no rule applies.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
class StagingPathMapper(PathMapper):
    # Note that StagingPathMapper internally maps files from target to source.
    # Specifically, the 'self._pathmap' dict keys are the target location and the
    # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    # as the file identifier. This makes it possible to map an input file to multiple
    # target directories. The exception is for file literals, which store the contents of
    # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.

    # When true, visit() also walks the listing of non-literal Directory
    # objects; subclasses may override it.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Every target path handed out so far; visit() uses it to detect and
        # resolve name collisions.
        self.targets = set()  # type: Set[Text]
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign a staging target to ``obj`` and record it in ``_pathmap``.

        The target is ``<obj dirname or stagedir>/<basename>``.  A ``_2``,
        ``_3``, ... suffix is inserted before the extension until a free path
        is found in two cases: the path already belongs to a different
        source, or ``obj`` carries ``contents`` and the path is taken at all.
        Directories and non-literal files are keyed by target.  File literals
        ("_:" location with ``contents``) are keyed by location and store
        their contents in ``resolved``.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            # A non-literal collides with an assigned target unless that target
            # already maps to this very location.  File literals are keyed by
            # location, so a target that is in self.targets but has no
            # _pathmap entry belongs to a literal and is also a collision
            # (indexing _pathmap[tgt] directly raised KeyError in that case).
            if tgt not in self.targets or "contents" in obj:
                return False
            existing = self._pathmap.get(tgt)
            return existing is None or existing.resolved != loc

        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                # Same source already staged at this target.
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt
        # Overridden to maintain the use case of mapping by source (identifier) to
        # target regardless of how the map is structured internally.
        """Return the MapperEnt for source location ``src``, or None if unmapped.

        A ``#fragment`` on ``src`` is looked up without the fragment and
        re-attached to the returned target.
        """
        def getMapperEnt(src):
            for k, v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v
            return None

        if u"#" in src:
            i = src.index(u"#")
            # Look up the bare location (previously src[i:], i.e. the fragment
            # itself, which never matched and crashed on None.resolved).
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
class VwdPathMapper(StagingPathMapper):
    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage the referenced objects, then point Keep entries at $(task.keep).

        Every File or Directory entry whose resolved location is a ``keep:``
        reference is rewritten to ``$(task.keep)/<remainder of the reference>``.
        All other entries, including file literals, are left untouched.
        """
        # Assign a staging target to every referenced file/directory.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        keep_prefix = "keep:"
        rewritten = {}
        for key, ent in viewitems(self._pathmap):
            if ent.type not in ("File", "Directory"):
                continue
            if not ent.resolved.startswith(keep_prefix):
                continue
            rewritten[key] = MapperEnt("$(task.keep)/%s" % ent.resolved[len(keep_prefix):],
                                       ent.target, ent.type, ent.staged)
        self._pathmap.update(rewritten)
class NoFollowPathMapper(StagingPathMapper):
    """StagingPathMapper that does not walk into non-literal directory listings."""

    # Read by StagingPathMapper.visit(): with this off, only the listings of
    # literal ("_:") directories are visited.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Assign staging targets, rooted at ``self.stagedir``, to every referenced object."""
        stage_root = self.stagedir
        self.visitlisting(referenced_files, stage_root, basedir)