1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
15 import urllib.request, urllib.parse, urllib.error
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
21 from schema_salad.sourceline import SourceLine
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt
25 from cwltool.utils import adjustFileObjs, adjustDirObjs
26 from cwltool.stdfsaccess import abspath
27 from cwltool.workflow import WorkflowException
29 from .http import http_to_keep
31 logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. a submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.

    """

    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        # The listing can be regenerated from the collection, so drop it.
        del obj["listing"]
# Matches a keep reference by portable data hash that also names a path
# inside the collection, e.g. "keep:<md5>+<size>/some/file".
collection_pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# Captures the portable data hash (group 1) and optional trailing path
# (group 2) of a keep reference.
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
# Captures the collection UUID (group 1, "-4zz18-" is the collection type
# infix) and optional trailing path (group 2) of a keep reference.
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False,
                 optional_deps=None):
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # Maps portable data hashes back to the collection UUID they came
        # from, filled in by visit() when the source object carries one.
        self.pdh_to_uuid = {}
        # Dependencies that may legitimately be absent; addentry() skips
        # them instead of raising.
        self.optional_deps = optional_deps or []
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Classify one File/Directory object and record its mapping.

        Local "file:" refs that need uploading are added to the
        `uploadfiles` set for setup() to upload in a batch; everything
        else gets an entry in self._pathmap immediately.  Recurses into
        secondaryFiles and directory listings.
        """
        src = srcobj["location"]
        if "#" in src:
            # Strip any fragment identifier before mapping.
            src = src[:src.index("#")]

        debug = logger.isEnabledFor(logging.DEBUG)

        if isinstance(src, basestring) and src.startswith("keep:"):
            if collection_pdh_pattern.match(src):
                # Keep reference by portable data hash; unquote so the
                # stored target path uses consistent (un)quoting.
                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
                if arvados_cwl.util.collectionUUID in srcobj:
                    self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
            elif not collection_uuid_pattern.match(src):
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    raise WorkflowException("Invalid keep reference '%s'" % src)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        # Already in keep (e.g. visible through a keep
                        # mount); map it directly.
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are materialized later; just validate here.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith("http:") or src.startswith("https:"):
                try:
                    keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                    logger.info("%s is %s", src, keepref)
                    self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                except Exception as e:
                    # Best effort: leave unmapped and let downstream
                    # validation report the problem.
                    logger.warning(str(e))
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Copy or create `obj` (and children) into collection `c` under `path`.

        Records (location, new path) pairs in `remap` so setup() can
        update the pathmap afterwards.  Raises WorkflowException for
        objects it cannot place, unless they are listed in optional_deps.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            remap.append((obj["location"], path + "/" + obj["basename"]))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], remap)
            remap.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            # File literal: write its contents directly into the collection.
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"])
            remap.append((obj["location"], path + "/" + obj["basename"]))
        else:
            for opt in self.optional_deps:
                if obj["location"] == opt["location"]:
                    # Declared optional; silently skip.
                    return
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def needs_new_collection(self, srcobj, prefix=""):
        """Check if files need to be staged into a new collection.

        If all the files are in the same collection and in the same
        paths they would be staged to, return False.  Otherwise, a new
        collection is needed with files copied/created in the
        appropriate places.
        """

        loc = srcobj["location"]
        if loc.startswith("_:"):
            # Literals always require materializing a new collection.
            return True

        i = loc.rfind("/")
        if i > -1:
            loc_prefix = loc[:i+1]
            if not prefix:
                prefix = loc_prefix
            # quote/unquote round trip to normalize quoting before comparing
            suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
        else:
            loc_prefix = loc + "/"
            suffix = ""

        if prefix != loc_prefix:
            # Files live in different collections/directories.
            return True

        # Staged name must match the file's basename.
        if prefix+suffix != prefix+urllib.parse.quote(srcobj["basename"], "/+@"):
            return True

        if srcobj["class"] == "File" and loc not in self._pathmap:
            return True
        for s in srcobj.get("secondaryFiles", []):
            if self.needs_new_collection(s, prefix):
                return True
        if srcobj.get("listing"):
            prefix = "%s%s/" % (prefix, srcobj["basename"])
            for l in srcobj["listing"]:
                if self.needs_new_collection(l, prefix):
                    return True
        return False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Visit all referenced files, upload local ones, and build the pathmap.

        Files/directories that cannot be referenced in place are copied
        into new intermediate collections.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            # Quote target paths consistently ("/", ":", "+", "@" stay literal).
            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

                c.save_new(name=info["name"],
                           owner_uuid=self.arvrunner.project_uuid,
                           ensure_unique_name=True,
                           trash_at=info["trash_at"],
                           properties=info["properties"])

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                self.addentry(srcobj, c, ".", remap)

                container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)

                c.save_new(name=info["name"],
                           owner_uuid=self.arvrunner.project_uuid,
                           ensure_unique_name=True,
                           trash_at=info["trash_at"],
                           properties=info["properties"])

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            if remap:
                # At least one file was copied into a new collection, so
                # update the remaining locations to point at it.
                for loc, sub in remap:
                    # subdirs start with "./", strip it off
                    if sub.startswith("./"):
                        ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
                    else:
                        ab = self.file_pattern % (c.portable_data_hash(), sub)
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
                                                   ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a container-local target path back to a (keep ref, keep ref) pair."""
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
class StagingPathMapper(PathMapper):
    # Note that StagingPathMapper internally maps files from target to source.
    # Specifically, the 'self._pathmap' dict keys are the target location and the
    # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
    # as the file identifier. This makes it possible to map an input file to multiple
    # target directories. The exception is for file literals, which store the contents of
    # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.

    # Whether visit() descends into directory listings of non-literal
    # directories; NoFollowPathMapper overrides this.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Target paths already claimed, used to de-conflict duplicates.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
        """Assign a staging target for one File/Directory object and recurse."""
        loc = obj["location"]
        stagedir = obj.get("dirname") or stagedir
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)

        def targetExists():
            # A different source already mapped to this target.
            return tgt in self.targets and ("contents" not in obj) and (self._pathmap[tgt].resolved != loc)
        def literalTargetExists():
            return tgt in self.targets and "contents" in obj

        n = 1
        if targetExists() or literalTargetExists():
            # De-conflict by appending "_<n>" before the extension.
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if tgt in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literal: 'resolved' holds the contents, keyed by source.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)

    def mapper(self, src):  # type: (Text) -> MapperEnt.
        # Overridden to maintain the use case of mapping by source (identifier) to
        # target regardless of how the map is structured interally.
        def getMapperEnt(src):
            for k,v in viewitems(self._pathmap):
                if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
                    return v

        if u"#" in src:
            i = src.index(u"#")
            # Look up by the fragment-stripped identifier, then re-attach
            # the fragment to the target.  (Bug fix: was src[i:], which
            # looked up the "#fragment" itself and never matched.)
            v = getMapperEnt(src[0:i])
            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
        return getMapperEnt(src)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose keep references resolve under $(task.keep)."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Walk every referenced file (plus secondary files) and give each
        # one a target under its own directory in the staging area.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Rewrite plain keep: sources so they point into the keep mount.
        for key, entry in list(viewitems(self._pathmap)):
            resolved, tgt, ent_type, staged = entry
            if ent_type in ("File", "Directory") and resolved.startswith("keep:"):
                self._pathmap[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], tgt, ent_type, staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into directory listings."""
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        # Only visit the top-level references; _follow_dirs=False stops
        # StagingPathMapper.visit() from recursing into listings.
        self.visitlisting(referenced_files, self.stagedir, basedir)