1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from builtins import str
from past.builtins import basestring
from future.utils import viewitems

import logging
import os
import re
import uuid
import urllib.request, urllib.parse, urllib.error

import arvados_cwl.util
import arvados.commands.run
import arvados.collection

from schema_salad.sourceline import SourceLine

from arvados.errors import ApiError
from cwltool.pathmapper import PathMapper, MapperEnt
from cwltool.utils import adjustFileObjs, adjustDirObjs
from cwltool.stdfsaccess import abspath
from cwltool.workflow import WorkflowException

from .http import http_to_keep
# Module-wide logger; every info/warning/debug check in this file goes
# through it.
_LOGGER_NAME = 'arvados.cwl-runner'
logger = logging.getLogger(_LOGGER_NAME)
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.

    The object is modified in place; nothing is returned.  Objects whose
    "location" is missing or does not start with "keep:" are left untouched.

    """

    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
# Regex fragment for a portable data hash: 32 lowercase hex digits, '+',
# then a decimal number.
_pdh_fragment = r'[0-9a-f]{32}\+\d+'
# Regex fragment for a collection UUID (the '4zz18' infix is fixed).
_collection_uuid_fragment = r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}'
# Optional "/..." path following the collection identifier.
_optional_path_fragment = r'(/.*)?'

# keep:<pdh>/<non-empty path>
collection_pdh_path = re.compile(r'^keep:' + _pdh_fragment + r'/.+$')
# keep:<pdh>[/path]; group 1 is the pdh, group 2 the optional path.
collection_pdh_pattern = re.compile(r'^keep:(' + _pdh_fragment + r')' + _optional_path_fragment)
# keep:<collection uuid>[/path]; group 1 is the uuid, group 2 the optional path.
collection_uuid_pattern = re.compile(r'^keep:(' + _collection_uuid_fragment + r')' + _optional_path_fragment + r'$')
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False,
                 optional_deps=None):
        """Record the mapping configuration, then run the base initializer.

        arvrunner -- runner object; this class reads its api, keep_client,
            num_retries, project_uuid, fs_access, defer_downloads,
            varying_url_params and intermediate_output_ttl attributes.
        referenced_files -- File/Directory objects (dicts with "location").
        input_basedir -- base directory used to resolve "file:" locations.
        collection_pattern -- %-format with one slot, used to build the
            target for a whole-collection (or pdh/path) reference.
        file_pattern -- %-format with two slots (pdh, path).
        name -- passed as the name when uploading local files.
        single_collection -- when true, uploads share one collection.
        optional_deps -- objects whose locations addentry() may silently
            skip when it does not know how to stage them.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # Filled by visit(): portable data hash -> collection UUID.  Must
        # exist before the base initializer runs.
        # NOTE(review): presumably the base initializer triggers setup()/visit();
        # confirm against cwltool.pathmapper.PathMapper.
        self.pdh_to_uuid = {}
        self.optional_deps = optional_deps or []
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Add a path map entry for srcobj and, recursively, its children.

        srcobj -- File or Directory object.
        uploadfiles -- set that collects (src, abspath, statresult) tuples
            for local files which still have to be uploaded; setup() maps
            them after the upload.

        Raises WorkflowException for malformed keep references, invalid
        local paths, and literals missing `contents` / `listing`.
        """
        src = srcobj["location"]
        if "#" in src:
            # Drop the fragment; only the document part is mapped.
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)

                if arvados_cwl.util.collectionUUID in srcobj:
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                # NOTE(review): the final statfile() argument was lost from
                # the listing; raiseOSError=True is restored -- confirm.
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals get no entry here; they are only validated.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                try:
                    if self.arvrunner.defer_downloads:
                        # passthrough, we'll download it later.
                        self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
                    else:
                        keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src, varying_url_params=self.arvrunner.varying_url_params)
                        logger.info("%s is %s", src, keepref)
                        self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                except Exception as e:
                    # Best effort: a failed download is logged and the
                    # location is simply left unmapped.
                    logger.warning(str(e))
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Stage obj into collection c under path.

        Every staged object appends (location, path-in-collection) to remap.
        Mapped objects are copied from their source collection, unmapped
        Directories are walked recursively, and file literals are written
        from their "contents".

        Raises a WorkflowException (via SourceLine.makeError) for anything
        else, unless its location is listed in optional_deps.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # Empty path: copy from the collection root.
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            for opt in self.optional_deps:
                if obj["location"] == opt["location"]:
                    return
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        prefix -- location prefix (up to and including the last "/") that
            srcobj is expected to share; empty on the top-level call.
        """

        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals always have to be written somewhere.
            return True

        if self.arvrunner.defer_downloads and (loc.startswith("http:") or loc.startswith("https:")):
            return False

        i = loc.rfind("/")
        if i > -1:
            loc_prefix = loc[:i+1]
            if not prefix:
                prefix = loc_prefix
            # quote/unquote to ensure consistent quoting
            suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
        else:
            # no '/' found
            loc_prefix = loc+"/"
            prefix = loc+"/"
            suffix = ""

        if prefix != loc_prefix:
            return True

        if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
            return True

        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def _new_collection(self):
        """Return an empty Collection using the runner's API and Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save c as a new intermediate collection in the runner's project."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Build the path map for referenced_files.

        Visits every object, uploads the local files that visit() collected,
        then stages unmapped Directories and Files that need a new
        collection into freshly saved intermediate collections.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        # NOTE(review): the api, dry_run, name and packed arguments were
        # lost from the listing and are restored here -- confirm against
        # arvados.commands.run.uploadfiles.
        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                self._save_intermediate_collection(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)

                self._save_intermediate_collection(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the collection as a whole under a fresh
                    # synthetic "_:" key.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            # remap is only non-empty when one of the branches above created c.
            for loc, sub in remap:
                # subdirs start with "./", strip it off.  The same stripped
                # path is used for both the keep reference and the target
                # (previously the reference used sub[2:] unconditionally).
                rel = sub[2:] if sub.startswith("./") else sub
                ab = self.file_pattern % (c.portable_data_hash(), rel)
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), rel),
                                               ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a (source, resolved) pair.

        Tries the base class first, then passes "keep:" targets through
        unchanged, then rewrites paths under self.keepdir as "keep:"
        references.  Returns None when nothing applies.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
class StagingPathMapper(PathMapper):
    # Note that StagingPathMapper internally maps files from target to source.
    # Specifically, the 'self._pathmap' dict keys are the target location and the
    # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    # as the file identifier. This makes it possible to map an input file to multiple
    # target directories. The exception is for file literals, which store the contents of
    # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.

    # When true, visit() descends into the listing of every Directory, not
    # only Directory literals ("_:" locations).
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Target paths already claimed by visit(); must exist before the
        # base initializer runs.
        # NOTE(review): presumably the base initializer calls setup();
        # confirm against cwltool.pathmapper.PathMapper.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign obj a unique target under stagedir and record it.

        An explicit "dirname" on obj overrides stagedir.  When the natural
        target is already taken by a different source (or obj is a
        literal), a numeric suffix "_<n>" is inserted before the extension
        until the target is unused.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            return tgt in self.targets and ("contents" not in obj) and (self._pathmap[tgt].resolved != loc)
        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                # Same source already staged at this target.
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literals are keyed by source location, with the
                # contents stored in 'resolved'.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt
        """Return the MapperEnt whose source identifier is src.

        If src carries a "#fragment", the entry is looked up by the part
        before the "#" and the fragment is appended to the returned target.
        Returns None when no entry matches.
        """
        # Overridden to maintain the use case of mapping by source (identifier) to
        # target regardless of how the map is structured internally.
        def getMapperEnt(src):
            for k,v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v

        if "#" in src:
            i = src.index("#")
            # Look up the base location (before '#').  The earlier code
            # passed src[i:], i.e. the fragment itself, which cannot match
            # an entry's source.
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
class VwdPathMapper(StagingPathMapper):
    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Stage every referenced file (and its secondary files) under the
        # staging directory first.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Then rewrite File/Directory entries that resolve to a "keep:"
        # reference so they point below $(task.keep).  Only values of
        # existing keys are replaced, so the dict does not change size
        # while it is being iterated.
        for key, entry in viewitems(self._pathmap):
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal Directories."""

    # StagingPathMapper.visit() reads this flag: when false, only Directory
    # literals ("_:" locations) have their listing visited.
    # NOTE(review): this attribute was lost from the listing and is
    # restored from the class name and the flag's use in visit() -- confirm.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        self.visitlisting(referenced_files, self.stagedir, basedir)