# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from builtins import str
from past.builtins import basestring
from future.utils import viewitems

import logging
import os
import re
import urllib.request, urllib.parse, urllib.error
import uuid

import arvados_cwl.util
import arvados.commands.run
import arvados.collection

from schema_salad.sourceline import SourceLine

from arvados.errors import ApiError
from cwltool.pathmapper import PathMapper, MapperEnt
from cwltool.utils import adjustFileObjs, adjustDirObjs
from cwltool.stdfsaccess import abspath
from cwltool.workflow import WorkflowException

from .http import http_to_keep
# Logger for this module. It uses the fixed name 'arvados.cwl-runner'
# rather than __name__.
logger = logging.getLogger(name='arvados.cwl-runner')
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field from any
    object whose location is a "keep:" reference.

    The object is modified in place. Objects without a "keep:" location,
    or without a 'listing' field, are left untouched.

    :param obj: a CWL File/Directory object (dict-like).
    :returns: None.
    """
    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
# Patterns that recognise "keep:" references.

# Portable data hash (32 hex digits, '+', size) followed by a non-empty path
# inside that collection, e.g. "keep:<md5>+<size>/dir/file.txt".
collection_pdh_path = re.compile(
    r'^keep:[0-9a-f]{32}\+\d+/.+$')

# Portable-data-hash reference with an optional "/path" suffix.
# group(1) is the PDH; group(2) is the path (or None).
# NOTE(review): unlike collection_uuid_pattern this has no trailing '$', so
# with .match() it also accepts arbitrary text right after the PDH — confirm
# that is intended before relying on it for validation.
collection_pdh_pattern = re.compile(
    r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')

# UUID-style reference ("xxxxx-4zz18-xxxxxxxxxxxxxxx") with an optional
# "/path" suffix. group(1) is the UUID; group(2) is the path (or None).
collection_uuid_pattern = re.compile(
    r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
# ArvPathMapper: cwltool PathMapper subclass that maps CWL File/Directory
# locations to Keep ("keep:...") references and to container paths built from
# collection_pattern (formatted with "<pdh>[/path]") and file_pattern
# (formatted with (pdh, path)).
#
# Methods (as far as this damaged copy shows):
#   __init__  stores the runner, base directory, patterns and options, then
#             calls the base PathMapper constructor.
#   visit     adds a MapperEnt to self._pathmap for one File/Directory (the
#             '#fragment' is cut off the location first). It also queues local
#             "file:" paths that must be uploaded into `uploadfiles` as
#             (src, abspath, statfile result) tuples, copies http(s) URLs into
#             Keep with http_to_keep, checks that "_:" literals carry
#             `contents`/`listing`, and recurses into secondaryFiles/listing.
#   addentry  copies one object (or writes a file literal) into collection `c`
#             under `path`, appending (original location, new path) to `remap`.
#             Anything else raises a WorkflowException, apparently unless its
#             location is listed in self.optional_deps.
#   needs_new_collection
#             reports whether an object must be restaged into a new collection.
#   setup     visits all referenced files and uploads the queued local files
#             (into one shared collection when single_collection is set). It
#             then saves new collections for unmapped Directories and for Files
#             that needs_new_collection flags, and remaps their locations there.
#   reversemap
#             maps a container path back to a (location, target) pair. "keep:"
#             targets pass through; for paths under self.keepdir it computes a
#             "keep:" reference (presumably returned by a missing line).
#
# NOTE(review): this copy is damaged. Every line starts with what looks like
# its original line number, indentation is missing, and gaps in that numbering
# show dropped lines. Visible symptoms:
#   - __init__ reads `optional_deps`, which is absent from the visible
#     signature. `self.pdh_to_uuid` (written in visit) is never initialised.
#   - the `src[:src.index("#")]` slice presumably sat under a dropped
#     `if "#" in src:` guard (str.index raises ValueError when '#' is absent).
#   - the statfile(...) call is unterminated. The `try:` before
#     `except Exception` and several `else:` lines are missing.
#   - setup uses `uploadfiles`, `remap` and (when single_collection is false)
#     `collection` without a visible initialisation.
#   - reversemap reads self.keepdir, which is never assigned here, and its
#     first `if` and final branches are cut.
#   - the needs_new_collection docstring is never closed in this copy.
# Restore the class from version control before changing its code.
#
# NOTE(review): behaviours to confirm once restored:
#   - visit logs any exception from http_to_keep as a warning and leaves the
#     URL unmapped. The failure may then only surface later (e.g. as
#     addentry's "Don't know what to do with ..." error).
#   - setup's remap loop builds the "keep:" reference from sub[2:] even when
#     `sub` does not start with "./" (where `ab` uses the whole `sub`). It also
#     types every remapped entry as "Directory", files included.
#   - collection_pdh_pattern has no trailing '$', so visit treats
#     "keep:<pdh>" followed by arbitrary text as a PDH reference.
51 class ArvPathMapper(PathMapper):
52 """Convert container-local paths to and from Keep collection ids."""
54 def __init__(self, arvrunner, referenced_files, input_basedir,
55 collection_pattern, file_pattern, name=None, single_collection=False,
57 self.arvrunner = arvrunner
58 self.input_basedir = input_basedir
59 self.collection_pattern = collection_pattern
60 self.file_pattern = file_pattern
62 self.referenced_files = [r["location"] for r in referenced_files]
63 self.single_collection = single_collection
65 self.optional_deps = optional_deps or []
66 super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
# visit: map one File/Directory object, then recurse into its children.
68 def visit(self, srcobj, uploadfiles):
69 src = srcobj["location"]
71 src = src[:src.index("#")]
73 debug = logger.isEnabledFor(logging.DEBUG)
# "keep:" locations: a PDH reference is mapped directly (remembering any
# collection UUID carried on the object). A collection-UUID reference is
# neither mapped nor rejected here. Any other "keep:" string is an error.
75 if isinstance(src, basestring) and src.startswith("keep:"):
76 if collection_pdh_pattern.match(src):
77 self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
79 if arvados_cwl.util.collectionUUID in srcobj:
80 self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
81 elif not collection_uuid_pattern.match(src):
82 with SourceLine(srcobj, "location", WorkflowException, debug):
83 raise WorkflowException("Invalid keep reference '%s'" % src)
# Locations not mapped above:
#   - "file:" paths are queued for upload, or mapped when statfile says they
#     are already in Keep.
#   - "_:" literals are validated.
#   - http(s) URLs are copied into Keep.
#   - anything else is (presumably, via a dropped else:) mapped to itself.
85 if src not in self._pathmap:
86 if src.startswith("file:"):
87 # Local FS ref, may need to be uploaded or may be on keep
89 ab = abspath(src, self.input_basedir)
90 st = arvados.commands.run.statfile("", ab,
91 fnPattern="keep:%s/%s",
92 dirPattern="keep:%s/%s",
94 with SourceLine(srcobj, "location", WorkflowException, debug):
95 if isinstance(st, arvados.commands.run.UploadFile):
96 uploadfiles.add((src, ab, st))
97 elif isinstance(st, arvados.commands.run.ArvFile):
98 self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
100 raise WorkflowException("Input file path '%s' is invalid" % st)
101 elif src.startswith("_:"):
102 if srcobj["class"] == "File" and "contents" not in srcobj:
103 raise WorkflowException("File literal '%s' is missing `contents`" % src)
104 if srcobj["class"] == "Directory" and "listing" not in srcobj:
105 raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
106 elif src.startswith("http:") or src.startswith("https:"):
108 keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
109 logger.info("%s is %s", src, keepref)
110 self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
111 except Exception as e:
112 logger.warning(str(e))
114 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
116 with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
117 for l in srcobj.get("secondaryFiles", []):
118 self.visit(l, uploadfiles)
119 with SourceLine(srcobj, "listing", WorkflowException, debug):
120 for l in srcobj.get("listing", []):
121 self.visit(l, uploadfiles)
# addentry: place one object into collection `c` under `path`, recording
# (old location, new path) pairs in `remap`.
123 def addentry(self, obj, c, path, remap):
124 if obj["location"] in self._pathmap:
125 src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
128 c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
129 remap.append((obj["location"], path + "/" + obj["basename"]))
130 for l in obj.get("secondaryFiles", []):
131 self.addentry(l, c, path, remap)
132 elif obj["class"] == "Directory":
133 for l in obj.get("listing", []):
134 self.addentry(l, c, path + "/" + obj["basename"], remap)
135 remap.append((obj["location"], path + "/" + obj["basename"]))
136 elif obj["location"].startswith("_:") and "contents" in obj:
137 with c.open(path + "/" + obj["basename"], "w") as f:
138 f.write(obj["contents"])
139 remap.append((obj["location"], path + "/" + obj["basename"]))
# Unhandled object: tolerated when its location is an optional dependency
# (the line after the match is missing — presumably an early return);
# otherwise a WorkflowException is raised.
141 for opt in self.optional_deps:
142 if obj["location"] == opt["location"]:
144 raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
146 def needs_new_collection(self, srcobj, prefix=""):
147 """Check if files need to be staged into a new collection.
149 If all the files are in the same collection and in the same
150 paths they would be staged to, return False. Otherwise, a new
151 collection is needed with files copied/created in the
155 loc = srcobj["location"]
156 if loc.startswith("_:"):
163 suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
167 suffix = loc[len(prefix):]
169 if "basename" in srcobj and prefix+suffix != prefix+urllib.parse.quote(srcobj["basename"], "/+@"):
172 if srcobj["class"] == "File" and loc not in self._pathmap:
174 for s in srcobj.get("secondaryFiles", []):
175 if self.needs_new_collection(s, prefix):
177 if srcobj.get("listing"):
178 prefix = "%s%s/" % (prefix, srcobj["basename"])
179 for l in srcobj["listing"]:
180 if self.needs_new_collection(l, prefix):
184 def setup(self, referenced_files, basedir):
185 # type: (List[Any], unicode) -> None
189 if self.single_collection:
190 collection = arvados.collection.Collection(api_client=self.arvrunner.api,
191 keep_client=self.arvrunner.keep_client,
192 num_retries=self.arvrunner.num_retries)
194 for srcobj in referenced_files:
195 self.visit(srcobj, uploadfiles)
197 arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
200 num_retries=self.arvrunner.num_retries,
201 fnPattern="keep:%s/%s",
203 project=self.arvrunner.project_uuid,
204 collection=collection,
207 for src, ab, st in uploadfiles:
208 self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
209 "Directory" if os.path.isdir(ab) else "File", True)
211 for srcobj in referenced_files:
213 if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
214 c = arvados.collection.Collection(api_client=self.arvrunner.api,
215 keep_client=self.arvrunner.keep_client,
216 num_retries=self.arvrunner.num_retries)
217 for l in srcobj.get("listing", []):
218 self.addentry(l, c, ".", remap)
220 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
221 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
223 c.save_new(name=info["name"],
224 owner_uuid=self.arvrunner.project_uuid,
225 ensure_unique_name=True,
226 trash_at=info["trash_at"],
227 properties=info["properties"])
229 ab = self.collection_pattern % c.portable_data_hash()
230 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
231 elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
232 c = arvados.collection.Collection(api_client=self.arvrunner.api,
233 keep_client=self.arvrunner.keep_client,
234 num_retries=self.arvrunner.num_retries)
235 self.addentry(srcobj, c, ".", remap)
237 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
238 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
240 c.save_new(name=info["name"],
241 owner_uuid=self.arvrunner.project_uuid,
242 ensure_unique_name=True,
243 trash_at=info["trash_at"],
244 properties=info["properties"])
246 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
247 self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
249 if srcobj.get("secondaryFiles"):
250 ab = self.collection_pattern % c.portable_data_hash()
251 self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
254 for loc, sub in remap:
255 # subdirs start with "./", strip it off
256 if sub.startswith("./"):
257 ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
259 ab = self.file_pattern % (c.portable_data_hash(), sub)
260 self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
261 ab, "Directory", True)
265 def reversemap(self, target):
266 p = super(ArvPathMapper, self).reversemap(target)
269 elif target.startswith("keep:"):
270 return (target, target)
271 elif self.keepdir and target.startswith(self.keepdir):
272 kp = "keep:" + target[len(self.keepdir)+1:]
class StagingPathMapper(PathMapper):
    # Note that StagingPathMapper internally maps files from target to source.
    # Specifically, the 'self._pathmap' dict keys are the target location and the
    # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    # as the file identifier. This makes it possible to map an input file to multiple
    # target directories. The exception is for file literals, which store the contents of
    # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.

    # Whether visit() descends into the listing of non-literal Directory
    # objects. Literal ("_:") directories are always expanded.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Every target path handed out so far; visit() uses it to avoid
        # collisions. It is set before calling the base constructor, which
        # presumably triggers setup()/visit() — confirm against cwltool.
        self.targets = set()  # type: Set[Text]
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign a unique staging target to a CWL File/Directory object.

        The target is ``<obj["dirname"] or stagedir>/<basename>``. If that
        target is already taken by a different source, or the object is a
        literal whose target is taken, a counter is inserted before the
        extension (``name_2.ext``, ``name_3.ext``, ...) until the target is
        unused. Directories visit their listing (always for "_:" literals,
        otherwise only when ``_follow_dirs`` is set). Non-literal files
        visit their secondaryFiles in the same staging directory.
        """
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            # Target already used by a different source. File literals are
            # keyed by source location, so a target occupied by a literal is in
            # self.targets but not in self._pathmap. Treat it as taken rather
            # than raising KeyError.
            if tgt not in self.targets or "contents" in obj:
                return False
            entry = self._pathmap.get(tgt)
            return entry is None or entry.resolved != loc

        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                # The same source is already staged at this target.
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literals are keyed by source location (see class notes).
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt
        """Return the MapperEnt for source location ``src``, or None.

        Overridden to keep mapping by source (identifier) to target,
        regardless of how the map is structured internally. A ``#fragment``
        on ``src`` is ignored for the lookup and re-appended to the target.
        """
        def getMapperEnt(src):
            # Linear scan: regular entries are identified by `resolved`, and
            # CreateFile literals by their key (the source location).
            for k, v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v
            return None

        if "#" in src:
            i = src.index("#")
            # Look up the location *without* the fragment (src[:i]); looking
            # up the fragment itself (src[i:]) never matches an entry.
            v = getMapperEnt(src[:i])
            if v is None:
                return None
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed entries resolve under ``$(task.keep)``."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        # First place each referenced file, along with any secondary files,
        # under the staging directory.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Then rewrite plain File/Directory entries whose resolved location is
        # a "keep:" reference into "$(task.keep)/<rest of the reference>".
        # Only values are replaced, so iterating a snapshot is equivalent.
        for key, entry in list(viewitems(self._pathmap)):
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            self._pathmap[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal Directory listings.

    StagingPathMapper.visit only recurses into a Directory's ``listing`` when
    the directory is a literal ("_:" location) or ``_follow_dirs`` is true.
    This subclass turns the latter off; without it the class would follow
    directories exactly like its parent.
    """

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        # Visit the referenced files directly under the staging directory.
        self.visitlisting(referenced_files, self.stagedir, basedir)