from future import standard_library
standard_library.install_aliases()
from builtins import object
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import fnmatch
import os
import errno
import urllib.parse
import re
import logging
import threading
from collections import OrderedDict

import ruamel.yaml as yaml

import cwltool.stdfsaccess
from cwltool.pathmapper import abspath
import cwltool.resolver

import arvados.util
import arvados.collection
import arvados.arvfile
import arvados.errors

from googleapiclient.errors import HttpError

from schema_salad.ref_resolver import DefaultFetcher
# Shared logger for this module.
logger = logging.getLogger('arvados.cwl-runner')

# Matches a locator of the form "<32 hex digits>+<decimal number>" followed by
# any number of "+hint" suffixes.  Group 1 is the hex digest and group 2 is the
# decimal field (presumably the manifest size in bytes -- TODO confirm).
pdh_size = re.compile(r'([0-9a-f]{32})\+(\d+)(\+\S+)*')
class CollectionCache(object):
    """Cache of CollectionReader objects keyed by portable data hash.

    Entries live in an OrderedDict whose order is used as recency order:
    new and recently used readers sit at the back, and eviction starts
    from the front.  Each entry is stored as ``(reader, size)`` where
    ``size`` is 128 times the length of the reader's manifest text, and
    ``total`` is the running sum of those sizes.
    """

    # NOTE(review): the default values of ``cap`` and ``min_entries`` were not
    # visible in the reviewed copy of this file -- confirm against history.
    def __init__(self, api_client, keep_client, num_retries,
                 cap=256*1024*1024,
                 min_entries=2):
        """Create an empty cache.

        api_client, keep_client, num_retries -- passed through to every
            CollectionReader this cache creates.
        cap -- upper bound that ``total`` is trimmed towards.
        min_entries -- eviction stops once fewer than this many entries
            are cached, even if the cache is still over ``cap``.
        """
        self.api_client = api_client
        self.keep_client = keep_client
        self.num_retries = num_retries
        self.collections = OrderedDict()
        # Guards lookups and insertions performed by get().
        self.lock = threading.Lock()
        self.total = 0
        self.cap = cap
        self.min_entries = min_entries

    def set_cap(self, cap):
        """Replace the size cap; nothing is evicted until the next insert."""
        self.cap = cap

    def cap_cache(self, required):
        """Evict oldest entries until ``required`` units of room are free.

        Stops early when enough room is available or when fewer than
        ``min_entries`` entries remain.
        """
        # ordered dict iterates from oldest to newest
        for pdh, v in list(self.collections.items()):
            available = self.cap - self.total
            if available >= required or len(self.collections) < self.min_entries:
                return
            # cut it loose
            logger.debug("Evicting collection reader %s from cache (cap %s total %s required %s)", pdh, self.cap, self.total, required)
            del self.collections[pdh]
            self.total -= v[1]

    def get(self, pdh):
        """Return the CollectionReader for ``pdh``, creating it on a miss.

        On a miss, room is made first when ``pdh`` carries a size field
        (estimated as that size times 128); the new entry is then charged
        at 128 times its manifest text length.  On a hit the entry is
        moved to the back of the eviction order.
        """
        with self.lock:
            if pdh not in self.collections:
                m = pdh_size.match(pdh)
                if m:
                    self.cap_cache(int(m.group(2)) * 128)
                logger.debug("Creating collection reader for %s", pdh)
                cr = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
                                                         keep_client=self.keep_client,
                                                         num_retries=self.num_retries)
                sz = len(cr.manifest_text()) * 128
                self.collections[pdh] = (cr, sz)
                self.total += sz
            else:
                cr, sz = self.collections[pdh]
                # bump it to the back
                del self.collections[pdh]
                self.collections[pdh] = (cr, sz)
            return cr
