18842: Clean up keep cache set() a little bit
[arvados.git] / sdk / python / arvados / keep.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
import copy
from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object
import collections
import datetime
import hashlib
import errno
import io
import logging
import math
import os
import pycurl
import queue
import re
import socket
import ssl
import sys
import threading
import resource
from . import timer
import urllib.parse
import traceback

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
import arvados.diskcache

_logger = logging.getLogger('arvados.keep')
global_client_object = None


# Monkey-patch TCP constants when not available (macOS).  Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt

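# A quick usage sketch for KeepLocator (hypothetical values):
#
#     loc = KeepLocator('acbd18db4cc2f85cedef654fccc4a4d8+3+K@zzzzz')
#     loc.md5sum       # 'acbd18db4cc2f85cedef654fccc4a4d8'
#     loc.size         # 3
#     loc.hints        # ['K@zzzzz']
#     loc.stripped()   # 'acbd18db4cc2f85cedef654fccc4a4d8+3'
#     str(loc)         # round-trips the locator, permission hint included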

class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)

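# Preferred over the deprecated Keep wrapper above: build your own client.
# A minimal sketch (assumes a reachable cluster and valid credentials):
#
#     import arvados
#     kc = arvados.KeepClient(api_client=arvados.api('v1'))
#     locator = kc.put(b'foo')   # returns a signed locator string
#     data = kc.get(locator)     # returns b'foo'
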
class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # default max slots to half of maximum file handles
                # NOFILE typically defaults to 1024 on Linux so this
                # will be 512 slots.
                self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] // 2
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) // 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        if self._disk_cache:
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            self.cap_cache()

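    # Worked example of the disk cache sizing above (hypothetical numbers):
    # with a 100 GiB filesystem, 40 GiB available, no existing cache, and
    # 512 slots, the candidates are 10 GiB (10% of total), 10 GiB (25% of
    # available) and 32 GiB (512 * 64 MiB), so cache_max becomes 10 GiB.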

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            return True

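    # CacheSlot protocol, as consumed by KeepClient (a descriptive note,
    # not new API): the first reader to reserve a locator gets a fresh slot
    # and must eventually call set() -- with the block data, or None on
    # error -- which fires `ready`; concurrent readers block in get() until
    # then.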
    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
                for i in range(len(self._cache)-1, -1, -1):
                    # start from the back, find a slot that is a candidate to evict
                    if self._cache[i].ready.is_set():
                        sz = self._cache[i].size()

                        # If evict() returns False, the underlying disk
                        # cache couldn't lock the file for deletion because
                        # another process was using it.  Don't count it as
                        # reducing the amount of data in the cache; find
                        # something else to throw out.
                        if self._cache[i].evict():
                            sm -= sz

                        # Either way, we forget about it: either the other
                        # process will delete it, or, if we need it again
                        # and it is still there, we'll find it on disk.
                        del self._cache[i]
                        break

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True

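    # Typical reserve/fill pattern, mirroring KeepClient._get_or_head below
    # ('fetch_block' is a hypothetical stand-in for the real Keep request):
    #
    #     slot, first = cache.reserve_cache(locator_md5)
    #     if first:
    #         blob = fetch_block(locator_md5)   # None on failure
    #         cache.set(slot, blob)
    #     else:
    #         blob = slot.get()   # blocks until the filling thread calls set()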
    def set(self, slot, blob):
        tryagain = False

        try:
            slot.set(blob)
        except OSError as e:
            tryagain = True
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum([st.size() for st in self._cache])
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception as e:
            tryagain = True

        # Check whether we should evict things from the cache: either we
        # added a new thing, or we adjusted the limits down, so we may
        # need to push something out.
        self.cap_cache()

        if not tryagain:
            # Done
            return

        try:
            # There was an error; we ran cap_cache, so try one more time.
            slot.set(blob)
        except Exception as e:
            # It failed again.  Give up.
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
        finally:
            # Signal that we are done with the cache slot, one way or
            # another.
            slot.ready.set()


class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)


    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def _socket_open(self, *args, **kwargs):
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Setting TCP_KEEPIDLE on macOS (where it doesn't exist) would
            # raise an "invalid protocol" error; this check prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            self._socket = s
            return s

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                         self._result['status_code'],
                         self._result['headers'].get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            if not ignore_bandwidth:
                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

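        # Accepted timeout shapes, per the branches above (illustrative):
        #
        #     _setcurltimeouts(curl, 5)                # 5s connect, 5s low-speed window
        #     _setcurltimeouts(curl, (2, 256))         # bandwidth floor defaults to 32768 B/s
        #     _setcurltimeouts(curl, (2, 256, 32768))  # fully explicit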
        def _headerfunction(self, header_line):
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written


    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Tell the outer loop that storage class tracking
                    # is disabled.
                    return None
            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()

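    # Writer pipeline sketch (hedged: the put() driver lives outside this
    # excerpt, so sorted_roots/roots_map below are illustrative names):
    #
    #     pool = KeepClient.KeepWriterThreadPool(data, data_hash, copies=2,
    #                                            max_service_replicas=1)
    #     for root in sorted_roots:
    #         pool.add_task(roots_map[root], root)
    #     pool.join()
    #     copies_done, satisfied = pool.done()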

    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response


    class KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout,
                                        headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed


    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
884           string.
885

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass

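    # Construction sketches (hypothetical hosts and tokens), per the
    # argument docs above:
    #
    #     KeepClient(api_client=arvados.api('v1'))     # discover services via API
    #     KeepClient(proxy='https://keep.example.com/',
    #                api_token='xyzzy')                 # talk to a keepproxy directly
    #     KeepClient(local_store='/tmp/keep')           # test mode: no network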
    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
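
    # For example, with DEFAULT_TIMEOUT == (2, 256, 32768) the return value
    # above uses a 2-second connection timeout on attempt 0, 4 seconds on
    # attempt 1, and 8 seconds on attempt 2; the read timeout and bandwidth
    # floor never scale.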

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()

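    # This is rendezvous (highest-random-weight) hashing: for a given block,
    # every client computes the same per-service weights, so all clients
    # probe services in the same order with no shared state.  E.g., with a
    # hypothetical UUID:
    #
    #     self._service_weight('acbd18db4cc2f85cedef654fccc4a4d8',
    #                          'zzzzz-bi6l4-000000000000000')
    #
    # returns a hex digest that is used purely as a sort key.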
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

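    # Locator service hints come in two sizes (see the length checks above):
    # 'K@xxxxx' (7 chars) names a remote cluster, probed at
    # https://keep.xxxxx.arvadosapi.com/, while 'K@' plus a 27-character
    # service UUID (29 chars) selects a specific local gateway service.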
    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
1151         """Fetch a block only if is in the cache, otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return b''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                slot, first = self.block_cache.reserve_cache(locator.md5sum)
                if not first:
                    if prefetch:
                        # This is a prefetch request and the block is
                        # already in flight: return immediately.  Clear
                        # 'slot' to prevent the finally block from
                        # calling slot.set().
                        slot = None
                        return None
                    self.hits_counter.add(1)
                    blob = slot.get()
                    if blob is None:
                        raise arvados.errors.KeepReadError(
                            "failed to read {}".format(loc_s))
                    return blob

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['https://keep.{}.arvadosapi.com/'.format(hint[2:])
1232                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1233             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1234                                for hint in locator.hints if (
1235                                        hint.startswith('K@') and
1236                                        len(hint) == 29 and
1237                                        self._gateway_services.get(hint[2:])
1238                                        )])
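            # For example (illustrative values): a five-character cluster
            # hint like "K@xyzzy" becomes the remote proxy root
            # "http://keep.xyzzy.arvadosapi.com/", while a 27-character
            # gateway UUID hint is resolved via self._gateway_services.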
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation.  Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
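            # With backoff_start=2 the loop below sleeps between passes,
            # starting at roughly 2 seconds and growing from there (per
            # retry.RetryLoop's default growth factor), so num_retries=2
            # allows up to three passes over the services.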
            sorted_roots = []
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries - tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))
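                # save_result() records a (result, servers_tried) tuple;
                # _check_loop_result() inspects it to distinguish success
                # (blob is not None), a retryable failure, and the case
                # where no services were available to try at all.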

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            if slot is not None:
                self.block_cache.set(slot, blob)

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
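
        Example (illustrative, not from the original docs; assumes `kc`
        is a KeepClient configured for your cluster):

            loc = kc.put(b'foo', copies=2)
            # loc is a locator string such as
            # 'acbd18db4cc2f85cedef654fccc4a4d8+3' followed by any
            # signature hint the cluster appends.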
        """

        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
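        # X-Keep-Desired-Replicas tells each service how many copies the
        # client still wants; a proxy service can fan the write out to
        # several backing stores, while a plain disk service stores one.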
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(
                data=data,
                data_hash=data_hash,
                copies=copies - done_copies,
                max_service_replicas=self.max_replicas_per_service,
                timeout=self.current_timeout(num_retries - tries_left),
                classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")

    def local_store_put(self, data, copies=1, num_retries=None, classes=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        The copies, num_retries, and classes arguments are ignored:
        they are here only for the sake of offering the same call
        signature as put().

        Data stored this way can be retrieved via local_store_get().
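
        Example (illustrative; assumes the client was constructed with
        local_store set to a writable directory):

            loc = kc.local_store_put(b'foo')
            # loc == 'acbd18db4cc2f85cedef654fccc4a4d8+3'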
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
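        # Write to a temporary file, then rename it into place: rename() is
        # atomic on POSIX filesystems, so concurrent readers never observe
        # a partially written block.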
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
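        # The zero-byte block is a special case: its locator
        # (d41d8cd98f00b204e9800998ecf8427e+0, the md5 of the empty
        # string) is well known, so there is nothing to read from disk.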
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()

    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True