1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 import copy
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
15 import collections
16 import datetime
17 import hashlib
18 import errno
19 import io
20 import logging
21 import math
22 import os
23 import pycurl
24 import queue
25 import re
26 import socket
27 import ssl
28 import sys
29 import threading
30 import resource
31 from . import timer
32 import urllib.parse
33 import traceback
34
35 if sys.version_info >= (3, 0):
36     from io import BytesIO
37 else:
38     from cStringIO import StringIO as BytesIO
39
40 import arvados
41 import arvados.config as config
42 import arvados.errors
43 import arvados.retry as retry
44 import arvados.util
45 import arvados.diskcache
46
47 _logger = logging.getLogger('arvados.keep')
48 global_client_object = None
49
50
51 # Monkey-patch TCP constants when not available (macOS). Values sourced from:
52 # http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
53 if sys.platform == 'darwin':
54     if not hasattr(socket, 'TCP_KEEPALIVE'):
55         socket.TCP_KEEPALIVE = 0x010
56     if not hasattr(socket, 'TCP_KEEPINTVL'):
57         socket.TCP_KEEPINTVL = 0x101
58     if not hasattr(socket, 'TCP_KEEPCNT'):
59         socket.TCP_KEEPCNT = 0x102
60
61
62 class KeepLocator(object):
63     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
64     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
65
66     def __init__(self, locator_str):
67         self.hints = []
68         self._perm_sig = None
69         self._perm_expiry = None
70         pieces = iter(locator_str.split('+'))
71         self.md5sum = next(pieces)
72         try:
73             self.size = int(next(pieces))
74         except StopIteration:
75             self.size = None
76         for hint in pieces:
77             if self.HINT_RE.match(hint) is None:
78                 raise ValueError("invalid hint format: {}".format(hint))
79             elif hint.startswith('A'):
80                 self.parse_permission_hint(hint)
81             else:
82                 self.hints.append(hint)
83
84     def __str__(self):
85         return '+'.join(
86             native_str(s)
87             for s in [self.md5sum, self.size,
88                       self.permission_hint()] + self.hints
89             if s is not None)
90
91     def stripped(self):
92         if self.size is not None:
93             return "%s+%i" % (self.md5sum, self.size)
94         else:
95             return self.md5sum
96
97     def _make_hex_prop(name, length):
98         # Build and return a new property with the given name that
99         # must be a hex string of the given length.
100         data_name = '_{}'.format(name)
101         def getter(self):
102             return getattr(self, data_name)
103         def setter(self, hex_str):
104             if not arvados.util.is_hex(hex_str, length):
105                 raise ValueError("{} is not a {}-digit hex string: {!r}".
106                                  format(name, length, hex_str))
107             setattr(self, data_name, hex_str)
108         return property(getter, setter)
109
110     md5sum = _make_hex_prop('md5sum', 32)
111     perm_sig = _make_hex_prop('perm_sig', 40)
112
113     @property
114     def perm_expiry(self):
115         return self._perm_expiry
116
117     @perm_expiry.setter
118     def perm_expiry(self, value):
119         if not arvados.util.is_hex(value, 1, 8):
120             raise ValueError(
121                 "permission timestamp must be a hex Unix timestamp: {}".
122                 format(value))
123         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
124
125     def permission_hint(self):
126         data = [self.perm_sig, self.perm_expiry]
127         if None in data:
128             return None
129         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
130         return "A{}@{:08x}".format(*data)
131
132     def parse_permission_hint(self, s):
133         try:
134             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
135         except ValueError:
136             raise ValueError("bad permission hint {}".format(s))
137
138     def permission_expired(self, as_of_dt=None):
139         if self.perm_expiry is None:
140             return False
141         elif as_of_dt is None:
142             as_of_dt = datetime.datetime.utcnow()
143         return self.perm_expiry <= as_of_dt
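
# Illustrative sketch (not part of the original file): parsing a signed Keep
# locator with KeepLocator.  The hash, signature, and timestamp below are
# made-up example values.
#
#     loc = KeepLocator('acbd18db4cc2f85cedef654fccc4a4d8+3'
#                       '+A0123456789abcdef0123456789abcdef01234567@565f7800')
#     loc.md5sum    # 'acbd18db4cc2f85cedef654fccc4a4d8'
#     loc.size      # 3
#     str(loc)      # round-trips back to the full locator string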
144
145
146 class Keep(object):
147     """Simple interface to a global KeepClient object.
148
149     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
150     own API client.  The global KeepClient will build an API client from the
151     current Arvados configuration, which may not match the one you built.
152     """
153     _last_key = None
154
155     @classmethod
156     def global_client_object(cls):
157         global global_client_object
158         # Previously, KeepClient would change its behavior at runtime based
159         # on these configuration settings.  We simulate that behavior here
160         # by checking the values and returning a new KeepClient if any of
161         # them have changed.
162         key = (config.get('ARVADOS_API_HOST'),
163                config.get('ARVADOS_API_TOKEN'),
164                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
165                config.get('ARVADOS_KEEP_PROXY'),
166                os.environ.get('KEEP_LOCAL_STORE'))
167         if (global_client_object is None) or (cls._last_key != key):
168             global_client_object = KeepClient()
169             cls._last_key = key
170         return global_client_object
171
172     @staticmethod
173     def get(locator, **kwargs):
174         return Keep.global_client_object().get(locator, **kwargs)
175
176     @staticmethod
177     def put(data, **kwargs):
178         return Keep.global_client_object().put(data, **kwargs)
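
# Illustrative sketch (not part of the original file): the deprecated
# module-level interface above vs. the recommended per-client usage.
#
#     locator = Keep.put(b'foo')                     # deprecated global client
#     kc = KeepClient(api_client=arvados.api('v1'))  # preferred
#     locator = kc.put(b'foo')
#     data = kc.get(locator)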
179
180 class KeepBlockCache(object):
181     def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
182         self.cache_max = cache_max
183         self._cache = []
184         self._cache_lock = threading.Lock()
185         self._max_slots = max_slots
186         self._disk_cache = disk_cache
187         self._disk_cache_dir = disk_cache_dir
188
189         if self._disk_cache and self._disk_cache_dir is None:
190             self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
191             os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
192
193         if self._max_slots == 0:
194             if self._disk_cache:
195                 # Default max slots to half of the maximum number of open
196                 # file handles. NOFILE typically defaults to 1024 on Linux,
197                 # so this will be 512 slots.
198                 self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] // 2
199             else:
200                 # RAM cache slots
201                 self._max_slots = 512
202
203         if self.cache_max == 0:
204             if self._disk_cache:
205                 fs = os.statvfs(self._disk_cache_dir)
206                 # Calculation of available space incorporates existing cache usage
207                 existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
208                 avail = (fs.f_bavail * fs.f_bsize + existing_usage) // 4
209                 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
210                 # pick the smallest of:
211                 # 10% of total disk size
212                 # 25% of available space
213                 # max_slots * 64 MiB
214                 self.cache_max = min(maxdisk, avail, self._max_slots * 64 * 1024 * 1024)
215             else:
216                 # 256 MiB in RAM
217                 self.cache_max = (256 * 1024 * 1024)
218
219         self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
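
        # Worked example (illustrative): on a 100 GiB filesystem with 40 GiB
        # free and an empty cache, maxdisk is 10 GiB and avail is 10 GiB, so
        # with the default 512 slots cache_max = min(10 GiB, 10 GiB,
        # 512 * 64 MiB = 32 GiB) = 10 GiB, and the 64 MiB floor is a no-op.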
220
221         if self._disk_cache:
222             self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
223             self.cap_cache()
224
225
226     class CacheSlot(object):
227         __slots__ = ("locator", "ready", "content")
228
229         def __init__(self, locator):
230             self.locator = locator
231             self.ready = threading.Event()
232             self.content = None
233
234         def get(self):
235             self.ready.wait()
236             return self.content
237
238         def set(self, value):
239             self.content = value
240             self.ready.set()
241
242         def size(self):
243             if self.content is None:
244                 return 0
245             else:
246                 return len(self.content)
247
248         def evict(self):
249             return True
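
    # Illustrative sketch (not part of the original file): the handshake
    # CacheSlot (above) supports -- a reader blocks in get() until the thread
    # that reserved the slot calls set(), which fires the threading.Event.
    #
    #     slot = KeepBlockCache.CacheSlot('acbd18db4cc2f85cedef654fccc4a4d8')
    #     threading.Thread(target=lambda: print(slot.get())).start()  # waits
    #     slot.set(b'foo')  # wakes the reader; slot.size() is now 3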
250
251     def cap_cache(self):
252         '''Cap the cache size to self.cache_max'''
253         with self._cache_lock:
254             # Select all slots except those where ready.is_set() and content is
255             # None (that means there was an error reading the block).
256             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
257             sm = sum([slot.size() for slot in self._cache])
258             while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
259                 for i in range(len(self._cache)-1, -1, -1):
260                     # start from the back, find a slot that is a candidate to evict
261                     if self._cache[i].ready.is_set():
262                         sz = self._cache[i].size()
263
264                         # If evict returns false it means the
265                         # underlying disk cache couldn't lock the file
266                         # for deletion because another process was using
267                         # it. Don't count it as reducing the amount
268                         # of data in the cache, find something else to
269                         # throw out.
270                         if self._cache[i].evict():
271                             sm -= sz
272
273                         # either way we forget about it.  either the
274                         # other process will delete it, or if we need
275                         # it again and it is still there, we'll find
276                         # it on disk.
277                         del self._cache[i]
278                         break
279
280     def _get(self, locator):
281         # Test if the locator is already in the cache
282         for i in range(0, len(self._cache)):
283             if self._cache[i].locator == locator:
284                 n = self._cache[i]
285                 if i != 0:
286                     # move it to the front
287                     del self._cache[i]
288                     self._cache.insert(0, n)
289                 return n
290         if self._disk_cache:
291             # see if it exists on disk
292             n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
293             if n is not None:
294                 self._cache.insert(0, n)
295                 return n
296         return None
297
298     def get(self, locator):
299         with self._cache_lock:
300             return self._get(locator)
301
302     def reserve_cache(self, locator):
303         '''Reserve a cache slot for the specified locator,
304         or return the existing slot.'''
305         with self._cache_lock:
306             n = self._get(locator)
307             if n:
308                 return n, False
309             else:
310                 # Add a new cache slot for the locator
311                 if self._disk_cache:
312                     n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
313                 else:
314                     n = KeepBlockCache.CacheSlot(locator)
315                 self._cache.insert(0, n)
316                 return n, True
317
318     def set(self, slot, blob):
319         try:
320             slot.set(blob)
321             return
322         except OSError as e:
323             if e.errno == errno.ENOMEM:
324                 # Reduce max slots to current - 4, cap cache and retry
325                 with self._cache_lock:
326                     self._max_slots = max(4, len(self._cache) - 4)
327             elif e.errno == errno.ENOSPC:
328                 # Reduce disk max space to current - 256 MiB, cap cache and retry
329                 with self._cache_lock:
330                     sm = sum([st.size() for st in self._cache])
331                     self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
332             elif e.errno == errno.ENODEV:
333                 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
334         except Exception:
335             pass
336         finally:
337             # Check if we should evict things from the cache.  Either
338             # because we added a new thing or there was an error and
339             # we possibly adjusted the limits down, so we might need
340             # to push something out.
341             self.cap_cache()
342
343         try:
344             # Only gets here if there was an error the first time. The
345             # exception handler adjusts limits downward in some cases
346             # to free up resources, which would make the operation
347             # succeed.
348             slot.set(blob)
349         except Exception as e:
350             # It failed again.  Give up.
351             slot.set(None)
352             raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
353
354         self.cap_cache()
355
356 class Counter(object):
357     def __init__(self, v=0):
358         self._lk = threading.Lock()
359         self._val = v
360
361     def add(self, v):
362         with self._lk:
363             self._val += v
364
365     def get(self):
366         with self._lk:
367             return self._val
368
369
370 class KeepClient(object):
371
372     # Default Keep server connection timeout:  2 seconds
373     # Default Keep server read timeout:       256 seconds
374     # Default Keep server bandwidth minimum:  32768 bytes per second
375     # Default Keep proxy connection timeout:  20 seconds
376     # Default Keep proxy read timeout:        256 seconds
377     # Default Keep proxy bandwidth minimum:   32768 bytes per second
378     DEFAULT_TIMEOUT = (2, 256, 32768)
379     DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
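
    # Illustrative sketch (not part of the original file): overriding the
    # default (connect, read, minimum-bandwidth) tuple for a slow link.
    #
    #     kc = KeepClient(api_client=arvados.api('v1'),
    #                     timeout=(5, 512, 16384))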
380
381
382     class KeepService(object):
383         """Make requests to a single Keep service, and track results.
384
385         A KeepService is intended to last long enough to perform one
386         transaction (GET or PUT) against one Keep service. This can
387         involve calling either get() or put() multiple times in order
388         to retry after transient failures. However, calling both get()
389         and put() on a single instance -- or using the same instance
390         to access two different Keep services -- will not produce
391         sensible behavior.
392         """
393
394         HTTP_ERRORS = (
395             socket.error,
396             ssl.SSLError,
397             arvados.errors.HttpError,
398         )
399
400         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
401                      upload_counter=None,
402                      download_counter=None,
403                      headers={},
404                      insecure=False):
405             self.root = root
406             self._user_agent_pool = user_agent_pool
407             self._result = {'error': None}
408             self._usable = True
409             self._session = None
410             self._socket = None
411             self.get_headers = {'Accept': 'application/octet-stream'}
412             self.get_headers.update(headers)
413             self.put_headers = headers
414             self.upload_counter = upload_counter
415             self.download_counter = download_counter
416             self.insecure = insecure
417
418         def usable(self):
419             """Is it worth attempting a request?"""
420             return self._usable
421
422         def finished(self):
423             """Did the request succeed or encounter permanent failure?"""
424             return self._result['error'] is False or not self._usable
425
426         def last_result(self):
427             return self._result
428
429         def _get_user_agent(self):
430             try:
431                 return self._user_agent_pool.get(block=False)
432             except queue.Empty:
433                 return pycurl.Curl()
434
435         def _put_user_agent(self, ua):
436             try:
437                 ua.reset()
438                 self._user_agent_pool.put(ua, block=False)
439             except:
440                 ua.close()
441
442         def _socket_open(self, *args, **kwargs):
443             if len(args) + len(kwargs) == 2:
444                 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
445             else:
446                 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
447
448         def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
449             return self._socket_open_pycurl_7_21_5(
450                 purpose=None,
451                 address=collections.namedtuple(
452                     'Address', ['family', 'socktype', 'protocol', 'addr'],
453                 )(family, socktype, protocol, address))
454
455         def _socket_open_pycurl_7_21_5(self, purpose, address):
456             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
457             s = socket.socket(address.family, address.socktype, address.protocol)
458             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
459             # Will throw invalid protocol error on mac. This test prevents that.
460             if hasattr(socket, 'TCP_KEEPIDLE'):
461                 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
462             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
463             self._socket = s
464             return s
465
466         def get(self, locator, method="GET", timeout=None):
467             # locator is a KeepLocator object.
468             url = self.root + str(locator)
469             _logger.debug("Request: %s %s", method, url)
470             curl = self._get_user_agent()
471             ok = None
472             try:
473                 with timer.Timer() as t:
474                     self._headers = {}
475                     response_body = BytesIO()
476                     curl.setopt(pycurl.NOSIGNAL, 1)
477                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
478                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
479                     curl.setopt(pycurl.URL, url.encode('utf-8'))
480                     curl.setopt(pycurl.HTTPHEADER, [
481                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
482                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
483                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
484                     if self.insecure:
485                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
486                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
487                     else:
488                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
489                     if method == "HEAD":
490                         curl.setopt(pycurl.NOBODY, True)
491                     self._setcurltimeouts(curl, timeout, method=="HEAD")
492
493                     try:
494                         curl.perform()
495                     except Exception as e:
496                         raise arvados.errors.HttpError(0, str(e))
497                     finally:
498                         if self._socket:
499                             self._socket.close()
500                             self._socket = None
501                     self._result = {
502                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
503                         'body': response_body.getvalue(),
504                         'headers': self._headers,
505                         'error': False,
506                     }
507
508                 ok = retry.check_http_response_success(self._result['status_code'])
509                 if not ok:
510                     self._result['error'] = arvados.errors.HttpError(
511                         self._result['status_code'],
512                         self._headers.get('x-status-line', 'Error'))
513             except self.HTTP_ERRORS as e:
514                 self._result = {
515                     'error': e,
516                 }
517             self._usable = ok is not False # still usable if ok is True or None
518             if self._result.get('status_code', None):
519                 # The client worked well enough to get an HTTP status
520                 # code, so presumably any problems are just on the
521                 # server side and it's OK to reuse the client.
522                 self._put_user_agent(curl)
523             else:
524                 # Don't return this client to the pool, in case it's
525                 # broken.
526                 curl.close()
527             if not ok:
528                 _logger.debug("Request fail: GET %s => %s: %s",
529                               url, type(self._result['error']), str(self._result['error']))
530                 return None
531             if method == "HEAD":
532                 _logger.info("HEAD %s: %s bytes",
533                          self._result['status_code'],
534                          self._result['headers'].get('content-length'))
535                 if self._result['headers'].get('x-keep-locator'):
536                     # This is a response to a remote block copy request, return
537                     # the local copy block locator.
538                     return self._result['headers'].get('x-keep-locator')
539                 return True
540
541             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
542                          self._result['status_code'],
543                          len(self._result['body']),
544                          t.msecs,
545                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
546
547             if self.download_counter:
548                 self.download_counter.add(len(self._result['body']))
549             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
550             if resp_md5 != locator.md5sum:
551                 _logger.warning("Checksum fail: md5(%s) = %s",
552                                 url, resp_md5)
553                 self._result['error'] = arvados.errors.HttpError(
554                     0, 'Checksum fail')
555                 return None
556             return self._result['body']
557
558         def put(self, hash_s, body, timeout=None, headers={}):
559             put_headers = copy.copy(self.put_headers)
560             put_headers.update(headers)
561             url = self.root + hash_s
562             _logger.debug("Request: PUT %s", url)
563             curl = self._get_user_agent()
564             ok = None
565             try:
566                 with timer.Timer() as t:
567                     self._headers = {}
568                     body_reader = BytesIO(body)
569                     response_body = BytesIO()
570                     curl.setopt(pycurl.NOSIGNAL, 1)
571                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
572                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
573                     curl.setopt(pycurl.URL, url.encode('utf-8'))
574                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
575                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
576                     # response) instead of sending the request body immediately.
577                     # This allows the server to reject the request if the request
578                     # is invalid or the server is read-only, without waiting for
579                     # the client to send the entire block.
580                     curl.setopt(pycurl.UPLOAD, True)
581                     curl.setopt(pycurl.INFILESIZE, len(body))
582                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
583                     curl.setopt(pycurl.HTTPHEADER, [
584                         '{}: {}'.format(k,v) for k,v in put_headers.items()])
585                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
586                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
587                     if self.insecure:
588                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
589                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
590                     else:
591                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
592                     self._setcurltimeouts(curl, timeout)
593                     try:
594                         curl.perform()
595                     except Exception as e:
596                         raise arvados.errors.HttpError(0, str(e))
597                     finally:
598                         if self._socket:
599                             self._socket.close()
600                             self._socket = None
601                     self._result = {
602                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
603                         'body': response_body.getvalue().decode('utf-8'),
604                         'headers': self._headers,
605                         'error': False,
606                     }
607                 ok = retry.check_http_response_success(self._result['status_code'])
608                 if not ok:
609                     self._result['error'] = arvados.errors.HttpError(
610                         self._result['status_code'],
611                         self._headers.get('x-status-line', 'Error'))
612             except self.HTTP_ERRORS as e:
613                 self._result = {
614                     'error': e,
615                 }
616             self._usable = ok is not False # still usable if ok is True or None
617             if self._result.get('status_code', None):
618                 # Client is functional. See comment in get().
619                 self._put_user_agent(curl)
620             else:
621                 curl.close()
622             if not ok:
623                 _logger.debug("Request fail: PUT %s => %s: %s",
624                               url, type(self._result['error']), str(self._result['error']))
625                 return False
626             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
627                          self._result['status_code'],
628                          len(body),
629                          t.msecs,
630                          1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
631             if self.upload_counter:
632                 self.upload_counter.add(len(body))
633             return True
634
635         def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
636             if not timeouts:
637                 return
638             elif isinstance(timeouts, tuple):
639                 if len(timeouts) == 2:
640                     conn_t, xfer_t = timeouts
641                     bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
642                 else:
643                     conn_t, xfer_t, bandwidth_bps = timeouts
644             else:
645                 conn_t, xfer_t = (timeouts, timeouts)
646                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
647             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
648             if not ignore_bandwidth:
649                 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
650                 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
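
        # Worked example (illustrative): timeouts=(2, 256, 32768) sets
        # CONNECTTIMEOUT_MS to 2000 and tells libcurl to abort a transfer
        # whose average rate stays below 32768 bytes/sec for 256 seconds
        # (LOW_SPEED_LIMIT / LOW_SPEED_TIME).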
651
652         def _headerfunction(self, header_line):
653             if isinstance(header_line, bytes):
654                 header_line = header_line.decode('iso-8859-1')
655             if ':' in header_line:
656                 name, value = header_line.split(':', 1)
657                 name = name.strip().lower()
658                 value = value.strip()
659             elif self._headers:
660                 name = self._lastheadername
661                 value = self._headers[name] + ' ' + header_line.strip()
662             elif header_line.startswith('HTTP/'):
663                 name = 'x-status-line'
664                 value = header_line
665             else:
666                 _logger.error("Unexpected header line: %s", header_line)
667                 return
668             self._lastheadername = name
669             self._headers[name] = value
670             # Returning None implies all bytes were written
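            # Worked example (illustrative): feeding the lines
            # 'HTTP/1.1 200 OK' and 'X-Keep-Replicas-Stored: 2' yields
            # {'x-status-line': 'HTTP/1.1 200 OK',
            #  'x-keep-replicas-stored': '2'}; once headers exist, a line
            # with no colon is appended to the previous header's value.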
671
672
673     class KeepWriterQueue(queue.Queue):
674         def __init__(self, copies, classes=[]):
675             queue.Queue.__init__(self) # Old-style superclass
676             self.wanted_copies = copies
677             self.wanted_storage_classes = classes
678             self.successful_copies = 0
679             self.confirmed_storage_classes = {}
680             self.response = None
681             self.storage_classes_tracking = True
682             self.queue_data_lock = threading.RLock()
683             self.pending_tries = max(copies, len(classes))
684             self.pending_tries_notification = threading.Condition()
685
686         def write_success(self, response, replicas_nr, classes_confirmed):
687             with self.queue_data_lock:
688                 self.successful_copies += replicas_nr
689                 if classes_confirmed is None:
690                     self.storage_classes_tracking = False
691                 elif self.storage_classes_tracking:
692                     for st_class, st_copies in classes_confirmed.items():
693                         try:
694                             self.confirmed_storage_classes[st_class] += st_copies
695                         except KeyError:
696                             self.confirmed_storage_classes[st_class] = st_copies
697                     self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
698                 self.response = response
699             with self.pending_tries_notification:
700                 self.pending_tries_notification.notify_all()
701
702         def write_fail(self, ks):
703             with self.pending_tries_notification:
704                 self.pending_tries += 1
705                 self.pending_tries_notification.notify()
706
707         def pending_copies(self):
708             with self.queue_data_lock:
709                 return self.wanted_copies - self.successful_copies
710
711         def satisfied_classes(self):
712             with self.queue_data_lock:
713                 if not self.storage_classes_tracking:
714                     # Notifies disabled storage classes expectation to
715                     # the outer loop.
716                     return None
717             return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
718
719         def pending_classes(self):
720             with self.queue_data_lock:
721                 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
722                     return []
723                 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
724                 for st_class, st_copies in self.confirmed_storage_classes.items():
725                     if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
726                         unsatisfied_classes.remove(st_class)
727                 return unsatisfied_classes
728
729         def get_next_task(self):
730             with self.pending_tries_notification:
731                 while True:
732                     if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
733                         # This notify_all() is unnecessary --
734                         # write_success() already called notify_all()
735                         # when pending<1 became true, so it's not
736                         # possible for any other thread to be in
737                         # wait() now -- but it's cheap insurance
738                         # against deadlock so we do it anyway:
739                         self.pending_tries_notification.notify_all()
740                         # Drain the queue and then raise Queue.Empty
741                         while True:
742                             self.get_nowait()
743                             self.task_done()
744                     elif self.pending_tries > 0:
745                         service, service_root = self.get_nowait()
746                         if service.finished():
747                             self.task_done()
748                             continue
749                         self.pending_tries -= 1
750                         return service, service_root
751                     elif self.empty():
752                         self.pending_tries_notification.notify_all()
753                         raise queue.Empty
754                     else:
755                         self.pending_tries_notification.wait()
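
    # Worked example (illustrative): with wanted_copies=2 and
    # classes=['default'], one service reply carrying
    # 'x-keep-replicas-stored: 2' and
    # 'x-keep-storage-classes-confirmed: default=2' drives pending_copies()
    # to 0 and pending_classes() to [], so get_next_task() drains the queue
    # and the writer threads exit.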
756
757
758     class KeepWriterThreadPool(object):
759         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
760             self.total_task_nr = 0
761             if (not max_service_replicas) or (max_service_replicas >= copies):
762                 num_threads = 1
763             else:
764                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
765             _logger.debug("Pool max threads is %d", num_threads)
766             self.workers = []
767             self.queue = KeepClient.KeepWriterQueue(copies, classes)
768             # Create workers
769             for _ in range(num_threads):
770                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
771                 self.workers.append(w)
772
773         def add_task(self, ks, service_root):
774             self.queue.put((ks, service_root))
775             self.total_task_nr += 1
776
777         def done(self):
778             return self.queue.successful_copies, self.queue.satisfied_classes()
779
780         def join(self):
781             # Start workers
782             for worker in self.workers:
783                 worker.start()
784             # Wait for finished work
785             self.queue.join()
786
787         def response(self):
788             return self.queue.response
789
790
791     class KeepWriterThread(threading.Thread):
792         class TaskFailed(RuntimeError): pass
793
794         def __init__(self, queue, data, data_hash, timeout=None):
795             super(KeepClient.KeepWriterThread, self).__init__()
796             self.timeout = timeout
797             self.queue = queue
798             self.data = data
799             self.data_hash = data_hash
800             self.daemon = True
801
802         def run(self):
803             while True:
804                 try:
805                     service, service_root = self.queue.get_next_task()
806                 except queue.Empty:
807                     return
808                 try:
809                     locator, copies, classes = self.do_task(service, service_root)
810                 except Exception as e:
811                     if not isinstance(e, self.TaskFailed):
812                         _logger.exception("Exception in KeepWriterThread")
813                     self.queue.write_fail(service)
814                 else:
815                     self.queue.write_success(locator, copies, classes)
816                 finally:
817                     self.queue.task_done()
818
819         def do_task(self, service, service_root):
820             classes = self.queue.pending_classes()
821             headers = {}
822             if len(classes) > 0:
823                 classes.sort()
824                 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
825             success = bool(service.put(self.data_hash,
826                                         self.data,
827                                         timeout=self.timeout,
828                                         headers=headers))
829             result = service.last_result()
830
831             if not success:
832                 if result.get('status_code'):
833                     _logger.debug("Request fail: PUT %s => %s %s",
834                                   self.data_hash,
835                                   result.get('status_code'),
836                                   result.get('body'))
837                 raise self.TaskFailed()
838
839             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
840                           str(threading.current_thread()),
841                           self.data_hash,
842                           len(self.data),
843                           service_root)
844             try:
845                 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
846             except (KeyError, ValueError):
847                 replicas_stored = 1
848
849             classes_confirmed = {}
850             try:
851                 scch = result['headers']['x-keep-storage-classes-confirmed']
852                 for confirmation in scch.replace(' ', '').split(','):
853                     if '=' in confirmation:
854                         stored_class, stored_copies = confirmation.split('=')[:2]
855                         classes_confirmed[stored_class] = int(stored_copies)
856             except (KeyError, ValueError):
857                 # Storage classes confirmed header missing or corrupt
858                 classes_confirmed = None
859
860             return result['body'].strip(), replicas_stored, classes_confirmed
861
862
863     def __init__(self, api_client=None, proxy=None,
864                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
865                  api_token=None, local_store=None, block_cache=None,
866                  num_retries=0, session=None):
867         """Initialize a new KeepClient.
868
869         Arguments:
870         :api_client:
871           The API client to use to find Keep services.  If not
872           provided, KeepClient will build one from available Arvados
873           configuration.
874
875         :proxy:
876           If specified, this KeepClient will send requests to this Keep
877           proxy.  Otherwise, KeepClient will fall back to the setting of the
878           ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
879           If you want to ensure KeepClient does not use a proxy, pass in an
880           empty string.
881
882         :timeout:
883           The initial timeout (in seconds) for HTTP requests to Keep
884           non-proxy servers.  A tuple of three floats is interpreted as
885           (connection_timeout, read_timeout, minimum_bandwidth). A connection
886           will be aborted if the average traffic rate falls below
887           minimum_bandwidth bytes per second over an interval of read_timeout
888           seconds. Because timeouts are often a result of transient server
889           load, the actual connection timeout will be increased by a factor
890           of two on each retry.
891           Default: (2, 256, 32768).
892
893         :proxy_timeout:
894           The initial timeout (in seconds) for HTTP requests to
895           Keep proxies. A tuple of three floats is interpreted as
896           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
897           described above for adjusting connection timeouts on retry also
898           applies.
899           Default: (20, 256, 32768).
900
901         :api_token:
902           If you're not using an API client, but only talking
903           directly to a Keep proxy, this parameter specifies an API token
904           to authenticate Keep requests.  It is an error to specify both
905           api_client and api_token.  If you specify neither, KeepClient
906           will use one available from the Arvados configuration.
907
908         :local_store:
909           If specified, this KeepClient will bypass Keep
910           services, and save data to the named directory.  If unspecified,
911           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
912           environment variable.  If you want to ensure KeepClient does not
913           use local storage, pass in an empty string.  This is primarily
914           intended to mock a server for testing.
915
916         :num_retries:
917           The default number of times to retry failed requests.
918           This will be used as the default num_retries value when get() and
919           put() are called.  Default 0.
920         """
921         self.lock = threading.Lock()
922         if proxy is None:
923             if config.get('ARVADOS_KEEP_SERVICES'):
924                 proxy = config.get('ARVADOS_KEEP_SERVICES')
925             else:
926                 proxy = config.get('ARVADOS_KEEP_PROXY')
927         if api_token is None:
928             if api_client is None:
929                 api_token = config.get('ARVADOS_API_TOKEN')
930             else:
931                 api_token = api_client.api_token
932         elif api_client is not None:
933             raise ValueError(
934                 "can't build KeepClient with both API client and token")
935         if local_store is None:
936             local_store = os.environ.get('KEEP_LOCAL_STORE')
937
938         if api_client is None:
939             self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
940         else:
941             self.insecure = api_client.insecure
942
943         self.block_cache = block_cache if block_cache else KeepBlockCache()
944         self.timeout = timeout
945         self.proxy_timeout = proxy_timeout
946         self._user_agent_pool = queue.LifoQueue()
947         self.upload_counter = Counter()
948         self.download_counter = Counter()
949         self.put_counter = Counter()
950         self.get_counter = Counter()
951         self.hits_counter = Counter()
952         self.misses_counter = Counter()
953         self._storage_classes_unsupported_warning = False
954         self._default_classes = []
955
956         if local_store:
957             self.local_store = local_store
958             self.head = self.local_store_head
959             self.get = self.local_store_get
960             self.put = self.local_store_put
961         else:
962             self.num_retries = num_retries
963             self.max_replicas_per_service = None
964             if proxy:
965                 proxy_uris = proxy.split()
966                 for i in range(len(proxy_uris)):
967                     if not proxy_uris[i].endswith('/'):
968                         proxy_uris[i] += '/'
969                     # URL validation
970                     url = urllib.parse.urlparse(proxy_uris[i])
971                     if not (url.scheme and url.netloc):
972                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
973                 self.api_token = api_token
974                 self._gateway_services = {}
975                 self._keep_services = [{
976                     'uuid': "00000-bi6l4-%015d" % idx,
977                     'service_type': 'proxy',
978                     '_service_root': uri,
979                     } for idx, uri in enumerate(proxy_uris)]
980                 self._writable_services = self._keep_services
981                 self.using_proxy = True
982                 self._static_services_list = True
983             else:
984                 # It's important to avoid instantiating an API client
985                 # unless we actually need one, for testing's sake.
986                 if api_client is None:
987                     api_client = arvados.api('v1')
988                 self.api_client = api_client
989                 self.api_token = api_client.api_token
990                 self._gateway_services = {}
991                 self._keep_services = None
992                 self._writable_services = None
993                 self.using_proxy = None
994                 self._static_services_list = False
995                 try:
996                     self._default_classes = [
997                         k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
998                 except KeyError:
999                     # We're talking to an old cluster
1000                     pass
1001
1002     def current_timeout(self, attempt_number):
1003         """Return the appropriate timeout to use for this client.
1004
1005         The proxy timeout setting if the backend service is currently a proxy,
1006         the regular timeout setting otherwise.  The `attempt_number` indicates
1007         how many times the operation has been tried already (starting from 0
1008         for the first try), and scales the connection timeout portion of the
1009         return value accordingly.
1010
1011         """
1012         # TODO(twp): the timeout should be a property of a
1013         # KeepService, not a KeepClient. See #4488.
1014         t = self.proxy_timeout if self.using_proxy else self.timeout
1015         if len(t) == 2:
1016             return (t[0] * (1 << attempt_number), t[1])
1017         else:
1018             return (t[0] * (1 << attempt_number), t[1], t[2])
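
    # Worked example (illustrative): with the default (2, 256, 32768) tuple,
    # attempt 0 uses a 2 s connect timeout, attempt 1 uses 4 s, and attempt 2
    # uses 8 s; the read timeout and bandwidth floor stay unchanged.
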
1019     def _any_nondisk_services(self, service_list):
1020         return any(ks.get('service_type', 'disk') != 'disk'
1021                    for ks in service_list)
1022
1023     def build_services_list(self, force_rebuild=False):
1024         if (self._static_services_list or
1025               (self._keep_services and not force_rebuild)):
1026             return
1027         with self.lock:
1028             try:
1029                 keep_services = self.api_client.keep_services().accessible()
1030             except Exception:  # API server predates Keep services.
1031                 keep_services = self.api_client.keep_disks().list()
1032
1033             # Gateway services are only used when specified by UUID,
1034             # so there's nothing to gain by filtering them by
1035             # service_type.
1036             self._gateway_services = {ks['uuid']: ks for ks in
1037                                       keep_services.execute()['items']}
1038             if not self._gateway_services:
1039                 raise arvados.errors.NoKeepServersError()
1040
1041             # Precompute the base URI for each service.
1042             for r in self._gateway_services.values():
1043                 host = r['service_host']
1044                 if not host.startswith('[') and host.find(':') >= 0:
1045                     # IPv6 URIs must be formatted like http://[::1]:80/...
1046                     host = '[' + host + ']'
1047                 r['_service_root'] = "{}://{}:{:d}/".format(
1048                     'https' if r['service_ssl_flag'] else 'http',
1049                     host,
1050                     r['service_port'])
1051
1052             _logger.debug(str(self._gateway_services))
1053             self._keep_services = [
1054                 ks for ks in self._gateway_services.values()
1055                 if not ks.get('service_type', '').startswith('gateway:')]
1056             self._writable_services = [ks for ks in self._keep_services
1057                                        if not ks.get('read_only')]
1058
1059             # For disk type services, max_replicas_per_service is 1
1060             # It is unknown (unlimited) for other service types.
1061             if self._any_nondisk_services(self._writable_services):
1062                 self.max_replicas_per_service = None
1063             else:
1064                 self.max_replicas_per_service = 1
1065
1066     def _service_weight(self, data_hash, service_uuid):
1067         """Compute the weight of a Keep service endpoint for a data
1068         block with a known hash.
1069
1070         The weight is md5(h + u) where u is the last 15 characters of
1071         the service endpoint's UUID.
1072         """
1073         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
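
    # Worked example (illustrative): for a block hash h and the service uuid
    # 'zzzzz-bi6l4-000000000000001' the weight is
    # md5(h + '000000000000001'); sorting services by descending weight gives
    # each block its own stable probe order (rendezvous hashing).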
1074
1075     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1076         """Return an array of Keep service endpoints, in the order in
1077         which they should be probed when reading or writing data with
1078         the given hash+hints.
1079         """
1080         self.build_services_list(force_rebuild)
1081
1082         sorted_roots = []
1083         # Use the services indicated by the given +K@... remote
1084         # service hints, if any are present and can be resolved to a
1085         # URI.
1086         for hint in locator.hints:
1087             if hint.startswith('K@'):
1088                 if len(hint) == 7:
1089                     sorted_roots.append(
1090                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1091                 elif len(hint) == 29:
1092                     svc = self._gateway_services.get(hint[2:])
1093                     if svc:
1094                         sorted_roots.append(svc['_service_root'])
1095
1096         # Sort the available local services by weight (heaviest first)
1097         # for this locator, and return their service_roots (base URIs)
1098         # in that order.
1099         use_services = self._keep_services
1100         if need_writable:
1101             use_services = self._writable_services
1102         self.using_proxy = self._any_nondisk_services(use_services)
1103         sorted_roots.extend([
1104             svc['_service_root'] for svc in sorted(
1105                 use_services,
1106                 reverse=True,
1107                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1108         _logger.debug("{}: {}".format(locator, sorted_roots))
1109         return sorted_roots
1110
1111     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1112         # roots_map is a dictionary, mapping Keep service root strings
1113         # to KeepService objects.  Poll for Keep services, and add any
1114         # new ones to roots_map.  Return the current list of local
1115         # root strings.
1116         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1117         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1118         for root in local_roots:
1119             if root not in roots_map:
1120                 roots_map[root] = self.KeepService(
1121                     root, self._user_agent_pool,
1122                     upload_counter=self.upload_counter,
1123                     download_counter=self.download_counter,
1124                     headers=headers,
1125                     insecure=self.insecure)
1126         return local_roots
1127
1128     @staticmethod
1129     def _check_loop_result(result):
1130         # KeepClient RetryLoops should save results as a 2-tuple: the
1131         # actual result of the request, and the number of servers available
1132         # to receive the request this round.
1133         # This method returns True if there's a real result, False if
1134         # there are no more servers available, otherwise None.
1135         if isinstance(result, Exception):
1136             return None
1137         result, tried_server_count = result
1138         if (result is not None) and (result is not False):
1139             return True
1140         elif tried_server_count < 1:
1141             _logger.info("No more Keep services to try; giving up")
1142             return False
1143         else:
1144             return None
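
    # Worked example (illustrative): loop.save_result((blob, 3)) ends the
    # RetryLoop successfully, save_result((None, 0)) ends it as a permanent
    # failure ("no more services"), and save_result(some_exception) lets the
    # loop retry with backoff.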
1145
1146     def get_from_cache(self, loc_s):
1147         """Fetch a block only if is in the cache, otherwise return None."""
1148         locator = KeepLocator(loc_s)
1149         slot = self.block_cache.get(locator.md5sum)
1150         if slot is not None and slot.ready.is_set():
1151             return slot.get()
1152         else:
1153             return None
1154
1155     def refresh_signature(self, loc):
1156         """Ask Keep to get the remote block and return its local signature"""
1157         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1158         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
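
    # Illustrative sketch (not part of the original file): exchanging a
    # remote-signed locator for one with a local signature.
    #
    #     local_loc = kc.refresh_signature(remote_loc)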
1159
1160     @retry.retry_method
1161     def head(self, loc_s, **kwargs):
1162         return self._get_or_head(loc_s, method="HEAD", **kwargs)
1163
1164     @retry.retry_method
1165     def get(self, loc_s, **kwargs):
1166         return self._get_or_head(loc_s, method="GET", **kwargs)
1167
1168     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1169         """Get data from Keep.
1170
1171         This method fetches one or more blocks of data from Keep.  It
1172         sends a request to each Keep service registered with the API
1173         server (or the proxy provided when this client was
1174         instantiated), then each service named in location hints, in
1175         sequence.  As soon as one service provides the data, it's
1176         returned.
1177
1178         Arguments:
1179         * loc_s: A string of one or more comma-separated locators to fetch.
1180           This method returns the concatenation of these blocks.
1181         * num_retries: The number of times to retry GET requests to
1182           *each* Keep server if it returns temporary failures, with
1183           exponential backoff.  Note that, in each loop, the method may try
1184           to fetch data from every available Keep service, along with any
1185           that are named in location hints in the locator.  The default value
1186           is set when the KeepClient is initialized.
1187         """
1188         if ',' in loc_s:
1189             return b''.join(self.get(x) for x in loc_s.split(','))
1190
1191         self.get_counter.add(1)
1192
1193         request_id = (request_id or
1194                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1195                       arvados.util.new_request_id())
1196         if headers is None:
1197             headers = {}
1198         headers['X-Request-Id'] = request_id
1199
1200         slot = None
1201         blob = None
1202         try:
1203             locator = KeepLocator(loc_s)
1204             if method == "GET":
1205                 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1206                 if not first:
1207                     if prefetch:
1208                         # This is a request for a prefetch; if it is
1209                         # already in flight, return immediately.
1210                         # Clear 'slot' to prevent the finally block from
1211                         # calling slot.set().
1212                         slot = None
1213                         return None
1214                     self.hits_counter.add(1)
1215                     blob = slot.get()
1216                     if blob is None:
1217                         raise arvados.errors.KeepReadError(
1218                             "failed to read {}".format(loc_s))
1219                     return blob
1220
            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                       hint.startswith('K@') and
                                       len(hint) == 29 and
                                       self._gateway_services.get(hint[2:])
                                       )])
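            # For example (illustrative locator): in
            # "acbd18db4cc2f85cedef654fccc4a4d8+3+K@qr1hi" the 7-character
            # hint K@qr1hi names a remote cluster prefix, so the read is
            # routed to http://keep.qr1hi.arvadosapi.com/.  A 29-character
            # K@ hint instead names a local gateway service by UUID.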
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation.  Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            sorted_roots = []
            # Note: don't reset roots_map here; it already holds any
            # hint-derived services, and map_new_services() adds the regular
            # Keep services to it on each pass.
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
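            # Hedged note: with backoff_start=2 the delay between retries
            # starts around 2s and grows with each attempt; the exact
            # schedule is determined by arvados.retry.RetryLoop.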
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            if slot is not None:
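                # Hedged note: completing the slot even when blob is None
                # lets any concurrent readers blocked in slot.get() fail
                # fast instead of waiting indefinitely.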
                self.block_cache.set(slot, blob)

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one in parallel, each upload in its own thread.
        Once the uploads are finished, if enough copies are saved, this method
        returns the most recent HTTP response body.  If requests fail to
        upload enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """
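        # Hedged usage sketch ('kc' is a hypothetical KeepClient):
        #
        #     loc = kc.put(b"foo", copies=2)
        #     # e.g. "acbd18db4cc2f85cedef654fccc4a4d8+3+A<sig>@<expiry>"
        #     assert kc.get(loc) == b"foo"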

        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
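        # Hedged note: X-Keep-Desired-Replicas advertises how many copies we
        # still want, so a proxy can fan the write out; each response reports
        # how many replicas were actually stored, and the writer pool tallies
        # those against 'copies'.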
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done_copies,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left),
                                                          classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
            else:
                # An old keepstore without storage class support was
                # contacted: success is determined only by the number of
                # successful copies.
                #
                # Disable storage class tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")

    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        if not isinstance(data, bytes):
            # Match put(): accept str input by encoding it to bytes.
            data = data.encode()
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

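    # Hedged round-trip sketch: assuming a client constructed with a
    # local_store directory (which installs these stubs in place of
    # put/get/head):
    #
    #     kc = KeepClient(local_store='/tmp/keep')  # hypothetical path
    #     loc = kc.put(b"foo")  # writes /tmp/keep/acbd18db4cc2f85cedef654fccc4a4d8
    #     kc.get(loc)           # returns b'foo'
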
    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()

    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True