1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
12 import arvados.commands.run
13 import arvados.collection
15 from schema_salad.sourceline import SourceLine
17 from arvados.errors import ApiError
18 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
19 from cwltool.workflow import WorkflowException
21 from .http import http_to_keep
# Module-level logger; every log call in this file goes through the
# 'arvados.cwl-runner' logger.
logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.

    The object is modified in place; nothing is returned.  Objects whose
    "location" is missing or does not start with "keep:" are left untouched.
    """
    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Every referenced File/Directory object ends up in ``self._pathmap`` as a
    ``MapperEnt`` whose ``resolved`` field is a ``keep:`` reference and whose
    target is built from ``collection_pattern`` / ``file_pattern``.  Local
    ``file:`` references that ``statfile`` reports as needing upload are
    uploaded, ``http:``/``https:`` references go through ``http_to_keep``,
    and literals (``_:``) are written into newly saved collections.
    """

    # "keep:<32 hex digits>+<size>/<path>": something inside a collection.
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # "keep:<32 hex digits>+<size>" optionally followed by "/<path>".
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
        """Record the mapper configuration and initialise the base class.

        arvrunner: runner object; this class reads its ``api``,
            ``keep_client``, ``num_retries``, ``project_uuid``, ``fs_access``
            and ``intermediate_output_ttl`` attributes.
        referenced_files: list of File/Directory objects (dicts with at
            least a "location" key).
        input_basedir: base directory used to resolve ``file:`` locations.
        collection_pattern: "%"-format string taking one value (the part of
            a keep reference after "keep:").
        file_pattern: "%"-format string taking (portable data hash, path).
        name: name passed to the upload call for uploaded collections.
        single_collection: when true, uploads share one Collection object.
        **kwargs: accepted and ignored.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        # Attributes are assigned before delegating; presumably the base
        # constructor invokes setup() -- confirm against cwltool's PathMapper.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Classify one File/Directory object and recurse into its children.

        srcobj: dict with "location" and "class" and optionally
            "secondaryFiles" / "listing".
        uploadfiles: set collecting ``(location, absolute path, stat result)``
            tuples for local files that still have to be uploaded.

        Raises WorkflowException for an invalid local path or for a literal
        that lacks its "contents" / "listing".
        """
        src = srcobj["location"]
        if "#" in src:
            # Drop the fragment part of the location.
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            # Already a keep reference: map it directly, no upload required.
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                # NOTE(review): the trailing raiseOSError=True argument was
                # restored from the upstream call; confirm against statfile().
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                # Literals are only validated here; setup() materialises them.
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            elif src.startswith(("http:", "https:")):
                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
                logger.info("%s is %s", src, keepref)
                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
            else:
                # Any other scheme is passed through unchanged.
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, remap):
        """Add ``obj`` (and its children) to collection ``c`` under ``path``.

        obj: File/Directory object to add.
        c: destination collection (supports ``copy`` and ``open``).
        path: directory inside ``c`` that receives the entry.
        remap: list that gets ``(location, path inside c)`` appended for
            every entry added.

        Raises WorkflowException (via SourceLine) when the object is neither
        already mapped, a Directory, nor a File literal with "contents".
        """
        dest = path + "/" + obj["basename"]
        if obj["location"] in self._pathmap:
            # Already in Keep: copy it from its source collection.
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            # NOTE(review): restored from upstream -- an empty source path is
            # replaced by "." before copying; confirm.
            if srcpath == "":
                srcpath = "."
            c.copy(srcpath, dest, source_collection=src, overwrite=True)
            remap.append((obj["location"], dest))
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, remap)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, dest, remap)
            remap.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            # File literal: write its contents into the collection.
            with c.open(dest, "w") as f:
                f.write(obj["contents"].encode("utf-8"))
            remap.append((obj["location"], dest))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate ``self._pathmap`` for every referenced object.

        Steps: visit every object, upload the local files that were
        collected, then build and save a new collection for each unmapped
        Directory and for each File that has secondaryFiles or is a literal.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        # NOTE(review): the api, dry_run, name and packed arguments were
        # restored from the upstream call; confirm against uploadfiles().
        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection,
                                         packed=False)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)

        for srcobj in referenced_files:
            remap = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", remap)

                info = self._get_intermediate_collection_info()

                c.save_new(name=info["name"],
                           owner_uuid=self.arvrunner.project_uuid,
                           ensure_unique_name=True,
                           trash_at=info["trash_at"],
                           properties=info["properties"])

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                                                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                self.addentry(srcobj, c, ".", remap)

                info = self._get_intermediate_collection_info()

                c.save_new(name=info["name"],
                           owner_uuid=self.arvrunner.project_uuid,
                           ensure_unique_name=True,
                           trash_at=info["trash_at"],
                           properties=info["properties"])

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also register the collection as a whole under a fresh
                    # literal id.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            if remap:
                # remap is only non-empty when one of the branches above
                # created and saved ``c``.
                pdh = c.portable_data_hash()
                for loc, sub in remap:
                    # subdirs start with "./", strip it off.  The same
                    # relative path feeds both the keep reference and the
                    # target so the two can never disagree.
                    rel = sub[2:] if sub.startswith("./") else sub
                    # NOTE(review): every remapped entry is typed
                    # "Directory", including files; kept as in the original.
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, rel),
                                                   self.file_pattern % (pdh, rel), "Directory", True)

        # reversemap() reads this attribute; no keep mount directory is
        # configured here.
        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a ``(location, target)`` pair.

        Falls back to treating ``keep:`` references, and paths under
        ``self.keepdir`` when that is set, as their own source.  Returns
        None when nothing matches.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None

    def _get_intermediate_collection_info(self):
        """Return name, trash_at and properties for an intermediate collection.

        ``trash_at`` is None unless ``arvrunner.intermediate_output_ttl`` is
        positive.  The "container" property holds the uuid of the current
        container, or None when the lookup raises ApiError.
        """
        trash_time = None
        if self.arvrunner.intermediate_output_ttl > 0:
            # NOTE(review): now() is naive local time; confirm whether the
            # API expects UTC here.
            trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)

        current_container_uuid = None
        try:
            current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
            current_container_uuid = current_container['uuid']
        except ApiError as e:
            # Status code 404 just means we're not running in a container.
            if e.resp.status != 404:
                logger.info("Getting current container: %s", e)
        props = {"type": "Intermediate",
                 "container": current_container_uuid}

        return {"name" : "Intermediate collection",
                "trash_at" : trash_time,
                "properties" : props}