class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
    """Implement the cwltool FsAccess interface for Arvados Collections."""

    def __init__(self, basedir, collection_cache=None):
        """Remember the cache used to obtain collection readers."""
        super(CollectionFsAccess, self).__init__(basedir)
        self.collection_cache = collection_cache

    def get_collection(self, path):
        """Split ``path`` into a collection reader and a path inside it.

        When the first segment is ``keep:<locator>`` the result is
        ``(reader, subpath)``, where ``subpath`` is the URL-unquoted
        remainder, or None when nothing follows the locator.  Any other
        path yields ``(None, path)``.
        """
        sp = path.split("/", 1)
        p = sp[0]
        if p.startswith("keep:") and arvados.util.keep_locator_pattern.match(p[5:]):
            pdh = p[5:]
            return (self.collection_cache.get(pdh), urllib.parse.unquote(sp[1]) if len(sp) == 2 else None)
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        """Return paths under ``parent`` matching the glob segments.

        ``patternsegments`` is the pattern split on "/"; each segment is
        matched with fnmatch against the names in ``collection`` and
        matching subcollections are searched recursively.
        """
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        if patternsegments[0] == '.':
            # Pattern contains something like "./foo" so just shift
            # past the "./".  This is done once, before looping over the
            # entries, so that each match is reported a single time.
            return self._match(collection, patternsegments[1:], parent)

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        """Expand ``pattern`` to a sorted list of matching paths.

        A bare ``keep:<locator>`` matches itself.  Patterns that do not
        name a collection are handed to the base class.
        """
        collection, rest = self.get_collection(pattern)
        if collection is None:
            # Not a keep: reference; behave like the other methods here
            # and let the base implementation handle it.
            return super(CollectionFsAccess, self).glob(pattern)
        if not rest:
            return [pattern]
        patternsegments = rest.split("/")
        return sorted(self._match(collection, patternsegments, "keep:" + collection.manifest_locator()))

    def open(self, fn, mode):
        """Open ``fn`` from its collection, or via the base class."""
        collection, rest = self.get_collection(fn)
        if collection is not None:
            return collection.open(rest, mode)
        else:
            return super(CollectionFsAccess, self).open(self._abs(fn), mode)

    def exists(self, fn):
        """Report whether ``fn`` exists.

        An HTTP 404 while loading the collection means "does not exist";
        any other HttpError propagates.  A bare collection reference
        exists whenever the collection can be loaded.
        """
        try:
            collection, rest = self.get_collection(fn)
        except HttpError as err:
            if err.resp.status == 404:
                return False
            else:
                raise
        if collection is not None:
            if rest:
                return collection.exists(rest)
            else:
                return True
        else:
            return super(CollectionFsAccess, self).exists(fn)

    def size(self, fn):  # type: (unicode) -> int
        """Return the size of the file at ``fn``.

        Raises IOError(EINVAL) when ``fn`` is inside a collection but
        does not name a file.
        """
        collection, rest = self.get_collection(fn)
        if collection is not None:
            if rest:
                arvfile = collection.find(rest)
                if isinstance(arvfile, arvados.arvfile.ArvadosFile):
                    return arvfile.size()
            raise IOError(errno.EINVAL, "Not a path to a file %s" % (fn))
        else:
            return super(CollectionFsAccess, self).size(fn)

    def isfile(self, fn):  # type: (unicode) -> bool
        """Report whether ``fn`` names a file; a bare collection is not one."""
        collection, rest = self.get_collection(fn)
        if collection is not None:
            if rest:
                return isinstance(collection.find(rest), arvados.arvfile.ArvadosFile)
            else:
                return False
        else:
            return super(CollectionFsAccess, self).isfile(fn)

    def isdir(self, fn):  # type: (unicode) -> bool
        """Report whether ``fn`` names a directory; a bare collection is one."""
        collection, rest = self.get_collection(fn)
        if collection is not None:
            if rest:
                return isinstance(collection.find(rest), arvados.collection.RichCollectionBase)
            else:
                return True
        else:
            return super(CollectionFsAccess, self).isdir(fn)

    def listdir(self, fn):  # type: (unicode) -> List[unicode]
        """List the entries of directory ``fn`` as paths resolved against it.

        Raises IOError(ENOENT) when the path is missing from the
        collection or is not a directory.
        """
        collection, rest = self.get_collection(fn)
        if collection is not None:
            if rest:
                target = collection.find(rest)
            else:
                target = collection
            if target is None:
                raise IOError(errno.ENOENT, "Directory '%s' in '%s' not found" % (rest, collection.portable_data_hash()))
            if not isinstance(target, arvados.collection.RichCollectionBase):
                raise IOError(errno.ENOENT, "Path '%s' in '%s' is not a Directory" % (rest, collection.portable_data_hash()))
            return [abspath(name, fn) for name in target.keys()]
        else:
            return super(CollectionFsAccess, self).listdir(fn)

    def join(self, path, *paths):  # type: (unicode, *unicode) -> unicode
        """Join path components; a trailing keep: reference wins outright."""
        if paths and paths[-1].startswith("keep:") and arvados.util.keep_locator_pattern.match(paths[-1][5:]):
            return paths[-1]
        return os.path.join(path, *paths)

    def realpath(self, path):
        """Return ``path`` unchanged for task placeholders and keep:
        references; otherwise resolve it on the local filesystem."""
        if path.startswith("$(task.tmpdir)") or path.startswith("$(task.outdir)"):
            return path
        collection, rest = self.get_collection(path)
        if collection is not None:
            return path
        else:
            return os.path.realpath(path)
