20257: Fix use of dataclass
[arvados.git] / sdk / python / arvados / keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 import copy
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
15 import collections
16 import datetime
17 import hashlib
18 import errno
19 import io
20 import logging
21 import math
22 import os
23 import pycurl
24 import queue
25 import re
26 import socket
27 import ssl
28 import sys
29 import threading
30 import resource
31 from . import timer
32 import urllib.parse
33 import traceback
34 import weakref
35
36 if sys.version_info >= (3, 0):
37     from io import BytesIO
38 else:
39     from cStringIO import StringIO as BytesIO
40
41 import arvados
42 import arvados.config as config
43 import arvados.errors
44 import arvados.retry as retry
45 import arvados.util
46 import arvados.diskcache
47 from arvados._pycurlhelper import PyCurlHelper
48
49 _logger = logging.getLogger('arvados.keep')
50 global_client_object = None
51
52
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    # Only fill in the constants the socket module does not already
    # provide; existing values are left untouched.
    for _tcp_name, _tcp_value in (('TCP_KEEPALIVE', 0x010),
                                  ('TCP_KEEPINTVL', 0x101),
                                  ('TCP_KEEPCNT', 0x102)):
        if not hasattr(socket, _tcp_name):
            setattr(socket, _tcp_name, _tcp_value)
    # Don't leave the loop variables behind as module attributes.
    del _tcp_name, _tcp_value
62
63
class KeepLocator(object):
    """Parse, validate and re-serialize a Keep block locator string.

    A locator has the form ``md5sum[+size[+hint...]]``.  Hints that
    start with ``A`` are parsed as permission hints
    (``A<signature>@<hex expiry timestamp>``); every other hint is kept
    verbatim, in order, in ``self.hints``.
    """
    # Naive datetime for the Unix epoch, expressed in UTC.  All
    # permission timestamps handled by this class are naive UTC values.
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse `locator_str` into checksum, size and hints.

        Raises ValueError if the checksum, the size, or any hint is
        malformed.
        """
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # A bare checksum with no size is allowed.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        """Return the full locator string, omitting any missing parts."""
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator without hints: ``md5sum`` or ``md5sum+size``."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.  This runs while the
        # class body is being executed, so it takes no `self`.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        """Permission expiry as a naive UTC datetime, or None if unset."""
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # `value` is the hex timestamp string as it appears in the hint.
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the ``A<sig>@<expiry>`` hint, or None if either part is unset."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Set `perm_sig` and `perm_expiry` from a hint like ``A<sig>@<expiry>``.

        Raises ValueError if the hint has no '@' separator, or if either
        part fails validation in its property setter.
        """
        # A hint without '@' used to fail tuple unpacking with a generic
        # ValueError that the old `except IndexError` never caught, so
        # check for the separator explicitly.
        signature, separator, expiry = s[1:].partition('@')
        if not separator:
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig = signature
        self.perm_expiry = expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint has expired as of `as_of_dt`.

        `as_of_dt` is compared directly against `perm_expiry`, which is a
        naive UTC datetime, so it should be one too.  It defaults to the
        current UTC time.  A locator with no permission hint never
        expires.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # perm_expiry is naive UTC; comparing it against local time
            # would be wrong by the local UTC offset.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
