# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import re
import urllib
import uuid

import arvados.commands.run
import arvados.collection

from schema_salad.sourceline import SourceLine

from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
from cwltool.workflow import WorkflowException

from .http import http_to_keep
# Module-level logger shared by the path mappers in this file.
logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    present on an object whose "location" starts with "keep:".

    The object is modified in place; nothing is returned.  Objects without a
    "location", with a non-Keep location, or without a "listing" are left
    untouched.
    """
    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    NOTE(review): the reviewed text of this class had lost its indentation
    and a number of lines.  Lines tagged "reconstructed" below were not
    visible in the reviewed text and must be confirmed against version
    control before merging.
    """

    # "keep:<32 hex digits>+<digits>/<non-empty path>"
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # Same prefix, but the "/<path>" suffix is optional.
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
        """Record the mapping configuration, then let the base class build the map.

        collection_pattern is a %-format taking one value (the part of a
        "keep:" reference after the prefix); file_pattern takes two
        (portable data hash, path inside the collection).
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name  # reconstructed
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # All attributes above are assigned before the base initializer runs.
        # NOTE(review): presumably the base initializer calls setup(); confirm.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def _new_collection(self):
        """Return an empty Collection bound to the runner's API and Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_unless_present(self, c):
        """Save collection `c` into the runner's project unless the API
        already lists a collection with the same portable data hash."""
        check = self.arvrunner.api.collections().list(
            filters=[["portable_data_hash", "=", c.portable_data_hash()]],
            limit=1).execute(num_retries=self.arvrunner.num_retries)
        if not check["items"]:
            c.save_new(owner_uuid=self.arvrunner.project_uuid)

    def visit(self, srcobj, uploadfiles):
        """Add a path map entry for `srcobj` and, recursively, for its
        secondaryFiles and listing.

        Local files that need uploading are not mapped here; instead a
        (location, absolute path, UploadFile) tuple is added to the
        `uploadfiles` set for the caller to process.

        Raises WorkflowException for an invalid local path or for a literal
        ("_:") File/Directory lacking `contents`/`listing`.
        """
        # Drop any "#fragment" suffix.  partition() leaves the string
        # unchanged when there is no "#", so no separate guard is needed.
        src = srcobj["location"].partition("#")[0]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)  # reconstructed
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals get no entry here; they are only validated.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                # Any other scheme is mapped to itself.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Place `obj` under `path` in collection `c`, recording each
        (location, path inside c) pair in the `remap` list.

        Already-mapped objects are copied from their source collection,
        unmapped Directories are recursed into, and file literals with
        `contents` are written out.  Anything else raises WorkflowException.
        """
        dest = path + "/" + obj["basename"]
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":  # reconstructed
                srcpath = "."  # reconstructed
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            # Secondary files go alongside the primary file, not beneath it.
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, dest, remap)
            remap.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(dest, "w") as f:
                f.write(obj["contents"].encode("utf-8"))
            remap.append((obj["location"], dest))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate the path map for `referenced_files`.

        Visits every object, uploads the local files collected by visit(),
        then builds collections for Directory objects that are still unmapped
        and for Files that have secondaryFiles or are literals with contents.
        """
        uploadfiles = set()  # reconstructed

        collection = None  # reconstructed
        if self.single_collection:
            collection = self._new_collection()

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,  # reconstructed
                                         dry_run=False,  # reconstructed
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,  # reconstructed
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)  # reconstructed

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []  # reconstructed
            location = srcobj["location"]
            if srcobj["class"] == "Directory" and location not in self._pathmap:
                c = self._new_collection()
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)
                self._save_unless_present(c)

                pdh = c.portable_data_hash()
                self._pathmap[location] = MapperEnt("keep:" + pdh, self.collection_pattern % pdh,
                                                    "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                                                (location.startswith("_:") and "contents" in srcobj)):
                c = self._new_collection()
                self.addentry(srcobj, c, ".", remap)
                self._save_unless_present(c)

                pdh = c.portable_data_hash()
                self._pathmap[location] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                    self.file_pattern % (pdh, srcobj["basename"]),
                                                    "File", True)  # "File", True: reconstructed
                if srcobj.get("secondaryFiles"):
                    # Extra entry, under a fresh literal id, for the
                    # collection as a whole.
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:" + pdh,
                                                                            self.collection_pattern % pdh,
                                                                            "Directory", True)

            # remap is non-empty only if one of the branches above ran, so
            # `pdh` is always bound when this loop body executes.
            for loc, sub in remap:
                # subdirs start with "./", strip it off.  The same
                # normalised path is used for both the resolved location and
                # the target; previously the resolved location dropped the
                # first two characters even when there was no "./" prefix.
                rel = sub[2:] if sub.startswith("./") else sub
                # NOTE(review): every remapped entry, including files, is
                # recorded with type "Directory", as in the reviewed text;
                # confirm that this is intended.
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, rel),
                                               self.file_pattern % (pdh, rel),
                                               "Directory", True)

        self.keepdir = None  # reconstructed; reversemap() reads this attribute

    def reversemap(self, target):
        """Return a (location, resolved) pair for `target`, or None.

        Falls back from the base class lookup to treating "keep:" references
        as mapping to themselves, and paths under self.keepdir as Keep
        references.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:  # reconstructed
            return p  # reconstructed
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            # +1 skips the separator following the keepdir prefix.
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)  # reconstructed
        else:
            return None  # reconstructed
class StagingPathMapper(PathMapper):
    """Path mapper that assigns each object a unique target under a staging
    directory.

    NOTE(review): the reviewed text of this class had lost its indentation
    and a number of lines.  Lines tagged "reconstructed" below were not
    visible in the reviewed text and must be confirmed against version
    control before merging.
    """

    # Whether visit() descends into the listing of non-literal directories.
    _follow_dirs = True  # reconstructed

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # Targets handed out so far.  Assigned before the base initializer
        # runs because visit() reads it.
        # NOTE(review): presumably the base initializer triggers visit(); confirm.
        self.targets = set()  # reconstructed
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Map `obj` to a target under `stagedir`, then visit its children.

        If the natural target (stagedir/basename) is already taken by a
        different location, "_<n>" is inserted before the extension until the
        name is unused.
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        n = 1  # reconstructed
        if tgt in self.targets:
            # reversemap() can return None when the target was reserved but
            # no entry ended up pointing at it (a File whose location was
            # already mapped returns early below).  Treat that as a
            # collision instead of failing on None[0].
            owner = self.reversemap(tgt)
            if owner is None or owner[0] != loc:
                while tgt in self.targets:
                    n += 1  # reconstructed
                    tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if loc in self._pathmap:
                return  # reconstructed
            if "contents" in obj and loc.startswith("_:"):
                # File literal: the entry carries the contents themselves.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                # NOTE(review): nesting of this call under the non-literal
                # branch is reconstructed; confirm.
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed entries resolve to "$(task.keep)/..." paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Iterate over a snapshot because entries are replaced inside the loop.
        for path, (ab, tgt, kind, staged) in list(self._pathmap.items()):
            if kind in ("File", "Directory") and ab.startswith("keep:"):
                # ab[5:] drops the "keep:" prefix.
                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, kind, staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal directories."""

    # NOTE(review): this override was not visible in the reviewed text; it is
    # reconstructed from the class name and from StagingPathMapper.visit()
    # reading self._follow_dirs.  Confirm against version control.
    _follow_dirs = False  # reconstructed

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        self.visitlisting(referenced_files, self.stagedir, basedir)