class CollectionFetcher(DefaultFetcher):
    """Document fetcher that also understands keep: and arvwf: URLs."""

    def __init__(self, cache, session, api_client=None, fs_access=None, num_retries=4):
        """Store the API client, filesystem access object and retry count
        used for keep: and arvwf: URLs."""
        super(CollectionFetcher, self).__init__(cache, session)
        self.api_client = api_client
        self.fsaccess = fs_access
        self.num_retries = num_retries

    def fetch_text(self, url):
        """Return the text behind ``url``.

        keep: URLs are read through the filesystem access object.
        arvwf: URLs return the workflow record's definition with a
        ``label`` line built from the record's name appended.  Anything
        else is delegated to the base class.
        """
        if url.startswith("keep:"):
            with self.fsaccess.open(url, "r") as f:
                return f.read()
        if url.startswith("arvwf:"):
            record = self.api_client.workflows().get(uuid=url[6:]).execute(num_retries=self.num_retries)
            definition = record["definition"] + ('\nlabel: "%s"\n' % record["name"].replace('"', '\\"'))
            return definition
        return super(CollectionFetcher, self).fetch_text(url)

    def check_exists(self, url):
        """Report whether ``url`` can be fetched.

        Errors raised while probing keep: or arvwf: URLs are treated as
        "does not exist"; unexpected ones are logged first.
        """
        try:
            if url.startswith("http://arvados.org/cwl"):
                return True
            if url.startswith("keep:"):
                return self.fsaccess.exists(url)
            if url.startswith("arvwf:"):
                if self.fetch_text(url):
                    return True
        except arvados.errors.NotFoundError:
            return False
        # NOTE(review): the original handler clause was not visible in the
        # reviewed copy; confirm that `except Exception` matches it.
        except Exception:
            logger.exception("Got unexpected exception checking if file exists:")
            return False
        return super(CollectionFetcher, self).check_exists(url)

    def urljoin(self, base_url, url):
        """Resolve ``url`` against ``base_url``.

        For keep: and arvwf: bases the first path segment is kept as the
        root and the rest is joined like a relative path.  Raises
        IOError(EINVAL) when such a base has no path or, for keep:, when
        its first segment is not a Keep locator.
        """
        if not url:
            return base_url

        urlsp = urllib.parse.urlsplit(url)
        if urlsp.scheme or not base_url:
            return url

        basesp = urllib.parse.urlsplit(base_url)
        if basesp.scheme in ("keep", "arvwf"):
            if not basesp.path:
                raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)

            baseparts = basesp.path.split("/")
            urlparts = urlsp.path.split("/") if urlsp.path else []

            pdh = baseparts.pop(0)

            if basesp.scheme == "keep" and not arvados.util.keep_locator_pattern.match(pdh):
                raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)

            if urlsp.path.startswith("/"):
                # Absolute reference: restart from the root segment and
                # drop the empty component produced by the leading "/".
                baseparts = []
                urlparts.pop(0)

            if baseparts and urlsp.path:
                # Relative reference replaces the base's last component.
                baseparts.pop()

            path = "/".join([pdh] + baseparts + urlparts)
            return urllib.parse.urlunsplit((basesp.scheme, "", path, "", urlsp.fragment))

        return super(CollectionFetcher, self).urljoin(base_url, url)

    # URL schemes reported by supported_schemes().
    schemes = [u"file", u"http", u"https", u"mailto", u"keep", u"arvwf"]

    def supported_schemes(self):  # type: () -> List[Text]
        """Return the list of URL schemes this fetcher accepts."""
        return self.schemes
# UUID shapes recognised by collectionResolver: five lowercase alphanumerics,
# a fixed type infix, then fifteen lowercase alphanumerics.
workflow_uuid_pattern = re.compile(r'[a-z0-9]{5}-7fd4e-[a-z0-9]{15}')
pipeline_template_uuid_pattern = re.compile(r'[a-z0-9]{5}-p5p6p-[a-z0-9]{15}')
def collectionResolver(api_client, document_loader, uri, num_retries=4):
    """Translate a workflow reference into a URI the loader can fetch.

    keep: and arvwf: URIs are returned unchanged.  A workflow UUID becomes
    ``arvwf:<uuid>#main``.  A pipeline template UUID is looked up and
    mapped to the ``cwl:tool`` script parameter of its first component.
    A path starting with a Keep locator gets a ``keep:`` prefix, and one
    starting with a collection UUID has that UUID replaced by the
    collection's portable data hash.  Anything else is passed to
    cwltool's default tool resolver.

    num_retries -- retry count passed to the API calls made here.
    """
    if uri.startswith(("keep:", "arvwf:")):
        return uri

    if workflow_uuid_pattern.match(uri):
        return "arvwf:%s#main" % (uri)

    if pipeline_template_uuid_pattern.match(uri):
        pt = api_client.pipeline_templates().get(uuid=uri).execute(num_retries=num_retries)
        return "keep:" + list(pt["components"].values())[0]["script_parameters"]["cwl:tool"]

    p = uri.split("/")
    if arvados.util.keep_locator_pattern.match(p[0]):
        return "keep:%s" % (uri)

    if arvados.util.collection_uuid_pattern.match(p[0]):
        # NOTE(review): the second format argument was not visible in the
        # reviewed copy; it is taken to be the part of uri after the UUID.
        return "keep:%s%s" % (api_client.collections().
                              get(uuid=p[0]).execute(num_retries=num_retries)["portable_data_hash"],
                              uri[len(p[0]):])

    return cwltool.resolver.tool_resolver(document_loader, uri)