# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
import copy
from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object
import collections
import datetime
import hashlib
import errno
import io
import logging
import math
import os
import pycurl
import queue
import re
import socket
import ssl
import sys
import threading
import resource
from . import timer
import urllib.parse
import traceback

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
import arvados.diskcache

_logger = logging.getLogger('arvados.keep')
global_client_object = None


# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102


class KeepLocator(object):
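    """Parse and manipulate a Keep block locator string.

    A locator has the form <md5sum>[+<size>][+<hint>...].  For example
    (values illustrative only):

        acbd18db4cc2f85cedef654fccc4a4d8+3+A<40-hex-signature>@565f7800

    where the "A" hint carries a permission signature and a hex Unix
    timestamp giving the signature's expiry time.
    """
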
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
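        """Return the bare <md5sum>+<size> form of this locator (or just
        the md5sum if the size is unknown), with all hints removed."""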
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
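        """Return True if the permission hint has expired as of as_of_dt
        (or as of the current time, when as_of_dt is None).  Locators
        without a permission hint never expire."""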
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt


class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)


class KeepBlockCache(object):
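    """Cache Keep blocks in RAM or, with disk_cache=True, on disk.

    Rough usage sketch (locator_md5 and block_data are placeholders):

        cache = KeepBlockCache(disk_cache=True)
        slot, first = cache.reserve_cache(locator_md5)
        if first:
            cache.set(slot, block_data)  # may raise KeepCacheError
        else:
            block_data = slot.get()      # waits until the slot is ready
    """
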
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # default max slots to half of maximum file handles
                # NOFILE typically defaults to 1024 on Linux so this
                # will be 512 slots.
                self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] // 2
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        if self._disk_cache:
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            self.cap_cache()


    class CacheSlot(object):
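        """An in-RAM cache slot for a single block.

        The slot starts empty; the thread that fetches the block calls
        set(), and any other thread calling get() blocks on the ready
        event until the content (or None, on a failed read) is available.
        """
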
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            return True

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
                for i in range(len(self._cache)-1, -1, -1):
                    # start from the back, find a slot that is a candidate to evict
                    if self._cache[i].ready.is_set():
                        sz = self._cache[i].size()

                        # If evict returns false it means the
                        # underlying disk cache couldn't lock the file
                        # for deletion because another process was using
                        # it. Don't count it as reducing the amount
                        # of data in the cache, find something else to
                        # throw out.
                        if self._cache[i].evict():
                            sm -= sz

                        # either way we forget about it.  either the
                        # other process will delete it, or if we need
                        # it again and it is still there, we'll find
                        # it on disk.
                        del self._cache[i]
                        break

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator, or return the
        existing slot.  Returns a (slot, is_new) tuple; is_new is True
        when a fresh slot was created and the caller must fill it.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True

    def set(self, slot, blob):
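        """Store blob in the given cache slot.

        If a disk cache write fails (for example with ENOMEM or ENOSPC),
        the cache limits are reduced, slots are evicted, and the write is
        retried once; if it still fails, the error is re-raised as
        arvados.errors.KeepCacheError.  The slot's ready event is always
        set, so readers are never left waiting.
        """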
        tryagain = False

        try:
            slot.set(blob)
        except OSError as e:
            tryagain = True
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum([st.size() for st in self._cache])
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception as e:
            tryagain = True

        try:
            if tryagain:
                # There was an error.  Evict some slots and try again.
                self.cap_cache()
                slot.set(blob)
        except Exception as e:
            # It failed again.  Give up.
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
        finally:
            # Signal that we are done with the cache slot, one way
            # or another.
            slot.ready.set()

        self.cap_cache()


class Counter(object):
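    """Minimal thread-safe counter, used for transfer statistics."""
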
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)


    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def _socket_open(self, *args, **kwargs):
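            """Dispatch to the right opensocket callback signature.

            pycurl 7.21.5 and later invoke OPENSOCKETFUNCTION with
            (purpose, address); older releases pass (family, socktype,
            protocol, address), so the argument count tells them apart.
            """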
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            self._socket = s
            return s

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                         self._result['status_code'],
                         self._result['headers'].get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
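            """Apply connect/transfer timeouts to a curl handle.

            timeouts may be a bare number (used for both the connection
            and transfer limits) or a 2- or 3-tuple of (connect, transfer
            [, bandwidth]); the bandwidth floor is enforced through
            cURL's LOW_SPEED_LIMIT/LOW_SPEED_TIME options unless
            ignore_bandwidth is set.
            """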
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            if not ignore_bandwidth:
                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

        def _headerfunction(self, header_line):
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written


    class KeepWriterQueue(queue.Queue):
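        """Queue of (KeepService, service_root) PUT tasks for one block.

        Tracks how many replicas have been written (and which storage
        classes were confirmed) so writer threads can stop as soon as
        the requested redundancy is reached.
        """
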
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Tell the outer loop that storage class tracking
                    # is disabled.
                    return None
            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()


    class KeepWriterThreadPool(object):
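        """Fan one block's PUT requests out over enough writer threads
        to reach the requested number of copies, given the per-request
        replica limit of the target services."""
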
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response


    class KeepWriterThread(threading.Thread):
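        """Daemon thread that takes (service, service_root) tasks from a
        KeepWriterQueue and reports each PUT's outcome back to it."""
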
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
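            """Attempt one PUT to a single Keep service, then parse the
            x-keep-replicas-stored and x-keep-storage-classes-confirmed
            response headers to report the redundancy achieved."""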
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout,
                                        headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed


    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass
    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache; otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then to each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                slot, first = self.block_cache.reserve_cache(locator.md5sum)
                if not first:
                    if prefetch:
                        # This is a prefetch request; if the block is
                        # already in flight, return immediately.  Clear
                        # 'slot' to prevent the finally block from
                        # calling slot.set().
1211                         return None
1212                     self.hits_counter.add(1)
1213                     blob = slot.get()
1214                     if blob is None:
1215                         raise arvados.errors.KeepReadError(
1216                             "failed to read {}".format(loc_s))
1217                     return blob
1218
1219             self.misses_counter.add(1)
1220
1221             # If the locator has hints specifying a prefix (indicating a
1222             # remote keepproxy) or the UUID of a local gateway service,
1223             # read data from the indicated service(s) instead of the usual
1224             # list of local disk services.
1225             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1226                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1227             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1228                                for hint in locator.hints if (
1229                                        hint.startswith('K@') and
1230                                        len(hint) == 29 and
1231                                        self._gateway_services.get(hint[2:])
1232                                        )])
1233             # Map root URLs to their KeepService objects.
1234             roots_map = {
1235                 root: self.KeepService(root, self._user_agent_pool,
1236                                        upload_counter=self.upload_counter,
1237                                        download_counter=self.download_counter,
1238                                        headers=headers,
1239                                        insecure=self.insecure)
1240                 for root in hint_roots
1241             }
1242
1243             # See #3147 for a discussion of the loop implementation.  Highlights:
1244             # * Refresh the list of Keep services after each failure, in case
1245             #   it's being updated.
1246             # * Retry until we succeed, we're out of retries, or every available
1247             #   service has returned permanent failure.
            # sorted_roots is referenced by the error handling below, so
            # define it before the first map_new_services() attempt.  Do
            # not reset roots_map here: it already holds the KeepService
            # objects built from the location hints above.
            sorted_roots = []
1250             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1251                                    backoff_start=2)
1252             for tries_left in loop:
1253                 try:
1254                     sorted_roots = self.map_new_services(
1255                         roots_map, locator,
1256                         force_rebuild=(tries_left < num_retries),
1257                         need_writable=False,
1258                         headers=headers)
1259                 except Exception as error:
1260                     loop.save_result(error)
1261                     continue
1262
1263                 # Query KeepService objects that haven't returned
1264                 # permanent failure, in our specified shuffle order.
1265                 services_to_try = [roots_map[root]
1266                                    for root in sorted_roots
1267                                    if roots_map[root].usable()]
1268                 for keep_service in services_to_try:
1269                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1270                     if blob is not None:
1271                         break
1272                 loop.save_result((blob, len(services_to_try)))
1273
1274             # Always cache the result, then return it if we succeeded.
1275             if loop.success():
1276                 return blob
1277         finally:
1278             if slot is not None:
1279                 self.block_cache.set(slot, blob)
1280
1281         # Q: Including 403 is necessary for the Keep tests to continue
1282         # passing, but maybe they should expect KeepReadError instead?
1283         not_founds = sum(1 for key in sorted_roots
1284                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1285         service_errors = ((key, roots_map[key].last_result()['error'])
1286                           for key in sorted_roots)
1287         if not roots_map:
1288             raise arvados.errors.KeepReadError(
1289                 "[{}] failed to read {}: no Keep services available ({})".format(
1290                     request_id, loc_s, loop.last_result()))
1291         elif not_founds == len(sorted_roots):
1292             raise arvados.errors.NotFoundError(
1293                 "[{}] {} not found".format(request_id, loc_s), service_errors)
1294         else:
1295             raise arvados.errors.KeepReadError(
1296                 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1297
1298     @retry.retry_method
1299     def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1300         """Save data in Keep.
1301
        This method gets a list of Keep services from the API server and
        sends the data to each one in parallel, each upload in its own
        thread.  Once the uploads finish, if enough copies were saved,
        this method returns the most recent HTTP response body.  If the
        requests fail to save enough copies, it raises KeepWriteError.
1307
1308         Arguments:
1309         * data: The string of data to upload.
1310         * copies: The number of copies that the user requires be saved.
1311           Default 2.
1312         * num_retries: The number of times to retry PUT requests to
1313           *each* Keep server if it returns temporary failures, with
1314           exponential backoff.  The default value is set when the
1315           KeepClient is initialized.
1316         * classes: An optional list of storage class names where copies should
1317           be written.
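
        Illustrative usage (a minimal sketch; assumes `kc` is a
        configured KeepClient; the signature and expiry in the returned
        locator are cluster-specific):

          loc = kc.put(b'foo', copies=2)
          # e.g. "acbd18db4cc2f85cedef654fccc4a4d8+3+A<signature>@<expiry>"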
1318         """
1319
1320         classes = classes or self._default_classes
1321
1322         if not isinstance(data, bytes):
1323             data = data.encode()
1324
1325         self.put_counter.add(1)
1326
1327         data_hash = hashlib.md5(data).hexdigest()
1328         loc_s = data_hash + '+' + str(len(data))
1329         if copies < 1:
1330             return loc_s
1331         locator = KeepLocator(loc_s)
1332
1333         request_id = (request_id or
1334                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1335                       arvados.util.new_request_id())
1336         headers = {
1337             'X-Request-Id': request_id,
1338             'X-Keep-Desired-Replicas': str(copies),
1339         }
1340         roots_map = {}
1341         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1342                                backoff_start=2)
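        # Across retries, done_copies counts the replicas confirmed so
        # far, and done_classes lists the storage classes already
        # satisfied (done_classes becomes None below if the cluster
        # doesn't report storage classes).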
1343         done_copies = 0
1344         done_classes = []
1345         for tries_left in loop:
1346             try:
1347                 sorted_roots = self.map_new_services(
1348                     roots_map, locator,
1349                     force_rebuild=(tries_left < num_retries),
1350                     need_writable=True,
1351                     headers=headers)
1352             except Exception as error:
1353                 loop.save_result(error)
1354                 continue
1355
1356             pending_classes = []
1357             if done_classes is not None:
1358                 pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(
                data=data,
                data_hash=data_hash,
                copies=copies - done_copies,
                max_service_replicas=self.max_replicas_per_service,
                timeout=self.current_timeout(num_retries - tries_left),
                classes=pending_classes)
1365             for service_root, ks in [(root, roots_map[root])
1366                                      for root in sorted_roots]:
1367                 if ks.finished():
1368                     continue
1369                 writer_pool.add_task(ks, service_root)
1370             writer_pool.join()
1371             pool_copies, pool_classes = writer_pool.done()
1372             done_copies += pool_copies
1373             if (done_classes is not None) and (pool_classes is not None):
1374                 done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
1378             else:
                # We contacted an old keepstore without storage class
                # support: success is determined solely by the number of
                # successful copies.
                #
                # Disable storage class tracking from this point forward.
1383                 if not self._storage_classes_unsupported_warning:
1384                     self._storage_classes_unsupported_warning = True
1385                     _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1386                 done_classes = None
1387                 loop.save_result(
1388                     (done_copies >= copies, writer_pool.total_task_nr))
1389
1390         if loop.success():
1391             return writer_pool.response()
1392         if not roots_map:
1393             raise arvados.errors.KeepWriteError(
1394                 "[{}] failed to write {}: no Keep services available ({})".format(
1395                     request_id, data_hash, loop.last_result()))
1396         else:
1397             service_errors = ((key, roots_map[key].last_result()['error'])
1398                               for key in sorted_roots
1399                               if roots_map[key].last_result()['error'])
1400             raise arvados.errors.KeepWriteError(
1401                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1402                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1403
1404     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1405         """A stub for put().
1406
1407         This method is used in place of the real put() method when
1408         using local storage (see constructor's local_store argument).
1409
        The copies, num_retries, and classes arguments are ignored:
        they are here only for the sake of offering the same call
        signature as put().
1413
1414         Data stored this way can be retrieved via local_store_get().
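
        Illustrative usage (a minimal sketch; assumes this client was
        constructed with a local_store directory and is bound to `kc`):

          loc = kc.local_store_put(b'foo')
          # loc == "acbd18db4cc2f85cedef654fccc4a4d8+3"; the data is
          # written to a file named after its md5 hash under local_store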
1415         """
1416         md5 = hashlib.md5(data).hexdigest()
1417         locator = '%s+%d' % (md5, len(data))
1418         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1419             f.write(data)
1420         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1421                   os.path.join(self.local_store, md5))
1422         return locator
1423
1424     def local_store_get(self, loc_s, num_retries=None):
1425         """Companion to local_store_put()."""
1426         try:
1427             locator = KeepLocator(loc_s)
1428         except ValueError:
1429             raise arvados.errors.NotFoundError(
1430                 "Invalid data locator: '%s'" % loc_s)
1431         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1432             return b''
1433         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1434             return f.read()
1435
1436     def local_store_head(self, loc_s, num_retries=None):
1437         """Companion to local_store_put()."""
1438         try:
1439             locator = KeepLocator(loc_s)
1440         except ValueError:
1441             raise arvados.errors.NotFoundError(
1442                 "Invalid data locator: '%s'" % loc_s)
1443         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1444             return True
1445         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1446             return True