21388: Delist CentOS 8 support
[arvados.git] / sdk / python / arvados / keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import copy
6 import collections
7 import datetime
8 import hashlib
9 import errno
10 import io
11 import logging
12 import math
13 import os
14 import pycurl
15 import queue
16 import re
17 import socket
18 import ssl
19 import sys
20 import threading
21 import resource
22 import urllib.parse
23 import traceback
24 import weakref
25
26 from io import BytesIO
27
28 import arvados
29 import arvados.config as config
30 import arvados.errors
31 import arvados.retry as retry
32 import arvados.util
33 import arvados.diskcache
34 from arvados._pycurlhelper import PyCurlHelper
35 from . import timer
36
# Logger used throughout this module.
_logger = logging.getLogger('arvados.keep')
# Cached KeepClient instance; created on demand by
# Keep.global_client_object() and replaced when the relevant
# configuration values change.
global_client_object = None

# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
# Each constant is only defined if the socket module does not already
# provide it, so a Python build that exposes them is left untouched.
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
49
class KeepLocator(object):
    """Parsed form of a Keep block locator string.

    A locator is a ``+``-separated string: an MD5 hex digest, an
    optional decimal size, and zero or more hints.  A hint starting
    with ``A`` is a permission hint of the form
    ``A<40 hex signature>@<hex Unix timestamp>``; every other hint is
    kept verbatim in ``self.hints``.

    All datetimes handled by this class are naive and expressed in UTC.
    """
    # Naive UTC datetime for Unix timestamp 0, used to convert the
    # permission expiry back into a timestamp.
    EPOCH_DATETIME = datetime.datetime(1970, 1, 1)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse ``locator_str``.

        Raises ValueError if the digest, size, or any hint is malformed.
        """
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Bare digest with no size component.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        """Return the locator string, omitting components that are unset."""
        return '+'.join(
            str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator without any hints (digest, plus size if known)."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        """Permission expiry as a naive UTC datetime, or None if unsigned."""
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # value is the hex timestamp text taken from the permission hint.
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        # Equivalent to the deprecated utcfromtimestamp(): convert in UTC,
        # then drop tzinfo so the value stays naive like EPOCH_DATETIME.
        self._perm_expiry = datetime.datetime.fromtimestamp(
            int(value, 16), datetime.timezone.utc).replace(tzinfo=None)

    def permission_hint(self):
        """Return the ``A<sig>@<expiry>`` hint, or None if either part is unset."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Set perm_sig and perm_expiry from a hint string ``A<sig>@<expiry>``.

        Raises ValueError if the hint has no ``@`` separator, or if the
        signature or timestamp fails validation in its property setter.
        """
        sig, sep, expiry = s[1:].partition('@')
        if not sep:
            # Unpacking a split() result would raise a generic ValueError
            # here; report the offending hint instead.
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig, self.perm_expiry = sig, expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint has expired at ``as_of_dt``.

        ``as_of_dt`` is a naive UTC datetime; it defaults to the current
        UTC time.  A locator without a permission expiry never expires.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # perm_expiry is naive UTC, so "now" must be naive UTC as well;
            # local time would skew the comparison by the UTC offset.
            as_of_dt = datetime.datetime.now(
                datetime.timezone.utc).replace(tzinfo=None)
        return self.perm_expiry <= as_of_dt
132
133
class Keep(object):
    """Convenience wrapper around one shared, module-wide KeepClient.

    THIS CLASS IS DEPRECATED.  Build your own KeepClient from your own
    API client instead.  The shared KeepClient is constructed from the
    current Arvados configuration, which is not necessarily the
    configuration your API client was built with.
    """
    # Configuration snapshot the shared client was last built for.
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it if settings changed."""
        global global_client_object
        # KeepClient used to react to these settings at runtime.  That
        # is emulated here: a fresh KeepClient is created whenever any
        # of the values differs from the last snapshot.
        snapshot = (
            config.get('ARVADOS_API_HOST'),
            config.get('ARVADOS_API_TOKEN'),
            config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
            config.get('ARVADOS_KEEP_PROXY'),
            os.environ.get('KEEP_LOCAL_STORE'),
        )
        needs_rebuild = global_client_object is None or cls._last_key != snapshot
        if needs_rebuild:
            global_client_object = KeepClient()
            cls._last_key = snapshot
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block through the shared client."""
        client = Keep.global_client_object()
        return client.get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store a block through the shared client."""
        client = Keep.global_client_object()
        return client.put(data, **kwargs)
