# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from builtins import str
from past.builtins import basestring
from future.utils import viewitems

import logging
import os
import re
import urllib.request, urllib.parse, urllib.error
import uuid

import arvados_cwl.util
import arvados.commands.run
import arvados.collection

from schema_salad.sourceline import SourceLine

from arvados.errors import ApiError
from cwltool.pathmapper import PathMapper, MapperEnt
from cwltool.utils import adjustFileObjs, adjustDirObjs
from cwltool.stdfsaccess import abspath
from cwltool.workflow import WorkflowException

from .http import http_to_keep
# Logger used by everything in this module.
logger = logging.getLogger(name="arvados.cwl-runner")
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.

    ``obj`` is modified in place; nothing is returned.  Objects whose
    location is missing, ``None``, or not a ``keep:`` reference are left
    untouched.
    """
    # `or ""` also covers an explicit {"location": None}.
    location = obj.get("location") or ""
    if location.startswith("keep:") and "listing" in obj:
        del obj["listing"]
# Regex fragments shared by the Keep reference patterns below.
_KEEP_PDH_RE = r'[0-9a-f]{32}\+\d+'
_KEEP_UUID_RE = r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}'

# "keep:<pdh>/<non-empty path>".
collection_pdh_path = re.compile(r'^keep:' + _KEEP_PDH_RE + r'/.+$')

# "keep:<pdh>" with an optional "/path"; group 1 is the pdh, group 2 the path.
# NOTE(review): unlike collection_uuid_pattern there is no trailing "$", so
# re.match() also accepts trailing text after the size (e.g. "keep:<pdh>+12xyz").
# Confirm whether that is intended before relying on it for validation.
collection_pdh_pattern = re.compile(r'^keep:(' + _KEEP_PDH_RE + r')(/.*)?')

# "keep:<collection uuid>" with an optional "/path"; group 1 is the uuid.
collection_uuid_pattern = re.compile(r'^keep:(' + _KEEP_UUID_RE + r')(/.*)?$')
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Each referenced File/Directory location is resolved as follows:

    * ``keep:`` references are validated and mapped as-is;
    * ``file:`` paths are stat'ed and either queued for upload or, when
      already on a Keep mount, mapped to their Keep reference;
    * ``http:``/``https:`` URLs are handed to ``http_to_keep``;
    * ``_:`` literals are validated and later written into a new collection;
    * anything else maps to itself.
    """

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False,
                 optional_deps=None):
        """
        :param arvrunner: runner providing ``api``, ``keep_client``,
            ``num_retries``, ``project_uuid``, ``fs_access`` and
            ``intermediate_output_ttl``.
        :param referenced_files: CWL File/Directory objects to map.
        :param input_basedir: base directory for relative ``file:`` paths.
        :param collection_pattern: %-format turning "<pdh>[/path]" into a
            container path.
        :param file_pattern: %-format turning (pdh, path) into a container path.
        :param name: name passed to the upload of local files.
        :param single_collection: upload all local files into one collection.
        :param optional_deps: objects whose locations may be silently skipped
            by addentry() when they cannot be staged.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # Filled by visit(): portable data hash -> collection uuid.
        self.pdh_to_uuid = {}
        self.optional_deps = optional_deps or []
        # Everything setup()/visit() read is assigned above; presumably the
        # base PathMapper.__init__ invokes setup() (verify against cwltool).
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Resolve ``srcobj`` and, recursively, its secondaryFiles and listing.

        Results go into ``self._pathmap``.  Local files that must be uploaded
        are not mapped here.  Instead a ``(src, abspath, UploadFile)`` tuple is
        added to the ``uploadfiles`` set, and setup() maps them after upload.

        :raises WorkflowException: for an invalid keep reference, an invalid
            local path, or a File/Directory literal without contents/listing.
        """
        src = srcobj["location"]
        if "#" in src:
            # Only the part before the fragment identifies the file/collection.
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
                if arvados_cwl.util.collectionUUID in srcobj:
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                try:
                    keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                    logger.info("%s is %s", src, keepref)
                    self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                except Exception as e:
                    # Deliberately best-effort: the failure is logged and the
                    # URL is simply left unmapped.
                    logger.warning(str(e))
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for secondary in srcobj.get("secondaryFiles", []):
                self.visit(secondary, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for entry in srcobj.get("listing", []):
                self.visit(entry, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Place ``obj`` into collection ``c`` under directory ``path``.

        Each location placed in ``c`` is recorded in ``remap`` as
        ``(location, "<path>/<basename>")``.

        :raises WorkflowException: if ``obj`` is not already mapped, not a
            Directory, and not a file literal with contents, unless its
            location is listed in ``optional_deps`` (then it is skipped).
        """
        if obj["location"] in self._pathmap:
            dest = path + "/" + obj["basename"]
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # Whole-collection reference: copy from the collection root.
                srcpath = "."
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            for secondary in obj.get("secondaryFiles", []):
                self.addentry(secondary, c, path, remap)
        elif obj["class"] == "Directory":
            dest = path + "/" + obj["basename"]
            for entry in obj.get("listing", []):
                self.addentry(entry, c, dest, remap)
            remap.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            dest = path + "/" + obj["basename"]
            with c.open(dest, "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], dest))
        else:
            for opt in self.optional_deps:
                if obj["location"] == opt["location"]:
                    return
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.

        ``prefix`` is the location prefix ``srcobj`` is expected to live
        under.  Callers start with "" and the recursion supplies the parent's.
        """
        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals have no backing collection and must be written out.
            return True

        i = loc.rfind("/")
        if i > -1:
            loc_prefix = loc[:i+1]
            if not prefix:
                prefix = loc_prefix
            # quote/unquote to ensure consistent quoting
            suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
        else:
            loc_prefix = loc + "/"
            if not prefix:
                prefix = loc_prefix
            suffix = ""

        if prefix != loc_prefix:
            return True

        if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
            return True

        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for secondary in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(secondary, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
            for entry in srcobj["listing"]:
                if self.needs_new_collection(entry, prefix):
                    return True
        return False

    def _new_staging_collection(self):
        """Return an empty Collection bound to the runner's API and Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_intermediate_collection(self, c):
        """Save ``c`` as a new intermediate collection in the runner's project."""
        container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
        info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
        c.save_new(name=info["name"],
                   owner_uuid=self.arvrunner.project_uuid,
                   ensure_unique_name=True,
                   trash_at=info["trash_at"],
                   properties=info["properties"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Map every referenced file, uploading and restaging as needed.

        1. visit() every object, collecting local files that need upload.
        2. Upload them (into one collection if ``single_collection``) and map
           them to their new Keep references.
        3. Directories that are still unmapped, and Files whose layout fails
           needs_new_collection(), are copied into a new intermediate
           collection.  Every placed location is remapped into it.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_staging_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            pdh = None
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_staging_collection()
                for entry in srcobj.get("listing", []):
                    self.addentry(entry, c, ".", remap)
                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                ab = self.collection_pattern % pdh
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+pdh, ab, "Directory", True)
            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                c = self._new_staging_collection()
                self.addentry(srcobj, c, ".", remap)
                self._save_intermediate_collection(c)

                pdh = c.portable_data_hash()
                ab = self.file_pattern % (pdh, srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    ab = self.collection_pattern % pdh
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+pdh, ab, "Directory", True)

            # remap is only non-empty when one of the branches above ran,
            # so pdh is set here.
            for loc, sub in remap:
                # Entries placed under "." start with "./"; strip it off.  Use
                # the same relative path for resolved and target so they agree.
                rel = sub[2:] if sub.startswith("./") else sub
                ab = self.file_pattern % (pdh, rel)
                # NOTE(review): remap also holds File entries, yet every entry
                # is typed "Directory" here; kept as-is, confirm intent.
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, rel),
                                               ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a container path back to ``(location, target)``, or None.

        Falls back to treating ``keep:`` targets, and paths under
        ``self.keepdir`` when it is set, as Keep references.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
class StagingPathMapper(PathMapper):
    # Note that StagingPathMapper internally maps files from target to source.
    # Specifically, the 'self._pathmap' dict keys are the target location and the
    # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    # as the file identifier. This makes it possible to map an input file to multiple
    # target directories. The exception is for file literals, which store the contents of
    # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.

    # When False, only directory literals ("_:") have their listing staged.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Every target path claimed so far, including those of file literals
        # (which are keyed by location in _pathmap, not by target).
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Stage ``obj`` at ``<dirname or stagedir>/<basename>``.

        If that target is already taken by a different source (or by a
        literal), a numbered variant "<name>_<n><ext>" (n >= 2) is used.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            # A claimed target may be absent from _pathmap because file
            # literals are keyed by location; treat that as "taken by
            # something else" instead of raising KeyError.
            if tgt not in self.targets or "contents" in obj:
                return False
            existing = self._pathmap.get(tgt)
            return existing is None or existing.resolved != loc

        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)

        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                # Already staged at exactly this target.
                return
            if "contents" in obj and loc.startswith("_:"):
                # Literal: keyed by location, 'resolved' holds the contents.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
            self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt.
        """Return the MapperEnt for source identifier ``src``, or None.

        Overridden to maintain the use case of mapping by source (identifier)
        to target regardless of how the map is structured internally.  A
        "#fragment" suffix is resolved via the part before it and then
        re-appended to the target.
        """
        def getMapperEnt(ident):
            for k, v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == ident) or (v.type == "CreateFile" and k == ident):
                    return v
            return None

        if u"#" in src:
            i = src.index(u"#")
            # Look up the file part (before "#"), not the fragment itself.
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
class VwdPathMapper(StagingPathMapper):
    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage the referenced files, then point Keep-backed entries at $(task.keep).

        Every referenced file, together with its secondary files (via
        visit()), is staged under ``self.stagedir``.  Afterwards each plain
        File/Directory entry whose resolved location is a ``keep:`` reference
        is rewritten to resolve to the same path under ``$(task.keep)``.
        """
        self.visitlisting(referenced_files, self.stagedir, basedir)

        keep_rewrites = {}
        for key, ent in viewitems(self._pathmap):
            if ent.type not in ("File", "Directory"):
                continue
            if not ent.resolved.startswith("keep:"):
                continue
            keep_rewrites[key] = MapperEnt("$(task.keep)/%s" % ent.resolved[5:],
                                           ent.target, ent.type, ent.staged)
        # Only existing keys are replaced, so key order is unchanged.
        self._pathmap.update(keep_rewrites)
class NoFollowPathMapper(StagingPathMapper):
    """StagingPathMapper variant that does not descend into Directory listings.

    With ``_follow_dirs`` False, StagingPathMapper.visit() only walks the
    listing of directory literals (``_:`` locations).  Any other Directory is
    mapped as a single entry.
    """
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        self.visitlisting(referenced_files, self.stagedir, basedir)