# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
import copy
from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object
import collections
import datetime
import hashlib
import errno
import io
import logging
import math
import os
import pycurl
import queue
import re
import socket
import ssl
import sys
import threading
import resource
from . import timer
import urllib.parse
import traceback
import weakref

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
import arvados.diskcache

_logger = logging.getLogger('arvados.keep')
global_client_object = None


# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)
    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except (IndexError, ValueError):
            # A hint with no '@' separator raises ValueError from the
            # tuple unpacking; report it with the original hint text.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt


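# Illustrative sketch (not part of the original module): how a signed
# locator string maps onto KeepLocator fields.  The hash, signature, and
# timestamp below are made-up example values.
#
#     loc = KeepLocator(
#         'acbd18db4cc2f85cedef654fccc4a4d8+3'
#         '+A0123456789abcdef0123456789abcdef01234567@565f8cd5')
#     loc.md5sum    # 'acbd18db4cc2f85cedef654fccc4a4d8'
#     loc.size      # 3
#     loc.perm_sig  # '0123456789abcdef0123456789abcdef01234567'
#     str(loc)      # round-trips to the original string
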
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)

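# Sketch of the preferred replacement for the deprecated wrapper above,
# assuming a configured Arvados API host and token:
#
#     api = arvados.api('v1')
#     kc = KeepClient(api_client=api)
#     locator = kc.put(b'foo')
#     data = kc.get(locator)
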
class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors, one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of maximum file handles.  This
                # means we'll use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks.  This leaves
                # 768 file handles for sockets and other stuff.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling
                # this.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        if self._disk_cache:
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            self.cap_cache()

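    # Worked example of the sizing above (numbers assumed, not measured):
    # with the default RLIMIT_NOFILE soft limit of 1024, max_slots becomes
    # 1024 / 8 = 128.  On a 1 TiB filesystem with 400 GiB available and an
    # empty cache, the candidates are 10% of total = ~100 GiB, 25% of
    # available = ~100 GiB, and 128 slots * 64 MiB = 8 GiB; the smallest,
    # 8 GiB, wins.  cache_max is never allowed below 64 MiB (one block).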

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            self.content = None
            return self.gone()

        def gone(self):
            return (self.content is None)

    def _resize_cache(self, cache_max, max_slots):
        # Try and make sure the contents of the cache do not exceed
        # the supplied maximums.

        # Select all slots except those where ready.is_set() and content is
        # None (that means there was an error reading the block).
        self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
        sm = sum([slot.size() for slot in self._cache])
        while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
            for i in range(len(self._cache)-1, -1, -1):
                # start from the back, find a slot that is a candidate to evict
                if self._cache[i].ready.is_set():
                    sz = self._cache[i].size()

                    # If evict returns false it means the
                    # underlying disk cache couldn't lock the file
                    # for deletion because another process was using
                    # it. Don't count it as reducing the amount
                    # of data in the cache, find something else to
                    # throw out.
                    if self._cache[i].evict():
                        sm -= sz

                    # check to make sure the underlying data is gone
                    if self._cache[i].gone():
                        # either way we forget about it.  either the
                        # other process will delete it, or if we need
                        # it again and it is still there, we'll find
                        # it on disk.
                        del self._cache[i]
                    break


    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            self._cache_updating.notify_all()

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, need to wait
                    # for something to happen that releases one of the
                    # cache slots.  Idle for 200 ms or woken up by
                    # another thread
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True

    def set(self, slot, blob):
        try:
            slot.set(blob)
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum([st.size() for st in self._cache])
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception as e:
            pass
        finally:
            # Check if we should evict things from the cache.  Either
            # because we added a new thing or there was an error and
            # we possibly adjusted the limits down, so we might need
            # to push something out.
            self.cap_cache()

        try:
            # Only gets here if there was an error the first time.  The
            # exception handler above adjusts limits downward in some
            # cases to free up resources, which may allow this retry to
            # succeed.
            slot.set(blob)
        except Exception as e:
            # It failed again.  Give up.
            slot.set(None)
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))

        self.cap_cache()

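# Typical reserve/fill pattern for the cache above (illustrative sketch;
# fetch_block_somehow is a hypothetical stand-in for the reader):
#
#     cache = KeepBlockCache()
#     slot, first = cache.reserve_cache(locator_md5)
#     if first:
#         cache.set(slot, fetch_block_somehow())  # fills slot, caps cache
#     else:
#         blob = slot.get()  # blocks until the filling thread calls set()
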
class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)

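    # Sketch of how the timeout tuples are applied (see _setcurltimeouts
    # below): with DEFAULT_TIMEOUT = (2, 256, 32768), a connection attempt
    # is abandoned after 2 seconds, and an established transfer is aborted
    # if it averages under 32768 bytes/sec over a 256-second window.  A
    # bare number is treated as both the connect and read timeout, with
    # the default bandwidth floor.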

    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def _socket_open(self, *args, **kwargs):
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            self._socket = s
            return s

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            if not ignore_bandwidth:
                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

        def _headerfunction(self, header_line):
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written


    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Tell the outer loop that storage class tracking
                    # is disabled.
                    return None
            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

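        # Worked example of the accounting above (values assumed): with
        # wanted_copies=2 and wanted_storage_classes=['default', 'archive'],
        # a write confirmed as {'default': 2} leaves pending_classes() ==
        # ['archive'] and satisfied_classes() == ['default']; writes keep
        # being scheduled until both copies and classes are satisfied.
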
        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()


    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response


    class KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout,
                                        headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed

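        # Example of the header parsing above (header value assumed): a
        # response carrying
        #     X-Keep-Storage-Classes-Confirmed: default=2, archive=1
        # yields classes_confirmed == {'default': 2, 'archive': 1}; a
        # missing or malformed header yields None, which disables
        # storage-class tracking in the writer queue.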

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
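
    # Sketch of the backoff scaling above, using the default non-proxy
    # timeout (2, 256, 32768): attempt 0 gets a 2-second connect timeout,
    # attempt 1 gets 4 seconds, attempt 3 gets 16 seconds; the read timeout
    # and bandwidth floor stay fixed across retries.
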
    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()

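    # Illustrative sketch of the weighting scheme (rendezvous hashing):
    # every client computes md5(block_hash + uuid[-15:]) for each service
    # and probes services in descending weight order, so all clients agree
    # on the probe order for a given block without central coordination.
    # For example (made-up UUIDs), one block may weigh heaviest for
    # zzzzz-bi6l4-000000000000001 while a different block favors
    # zzzzz-bi6l4-000000000000002, spreading load across services.
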
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache; otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

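    # Sketch of refresh_signature's exchange (shapes inferred from the code
    # above, with made-up values): it issues a HEAD request for the block
    # with 'X-Keep-Signature: local, 2024-01-01T00:00:00Z'; on success,
    # get(..., method="HEAD") returns the response's X-Keep-Locator header,
    # i.e. a locally signed locator like <hash>+<size>+A<sig>@<expiry>.
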
    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
1210         server (or the proxy provided when this client was
1211         instantiated), then each service named in location hints, in
1212         sequence.  As soon as one service provides the data, it's
1213         returned.
1214
1215         Arguments:
1216         * loc_s: A string of one or more comma-separated locators to fetch.
1217           This method returns the concatenation of these blocks.
1218         * num_retries: The number of times to retry GET requests to
1219           *each* Keep server if it returns temporary failures, with
1220           exponential backoff.  Note that, in each loop, the method may try
1221           to fetch data from every available Keep service, along with any
1222           that are named in location hints in the locator.  The default value
1223           is set when the KeepClient is initialized.
1224         """
        if ',' in loc_s:
            # Blocks are bytes, so join them with a bytes separator.
            return b''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                slot, first = self.block_cache.reserve_cache(locator.md5sum)
                if not first:
                    if prefetch:
                        # This is a prefetch request: if the block is
                        # already in flight, return immediately.  Clear
                        # 'slot' to prevent the finally block from
                        # calling slot.set().
                        slot = None
                        return None
                    self.hits_counter.add(1)
                    blob = slot.get()
                    if blob is None:
                        raise arvados.errors.KeepReadError(
                            "failed to read {}".format(loc_s))
                    return blob

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                       hint.startswith('K@') and
                                       len(hint) == 29 and
                                       self._gateway_services.get(hint[2:])
                                       )])
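            # Hedged examples (hint values are hypothetical):
            #   K@xyzzy                        -> http://keep.xyzzy.arvadosapi.com/
            #   K@zzzzz-bi6l4-0123456789abcde  -> that gateway's _service_root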
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation.  Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            #
            # map_new_services() below merges the usual local services into
            # roots_map without clobbering the hint-derived entries above.
            sorted_roots = []
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries - tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            if slot is not None:
                self.block_cache.set(slot, blob)

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method gets a list of Keep services from the API server and
        sends the data to each one simultaneously, each upload in its own
        thread.  Once the uploads are finished, if enough copies are saved,
        this method returns the most recent HTTP response body.  If requests
        fail to upload enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """
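        # Hedged usage sketch; the returned locator's signature and expiry
        # are illustrative:
        #   loc = keep_client.put(b"foo", copies=2)
        #   # e.g. "acbd18db4cc2f85cedef654fccc4a4d8+3+A<signature>@<expiry>"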

        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(
                data=data,
                data_hash=data_hash,
                copies=copies - done_copies,
                max_service_replicas=self.max_replicas_per_service,
                timeout=self.current_timeout(num_retries - tries_left),
                classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))
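        # Worked example of the accounting above (values hypothetical): with
        # copies=2 and classes=['default', 'archive'], a round that stores 2
        # copies satisfying only 'default' leaves pending_classes=['archive'],
        # so the next round keeps writing until every requested class is
        # covered.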

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies in classes {} but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), copies, classes, writer_pool.done()), service_errors, label="service")

    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        The copies, num_retries, and classes arguments are ignored: they
        are here only for the sake of offering the same call signature
        as put().

        Data stored this way can be retrieved via local_store_get().
        """
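        # Hedged sketch (assuming the client was built with
        # local_store="/tmp/keep"):
        #   keep_client.put(b"foo")  ->  "acbd18db4cc2f85cedef654fccc4a4d8+3"
        # and the block is written to /tmp/keep/acbd18db4cc2f85cedef654fccc4a4d8.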
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()

    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True