1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
11 import arvados_cwl.util
12 import arvados.commands.run
13 import arvados.collection
15 from schema_salad.sourceline import SourceLine
17 from arvados.errors import ApiError
18 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
19 from cwltool.workflow import WorkflowException
21 from .http import http_to_keep
# Module-level logger shared by everything in this file; messages are emitted
# under the 'arvados.cwl-runner' logger name.
23 logger = logging.getLogger('arvados.cwl-runner')
# trim_listing(obj): `obj` is used as a dict-like object.  The condition at the
# end of this block is true only when obj's "location" (treated as "" when the
# key is absent) starts with "keep:" AND obj has a "listing" key.
# NOTE(review): the statement guarded by that condition is not visible in this
# listing; per the docstring it removes obj["listing"] in place -- confirm
# against the complete file.
25 def trim_listing(obj):
26 """Remove 'listing' field from Directory objects that are keep references.
28 When Directory objects represent Keep references, it is redundant and
29 potentially very expensive to pass fully enumerated Directory objects
30 between instances of cwl-runner (e.g. a submitting a job, or using the
31 RunInSingleContainer feature), so delete the 'listing' field when it is
36 if obj.get("location", "").startswith("keep:") and "listing" in obj:
# ArvPathMapper extends cwltool's PathMapper.  Its methods fill self._pathmap
# with MapperEnt records keyed by each object's "location" string.
40 class ArvPathMapper(PathMapper):
41 """Convert container-local paths to and from Keep collection ids."""
# pdh_path: "keep:", 32 lowercase hex digits, "+", digits, then "/" and a
# non-empty remainder.
43 pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
# pdh_dirpath: same prefix, but the "/..." remainder is optional, so a bare
# "keep:<hash>+<size>" reference matches as well.
44 pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')
# __init__: store the runner and the patterns on the instance, then call the
# PathMapper constructor with None as its third argument.
#   arvrunner          -- object later read for .api, .keep_client, .num_retries,
#                         .project_uuid, .fs_access and .intermediate_output_ttl
#   referenced_files   -- iterable of objects that each have a "location" key
#   input_basedir      -- base directory handed to abspath() for "file:" locations
#   collection_pattern -- %-format string taking one value (path inside Keep)
#   file_pattern       -- %-format string taking (portable data hash, path)
#   name               -- not referenced by any line visible in this listing
#   single_collection  -- when true, setup() creates one Collection and passes
#                         it to the upload call
46 def __init__(self, arvrunner, referenced_files, input_basedir,
47 collection_pattern, file_pattern, name=None, single_collection=False):
48 self.arvrunner = arvrunner
49 self.input_basedir = input_basedir
50 self.collection_pattern = collection_pattern
51 self.file_pattern = file_pattern
# Only the location strings are kept, not the full objects.
53 self.referenced_files = [r["location"] for r in referenced_files]
54 self.single_collection = single_collection
55 super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
# visit: classify one File/Directory object by its "location" and either record
# a MapperEnt for it in self._pathmap or add it to the `uploadfiles` set, then
# recurse into its "secondaryFiles" and "listing" entries.
# Raises WorkflowException for unusable "file:" paths and for "_:" literals
# that lack their required field.
57 def visit(self, srcobj, uploadfiles):
58 src = srcobj["location"]
# Keep only the text before the first "#".
# NOTE(review): str.index raises ValueError when "#" is absent; the condition
# guarding this line is not visible in this listing.
60 src = src[:src.index("#")]
# A location matching pdh_dirpath is already a Keep reference: the resolved
# value is the location itself and the target is collection_pattern applied to
# the URL-unquoted text after the 5-character "keep:" prefix.
62 if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
63 self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)
# Passed to SourceLine below as its fourth argument.
65 debug = logger.isEnabledFor(logging.DEBUG)
# Everything below only applies to locations that were not mapped above.
67 if src not in self._pathmap:
68 if src.startswith("file:"):
69 # Local FS ref, may need to be uploaded or may be on keep
71 ab = abspath(src, self.input_basedir)
# The type of statfile's result selects the handling below: UploadFile results
# are queued for upload, ArvFile results are mapped immediately.
72 st = arvados.commands.run.statfile("", ab,
73 fnPattern="keep:%s/%s",
74 dirPattern="keep:%s/%s",
76 with SourceLine(srcobj, "location", WorkflowException, debug):
77 if isinstance(st, arvados.commands.run.UploadFile):
# Queued as (location, absolute path, stat result); setup() unpacks the same
# triple after uploading.
78 uploadfiles.add((src, ab, st))
79 elif isinstance(st, arvados.commands.run.ArvFile):
80 self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
# NOTE(review): presumably the fallback branch for any other statfile result;
# its `else:` line is not visible in this listing.
82 raise WorkflowException("Input file path '%s' is invalid" % st)
# "_:" locations are literals: a File literal must carry "contents" and a
# Directory literal must carry "listing".
83 elif src.startswith("_:"):
84 if srcobj["class"] == "File" and "contents" not in srcobj:
85 raise WorkflowException("File literal '%s' is missing `contents`" % src)
86 if srcobj["class"] == "Directory" and "listing" not in srcobj:
87 raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
# HTTP(S) locations are converted by http_to_keep; its result is used as both
# the resolved value and the target.
88 elif src.startswith("http:") or src.startswith("https:"):
89 keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
90 logger.info("%s is %s", src, keepref)
91 self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
# Identity mapping: resolved value and target are both the location itself.
# NOTE(review): presumably the final `else:` of the chain above -- that line is
# not visible in this listing.
93 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
# Recurse into children; a missing key is treated as an empty list.
95 with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
96 for l in srcobj.get("secondaryFiles", []):
97 self.visit(l, uploadfiles)
98 with SourceLine(srcobj, "listing", WorkflowException, debug):
99 for l in srcobj.get("listing", []):
100 self.visit(l, uploadfiles)
# addentry: place `obj` (and, recursively, its children) into collection `c`
# under `path`, appending an (original location, path inside c) pair to `remap`
# for each entry placed.
# Raises a WorkflowException built via SourceLine.makeError for objects it
# cannot place.
102 def addentry(self, obj, c, path, remap):
# Already-mapped object: copy it from the collection it resolves to.
103 if obj["location"] in self._pathmap:
104 src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
107 c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
108 remap.append((obj["location"], path + "/" + obj["basename"]))
# Secondary files are added next to the primary file, under the same `path`.
109 for l in obj.get("secondaryFiles", []):
110 self.addentry(l, c, path, remap)
# Unmapped Directory: add each listing entry beneath path/<basename>.
111 elif obj["class"] == "Directory":
112 for l in obj.get("listing", []):
113 self.addentry(l, c, path + "/" + obj["basename"], remap)
114 remap.append((obj["location"], path + "/" + obj["basename"]))
# Literal with inline "contents": write it, UTF-8 encoded, as a new file in c.
115 elif obj["location"].startswith("_:") and "contents" in obj:
116 with c.open(path + "/" + obj["basename"], "w") as f:
117 f.write(obj["contents"].encode("utf-8"))
118 remap.append((obj["location"], path + "/" + obj["basename"]))
# NOTE(review): presumably the `else:` fallback of the chain above; that line
# is not visible in this listing.
120 raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
# needs_new_collection: recursively inspect `srcobj`, its "secondaryFiles" and
# its "listing".  Callers in this class use the result as a boolean.
# NOTE(review): none of the return statements are visible in this listing;
# presumably the method returns a true value as soon as one of the conditions
# below holds and a false value otherwise -- confirm against the complete file.
#   prefix -- compared against the location as prefix + basename; extended
#             with "<basename>/" before descending into a listing
122 def needs_new_collection(self, srcobj, prefix=""):
123 loc = srcobj["location"]
# Literal ("_:") location.
124 if loc.startswith("_:"):
# Location differs from prefix + basename.
127 if loc != prefix+srcobj["basename"]:
# File whose location has no entry in self._pathmap.
135 if srcobj["class"] == "File" and loc not in self._pathmap:
137 if srcobj.get("secondaryFiles"):
138 for s in srcobj["secondaryFiles"]:
139 if self.needs_new_collection(s, prefix):
141 if srcobj.get("listing"):
142 prefix = "%s%s/" % (prefix, srcobj["basename"])
143 for l in srcobj["listing"]:
144 if self.needs_new_collection(l, prefix):
# setup: visit every referenced object, upload whatever visit() queued, then
# create and save new collections for objects handled by the two branches of
# the second loop, recording a MapperEnt for each.
# NOTE(review): the initialisation of `uploadfiles`, `collection` (when
# single_collection is false) and `remap` is not visible in this listing.
148 def setup(self, referenced_files, basedir):
149 # type: (List[Any], unicode) -> None
# One Collection object is created up front and handed to the upload call via
# its `collection` keyword.
153 if self.single_collection:
154 collection = arvados.collection.Collection(api_client=self.arvrunner.api,
155 keep_client=self.arvrunner.keep_client,
156 num_retries=self.arvrunner.num_retries)
158 for srcobj in referenced_files:
159 self.visit(srcobj, uploadfiles)
# u[2] is the statfile result stored as the third element of each queued triple.
161 arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
164 num_retries=self.arvrunner.num_retries,
165 fnPattern="keep:%s/%s",
167 project=self.arvrunner.project_uuid,
168 collection=collection,
# Record each uploaded item: the resolved value is st.fn URL-quoted with
# "/:+@" left unescaped, the target is collection_pattern applied to st.fn
# without its first five characters, and the type comes from an isdir() check
# on the local path.
171 for src, ab, st in uploadfiles:
172 self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
173 "Directory" if os.path.isdir(ab) else "File", True)
175 for srcobj in referenced_files:
# Directory that is still unmapped: build a fresh collection from its listing.
177 if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
178 c = arvados.collection.Collection(api_client=self.arvrunner.api,
179 keep_client=self.arvrunner.keep_client,
180 num_retries=self.arvrunner.num_retries)
181 for l in srcobj.get("listing", []):
182 self.addentry(l, c, ".", remap)
# The saved collection's name, trash_at and properties come from
# get_intermediate_collection_info.
184 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
185 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
187 c.save_new(name=info["name"],
188 owner_uuid=self.arvrunner.project_uuid,
189 ensure_unique_name=True,
190 trash_at=info["trash_at"],
191 properties=info["properties"])
193 ab = self.collection_pattern % c.portable_data_hash()
194 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
# File that has secondaryFiles, or a "_:" literal carrying inline "contents".
195 elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
196 (srcobj["location"].startswith("_:") and "contents" in srcobj)):
# NOTE(review): the statement guarded by this check is not visible in this
# listing; presumably it skips building a new collection -- confirm.
198 if not self.needs_new_collection(srcobj):
201 c = arvados.collection.Collection(api_client=self.arvrunner.api,
202 keep_client=self.arvrunner.keep_client,
203 num_retries=self.arvrunner.num_retries )
204 self.addentry(srcobj, c, ".", remap)
206 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
207 info = arvados_cwl.util.get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
209 c.save_new(name=info["name"],
210 owner_uuid=self.arvrunner.project_uuid,
211 ensure_unique_name=True,
212 trash_at=info["trash_at"],
213 properties=info["properties"])
215 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
216 self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
# With secondary files, the whole new collection is additionally recorded as a
# Directory under a freshly generated "_:<uuid4>" key.
218 if srcobj.get("secondaryFiles"):
219 ab = self.collection_pattern % c.portable_data_hash()
220 self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
# Record every (location, path-in-collection) pair collected by addentry.
# NOTE(review): the resolved value below always drops the first two characters
# of `sub`, whereas `ab` only does so when `sub` starts with "./"; both calls
# to addentry in this method pass "." as the path.  The recorded type is always
# "Directory".
223 for loc, sub in remap:
224 # subdirs start with "./", strip it off
225 if sub.startswith("./"):
226 ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
228 ab = self.file_pattern % (c.portable_data_hash(), sub)
229 self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
230 ab, "Directory", True)
# reversemap: ask the base class first, then handle Keep-style targets.
# NOTE(review): the use of `p`, the first branch of this chain and the return
# for the keepdir case are not visible in this listing.
234 def reversemap(self, target):
235 p = super(ArvPathMapper, self).reversemap(target)
# A "keep:" target maps to itself, returned as a (target, target) pair.
238 elif target.startswith("keep:"):
239 return (target, target)
# A target under self.keepdir becomes "keep:" plus the text that follows
# keepdir and one further character.
240 elif self.keepdir and target.startswith(self.keepdir):
241 kp = "keep:" + target[len(self.keepdir)+1:]
# StagingPathMapper: PathMapper subclass that assigns every visited object a
# target path under a staging directory and records a MapperEnt whose type
# string is one of "Directory", "WritableDirectory", "File", "WritableFile" or
# "CreateFile".
247 class StagingPathMapper(PathMapper):
# __init__ forwards its four arguments unchanged to PathMapper.__init__.
# NOTE(review): self.targets is used by visit() as a set (membership test and
# .add); its initialisation is not visible in this listing.
250 def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
252 super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)
# visit: compute a target for `obj` under `stagedir`, make it unique among the
# targets already handed out, and record the mapping for obj's location.
#   copy   -- when true, a File is recorded as "WritableFile"
#   staged -- stored as the fourth field of every MapperEnt created here
254 def visit(self, obj, stagedir, basedir, copy=False, staged=False):
255 # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
256 loc = obj["location"]
257 tgt = os.path.join(stagedir, obj["basename"])
# Split once so a numeric suffix can be inserted before the extension.
258 basetgt, baseext = os.path.splitext(tgt)
# Collision: the target is taken and reverse-maps to a different location, so
# try "<base>_<n><ext>" until an unused name is found.
# NOTE(review): the initialisation and increment of `n` are not visible in this
# listing.
260 if tgt in self.targets and (self.reversemap(tgt)[0] != loc):
261 while tgt in self.targets:
263 tgt = "%s_%i%s" % (basetgt, n, baseext)
264 self.targets.add(tgt)
265 if obj["class"] == "Directory":
266 if obj.get("writable"):
267 self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
# NOTE(review): presumably the `else:` branch for non-writable directories;
# that line is not visible in this listing.
269 self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
# Listing entries are visited beneath the directory's own target, but only for
# "_:" literals or when self._follow_dirs is true.
270 if loc.startswith("_:") or self._follow_dirs:
271 self.visitlisting(obj.get("listing", []), tgt, basedir)
272 elif obj["class"] == "File":
# NOTE(review): the statement guarded by this check is not visible in this
# listing; presumably an already-mapped file is left untouched -- confirm.
273 if loc in self._pathmap:
# A literal with inline contents stores the contents themselves as the
# resolved value, with type "CreateFile".
275 if "contents" in obj and loc.startswith("_:"):
276 self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
278 if copy or obj.get("writable"):
279 self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
281 self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
# Secondary files are visited with the same stagedir as the primary file.
282 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed entries resolve under ``$(task.keep)``."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage all referenced objects, then repoint Keep sources.

        Every entry of type "File" or "Directory" whose resolved value starts
        with "keep:" is replaced by an entry whose resolved value is
        "$(task.keep)/" followed by the text after that prefix; target, type
        and staged flag are carried over unchanged.
        """
        # Each file is targeted at the staging directory together with any
        # secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        for location, entry in self._pathmap.items():
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            keep_relative = resolved[5:]
            self._pathmap[location] = MapperEnt(
                "$(task.keep)/%s" % keep_relative, target, kind, staged)
# NoFollowPathMapper: StagingPathMapper variant whose setup() does nothing
# beyond running visitlisting over the referenced files.
# NOTE(review): StagingPathMapper.visit reads self._follow_dirs; given this
# class's name it presumably sets that attribute to a false value on a line not
# visible in this listing -- confirm against the complete file.
298 class NoFollowPathMapper(StagingPathMapper):
# setup: visit every referenced object with self.stagedir as the staging
# directory and `basedir` passed through unchanged.
300 def setup(self, referenced_files, basedir):
301 # type: (List[Any], unicode) -> None
302 self.visitlisting(referenced_files, self.stagedir, basedir)