1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import object
8 from builtins import str
9 from future.utils import viewvalues
18 from collections import OrderedDict
20 import ruamel.yaml as yaml
22 import cwltool.stdfsaccess
23 from cwltool.pathmapper import abspath
24 import cwltool.resolver
27 import arvados.collection
28 import arvados.arvfile
31 from googleapiclient.errors import HttpError
33 from schema_salad.ref_resolver import DefaultFetcher
# Module-wide logger, registered under the 'arvados.cwl-runner' name.
35 logger = logging.getLogger('arvados.cwl-runner')
# Matches 32 lowercase hex digits, a '+', a decimal number (group 2) and any
# number of further '+'-prefixed hints. The cache code below matches it
# against `pdh` values and uses group 2 to size its eviction request
# (presumably group 2 is the manifest size of a portable data hash — confirm).
37 pdh_size = re.compile(r'([0-9a-f]{32})\+(\d+)(\+\S+)*')
# Cache of arvados.collection.CollectionReader objects, held in an
# OrderedDict that maps pdh -> (reader, size estimate).
# NOTE(review): this listing embeds the file's own line numbers and skips
# some of them (e.g. 41-42, 48-49, 53-54, 60-61, 64-67, 70, 78-79, 81), so
# the statements on those lines are not visible. The comments below describe
# only what is shown and mark guesses as such.
39 class CollectionCache(object):
# Keep the API client, Keep client and retry count; all three are passed to
# every CollectionReader created further down. The parameter list continues
# on lines not shown (min_entries is declared there; presumably cap as
# well — confirm).
40 def __init__(self, api_client, keep_client, num_retries,
43 self.api_client = api_client
44 self.keep_client = keep_client
45 self.num_retries = num_retries
# Insertion-ordered map of cached readers, oldest entry first.
46 self.collections = OrderedDict()
# NOTE(review): no acquisition of this lock is visible in the listing —
# confirm where it is held.
47 self.lock = threading.Lock()
# self.cap and self.total are read by cap_cache, but their initialisation is
# not shown here.
50 self.min_entries = min_entries
# Body not shown; presumably stores `cap` on the instance — confirm.
52 def set_cap(self, cap):
# Evict cached readers, oldest first, comparing the free budget
# (self.cap - self.total) against `required` before each eviction.
55 def cap_cache(self, required):
56 # ordered dict iterates from oldest to newest
# Iterate over a snapshot list because entries are deleted inside the loop.
57 for pdh, v in list(self.collections.items()):
58 available = self.cap - self.total
# The statement guarded by this test is not shown; presumably eviction stops
# once there is enough room or fewer than min_entries readers remain — confirm.
59 if available >= required or len(self.collections) < self.min_entries:
62 logger.debug("Evicting collection reader %s from cache (cap %s total %s required %s)", pdh, self.cap, self.total, required)
# `v` is not used in the visible lines; presumably self.total is reduced by
# the entry's size on a line not shown — confirm.
63 del self.collections[pdh]
# Lookup path (its `def` line is not shown). On a miss: request room, build
# a new reader and record it together with a size estimate.
68 if pdh not in self.collections:
69 m = pdh_size.match(pdh)
# Room requested from cap_cache is 128 x the decimal field captured as group 2.
# NOTE(review): m is None when the pattern does not match; any guard would
# be on line 70, which is not shown.
71 self.cap_cache(int(m.group(2)) * 128)
72 logger.debug("Creating collection reader for %s", pdh)
73 cr = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
74 keep_client=self.keep_client,
75 num_retries=self.num_retries)
# Size estimate stored with the entry: 128 x the length of the manifest text.
76 sz = len(cr.manifest_text()) * 128
77 self.collections[pdh] = (cr, sz)
# Hit path: delete and re-insert so the entry becomes the newest one in the
# OrderedDict (and therefore the last candidate for eviction).
80 cr, sz = self.collections[pdh]
82 del self.collections[pdh]
83 self.collections[pdh] = (cr, sz)
# NOTE(review): this listing embeds the file's own line numbers and skips
# some of them, so several branches below are missing their bodies. The
# comments describe only the visible statements and mark guesses as such.
87 class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
88 """Implement the cwltool FsAccess interface for Arvados Collections."""
# basedir is forwarded to StdFsAccess. collection_cache is the object whose
# get() is called in get_collection (presumably a CollectionCache — confirm).
90 def __init__(self, basedir, collection_cache=None):
91 super(CollectionFsAccess, self).__init__(basedir)
92 self.collection_cache = collection_cache
# Split `path` at the first "/". When the leading part is "keep:" followed by
# text matching arvados.util.keep_locator_pattern, return a tuple of
# (collection obtained from the cache, URL-unquoted remainder, or None when
# the path has no "/").
# `p` and `pdh` are assigned on lines not shown, as is the return value for
# paths that do not match (callers test the first element against None).
94 def get_collection(self, path):
95 sp = path.split("/", 1)
97 if p.startswith("keep:") and arvados.util.keep_locator_pattern.match(p[5:]):
99 return (self.collection_cache.get(pdh), urllib.parse.unquote(sp[1]) if len(sp) == 2 else None)
# Recursive helper for glob: match the list `patternsegments` against the
# entries of `collection`, using `parent` as the path prefix of this level.
# The result list `ret` is created (and presumably returned) on lines not shown.
103 def _match(self, collection, patternsegments, parent):
# Statement for an exhausted pattern is not shown.
104 if not patternsegments:
# Only RichCollectionBase objects are iterated below; the statement taken
# for anything else is not shown.
107 if not isinstance(collection, arvados.collection.RichCollectionBase):
111 # iterate over the files and subcollections in 'collection'
112 for filename in collection:
113 if patternsegments[0] == '.':
114 # Pattern contains something like "./foo" so just shift
116 ret.extend(self._match(collection, patternsegments[1:], parent))
# fnmatch applies shell-style wildcard matching to the entry name.
117 elif fnmatch.fnmatch(filename, patternsegments[0]):
118 cur = os.path.join(parent, filename)
# Last pattern segment: what happens to `cur` is not shown (presumably it is
# added to ret — confirm). Line 122 recurses into the matched entry with the
# remaining segments (presumably the else branch of this test).
119 if len(patternsegments) == 1:
122 ret.extend(self._match(collection[filename], patternsegments[1:], cur))
# Expand `pattern` inside a collection and return the matches sorted; match
# paths are built from the prefix "keep:" + collection.manifest_locator().
125 def glob(self, pattern):
126 collection, rest = self.get_collection(pattern)
# Pattern names a collection with no path part; the statement for this case
# is not shown.
127 if collection is not None and not rest:
129 patternsegments = rest.split("/")
# NOTE(review): no visible line handles `collection` being None before
# collection.manifest_locator() is called — confirm non-keep patterns
# cannot reach this point.
130 return sorted(self._match(collection, patternsegments, "keep:" + collection.manifest_locator()))
# Open `fn` through its collection when get_collection finds one; line 137
# falls back to StdFsAccess.open on the path returned by self._abs(fn).
132 def open(self, fn, mode):
133 collection, rest = self.get_collection(fn)
134 if collection is not None:
135 return collection.open(rest, mode)
137 return super(CollectionFsAccess, self).open(self._abs(fn), mode)
# Report whether `fn` exists. The `try:` line is not shown. An HttpError with
# status 404 raised by the lookup is special-cased; the body of that branch
# is not shown (presumably it returns False — confirm).
139 def exists(self, fn):
141 collection, rest = self.get_collection(fn)
142 except HttpError as err:
143 if err.resp.status == 404:
147 if collection is not None:
# Path inside a collection: ask the collection object.
149 return collection.exists(rest)
# Fallback to the StdFsAccess implementation.
153 return super(CollectionFsAccess, self).exists(fn)
# Return the size of `fn`: ArvadosFile.size() for a file found in a
# collection, IOError(EINVAL) when the lookup result is not an ArvadosFile,
# and StdFsAccess.size as the fallback on line 164.
# NOTE(review): the type comment says "-> bool" although the visible returns
# are sizes — confirm and correct the annotation.
155 def size(self, fn): # type: (unicode) -> bool
156 collection, rest = self.get_collection(fn)
157 if collection is not None:
159 arvfile = collection.find(rest)
160 if isinstance(arvfile, arvados.arvfile.ArvadosFile):
161 return arvfile.size()
162 raise IOError(errno.EINVAL, "Not a path to a file %s" % (fn))
164 return super(CollectionFsAccess, self).size(fn)
# True when collection.find(rest) yields an ArvadosFile; line 174 falls back
# to StdFsAccess.isfile. Lines 169 and 171-173 are not shown.
166 def isfile(self, fn): # type: (unicode) -> bool
167 collection, rest = self.get_collection(fn)
168 if collection is not None:
170 return isinstance(collection.find(rest), arvados.arvfile.ArvadosFile)
174 return super(CollectionFsAccess, self).isfile(fn)
# True when collection.find(rest) yields a RichCollectionBase; line 184 falls
# back to StdFsAccess.isdir. Lines 179 and 181-183 are not shown.
176 def isdir(self, fn): # type: (unicode) -> bool
177 collection, rest = self.get_collection(fn)
178 if collection is not None:
180 return isinstance(collection.find(rest), arvados.collection.RichCollectionBase)
184 return super(CollectionFsAccess, self).isdir(fn)
# List the entries of directory `fn`, each passed through abspath(entry, fn).
186 def listdir(self, fn): # type: (unicode) -> List[unicode]
187 collection, rest = self.get_collection(fn)
188 if collection is not None:
# NOTE(review): the local name `dir` shadows the builtin of the same name.
190 dir = collection.find(rest)
# The condition guarding this "not found" error is not shown.
194 raise IOError(errno.ENOENT, "Directory '%s' in '%s' not found" % (rest, collection.portable_data_hash()))
# A lookup result that is not a RichCollectionBase is also reported with ENOENT.
195 if not isinstance(dir, arvados.collection.RichCollectionBase):
196 raise IOError(errno.ENOENT, "Path '%s' in '%s' is not a Directory" % (rest, collection.portable_data_hash()))
197 return [abspath(l, fn) for l in list(dir.keys())]
199 return super(CollectionFsAccess, self).listdir(fn)
# Join path components. When the last component is itself a "keep:" locator
# the join is bypassed (statement not shown; presumably that component is
# returned as-is — confirm); line 204 is the plain os.path.join case.
201 def join(self, path, *paths): # type: (unicode, *unicode) -> unicode
202 if paths and paths[-1].startswith("keep:") and arvados.util.keep_locator_pattern.match(paths[-1][5:]):
204 return os.path.join(path, *paths)
# Resolve `path`. Paths starting with the "$(task.tmpdir)" or
# "$(task.outdir)" placeholders, and paths inside a collection, are handled
# on lines not shown (presumably returned unchanged — confirm); line 213
# sends everything else through os.path.realpath.
206 def realpath(self, path):
207 if path.startswith("$(task.tmpdir)") or path.startswith("$(task.outdir)"):
209 collection, rest = self.get_collection(path)
210 if collection is not None:
213 return os.path.realpath(path)
# Fetcher built on schema_salad's DefaultFetcher that adds handling for
# "keep:" and "arvwf:" URLs.
# NOTE(review): this listing embeds the file's own line numbers and skips
# some of them, so several returns and the `try:` line are not visible. The
# comments describe only the visible statements and mark guesses as such.
215 class CollectionFetcher(DefaultFetcher):
# cache and session go to DefaultFetcher. api_client is used to fetch workflow
# records, fs_access to open and test "keep:" URLs, num_retries for the
# workflow API call.
216 def __init__(self, cache, session, api_client=None, fs_access=None, num_retries=4):
217 super(CollectionFetcher, self).__init__(cache, session)
218 self.api_client = api_client
219 self.fsaccess = fs_access
220 self.num_retries = num_retries
# Return the text behind `url`.
222 def fetch_text(self, url):
223 if url.startswith("keep:"):
# Opened through fsaccess; the statement inside the with-block is not shown.
224 with self.fsaccess.open(url, "r") as f:
# "arvwf:<uuid>": fetch the workflow record (url[6:] drops the "arvwf:"
# prefix) and append a `label:` line built from the record name with double
# quotes backslash-escaped. The return of `definition` is not shown.
# NOTE(review): only '"' is escaped; a backslash in the name passes through
# unchanged — confirm that is acceptable to the parser.
226 if url.startswith("arvwf:"):
227 record = self.api_client.workflows().get(uuid=url[6:]).execute(num_retries=self.num_retries)
228 definition = record["definition"] + ('\nlabel: "%s"\n' % record["name"].replace('"', '\\"'))
230 return super(CollectionFetcher, self).fetch_text(url)
# Report whether `url` can be fetched. The enclosing `try:` line and several
# return statements are not shown.
232 def check_exists(self, url):
# URLs under http://arvados.org/cwl are special-cased (result not shown).
234 if url.startswith("http://arvados.org/cwl"):
236 if url.startswith("keep:"):
237 return self.fsaccess.exists(url)
# A workflow reference is checked by attempting to fetch its text.
238 if url.startswith("arvwf:"):
239 if self.fetch_text(url):
# Handler body not shown (presumably reports the URL as missing — confirm).
241 except arvados.errors.NotFoundError:
# logger.exception records the message together with the active traceback.
244 logger.exception("Got unexpected exception checking if file exists:")
246 return super(CollectionFetcher, self).check_exists(url)
# Resolve `url` against `base_url`, with custom joining when the base uses
# the "keep" or "arvwf" scheme; line 279 defers to DefaultFetcher.urljoin.
248 def urljoin(self, base_url, url):
252 urlsp = urllib.parse.urlsplit(url)
# A url that already has a scheme, or an empty base, is handled here; the
# value returned is not shown.
253 if urlsp.scheme or not base_url:
256 basesp = urllib.parse.urlsplit(base_url)
257 if basesp.scheme in ("keep", "arvwf"):
# The condition guarding this error is not shown.
259 raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)
# Both paths are handled as lists of "/"-separated segments.
261 baseparts = basesp.path.split("/")
262 urlparts = urlsp.path.split("/") if urlsp.path else []
# First segment of the base path; for the "keep" scheme it must match
# arvados.util.keep_locator_pattern.
264 pdh = baseparts.pop(0)
266 if basesp.scheme == "keep" and not arvados.util.keep_locator_pattern.match(pdh):
267 raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)
# Adjustments for a url path starting with "/" and for a non-empty base path
# combined with a relative url are on lines not shown.
269 if urlsp.path.startswith("/"):
273 if baseparts and urlsp.path:
# Reassemble: scheme from the base, empty netloc and query, fragment from `url`.
276 path = "/".join([pdh] + baseparts + urlparts)
277 return urllib.parse.urlunsplit((basesp.scheme, "", path, "", urlsp.fragment))
279 return super(CollectionFetcher, self).urljoin(base_url, url)
# Schemes listed for this fetcher: file, http, https and mailto plus the
# "keep" and "arvwf" schemes handled above.
281 schemes = [u"file", u"http", u"https", u"mailto", u"keep", u"arvwf"]
# Body not shown; presumably returns the `schemes` list above — confirm.
283 def supported_schemes(self): # type: () -> List[Text]
# UUID shapes recognised by collectionResolver: five lowercase alphanumerics,
# a fixed type infix ("7fd4e" for workflows, "p5p6p" for pipeline templates)
# and fifteen lowercase alphanumerics. The patterns carry no end anchor and
# are applied with .match(), so only the start of the string is checked.
287 workflow_uuid_pattern = re.compile(r'[a-z0-9]{5}-7fd4e-[a-z0-9]{15}')
288 pipeline_template_uuid_pattern = re.compile(r'[a-z0-9]{5}-p5p6p-[a-z0-9]{15}')
# Resolver hook: turn `uri` into a "keep:" or "arvwf:" reference when it looks
# like an Arvados identifier, otherwise defer to cwltool's tool_resolver.
# NOTE(review): this listing embeds the file's own line numbers and skips
# some of them (292-293, 296, 300-301, 304, 308-309); statements on those
# lines are not visible.
290 def collectionResolver(api_client, document_loader, uri, num_retries=4):
# Already in keep:/arvwf: form; the statement taken here is not shown
# (presumably uri is returned unchanged — confirm).
291 if uri.startswith("keep:") or uri.startswith("arvwf:"):
# Workflow UUID -> reference to the "main" fragment of that workflow.
294 if workflow_uuid_pattern.match(uri):
295 return u"arvwf:%s#main" % (uri)
# Pipeline template UUID -> fetch the template and use the "cwl:tool" script
# parameter of its first component.
297 if pipeline_template_uuid_pattern.match(uri):
298 pt = api_client.pipeline_templates().get(uuid=uri).execute(num_retries=num_retries)
# NOTE(review): viewvalues() yields a dict view, and dict views cannot be
# indexed with [0]; this line looks like it raises TypeError — confirm and
# consider next(iter(viewvalues(...))) instead.
299 return u"keep:" + viewvalues(pt["components"])[0]["script_parameters"]["cwl:tool"]
# `p` is assigned on a line not shown; p[0] is presumably the first
# "/"-separated part of uri — confirm.
302 if arvados.util.keep_locator_pattern.match(p[0]):
303 return u"keep:%s" % (uri)
# Collection UUID -> replaced by that collection's portable_data_hash. The
# second format argument is on a line not shown.
# NOTE(review): this execute() call passes no num_retries, unlike the call on
# line 298 — confirm whether that is intentional.
305 if arvados.util.collection_uuid_pattern.match(p[0]):
306 return u"keep:%s%s" % (api_client.collections().
307 get(uuid=p[0]).execute()["portable_data_hash"],
310 return cwltool.resolver.tool_resolver(document_loader, uri)