18842: When starting up the disk cache, map in everything
[arvados.git] / sdk / python / arvados / keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 import copy
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
15 import collections
16 import datetime
17 import hashlib
18 import io
19 import logging
20 import math
21 import os
22 import pycurl
23 import queue
24 import re
25 import socket
26 import ssl
27 import sys
28 import threading
29 import resource
30 from . import timer
31 import urllib.parse
32
33 if sys.version_info >= (3, 0):
34     from io import BytesIO
35 else:
36     from cStringIO import StringIO as BytesIO
37
38 import arvados
39 import arvados.config as config
40 import arvados.errors
41 import arvados.retry as retry
42 import arvados.util
43 import arvados.diskcache
44
_logger = logging.getLogger('arvados.keep')
global_client_object = None


# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    for _name, _value in (('TCP_KEEPALIVE', 0x010),
                          ('TCP_KEEPINTVL', 0x101),
                          ('TCP_KEEPCNT', 0x102)):
        if not hasattr(socket, _name):
            setattr(socket, _name, _value)
58
59
class KeepLocator(object):
    """Parse, validate, and reassemble one Keep block locator.

    A locator string looks like
    "<md5sum>[+<size>][+A<sig>@<expiry>][+<other hints>...]".  The md5
    checksum and optional size are parsed into md5sum/size; an 'A'
    permission hint is parsed into perm_sig/perm_expiry; all other
    hints are kept verbatim in self.hints.
    """
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        # NOTE(review): if the piece after the checksum is a hint rather
        # than a size (e.g. "md5+Khint"), int() above raises an uncaught
        # ValueError -- callers appear to rely on size preceding hints.
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        # Reassemble the locator, omitting any parts that are unset.
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator without hints: "md5sum+size" or "md5sum"."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the "A<sig>@<expiry>" hint string, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        # Encode the expiry as a zero-padded hex Unix timestamp.
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Parse an "A<sig>@<expiry>" hint into perm_sig/perm_expiry."""
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            # A hint without '@' makes split() return a one-element list,
            # so the 2-tuple unpacking above raises ValueError -- not
            # IndexError, which this clause previously caught and
            # therefore could never trigger.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission signature has expired, as of
        now or the given datetime."""
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # NOTE(review): perm_expiry was built with utcfromtimestamp
            # but is compared against local now(); on non-UTC hosts this
            # check skews by the UTC offset -- confirm intentional.
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
142
143
class Keep(object):
    """Simple interface to a process-wide shared KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with
    your own API client.  The global KeepClient will build an API client
    from the current Arvados configuration, which may not match the one
    you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it if the relevant
        configuration has changed since it was built."""
        global global_client_object
        # KeepClient used to pick up configuration changes at runtime;
        # emulate that by rebuilding the client whenever any of these
        # settings differs from the ones used to build the current one.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               os.environ.get('KEEP_LOCAL_STORE'))
        if global_client_object is None or key != cls._last_key:
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Deprecated: fetch a block via the global client."""
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Deprecated: store a block via the global client."""
        return Keep.global_client_object().put(data, **kwargs)
