# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
import copy
from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object
import collections
import datetime
import hashlib
import errno
import io
import logging
import math
import os
import pycurl
import queue
import re
import socket
import ssl
import sys
import threading
import resource
from . import timer
import urllib.parse
import traceback

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
import arvados.diskcache

_logger = logging.getLogger('arvados.keep')
global_client_object = None


# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except (IndexError, ValueError):
            # A hint without '@' raises ValueError from the unpacking
            # above; re-wrap it with a more helpful message.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt

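# A quick sketch of how KeepLocator decomposes a locator string (the
# hash is md5("foo"); the signature and timestamp are made-up example
# values, not real credentials):
#
#     loc = KeepLocator(
#         "acbd18db4cc2f85cedef654fccc4a4d8+3"
#         "+A27d6c340b49ef7e0f8c8e87c3d4b1c1e17c29f3b@6540a1c8")
#     loc.md5sum      # "acbd18db4cc2f85cedef654fccc4a4d8"
#     loc.size        # 3
#     loc.stripped()  # "acbd18db4cc2f85cedef654fccc4a4d8+3"
#     str(loc)        # round-trips the original locator string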

class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)

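# Rather than the deprecated global interface above, callers are expected
# to build their own client. A minimal sketch, assuming a normally
# configured Arvados environment:
#
#     import arvados
#     keep_client = arvados.KeepClient(api_client=arvados.api('v1'))
#     locator = keep_client.put(b'foo')
#     data = keep_client.get(locator)
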
class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # default max slots to half of maximum file handles
                # NOFILE typically defaults to 1024 on Linux so this
                # will be 512 slots.
                self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] // 2
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) // 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick the smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        if self._disk_cache:
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            self.cap_cache()

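    # Worked example of the sizing defaults above, with hypothetical
    # numbers: a NOFILE soft limit of 1024 gives 512 disk cache slots.
    # On a 100 GiB filesystem with 40 GiB available and no existing
    # cache usage, cache_max = min(10 GiB,   # 10% of total disk
    #                              10 GiB,   # 25% of available space
    #                              32 GiB)   # 512 slots * 64 MiB
    # = 10 GiB, and is never allowed to drop below 64 MiB.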

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            return True

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
                for i in range(len(self._cache)-1, -1, -1):
                    # start from the back, find a slot that is a candidate to evict
                    if self._cache[i].ready.is_set():
                        sz = self._cache[i].size()

                        # If evict returns false it means the
                        # underlying disk cache couldn't lock the file
                        # for deletion because another process was using
                        # it. Don't count it as reducing the amount
                        # of data in the cache, find something else to
                        # throw out.
                        if self._cache[i].evict():
                            sm -= sz

                        # Either way we forget about it: either the
                        # other process will delete it, or, if we need
                        # it again and it is still there, we'll find
                        # it on disk.
                        del self._cache[i]
                        break

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True

    def set(self, slot, blob):
        try:
            slot.set(blob)
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum([st.size() for st in self._cache])
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception:
            pass
        finally:
            # Check whether we should evict things from the cache.
            # Either we added a new thing, or there was an error and we
            # may have adjusted the limits down, so we might need to
            # push something out.
            self.cap_cache()

        try:
            # Only gets here if there was an error the first time. The
            # exception handler adjusts limits downward in some cases
            # to free up resources, which may allow the operation to
            # succeed.
            slot.set(blob)
            self.cap_cache()
        except Exception as e:
            # It failed again.  Give up.
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
        finally:
            # Set the notice that we are done with the cache slot one
            # way or another.
            slot.ready.set()

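# Typical usage pattern for the cache above (a sketch; KeepClient's
# _get_or_head() below does essentially this; fetch_block() is a
# hypothetical stand-in for a Keep request):
#
#     slot, first = cache.reserve_cache(locator_md5)
#     if first:
#         cache.set(slot, fetch_block(locator_md5))  # also marks slot ready
#     else:
#         blob = slot.get()  # blocks until the fetching thread calls set()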

class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)


    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def _socket_open(self, *args, **kwargs):
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            self._socket = s
            return s

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result['headers'].get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            if not ignore_bandwidth:
                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

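        # For example, the default non-proxy timeout tuple (2, 256, 32768)
        # maps to CONNECTTIMEOUT_MS = 2000, LOW_SPEED_TIME = 256 seconds,
        # and LOW_SPEED_LIMIT = 32768 bytes/sec: the transfer is aborted
        # if it averages less than 32 KiB/s over a 256-second window.
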
        def _headerfunction(self, header_line):
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written


    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Tell the outer loop that storage class tracking
                    # is disabled.
                    return None
            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

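        # Accounting sketch: with wanted_copies=2 and
        # wanted_storage_classes=['bar', 'foo'], a response confirming
        # {'foo': 2, 'bar': 1} leaves pending_classes() == ['bar'],
        # because 'bar' still has fewer confirmed copies (1) than
        # wanted copies (2).
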
        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()


    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response


    class KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout,
                                        headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed


    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
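
    # For instance, with the default non-proxy timeout (2, 256, 32768),
    # attempts 0, 1, and 2 use connection timeouts of 2, 4, and 8 seconds
    # respectively; the read timeout and bandwidth floor stay fixed.
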
    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()

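    # This is rendezvous hashing: every client computes the same weights
    # independently, so all clients probe the services for a given block
    # in the same order without coordinating. A sketch of the ordering
    # (hypothetical service UUIDs):
    #
    #     h = "acbd18db4cc2f85cedef654fccc4a4d8"
    #     sorted(uuids, key=lambda u: self._service_weight(h, u),
    #            reverse=True)   # identical ordering on every client
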
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

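    # Hint-resolution sketch: a locator such as
    # "acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy" (a 7-character hint
    # naming a remote cluster) probes "https://keep.xyzzy.arvadosapi.com/"
    # first, while a 29-character "K@<service uuid>" hint resolves
    # through self._gateway_services.
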
    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

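    # Loop-result protocol sketch: callers save results as a
    # (value, servers_tried) tuple, e.g. loop.save_result((blob, 3)).
    # A non-None, non-False value means success (True); zero servers
    # tried means permanent failure (False); anything else, including
    # a saved Exception, means "try again" (None).
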
1149     def get_from_cache(self, loc_s):
1150         """Fetch a block only if is in the cache, otherwise return None."""
1151         locator = KeepLocator(loc_s)
1152         slot = self.block_cache.get(locator.md5sum)
1153         if slot is not None and slot.ready.is_set():
1154             return slot.get()
1155         else:
1156             return None
1157
1158     def refresh_signature(self, loc):
1159         """Ask Keep to get the remote block and return its local signature"""
1160         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1161         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1162
1163     @retry.retry_method
1164     def head(self, loc_s, **kwargs):
1165         return self._get_or_head(loc_s, method="HEAD", **kwargs)
1166
1167     @retry.retry_method
1168     def get(self, loc_s, **kwargs):
1169         return self._get_or_head(loc_s, method="GET", **kwargs)
1170
1171     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1172         """Get data from Keep.
1173
1174         This method fetches one or more blocks of data from Keep.  It
1175         sends a request each Keep service registered with the API
1176         server (or the proxy provided when this client was
1177         instantiated), then each service named in location hints, in
1178         sequence.  As soon as one service provides the data, it's
1179         returned.
1180
1181         Arguments:
1182         * loc_s: A string of one or more comma-separated locators to fetch.
1183           This method returns the concatenation of these blocks.
1184         * num_retries: The number of times to retry GET requests to
1185           *each* Keep server if it returns temporary failures, with
1186           exponential backoff.  Note that, in each loop, the method may try
1187           to fetch data from every available Keep service, along with any
1188           that are named in location hints in the locator.  The default value
1189           is set when the KeepClient is initialized.
1190         """
1191         if ',' in loc_s:
1192             return ''.join(self.get(x) for x in loc_s.split(','))
1193
1194         self.get_counter.add(1)
1195
1196         request_id = (request_id or
1197                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1198                       arvados.util.new_request_id())
1199         if headers is None:
1200             headers = {}
1201         headers['X-Request-Id'] = request_id
1202
1203         slot = None
1204         blob = None
1205         try:
1206             locator = KeepLocator(loc_s)
1207             if method == "GET":
1208                 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1209                 if not first:
1210                     if prefetch:
1211                         # this is request for a prefetch, if it is
1212                         # already in flight, return immediately.
1213                         # clear 'slot' to prevent finally block from
1214                         # calling slot.set()
1215                         slot = None
1216                         return None
1217                     self.hits_counter.add(1)
1218                     blob = slot.get()
1219                     if blob is None:
1220                         raise arvados.errors.KeepReadError(
1221                             "failed to read {}".format(loc_s))
1222                     return blob
1223
1224             self.misses_counter.add(1)
1225
1226             # If the locator has hints specifying a prefix (indicating a
1227             # remote keepproxy) or the UUID of a local gateway service,
1228             # read data from the indicated service(s) instead of the usual
1229             # list of local disk services.
1230             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1231                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1232             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1233                                for hint in locator.hints if (
1234                                        hint.startswith('K@') and
1235                                        len(hint) == 29 and
1236                                        self._gateway_services.get(hint[2:])
1237                                        )])
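             # For example (illustration only): a locator such as
             #   d41d8cd98f00b204e9800998ecf8427e+0+K@xyzzy
             # carries a 7-character hint naming a remote cluster, so
             # hint_roots gets 'http://keep.xyzzy.arvadosapi.com/', while a
             # 29-character 'K@'+UUID hint names a local gateway service
             # looked up in self._gateway_services.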
1238             # Map root URLs to their KeepService objects.
1239             roots_map = {
1240                 root: self.KeepService(root, self._user_agent_pool,
1241                                        upload_counter=self.upload_counter,
1242                                        download_counter=self.download_counter,
1243                                        headers=headers,
1244                                        insecure=self.insecure)
1245                 for root in hint_roots
1246             }
1247
1248             # See #3147 for a discussion of the loop implementation.  Highlights:
1249             # * Refresh the list of Keep services after each failure, in case
1250             #   it's being updated.
1251             # * Retry until we succeed, we're out of retries, or every available
1252             #   service has returned permanent failure.
1253             sorted_roots = []
1254             roots_map = {}  # repopulated by map_new_services() on each try
1255             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1256                                    backoff_start=2)
1257             for tries_left in loop:
1258                 try:
1259                     sorted_roots = self.map_new_services(
1260                         roots_map, locator,
1261                         force_rebuild=(tries_left < num_retries),
1262                         need_writable=False,
1263                         headers=headers)
1264                 except Exception as error:
1265                     loop.save_result(error)
1266                     continue
1267
1268                 # Query KeepService objects that haven't returned
1269                 # permanent failure, in our specified shuffle order.
1270                 services_to_try = [roots_map[root]
1271                                    for root in sorted_roots
1272                                    if roots_map[root].usable()]
1273                 for keep_service in services_to_try:
1274                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries - tries_left))
1275                     if blob is not None:
1276                         break
1277                 loop.save_result((blob, len(services_to_try)))
1278
1279             # Always cache the result, then return it if we succeeded.
1280             if loop.success():
1281                 return blob
1282         finally:
1283             if slot is not None:
1284                 self.block_cache.set(slot, blob)
1285
1286         # Q: Including 403 is necessary for the Keep tests to continue
1287         # passing, but maybe they should expect KeepReadError instead?
1288         not_founds = sum(1 for key in sorted_roots
1289                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1290         service_errors = ((key, roots_map[key].last_result()['error'])
1291                           for key in sorted_roots)
1292         if not roots_map:
1293             raise arvados.errors.KeepReadError(
1294                 "[{}] failed to read {}: no Keep services available ({})".format(
1295                     request_id, loc_s, loop.last_result()))
1296         elif not_founds == len(sorted_roots):
1297             raise arvados.errors.NotFoundError(
1298                 "[{}] {} not found".format(request_id, loc_s), service_errors)
1299         else:
1300             raise arvados.errors.KeepReadError(
1301                 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1302
1303     @retry.retry_method
1304     def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1305         """Save data in Keep.
1306
1307         This method gets a list of Keep services from the API server and
1308         sends the data to several of them in parallel writer threads.  Once
1309         the uploads finish, if enough copies are saved, this method returns
1310         the most recent HTTP response body (the signed locator).  If the
1311         requests fail to save enough copies, this method raises KeepWriteError.
1312
1313         Arguments:
1314         * data: The string of data to upload.
1315         * copies: The number of copies that the user requires be saved.
1316           Default 2.
1317         * num_retries: The number of times to retry PUT requests to
1318           *each* Keep server if it returns temporary failures, with
1319           exponential backoff.  The default value is set when the
1320           KeepClient is initialized.
1321         * classes: An optional list of storage class names where copies should
1322           be written.
1323         """
1324
1325         classes = classes or self._default_classes
1326
1327         if not isinstance(data, bytes):
1328             data = data.encode()
1329
1330         self.put_counter.add(1)
1331
1332         data_hash = hashlib.md5(data).hexdigest()
1333         loc_s = data_hash + '+' + str(len(data))
1334         if copies < 1:
1335             return loc_s
1336         locator = KeepLocator(loc_s)
1337
1338         request_id = (request_id or
1339                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1340                       arvados.util.new_request_id())
1341         headers = {
1342             'X-Request-Id': request_id,
1343             'X-Keep-Desired-Replicas': str(copies),
1344         }
1345         roots_map = {}
1346         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1347                                backoff_start=2)
1348         done_copies = 0
1349         done_classes = []
1350         for tries_left in loop:
1351             try:
1352                 sorted_roots = self.map_new_services(
1353                     roots_map, locator,
1354                     force_rebuild=(tries_left < num_retries),
1355                     need_writable=True,
1356                     headers=headers)
1357             except Exception as error:
1358                 loop.save_result(error)
1359                 continue
1360
1361             pending_classes = []
1362             if done_classes is not None:
1363                 pending_classes = list(set(classes) - set(done_classes))
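             # e.g. (hypothetical storage class names): classes == ['hot',
             # 'archive'] and done_classes == ['hot'] leaves pending_classes
             # == ['archive'], so this attempt only targets the missing class.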
1364             writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1365                                                           data_hash=data_hash,
1366                                                           copies=copies - done_copies,
1367                                                           max_service_replicas=self.max_replicas_per_service,
1368                                                           timeout=self.current_timeout(num_retries - tries_left),
1369                                                           classes=pending_classes)
1370             for service_root, ks in [(root, roots_map[root])
1371                                      for root in sorted_roots]:
1372                 if ks.finished():
1373                     continue
1374                 writer_pool.add_task(ks, service_root)
1375             writer_pool.join()
1376             pool_copies, pool_classes = writer_pool.done()
1377             done_copies += pool_copies
1378             if (done_classes is not None) and (pool_classes is not None):
1379                 done_classes += pool_classes
1380                 loop.save_result(
1381                     (done_copies >= copies and set(done_classes) == set(classes),
1382                      writer_pool.total_task_nr))
1383             else:
1384                 # Old keepstore contacted that does not support storage
1385                 # classes: success is determined by copy count alone.
1386                 #
1387                 # Disable storage class tracking from this point forward.
1388                 if not self._storage_classes_unsupported_warning:
1389                     self._storage_classes_unsupported_warning = True
1390                     _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1391                 done_classes = None
1392                 loop.save_result(
1393                     (done_copies >= copies, writer_pool.total_task_nr))
1394
1395         if loop.success():
1396             return writer_pool.response()
1397         if not roots_map:
1398             raise arvados.errors.KeepWriteError(
1399                 "[{}] failed to write {}: no Keep services available ({})".format(
1400                     request_id, data_hash, loop.last_result()))
1401         else:
1402             service_errors = ((key, roots_map[key].last_result()['error'])
1403                               for key in sorted_roots
1404                               if roots_map[key].last_result()['error'])
1405             raise arvados.errors.KeepWriteError(
1406                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1407                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
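
     # Round-trip sketch (illustrative only; assumes a reachable cluster and
     # that the cluster's default storage classes apply):
     #
     #   kc = arvados.KeepClient(api_client=api)
     #   loc = kc.put(b"foo", copies=2)
     #   # loc looks like "acbd18db4cc2f85cedef654fccc4a4d8+3+A..." (signed)
     #   assert kc.get(loc) == b"foo"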
1408
1409     def local_store_put(self, data, copies=1, num_retries=None, classes=None):
1410         """A stub for put().
1411
1412         This method is used in place of the real put() method when
1413         using local storage (see constructor's local_store argument).
1414
1415         The copies, num_retries, and classes arguments are ignored: they
1416         are here only for the sake of offering the same call signature as
1417         put().
1418
1419         Data stored this way can be retrieved via local_store_get().
1420         """
1421         md5 = hashlib.md5(data).hexdigest()
1422         locator = '%s+%d' % (md5, len(data))
1423         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1424             f.write(data)
1425         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1426                   os.path.join(self.local_store, md5))
1427         return locator
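
     # Sketch (assumes a client built with the constructor's local_store
     # argument, e.g. KeepClient(local_store='/tmp/keep'), a hypothetical path):
     #
     #   loc = kc.local_store_put(b"foo")  # 'acbd18db4cc2f85cedef654fccc4a4d8+3'
     #   kc.local_store_get(loc)           # -> b'foo'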
1428
1429     def local_store_get(self, loc_s, num_retries=None):
1430         """Companion to local_store_put()."""
1431         try:
1432             locator = KeepLocator(loc_s)
1433         except ValueError:
1434             raise arvados.errors.NotFoundError(
1435                 "Invalid data locator: '%s'" % loc_s)
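         # The empty block (md5 d41d8cd98f00b204e9800998ecf8427e) never has a
         # backing file; return its contents directly.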
1436         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1437             return b''
1438         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1439             return f.read()
1440
1441     def local_store_head(self, loc_s, num_retries=None):
1442         """Companion to local_store_put()."""
1443         try:
1444             locator = KeepLocator(loc_s)
1445         except ValueError:
1446             raise arvados.errors.NotFoundError(
1447                 "Invalid data locator: '%s'" % loc_s)
1448         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1449             return True
1450         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1451             return True