1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
15 import urllib.request, urllib.parse, urllib.error
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
21 from schema_salad.sourceline import SourceLine
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
25 from cwltool.workflow import WorkflowException
27 from .http import http_to_keep
# Module logger: visit() and setup() below report through the
# 'arvados.cwl-runner' logger (and consult it for the DEBUG level).
logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
    """Remove the 'listing' field from Directory objects that are Keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or when using
    the RunInSingleContainer feature).  The 'listing' field is therefore
    deleted whenever the object's location is a ``keep:`` reference.

    :param obj: a dict-like CWL object; it is modified in place.
    :returns: None.
    """
    # Objects without a location, or with a non-keep: location, are untouched.
    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
46 class ArvPathMapper(PathMapper):
47 """Convert container-local paths to and from Keep collection ids."""
# Keep reference to a path inside a collection:
# "keep:<32 hex digits>+<digits>/<path>".
49 pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# Same as pdh_path, but the "/<path>" part is optional, so a bare
# "keep:<hash>+<size>" collection reference also matches.
50 pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')
# Store the runner and path patterns, then defer to PathMapper.__init__
# with None as the stage directory.
#   arvrunner          -- runner object; the methods below read its api,
#                         keep_client, num_retries, project_uuid, fs_access
#                         and intermediate_output_ttl attributes.
#   collection_pattern -- %-format applied to a keep: reference with its
#                         "keep:" prefix removed ("<hash>+<size>[/path]").
#   file_pattern       -- %-format applied to (portable data hash, relative path).
#   single_collection  -- when true, setup() creates one Collection and passes
#                         it to arvados.commands.run.uploadfiles (presumably so
#                         all uploads land in that one collection).
52 def __init__(self, arvrunner, referenced_files, input_basedir,
53 collection_pattern, file_pattern, name=None, single_collection=False):
54 self.arvrunner = arvrunner
55 self.input_basedir = input_basedir
56 self.collection_pattern = collection_pattern
57 self.file_pattern = file_pattern
# Only the location strings of the referenced objects are kept here.
59 self.referenced_files = [r["location"] for r in referenced_files]
60 self.single_collection = single_collection
62 super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
# Record how one CWL File/Directory object's location maps into Keep, then
# recurse into its secondaryFiles and listing.  By location:
#   keep:          -> mapped directly through collection_pattern; if the object
#                     carries arvados_cwl.util.collectionUUID, that UUID is
#                     remembered in self.pdh_to_uuid under the collection hash.
#   file:          -> stat'ed; an UploadFile is queued in `uploadfiles` as
#                     (src, absolute path, stat result) for setup() to upload,
#                     an ArvFile is mapped to its keep: location, and anything
#                     else raises WorkflowException.
#   _:             -> literal; only validated here (a File needs "contents",
#                     a Directory needs "listing") and left unmapped.
#   http:/https:   -> replaced by the keep: reference returned by http_to_keep().
#   anything else  -> mapped to itself.
64 def visit(self, srcobj, uploadfiles):
65 src = srcobj["location"]
# Strip any "#fragment" so the map is keyed by the bare location.
67 src = src[:src.index("#")]
69 if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
70 self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
71 if arvados_cwl.util.collectionUUID in srcobj:
72 self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
# Passed to SourceLine below (presumably to control how much detail errors
# include when debug logging is enabled).
74 debug = logger.isEnabledFor(logging.DEBUG)
76 if src not in self._pathmap:
77 if src.startswith("file:"):
78 # Local FS ref, may need to be uploaded or may be on keep
80 ab = abspath(src, self.input_basedir)
81 st = arvados.commands.run.statfile("", ab,
82 fnPattern="keep:%s/%s",
83 dirPattern="keep:%s/%s",
85 with SourceLine(srcobj, "location", WorkflowException, debug):
86 if isinstance(st, arvados.commands.run.UploadFile):
87 uploadfiles.add((src, ab, st))
88 elif isinstance(st, arvados.commands.run.ArvFile):
89 self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
91 raise WorkflowException("Input file path '%s' is invalid" % st)
92 elif src.startswith("_:"):
93 if srcobj["class"] == "File" and "contents" not in srcobj:
94 raise WorkflowException("File literal '%s' is missing `contents`" % src)
95 if srcobj["class"] == "Directory" and "listing" not in srcobj:
96 raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
97 elif src.startswith("http:") or src.startswith("https:"):
98 keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
99 logger.info("%s is %s", src, keepref)
100 self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
# Any other scheme is used unchanged.
102 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
104 with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
105 for l in srcobj.get("secondaryFiles", []):
106 self.visit(l, uploadfiles)
107 with SourceLine(srcobj, "listing", WorkflowException, debug):
108 for l in srcobj.get("listing", []):
109 self.visit(l, uploadfiles)
# Place `obj` inside collection `c` under directory `path`, appending
# (original location, path inside c) pairs to `remap`:
#   - already-mapped locations are copied (with overwrite) from the collection
#     returned by fs_access.get_collection(), followed by their secondaryFiles;
#   - unmapped Directories are recursed into under path/<basename>;
#   - "_:" literals with "contents" are written out as new files;
#   - anything else raises a WorkflowException built by SourceLine.makeError.
111 def addentry(self, obj, c, path, remap):
112 if obj["location"] in self._pathmap:
113 src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
116 c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
117 remap.append((obj["location"], path + "/" + obj["basename"]))
118 for l in obj.get("secondaryFiles", []):
119 self.addentry(l, c, path, remap)
120 elif obj["class"] == "Directory":
121 for l in obj.get("listing", []):
122 self.addentry(l, c, path + "/" + obj["basename"], remap)
123 remap.append((obj["location"], path + "/" + obj["basename"]))
124 elif obj["location"].startswith("_:") and "contents" in obj:
125 with c.open(path + "/" + obj["basename"], "w") as f:
126 f.write(obj["contents"])
127 remap.append((obj["location"], path + "/" + obj["basename"]))
129 raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
# `prefix` is the location at which srcobj is expected to sit: a non-empty
# prefix requires loc == prefix + basename.  setup() calls this with the
# default "" for the top-level object.
131 def needs_new_collection(self, srcobj, prefix=""):
132 """Check if files need to be staged into a new collection.
134 If all the files are in the same collection and in the same
135 paths they would be staged to, return False. Otherwise, a new
136 collection is needed with files copied/created in the
140 loc = srcobj["location"]
141 if loc.startswith("_:"):
144 if loc != prefix+srcobj["basename"]:
152 if srcobj["class"] == "File" and loc not in self._pathmap:
# secondaryFiles are checked against the same prefix as their primary file.
154 for s in srcobj.get("secondaryFiles", []):
155 if self.needs_new_collection(s, prefix):
# Entries of a listing are expected under "<prefix><basename>/".
157 if srcobj.get("listing"):
158 prefix = "%s%s/" % (prefix, srcobj["basename"])
159 for l in srcobj["listing"]:
160 if self.needs_new_collection(l, prefix):
# Build the path map for `referenced_files`:
#   1. visit() every object, collecting local files that must be uploaded;
#   2. upload them with arvados.commands.run.uploadfiles (into one shared
#      Collection when single_collection is set) and map each uploaded path,
#      typed "Directory" or "File" according to os.path.isdir;
#   3. for each still-unmapped Directory, and each File with secondaryFiles or
#      literal contents that needs_new_collection() says cannot be reused in
#      place, assemble a new collection with addentry(), save it with the
#      intermediate-collection settings for the current container, and map
#      the object (and everything addentry() recorded) into it.
164 def setup(self, referenced_files, basedir):
165 # type: (List[Any], unicode) -> None
169 if self.single_collection:
170 collection = arvados.collection.Collection(api_client=self.arvrunner.api,
171 keep_client=self.arvrunner.keep_client,
172 num_retries=self.arvrunner.num_retries)
174 for srcobj in referenced_files:
175 self.visit(srcobj, uploadfiles)
# Upload only the stat results (third element of each queued tuple).
177 arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
180 num_retries=self.arvrunner.num_retries,
181 fnPattern="keep:%s/%s",
183 project=self.arvrunner.project_uuid,
184 collection=collection,
# After upload, st.fn holds a keep: reference; it is URL-quoted (keeping
# "/:+@" literal) for the resolved location.
187 for src, ab, st in uploadfiles:
188 self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
189 "Directory" if os.path.isdir(ab) else "File", True)
191 for srcobj in referenced_files:
# A Directory that visit() left unmapped (e.g. a "_:" directory literal,
# which visit() only validates) gets a new collection built from its listing.
193 if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
194 c = arvados.collection.Collection(api_client=self.arvrunner.api,
195 keep_client=self.arvrunner.keep_client,
196 num_retries=self.arvrunner.num_retries)
197 for l in srcobj.get("listing", []):
198 self.addentry(l, c, ".", remap)
# The new collection's name, trash_at and properties come from the
# intermediate-collection info for the current container and
# intermediate_output_ttl.
200 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
201 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
203 c.save_new(name=info["name"],
204 owner_uuid=self.arvrunner.project_uuid,
205 ensure_unique_name=True,
206 trash_at=info["trash_at"],
207 properties=info["properties"])
209 ab = self.collection_pattern % c.portable_data_hash()
210 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
# Files with secondaryFiles, or "_:" file literals with contents.
211 elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
212 (srcobj["location"].startswith("_:") and "contents" in srcobj)):
214 # If all secondary files/directories are located in
215 # the same collection as the primary file and the
216 # paths and names are consistent with staging,
217 # don't create a new collection.
218 if not self.needs_new_collection(srcobj):
221 c = arvados.collection.Collection(api_client=self.arvrunner.api,
222 keep_client=self.arvrunner.keep_client,
223 num_retries=self.arvrunner.num_retries )
224 self.addentry(srcobj, c, ".", remap)
226 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
227 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
229 c.save_new(name=info["name"],
230 owner_uuid=self.arvrunner.project_uuid,
231 ensure_unique_name=True,
232 trash_at=info["trash_at"],
233 properties=info["properties"])
235 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
236 self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
# With secondaryFiles, the whole new collection is also registered as a
# Directory under a fresh random "_:" key (presumably so it is staged as a unit).
238 if srcobj.get("secondaryFiles"):
239 ab = self.collection_pattern % c.portable_data_hash()
240 self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
# Re-point every location recorded by addentry() at its path inside the new
# collection `c`.
# NOTE(review): `ab` strips a leading "./" only when present, but the keep:
# reference below always uses sub[2:], so a sub without "./" would lose its
# first two characters there.  addentry() is only called with path "." in
# this method, so every sub appears to start with "./"; consider computing
# the stripped path once and using it for both.  Also, every remapped entry
# is typed "Directory", including files added by addentry(); confirm intended.
243 for loc, sub in remap:
244 # subdirs start with "./", strip it off
245 if sub.startswith("./"):
246 ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
248 ab = self.file_pattern % (c.portable_data_hash(), sub)
249 self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
250 ab, "Directory", True)
# Map a container-side path back to a (source, source) pair: ask
# PathMapper.reversemap first (presumably returning its result when it has
# one), pass keep: references through unchanged, and turn paths under
# self.keepdir back into keep: references.
254 def reversemap(self, target):
255 p = super(ArvPathMapper, self).reversemap(target)
258 elif target.startswith("keep:"):
259 return (target, target)
260 elif self.keepdir and target.startswith(self.keepdir):
# Drop the keepdir prefix and the following separator character.
261 kp = "keep:" + target[len(self.keepdir)+1:]
267 class StagingPathMapper(PathMapper):
268 # Note that StagingPathMapper internally maps files from target to source.
269 # Specifically, the 'self._pathmap' dict keys are the target location and the
270 # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
271 # as the file identifier. This makes it possible to map an input file to multiple
272 # target directories. The exception is for file literals, which store the contents of
273 # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.
# All arguments are forwarded unchanged to PathMapper.__init__.
277 def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
279 super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)
# Stage one File/Directory object at stagedir/<basename>.
# If that target is already claimed by a different source, or obj is a
# literal (has "contents") whose target is already claimed, "_<n>" is inserted
# before the extension until an unclaimed target is found; the chosen target
# is then recorded in self.targets.
#   Directory -> keyed by target, as "WritableDirectory" when obj["writable"]
#                is set, else "Directory"; its listing is staged beneath the
#                target when it is a "_:" literal or self._follow_dirs is true.
#   File      -> "_:" literals with contents are keyed by their location as a
#                "CreateFile" entry whose resolved value is the contents; other
#                files are keyed by target as "WritableFile" (copy or writable)
#                or "File".  secondaryFiles are staged into the same stagedir.
281 def visit(self, obj, stagedir, basedir, copy=False, staged=False):
282 # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
283 loc = obj["location"]
284 tgt = os.path.join(stagedir, obj["basename"])
285 basetgt, baseext = os.path.splitext(tgt)
# NOTE(review): "_:" file literals are keyed by location rather than target
# (CreateFile branch below), yet their target is still added to
# self.targets.  A later non-literal object with the same target would reach
# self._pathmap[tgt] for a key that may not exist (KeyError); confirm.
288 return tgt in self.targets and ("contents" not in obj) and (self._pathmap[tgt].resolved != loc)
289 def literalTargetExists():
290 return tgt in self.targets and "contents" in obj
293 if targetExists() or literalTargetExists():
294 while tgt in self.targets:
296 tgt = "%s_%i%s" % (basetgt, n, baseext)
297 self.targets.add(tgt)
298 if obj["class"] == "Directory":
299 if obj.get("writable"):
300 self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
302 self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
303 if loc.startswith("_:") or self._follow_dirs:
304 self.visitlisting(obj.get("listing", []), tgt, basedir)
305 elif obj["class"] == "File":
306 if tgt in self._pathmap:
308 if "contents" in obj and loc.startswith("_:"):
309 self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
311 if copy or obj.get("writable"):
312 self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
314 self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
315 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
# Look up the MapperEnt for a *source* location with a linear scan of the map:
# a CreateFile entry matches on its key (the literal's location), any other
# entry on its resolved value.  For sources with a "#..." suffix, that suffix
# (src[i:]) is appended to the matched entry's target.
# NOTE(review): the suffix branch looks up getMapperEnt(src[i:]), i.e. the
# "#fragment" part itself; the lookup key should presumably be the part
# before it, src[:i].  getMapperEnt() also falls through (returning None)
# when nothing matches, which would make v.resolved raise AttributeError.
317 def mapper(self, src): # type: (Text) -> MapperEnt
318 # Overridden to maintain the use case of mapping by source (identifier) to
319 # target regardless of how the map is structured internally.
320 def getMapperEnt(src):
321 for k,v in viewitems(self._pathmap):
322 if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
327 v = getMapperEnt(src[i:])
328 return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
329 return getMapperEnt(src)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites Keep-backed entries to ``$(task.keep)`` paths.

    Every referenced object is staged under ``self.stagedir`` (together with
    its secondary files).  Afterwards, each plain "File"/"Directory" entry
    whose source is a ``keep:`` reference is re-pointed at
    ``$(task.keep)/<reference without "keep:">``.  Writable and CreateFile
    entries keep their original source.
    """

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Iterate over a snapshot so that rewriting entries never interleaves
        # with iteration over the live map.  `kind` avoids shadowing the
        # builtin `type`.
        for path, (ab, tgt, kind, staged) in list(viewitems(self._pathmap)):
            if kind in ("File", "Directory") and ab.startswith("keep:"):
                # Replace the "keep:" scheme with the $(task.keep) placeholder.
                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, kind, staged)
# StagingPathMapper variant whose setup() stages referenced_files into
# self.stagedir via visitlisting().
# NOTE(review): the class name suggests directory listings are not followed,
# i.e. self._follow_dirs (consulted by StagingPathMapper.visit) is false for
# this class; confirm where that attribute is set.
345 class NoFollowPathMapper(StagingPathMapper):
347 def setup(self, referenced_files, basedir):
348 # type: (List[Any], unicode) -> None
# Stage every referenced object directly under self.stagedir.
349 self.visitlisting(referenced_files, self.stagedir, basedir)