1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from past.builtins import basestring
9 from future.utils import viewitems
15 import urllib.request, urllib.parse, urllib.error
17 import arvados_cwl.util
18 import arvados.commands.run
19 import arvados.collection
21 from schema_salad.sourceline import SourceLine
23 from arvados.errors import ApiError
24 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
25 from cwltool.workflow import WorkflowException
27 from .http import http_to_keep
# Module logger; logging.getLogger returns the same Logger object to every
# caller that asks for the name 'arvados.cwl-runner'.
29 logger = logging.getLogger('arvados.cwl-runner')
31 def trim_listing(obj):
32 """Remove 'listing' field from Directory objects that are keep references.
34 When Directory objects represent Keep references, it is redundant and
35 potentially very expensive to pass fully enumerated Directory objects
36 between instances of cwl-runner (e.g. when submitting a job, or using the
37 RunInSingleContainer feature), so delete the 'listing' field when it is
42 if obj.get("location", "").startswith("keep:") and "listing" in obj:  # NOTE: no "class" check; any object with a keep: location and a listing qualifies
46 class ArvPathMapper(PathMapper):
47 """Convert container-local paths to and from Keep collection ids."""
# Matches "keep:<32 hex digits>+<size>/<path>": a reference to a path inside
# a collection (at least one character must follow the "/").
49 pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# Matches a reference to a whole collection ("keep:<hash>+<size>") or to any
# path inside it; visit() uses it to recognize inputs that are already in Keep.
50 pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')
# arvrunner: runner object; this class reads its api, keep_client,
#   num_retries, project_uuid, fs_access and intermediate_output_ttl.
# referenced_files: CWL File/Directory objects; only their "location" values
#   are kept in self.referenced_files.
# collection_pattern / file_pattern: %-format strings for container-side
#   target paths (collection_pattern is given a "<pdh>" or "<pdh>/<path>"
#   string, file_pattern a (pdh, path) tuple).
# single_collection: if true, setup() passes one new Collection object to
#   arvados.commands.run.uploadfiles for all local uploads.
52 def __init__(self, arvrunner, referenced_files, input_basedir,
53 collection_pattern, file_pattern, name=None, single_collection=False):
54 self.arvrunner = arvrunner
55 self.input_basedir = input_basedir
56 self.collection_pattern = collection_pattern
57 self.file_pattern = file_pattern
59 self.referenced_files = [r["location"] for r in referenced_files]
60 self.single_collection = single_collection
62 super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
# Add a pathmap entry for srcobj and, recursively, for its secondaryFiles and
# listing. Local files that must be uploaded are not mapped here; they are
# added to the uploadfiles set as (location, absolute path, statfile result).
64 def visit(self, srcobj, uploadfiles):
65 src = srcobj["location"]
67 src = src[:src.index("#")]  # drop the "#fragment" part of the location
# Already a Keep reference (whole collection or path inside one): map directly.
69 if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
70 self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
71 if arvados_cwl.util.collectionUUID in srcobj:
# Remember the collection UUID recorded for this portable data hash.
72 self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
# Forwarded to SourceLine as its include_traceback flag.
74 debug = logger.isEnabledFor(logging.DEBUG)
76 if src not in self._pathmap:
77 if src.startswith("file:"):
78 # Local FS ref, may need to be uploaded or may be on keep
80 ab = abspath(src, self.input_basedir)
81 st = arvados.commands.run.statfile("", ab,
82 fnPattern="keep:%s/%s",
83 dirPattern="keep:%s/%s",
85 with SourceLine(srcobj, "location", WorkflowException, debug):
86 if isinstance(st, arvados.commands.run.UploadFile):  # uploaded later, in setup()
87 uploadfiles.add((src, ab, st))
88 elif isinstance(st, arvados.commands.run.ArvFile):  # already resolves to a keep: reference (st.fn)
89 self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
91 raise WorkflowException("Input file path '%s' is invalid" % st)
# Literal ("_:") objects are only validated here; no pathmap entry is added.
92 elif src.startswith("_:"):
93 if srcobj["class"] == "File" and "contents" not in srcobj:
94 raise WorkflowException("File literal '%s' is missing `contents`" % src)
95 if srcobj["class"] == "Directory" and "listing" not in srcobj:
96 raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
# Remote http(s) input: fetched into Keep by http_to_keep(); the returned
# keep: reference is used as both the resolved location and the target.
97 elif src.startswith("http:") or src.startswith("https:"):
98 keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
99 logger.info("%s is %s", src, keepref)
100 self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
102 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)  # fallback: location used unchanged as resolved and target
# Recurse into secondary files and directory listings; each loop is wrapped in
# SourceLine so that errors point at the field being visited.
104 with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
105 for l in srcobj.get("secondaryFiles", []):
106 self.visit(l, uploadfiles)
107 with SourceLine(srcobj, "listing", WorkflowException, debug):
108 for l in srcobj.get("listing", []):
109 self.visit(l, uploadfiles)
# Add obj to collection c under directory `path` and append
# (original location, path inside c) pairs to remap; recurses into
# secondaryFiles and directory listings.
111 def addentry(self, obj, c, path, remap):
# Already mapped: copy it from the collection it currently lives in.
112 if obj["location"] in self._pathmap:
113 src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
116 c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
117 remap.append((obj["location"], path + "/" + obj["basename"]))
# Secondary files go into the same directory (`path`) as the primary file.
118 for l in obj.get("secondaryFiles", []):
119 self.addentry(l, c, path, remap)
# Unmapped directory: add its listing under path/basename.
120 elif obj["class"] == "Directory":
121 for l in obj.get("listing", []):
122 self.addentry(l, c, path + "/" + obj["basename"], remap)
123 remap.append((obj["location"], path + "/" + obj["basename"]))
# File literal: write its contents into the collection.
124 elif obj["location"].startswith("_:") and "contents" in obj:
125 with c.open(path + "/" + obj["basename"], "w") as f:
126 f.write(obj["contents"])
127 remap.append((obj["location"], path + "/" + obj["basename"]))
129 raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
131 def needs_new_collection(self, srcobj, prefix=""):
132 """Check if files need to be staged into a new collection.
134 If all the files are in the same collection and in the same
135 paths they would be staged to, return False. Otherwise, a new
136 collection is needed with files copied/created in the
140 loc = srcobj["location"]
141 if loc.startswith("_:"):
144 if loc != prefix+srcobj["basename"]:
152 if srcobj["class"] == "File" and loc not in self._pathmap:
154 for s in srcobj.get("secondaryFiles", []):  # secondary files are checked at the same prefix
155 if self.needs_new_collection(s, prefix):
157 if srcobj.get("listing"):
158 prefix = "%s%s/" % (prefix, srcobj["basename"])  # listing entries are checked one directory level deeper
159 for l in srcobj["listing"]:
160 if self.needs_new_collection(l, prefix):
164 def setup(self, referenced_files, basedir):
165 # type: (List[Any], unicode) -> None
# With single_collection, one new Collection receives every local upload.
169 if self.single_collection:
170 collection = arvados.collection.Collection(api_client=self.arvrunner.api,
171 keep_client=self.arvrunner.keep_client,
172 num_retries=self.arvrunner.num_retries)
# Pass 1: map every referenced object; local files needing upload are collected.
174 for srcobj in referenced_files:
175 self.visit(srcobj, uploadfiles)
# Upload all collected local files in one call.
177 arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
180 num_retries=self.arvrunner.num_retries,
181 fnPattern="keep:%s/%s",
183 project=self.arvrunner.project_uuid,
184 collection=collection,
187 for src, ab, st in uploadfiles:  # resolved location is URL-quoted (keeping / : + @); target path is not
188 self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
189 "Directory" if os.path.isdir(ab) else "File", True)
# Pass 2: build new collections for unmapped Directory objects and for Files
# that have secondaryFiles or literal contents.
191 for srcobj in referenced_files:
193 if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
194 c = arvados.collection.Collection(api_client=self.arvrunner.api,
195 keep_client=self.arvrunner.keep_client,
196 num_retries=self.arvrunner.num_retries)
197 for l in srcobj.get("listing", []):
198 self.addentry(l, c, ".", remap)
# The new collection's name, trash_at and properties come from
# get_intermediate_collection_info() for the current container.
200 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
201 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
203 c.save_new(name=info["name"],
204 owner_uuid=self.arvrunner.project_uuid,
205 ensure_unique_name=True,
206 trash_at=info["trash_at"],
207 properties=info["properties"])
209 ab = self.collection_pattern % c.portable_data_hash()
210 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
211 elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
212 (srcobj["location"].startswith("_:") and "contents" in srcobj)):
214 # If all secondary files/directories are located in
215 # the same collection as the primary file and the
216 # paths and names are consistent with staging,
217 # don't create a new collection.
218 if not self.needs_new_collection(srcobj):
221 c = arvados.collection.Collection(api_client=self.arvrunner.api,
222 keep_client=self.arvrunner.keep_client,
223 num_retries=self.arvrunner.num_retries )
224 self.addentry(srcobj, c, ".", remap)
226 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
227 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
229 c.save_new(name=info["name"],
230 owner_uuid=self.arvrunner.project_uuid,
231 ensure_unique_name=True,
232 trash_at=info["trash_at"],
233 properties=info["properties"])
235 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
236 self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
238 if srcobj.get("secondaryFiles"):
# Also register the whole new collection as a Directory under a fresh
# anonymous "_:<uuid4>" key.
239 ab = self.collection_pattern % c.portable_data_hash()
240 self._pathmap["_:" + str(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
# Point each entry recorded by addentry() at its place in the new collection.
# NOTE(review): the target path strips a leading "./" only when present, but
# the resolved keep: reference always drops the first two characters of `sub`;
# every remapped entry is also typed "Directory", including files that
# addentry() recorded. Confirm both are intended.
243 for loc, sub in remap:
244 # subdirs start with "./", strip it off
245 if sub.startswith("./"):
246 ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
248 ab = self.file_pattern % (c.portable_data_hash(), sub)
249 self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
250 ab, "Directory", True)
# Map a container-side target back to a (location, resolved) pair. The
# base-class lookup is consulted first. keep: targets map to themselves, and
# for paths under self.keepdir a keep: reference (kp) is built from the part
# after self.keepdir.
254 def reversemap(self, target):
255 p = super(ArvPathMapper, self).reversemap(target)
258 elif target.startswith("keep:"):
259 return (target, target)
260 elif self.keepdir and target.startswith(self.keepdir):
261 kp = "keep:" + target[len(self.keepdir)+1:]
267 class StagingPathMapper(PathMapper):
# Places each object at stagedir/<basename>, renaming on collisions;
# self.targets records the target paths that visit() has already assigned.
270 def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
272 super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)
# Map one File/Directory object to a target under stagedir, recursing (via
# visitlisting) into directory listings and secondaryFiles. `copy` forces the
# WritableFile type for files; `staged` is stored on every MapperEnt created.
274 def visit(self, obj, stagedir, basedir, copy=False, staged=False):
275 # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
276 loc = obj["location"]
277 tgt = os.path.join(stagedir, obj["basename"])
278 basetgt, baseext = os.path.splitext(tgt)
# A different location already claimed this target: insert "_<n>" before the
# extension until the name is unused.
280 if tgt in self.targets and (self.reversemap(tgt)[0] != loc):
281 while tgt in self.targets:
283 tgt = "%s_%i%s" % (basetgt, n, baseext)
284 self.targets.add(tgt)
285 if obj["class"] == "Directory":
286 if obj.get("writable"):
287 self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
289 self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
# Literal directories are always expanded; others only when _follow_dirs is truthy.
290 if loc.startswith("_:") or self._follow_dirs:
291 self.visitlisting(obj.get("listing", []), tgt, basedir)
292 elif obj["class"] == "File":
293 if loc in self._pathmap:  # already mapped
# File literal: the entry carries the literal contents with type CreateFile.
295 if "contents" in obj and loc.startswith("_:"):
296 self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
298 if copy or obj.get("writable"):
299 self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
301 self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
# Secondary files are staged into the same stagedir as the primary file.
302 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
class VwdPathMapper(StagingPathMapper):
    """StagingPathMapper whose Keep-backed entries resolve via ``$(task.keep)``."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage ``referenced_files`` and rewrite Keep entries to ``$(task.keep)``.

        Each referenced object is visited under ``self.stagedir``. Afterwards,
        every ``File`` or ``Directory`` entry whose resolved location starts
        with ``keep:`` is replaced by an entry resolving to
        ``$(task.keep)/<reference without the "keep:" prefix>``. The entry's
        target path, type and staged flag are unchanged.
        """
        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Iterate over a snapshot so entries are not reassigned in the dict
        # while a live view of it is being iterated.
        for path, (resolved, target, kind, staged) in list(viewitems(self._pathmap)):
            if kind in ("File", "Directory") and resolved.startswith("keep:"):
                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % resolved[5:],
                                                target, kind, staged)
318 class NoFollowPathMapper(StagingPathMapper):
# setup() stages referenced_files under self.stagedir via visitlisting, and
# (unlike VwdPathMapper.setup) does not rewrite keep: entries to $(task.keep).
# NOTE(review): the name suggests directory listings are not followed,
# presumably through a _follow_dirs = False class attribute that is not
# visible here — confirm.
320 def setup(self, referenced_files, basedir):
321 # type: (List[Any], unicode) -> None
322 self.visitlisting(referenced_files, self.stagedir, basedir)