146
147
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    # Configuration snapshot the current global client was built for.
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it if the settings changed."""
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        settings = (config.get('ARVADOS_API_HOST'),
                    config.get('ARVADOS_API_TOKEN'),
                    config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
                    config.get('ARVADOS_KEEP_PROXY'),
                    os.environ.get('KEEP_LOCAL_STORE'))
        if global_client_object is not None and cls._last_key == settings:
            # Existing client still matches the configuration.
            return global_client_object
        global_client_object = KeepClient()
        cls._last_key = settings
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Forward to get() on the global KeepClient."""
        client = Keep.global_client_object()
        return client.get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Forward to put() on the global KeepClient."""
        client = Keep.global_client_object()
        return client.put(data, **kwargs)
181
class KeepBlockCache(object):
    """Cache of Keep blocks, ordered from most to least recently used.

    Blocks are held either in RAM (`CacheSlot`) or, when `disk_cache` is
    true, in `arvados.diskcache.DiskCacheSlot` objects kept under
    `disk_cache_dir`.  The cache is bounded both by total bytes
    (`cache_max`) and by number of slots (`max_slots`); passing 0 for
    either one selects a default computed in `__init__`.
    """
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        # Slots ordered most recently used first.
        self._cache = []
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        # Condition sharing _cache_lock: acquiring either one excludes
        # holders of the other.
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors, one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of maximum file handles.  This
                # means we'll use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks.  This leaves
                # 768 file handles for sockets and other stuff.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling
                # this.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        # Never go below 64 MiB, whatever was requested or computed.
        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        if self._disk_cache:
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            self.cap_cache()


    class CacheSlot(object):
        """In-memory holder for one block's content.

        `ready` is set once `set()` has been called, whether with real
        content or with None.
        """
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until the slot has been filled, then return its content."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store `value` and wake every thread waiting in get()."""
            self.content = value
            self.ready.set()

        def size(self):
            """Return the content length in bytes, or 0 if there is no content."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            """Drop the content; return True once it is gone."""
            self.content = None
            return self.gone()

        def gone(self):
            """Return True if the slot holds no content."""
            return (self.content is None)

    def _resize_cache(self, cache_max, max_slots):
        """Evict least recently used slots until the cache fits the limits.

        Makes one pass from the least recently used end.  Slots that are
        still being filled are skipped, so the cache may still exceed the
        limits on return; `reserve_cache` handles that by waiting and
        trying again.  Callers must hold the cache lock.
        """
        # Select all slots except those where ready.is_set() and content is
        # None (that means there was an error reading the block).
        self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
        sm = sum(slot.size() for slot in self._cache)

        # Visit each candidate at most once.  Restarting the scan from
        # the back after every candidate (as this used to do) never
        # terminates when nothing that remains can be evicted.
        i = len(self._cache) - 1
        while i >= 0 and (sm > cache_max or len(self._cache) > max_slots):
            slot = self._cache[i]
            if slot.ready.is_set():
                sz = slot.size()

                # If evict returns false it means the
                # underlying disk cache couldn't lock the file
                # for deletion because another process was using
                # it. Don't count it as reducing the amount
                # of data in the cache, find something else to
                # throw out.
                if slot.evict():
                    sm -= sz

                # check to make sure the underlying data is gone
                if slot.gone():
                    # either way we forget about it.  either the
                    # other process will delete it, or if we need
                    # it again and it is still there, we'll find
                    # it on disk.
                    del self._cache[i]
            # Deleting index i does not shift the entries before it, so
            # stepping back by one always lands on the next candidate.
            i -= 1


    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            # Wake threads waiting in reserve_cache() for a free slot.
            self._cache_updating.notify_all()

    def _get(self, locator):
        """Return the slot for `locator`, or None.  Caller holds the lock.

        A hit is moved to the front of the list (most recently used).
        """
        # Test if the locator is already in the cache
        for i, n in enumerate(self._cache):
            if n.locator == locator:
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        """Return the cache slot for `locator`, or None if it is not cached."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.

        Returns a (slot, created) tuple; `created` is True when a new,
        still empty slot was added for the caller to fill.'''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, need to wait
                    # for something to happen that releases one of the
                    # cache slots.  Idle for 200 ms or woken up by
                    # another thread
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True

    def set(self, slot, blob):
        """Store `blob` in `slot`, shrinking the cache and retrying once on error.

        Raises arvados.errors.KeepCacheError if the second attempt also
        fails; in that case the slot is set to None first.
        """
        try:
            slot.set(blob)
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum(st.size() for st in self._cache)
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception as e:
            # Best effort: fall through to the retry below, but leave a
            # trace of what went wrong instead of discarding it.
            _logger.debug("Error saving block %s to cache, will retry: %s", slot.locator, e)
        finally:
            # Check if we should evict things from the cache.  Either
            # because we added a new thing or there was an error and
            # we possibly adjusted the limits down, so we might need
            # to push something out.
            self.cap_cache()

        try:
            # Only gets here if there was an error the first time. The
            # exception handler adjusts limits downward in some cases
            # to free up resources, which would make the operation
            # succeed.
            slot.set(blob)
        except Exception as e:
            # It failed again.  Give up.
            slot.set(None)
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))

        self.cap_cache()
393
class Counter(object):
    """Running total whose reads and updates are guarded by a lock."""

    def __init__(self, v=0):
        self._val = v
        self._lk = threading.Lock()

    def add(self, v):
        """Add `v` to the total."""
        with self._lk:
            self._val += v

    def get(self):
        """Return the current total."""
        with self._lk:
            current = self._val
        return current
406
407
408 class KeepClient(object):
409     DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
410     DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
411
412     class KeepService(PyCurlHelper):
413         """Make requests to a single Keep service, and track results.
414
415         A KeepService is intended to last long enough to perform one
416         transaction (GET or PUT) against one Keep service. This can
417         involve calling either get() or put() multiple times in order
418         to retry after transient failures. However, calling both get()
419         and put() on a single instance -- or using the same instance
420         to access two different Keep services -- will not produce
421         sensible behavior.
422         """
423
424         HTTP_ERRORS = (
425             socket.error,
426             ssl.SSLError,
427             arvados.errors.HttpError,
428         )
429
430         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
431                      upload_counter=None,
432                      download_counter=None,
433                      headers={},
434                      insecure=False):
435             super(KeepClient.KeepService, self).__init__()
436             self.root = root
437             self._user_agent_pool = user_agent_pool
438             self._result = {'error': None}
439             self._usable = True
440             self._session = None
441             self._socket = None
442             self.get_headers = {'Accept': 'application/octet-stream'}
443             self.get_headers.update(headers)
444             self.put_headers = headers
445             self.upload_counter = upload_counter
446             self.download_counter = download_counter
447             self.insecure = insecure
448
449         def usable(self):
450             """Is it worth attempting a request?"""
451             return self._usable
452
453         def finished(self):
454             """Did the request succeed or encounter permanent failure?"""
455             return self._result['error'] == False or not self._usable
456
457         def last_result(self):
458             return self._result
459
460         def _get_user_agent(self):
461             try:
462                 return self._user_agent_pool.get(block=False)
463             except queue.Empty:
464                 return pycurl.Curl()
465
466         def _put_user_agent(self, ua):
467             try:
468                 ua.reset()
469                 self._user_agent_pool.put(ua, block=False)
470             except:
471                 ua.close()
472
473         def get(self, locator, method="GET", timeout=None):
474             # locator is a KeepLocator object.
475             url = self.root + str(locator)
476             _logger.debug("Request: %s %s", method, url)
477             curl = self._get_user_agent()
478             ok = None
479             try:
480                 with timer.Timer() as t:
481                     self._headers = {}
482                     response_body = BytesIO()
483                     curl.setopt(pycurl.NOSIGNAL, 1)
484                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
485                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
486                     curl.setopt(pycurl.URL, url.encode('utf-8'))
487                     curl.setopt(pycurl.HTTPHEADER, [
488                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
489                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
490                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
491                     if self.insecure:
492                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
493                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
494                     else:
495                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
496                     if method == "HEAD":
497                         curl.setopt(pycurl.NOBODY, True)
498                     else:
499                         curl.setopt(pycurl.HTTPGET, True)
500                     self._setcurltimeouts(curl, timeout, method=="HEAD")
501
502                     try:
503                         curl.perform()
504                     except Exception as e:
505                         raise arvados.errors.HttpError(0, str(e))
506                     finally:
507                         if self._socket:
508                             self._socket.close()
509                             self._socket = None
510                     self._result = {
511                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
512                         'body': response_body.getvalue(),
513                         'headers': self._headers,
514                         'error': False,
515                     }
516
517                 ok = retry.check_http_response_success(self._result['status_code'])
518                 if not ok:
519                     self._result['error'] = arvados.errors.HttpError(
520                         self._result['status_code'],
521                         self._headers.get('x-status-line', 'Error'))
522             except self.HTTP_ERRORS as e:
523                 self._result = {
524                     'error': e,
525                 }
526             self._usable = ok != False
527             if self._result.get('status_code', None):
528                 # The client worked well enough to get an HTTP status
529                 # code, so presumably any problems are just on the
530                 # server side and it's OK to reuse the client.
531                 self._put_user_agent(curl)
532             else:
533                 # Don't return this client to the pool, in case it's
534                 # broken.
535                 curl.close()
536             if not ok:
537                 _logger.debug("Request fail: GET %s => %s: %s",
538                               url, type(self._result['error']), str(self._result['error']))
539                 return None
540             if method == "HEAD":
541                 _logger.info("HEAD %s: %s bytes",
542                          self._result['status_code'],
543                          self._result.get('content-length'))
544                 if self._result['headers'].get('x-keep-locator'):
545                     # This is a response to a remote block copy request, return
546                     # the local copy block locator.
547                     return self._result['headers'].get('x-keep-locator')
548                 return True
549
550             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
551                          self._result['status_code'],
552                          len(self._result['body']),
553                          t.msecs,
554                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
555
556             if self.download_counter:
557                 self.download_counter.add(len(self._result['body']))
558             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
559             if resp_md5 != locator.md5sum:
560                 _logger.warning("Checksum fail: md5(%s) = %s",
561                                 url, resp_md5)
562                 self._result['error'] = arvados.errors.HttpError(
563                     0, 'Checksum fail')
564                 return None
565             return self._result['body']
566
567         def put(self, hash_s, body, timeout=None, headers={}):
568             put_headers = copy.copy(self.put_headers)
569             put_headers.update(headers)
570             url = self.root + hash_s
571             _logger.debug("Request: PUT %s", url)
572             curl = self._get_user_agent()
573             ok = None
574             try:
575                 with timer.Timer() as t:
576                     self._headers = {}
577                     body_reader = BytesIO(body)
578                     response_body = BytesIO()
579                     curl.setopt(pycurl.NOSIGNAL, 1)
580                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
581                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
582                     curl.setopt(pycurl.URL, url.encode('utf-8'))
583                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
584                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
585                     # response) instead of sending the request body immediately.
586                     # This allows the server to reject the request if the request
587                     # is invalid or the server is read-only, without waiting for
588                     # the client to send the entire block.
589                     curl.setopt(pycurl.UPLOAD, True)
590                     curl.setopt(pycurl.INFILESIZE, len(body))
591                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
592                     curl.setopt(pycurl.HTTPHEADER, [
593                         '{}: {}'.format(k,v) for k,v in put_headers.items()])
594                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
595                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
596                     if self.insecure:
597                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
598                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
599                     else:
600                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
601                     self._setcurltimeouts(curl, timeout)
602                     try:
603                         curl.perform()
604                     except Exception as e:
605                         raise arvados.errors.HttpError(0, str(e))
606                     finally:
607                         if self._socket:
608                             self._socket.close()
609                             self._socket = None
610                     self._result = {
611                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
612                         'body': response_body.getvalue().decode('utf-8'),
613                         'headers': self._headers,
614                         'error': False,
615                     }
616                 ok = retry.check_http_response_success(self._result['status_code'])
617                 if not ok:
618                     self._result['error'] = arvados.errors.HttpError(
619                         self._result['status_code'],
620                         self._headers.get('x-status-line', 'Error'))
621             except self.HTTP_ERRORS as e:
622                 self._result = {
623                     'error': e,
624                 }
625             self._usable = ok != False # still usable if ok is True or None
626             if self._result.get('status_code', None):
627                 # Client is functional. See comment in get().
628                 self._put_user_agent(curl)
629             else:
630                 curl.close()
631             if not ok:
632                 _logger.debug("Request fail: PUT %s => %s: %s",
633                               url, type(self._result['error']), str(self._result['error']))
634                 return False
635             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
636                          self._result['status_code'],
637                          len(body),
638                          t.msecs,
639                          1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
640             if self.upload_counter:
641                 self.upload_counter.add(len(body))
642             return True
643
644
645     class KeepWriterQueue(queue.Queue):
646         def __init__(self, copies, classes=[]):
647             queue.Queue.__init__(self) # Old-style superclass
648             self.wanted_copies = copies
649             self.wanted_storage_classes = classes
650             self.successful_copies = 0
651             self.confirmed_storage_classes = {}
652             self.response = None
653             self.storage_classes_tracking = True
654             self.queue_data_lock = threading.RLock()
655             self.pending_tries = max(copies, len(classes))
656             self.pending_tries_notification = threading.Condition()
657
658         def write_success(self, response, replicas_nr, classes_confirmed):
659             with self.queue_data_lock:
660                 self.successful_copies += replicas_nr
661                 if classes_confirmed is None:
662                     self.storage_classes_tracking = False
663                 elif self.storage_classes_tracking:
664                     for st_class, st_copies in classes_confirmed.items():
665                         try:
666                             self.confirmed_storage_classes[st_class] += st_copies
667                         except KeyError:
668                             self.confirmed_storage_classes[st_class] = st_copies
669                     self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
670                 self.response = response
671             with self.pending_tries_notification:
672                 self.pending_tries_notification.notify_all()
673
674         def write_fail(self, ks):
675             with self.pending_tries_notification:
676                 self.pending_tries += 1
677                 self.pending_tries_notification.notify()
678
679         def pending_copies(self):
680             with self.queue_data_lock:
681                 return self.wanted_copies - self.successful_copies
682
683         def satisfied_classes(self):
684             with self.queue_data_lock:
685                 if not self.storage_classes_tracking:
686                     # Notifies disabled storage classes expectation to
687                     # the outer loop.
688                     return None
689             return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
690
691         def pending_classes(self):
692             with self.queue_data_lock:
693                 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
694                     return []
695                 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
696                 for st_class, st_copies in self.confirmed_storage_classes.items():
697                     if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
698                         unsatisfied_classes.remove(st_class)
699                 return unsatisfied_classes
700
        def get_next_task(self):
            """Return the next (service, service_root) pair a writer should try.

            Runs while holding pending_tries_notification.  Depending on
            the queue state this method:

            * drains the queue when no copies and no storage classes are
              pending any more;
            * hands out a queued service when pending_tries is positive,
              skipping services that report finished();
            * raises queue.Empty when nothing is left in the queue;
            * otherwise waits on pending_tries_notification until another
              thread notifies it.

            Raises queue.Empty when there is no more work to hand out.
            """
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        # NOTE(review): this loop only ends through the
                        # exception get_nowait() raises on an empty queue
                        # (presumably queue.Empty, assuming this class
                        # derives from queue.Queue -- confirm).
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        # An exception from get_nowait() here propagates
                        # to the caller.
                        service, service_root = self.get_nowait()
                        if service.finished():
                            # Nothing left to do with this service: mark
                            # its queue entry done and look at the next.
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        # No tries available and nothing queued: wake any
                        # waiting threads and report that we are done.
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        # Items remain queued but no tries are available
                        # right now; sleep until notified.
                        self.pending_tries_notification.wait()
728
729
730     class KeepWriterThreadPool(object):
731         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
732             self.total_task_nr = 0
733             if (not max_service_replicas) or (max_service_replicas >= copies):
734                 num_threads = 1
735             else:
736                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
737             _logger.debug("Pool max threads is %d", num_threads)
738             self.workers = []
739             self.queue = KeepClient.KeepWriterQueue(copies, classes)
740             # Create workers
741             for _ in range(num_threads):
742                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
743                 self.workers.append(w)
744
745         def add_task(self, ks, service_root):
746             self.queue.put((ks, service_root))
747             self.total_task_nr += 1
748
749         def done(self):
750             return self.queue.successful_copies, self.queue.satisfied_classes()
751
752         def join(self):
753             # Start workers
754             for worker in self.workers:
755                 worker.start()
756             # Wait for finished work
757             self.queue.join()
758
759         def response(self):
760             return self.queue.response
761
762
763     class KeepWriterThread(threading.Thread):
764         class TaskFailed(RuntimeError): pass
765
766         def __init__(self, queue, data, data_hash, timeout=None):
767             super(KeepClient.KeepWriterThread, self).__init__()
768             self.timeout = timeout
769             self.queue = queue
770             self.data = data
771             self.data_hash = data_hash
772             self.daemon = True
773
774         def run(self):
775             while True:
776                 try:
777                     service, service_root = self.queue.get_next_task()
778                 except queue.Empty:
779                     return
780                 try:
781                     locator, copies, classes = self.do_task(service, service_root)
782                 except Exception as e:
783                     if not isinstance(e, self.TaskFailed):
784                         _logger.exception("Exception in KeepWriterThread")
785                     self.queue.write_fail(service)
786                 else:
787                     self.queue.write_success(locator, copies, classes)
788                 finally:
789                     self.queue.task_done()
790
791         def do_task(self, service, service_root):
792             classes = self.queue.pending_classes()
793             headers = {}
794             if len(classes) > 0:
795                 classes.sort()
796                 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
797             success = bool(service.put(self.data_hash,
798                                         self.data,
799                                         timeout=self.timeout,
800                                         headers=headers))
801             result = service.last_result()
802
803             if not success:
804                 if result.get('status_code'):
805                     _logger.debug("Request fail: PUT %s => %s %s",
806                                   self.data_hash,
807                                   result.get('status_code'),
808                                   result.get('body'))
809                 raise self.TaskFailed()
810
811             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
812                           str(threading.current_thread()),
813                           self.data_hash,
814                           len(self.data),
815                           service_root)
816             try:
817                 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
818             except (KeyError, ValueError):
819                 replicas_stored = 1
820
821             classes_confirmed = {}
822             try:
823                 scch = result['headers']['x-keep-storage-classes-confirmed']
824                 for confirmation in scch.replace(' ', '').split(','):
825                     if '=' in confirmation:
826                         stored_class, stored_copies = confirmation.split('=')[:2]
827                         classes_confirmed[stored_class] = int(stored_copies)
828             except (KeyError, ValueError):
829                 # Storage classes confirmed header missing or corrupt
830                 classes_confirmed = None
831
832             return result['body'].strip(), replicas_stored, classes_confirmed
833
834
835     def __init__(self, api_client=None, proxy=None,
836                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
837                  api_token=None, local_store=None, block_cache=None,
838                  num_retries=0, session=None):
839         """Initialize a new KeepClient.
840
841         Arguments:
842         :api_client:
843           The API client to use to find Keep services.  If not
844           provided, KeepClient will build one from available Arvados
845           configuration.
846
847         :proxy:
848           If specified, this KeepClient will send requests to this Keep
849           proxy.  Otherwise, KeepClient will fall back to the setting of the
850           ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
851           If you want to KeepClient does not use a proxy, pass in an empty
852           string.
853
854         :timeout:
855           The initial timeout (in seconds) for HTTP requests to Keep
856           non-proxy servers.  A tuple of three floats is interpreted as
857           (connection_timeout, read_timeout, minimum_bandwidth). A connection
858           will be aborted if the average traffic rate falls below
859           minimum_bandwidth bytes per second over an interval of read_timeout
860           seconds. Because timeouts are often a result of transient server
861           load, the actual connection timeout will be increased by a factor
862           of two on each retry.
863           Default: (2, 256, 32768).
864
865         :proxy_timeout:
866           The initial timeout (in seconds) for HTTP requests to
867           Keep proxies. A tuple of three floats is interpreted as
868           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
869           described above for adjusting connection timeouts on retry also
870           applies.
871           Default: (20, 256, 32768).
872
873         :api_token:
874           If you're not using an API client, but only talking
875           directly to a Keep proxy, this parameter specifies an API token
876           to authenticate Keep requests.  It is an error to specify both
877           api_client and api_token.  If you specify neither, KeepClient
878           will use one available from the Arvados configuration.
879
880         :local_store:
881           If specified, this KeepClient will bypass Keep
882           services, and save data to the named directory.  If unspecified,
883           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
884           environment variable.  If you want to ensure KeepClient does not
885           use local storage, pass in an empty string.  This is primarily
886           intended to mock a server for testing.
887
888         :num_retries:
889           The default number of times to retry failed requests.
890           This will be used as the default num_retries value when get() and
891           put() are called.  Default 0.
892         """
893         self.lock = threading.Lock()
894         if proxy is None:
895             if config.get('ARVADOS_KEEP_SERVICES'):
896                 proxy = config.get('ARVADOS_KEEP_SERVICES')
897             else:
898                 proxy = config.get('ARVADOS_KEEP_PROXY')
899         if api_token is None:
900             if api_client is None:
901                 api_token = config.get('ARVADOS_API_TOKEN')
902             else:
903                 api_token = api_client.api_token
904         elif api_client is not None:
905             raise ValueError(
906                 "can't build KeepClient with both API client and token")
907         if local_store is None:
908             local_store = os.environ.get('KEEP_LOCAL_STORE')
909
910         if api_client is None:
911             self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
912         else:
913             self.insecure = api_client.insecure
914
915         self.block_cache = block_cache if block_cache else KeepBlockCache()
916         self.timeout = timeout
917         self.proxy_timeout = proxy_timeout
918         self._user_agent_pool = queue.LifoQueue()
919         self.upload_counter = Counter()
920         self.download_counter = Counter()
921         self.put_counter = Counter()
922         self.get_counter = Counter()
923         self.hits_counter = Counter()
924         self.misses_counter = Counter()
925         self._storage_classes_unsupported_warning = False
926         self._default_classes = []
927
928         if local_store:
929             self.local_store = local_store
930             self.head = self.local_store_head
931             self.get = self.local_store_get
932             self.put = self.local_store_put
933         else:
934             self.num_retries = num_retries
935             self.max_replicas_per_service = None
936             if proxy:
937                 proxy_uris = proxy.split()
938                 for i in range(len(proxy_uris)):
939                     if not proxy_uris[i].endswith('/'):
940                         proxy_uris[i] += '/'
941                     # URL validation
942                     url = urllib.parse.urlparse(proxy_uris[i])
943                     if not (url.scheme and url.netloc):
944                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
945                 self.api_token = api_token
946                 self._gateway_services = {}
947                 self._keep_services = [{
948                     'uuid': "00000-bi6l4-%015d" % idx,
949                     'service_type': 'proxy',
950                     '_service_root': uri,
951                     } for idx, uri in enumerate(proxy_uris)]
952                 self._writable_services = self._keep_services
953                 self.using_proxy = True
954                 self._static_services_list = True
955             else:
956                 # It's important to avoid instantiating an API client
957                 # unless we actually need one, for testing's sake.
958                 if api_client is None:
959                     api_client = arvados.api('v1')
960                 self.api_client = api_client
961                 self.api_token = api_client.api_token
962                 self._gateway_services = {}
963                 self._keep_services = None
964                 self._writable_services = None
965                 self.using_proxy = None
966                 self._static_services_list = False
967                 try:
968                     self._default_classes = [
969                         k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
970                 except KeyError:
971                     # We're talking to an old cluster
972                     pass
973
974     def current_timeout(self, attempt_number):
975         """Return the appropriate timeout to use for this client.
976
977         The proxy timeout setting if the backend service is currently a proxy,
978         the regular timeout setting otherwise.  The `attempt_number` indicates
979         how many times the operation has been tried already (starting from 0
980         for the first try), and scales the connection timeout portion of the
981         return value accordingly.
982
983         """
984         # TODO(twp): the timeout should be a property of a
985         # KeepService, not a KeepClient. See #4488.
986         t = self.proxy_timeout if self.using_proxy else self.timeout
987         if len(t) == 2:
988             return (t[0] * (1 << attempt_number), t[1])
989         else:
990             return (t[0] * (1 << attempt_number), t[1], t[2])
991     def _any_nondisk_services(self, service_list):
992         return any(ks.get('service_type', 'disk') != 'disk'
993                    for ks in service_list)
994
995     def build_services_list(self, force_rebuild=False):
996         if (self._static_services_list or
997               (self._keep_services and not force_rebuild)):
998             return
999         with self.lock:
1000             try:
1001                 keep_services = self.api_client.keep_services().accessible()
1002             except Exception:  # API server predates Keep services.
1003                 keep_services = self.api_client.keep_disks().list()
1004
1005             # Gateway services are only used when specified by UUID,
1006             # so there's nothing to gain by filtering them by
1007             # service_type.
1008             self._gateway_services = {ks['uuid']: ks for ks in
1009                                       keep_services.execute()['items']}
1010             if not self._gateway_services:
1011                 raise arvados.errors.NoKeepServersError()
1012
1013             # Precompute the base URI for each service.
1014             for r in self._gateway_services.values():
1015                 host = r['service_host']
1016                 if not host.startswith('[') and host.find(':') >= 0:
1017                     # IPv6 URIs must be formatted like http://[::1]:80/...
1018                     host = '[' + host + ']'
1019                 r['_service_root'] = "{}://{}:{:d}/".format(
1020                     'https' if r['service_ssl_flag'] else 'http',
1021                     host,
1022                     r['service_port'])
1023
1024             _logger.debug(str(self._gateway_services))
1025             self._keep_services = [
1026                 ks for ks in self._gateway_services.values()
1027                 if not ks.get('service_type', '').startswith('gateway:')]
1028             self._writable_services = [ks for ks in self._keep_services
1029                                        if not ks.get('read_only')]
1030
1031             # For disk type services, max_replicas_per_service is 1
1032             # It is unknown (unlimited) for other service types.
1033             if self._any_nondisk_services(self._writable_services):
1034                 self.max_replicas_per_service = None
1035             else:
1036                 self.max_replicas_per_service = 1
1037
1038     def _service_weight(self, data_hash, service_uuid):
1039         """Compute the weight of a Keep service endpoint for a data
1040         block with a known hash.
1041
1042         The weight is md5(h + u) where u is the last 15 characters of
1043         the service endpoint's UUID.
1044         """
1045         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1046
1047     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1048         """Return an array of Keep service endpoints, in the order in
1049         which they should be probed when reading or writing data with
1050         the given hash+hints.
1051         """
1052         self.build_services_list(force_rebuild)
1053
1054         sorted_roots = []
1055         # Use the services indicated by the given +K@... remote
1056         # service hints, if any are present and can be resolved to a
1057         # URI.
1058         for hint in locator.hints:
1059             if hint.startswith('K@'):
1060                 if len(hint) == 7:
1061                     sorted_roots.append(
1062                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1063                 elif len(hint) == 29:
1064                     svc = self._gateway_services.get(hint[2:])
1065                     if svc:
1066                         sorted_roots.append(svc['_service_root'])
1067
1068         # Sort the available local services by weight (heaviest first)
1069         # for this locator, and return their service_roots (base URIs)
1070         # in that order.
1071         use_services = self._keep_services
1072         if need_writable:
1073             use_services = self._writable_services
1074         self.using_proxy = self._any_nondisk_services(use_services)
1075         sorted_roots.extend([
1076             svc['_service_root'] for svc in sorted(
1077                 use_services,
1078                 reverse=True,
1079                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1080         _logger.debug("{}: {}".format(locator, sorted_roots))
1081         return sorted_roots
1082
1083     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1084         # roots_map is a dictionary, mapping Keep service root strings
1085         # to KeepService objects.  Poll for Keep services, and add any
1086         # new ones to roots_map.  Return the current list of local
1087         # root strings.
1088         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1089         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1090         for root in local_roots:
1091             if root not in roots_map:
1092                 roots_map[root] = self.KeepService(
1093                     root, self._user_agent_pool,
1094                     upload_counter=self.upload_counter,
1095                     download_counter=self.download_counter,
1096                     headers=headers,
1097                     insecure=self.insecure)
1098         return local_roots
1099
1100     @staticmethod
1101     def _check_loop_result(result):
1102         # KeepClient RetryLoops should save results as a 2-tuple: the
1103         # actual result of the request, and the number of servers available
1104         # to receive the request this round.
1105         # This method returns True if there's a real result, False if
1106         # there are no more servers available, otherwise None.
1107         if isinstance(result, Exception):
1108             return None
1109         result, tried_server_count = result
1110         if (result is not None) and (result is not False):
1111             return True
1112         elif tried_server_count < 1:
1113             _logger.info("No more Keep services to try; giving up")
1114             return False
1115         else:
1116             return None
1117
1118     def get_from_cache(self, loc_s):
1119         """Fetch a block only if is in the cache, otherwise return None."""
1120         locator = KeepLocator(loc_s)
1121         slot = self.block_cache.get(locator.md5sum)
1122         if slot is not None and slot.ready.is_set():
1123             return slot.get()
1124         else:
1125             return None
1126
1127     def refresh_signature(self, loc):
1128         """Ask Keep to get the remote block and return its local signature"""
1129         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1130         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1131
    @retry.retry_method
    def head(self, loc_s, **kwargs):
        """Run _get_or_head() for loc_s with method "HEAD".

        All keyword arguments are forwarded unchanged to _get_or_head().
        """
        return self._get_or_head(loc_s, method="HEAD", **kwargs)
1135
    @retry.retry_method
    def get(self, loc_s, **kwargs):
        """Run _get_or_head() for loc_s with method "GET".

        All keyword arguments are forwarded unchanged to _get_or_head().
        """
        return self._get_or_head(loc_s, method="GET", **kwargs)
1139
1140     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1141         """Get data from Keep.
1142
1143         This method fetches one or more blocks of data from Keep.  It
1144         sends a request each Keep service registered with the API
1145         server (or the proxy provided when this client was
1146         instantiated), then each service named in location hints, in
1147         sequence.  As soon as one service provides the data, it's
1148         returned.
1149
1150         Arguments:
1151         * loc_s: A string of one or more comma-separated locators to fetch.
1152           This method returns the concatenation of these blocks.
1153         * num_retries: The number of times to retry GET requests to
1154           *each* Keep server if it returns temporary failures, with
1155           exponential backoff.  Note that, in each loop, the method may try
1156           to fetch data from every available Keep service, along with any
1157           that are named in location hints in the locator.  The default value
1158           is set when the KeepClient is initialized.
1159         """
1160         if ',' in loc_s:
1161             return ''.join(self.get(x) for x in loc_s.split(','))
1162
1163         self.get_counter.add(1)
1164
1165         request_id = (request_id or
1166                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1167                       arvados.util.new_request_id())
1168         if headers is None:
1169             headers = {}
1170         headers['X-Request-Id'] = request_id
1171
1172         slot = None
1173         blob = None
1174         try:
1175             locator = KeepLocator(loc_s)
1176             if method == "GET":
1177                 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1178                 if not first:
1179                     if prefetch:
1180                         # this is request for a prefetch, if it is
1181                         # already in flight, return immediately.
1182                         # clear 'slot' to prevent finally block from
1183                         # calling slot.set()
1184                         slot = None
1185                         return None
1186                     self.hits_counter.add(1)
1187                     blob = slot.get()
1188                     if blob is None:
1189                         raise arvados.errors.KeepReadError(
1190                             "failed to read {}".format(loc_s))
1191                     return blob
1192
1193             self.misses_counter.add(1)
1194
1195             # If the locator has hints specifying a prefix (indicating a
1196             # remote keepproxy) or the UUID of a local gateway service,
1197             # read data from the indicated service(s) instead of the usual
1198             # list of local disk services.
1199             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1200                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1201             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1202                                for hint in locator.hints if (
1203                                        hint.startswith('K@') and
1204                                        len(hint) == 29 and
1205                                        self._gateway_services.get(hint[2:])
1206                                        )])
1207             # Map root URLs to their KeepService objects.
1208             roots_map = {
1209                 root: self.KeepService(root, self._user_agent_pool,
1210                                        upload_counter=self.upload_counter,
1211                                        download_counter=self.download_counter,
1212                                        headers=headers,
1213                                        insecure=self.insecure)
1214                 for root in hint_roots
1215             }
1216
1217             # See #3147 for a discussion of the loop implementation.  Highlights:
1218             # * Refresh the list of Keep services after each failure, in case
1219             #   it's being updated.
1220             # * Retry until we succeed, we're out of retries, or every available
1221             #   service has returned permanent failure.
1222             sorted_roots = []
1223             roots_map = {}
1224             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1225                                    backoff_start=2)
1226             for tries_left in loop:
1227                 try:
1228                     sorted_roots = self.map_new_services(
1229                         roots_map, locator,
1230                         force_rebuild=(tries_left < num_retries),
1231                         need_writable=False,
1232                         headers=headers)
1233                 except Exception as error:
1234                     loop.save_result(error)
1235                     continue
1236
1237                 # Query KeepService objects that haven't returned
1238                 # permanent failure, in our specified shuffle order.
1239                 services_to_try = [roots_map[root]
1240                                    for root in sorted_roots
1241                                    if roots_map[root].usable()]
1242                 for keep_service in services_to_try:
1243                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1244                     if blob is not None:
1245                         break
1246                 loop.save_result((blob, len(services_to_try)))
1247
1248             # Always cache the result, then return it if we succeeded.
1249             if loop.success():
1250                 return blob
1251         finally:
1252             if slot is not None:
1253                 self.block_cache.set(slot, blob)
1254
1255         # Q: Including 403 is necessary for the Keep tests to continue
1256         # passing, but maybe they should expect KeepReadError instead?
1257         not_founds = sum(1 for key in sorted_roots
1258                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1259         service_errors = ((key, roots_map[key].last_result()['error'])
1260                           for key in sorted_roots)
1261         if not roots_map:
1262             raise arvados.errors.KeepReadError(
1263                 "[{}] failed to read {}: no Keep services available ({})".format(
1264                     request_id, loc_s, loop.last_result()))
1265         elif not_founds == len(sorted_roots):
1266             raise arvados.errors.NotFoundError(
1267                 "[{}] {} not found".format(request_id, loc_s), service_errors)
1268         else:
1269             raise arvados.errors.KeepReadError(
1270                 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1271
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.  Anything that is not
          bytes is encoded with its encode() method first.
        * copies: The number of copies that the user requires be saved.
          Default 2.  If less than 1, nothing is uploaded and the
          locator ("<md5>+<size>") is returned directly.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * request_id: The X-Request-Id to send.  Defaults to the API
          client's request id if there is one, else a newly generated id.
        * classes: An optional list of storage class names where copies should
          be written.  Defaults to the client's default storage classes.
        """

        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        # Maps service root URLs to their KeepService objects; it is
        # kept across retry rounds.
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        # Totals accumulated over all retry rounds.  done_classes is set
        # to None once storage classes tracking is disabled.
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                # NOTE(review): presumably tries_left equals num_retries
                # on the first round, so the services list is only
                # force-rebuilt on later rounds -- confirm against
                # retry.RetryLoop.
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Only ask this round's pool for what is still missing.
            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                        data_hash=data_hash,
                                                        copies=copies - done_copies,
                                                        max_service_replicas=self.max_replicas_per_service,
                                                        timeout=self.current_timeout(num_retries - tries_left),
                                                        classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                # Services that report finished() are not queued again.
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            # join() starts the workers and waits for the queue to be
            # fully processed.
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            # The saved result is the 2-tuple _check_loop_result expects:
            # (success flag, number of services tried this round).
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                    writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            # NOTE(review): writer_pool.done() reports only the last
            # round's pool, not the done_copies/done_classes totals
            # accumulated over all rounds.
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1377
1378     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1379         """A stub for put().
1380
1381         This method is used in place of the real put() method when
1382         using local storage (see constructor's local_store argument).
1383
1384         copies and num_retries arguments are ignored: they are here
1385         only for the sake of offering the same call signature as
1386         put().
1387
1388         Data stored this way can be retrieved via local_store_get().
1389         """
1390         md5 = hashlib.md5(data).hexdigest()
1391         locator = '%s+%d' % (md5, len(data))
1392         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1393             f.write(data)
1394         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1395                   os.path.join(self.local_store, md5))
1396         return locator
1397
1398     def local_store_get(self, loc_s, num_retries=None):
1399         """Companion to local_store_put()."""
1400         try:
1401             locator = KeepLocator(loc_s)
1402         except ValueError:
1403             raise arvados.errors.NotFoundError(
1404                 "Invalid data locator: '%s'" % loc_s)
1405         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1406             return b''
1407         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1408             return f.read()
1409
    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put(): check that a block is present.

        Returns True when loc_s names the empty block (its md5sum
        equals the hash part of config.EMPTY_BLOCK_LOCATOR) or when a
        file named after the locator's md5sum exists in
        self.local_store.

        Raises arvados.errors.NotFoundError if KeepLocator rejects
        loc_s with ValueError.

        num_retries is not used by the checks below.
        """
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            # An unparseable locator is reported as "not found".
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        # The empty block is reported as present without looking at the disk.
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        # Only the md5sum part of the locator is used as the file name.
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True