177
class KeepBlockCache(object):
    """LRU-style cache of Keep block contents, kept in RAM or on disk."""

    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        """Set up the cache.

        :cache_max: maximum total cached bytes (0 = choose a default;
          never less than 64 MiB either way).
        :max_slots: maximum number of cached blocks (0 = choose a default).
        :disk_cache: if True, store blocks in files under disk_cache_dir
          instead of in RAM.
        :disk_cache_dir: disk cache directory (default ~/.cache/arvados/keep).
        """
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # default set max slots to half of maximum file handles
                # (floor division: true division made _max_slots a float)
                self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] // 2
            else:
                self._max_slots = 1024

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize) // 2
                # Half the available space or max_slots * 64 MiB,
                # whichever is smaller.
                self.cache_max = min(avail, (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM (previous comment said "GiB", but the
                # value is 256 * 2**20 bytes)
                self.cache_max = (256 * 1024 * 1024)

        # Never configure a cache smaller than 64 MiB.
        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        if self._disk_cache:
            # Map in any blocks already present in the disk cache.
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            self.cap_cache()


    class CacheSlot(object):
        # In-RAM holder for one block; readers wait until content is set.
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until content has been set, then return it."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store the block content and wake any waiting readers."""
            self.content = value
            self.ready.set()

        def size(self):
            """Return the number of cached bytes (0 if not yet set)."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            # Nothing to release for a RAM slot; disk-backed slots
            # override this to drop their backing file.
            pass

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
                evicted = False
                # Evict the least recently used slot that is ready.
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        self._cache[i].evict()
                        del self._cache[i]
                        evicted = True
                        break
                if not evicted:
                    # Every remaining slot is still being fetched; stop
                    # rather than spinning forever (previously this loop
                    # could never terminate in that state).
                    break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Return the slot for locator, or None.  Caller holds _cache_lock.
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        """Return the cache slot for locator, or None if absent."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
292
class Counter(object):
    """An integer counter whose updates are serialized by a lock."""

    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        """Atomically add v (which may be negative) to the counter."""
        self._lk.acquire()
        try:
            self._val += v
        finally:
            self._lk.release()

    def get(self):
        """Return the current value, read under the lock."""
        with self._lk:
            return self._val
305
306
307 class KeepClient(object):
308
309     # Default Keep server connection timeout:  2 seconds
310     # Default Keep server read timeout:       256 seconds
311     # Default Keep server bandwidth minimum:  32768 bytes per second
312     # Default Keep proxy connection timeout:  20 seconds
313     # Default Keep proxy read timeout:        256 seconds
314     # Default Keep proxy bandwidth minimum:   32768 bytes per second
315     DEFAULT_TIMEOUT = (2, 256, 32768)
316     DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
317
318
    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        # Exceptions that indicate a (possibly transient) transport
        # failure, as opposed to a programming error.
        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            # NOTE(review): the mutable defaults (LifoQueue, dict) are
            # shared by every call that omits them.  headers is only read
            # here, and the shared queue acts as a process-wide curl
            # handle pool, but confirm that sharing is intentional.
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            # NOTE(review): _session appears unused in this class --
            # possibly vestigial; confirm before removing.
            self._session = None
            self._socket = None
            # GETs ask for raw block data; PUTs reuse the caller's
            # headers unchanged.
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            # Dict describing the most recent request's outcome (status
            # code, body, headers, and/or error).
            return self._result

        def _get_user_agent(self):
            # Reuse a pooled curl handle if available, else make a new one.
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            # Return a curl handle to the pool for reuse; close it if
            # reset fails or the pool refuses it.
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def _socket_open(self, *args, **kwargs):
            # Dispatch on arity: pycurl >= 7.21.5 passes (purpose,
            # address); older versions pass (family, socktype, protocol
            # [, address]).
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            # Adapt the old callback signature to the new one by packing
            # the arguments into an Address-like namedtuple.
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            # Remember the socket so the caller can close it after
            # curl.perform() finishes.
            self._socket = s
            return s

        def get(self, locator, method="GET", timeout=None):
            """GET (or HEAD) one block from this Keep service.

            Returns the block data (bytes) on success.  For HEAD
            requests, returns True -- or, when the response carries an
            x-keep-locator header (a remote block copy response), that
            locator string.  Returns None on failure; details are
            recorded in last_result().
            """
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    # HEAD requests transfer no body, so skip the
                    # minimum-bandwidth (LOW_SPEED) checks for them.
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        # Normalize any curl failure into our HttpError.
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            # ok is None after a transport error: the service may still
            # be usable; only a definitive failure (False) marks it bad.
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                # NOTE(review): this logs the status code where the
                # format suggests a byte count, and 'content-length' is
                # looked up on _result (not _result['headers']) so it is
                # always None -- looks buggy, left as-is.
                _logger.info("HEAD %s: %s bytes",
                         self._result['status_code'],
                         self._result.get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            # Verify the payload against the checksum embedded in the
            # locator before handing it to the caller.
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            """PUT one block (body, addressed by hash_s) to this service.

            Returns True on success, False on failure; details are
            recorded in last_result().
            """
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        # Normalize any curl failure into our HttpError.
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
            """Apply connect/transfer timeouts to a curl handle.

            timeouts may be None (leave curl defaults), a scalar
            (connect timeout == transfer timeout), a 2-tuple (connect,
            transfer), or a 3-tuple (connect, transfer, minimum
            bytes/sec).  Unless ignore_bandwidth is set, the transfer
            aborts when throughput stays below the bandwidth floor for
            the transfer-timeout duration (curl LOW_SPEED_* semantics).
            """
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            if not ignore_bandwidth:
                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

        def _headerfunction(self, header_line):
            """pycurl HEADERFUNCTION callback: collect response headers.

            Accumulates lowercased header names into self._headers,
            folding RFC continuation lines into the previous header and
            storing the HTTP status line under 'x-status-line'.
            """
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                # Continuation of the previous header (leading whitespace).
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
608
609
    class KeepWriterQueue(queue.Queue):
        """Task queue shared by KeepWriterThreads for one block write.

        Tracks how many copies (and which storage classes) have been
        confirmed so far, and hands out (service, service_root) tasks
        to workers until the write goals are met or no services remain.
        """

        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            # Some servers don't report storage classes; once that is
            # detected, per-class tracking is disabled for this write.
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            # Number of write attempts currently worth starting.
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            """Record a successful PUT that stored replicas_nr copies."""
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    # Server didn't report storage classes: stop tracking.
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            # Wake every worker so each can re-evaluate whether more
            # work remains.
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            """Record a failed PUT; allow one more attempt elsewhere."""
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            # Copies still needed to reach the replication target.
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            """Return the wanted storage classes already satisfied, or
            None when class tracking has been disabled."""
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Notifies disabled storage classes expectation to
                    # the outer loop.
                    return None
            # NOTE(review): this return executes outside the lock;
            # pending_classes() re-acquires the RLock itself, but the
            # read is not atomic with the check above -- confirm whether
            # it was meant to sit inside the `with` block.
            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            """Return wanted storage classes not yet fully confirmed."""
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    # A class counts as satisfied once it holds the full
                    # requested copy count.
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            """Block until a write attempt should start, then return its
            (service, service_root) task.

            Raises queue.Empty when the write goals are already met or
            no more services are available.
            """
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            # This service already has a definitive
                            # result; skip it.
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        # Nothing queued and no retries pending: give up.
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        # Tasks remain but no tries are allotted; wait
                        # for a success or failure notification.
                        self.pending_tries_notification.wait()
691                     else:
692                         self.pending_tries_notification.wait()
693
694
695     class KeepWriterThreadPool(object):
696         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
697             self.total_task_nr = 0
698             if (not max_service_replicas) or (max_service_replicas >= copies):
699                 num_threads = 1
700             else:
701                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
702             _logger.debug("Pool max threads is %d", num_threads)
703             self.workers = []
704             self.queue = KeepClient.KeepWriterQueue(copies, classes)
705             # Create workers
706             for _ in range(num_threads):
707                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
708                 self.workers.append(w)
709
710         def add_task(self, ks, service_root):
711             self.queue.put((ks, service_root))
712             self.total_task_nr += 1
713
714         def done(self):
715             return self.queue.successful_copies, self.queue.satisfied_classes()
716
717         def join(self):
718             # Start workers
719             for worker in self.workers:
720                 worker.start()
721             # Wait for finished work
722             self.queue.join()
723
724         def response(self):
725             return self.queue.response
726
727
    class KeepWriterThread(threading.Thread):
        # Raised by do_task() to signal an ordinary, already-reported PUT
        # failure, as opposed to an unexpected exception.
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            """Create a writer thread (daemonized; started by the pool).

            :queue: the KeepWriterQueue this worker pulls tasks from.
            :data: block contents to upload.
            :data_hash: md5 hex digest of `data`.
            :timeout: per-request timeout passed to service.put().
            """
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            """Process queue tasks until the queue reports none are left."""
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    # Queue drained (or writing finished): exit the thread.
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        # TaskFailed is an expected outcome; only log a
                        # traceback for genuinely unexpected exceptions.
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    # Balance get_next_task() so queue.join() can complete.
                    self.queue.task_done()

        def do_task(self, service, service_root):
            """PUT the block to one service.

            Returns (locator, replicas_stored, classes_confirmed); raises
            TaskFailed when the service reports failure.  Replica count
            and confirmed storage classes are parsed from the response
            headers when present.
            """
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                # Sorted for a deterministic header value.
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout,
                                        headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                # Header missing or malformed: assume one replica stored.
                replicas_stored = 1

            classes_confirmed = {}
            try:
                # Header format: "classA=2, classB=1"
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed
798
799
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :block_cache:
          The KeepBlockCache to use.  If not specified, a new one is
          created.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        # Serializes rebuilds of the Keep service list (see
        # build_services_list()).
        self.lock = threading.Lock()
        if proxy is None:
            # ARVADOS_KEEP_SERVICES takes precedence over ARVADOS_KEEP_PROXY.
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            # Ambiguous: the client carries its own token.
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        # Reusable pool of pycurl handles (user agents).
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []

        if local_store:
            # Local-store mode: replace the network methods with the
            # local_store_* stubs.
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                # Normalize and validate each proxy URI, then synthesize
                # a static service list from them.
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass
938
939     def current_timeout(self, attempt_number):
940         """Return the appropriate timeout to use for this client.
941
942         The proxy timeout setting if the backend service is currently a proxy,
943         the regular timeout setting otherwise.  The `attempt_number` indicates
944         how many times the operation has been tried already (starting from 0
945         for the first try), and scales the connection timeout portion of the
946         return value accordingly.
947
948         """
949         # TODO(twp): the timeout should be a property of a
950         # KeepService, not a KeepClient. See #4488.
951         t = self.proxy_timeout if self.using_proxy else self.timeout
952         if len(t) == 2:
953             return (t[0] * (1 << attempt_number), t[1])
954         else:
955             return (t[0] * (1 << attempt_number), t[1], t[2])
956     def _any_nondisk_services(self, service_list):
957         return any(ks.get('service_type', 'disk') != 'disk'
958                    for ks in service_list)
959
    def build_services_list(self, force_rebuild=False):
        """Populate the cached Keep service lists from the API server.

        No-op when a static (proxy-configured) service list is in use,
        or when the list is already populated and `force_rebuild` is
        false.  Updates _gateway_services, _keep_services,
        _writable_services and max_replicas_per_service under self.lock.
        """
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            # Gateway-type services are addressed only via +K@uuid hints,
            # so keep them out of the general-purpose lists.
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1
1002
1003     def _service_weight(self, data_hash, service_uuid):
1004         """Compute the weight of a Keep service endpoint for a data
1005         block with a known hash.
1006
1007         The weight is md5(h + u) where u is the last 15 characters of
1008         the service endpoint's UUID.
1009         """
1010         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1011
1012     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1013         """Return an array of Keep service endpoints, in the order in
1014         which they should be probed when reading or writing data with
1015         the given hash+hints.
1016         """
1017         self.build_services_list(force_rebuild)
1018
1019         sorted_roots = []
1020         # Use the services indicated by the given +K@... remote
1021         # service hints, if any are present and can be resolved to a
1022         # URI.
1023         for hint in locator.hints:
1024             if hint.startswith('K@'):
1025                 if len(hint) == 7:
1026                     sorted_roots.append(
1027                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1028                 elif len(hint) == 29:
1029                     svc = self._gateway_services.get(hint[2:])
1030                     if svc:
1031                         sorted_roots.append(svc['_service_root'])
1032
1033         # Sort the available local services by weight (heaviest first)
1034         # for this locator, and return their service_roots (base URIs)
1035         # in that order.
1036         use_services = self._keep_services
1037         if need_writable:
1038             use_services = self._writable_services
1039         self.using_proxy = self._any_nondisk_services(use_services)
1040         sorted_roots.extend([
1041             svc['_service_root'] for svc in sorted(
1042                 use_services,
1043                 reverse=True,
1044                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1045         _logger.debug("{}: {}".format(locator, sorted_roots))
1046         return sorted_roots
1047
1048     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1049         # roots_map is a dictionary, mapping Keep service root strings
1050         # to KeepService objects.  Poll for Keep services, and add any
1051         # new ones to roots_map.  Return the current list of local
1052         # root strings.
1053         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1054         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1055         for root in local_roots:
1056             if root not in roots_map:
1057                 roots_map[root] = self.KeepService(
1058                     root, self._user_agent_pool,
1059                     upload_counter=self.upload_counter,
1060                     download_counter=self.download_counter,
1061                     headers=headers,
1062                     insecure=self.insecure)
1063         return local_roots
1064
1065     @staticmethod
1066     def _check_loop_result(result):
1067         # KeepClient RetryLoops should save results as a 2-tuple: the
1068         # actual result of the request, and the number of servers available
1069         # to receive the request this round.
1070         # This method returns True if there's a real result, False if
1071         # there are no more servers available, otherwise None.
1072         if isinstance(result, Exception):
1073             return None
1074         result, tried_server_count = result
1075         if (result is not None) and (result is not False):
1076             return True
1077         elif tried_server_count < 1:
1078             _logger.info("No more Keep services to try; giving up")
1079             return False
1080         else:
1081             return None
1082
1083     def get_from_cache(self, loc_s):
1084         """Fetch a block only if is in the cache, otherwise return None."""
1085         locator = KeepLocator(loc_s)
1086         slot = self.block_cache.get(locator.md5sum)
1087         if slot is not None and slot.ready.is_set():
1088             return slot.get()
1089         else:
1090             return None
1091
1092     def refresh_signature(self, loc):
1093         """Ask Keep to get the remote block and return its local signature"""
1094         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1095         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1096
    @retry.retry_method
    def head(self, loc_s, **kwargs):
        """Check a block's existence via HEAD, without fetching its data.

        Accepts the same arguments as get(); see _get_or_head().
        """
        return self._get_or_head(loc_s, method="HEAD", **kwargs)
1100
    @retry.retry_method
    def get(self, loc_s, **kwargs):
        """Fetch a block's data from Keep.

        Thin wrapper over _get_or_head(); see that method for the
        argument list and behavior.
        """
        return self._get_or_head(loc_s, method="GET", **kwargs)
1104
1105     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1106         """Get data from Keep.
1107
1108         This method fetches one or more blocks of data from Keep.  It
1109         sends a request each Keep service registered with the API
1110         server (or the proxy provided when this client was
1111         instantiated), then each service named in location hints, in
1112         sequence.  As soon as one service provides the data, it's
1113         returned.
1114
1115         Arguments:
1116         * loc_s: A string of one or more comma-separated locators to fetch.
1117           This method returns the concatenation of these blocks.
1118         * num_retries: The number of times to retry GET requests to
1119           *each* Keep server if it returns temporary failures, with
1120           exponential backoff.  Note that, in each loop, the method may try
1121           to fetch data from every available Keep service, along with any
1122           that are named in location hints in the locator.  The default value
1123           is set when the KeepClient is initialized.
1124         """
1125         if ',' in loc_s:
1126             return ''.join(self.get(x) for x in loc_s.split(','))
1127
1128         self.get_counter.add(1)
1129
1130         request_id = (request_id or
1131                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1132                       arvados.util.new_request_id())
1133         if headers is None:
1134             headers = {}
1135         headers['X-Request-Id'] = request_id
1136
1137         slot = None
1138         blob = None
1139         try:
1140             locator = KeepLocator(loc_s)
1141             if method == "GET":
1142                 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1143                 if not first:
1144                     if prefetch:
1145                         # this is request for a prefetch, if it is
1146                         # already in flight, return immediately.
1147                         # clear 'slot' to prevent finally block from
1148                         # calling slot.set()
1149                         slot = None
1150                         return None
1151                     self.hits_counter.add(1)
1152                     blob = slot.get()
1153                     if blob is None:
1154                         raise arvados.errors.KeepReadError(
1155                             "failed to read {}".format(loc_s))
1156                     return blob
1157
1158             self.misses_counter.add(1)
1159
1160             # If the locator has hints specifying a prefix (indicating a
1161             # remote keepproxy) or the UUID of a local gateway service,
1162             # read data from the indicated service(s) instead of the usual
1163             # list of local disk services.
1164             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1165                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1166             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1167                                for hint in locator.hints if (
1168                                        hint.startswith('K@') and
1169                                        len(hint) == 29 and
1170                                        self._gateway_services.get(hint[2:])
1171                                        )])
1172             # Map root URLs to their KeepService objects.
1173             roots_map = {
1174                 root: self.KeepService(root, self._user_agent_pool,
1175                                        upload_counter=self.upload_counter,
1176                                        download_counter=self.download_counter,
1177                                        headers=headers,
1178                                        insecure=self.insecure)
1179                 for root in hint_roots
1180             }
1181
1182             # See #3147 for a discussion of the loop implementation.  Highlights:
1183             # * Refresh the list of Keep services after each failure, in case
1184             #   it's being updated.
1185             # * Retry until we succeed, we're out of retries, or every available
1186             #   service has returned permanent failure.
1187             sorted_roots = []
1188             roots_map = {}
1189             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1190                                    backoff_start=2)
1191             for tries_left in loop:
1192                 try:
1193                     sorted_roots = self.map_new_services(
1194                         roots_map, locator,
1195                         force_rebuild=(tries_left < num_retries),
1196                         need_writable=False,
1197                         headers=headers)
1198                 except Exception as error:
1199                     loop.save_result(error)
1200                     continue
1201
1202                 # Query KeepService objects that haven't returned
1203                 # permanent failure, in our specified shuffle order.
1204                 services_to_try = [roots_map[root]
1205                                    for root in sorted_roots
1206                                    if roots_map[root].usable()]
1207                 for keep_service in services_to_try:
1208                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1209                     if blob is not None:
1210                         break
1211                 loop.save_result((blob, len(services_to_try)))
1212
1213             # Always cache the result, then return it if we succeeded.
1214             if loop.success():
1215                 return blob
1216         finally:
1217             if slot is not None:
1218                 slot.set(blob)
1219                 self.block_cache.cap_cache()
1220
1221         # Q: Including 403 is necessary for the Keep tests to continue
1222         # passing, but maybe they should expect KeepReadError instead?
1223         not_founds = sum(1 for key in sorted_roots
1224                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1225         service_errors = ((key, roots_map[key].last_result()['error'])
1226                           for key in sorted_roots)
1227         if not roots_map:
1228             raise arvados.errors.KeepReadError(
1229                 "[{}] failed to read {}: no Keep services available ({})".format(
1230                     request_id, loc_s, loop.last_result()))
1231         elif not_founds == len(sorted_roots):
1232             raise arvados.errors.NotFoundError(
1233                 "[{}] {} not found".format(request_id, loc_s), service_errors)
1234         else:
1235             raise arvados.errors.KeepReadError(
1236                 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1237
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * request_id: X-Request-Id header value; generated if not given.
        * classes: An optional list of storage class names where copies should
          be written.  Defaults to the cluster's default storage classes.
        """

        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            # Zero copies requested: nothing to upload.
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        # Running totals of confirmed replicas and storage classes
        # across retry rounds.
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Only request storage classes that earlier rounds have not
            # yet satisfied (done_classes is None when the cluster
            # doesn't support storage classes at all).
            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                        data_hash=data_hash,
                                                        copies=copies - done_copies,
                                                        max_service_replicas=self.max_replicas_per_service,
                                                        timeout=self.current_timeout(num_retries - tries_left),
                                                        classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                    writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1343
1344     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1345         """A stub for put().
1346
1347         This method is used in place of the real put() method when
1348         using local storage (see constructor's local_store argument).
1349
1350         copies and num_retries arguments are ignored: they are here
1351         only for the sake of offering the same call signature as
1352         put().
1353
1354         Data stored this way can be retrieved via local_store_get().
1355         """
1356         md5 = hashlib.md5(data).hexdigest()
1357         locator = '%s+%d' % (md5, len(data))
1358         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1359             f.write(data)
1360         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1361                   os.path.join(self.local_store, md5))
1362         return locator
1363
1364     def local_store_get(self, loc_s, num_retries=None):
1365         """Companion to local_store_put()."""
1366         try:
1367             locator = KeepLocator(loc_s)
1368         except ValueError:
1369             raise arvados.errors.NotFoundError(
1370                 "Invalid data locator: '%s'" % loc_s)
1371         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1372             return b''
1373         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1374             return f.read()
1375
1376     def local_store_head(self, loc_s, num_retries=None):
1377         """Companion to local_store_put()."""
1378         try:
1379             locator = KeepLocator(loc_s)
1380         except ValueError:
1381             raise arvados.errors.NotFoundError(
1382                 "Invalid data locator: '%s'" % loc_s)
1383         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1384             return True
1385         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1386             return True