18842: Add disk cache, test python sdk/fuse disk_cache=true/false
[arvados.git] / sdk / python / arvados / keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 import copy
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
15 import collections
16 import datetime
17 import hashlib
18 import io
19 import logging
20 import math
21 import os
22 import pycurl
23 import queue
24 import re
25 import socket
26 import ssl
27 import sys
28 import threading
29 import resource
30 from . import timer
31 import urllib.parse
32
33 if sys.version_info >= (3, 0):
34     from io import BytesIO
35 else:
36     from cStringIO import StringIO as BytesIO
37
38 import arvados
39 import arvados.config as config
40 import arvados.errors
41 import arvados.retry as retry
42 import arvados.util
43 import arvados.diskcache
44
_logger = logging.getLogger('arvados.keep')

# Process-wide KeepClient singleton managed by the deprecated Keep class
# below; rebuilt lazily when the relevant configuration changes.
global_client_object = None


# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
# These are used by KeepService._socket_open_pycurl_7_21_5() to enable TCP
# keepalive on the sockets it opens for pycurl.
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
58
59
class KeepLocator(object):
    """Parse and re-serialize a Keep block locator string.

    A locator has the form ``<md5sum>[+<size>][+<hint>...]``.  Permission
    hints look like ``A<40-hex-signature>@<hex-unix-timestamp>``; other
    hints are kept verbatim in ``self.hints``.
    """

    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Bare locators have no size component.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator with all hints (including permissions) removed."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # Accept 1..8 hex digits and store as a naive UTC datetime.
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the 'A<sig>@<hex timestamp>' hint, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Parse an 'A<sig>@<timestamp>' hint into perm_sig/perm_expiry.

        Raises ValueError if the hint has no '@' separator.
        """
        # Bug fix: the previous code did tuple unpacking of split('@', 1)
        # inside `except IndexError`, but a missing '@' makes unpacking
        # raise ValueError, so the friendly message below was unreachable.
        # Split with partition() and check explicitly instead.
        signature, sep, expiry = s[1:].partition('@')
        if not sep:
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig = signature
        self.perm_expiry = expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the signature has expired as of as_of_dt (default: now)."""
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
142
143
class Keep(object):
    """Convenience wrapper around a process-wide KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it when config changes."""
        global global_client_object
        # KeepClient used to pick up changes to these settings at runtime.
        # Emulate that behavior: if the effective configuration differs
        # from the one the current client was built with, build a new one.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               os.environ.get('KEEP_LOCAL_STORE'))
        if global_client_object is None or key != cls._last_key:
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block through the shared client."""
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store data through the shared client."""
        return Keep.global_client_object().put(data, **kwargs)
