1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
11 import arvados_cwl.util
12 import arvados.commands.run
13 import arvados.collection
15 from schema_salad.sourceline import SourceLine
17 from arvados.errors import ApiError
18 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
19 from cwltool.workflow import WorkflowException
21 from .http import http_to_keep
# Module-wide logger. It uses the fixed name 'arvados.cwl-runner' (not
# __name__), so messages share the runner's logger configuration.
23 logger = logging.getLogger('arvados.cwl-runner')
# trim_listing(obj): drop the fully enumerated 'listing' from Keep-backed
# Directory objects (see the docstring for why).
# The condition on original line 36 only matches objects whose "location"
# starts with "keep:" and that actually carry a "listing" key. A missing
# "location" is treated as "" and therefore never matches.
# NOTE(review): this listing does not include:
#   - the statements run when that condition holds (original line 37 onward),
#   - the docstring's closing lines (32-34),
#   - any return value.
# Per the docstring, the branch should delete obj["listing"]. Confirm against
# the full file.
25 def trim_listing(obj):
26 """Remove 'listing' field from Directory objects that are keep references.
28 When Directory objects represent Keep references, it is redundant and
29 potentially very expensive to pass fully enumerated Directory objects
30 between instances of cwl-runner (e.g. when submitting a job, or using the
31 RunInSingleContainer feature), so delete the 'listing' field when it is
36 if obj.get("location", "").startswith("keep:") and "listing" in obj:
# ArvPathMapper resolves CWL File/Directory locations to Keep references
# ("keep:<portable data hash>/...") and to container-side paths built with
# the collection_pattern / file_pattern %-format strings.
# pdh_path matches "keep:<32 hex chars>+<digits>/<non-empty path>", i.e. a
# path inside a collection.
# pdh_dirpath matches the same prefix followed by an optional "/..." part,
# so it also accepts a bare collection reference.
40 class ArvPathMapper(PathMapper):
41 """Convert container-local paths to and from Keep collection ids."""
43 pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
44 pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')
# __init__: store the runner handle and input base directory. It also stores
# the two %-format patterns used for resolved paths:
#   - collection_pattern takes one string (a "<pdh>[/path]" value or a bare
#     portable data hash);
#   - file_pattern takes a (portable data hash, path) pair.
# referenced_files must be a list of dicts with a "location" key. Only those
# locations are kept on self.referenced_files.
# When single_collection is true, setup() passes one shared Collection to
# arvados.commands.run.uploadfiles.
# Finally, PathMapper.__init__ is called with None as its third positional
# argument (presumably cwltool's stagedir; confirm).
# NOTE(review): `name` is accepted but not stored in the visible lines.
# Original line 52 is missing from this listing and may be where it is
# saved; confirm.
46 def __init__(self, arvrunner, referenced_files, input_basedir,
47 collection_pattern, file_pattern, name=None, single_collection=False):
48 self.arvrunner = arvrunner
49 self.input_basedir = input_basedir
50 self.collection_pattern = collection_pattern
51 self.file_pattern = file_pattern
53 self.referenced_files = [r["location"] for r in referenced_files]
54 self.single_collection = single_collection
55 super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
# visit(srcobj, uploadfiles): record how one CWL File/Directory object
# resolves, then recurse into its secondaryFiles and listing entries. Errors
# in those entries are reported through SourceLine.
# Steps:
#   - A "#fragment" is cut off the location. The guard for this (presumably
#     on original line 59) is not shown; src.index raises ValueError when
#     "#" is absent.
#   - A string location matching pdh_dirpath is mapped to itself. Its
#     resolved path is collection_pattern applied to the URL-unquoted text
#     after "keep:".
#   - "file:" locations are resolved to an absolute path and stat'ed with
#     arvados.commands.run.statfile (the call continues on original line 75,
#     not shown):
#       * UploadFile results are added to the set-like `uploadfiles`; setup()
#         maps them after uploading.
#       * ArvFile results (whose fn follows the "keep:%s/%s" pattern) are
#         mapped immediately.
#       * Anything else presumably raises WorkflowException (the `else:` on
#         line 81 is not shown).
#   - "_:" literals are validated: a File needs `contents`, a Directory
#     needs `listing`.
#   - http:/https: locations are turned into a Keep reference by
#     http_to_keep, and that reference is logged.
#   - Any other location is mapped to itself (its `else:` line, 92, is not
#     shown).
# NOTE(review): `basestring` and `urllib.unquote` exist only in Python 2.
# A Python 3 port needs `str` and `urllib.parse.unquote`.
57 def visit(self, srcobj, uploadfiles):
58 src = srcobj["location"]
60 src = src[:src.index("#")]
62 if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
63 self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)
65 debug = logger.isEnabledFor(logging.DEBUG)
67 if src not in self._pathmap:
68 if src.startswith("file:"):
69 # Local FS ref, may need to be uploaded or may be on keep
71 ab = abspath(src, self.input_basedir)
72 st = arvados.commands.run.statfile("", ab,
73 fnPattern="keep:%s/%s",
74 dirPattern="keep:%s/%s",
76 with SourceLine(srcobj, "location", WorkflowException, debug):
77 if isinstance(st, arvados.commands.run.UploadFile):
78 uploadfiles.add((src, ab, st))
79 elif isinstance(st, arvados.commands.run.ArvFile):
80 self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
82 raise WorkflowException("Input file path '%s' is invalid" % st)
83 elif src.startswith("_:"):
84 if srcobj["class"] == "File" and "contents" not in srcobj:
85 raise WorkflowException("File literal '%s' is missing `contents`" % src)
86 if srcobj["class"] == "Directory" and "listing" not in srcobj:
87 raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
88 elif src.startswith("http:") or src.startswith("https:"):
89 keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
90 logger.info("%s is %s", src, keepref)
91 self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
93 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
95 with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
96 for l in srcobj.get("secondaryFiles", []):
97 self.visit(l, uploadfiles)
98 with SourceLine(srcobj, "listing", WorkflowException, debug):
99 for l in srcobj.get("listing", []):
100 self.visit(l, uploadfiles)
# addentry(obj, c, path, remap): place `obj` inside collection `c` at
# path + "/" + basename, and append (original location, new path) to
# `remap`.
# Cases:
#   - Objects already in _pathmap are copied from their source collection
#     (located via fs_access.get_collection on the resolved path), with
#     overwrite=True. Their secondaryFiles are then added beside them, under
#     the same `path`.
#   - Unmapped Directories are rebuilt by adding each listing entry one
#     level deeper.
#   - "_:" literals with `contents` are written out as UTF-8 encoded files.
#   - Anything else raises the WorkflowException built by SourceLine for the
#     "location" field.
# NOTE(review): original lines 105-106 (between get_collection and c.copy)
# and 119 (presumably the `else:`) are missing from this listing.
102 def addentry(self, obj, c, path, remap):
103 if obj["location"] in self._pathmap:
104 src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
107 c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
108 remap.append((obj["location"], path + "/" + obj["basename"]))
109 for l in obj.get("secondaryFiles", []):
110 self.addentry(l, c, path, remap)
111 elif obj["class"] == "Directory":
112 for l in obj.get("listing", []):
113 self.addentry(l, c, path + "/" + obj["basename"], remap)
114 remap.append((obj["location"], path + "/" + obj["basename"]))
115 elif obj["location"].startswith("_:") and "contents" in obj:
116 with c.open(path + "/" + obj["basename"], "w") as f:
117 f.write(obj["contents"].encode("utf-8"))
118 remap.append((obj["location"], path + "/" + obj["basename"]))
120 raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
# needs_new_collection(srcobj, prefix=""): decide whether `srcobj` can be
# used where it already lives, or must be staged into a new collection.
# The check recurses into secondaryFiles (same prefix) and into listing
# entries (prefix extended by "<basename>/").
# The visible checks cover:
#   - "_:" literal locations;
#   - whether the location matches prefix + basename;
#   - File entries that are not in _pathmap.
# NOTE(review): much of this method is missing from this listing, including
# every return statement (original lines 128-130, 133-134, 136-142, 144,
# 147, 152-154). The exact result of each branch cannot be confirmed here.
122 def needs_new_collection(self, srcobj, prefix=""):
123 """Check if files need to be staged into a new collection.
125 If all the files are in the same collection and in the same
126 paths they would be staged to, return False. Otherwise, a new
127 collection is needed with files copied/created in the
131 loc = srcobj["location"]
132 if loc.startswith("_:"):
135 if loc != prefix+srcobj["basename"]:
143 if srcobj["class"] == "File" and loc not in self._pathmap:
145 for s in srcobj.get("secondaryFiles", []):
146 if self.needs_new_collection(s, prefix):
148 if srcobj.get("listing"):
149 prefix = "%s%s/" % (prefix, srcobj["basename"])
150 for l in srcobj["listing"]:
151 if self.needs_new_collection(l, prefix):
# setup(referenced_files, basedir): resolve every referenced File/Directory
# in three passes.
#  1. visit() each object, collecting local files that still need
#     uploading.
#  2. Upload those files with arvados.commands.run.uploadfiles. When
#     single_collection is set, one shared Collection is passed in. Each
#     uploaded file is then mapped as follows:
#       - location: URL-quoted st.fn (safe chars "/:+@");
#       - resolved path: collection_pattern % st.fn[5:];
#       - type: "Directory" if the local path is a directory, else "File".
#  3. Build a new intermediate collection with addentry() for:
#       - Directories that are still unmapped;
#       - Files that have secondaryFiles or "_:" literal contents.
#     Files are presumably skipped when needs_new_collection() is false (the
#     lines after that check, 210-211, are not shown). Each new collection
#     is saved in the project with a unique name. Its name, trash_at and
#     properties come from arvados_cwl.util.get_intermediate_collection_info.
#     The remapped entries then point into that collection.
# NOTE(review): many lines of this method are missing from this listing,
# e.g. 157-159, 164, 169-170, 173, 176-177, 183, 204, 210-211, 228 and
# 232-233, 238. That includes the initialisation of `uploadfiles`,
# `collection` and `remap`. Confirm control flow against the full file.
# NOTE(review): `urllib.quote` and `unicode` exist only in Python 2.
155 def setup(self, referenced_files, basedir):
156 # type: (List[Any], unicode) -> None
160 if self.single_collection:
161 collection = arvados.collection.Collection(api_client=self.arvrunner.api,
162 keep_client=self.arvrunner.keep_client,
163 num_retries=self.arvrunner.num_retries)
165 for srcobj in referenced_files:
166 self.visit(srcobj, uploadfiles)
168 arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
171 num_retries=self.arvrunner.num_retries,
172 fnPattern="keep:%s/%s",
174 project=self.arvrunner.project_uuid,
175 collection=collection,
178 for src, ab, st in uploadfiles:
179 self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
180 "Directory" if os.path.isdir(ab) else "File", True)
182 for srcobj in referenced_files:
184 if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
185 c = arvados.collection.Collection(api_client=self.arvrunner.api,
186 keep_client=self.arvrunner.keep_client,
187 num_retries=self.arvrunner.num_retries)
188 for l in srcobj.get("listing", []):
189 self.addentry(l, c, ".", remap)
191 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
192 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
194 c.save_new(name=info["name"],
195 owner_uuid=self.arvrunner.project_uuid,
196 ensure_unique_name=True,
197 trash_at=info["trash_at"],
198 properties=info["properties"])
200 ab = self.collection_pattern % c.portable_data_hash()
201 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
202 elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
203 (srcobj["location"].startswith("_:") and "contents" in srcobj)):
205 # If all secondary files/directories are located in
206 # the same collection as the primary file and the
207 # paths and names are consistent with staging,
208 # don't create a new collection.
209 if not self.needs_new_collection(srcobj):
212 c = arvados.collection.Collection(api_client=self.arvrunner.api,
213 keep_client=self.arvrunner.keep_client,
214 num_retries=self.arvrunner.num_retries )
215 self.addentry(srcobj, c, ".", remap)
217 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
218 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
220 c.save_new(name=info["name"],
221 owner_uuid=self.arvrunner.project_uuid,
222 ensure_unique_name=True,
223 trash_at=info["trash_at"],
224 properties=info["properties"])
226 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
227 self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
229 if srcobj.get("secondaryFiles"):
230 ab = self.collection_pattern % c.portable_data_hash()
231 self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
# NOTE(review): two problems in the loop below.
#   1. The "keep:%s/%s" location always uses sub[2:], even on the branch
#      where `sub` does not start with "./". That would drop two real
#      characters, and it disagrees with `ab`. The bug is latent: addentry()
#      is only called with path "." here, so every recorded sub starts with
#      "./".
#   2. Every entry gets type "Directory", but addentry() also records File
#      entries (copied files and "_:" literals). Those may need type "File".
234 for loc, sub in remap:
235 # subdirs start with "./", strip it off
236 if sub.startswith("./"):
237 ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
239 ab = self.file_pattern % (c.portable_data_hash(), sub)
240 self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
241 ab, "Directory", True)
# reversemap(target): map a container-side path back to a location.
# Visible logic:
#   - First consult PathMapper.reversemap. Presumably its result is returned
#     when it is non-empty, on original lines 247-248, which are not shown.
#   - A "keep:" target is returned unchanged as (target, target).
#   - A path under self.keepdir becomes "keep:" + the text after keepdir and
#     its following separator.
# NOTE(review): original lines 253-257 (the rest of the keepdir branch and
# the final fallback) are missing from this listing. `self.keepdir` is not
# assigned anywhere in the visible lines; presumably a caller sets it.
# Confirm.
245 def reversemap(self, target):
246 p = super(ArvPathMapper, self).reversemap(target)
249 elif target.startswith("keep:"):
250 return (target, target)
251 elif self.keepdir and target.startswith(self.keepdir):
252 kp = "keep:" + target[len(self.keepdir)+1:]
# StagingPathMapper: a PathMapper that stages each object at
# stagedir/basename. When a different location already claims that name,
# it picks a new name (see its visit() method).
# __init__ forwards all arguments unchanged to PathMapper.__init__.
# NOTE(review): original lines 259-260 and 262 are missing from this
# listing. visit() relies on `self.targets` (set-like) and
# `self._follow_dirs`, which are presumably initialised there. Confirm.
258 class StagingPathMapper(PathMapper):
261 def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
263 super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)
# visit(obj, stagedir, basedir, copy=False, staged=False): map one CWL
# object to the target path stagedir/basename, and record that target in
# self.targets.
# Name clashes:
#   - If the target is already taken by a *different* location, a numeric
#     suffix is inserted before the extension ("name_<n>.ext") until the
#     name is free.
#   - The counter `n` is initialised and incremented on original lines 270
#     and 273, which are not shown.
# Directories:
#   - The entry type is "WritableDirectory" when obj["writable"] is set,
#     otherwise "Directory".
#   - The listing is visited under the new target for "_:" literals, or when
#     self._follow_dirs is set.
# Files:
#   - An already-mapped location presumably short-circuits on line 285,
#     which is not shown.
#   - "_:" literals with `contents` become "CreateFile" entries.
#   - Other files become "WritableFile" when `copy` or obj["writable"] is
#     set, otherwise "File". The `else:` lines are not shown.
#   - secondaryFiles are then visited into the same stagedir.
265 def visit(self, obj, stagedir, basedir, copy=False, staged=False):
266 # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
267 loc = obj["location"]
268 tgt = os.path.join(stagedir, obj["basename"])
269 basetgt, baseext = os.path.splitext(tgt)
271 if tgt in self.targets and (self.reversemap(tgt)[0] != loc):
272 while tgt in self.targets:
274 tgt = "%s_%i%s" % (basetgt, n, baseext)
275 self.targets.add(tgt)
276 if obj["class"] == "Directory":
277 if obj.get("writable"):
278 self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
280 self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
281 if loc.startswith("_:") or self._follow_dirs:
282 self.visitlisting(obj.get("listing", []), tgt, basedir)
283 elif obj["class"] == "File":
284 if loc in self._pathmap:
286 if "contents" in obj and loc.startswith("_:"):
287 self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
289 if copy or obj.get("writable"):
290 self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
292 self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
293 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
# VwdPathMapper: stages the referenced files into self.stagedir using
# StagingPathMapper's visiting rules. It then rewrites every "File" or
# "Directory" entry whose first MapperEnt field (unpacked as `ab`) starts
# with "keep:". The rewritten value is "$(task.keep)/<rest of the keep
# reference>". Presumably this makes the path relative to the task's Keep
# mount; confirm.
# Reassigning existing keys while iterating items() is safe here because no
# keys are added or removed.
# NOTE(review): the loop variable `type` shadows the builtin. Rename it
# (e.g. `kind`) in a follow-up code change.
296 class VwdPathMapper(StagingPathMapper):
297 def setup(self, referenced_files, basedir):
298 # type: (List[Any], unicode) -> None
300 # Go through each file and set the target to its own directory along
301 # with any secondary files.
302 self.visitlisting(referenced_files, self.stagedir, basedir)
304 for path, (ab, tgt, type, staged) in self._pathmap.items():
305 if type in ("File", "Directory") and ab.startswith("keep:"):
306 self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type, staged)
# NoFollowPathMapper: a StagingPathMapper whose setup() only visits the
# referenced files into self.stagedir. It does not apply VwdPathMapper's
# "keep:" rewriting.
# NOTE(review): original line 310 is missing from this listing. Given the
# class name, and StagingPathMapper.visit's use of self._follow_dirs, it
# presumably sets `_follow_dirs = False`. Confirm.
309 class NoFollowPathMapper(StagingPathMapper):
311 def setup(self, referenced_files, basedir):
312 # type: (List[Any], unicode) -> None
313 self.visitlisting(referenced_files, self.stagedir, basedir)