1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
11 from arvados_cwl.util import get_current_container, get_intermediate_collection_info
12 import arvados.commands.run
13 import arvados.collection
15 from schema_salad.sourceline import SourceLine
17 from arvados.errors import ApiError
18 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
19 from cwltool.workflow import WorkflowException
21 from .http import http_to_keep
# Module-level logger shared by the path mappers in this file.
logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
    """Remove the 'listing' field from a Directory object that is a Keep reference.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so the 'listing' field is deleted when the
    object's location starts with "keep:".

    The object is modified in place and nothing is returned.  Objects with no
    "location", with an empty or None location, with a non-Keep location, or
    without a "listing" field are left untouched.
    """
    # ``or ""`` also covers an explicit ``"location": None`` entry, which
    # would otherwise fail on ``.startswith``.
    location = obj.get("location") or ""
    if location.startswith("keep:") and "listing" in obj:
        del obj["listing"]
40 class ArvPathMapper(PathMapper):
41 """Convert container-local paths to and from Keep collection ids."""
43 pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
44 pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')
46 def __init__(self, arvrunner, referenced_files, input_basedir,
47 collection_pattern, file_pattern, name=None, single_collection=False):
48 self.arvrunner = arvrunner
49 self.input_basedir = input_basedir
50 self.collection_pattern = collection_pattern
51 self.file_pattern = file_pattern
53 self.referenced_files = [r["location"] for r in referenced_files]
54 self.single_collection = single_collection
55 super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
57 def visit(self, srcobj, uploadfiles):
58 src = srcobj["location"]
60 src = src[:src.index("#")]
62 if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
63 self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)
65 debug = logger.isEnabledFor(logging.DEBUG)
67 if src not in self._pathmap:
68 if src.startswith("file:"):
69 # Local FS ref, may need to be uploaded or may be on keep
71 ab = abspath(src, self.input_basedir)
72 st = arvados.commands.run.statfile("", ab,
73 fnPattern="keep:%s/%s",
74 dirPattern="keep:%s/%s",
76 with SourceLine(srcobj, "location", WorkflowException, debug):
77 if isinstance(st, arvados.commands.run.UploadFile):
78 uploadfiles.add((src, ab, st))
79 elif isinstance(st, arvados.commands.run.ArvFile):
80 self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
82 raise WorkflowException("Input file path '%s' is invalid" % st)
83 elif src.startswith("_:"):
84 if srcobj["class"] == "File" and "contents" not in srcobj:
85 raise WorkflowException("File literal '%s' is missing `contents`" % src)
86 if srcobj["class"] == "Directory" and "listing" not in srcobj:
87 raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
88 elif src.startswith("http:") or src.startswith("https:"):
89 keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
90 logger.info("%s is %s", src, keepref)
91 self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
93 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
95 with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
96 for l in srcobj.get("secondaryFiles", []):
97 self.visit(l, uploadfiles)
98 with SourceLine(srcobj, "listing", WorkflowException, debug):
99 for l in srcobj.get("listing", []):
100 self.visit(l, uploadfiles)
102 def addentry(self, obj, c, path, remap):
103 if obj["location"] in self._pathmap:
104 src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
107 c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
108 remap.append((obj["location"], path + "/" + obj["basename"]))
109 for l in obj.get("secondaryFiles", []):
110 self.addentry(l, c, path, remap)
111 elif obj["class"] == "Directory":
112 for l in obj.get("listing", []):
113 self.addentry(l, c, path + "/" + obj["basename"], remap)
114 remap.append((obj["location"], path + "/" + obj["basename"]))
115 elif obj["location"].startswith("_:") and "contents" in obj:
116 with c.open(path + "/" + obj["basename"], "w") as f:
117 f.write(obj["contents"].encode("utf-8"))
118 remap.append((obj["location"], path + "/" + obj["basename"]))
120 raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
122 def setup(self, referenced_files, basedir):
123 # type: (List[Any], unicode) -> None
127 if self.single_collection:
128 collection = arvados.collection.Collection(api_client=self.arvrunner.api,
129 keep_client=self.arvrunner.keep_client,
130 num_retries=self.arvrunner.num_retries)
132 for srcobj in referenced_files:
133 self.visit(srcobj, uploadfiles)
135 arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
138 num_retries=self.arvrunner.num_retries,
139 fnPattern="keep:%s/%s",
141 project=self.arvrunner.project_uuid,
142 collection=collection,
145 for src, ab, st in uploadfiles:
146 self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
147 "Directory" if os.path.isdir(ab) else "File", True)
149 for srcobj in referenced_files:
151 if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
152 c = arvados.collection.Collection(api_client=self.arvrunner.api,
153 keep_client=self.arvrunner.keep_client,
154 num_retries=self.arvrunner.num_retries)
155 for l in srcobj.get("listing", []):
156 self.addentry(l, c, ".", remap)
158 container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
159 info = get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
161 c.save_new(name=info["name"],
162 owner_uuid=self.arvrunner.project_uuid,
163 ensure_unique_name=True,
164 trash_at=info["trash_at"],
165 properties=info["properties"])
167 ab = self.collection_pattern % c.portable_data_hash()
168 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
169 elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
170 (srcobj["location"].startswith("_:") and "contents" in srcobj)):
172 c = arvados.collection.Collection(api_client=self.arvrunner.api,
173 keep_client=self.arvrunner.keep_client,
174 num_retries=self.arvrunner.num_retries )
175 self.addentry(srcobj, c, ".", remap)
177 container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
178 info = get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
180 c.save_new(name=info["name"],
181 owner_uuid=self.arvrunner.project_uuid,
182 ensure_unique_name=True,
183 trash_at=info["trash_at"],
184 properties=info["properties"])
186 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
187 self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
189 if srcobj.get("secondaryFiles"):
190 ab = self.collection_pattern % c.portable_data_hash()
191 self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
194 for loc, sub in remap:
195 # subdirs start with "./", strip it off
196 if sub.startswith("./"):
197 ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
199 ab = self.file_pattern % (c.portable_data_hash(), sub)
200 self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
201 ab, "Directory", True)
205 def reversemap(self, target):
206 p = super(ArvPathMapper, self).reversemap(target)
209 elif target.startswith("keep:"):
210 return (target, target)
211 elif self.keepdir and target.startswith(self.keepdir):
212 kp = "keep:" + target[len(self.keepdir)+1:]
218 class StagingPathMapper(PathMapper):
221 def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
223 super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)
225 def visit(self, obj, stagedir, basedir, copy=False, staged=False):
226 # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
227 loc = obj["location"]
228 tgt = os.path.join(stagedir, obj["basename"])
229 basetgt, baseext = os.path.splitext(tgt)
231 if tgt in self.targets and (self.reversemap(tgt)[0] != loc):
232 while tgt in self.targets:
234 tgt = "%s_%i%s" % (basetgt, n, baseext)
235 self.targets.add(tgt)
236 if obj["class"] == "Directory":
237 if obj.get("writable"):
238 self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
240 self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
241 if loc.startswith("_:") or self._follow_dirs:
242 self.visitlisting(obj.get("listing", []), tgt, basedir)
243 elif obj["class"] == "File":
244 if loc in self._pathmap:
246 if "contents" in obj and loc.startswith("_:"):
247 self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
249 if copy or obj.get("writable"):
250 self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
252 self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
253 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
class VwdPathMapper(StagingPathMapper):
    """Staging path mapper that rewrites "keep:" references to "$(task.keep)/" paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate the path map, then rewrite Keep-backed File/Directory entries.

        Each referenced object is visited with ``self.stagedir`` as its
        staging directory.  Afterwards every entry of type "File" or
        "Directory" whose first field starts with "keep:" has that field
        replaced by "$(task.keep)/" followed by the text after the "keep:"
        prefix; target, type and staged flag are kept as they were.
        """
        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Iterate over a snapshot because entries are reassigned while looping.
        # ``kind`` is used instead of ``type`` to avoid shadowing the builtin.
        for path, (ref, tgt, kind, staged) in list(self._pathmap.items()):
            if kind in ("File", "Directory") and ref.startswith("keep:"):
                # ref[5:] drops the leading "keep:".
                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ref[5:], tgt, kind, staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging path mapper variant that visits the referenced objects as-is."""

    # NOTE(review): one short line between the class header and ``setup`` is
    # missing from the listing this block was recovered from.  It is restored
    # here as the ``_follow_dirs`` override, inferred from the class name and
    # from StagingPathMapper.visit reading ``self._follow_dirs`` before it
    # descends into a Directory's listing -- confirm against the upstream file.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Visit every referenced object, staging it under ``self.stagedir``."""
        self.visitlisting(referenced_files, self.stagedir, basedir)