class StagingPathMapper(PathMapper):
    """Path mapper that assigns each object a unique target under a staging dir.

    ``self.targets`` tracks every target path handed out so that two
    different locations with the same basename get distinct targets.
    """

    # When true, visit() descends into the listing of every Directory;
    # otherwise only into Directory literals ("_:" locations).
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        """Initialise the target registry, then delegate to the base class."""
        # Must exist before the base constructor runs, since visit() uses it.
        # Presumably the base constructor triggers setup()/visit() -- confirm
        # against cwltool's PathMapper.
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
        """Assign ``obj`` a target under ``stagedir`` and record it.

        obj: File/Directory object with "location", "basename" and "class".
        stagedir: directory the target path is joined onto.
        basedir: passed through to ``visitlisting`` for child entries.
        copy: when true, Files are recorded as "WritableFile".
        staged: stored as the ``staged`` field of each MapperEnt.
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        n = 1
        if tgt in self.targets and (self.reversemap(tgt)[0] != loc):
            # Target already taken by a different location: append _2, _3, ...
            # before the extension until the name is free.
            while tgt in self.targets:
                n += 1
                tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)
        if obj["class"] == "Directory":
            if obj.get("writable"):
                self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
            else:
                self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                # visitlisting is not defined in this file; presumably
                # inherited from cwltool's PathMapper.
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if loc in self._pathmap:
                # Already mapped; keep the first mapping.
                return
            if "contents" in obj and loc.startswith("_:"):
                # File literal: the "resolved" slot carries the contents.
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy or obj.get("writable"):
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                # Secondary files are staged alongside the primary file.
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites keep references to "$(task.keep)/..." form."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage every referenced object, then rewrite keep-backed entries."""

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        for location, entry in self._pathmap.items():
            resolved, target, kind, staged = entry
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            # Swap the "keep:" prefix for the "$(task.keep)/" prefix.
            task_path = "$(task.keep)/%s" % resolved[5:]
            self._pathmap[location] = MapperEnt(task_path, target, kind, staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal directories.

    StagingPathMapper.visit only walks a Directory's listing when the
    location is a literal ("_:") or ``_follow_dirs`` is true, so turning
    the flag off restricts traversal to literals.
    """

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage every referenced object under ``self.stagedir``."""
        self.visitlisting(referenced_files, self.stagedir, basedir)