177
class KeepBlockCache(object):
    """LRU block cache, kept either in memory or on disk.

    With disk_cache=False (default), blocks live in RAM in CacheSlot
    objects.  With disk_cache=True, blocks are stored in files under
    disk_cache_dir via arvados.diskcache.DiskCacheSlot.
    """

    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        """Create a cache.

        :cache_max: maximum cache size in bytes (0 = choose a default).
        :max_slots: maximum number of cached blocks (0 = choose a default).
        :disk_cache: store blocks on disk instead of in memory.
        :disk_cache_dir: directory for the disk cache (default
          ~/.cache/arvados/keep, created with mode 0700 if needed).
        """
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # Each disk-cache slot keeps a file handle, so default to
                # half the soft limit on open files.  Use integer division
                # so _max_slots stays an int (true division produced a
                # float here), and guard against RLIM_INFINITY (-1), which
                # would otherwise yield a negative slot count.
                nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
                if nofile_limit == resource.RLIM_INFINITY:
                    self._max_slots = 1024
                else:
                    self._max_slots = nofile_limit // 2
            else:
                self._max_slots = 1024

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Half the available space or max_slots * 64 MiB,
                # whichever is smaller (integer arithmetic).
                avail = (fs.f_bavail * fs.f_bsize) // 2
                self.cache_max = min(avail, (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM (the old comment said "256 GiB" but the
                # value has always been 256 MiB).
                self.cache_max = (256 * 1024 * 1024)

        # Never go below room for one full 64 MiB block.
        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

    class CacheSlot(object):
        """In-memory slot: holds one block and lets readers wait for it."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until the slot is filled, then return its content."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store the block content and wake all get() waiters."""
            self.content = value
            self.ready.set()

        def size(self):
            """Cached bytes (0 while unfilled or after a failed read)."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            # Nothing to release for an in-memory slot; DiskCacheSlot
            # presumably overrides this to release its file resources.
            pass

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
                # Evict the least recently used ready slot (the list is
                # kept in MRU-first order by _get/reserve_cache).
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        self._cache[i].evict()
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Caller must hold self._cache_lock.
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front (MRU position)
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        """Return the cache slot for locator, or None if not cached."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.

        Returns (slot, is_new): is_new is True when the caller is
        responsible for filling the slot with set().
        '''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
287
class Counter(object):
    """An integer counter that can be updated safely from multiple threads."""

    def __init__(self, v=0):
        self._lock = threading.Lock()
        self._value = v

    def add(self, v):
        """Atomically add v to the counter."""
        with self._lock:
            self._value += v

    def get(self):
        """Return the counter's current value."""
        with self._lock:
            return self._value
300
301
302 class KeepClient(object):
303
    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    # Tuple layout: (connect_timeout_sec, transfer_timeout_sec,
    # min_bandwidth_bytes_per_sec); see KeepService._setcurltimeouts()
    # for how each element is applied to a curl handle.
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
312
313
    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        # Transport/protocol errors recorded into self._result; the caller
        # may retry the request (possibly on another service).
        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            # NOTE(review): the mutable defaults (queue.LifoQueue(), {}) are
            # created once at class-definition time.  The shared LifoQueue
            # looks intentional -- a process-wide pool of reusable pycurl
            # handles -- and the default headers dict is only read here,
            # never mutated; confirm before changing.
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            # GET requests add an Accept header on top of the caller's
            # headers; PUT requests use the caller's headers as-is.
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            # 'error' == False means the last attempt completed with an
            # HTTP response; not _usable means get()/put() saw a failure
            # it considers permanent for this service.
            return self._result['error'] == False or not self._usable

        def last_result(self):
            """Return the result dict recorded by the last get()/put()."""
            return self._result

        def _get_user_agent(self):
            # Reuse a pooled pycurl handle if one is free (keeps connections
            # warm); otherwise create a fresh one.
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            # Best-effort return of a handle to the pool; on any error
            # (including a full pool) just close it.  The bare except is a
            # deliberate catch-all for this cleanup path.
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def _socket_open(self, *args, **kwargs):
            # Dispatch on argument count: pycurl >= 7.21.5 calls the open
            # socket callback with (purpose, address); older versions pass
            # (family, socktype, protocol[, address]).
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            # Adapt the old callback signature to the new one by packing the
            # arguments into an address-like namedtuple.
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            # Remember the socket so get()/put() can close it after perform().
            self._socket = s
            return s

        def get(self, locator, method="GET", timeout=None):
            """Fetch (GET) or probe (HEAD) one block from this service.

            Returns the block data on a successful GET; True (or a remote
            block-copy locator) on a successful HEAD; None on failure.
            Details are recorded in last_result().
            """
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    # A HEAD transfers no body, so skip the low-speed
                    # (minimum bandwidth) abort checks.
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        # Normalize every curl failure to HttpError so the
                        # HTTP_ERRORS clause below records it uniformly.
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            # ok is None when no HTTP status was received: still usable
            # (transient failure); ok == False marks a permanent failure.
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                # NOTE(review): self._result has no 'content-length' key
                # (headers live under self._result['headers']), so this
                # log line always reports None for the byte count.
                _logger.info("HEAD %s: %s bytes",
                         self._result['status_code'],
                         self._result.get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            # Verify the payload against the locator's md5 before trusting it.
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            """Upload one block to this service via HTTP PUT.

            Returns True on success, False on failure; details are
            recorded in last_result().
            """
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        # Normalize curl failures; see comment in get().
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
            # timeouts may be None/falsy (no limits), a scalar (used for
            # both connect and transfer), a 2-tuple (connect, transfer), or
            # a 3-tuple (connect, transfer, min bandwidth in bytes/sec).
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            if not ignore_bandwidth:
                # Abort the transfer if it runs below bandwidth_bps for
                # longer than xfer_t seconds (curl's LOW_SPEED mechanism).
                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

        def _headerfunction(self, header_line):
            # pycurl HEADERFUNCTION callback: accumulate response headers
            # into self._headers keyed by lower-cased name, handling folded
            # continuation lines and the HTTP status line.
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                # Continuation (folded whitespace) of the previous header.
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
602             # Returning None implies all bytes were written
603
604
    class KeepWriterQueue(queue.Queue):
        """Task queue shared by KeepWriterThreads writing one block.

        Tracks how many replicas have been confirmed and which storage
        classes are satisfied, and hands out write-attempt "tickets"
        (pending_tries) to worker threads via get_next_task().
        """

        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            # Assume servers report storage classes until one response
            # proves otherwise (see write_success).
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            # Number of write attempts workers may still start concurrently.
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            # Record a successful PUT: bump confirmed replica/class counts
            # and recompute how many more attempts are still needed.
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    # Server did not report storage classes; stop tracking.
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            # Wake every worker so they re-check whether the goal is met.
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            # A failed attempt frees one more try for another worker.
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            """Replicas still needed to reach wanted_copies."""
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            """Confirmed storage classes, or None when tracking is disabled."""
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Notifies disabled storage classes expectation to
                    # the outer loop.
                    return None
            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            """Wanted storage classes not yet confirmed on enough copies."""
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            # Block until a (service, service_root) task is available;
            # raise queue.Empty when the write goal is met or no viable
            # tasks remain, so worker threads know to exit.
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            # This service already succeeded or failed
                            # permanently; skip it and take the next task.
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        # No tasks left and the goal is unmet: give up.
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        # Tasks exist but no tries are available; wait for
                        # a success/failure notification.
                        self.pending_tries_notification.wait()
686                     else:
687                         self.pending_tries_notification.wait()
688
689
690     class KeepWriterThreadPool(object):
691         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
692             self.total_task_nr = 0
693             if (not max_service_replicas) or (max_service_replicas >= copies):
694                 num_threads = 1
695             else:
696                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
697             _logger.debug("Pool max threads is %d", num_threads)
698             self.workers = []
699             self.queue = KeepClient.KeepWriterQueue(copies, classes)
700             # Create workers
701             for _ in range(num_threads):
702                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
703                 self.workers.append(w)
704
705         def add_task(self, ks, service_root):
706             self.queue.put((ks, service_root))
707             self.total_task_nr += 1
708
709         def done(self):
710             return self.queue.successful_copies, self.queue.satisfied_classes()
711
712         def join(self):
713             # Start workers
714             for worker in self.workers:
715                 worker.start()
716             # Wait for finished work
717             self.queue.join()
718
719         def response(self):
720             return self.queue.response
721
722
    class KeepWriterThread(threading.Thread):
        """Daemon thread that writes one data block to Keep services.

        Workers pull (service, service_root) tasks from a shared
        KeepWriterQueue and issue one PUT per task, reporting success or
        failure back to the queue.
        """
        # Raised internally to signal a failed PUT without logging a
        # full traceback (other exceptions are logged in run()).
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            """Remember the work queue, payload and timeout; daemonize."""
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            # Daemon thread: don't block interpreter exit if a PUT hangs.
            self.daemon = True

        def run(self):
            """Process tasks until the queue signals there is no more work."""
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    # get_next_task raises queue.Empty when no tries remain.
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    # TaskFailed is an expected outcome; anything else is a
                    # genuine bug worth a traceback.
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    # Always balance get_next_task() so queue.join() can finish.
                    self.queue.task_done()

        def do_task(self, service, service_root):
            """Issue one PUT to `service`.

            Returns (response body, replicas stored, classes confirmed);
            classes confirmed is None when the server does not report
            storage classes.  Raises TaskFailed when the PUT fails.
            """
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                # Sorted for a deterministic header value.
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout,
                                        headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            # Older servers may omit the replica-count header; assume 1.
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            # Parse e.g. "default=2, archive=1" into {'default': 2, ...}.
            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed
793
794
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :block_cache:
          KeepBlockCache instance to use for read caching.  If not
          specified, a new one is created with default settings.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.

        :session:
          Not referenced by this constructor; accepted for call-signature
          compatibility.
        """
        self.lock = threading.Lock()
        # ARVADOS_KEEP_SERVICES (a whitespace-separated URI list) takes
        # precedence over the single-URI ARVADOS_KEEP_PROXY setting.
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        # Pool of reusable CurlAgent-style user agents shared by services.
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []

        if local_store:
            # Testing mode: replace the network-facing methods with
            # local-filesystem stubs.
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                # Static service list built from the configured proxy URIs;
                # no API server lookup will happen.
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                # Ask the cluster which storage classes are default for put().
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass
933
934     def current_timeout(self, attempt_number):
935         """Return the appropriate timeout to use for this client.
936
937         The proxy timeout setting if the backend service is currently a proxy,
938         the regular timeout setting otherwise.  The `attempt_number` indicates
939         how many times the operation has been tried already (starting from 0
940         for the first try), and scales the connection timeout portion of the
941         return value accordingly.
942
943         """
944         # TODO(twp): the timeout should be a property of a
945         # KeepService, not a KeepClient. See #4488.
946         t = self.proxy_timeout if self.using_proxy else self.timeout
947         if len(t) == 2:
948             return (t[0] * (1 << attempt_number), t[1])
949         else:
950             return (t[0] * (1 << attempt_number), t[1], t[2])
951     def _any_nondisk_services(self, service_list):
952         return any(ks.get('service_type', 'disk') != 'disk'
953                    for ks in service_list)
954
    def build_services_list(self, force_rebuild=False):
        """Populate the cached Keep service lists from the API server.

        Fills self._gateway_services, self._keep_services and
        self._writable_services, and sets self.max_replicas_per_service.
        No-op when a static (proxy) list is in use, or when a list is
        already cached and force_rebuild is False.
        """
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            # Gateway-typed services are excluded from the regular probe
            # list; they are only reachable via explicit locator hints.
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1
997
998     def _service_weight(self, data_hash, service_uuid):
999         """Compute the weight of a Keep service endpoint for a data
1000         block with a known hash.
1001
1002         The weight is md5(h + u) where u is the last 15 characters of
1003         the service endpoint's UUID.
1004         """
1005         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1006
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.

        Hint-named services (+K@ hints) come first, followed by the
        local services sorted by rendezvous weight.  As a side effect,
        updates self.using_proxy to reflect whether any non-disk
        service is in the candidate list.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    # K@ + 5-char cluster id: a federated keepproxy.
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    # K@ + 27-char service UUID: a known gateway service.
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots
1042
1043     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1044         # roots_map is a dictionary, mapping Keep service root strings
1045         # to KeepService objects.  Poll for Keep services, and add any
1046         # new ones to roots_map.  Return the current list of local
1047         # root strings.
1048         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1049         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1050         for root in local_roots:
1051             if root not in roots_map:
1052                 roots_map[root] = self.KeepService(
1053                     root, self._user_agent_pool,
1054                     upload_counter=self.upload_counter,
1055                     download_counter=self.download_counter,
1056                     headers=headers,
1057                     insecure=self.insecure)
1058         return local_roots
1059
1060     @staticmethod
1061     def _check_loop_result(result):
1062         # KeepClient RetryLoops should save results as a 2-tuple: the
1063         # actual result of the request, and the number of servers available
1064         # to receive the request this round.
1065         # This method returns True if there's a real result, False if
1066         # there are no more servers available, otherwise None.
1067         if isinstance(result, Exception):
1068             return None
1069         result, tried_server_count = result
1070         if (result is not None) and (result is not False):
1071             return True
1072         elif tried_server_count < 1:
1073             _logger.info("No more Keep services to try; giving up")
1074             return False
1075         else:
1076             return None
1077
1078     def get_from_cache(self, loc_s):
1079         """Fetch a block only if is in the cache, otherwise return None."""
1080         locator = KeepLocator(loc_s)
1081         slot = self.block_cache.get(locator.md5sum)
1082         if slot is not None and slot.ready.is_set():
1083             return slot.get()
1084         else:
1085             return None
1086
1087     def refresh_signature(self, loc):
1088         """Ask Keep to get the remote block and return its local signature"""
1089         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1090         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1091
    @retry.retry_method
    def head(self, loc_s, **kwargs):
        """Check a block's availability via HEAD; see _get_or_head for arguments."""
        return self._get_or_head(loc_s, method="HEAD", **kwargs)
1095
    @retry.retry_method
    def get(self, loc_s, **kwargs):
        """Fetch a block's content via GET; see _get_or_head for arguments."""
        return self._get_or_head(loc_s, method="GET", **kwargs)
1099
    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        * method: "GET" to fetch block contents (uses the block cache),
          "HEAD" to only probe for existence (bypasses the cache).
        * request_id: value for the X-Request-Id header; defaults to the
          API client's request_id or a newly generated id.
        * headers: optional dict of extra request headers; X-Request-Id
          is always set on it.
        * prefetch: if True and another fetch of this block is already in
          flight, return None immediately instead of waiting for it.
        """
        if ',' in loc_s:
            # NOTE(review): blocks are returned as bytes; ''.join here
            # looks like it would need to be b''.join for multi-locator
            # input — confirm against callers of the comma form.
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                # reserve_cache returns (slot, True) for a fresh slot we
                # must fill, or (slot, False) when another reader already
                # holds/filled it.
                slot, first = self.block_cache.reserve_cache(locator.md5sum)
                if not first:
                    if prefetch:
                        # this is request for a prefetch, if it is
                        # already in flight, return immediately.
                        # clear 'slot' to prevent finally block from
                        # calling slot.set()
                        slot = None
                        return None
                    self.hits_counter.add(1)
                    blob = slot.get()
                    if blob is None:
                        raise arvados.errors.KeepReadError(
                            "failed to read {}".format(loc_s))
                    return blob

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                       hint.startswith('K@') and
                                       len(hint) == 29 and
                                       self._gateway_services.get(hint[2:])
                                       )])
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation.  Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            # NOTE(review): this reset discards the hint-built roots_map
            # above; the hint roots are re-created by map_new_services
            # (weighted_service_roots includes them), so the dict built from
            # hint_roots appears redundant — confirm before removing.
            sorted_roots = []
            roots_map = {}
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            # Fill the reserved cache slot (with None on failure) so other
            # readers waiting on it are released, then enforce cache limits.
            if slot is not None:
                slot.set(blob)
                self.block_cache.cap_cache()

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1232
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.  Defaults to the cluster's default storage classes.
        """

        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        # Zero copies requested: nothing to send, just return the locator.
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        # Accumulate progress across retry rounds: copies written so far,
        # and (if the cluster supports it) storage classes satisfied.
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Only ask the next round of writes for the classes that are
            # still unsatisfied.
            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                        data_hash=data_hash,
                                                        copies=copies - done_copies,
                                                        max_service_replicas=self.max_replicas_per_service,
                                                        timeout=self.current_timeout(num_retries - tries_left),
                                                        classes=pending_classes)
            # Skip services that already returned a permanent result.
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                    writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1338
1339     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1340         """A stub for put().
1341
1342         This method is used in place of the real put() method when
1343         using local storage (see constructor's local_store argument).
1344
1345         copies and num_retries arguments are ignored: they are here
1346         only for the sake of offering the same call signature as
1347         put().
1348
1349         Data stored this way can be retrieved via local_store_get().
1350         """
1351         md5 = hashlib.md5(data).hexdigest()
1352         locator = '%s+%d' % (md5, len(data))
1353         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1354             f.write(data)
1355         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1356                   os.path.join(self.local_store, md5))
1357         return locator
1358
1359     def local_store_get(self, loc_s, num_retries=None):
1360         """Companion to local_store_put()."""
1361         try:
1362             locator = KeepLocator(loc_s)
1363         except ValueError:
1364             raise arvados.errors.NotFoundError(
1365                 "Invalid data locator: '%s'" % loc_s)
1366         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1367             return b''
1368         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1369             return f.read()
1370
1371     def local_store_head(self, loc_s, num_retries=None):
1372         """Companion to local_store_put()."""
1373         try:
1374             locator = KeepLocator(loc_s)
1375         except ValueError:
1376             raise arvados.errors.NotFoundError(
1377                 "Invalid data locator: '%s'" % loc_s)
1378         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1379             return True
1380         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1381             return True