1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
11 import arvados.commands.run
12 import arvados.collection
14 from schema_salad.sourceline import SourceLine
16 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
17 from cwltool.workflow import WorkflowException
# Logger for this module under the 'arvados.cwl-runner' name;
# ArvPathMapper.visit() checks whether DEBUG is enabled on it.
19 logger = logging.getLogger('arvados.cwl-runner')
# Intended (per its docstring) to drop the "listing" key from obj when obj's
# "location" starts with "keep:" -- the condition tested on line 32 below.
#
# obj: dict-like CWL File/Directory object; only obj.get("location") and the
#   "listing" key are examined in the visible lines.
# NOTE(review): this dump skips from line 27 to 32 and from 32 to 36, so the
#   end of the docstring and the statement executed under the condition are
#   not visible here; per the docstring it deletes obj["listing"] in place --
#   confirm against the real file.
21 def trim_listing(obj):
22 """Remove 'listing' field from Directory objects that are keep references.
24 When Directory objects represent Keep references, it is redundant and
25 potentially very expensive to pass fully enumerated Directory objects
26 between instances of cwl-runner (e.g. submitting a job, or using the
27 RunInSingleContainer feature), so delete the 'listing' field when it is
32 if obj.get("location", "").startswith("keep:") and "listing" in obj:
36 class ArvPathMapper(PathMapper):
37 """Convert container-local paths to and from Keep collection ids."""
# pdh_path: "keep:" + 32 lowercase hex chars + "+" + digits + "/" + a non-empty
#   path, i.e. a path inside a collection named by its portable data hash
#   (setup() builds such references as "keep:" + c.portable_data_hash()).
# pdh_dirpath: same prefix, but the "/<path>" suffix is optional, so it also
#   matches a bare collection reference; visit() uses it to map Keep
#   locations directly.
39 pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
40 pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')
# Store the runner and path patterns, then delegate to PathMapper.__init__
# with stagedir=None (the third positional argument, as
# StagingPathMapper.__init__ below also passes it).
#
# arvrunner: runner object; this class reads .api, .keep_client,
#   .num_retries, .project_uuid, .fs_access, .get_uploaded() and
#   .add_uploaded() from it.
# referenced_files: list of CWL File/Directory dicts, each with a "location";
#   their locations are also kept in self.referenced_files.
# input_basedir: base for resolving "file:" locations (visit() passes it to
#   abspath()).
# collection_pattern: %-format string applied to "<pdh>[/path]" to build the
#   container-side path of a Keep reference.
# file_pattern: %-format string applied to (pdh, path inside the collection).
# name: accepted; not used in any visible line.
# single_collection: when true, setup() builds one shared Collection and
#   passes it to arvados.commands.run.uploadfiles().
# **kwargs: accepted and ignored in the visible lines.
# NOTE(review): line 48 is missing from this dump (numbers skip 47 -> 49);
#   it may be where `name` is stored -- confirm.
42 def __init__(self, arvrunner, referenced_files, input_basedir,
43 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
44 self.arvrunner = arvrunner
45 self.input_basedir = input_basedir
46 self.collection_pattern = collection_pattern
47 self.file_pattern = file_pattern
49 self.referenced_files = [r["location"] for r in referenced_files]
50 self.single_collection = single_collection
51 super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
# Classify one CWL File/Directory object, then recurse into its
# "secondaryFiles" and "listing".
#
# srcobj: dict with at least "location" and "class".
# uploadfiles: set-like collector; local files that statfile() reports as
#   UploadFile are added as (src, absolute path, statfile result) for setup()
#   to upload later.
# Mappings that can be decided immediately are written to self._pathmap.
# Raises WorkflowException for invalid local paths and for "_:" literals
# missing "contents" (File) or "listing" (Directory).
# NOTE(review): `basestring` and `urllib.unquote` are Python 2-only names.
53 def visit(self, srcobj, uploadfiles):
54 src = srcobj["location"]
# NOTE(review): line 55 is missing from this dump; it presumably guards the
#   next line, since str.index raises ValueError when "#" is absent.
56 src = src[:src.index("#")]
# A location matching pdh_dirpath is already in Keep: map it directly to the
# percent-decoded collection_pattern path, with no upload needed.
58 if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
59 self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)
# Forwarded as the last argument of every SourceLine context below.
61 debug = logger.isEnabledFor(logging.DEBUG)
63 if src not in self._pathmap:
64 if src.startswith("file:"):
65 # Local FS ref, may need to be uploaded or may be on keep
67 ab = abspath(src, self.input_basedir)
68 st = arvados.commands.run.statfile("", ab,
69 fnPattern="keep:%s/%s",
70 dirPattern="keep:%s/%s",
# NOTE(review): line 71 (the remaining statfile() arguments and closing
#   parenthesis) is missing from this dump.
72 with SourceLine(srcobj, "location", WorkflowException, debug):
73 if isinstance(st, arvados.commands.run.UploadFile):
74 uploadfiles.add((src, ab, st))
75 elif isinstance(st, arvados.commands.run.ArvFile):
76 self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
# NOTE(review): line 77 is missing; presumably an `else:` so that any other
#   statfile() result is rejected by the raise below.
78 raise WorkflowException("Input file path '%s' is invalid" % st)
79 elif src.startswith("_:"):
# "_:" locations are literals: a File must carry "contents" and a Directory
# must carry "listing".
80 if srcobj["class"] == "File" and "contents" not in srcobj:
81 raise WorkflowException("File literal '%s' is missing `contents`" % src)
82 if srcobj["class"] == "Directory" and "listing" not in srcobj:
83 raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
# NOTE(review): line 84 is missing and indentation is lost, so it cannot be
#   seen whether the next line belongs to the "_:" branch or to a final
#   fallback (e.g. `else:`) that maps the location to itself.
85 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
# Recurse into attached objects. The SourceLine contexts presumably annotate
# any WorkflowException with that field's source position -- see the
# schema_salad docs.
87 with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
88 for l in srcobj.get("secondaryFiles", []):
89 self.visit(l, uploadfiles)
90 with SourceLine(srcobj, "listing", WorkflowException, debug):
91 for l in srcobj.get("listing", []):
92 self.visit(l, uploadfiles)
# Place obj into Collection c at "<path>/<obj['basename']>" and append
# (original location, path inside c) pairs to remap.
#
# Branches:
# - If the location is already in self._pathmap, copy it from its source
#   collection (found via arvrunner.fs_access.get_collection on the resolved
#   value). Record it in remap, then add its secondaryFiles at the same
#   path level.
# - A Directory has each "listing" entry added one level deeper
#   (path/basename) and is recorded in remap after its children.
# - A "_:" literal with "contents" is written to c as UTF-8 bytes. It is not
#   recorded in remap.
# - Anything else raises a WorkflowException built via
#   SourceLine(...).makeError.
#
# obj: CWL File/Directory dict.
# c: a Collection as created in setup().
# path: directory inside c (setup() passes ".").
# remap: list, appended to in place.
94 def addentry(self, obj, c, path, remap):
95 if obj["location"] in self._pathmap:
96 src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
# NOTE(review): lines 97-98 are missing from this dump; they may adjust
#   srcpath before the copy -- confirm.
99 c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
100 remap.append((obj["location"], path + "/" + obj["basename"]))
101 for l in obj.get("secondaryFiles", []):
102 self.addentry(l, c, path, remap)
103 elif obj["class"] == "Directory":
104 for l in obj.get("listing", []):
105 self.addentry(l, c, path + "/" + obj["basename"], remap)
106 remap.append((obj["location"], path + "/" + obj["basename"]))
107 elif obj["location"].startswith("_:") and "contents" in obj:
108 with c.open(path + "/" + obj["basename"], "w") as f:
109 f.write(obj["contents"].encode("utf-8"))
# NOTE(review): line 110 is missing; presumably `else:` for the raise below.
111 raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
# Populate self._pathmap for every referenced object. In order, it:
# - reuses earlier uploads;
# - classifies everything with visit();
# - uploads local files;
# - packs unmapped Directories, "_:" File literals and Files with
#   secondaryFiles into new Keep collections.
#
# referenced_files: list of CWL File/Directory dicts.
# basedir: not read in the visible lines (visit() uses self.input_basedir).
#
# Side effects:
# - uploads files;
# - may save new collections owned by arvrunner.project_uuid;
# - records uploads with arvrunner.add_uploaded().
#
# NOTE(review): lines 115-117 are missing from this dump. `uploadfiles`,
#   `copied_files` and, when single_collection is false, `collection` are
#   used below but never assigned in the visible lines.
# NOTE(review): `unicode` and urllib.quote/unquote are Python 2-only names.
113 def setup(self, referenced_files, basedir):
114 # type: (List[Any], unicode) -> None
# With single_collection, uploads and reused entries share this one Collection.
118 if self.single_collection:
119 collection = arvados.collection.Collection(api_client=self.arvrunner.api,
120 keep_client=self.arvrunner.keep_client,
121 num_retries=self.arvrunner.num_retries)
# Mappings recorded earlier on the same runner via add_uploaded().
123 already_uploaded = self.arvrunner.get_uploaded()
125 for k in referenced_files:
# NOTE(review): line 126 is missing; `loc` is used below without a visible
#   assignment (presumably k["location"] -- confirm).
127 if loc in already_uploaded:
128 v = already_uploaded[loc]
129 self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)
# Copy the reused entry into the shared collection (once per basename).
# Its final keep: path is fixed after the upload below.
130 if self.single_collection:
131 basename = k["basename"]
132 if basename not in collection:
133 self.addentry({"location": loc, "class": v.type, "basename": basename}, collection, ".", [])
134 copied_files.add((loc, basename, v.type))
# Classify everything; local files needing upload accumulate in uploadfiles.
136 for srcobj in referenced_files:
137 self.visit(srcobj, uploadfiles)
# NOTE(review): lines 140-141 and 144 (further uploadfiles() arguments) are
#   missing from this dump.
139 arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
142 num_retries=self.arvrunner.num_retries,
143 fnPattern="keep:%s/%s",
145 project=self.arvrunner.project_uuid,
146 collection=collection)
# Map each uploaded path to its URL-quoted keep: reference and remember it on
# the runner for later mappers.
148 for src, ab, st in uploadfiles:
149 self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
150 "Directory" if os.path.isdir(ab) else "File", True)
151 self.arvrunner.add_uploaded(src, self._pathmap[src])
# Entries copied into the shared collection resolve against its portable
# data hash.
153 for loc, basename, cls in copied_files:
154 fn = "keep:%s/%s" % (collection.portable_data_hash(), basename)
155 self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)
# Still-unmapped Directories, and Files that are "_:" literals or carry
# secondaryFiles, each get a collection built with addentry(). It is saved
# only if no collection with the same portable data hash is found.
# NOTE(review): line 158 is missing; `remap` is appended to below without a
#   visible assignment.
157 for srcobj in referenced_files:
159 if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
160 c = arvados.collection.Collection(api_client=self.arvrunner.api,
161 keep_client=self.arvrunner.keep_client,
162 num_retries=self.arvrunner.num_retries)
163 for l in srcobj.get("listing", []):
164 self.addentry(l, c, ".", remap)
166 check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
167 if not check["items"]:
168 c.save_new(owner_uuid=self.arvrunner.project_uuid)
170 ab = self.collection_pattern % c.portable_data_hash()
171 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
172 elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
173 (srcobj["location"].startswith("_:") and "contents" in srcobj)):
175 c = arvados.collection.Collection(api_client=self.arvrunner.api,
176 keep_client=self.arvrunner.keep_client,
177 num_retries=self.arvrunner.num_retries )
178 self.addentry(srcobj, c, ".", remap)
180 check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
181 if not check["items"]:
182 c.save_new(owner_uuid=self.arvrunner.project_uuid)
184 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
# NOTE(review): line 186 (the rest of this MapperEnt(...) call) is missing.
185 self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
# When secondaryFiles were packed alongside the file, also map a fresh
# "_:<uuid4>" key to the whole collection as a Directory.
187 if srcobj.get("secondaryFiles"):
188 ab = self.collection_pattern % c.portable_data_hash()
189 self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
# NOTE(review): lines 190-191 are missing and indentation is lost, so it is
#   not visible whether this loop runs once per srcobj or once after the
#   loop above. In the latter case `c` would be only the last collection built.
192 for loc, sub in remap:
193 # subdirs start with "./", strip it off
194 if sub.startswith("./"):
195 ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
# NOTE(review): line 196 is missing; presumably `else:` for the next line.
197 ab = self.file_pattern % (c.portable_data_hash(), sub)
# NOTE(review): the resolved path always uses sub[2:], although `ab` above
#   handles subs without "./". Every visible addentry() call passes path ".",
#   so entries start with "./" today.
# NOTE(review): every remapped entry is typed "Directory", but addentry() also
#   appends already-mapped File objects (line 100). Confirm this is intended.
198 self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
199 ab, "Directory", True)
# Map a target path back to a source location, consulting
# PathMapper.reversemap first.
#
# target: path string.
# Visible branches:
# - a "keep:" target returns (target, target);
# - a target under self.keepdir is rewritten to "keep:" + the remainder
#   after the keepdir prefix and its trailing separator.
# NOTE(review): lines 205-206 are missing from this dump (the `if` that must
#   precede the `elif` below, presumably testing p). Lines 211-214 are also
#   missing, so the keepdir branch's return value and the fall-through result
#   are not visible here. self.keepdir is not assigned in any visible line.
203 def reversemap(self, target):
204 p = super(ArvPathMapper, self).reversemap(target)
207 elif target.startswith("keep:"):
208 return (target, target)
209 elif self.keepdir and target.startswith(self.keepdir):
210 kp = "keep:" + target[len(self.keepdir)+1:]
# PathMapper that gives each referenced File/Directory a target path under a
# staging directory, avoiding duplicate targets.
# NOTE(review): lines 216-217 are missing from this dump. visit() reads
#   self._follow_dirs, which is not assigned in any visible line (presumably
#   a class attribute set here -- confirm).
215 class StagingPathMapper(PathMapper):
# NOTE(review): line 219 is missing; self.targets (used as a set in visit())
#   is not initialized in any visible line.
218 def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
220 super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)
# Assign obj a unique target under stagedir and record it in self._pathmap.
#
# obj: CWL File/Directory dict with "location", "basename" and "class".
# stagedir: directory in which obj's target is placed.
# basedir: forwarded to visitlisting().
# copy: not read in any visible line.
# staged: stored as the last field of every MapperEnt created here.
222 def visit(self, obj, stagedir, basedir, copy=False, staged=False):
223 # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
224 loc = obj["location"]
225 tgt = os.path.join(stagedir, obj["basename"])
226 basetgt, baseext = os.path.splitext(tgt)
# On a clash, insert "_<n>" before the extension until the target is unused.
# NOTE(review): lines 227 and 229 are missing. `n` is never initialized or
#   incremented in the visible lines; without an increment this loop would
#   not terminate.
228 while tgt in self.targets:
230 tgt = "%s_%i%s" % (basetgt, n, baseext)
231 self.targets.add(tgt)
# Directories: map them, and stage their listing inside the new target when
# they are "_:" literals or _follow_dirs is set.
232 if obj["class"] == "Directory":
233 self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
234 if loc.startswith("_:") or self._follow_dirs:
235 self.visitlisting(obj.get("listing", []), tgt, basedir)
236 elif obj["class"] == "File":
# NOTE(review): line 238 (the body of the next check) is missing.
237 if loc in self._pathmap:
# "_:" literals with "contents" become CreateFile entries carrying the
# contents themselves.
239 if "contents" in obj and loc.startswith("_:"):
240 self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
# NOTE(review): lines 241-242 and 244 are missing; the condition choosing
#   "WritableFile" over "File" is not visible (possibly `copy` -- confirm).
243 self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
245 self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
# Secondary files are staged next to the file, in the same stagedir.
246 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
# StagingPathMapper variant: stage everything under self.stagedir, then
# rewrite Keep-backed File/Directory entries to "$(task.keep)/..." paths.
249 class VwdPathMapper(StagingPathMapper):
# referenced_files: list of CWL File/Directory dicts.
# basedir: forwarded to visitlisting().
250 def setup(self, referenced_files, basedir):
251 # type: (List[Any], unicode) -> None
253 # Go through each file and set the target to its own directory along
254 # with any secondary files.
255 self.visitlisting(referenced_files, self.stagedir, basedir)
# Replace the "keep:" prefix of each File/Directory resolved path with
# "$(task.keep)/". Target, type and staged flag are kept unchanged; other
# entry types are left untouched.
# NOTE(review): the loop variable `type` shadows the builtin inside setup().
257 for path, (ab, tgt, type, staged) in self._pathmap.items():
258 if type in ("File", "Directory") and ab.startswith("keep:"):
259 self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type, staged)
# StagingPathMapper variant whose setup() only stages referenced_files under
# self.stagedir.
# NOTE(review): line 263 is missing from this dump. Given the class name, it
#   presumably turns off directory following (StagingPathMapper.visit reads
#   self._follow_dirs) -- confirm. SOURCE ends at line 266, so any later
#   lines of this class are not visible.
262 class NoFollowPathMapper(StagingPathMapper):
# referenced_files: list of CWL File/Directory dicts.
# basedir: forwarded to visitlisting().
264 def setup(self, referenced_files, basedir):
265 # type: (List[Any], unicode) -> None
266 self.visitlisting(referenced_files, self.stagedir, basedir)