167
class KeepBlockCache(object):
    """Bounded cache of Keep blocks, held in RAM or in an on-disk directory.

    Slots are kept in an ordered mapping keyed by locator.  A cache hit
    moves the slot to the end; eviction walks the mapping from the
    front and drops slots whose content is ready until both the byte
    limit (``cache_max``) and the slot limit (``_max_slots``) are met.
    """

    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        """Set up the cache.

        cache_max -- byte limit; 0 selects a default (see below).  The
            effective limit is never less than 64 MiB.
        max_slots -- slot limit; 0 selects a default (see below).
        disk_cache -- use arvados.diskcache slots instead of RAM slots.
        disk_cache_dir -- directory for the disk cache; when omitted a
            'keep' directory under the user's cache storage path is used.
        """
        self.cache_max = cache_max
        self._cache = collections.OrderedDict()
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        # Condition sharing _cache_lock, used to wake threads waiting
        # for a free slot in reserve_cache().
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = str(arvados.util._BaseDirectories('CACHE').storage_path('keep'))

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors, one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of maximum file handles.  This
                # means we'll use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks.  This leaves
                # 768 file handles for sockets and other stuff.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling
                # this.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                # Integer division keeps cache_max a whole number of bytes.
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) // 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        # Never go below 64 MiB.
        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        # Running total of the sizes of all cached slots, in bytes.
        self.cache_total = 0
        if self._disk_cache:
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            for slot in self._cache.values():
                self.cache_total += slot.size()
            self.cap_cache()

    class CacheSlot(object):
        """RAM-backed slot holding the content of one block."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            # Set once set() has stored a value (or None on failure).
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until the slot is ready, then return its content."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store value and mark the slot ready.

            Returns False, leaving the slot untouched, if content was
            already stored; True otherwise.
            """
            if self.content is not None:
                return False
            self.content = value
            self.ready.set()
            return True

        def size(self):
            """Return the content length in bytes, or 0 when empty."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            """Drop the content so its memory can be reclaimed."""
            self.content = None


    def _resize_cache(self, cache_max, max_slots):
        # Try and make sure the contents of the cache do not exceed
        # the supplied maximums.
        #
        # Does no locking of its own; the callers in this class hold
        # the cache lock.

        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
            return

        _evict_candidates = collections.deque(self._cache.values())
        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
            slot = _evict_candidates.popleft()
            if not slot.ready.is_set():
                # Still being filled by another thread; cannot evict.
                continue

            sz = slot.size()
            slot.evict()
            self.cache_total -= sz
            del self._cache[slot.locator]


    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            # Wake any reserve_cache() callers waiting for a free slot.
            self._cache_updating.notify_all()

    def _get(self, locator):
        # Look up a slot without taking the lock (callers hold it).
        # Returns the slot, or None if the block is not cached.

        # Test if the locator is already in the cache
        if locator in self._cache:
            n = self._cache[locator]
            if n.ready.is_set() and n.content is None:
                # Ready but empty: drop the slot and report a miss.
                del self._cache[n.locator]
                return None
            self._cache.move_to_end(locator)
            return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache[n.locator] = n
                self.cache_total += n.size()
                return n
        return None

    def get(self, locator):
        """Return the cache slot for locator, or None if it is not cached."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.

        Returns a (slot, is_new) tuple; is_new is True when the slot
        was just created.'''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, need to wait
                    # for something to happen that releases one of the
                    # cache slots.  Idle for 200 ms or woken up by
                    # another thread
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache[n.locator] = n
                return n, True

    def set(self, slot, blob):
        """Store blob in slot and re-apply the cache limits.

        If the first attempt raises, the limits are lowered where the
        error suggests resource exhaustion (ENOMEM, ENOSPC), the cache
        is capped, and the store is attempted once more.

        Raises arvados.errors.KeepCacheError if the second attempt
        fails as well; the slot is then marked ready with no content.
        """
        try:
            if slot.set(blob):
                self.cache_total += slot.size()
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum(st.size() for st in self._cache.values())
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception as e:
            # Still best effort -- the retry below gets another chance --
            # but leave a trace instead of discarding the error silently.
            _logger.debug("Error saving block %s to cache, will retry: %s",
                          slot.locator, e)
        finally:
            # Check if we should evict things from the cache.  Either
            # because we added a new thing or there was an error and
            # we possibly adjusted the limits down, so we might need
            # to push something out.
            self.cap_cache()

        try:
            # Only gets here if there was an error the first time. The
            # exception handler adjusts limits downward in some cases
            # to free up resources, which would make the operation
            # succeed.
            if slot.set(blob):
                self.cache_total += slot.size()
        except Exception as e:
            # It failed again.  Give up.
            slot.set(None)
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))

        self.cap_cache()
368
class Counter(object):
    """Running total that can be updated and read from several threads."""

    def __init__(self, v=0):
        # v is the starting value of the total.
        self._val = v
        self._lk = threading.Lock()

    def add(self, v):
        """Add v to the total while holding the lock."""
        self._lk.acquire()
        try:
            self._val += v
        finally:
            self._lk.release()

    def get(self):
        """Return the current total, read while holding the lock."""
        with self._lk:
            current = self._val
        return current
381
382
383 class KeepClient(object):
384     DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
385     DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
386
387     class KeepService(PyCurlHelper):
388         """Make requests to a single Keep service, and track results.
389
390         A KeepService is intended to last long enough to perform one
391         transaction (GET or PUT) against one Keep service. This can
392         involve calling either get() or put() multiple times in order
393         to retry after transient failures. However, calling both get()
394         and put() on a single instance -- or using the same instance
395         to access two different Keep services -- will not produce
396         sensible behavior.
397         """
398
399         HTTP_ERRORS = (
400             socket.error,
401             ssl.SSLError,
402             arvados.errors.HttpError,
403         )
404
405         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
406                      upload_counter=None,
407                      download_counter=None,
408                      headers={},
409                      insecure=False):
410             super(KeepClient.KeepService, self).__init__()
411             self.root = root
412             self._user_agent_pool = user_agent_pool
413             self._result = {'error': None}
414             self._usable = True
415             self._session = None
416             self._socket = None
417             self.get_headers = {'Accept': 'application/octet-stream'}
418             self.get_headers.update(headers)
419             self.put_headers = headers
420             self.upload_counter = upload_counter
421             self.download_counter = download_counter
422             self.insecure = insecure
423
424         def usable(self):
425             """Is it worth attempting a request?"""
426             return self._usable
427
428         def finished(self):
429             """Did the request succeed or encounter permanent failure?"""
430             return self._result['error'] == False or not self._usable
431
432         def last_result(self):
433             return self._result
434
435         def _get_user_agent(self):
436             try:
437                 return self._user_agent_pool.get(block=False)
438             except queue.Empty:
439                 return pycurl.Curl()
440
441         def _put_user_agent(self, ua):
442             try:
443                 ua.reset()
444                 self._user_agent_pool.put(ua, block=False)
445             except:
446                 ua.close()
447
448         def get(self, locator, method="GET", timeout=None):
449             # locator is a KeepLocator object.
450             url = self.root + str(locator)
451             _logger.debug("Request: %s %s", method, url)
452             curl = self._get_user_agent()
453             ok = None
454             try:
455                 with timer.Timer() as t:
456                     self._headers = {}
457                     response_body = BytesIO()
458                     curl.setopt(pycurl.NOSIGNAL, 1)
459                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
460                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
461                     curl.setopt(pycurl.URL, url.encode('utf-8'))
462                     curl.setopt(pycurl.HTTPHEADER, [
463                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
464                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
465                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
466                     if self.insecure:
467                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
468                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
469                     else:
470                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
471                     if method == "HEAD":
472                         curl.setopt(pycurl.NOBODY, True)
473                     else:
474                         curl.setopt(pycurl.HTTPGET, True)
475                     self._setcurltimeouts(curl, timeout, method=="HEAD")
476
477                     try:
478                         curl.perform()
479                     except Exception as e:
480                         raise arvados.errors.HttpError(0, str(e))
481                     finally:
482                         if self._socket:
483                             self._socket.close()
484                             self._socket = None
485                     self._result = {
486                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
487                         'body': response_body.getvalue(),
488                         'headers': self._headers,
489                         'error': False,
490                     }
491
492                 ok = retry.check_http_response_success(self._result['status_code'])
493                 if not ok:
494                     self._result['error'] = arvados.errors.HttpError(
495                         self._result['status_code'],
496                         self._headers.get('x-status-line', 'Error'))
497             except self.HTTP_ERRORS as e:
498                 self._result = {
499                     'error': e,
500                 }
501             self._usable = ok != False
502             if self._result.get('status_code', None):
503                 # The client worked well enough to get an HTTP status
504                 # code, so presumably any problems are just on the
505                 # server side and it's OK to reuse the client.
506                 self._put_user_agent(curl)
507             else:
508                 # Don't return this client to the pool, in case it's
509                 # broken.
510                 curl.close()
511             if not ok:
512                 _logger.debug("Request fail: GET %s => %s: %s",
513                               url, type(self._result['error']), str(self._result['error']))
514                 return None
515             if method == "HEAD":
516                 _logger.info("HEAD %s: %s bytes",
517                          self._result['status_code'],
518                          self._result.get('content-length'))
519                 if self._result['headers'].get('x-keep-locator'):
520                     # This is a response to a remote block copy request, return
521                     # the local copy block locator.
522                     return self._result['headers'].get('x-keep-locator')
523                 return True
524
525             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
526                          self._result['status_code'],
527                          len(self._result['body']),
528                          t.msecs,
529                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
530
531             if self.download_counter:
532                 self.download_counter.add(len(self._result['body']))
533             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
534             if resp_md5 != locator.md5sum:
535                 _logger.warning("Checksum fail: md5(%s) = %s",
536                                 url, resp_md5)
537                 self._result['error'] = arvados.errors.HttpError(
538                     0, 'Checksum fail')
539                 return None
540             return self._result['body']
541
542         def put(self, hash_s, body, timeout=None, headers={}):
543             put_headers = copy.copy(self.put_headers)
544             put_headers.update(headers)
545             url = self.root + hash_s
546             _logger.debug("Request: PUT %s", url)
547             curl = self._get_user_agent()
548             ok = None
549             try:
550                 with timer.Timer() as t:
551                     self._headers = {}
552                     body_reader = BytesIO(body)
553                     response_body = BytesIO()
554                     curl.setopt(pycurl.NOSIGNAL, 1)
555                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
556                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
557                     curl.setopt(pycurl.URL, url.encode('utf-8'))
558                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
559                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
560                     # response) instead of sending the request body immediately.
561                     # This allows the server to reject the request if the request
562                     # is invalid or the server is read-only, without waiting for
563                     # the client to send the entire block.
564                     curl.setopt(pycurl.UPLOAD, True)
565                     curl.setopt(pycurl.INFILESIZE, len(body))
566                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
567                     curl.setopt(pycurl.HTTPHEADER, [
568                         '{}: {}'.format(k,v) for k,v in put_headers.items()])
569                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
570                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
571                     if self.insecure:
572                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
573                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
574                     else:
575                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
576                     self._setcurltimeouts(curl, timeout)
577                     try:
578                         curl.perform()
579                     except Exception as e:
580                         raise arvados.errors.HttpError(0, str(e))
581                     finally:
582                         if self._socket:
583                             self._socket.close()
584                             self._socket = None
585                     self._result = {
586                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
587                         'body': response_body.getvalue().decode('utf-8'),
588                         'headers': self._headers,
589                         'error': False,
590                     }
591                 ok = retry.check_http_response_success(self._result['status_code'])
592                 if not ok:
593                     self._result['error'] = arvados.errors.HttpError(
594                         self._result['status_code'],
595                         self._headers.get('x-status-line', 'Error'))
596             except self.HTTP_ERRORS as e:
597                 self._result = {
598                     'error': e,
599                 }
600             self._usable = ok != False # still usable if ok is True or None
601             if self._result.get('status_code', None):
602                 # Client is functional. See comment in get().
603                 self._put_user_agent(curl)
604             else:
605                 curl.close()
606             if not ok:
607                 _logger.debug("Request fail: PUT %s => %s: %s",
608                               url, type(self._result['error']), str(self._result['error']))
609                 return False
610             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
611                          self._result['status_code'],
612                          len(body),
613                          t.msecs,
614                          1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
615             if self.upload_counter:
616                 self.upload_counter.add(len(body))
617             return True
618
619
620     class KeepWriterQueue(queue.Queue):
621         def __init__(self, copies, classes=[]):
622             queue.Queue.__init__(self) # Old-style superclass
623             self.wanted_copies = copies
624             self.wanted_storage_classes = classes
625             self.successful_copies = 0
626             self.confirmed_storage_classes = {}
627             self.response = None
628             self.storage_classes_tracking = True
629             self.queue_data_lock = threading.RLock()
630             self.pending_tries = max(copies, len(classes))
631             self.pending_tries_notification = threading.Condition()
632
633         def write_success(self, response, replicas_nr, classes_confirmed):
634             with self.queue_data_lock:
635                 self.successful_copies += replicas_nr
636                 if classes_confirmed is None:
637                     self.storage_classes_tracking = False
638                 elif self.storage_classes_tracking:
639                     for st_class, st_copies in classes_confirmed.items():
640                         try:
641                             self.confirmed_storage_classes[st_class] += st_copies
642                         except KeyError:
643                             self.confirmed_storage_classes[st_class] = st_copies
644                     self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
645                 self.response = response
646             with self.pending_tries_notification:
647                 self.pending_tries_notification.notify_all()
648
649         def write_fail(self, ks):
650             with self.pending_tries_notification:
651                 self.pending_tries += 1
652                 self.pending_tries_notification.notify()
653
654         def pending_copies(self):
655             with self.queue_data_lock:
656                 return self.wanted_copies - self.successful_copies
657
658         def satisfied_classes(self):
659             with self.queue_data_lock:
660                 if not self.storage_classes_tracking:
661                     # Notifies disabled storage classes expectation to
662                     # the outer loop.
663                     return None
664             return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
665
666         def pending_classes(self):
667             with self.queue_data_lock:
668                 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
669                     return []
670                 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
671                 for st_class, st_copies in self.confirmed_storage_classes.items():
672                     if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
673                         unsatisfied_classes.remove(st_class)
674                 return unsatisfied_classes
675
676         def get_next_task(self):
677             with self.pending_tries_notification:
678                 while True:
679                     if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
680                         # This notify_all() is unnecessary --
681                         # write_success() already called notify_all()
682                         # when pending<1 became true, so it's not
683                         # possible for any other thread to be in
684                         # wait() now -- but it's cheap insurance
685                         # against deadlock so we do it anyway:
686                         self.pending_tries_notification.notify_all()
687                         # Drain the queue and then raise Queue.Empty
688                         while True:
689                             self.get_nowait()
690                             self.task_done()
691                     elif self.pending_tries > 0:
692                         service, service_root = self.get_nowait()
693                         if service.finished():
694                             self.task_done()
695                             continue
696                         self.pending_tries -= 1
697                         return service, service_root
698                     elif self.empty():
699                         self.pending_tries_notification.notify_all()
700                         raise queue.Empty
701                     else:
702                         self.pending_tries_notification.wait()
703
704
705     class KeepWriterThreadPool(object):
706         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
707             self.total_task_nr = 0
708             if (not max_service_replicas) or (max_service_replicas >= copies):
709                 num_threads = 1
710             else:
711                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
712             _logger.debug("Pool max threads is %d", num_threads)
713             self.workers = []
714             self.queue = KeepClient.KeepWriterQueue(copies, classes)
715             # Create workers
716             for _ in range(num_threads):
717                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
718                 self.workers.append(w)
719
720         def add_task(self, ks, service_root):
721             self.queue.put((ks, service_root))
722             self.total_task_nr += 1
723
724         def done(self):
725             return self.queue.successful_copies, self.queue.satisfied_classes()
726
727         def join(self):
728             # Start workers
729             for worker in self.workers:
730                 worker.start()
731             # Wait for finished work
732             self.queue.join()
733
734         def response(self):
735             return self.queue.response
736
737
738     class KeepWriterThread(threading.Thread):
739         class TaskFailed(RuntimeError): pass
740
741         def __init__(self, queue, data, data_hash, timeout=None):
742             super(KeepClient.KeepWriterThread, self).__init__()
743             self.timeout = timeout
744             self.queue = queue
745             self.data = data
746             self.data_hash = data_hash
747             self.daemon = True
748
749         def run(self):
750             while True:
751                 try:
752                     service, service_root = self.queue.get_next_task()
753                 except queue.Empty:
754                     return
755                 try:
756                     locator, copies, classes = self.do_task(service, service_root)
757                 except Exception as e:
758                     if not isinstance(e, self.TaskFailed):
759                         _logger.exception("Exception in KeepWriterThread")
760                     self.queue.write_fail(service)
761                 else:
762                     self.queue.write_success(locator, copies, classes)
763                 finally:
764                     self.queue.task_done()
765
766         def do_task(self, service, service_root):
767             classes = self.queue.pending_classes()
768             headers = {}
769             if len(classes) > 0:
770                 classes.sort()
771                 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
772             success = bool(service.put(self.data_hash,
773                                         self.data,
774                                         timeout=self.timeout,
775                                         headers=headers))
776             result = service.last_result()
777
778             if not success:
779                 if result.get('status_code'):
780                     _logger.debug("Request fail: PUT %s => %s %s",
781                                   self.data_hash,
782                                   result.get('status_code'),
783                                   result.get('body'))
784                 raise self.TaskFailed()
785
786             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
787                           str(threading.current_thread()),
788                           self.data_hash,
789                           len(self.data),
790                           service_root)
791             try:
792                 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
793             except (KeyError, ValueError):
794                 replicas_stored = 1
795
796             classes_confirmed = {}
797             try:
798                 scch = result['headers']['x-keep-storage-classes-confirmed']
799                 for confirmation in scch.replace(' ', '').split(','):
800                     if '=' in confirmation:
801                         stored_class, stored_copies = confirmation.split('=')[:2]
802                         classes_confirmed[stored_class] = int(stored_copies)
803             except (KeyError, ValueError):
804                 # Storage classes confirmed header missing or corrupt
805                 classes_confirmed = None
806
807             return result['body'].strip(), replicas_stored, classes_confirmed
808
809
810     def __init__(self, api_client=None, proxy=None,
811                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
812                  api_token=None, local_store=None, block_cache=None,
813                  num_retries=10, session=None, num_prefetch_threads=None):
814         """Initialize a new KeepClient.
815
816         Arguments:
817         :api_client:
818           The API client to use to find Keep services.  If not
819           provided, KeepClient will build one from available Arvados
820           configuration.
821
822         :proxy:
823           If specified, this KeepClient will send requests to this Keep
824           proxy.  Otherwise, KeepClient will fall back to the setting of the
825           ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
826           If you want to KeepClient does not use a proxy, pass in an empty
827           string.
828
829         :timeout:
830           The initial timeout (in seconds) for HTTP requests to Keep
831           non-proxy servers.  A tuple of three floats is interpreted as
832           (connection_timeout, read_timeout, minimum_bandwidth). A connection
833           will be aborted if the average traffic rate falls below
834           minimum_bandwidth bytes per second over an interval of read_timeout
835           seconds. Because timeouts are often a result of transient server
836           load, the actual connection timeout will be increased by a factor
837           of two on each retry.
838           Default: (2, 256, 32768).
839
840         :proxy_timeout:
841           The initial timeout (in seconds) for HTTP requests to
842           Keep proxies. A tuple of three floats is interpreted as
843           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
844           described above for adjusting connection timeouts on retry also
845           applies.
846           Default: (20, 256, 32768).
847
848         :api_token:
849           If you're not using an API client, but only talking
850           directly to a Keep proxy, this parameter specifies an API token
851           to authenticate Keep requests.  It is an error to specify both
852           api_client and api_token.  If you specify neither, KeepClient
853           will use one available from the Arvados configuration.
854
855         :local_store:
856           If specified, this KeepClient will bypass Keep
857           services, and save data to the named directory.  If unspecified,
858           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
859           environment variable.  If you want to ensure KeepClient does not
860           use local storage, pass in an empty string.  This is primarily
861           intended to mock a server for testing.
862
863         :num_retries:
864           The default number of times to retry failed requests.
865           This will be used as the default num_retries value when get() and
866           put() are called.  Default 10.
867         """
868         self.lock = threading.Lock()
869         if proxy is None:
870             if config.get('ARVADOS_KEEP_SERVICES'):
871                 proxy = config.get('ARVADOS_KEEP_SERVICES')
872             else:
873                 proxy = config.get('ARVADOS_KEEP_PROXY')
874         if api_token is None:
875             if api_client is None:
876                 api_token = config.get('ARVADOS_API_TOKEN')
877             else:
878                 api_token = api_client.api_token
879         elif api_client is not None:
880             raise ValueError(
881                 "can't build KeepClient with both API client and token")
882         if local_store is None:
883             local_store = os.environ.get('KEEP_LOCAL_STORE')
884
885         if api_client is None:
886             self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
887         else:
888             self.insecure = api_client.insecure
889
890         self.block_cache = block_cache if block_cache else KeepBlockCache()
891         self.timeout = timeout
892         self.proxy_timeout = proxy_timeout
893         self._user_agent_pool = queue.LifoQueue()
894         self.upload_counter = Counter()
895         self.download_counter = Counter()
896         self.put_counter = Counter()
897         self.get_counter = Counter()
898         self.hits_counter = Counter()
899         self.misses_counter = Counter()
900         self._storage_classes_unsupported_warning = False
901         self._default_classes = []
902         if num_prefetch_threads is not None:
903             self.num_prefetch_threads = num_prefetch_threads
904         else:
905             self.num_prefetch_threads = 2
906         self._prefetch_queue = None
907         self._prefetch_threads = None
908
909         if local_store:
910             self.local_store = local_store
911             self.head = self.local_store_head
912             self.get = self.local_store_get
913             self.put = self.local_store_put
914         else:
915             self.num_retries = num_retries
916             self.max_replicas_per_service = None
917             if proxy:
918                 proxy_uris = proxy.split()
919                 for i in range(len(proxy_uris)):
920                     if not proxy_uris[i].endswith('/'):
921                         proxy_uris[i] += '/'
922                     # URL validation
923                     url = urllib.parse.urlparse(proxy_uris[i])
924                     if not (url.scheme and url.netloc):
925                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
926                 self.api_token = api_token
927                 self._gateway_services = {}
928                 self._keep_services = [{
929                     'uuid': "00000-bi6l4-%015d" % idx,
930                     'service_type': 'proxy',
931                     '_service_root': uri,
932                     } for idx, uri in enumerate(proxy_uris)]
933                 self._writable_services = self._keep_services
934                 self.using_proxy = True
935                 self._static_services_list = True
936             else:
937                 # It's important to avoid instantiating an API client
938                 # unless we actually need one, for testing's sake.
939                 if api_client is None:
940                     api_client = arvados.api('v1')
941                 self.api_client = api_client
942                 self.api_token = api_client.api_token
943                 self._gateway_services = {}
944                 self._keep_services = None
945                 self._writable_services = None
946                 self.using_proxy = None
947                 self._static_services_list = False
948                 try:
949                     self._default_classes = [
950                         k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
951                 except KeyError:
952                     # We're talking to an old cluster
953                     pass
954
955     def current_timeout(self, attempt_number):
956         """Return the appropriate timeout to use for this client.
957
958         The proxy timeout setting if the backend service is currently a proxy,
959         the regular timeout setting otherwise.  The `attempt_number` indicates
960         how many times the operation has been tried already (starting from 0
961         for the first try), and scales the connection timeout portion of the
962         return value accordingly.
963
964         """
965         # TODO(twp): the timeout should be a property of a
966         # KeepService, not a KeepClient. See #4488.
967         t = self.proxy_timeout if self.using_proxy else self.timeout
968         if len(t) == 2:
969             return (t[0] * (1 << attempt_number), t[1])
970         else:
971             return (t[0] * (1 << attempt_number), t[1], t[2])
972     def _any_nondisk_services(self, service_list):
973         return any(ks.get('service_type', 'disk') != 'disk'
974                    for ks in service_list)
975
976     def build_services_list(self, force_rebuild=False):
977         if (self._static_services_list or
978               (self._keep_services and not force_rebuild)):
979             return
980         with self.lock:
981             try:
982                 keep_services = self.api_client.keep_services().accessible()
983             except Exception:  # API server predates Keep services.
984                 keep_services = self.api_client.keep_disks().list()
985
986             # Gateway services are only used when specified by UUID,
987             # so there's nothing to gain by filtering them by
988             # service_type.
989             self._gateway_services = {ks['uuid']: ks for ks in
990                                       keep_services.execute()['items']}
991             if not self._gateway_services:
992                 raise arvados.errors.NoKeepServersError()
993
994             # Precompute the base URI for each service.
995             for r in self._gateway_services.values():
996                 host = r['service_host']
997                 if not host.startswith('[') and host.find(':') >= 0:
998                     # IPv6 URIs must be formatted like http://[::1]:80/...
999                     host = '[' + host + ']'
1000                 r['_service_root'] = "{}://{}:{:d}/".format(
1001                     'https' if r['service_ssl_flag'] else 'http',
1002                     host,
1003                     r['service_port'])
1004
1005             _logger.debug(str(self._gateway_services))
1006             self._keep_services = [
1007                 ks for ks in self._gateway_services.values()
1008                 if not ks.get('service_type', '').startswith('gateway:')]
1009             self._writable_services = [ks for ks in self._keep_services
1010                                        if not ks.get('read_only')]
1011
1012             # For disk type services, max_replicas_per_service is 1
1013             # It is unknown (unlimited) for other service types.
1014             if self._any_nondisk_services(self._writable_services):
1015                 self.max_replicas_per_service = None
1016             else:
1017                 self.max_replicas_per_service = 1
1018
1019     def _service_weight(self, data_hash, service_uuid):
1020         """Compute the weight of a Keep service endpoint for a data
1021         block with a known hash.
1022
1023         The weight is md5(h + u) where u is the last 15 characters of
1024         the service endpoint's UUID.
1025         """
1026         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1027
1028     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1029         """Return an array of Keep service endpoints, in the order in
1030         which they should be probed when reading or writing data with
1031         the given hash+hints.
1032         """
1033         self.build_services_list(force_rebuild)
1034
1035         sorted_roots = []
1036         # Use the services indicated by the given +K@... remote
1037         # service hints, if any are present and can be resolved to a
1038         # URI.
1039         for hint in locator.hints:
1040             if hint.startswith('K@'):
1041                 if len(hint) == 7:
1042                     sorted_roots.append(
1043                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1044                 elif len(hint) == 29:
1045                     svc = self._gateway_services.get(hint[2:])
1046                     if svc:
1047                         sorted_roots.append(svc['_service_root'])
1048
1049         # Sort the available local services by weight (heaviest first)
1050         # for this locator, and return their service_roots (base URIs)
1051         # in that order.
1052         use_services = self._keep_services
1053         if need_writable:
1054             use_services = self._writable_services
1055         self.using_proxy = self._any_nondisk_services(use_services)
1056         sorted_roots.extend([
1057             svc['_service_root'] for svc in sorted(
1058                 use_services,
1059                 reverse=True,
1060                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1061         _logger.debug("{}: {}".format(locator, sorted_roots))
1062         return sorted_roots
1063
1064     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1065         # roots_map is a dictionary, mapping Keep service root strings
1066         # to KeepService objects.  Poll for Keep services, and add any
1067         # new ones to roots_map.  Return the current list of local
1068         # root strings.
1069         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1070         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1071         for root in local_roots:
1072             if root not in roots_map:
1073                 roots_map[root] = self.KeepService(
1074                     root, self._user_agent_pool,
1075                     upload_counter=self.upload_counter,
1076                     download_counter=self.download_counter,
1077                     headers=headers,
1078                     insecure=self.insecure)
1079         return local_roots
1080
1081     @staticmethod
1082     def _check_loop_result(result):
1083         # KeepClient RetryLoops should save results as a 2-tuple: the
1084         # actual result of the request, and the number of servers available
1085         # to receive the request this round.
1086         # This method returns True if there's a real result, False if
1087         # there are no more servers available, otherwise None.
1088         if isinstance(result, Exception):
1089             return None
1090         result, tried_server_count = result
1091         if (result is not None) and (result is not False):
1092             return True
1093         elif tried_server_count < 1:
1094             _logger.info("No more Keep services to try; giving up")
1095             return False
1096         else:
1097             return None
1098
1099     def get_from_cache(self, loc_s):
1100         """Fetch a block only if is in the cache, otherwise return None."""
1101         locator = KeepLocator(loc_s)
1102         slot = self.block_cache.get(locator.md5sum)
1103         if slot is not None and slot.ready.is_set():
1104             return slot.get()
1105         else:
1106             return None
1107
1108     def refresh_signature(self, loc):
1109         """Ask Keep to get the remote block and return its local signature"""
1110         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1111         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1112
    # NOTE(review): retry.retry_method presumably fills in the default
    # num_retries from the instance -- confirm against arvados.retry.
    @retry.retry_method
    def head(self, loc_s, **kwargs):
        """Run `_get_or_head` for `loc_s` with method="HEAD".

        All keyword arguments are passed through unchanged, and the
        return value is whatever `_get_or_head` returns.  When the
        client is built with a local store, `__init__` replaces this
        method on the instance with `local_store_head`.
        """
        return self._get_or_head(loc_s, method="HEAD", **kwargs)
1116
    # NOTE(review): retry.retry_method presumably fills in the default
    # num_retries from the instance -- confirm against arvados.retry.
    @retry.retry_method
    def get(self, loc_s, **kwargs):
        """Run `_get_or_head` for `loc_s` with method="GET".

        All keyword arguments are passed through unchanged, and the
        return value is whatever `_get_or_head` returns.  When the
        client is built with a local store, `__init__` replaces this
        method on the instance with `local_store_get`.
        """
        return self._get_or_head(loc_s, method="GET", **kwargs)
1120
1121     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1122         """Get data from Keep.
1123
1124         This method fetches one or more blocks of data from Keep.  It
1125         sends a request each Keep service registered with the API
1126         server (or the proxy provided when this client was
1127         instantiated), then each service named in location hints, in
1128         sequence.  As soon as one service provides the data, it's
1129         returned.
1130
1131         Arguments:
1132         * loc_s: A string of one or more comma-separated locators to fetch.
1133           This method returns the concatenation of these blocks.
1134         * num_retries: The number of times to retry GET requests to
1135           *each* Keep server if it returns temporary failures, with
1136           exponential backoff.  Note that, in each loop, the method may try
1137           to fetch data from every available Keep service, along with any
1138           that are named in location hints in the locator.  The default value
1139           is set when the KeepClient is initialized.
1140         """
1141         if ',' in loc_s:
1142             return ''.join(self.get(x) for x in loc_s.split(','))
1143
1144         self.get_counter.add(1)
1145
1146         request_id = (request_id or
1147                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1148                       arvados.util.new_request_id())
1149         if headers is None:
1150             headers = {}
1151         headers['X-Request-Id'] = request_id
1152
1153         slot = None
1154         blob = None
1155         try:
1156             locator = KeepLocator(loc_s)
1157             if method == "GET":
1158                 while slot is None:
1159                     slot, first = self.block_cache.reserve_cache(locator.md5sum)
1160                     if first:
1161                         # Fresh and empty "first time it is used" slot
1162                         break
1163                     if prefetch:
1164                         # this is request for a prefetch to fill in
1165                         # the cache, don't need to wait for the
1166                         # result, so if it is already in flight return
1167                         # immediately.  Clear 'slot' to prevent
1168                         # finally block from calling slot.set()
1169                         if slot.ready.is_set():
1170                             slot.get()
1171                         slot = None
1172                         return None
1173
1174                     blob = slot.get()
1175                     if blob is not None:
1176                         self.hits_counter.add(1)
1177                         return blob
1178
1179                     # If blob is None, this means either
1180                     #
1181                     # (a) another thread was fetching this block and
1182                     # failed with an error or
1183                     #
1184                     # (b) cache thrashing caused the slot to be
1185                     # evicted (content set to None) by another thread
1186                     # between the call to reserve_cache() and get().
1187                     #
1188                     # We'll handle these cases by reserving a new slot
1189                     # and then doing a full GET request.
1190                     slot = None
1191
1192             self.misses_counter.add(1)
1193
1194             # If the locator has hints specifying a prefix (indicating a
1195             # remote keepproxy) or the UUID of a local gateway service,
1196             # read data from the indicated service(s) instead of the usual
1197             # list of local disk services.
1198             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1199                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1200             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1201                                for hint in locator.hints if (
1202                                        hint.startswith('K@') and
1203                                        len(hint) == 29 and
1204                                        self._gateway_services.get(hint[2:])
1205                                        )])
1206             # Map root URLs to their KeepService objects.
1207             roots_map = {
1208                 root: self.KeepService(root, self._user_agent_pool,
1209                                        upload_counter=self.upload_counter,
1210                                        download_counter=self.download_counter,
1211                                        headers=headers,
1212                                        insecure=self.insecure)
1213                 for root in hint_roots
1214             }
1215
1216             # See #3147 for a discussion of the loop implementation.  Highlights:
1217             # * Refresh the list of Keep services after each failure, in case
1218             #   it's being updated.
1219             # * Retry until we succeed, we're out of retries, or every available
1220             #   service has returned permanent failure.
1221             sorted_roots = []
1222             roots_map = {}
1223             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1224                                    backoff_start=2)
1225             for tries_left in loop:
1226                 try:
1227                     sorted_roots = self.map_new_services(
1228                         roots_map, locator,
1229                         force_rebuild=(tries_left < num_retries),
1230                         need_writable=False,
1231                         headers=headers)
1232                 except Exception as error:
1233                     loop.save_result(error)
1234                     continue
1235
1236                 # Query KeepService objects that haven't returned
1237                 # permanent failure, in our specified shuffle order.
1238                 services_to_try = [roots_map[root]
1239                                    for root in sorted_roots
1240                                    if roots_map[root].usable()]
1241                 for keep_service in services_to_try:
1242                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1243                     if blob is not None:
1244                         break
1245                 loop.save_result((blob, len(services_to_try)))
1246
1247             # Always cache the result, then return it if we succeeded.
1248             if loop.success():
1249                 return blob
1250         finally:
1251             if slot is not None:
1252                 self.block_cache.set(slot, blob)
1253
1254         # Q: Including 403 is necessary for the Keep tests to continue
1255         # passing, but maybe they should expect KeepReadError instead?
1256         not_founds = sum(1 for key in sorted_roots
1257                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1258         service_errors = ((key, roots_map[key].last_result()['error'])
1259                           for key in sorted_roots)
1260         if not roots_map:
1261             raise arvados.errors.KeepReadError(
1262                 "[{}] failed to read {}: no Keep services available ({})".format(
1263                     request_id, loc_s, loop.last_result()))
1264         elif not_founds == len(sorted_roots):
1265             raise arvados.errors.NotFoundError(
1266                 "[{}] {} not found".format(request_id, loc_s), service_errors)
1267         else:
1268             raise arvados.errors.KeepReadError(
1269                 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1270
1271     @retry.retry_method
1272     def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1273         """Save data in Keep.
1274
1275         This method will get a list of Keep services from the API server, and
1276         send the data to each one simultaneously in a new thread.  Once the
1277         uploads are finished, if enough copies are saved, this method returns
1278         the most recent HTTP response body.  If requests fail to upload
1279         enough copies, this method raises KeepWriteError.
1280
1281         Arguments:
1282         * data: The string of data to upload.
1283         * copies: The number of copies that the user requires be saved.
1284           Default 2.
1285         * num_retries: The number of times to retry PUT requests to
1286           *each* Keep server if it returns temporary failures, with
1287           exponential backoff.  The default value is set when the
1288           KeepClient is initialized.
1289         * classes: An optional list of storage class names where copies should
1290           be written.
1291         """
1292
1293         classes = classes or self._default_classes
1294
1295         if not isinstance(data, bytes):
1296             data = data.encode()
1297
1298         self.put_counter.add(1)
1299
1300         data_hash = hashlib.md5(data).hexdigest()
1301         loc_s = data_hash + '+' + str(len(data))
1302         if copies < 1:
1303             return loc_s
1304         locator = KeepLocator(loc_s)
1305
1306         request_id = (request_id or
1307                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1308                       arvados.util.new_request_id())
1309         headers = {
1310             'X-Request-Id': request_id,
1311             'X-Keep-Desired-Replicas': str(copies),
1312         }
1313         roots_map = {}
1314         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1315                                backoff_start=2)
1316         done_copies = 0
1317         done_classes = []
1318         for tries_left in loop:
1319             try:
1320                 sorted_roots = self.map_new_services(
1321                     roots_map, locator,
1322                     force_rebuild=(tries_left < num_retries),
1323                     need_writable=True,
1324                     headers=headers)
1325             except Exception as error:
1326                 loop.save_result(error)
1327                 continue
1328
1329             pending_classes = []
1330             if done_classes is not None:
1331                 pending_classes = list(set(classes) - set(done_classes))
1332             writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1333                                                         data_hash=data_hash,
1334                                                         copies=copies - done_copies,
1335                                                         max_service_replicas=self.max_replicas_per_service,
1336                                                         timeout=self.current_timeout(num_retries - tries_left),
1337                                                         classes=pending_classes)
1338             for service_root, ks in [(root, roots_map[root])
1339                                      for root in sorted_roots]:
1340                 if ks.finished():
1341                     continue
1342                 writer_pool.add_task(ks, service_root)
1343             writer_pool.join()
1344             pool_copies, pool_classes = writer_pool.done()
1345             done_copies += pool_copies
1346             if (done_classes is not None) and (pool_classes is not None):
1347                 done_classes += pool_classes
1348                 loop.save_result(
1349                     (done_copies >= copies and set(done_classes) == set(classes),
1350                     writer_pool.total_task_nr))
1351             else:
1352                 # Old keepstore contacted without storage classes support:
1353                 # success is determined only by successful copies.
1354                 #
1355                 # Disable storage classes tracking from this point forward.
1356                 if not self._storage_classes_unsupported_warning:
1357                     self._storage_classes_unsupported_warning = True
1358                     _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1359                 done_classes = None
1360                 loop.save_result(
1361                     (done_copies >= copies, writer_pool.total_task_nr))
1362
1363         if loop.success():
1364             return writer_pool.response()
1365         if not roots_map:
1366             raise arvados.errors.KeepWriteError(
1367                 "[{}] failed to write {}: no Keep services available ({})".format(
1368                     request_id, data_hash, loop.last_result()))
1369         else:
1370             service_errors = ((key, roots_map[key].last_result()['error'])
1371                               for key in sorted_roots
1372                               if roots_map[key].last_result()['error'])
1373             raise arvados.errors.KeepWriteError(
1374                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1375                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1376
1377     def _block_prefetch_worker(self):
1378         """The background downloader thread."""
1379         while True:
1380             try:
1381                 b = self._prefetch_queue.get()
1382                 if b is None:
1383                     return
1384                 self.get(b, prefetch=True)
1385             except Exception:
1386                 _logger.exception("Exception doing block prefetch")
1387
1388     def _start_prefetch_threads(self):
1389         if self._prefetch_threads is None:
1390             with self.lock:
1391                 if self._prefetch_threads is not None:
1392                     return
1393                 self._prefetch_queue = queue.Queue()
1394                 self._prefetch_threads = []
1395                 for i in range(0, self.num_prefetch_threads):
1396                     thread = threading.Thread(target=self._block_prefetch_worker)
1397                     self._prefetch_threads.append(thread)
1398                     thread.daemon = True
1399                     thread.start()
1400
1401     def block_prefetch(self, locator):
1402         """
1403         This relies on the fact that KeepClient implements a block cache,
1404         so repeated requests for the same block will not result in repeated
1405         downloads (unless the block is evicted from the cache.)  This method
1406         does not block.
1407         """
1408
1409         if self.block_cache.get(locator) is not None:
1410             return
1411
1412         self._start_prefetch_threads()
1413         self._prefetch_queue.put(locator)
1414
1415     def stop_prefetch_threads(self):
1416         with self.lock:
1417             if self._prefetch_threads is not None:
1418                 for t in self._prefetch_threads:
1419                     self._prefetch_queue.put(None)
1420                 for t in self._prefetch_threads:
1421                     t.join()
1422             self._prefetch_threads = None
1423             self._prefetch_queue = None
1424
1425     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1426         """A stub for put().
1427
1428         This method is used in place of the real put() method when
1429         using local storage (see constructor's local_store argument).
1430
1431         copies and num_retries arguments are ignored: they are here
1432         only for the sake of offering the same call signature as
1433         put().
1434
1435         Data stored this way can be retrieved via local_store_get().
1436         """
1437         md5 = hashlib.md5(data).hexdigest()
1438         locator = '%s+%d' % (md5, len(data))
1439         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1440             f.write(data)
1441         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1442                   os.path.join(self.local_store, md5))
1443         return locator
1444
1445     def local_store_get(self, loc_s, num_retries=None):
1446         """Companion to local_store_put()."""
1447         try:
1448             locator = KeepLocator(loc_s)
1449         except ValueError:
1450             raise arvados.errors.NotFoundError(
1451                 "Invalid data locator: '%s'" % loc_s)
1452         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1453             return b''
1454         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1455             return f.read()
1456
    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put(): report whether a block is stored.

        Returns True when loc_s has the md5sum of
        config.EMPTY_BLOCK_LOCATOR, or when a file named after the
        locator's md5sum exists in self.local_store.  Raises
        arvados.errors.NotFoundError when loc_s cannot be parsed as a
        KeepLocator.  num_retries is not used in the code shown here.

        NOTE(review): no explicit return for a missing block is visible
        here, so callers presumably get None in that case -- confirm.
        """
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        # Only the md5 part of the empty-block locator is compared.
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True