21020: Get default Keep cache directory from environment
[arvados.git] / sdk / python / arvados / keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import copy
6 import collections
7 import datetime
8 import hashlib
9 import errno
10 import io
11 import logging
12 import math
13 import os
14 import pycurl
15 import queue
16 import re
17 import socket
18 import ssl
19 import sys
20 import threading
21 import resource
22 import urllib.parse
23 import traceback
24 import weakref
25
26 from io import BytesIO
27
28 import arvados
29 import arvados.config as config
30 import arvados.errors
31 import arvados.retry as retry
32 import arvados.util
33 import arvados.diskcache
34 from arvados._pycurlhelper import PyCurlHelper
35 from . import timer
36
37 _logger = logging.getLogger('arvados.keep')
38 global_client_object = None
39
40 # Monkey patch TCP constants when not available (apple). Values sourced from:
41 # http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
42 if sys.platform == 'darwin':
43     if not hasattr(socket, 'TCP_KEEPALIVE'):
44         socket.TCP_KEEPALIVE = 0x010
45     if not hasattr(socket, 'TCP_KEEPINTVL'):
46         socket.TCP_KEEPINTVL = 0x101
47     if not hasattr(socket, 'TCP_KEEPCNT'):
48         socket.TCP_KEEPCNT = 0x102
49
50 class KeepLocator(object):
51     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
52     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
53
54     def __init__(self, locator_str):
55         self.hints = []
56         self._perm_sig = None
57         self._perm_expiry = None
58         pieces = iter(locator_str.split('+'))
59         self.md5sum = next(pieces)
60         try:
61             self.size = int(next(pieces))
62         except StopIteration:
63             self.size = None
64         for hint in pieces:
65             if self.HINT_RE.match(hint) is None:
66                 raise ValueError("invalid hint format: {}".format(hint))
67             elif hint.startswith('A'):
68                 self.parse_permission_hint(hint)
69             else:
70                 self.hints.append(hint)
71
72     def __str__(self):
73         return '+'.join(
74             str(s)
75             for s in [self.md5sum, self.size,
76                       self.permission_hint()] + self.hints
77             if s is not None)
78
79     def stripped(self):
80         if self.size is not None:
81             return "%s+%i" % (self.md5sum, self.size)
82         else:
83             return self.md5sum
84
85     def _make_hex_prop(name, length):
86         # Build and return a new property with the given name that
87         # must be a hex string of the given length.
88         data_name = '_{}'.format(name)
89         def getter(self):
90             return getattr(self, data_name)
91         def setter(self, hex_str):
92             if not arvados.util.is_hex(hex_str, length):
93                 raise ValueError("{} is not a {}-digit hex string: {!r}".
94                                  format(name, length, hex_str))
95             setattr(self, data_name, hex_str)
96         return property(getter, setter)
97
98     md5sum = _make_hex_prop('md5sum', 32)
99     perm_sig = _make_hex_prop('perm_sig', 40)
100
101     @property
102     def perm_expiry(self):
103         return self._perm_expiry
104
105     @perm_expiry.setter
106     def perm_expiry(self, value):
107         if not arvados.util.is_hex(value, 1, 8):
108             raise ValueError(
109                 "permission timestamp must be a hex Unix timestamp: {}".
110                 format(value))
111         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
112
113     def permission_hint(self):
114         data = [self.perm_sig, self.perm_expiry]
115         if None in data:
116             return None
117         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
118         return "A{}@{:08x}".format(*data)
119
120     def parse_permission_hint(self, s):
121         try:
122             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
123         except IndexError:
124             raise ValueError("bad permission hint {}".format(s))
125
126     def permission_expired(self, as_of_dt=None):
127         if self.perm_expiry is None:
128             return False
129         elif as_of_dt is None:
130             as_of_dt = datetime.datetime.now()
131         return self.perm_expiry <= as_of_dt
132
133
134 class Keep(object):
135     """Simple interface to a global KeepClient object.
136
137     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
138     own API client.  The global KeepClient will build an API client from the
139     current Arvados configuration, which may not match the one you built.
140     """
141     _last_key = None
142
143     @classmethod
144     def global_client_object(cls):
145         global global_client_object
146         # Previously, KeepClient would change its behavior at runtime based
147         # on these configuration settings.  We simulate that behavior here
148         # by checking the values and returning a new KeepClient if any of
149         # them have changed.
150         key = (config.get('ARVADOS_API_HOST'),
151                config.get('ARVADOS_API_TOKEN'),
152                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
153                config.get('ARVADOS_KEEP_PROXY'),
154                os.environ.get('KEEP_LOCAL_STORE'))
155         if (global_client_object is None) or (cls._last_key != key):
156             global_client_object = KeepClient()
157             cls._last_key = key
158         return global_client_object
159
160     @staticmethod
161     def get(locator, **kwargs):
162         return Keep.global_client_object().get(locator, **kwargs)
163
164     @staticmethod
165     def put(data, **kwargs):
166         return Keep.global_client_object().put(data, **kwargs)
167
168 class KeepBlockCache(object):
169     def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
170         self.cache_max = cache_max
171         self._cache = collections.OrderedDict()
172         self._cache_lock = threading.Lock()
173         self._max_slots = max_slots
174         self._disk_cache = disk_cache
175         self._disk_cache_dir = disk_cache_dir
176         self._cache_updating = threading.Condition(self._cache_lock)
177
178         if self._disk_cache and self._disk_cache_dir is None:
179             cache_path = arvados.util._BaseDirectories('CACHE').storage_path() / 'keep'
180             cache_path.mkdir(mode=0o700, exist_ok=True)
181             self._disk_cache_dir = str(cache_path)
182
183         if self._max_slots == 0:
184             if self._disk_cache:
185                 # Each block uses two file descriptors, one used to
186                 # open it initially and hold the flock(), and a second
187                 # hidden one used by mmap().
188                 #
189                 # Set max slots to 1/8 of maximum file handles.  This
190                 # means we'll use at most 1/4 of total file handles.
191                 #
192                 # NOFILE typically defaults to 1024 on Linux so this
193                 # is 128 slots (256 file handles), which means we can
194                 # cache up to 8 GiB of 64 MiB blocks.  This leaves
195                 # 768 file handles for sockets and other stuff.
196                 #
197                 # When we want the ability to have more cache (e.g. in
198                 # arv-mount) we'll increase rlimit before calling
199                 # this.
200                 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
201             else:
202                 # RAM cache slots
203                 self._max_slots = 512
204
205         if self.cache_max == 0:
206             if self._disk_cache:
207                 fs = os.statvfs(self._disk_cache_dir)
208                 # Calculation of available space incorporates existing cache usage
209                 existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
210                 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
211                 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
212                 # pick smallest of:
213                 # 10% of total disk size
214                 # 25% of available space
215                 # max_slots * 64 MiB
216                 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
217             else:
218                 # 256 MiB in RAM
219                 self.cache_max = (256 * 1024 * 1024)
220
221         self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
222
223         self.cache_total = 0
224         if self._disk_cache:
225             self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
226             for slot in self._cache.values():
227                 self.cache_total += slot.size()
228             self.cap_cache()
229
230     class CacheSlot(object):
231         __slots__ = ("locator", "ready", "content")
232
233         def __init__(self, locator):
234             self.locator = locator
235             self.ready = threading.Event()
236             self.content = None
237
238         def get(self):
239             self.ready.wait()
240             return self.content
241
242         def set(self, value):
243             if self.content is not None:
244                 return False
245             self.content = value
246             self.ready.set()
247             return True
248
249         def size(self):
250             if self.content is None:
251                 return 0
252             else:
253                 return len(self.content)
254
255         def evict(self):
256             self.content = None
257
258
259     def _resize_cache(self, cache_max, max_slots):
260         # Try and make sure the contents of the cache do not exceed
261         # the supplied maximums.
262
263         if self.cache_total <= cache_max and len(self._cache) <= max_slots:
264             return
265
266         _evict_candidates = collections.deque(self._cache.values())
267         while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
268             slot = _evict_candidates.popleft()
269             if not slot.ready.is_set():
270                 continue
271
272             sz = slot.size()
273             slot.evict()
274             self.cache_total -= sz
275             del self._cache[slot.locator]
276
277
278     def cap_cache(self):
279         '''Cap the cache size to self.cache_max'''
280         with self._cache_updating:
281             self._resize_cache(self.cache_max, self._max_slots)
282             self._cache_updating.notify_all()
283
284     def _get(self, locator):
285         # Test if the locator is already in the cache
286         if locator in self._cache:
287             n = self._cache[locator]
288             if n.ready.is_set() and n.content is None:
289                 del self._cache[n.locator]
290                 return None
291             self._cache.move_to_end(locator)
292             return n
293         if self._disk_cache:
294             # see if it exists on disk
295             n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
296             if n is not None:
297                 self._cache[n.locator] = n
298                 self.cache_total += n.size()
299                 return n
300         return None
301
302     def get(self, locator):
303         with self._cache_lock:
304             return self._get(locator)
305
306     def reserve_cache(self, locator):
307         '''Reserve a cache slot for the specified locator,
308         or return the existing slot.'''
309         with self._cache_updating:
310             n = self._get(locator)
311             if n:
312                 return n, False
313             else:
314                 # Add a new cache slot for the locator
315                 self._resize_cache(self.cache_max, self._max_slots-1)
316                 while len(self._cache) >= self._max_slots:
317                     # If there isn't a slot available, need to wait
318                     # for something to happen that releases one of the
319                     # cache slots.  Idle for 200 ms or woken up by
320                     # another thread
321                     self._cache_updating.wait(timeout=0.2)
322                     self._resize_cache(self.cache_max, self._max_slots-1)
323
324                 if self._disk_cache:
325                     n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
326                 else:
327                     n = KeepBlockCache.CacheSlot(locator)
328                 self._cache[n.locator] = n
329                 return n, True
330
331     def set(self, slot, blob):
332         try:
333             if slot.set(blob):
334                 self.cache_total += slot.size()
335             return
336         except OSError as e:
337             if e.errno == errno.ENOMEM:
338                 # Reduce max slots to current - 4, cap cache and retry
339                 with self._cache_lock:
340                     self._max_slots = max(4, len(self._cache) - 4)
341             elif e.errno == errno.ENOSPC:
342                 # Reduce disk max space to current - 256 MiB, cap cache and retry
343                 with self._cache_lock:
344                     sm = sum(st.size() for st in self._cache.values())
345                     self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
346             elif e.errno == errno.ENODEV:
347                 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
348         except Exception as e:
349             pass
350         finally:
351             # Check if we should evict things from the cache.  Either
352             # because we added a new thing or there was an error and
353             # we possibly adjusted the limits down, so we might need
354             # to push something out.
355             self.cap_cache()
356
357         try:
358             # Only gets here if there was an error the first time. The
359             # exception handler adjusts limits downward in some cases
360             # to free up resources, which would make the operation
361             # succeed.
362             if slot.set(blob):
363                 self.cache_total += slot.size()
364         except Exception as e:
365             # It failed again.  Give up.
366             slot.set(None)
367             raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
368
369         self.cap_cache()
370
371 class Counter(object):
372     def __init__(self, v=0):
373         self._lk = threading.Lock()
374         self._val = v
375
376     def add(self, v):
377         with self._lk:
378             self._val += v
379
380     def get(self):
381         with self._lk:
382             return self._val
383
384
385 class KeepClient(object):
386     DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
387     DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
388
389     class KeepService(PyCurlHelper):
390         """Make requests to a single Keep service, and track results.
391
392         A KeepService is intended to last long enough to perform one
393         transaction (GET or PUT) against one Keep service. This can
394         involve calling either get() or put() multiple times in order
395         to retry after transient failures. However, calling both get()
396         and put() on a single instance -- or using the same instance
397         to access two different Keep services -- will not produce
398         sensible behavior.
399         """
400
401         HTTP_ERRORS = (
402             socket.error,
403             ssl.SSLError,
404             arvados.errors.HttpError,
405         )
406
407         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
408                      upload_counter=None,
409                      download_counter=None,
410                      headers={},
411                      insecure=False):
412             super(KeepClient.KeepService, self).__init__()
413             self.root = root
414             self._user_agent_pool = user_agent_pool
415             self._result = {'error': None}
416             self._usable = True
417             self._session = None
418             self._socket = None
419             self.get_headers = {'Accept': 'application/octet-stream'}
420             self.get_headers.update(headers)
421             self.put_headers = headers
422             self.upload_counter = upload_counter
423             self.download_counter = download_counter
424             self.insecure = insecure
425
426         def usable(self):
427             """Is it worth attempting a request?"""
428             return self._usable
429
430         def finished(self):
431             """Did the request succeed or encounter permanent failure?"""
432             return self._result['error'] == False or not self._usable
433
434         def last_result(self):
435             return self._result
436
437         def _get_user_agent(self):
438             try:
439                 return self._user_agent_pool.get(block=False)
440             except queue.Empty:
441                 return pycurl.Curl()
442
443         def _put_user_agent(self, ua):
444             try:
445                 ua.reset()
446                 self._user_agent_pool.put(ua, block=False)
447             except:
448                 ua.close()
449
450         def get(self, locator, method="GET", timeout=None):
451             # locator is a KeepLocator object.
452             url = self.root + str(locator)
453             _logger.debug("Request: %s %s", method, url)
454             curl = self._get_user_agent()
455             ok = None
456             try:
457                 with timer.Timer() as t:
458                     self._headers = {}
459                     response_body = BytesIO()
460                     curl.setopt(pycurl.NOSIGNAL, 1)
461                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
462                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
463                     curl.setopt(pycurl.URL, url.encode('utf-8'))
464                     curl.setopt(pycurl.HTTPHEADER, [
465                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
466                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
467                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
468                     if self.insecure:
469                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
470                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
471                     else:
472                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
473                     if method == "HEAD":
474                         curl.setopt(pycurl.NOBODY, True)
475                     else:
476                         curl.setopt(pycurl.HTTPGET, True)
477                     self._setcurltimeouts(curl, timeout, method=="HEAD")
478
479                     try:
480                         curl.perform()
481                     except Exception as e:
482                         raise arvados.errors.HttpError(0, str(e))
483                     finally:
484                         if self._socket:
485                             self._socket.close()
486                             self._socket = None
487                     self._result = {
488                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
489                         'body': response_body.getvalue(),
490                         'headers': self._headers,
491                         'error': False,
492                     }
493
494                 ok = retry.check_http_response_success(self._result['status_code'])
495                 if not ok:
496                     self._result['error'] = arvados.errors.HttpError(
497                         self._result['status_code'],
498                         self._headers.get('x-status-line', 'Error'))
499             except self.HTTP_ERRORS as e:
500                 self._result = {
501                     'error': e,
502                 }
503             self._usable = ok != False
504             if self._result.get('status_code', None):
505                 # The client worked well enough to get an HTTP status
506                 # code, so presumably any problems are just on the
507                 # server side and it's OK to reuse the client.
508                 self._put_user_agent(curl)
509             else:
510                 # Don't return this client to the pool, in case it's
511                 # broken.
512                 curl.close()
513             if not ok:
514                 _logger.debug("Request fail: GET %s => %s: %s",
515                               url, type(self._result['error']), str(self._result['error']))
516                 return None
517             if method == "HEAD":
518                 _logger.info("HEAD %s: %s bytes",
519                          self._result['status_code'],
520                          self._result.get('content-length'))
521                 if self._result['headers'].get('x-keep-locator'):
522                     # This is a response to a remote block copy request, return
523                     # the local copy block locator.
524                     return self._result['headers'].get('x-keep-locator')
525                 return True
526
527             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
528                          self._result['status_code'],
529                          len(self._result['body']),
530                          t.msecs,
531                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
532
533             if self.download_counter:
534                 self.download_counter.add(len(self._result['body']))
535             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
536             if resp_md5 != locator.md5sum:
537                 _logger.warning("Checksum fail: md5(%s) = %s",
538                                 url, resp_md5)
539                 self._result['error'] = arvados.errors.HttpError(
540                     0, 'Checksum fail')
541                 return None
542             return self._result['body']
543
544         def put(self, hash_s, body, timeout=None, headers={}):
545             put_headers = copy.copy(self.put_headers)
546             put_headers.update(headers)
547             url = self.root + hash_s
548             _logger.debug("Request: PUT %s", url)
549             curl = self._get_user_agent()
550             ok = None
551             try:
552                 with timer.Timer() as t:
553                     self._headers = {}
554                     body_reader = BytesIO(body)
555                     response_body = BytesIO()
556                     curl.setopt(pycurl.NOSIGNAL, 1)
557                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
558                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
559                     curl.setopt(pycurl.URL, url.encode('utf-8'))
560                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
561                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
562                     # response) instead of sending the request body immediately.
563                     # This allows the server to reject the request if the request
564                     # is invalid or the server is read-only, without waiting for
565                     # the client to send the entire block.
566                     curl.setopt(pycurl.UPLOAD, True)
567                     curl.setopt(pycurl.INFILESIZE, len(body))
568                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
569                     curl.setopt(pycurl.HTTPHEADER, [
570                         '{}: {}'.format(k,v) for k,v in put_headers.items()])
571                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
572                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
573                     if self.insecure:
574                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
575                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
576                     else:
577                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
578                     self._setcurltimeouts(curl, timeout)
579                     try:
580                         curl.perform()
581                     except Exception as e:
582                         raise arvados.errors.HttpError(0, str(e))
583                     finally:
584                         if self._socket:
585                             self._socket.close()
586                             self._socket = None
587                     self._result = {
588                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
589                         'body': response_body.getvalue().decode('utf-8'),
590                         'headers': self._headers,
591                         'error': False,
592                     }
593                 ok = retry.check_http_response_success(self._result['status_code'])
594                 if not ok:
595                     self._result['error'] = arvados.errors.HttpError(
596                         self._result['status_code'],
597                         self._headers.get('x-status-line', 'Error'))
598             except self.HTTP_ERRORS as e:
599                 self._result = {
600                     'error': e,
601                 }
602             self._usable = ok != False # still usable if ok is True or None
603             if self._result.get('status_code', None):
604                 # Client is functional. See comment in get().
605                 self._put_user_agent(curl)
606             else:
607                 curl.close()
608             if not ok:
609                 _logger.debug("Request fail: PUT %s => %s: %s",
610                               url, type(self._result['error']), str(self._result['error']))
611                 return False
612             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
613                          self._result['status_code'],
614                          len(body),
615                          t.msecs,
616                          1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
617             if self.upload_counter:
618                 self.upload_counter.add(len(body))
619             return True
620
621
622     class KeepWriterQueue(queue.Queue):
623         def __init__(self, copies, classes=[]):
624             queue.Queue.__init__(self) # Old-style superclass
625             self.wanted_copies = copies
626             self.wanted_storage_classes = classes
627             self.successful_copies = 0
628             self.confirmed_storage_classes = {}
629             self.response = None
630             self.storage_classes_tracking = True
631             self.queue_data_lock = threading.RLock()
632             self.pending_tries = max(copies, len(classes))
633             self.pending_tries_notification = threading.Condition()
634
635         def write_success(self, response, replicas_nr, classes_confirmed):
636             with self.queue_data_lock:
637                 self.successful_copies += replicas_nr
638                 if classes_confirmed is None:
639                     self.storage_classes_tracking = False
640                 elif self.storage_classes_tracking:
641                     for st_class, st_copies in classes_confirmed.items():
642                         try:
643                             self.confirmed_storage_classes[st_class] += st_copies
644                         except KeyError:
645                             self.confirmed_storage_classes[st_class] = st_copies
646                     self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
647                 self.response = response
648             with self.pending_tries_notification:
649                 self.pending_tries_notification.notify_all()
650
651         def write_fail(self, ks):
652             with self.pending_tries_notification:
653                 self.pending_tries += 1
654                 self.pending_tries_notification.notify()
655
656         def pending_copies(self):
657             with self.queue_data_lock:
658                 return self.wanted_copies - self.successful_copies
659
660         def satisfied_classes(self):
661             with self.queue_data_lock:
662                 if not self.storage_classes_tracking:
663                     # Notifies disabled storage classes expectation to
664                     # the outer loop.
665                     return None
666             return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
667
668         def pending_classes(self):
669             with self.queue_data_lock:
670                 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
671                     return []
672                 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
673                 for st_class, st_copies in self.confirmed_storage_classes.items():
674                     if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
675                         unsatisfied_classes.remove(st_class)
676                 return unsatisfied_classes
677
678         def get_next_task(self):
679             with self.pending_tries_notification:
680                 while True:
681                     if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
682                         # This notify_all() is unnecessary --
683                         # write_success() already called notify_all()
684                         # when pending<1 became true, so it's not
685                         # possible for any other thread to be in
686                         # wait() now -- but it's cheap insurance
687                         # against deadlock so we do it anyway:
688                         self.pending_tries_notification.notify_all()
689                         # Drain the queue and then raise Queue.Empty
690                         while True:
691                             self.get_nowait()
692                             self.task_done()
693                     elif self.pending_tries > 0:
694                         service, service_root = self.get_nowait()
695                         if service.finished():
696                             self.task_done()
697                             continue
698                         self.pending_tries -= 1
699                         return service, service_root
700                     elif self.empty():
701                         self.pending_tries_notification.notify_all()
702                         raise queue.Empty
703                     else:
704                         self.pending_tries_notification.wait()
705
706
707     class KeepWriterThreadPool(object):
708         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
709             self.total_task_nr = 0
710             if (not max_service_replicas) or (max_service_replicas >= copies):
711                 num_threads = 1
712             else:
713                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
714             _logger.debug("Pool max threads is %d", num_threads)
715             self.workers = []
716             self.queue = KeepClient.KeepWriterQueue(copies, classes)
717             # Create workers
718             for _ in range(num_threads):
719                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
720                 self.workers.append(w)
721
722         def add_task(self, ks, service_root):
723             self.queue.put((ks, service_root))
724             self.total_task_nr += 1
725
726         def done(self):
727             return self.queue.successful_copies, self.queue.satisfied_classes()
728
729         def join(self):
730             # Start workers
731             for worker in self.workers:
732                 worker.start()
733             # Wait for finished work
734             self.queue.join()
735
736         def response(self):
737             return self.queue.response
738
739
740     class KeepWriterThread(threading.Thread):
741         class TaskFailed(RuntimeError): pass
742
743         def __init__(self, queue, data, data_hash, timeout=None):
744             super(KeepClient.KeepWriterThread, self).__init__()
745             self.timeout = timeout
746             self.queue = queue
747             self.data = data
748             self.data_hash = data_hash
749             self.daemon = True
750
751         def run(self):
752             while True:
753                 try:
754                     service, service_root = self.queue.get_next_task()
755                 except queue.Empty:
756                     return
757                 try:
758                     locator, copies, classes = self.do_task(service, service_root)
759                 except Exception as e:
760                     if not isinstance(e, self.TaskFailed):
761                         _logger.exception("Exception in KeepWriterThread")
762                     self.queue.write_fail(service)
763                 else:
764                     self.queue.write_success(locator, copies, classes)
765                 finally:
766                     self.queue.task_done()
767
768         def do_task(self, service, service_root):
769             classes = self.queue.pending_classes()
770             headers = {}
771             if len(classes) > 0:
772                 classes.sort()
773                 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
774             success = bool(service.put(self.data_hash,
775                                         self.data,
776                                         timeout=self.timeout,
777                                         headers=headers))
778             result = service.last_result()
779
780             if not success:
781                 if result.get('status_code'):
782                     _logger.debug("Request fail: PUT %s => %s %s",
783                                   self.data_hash,
784                                   result.get('status_code'),
785                                   result.get('body'))
786                 raise self.TaskFailed()
787
788             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
789                           str(threading.current_thread()),
790                           self.data_hash,
791                           len(self.data),
792                           service_root)
793             try:
794                 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
795             except (KeyError, ValueError):
796                 replicas_stored = 1
797
798             classes_confirmed = {}
799             try:
800                 scch = result['headers']['x-keep-storage-classes-confirmed']
801                 for confirmation in scch.replace(' ', '').split(','):
802                     if '=' in confirmation:
803                         stored_class, stored_copies = confirmation.split('=')[:2]
804                         classes_confirmed[stored_class] = int(stored_copies)
805             except (KeyError, ValueError):
806                 # Storage classes confirmed header missing or corrupt
807                 classes_confirmed = None
808
809             return result['body'].strip(), replicas_stored, classes_confirmed
810
811
812     def __init__(self, api_client=None, proxy=None,
813                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
814                  api_token=None, local_store=None, block_cache=None,
815                  num_retries=10, session=None, num_prefetch_threads=None):
816         """Initialize a new KeepClient.
817
818         Arguments:
819         :api_client:
820           The API client to use to find Keep services.  If not
821           provided, KeepClient will build one from available Arvados
822           configuration.
823
824         :proxy:
825           If specified, this KeepClient will send requests to this Keep
826           proxy.  Otherwise, KeepClient will fall back to the setting of the
827           ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
828           If you want to KeepClient does not use a proxy, pass in an empty
829           string.
830
831         :timeout:
832           The initial timeout (in seconds) for HTTP requests to Keep
833           non-proxy servers.  A tuple of three floats is interpreted as
834           (connection_timeout, read_timeout, minimum_bandwidth). A connection
835           will be aborted if the average traffic rate falls below
836           minimum_bandwidth bytes per second over an interval of read_timeout
837           seconds. Because timeouts are often a result of transient server
838           load, the actual connection timeout will be increased by a factor
839           of two on each retry.
840           Default: (2, 256, 32768).
841
842         :proxy_timeout:
843           The initial timeout (in seconds) for HTTP requests to
844           Keep proxies. A tuple of three floats is interpreted as
845           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
846           described above for adjusting connection timeouts on retry also
847           applies.
848           Default: (20, 256, 32768).
849
850         :api_token:
851           If you're not using an API client, but only talking
852           directly to a Keep proxy, this parameter specifies an API token
853           to authenticate Keep requests.  It is an error to specify both
854           api_client and api_token.  If you specify neither, KeepClient
855           will use one available from the Arvados configuration.
856
857         :local_store:
858           If specified, this KeepClient will bypass Keep
859           services, and save data to the named directory.  If unspecified,
860           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
861           environment variable.  If you want to ensure KeepClient does not
862           use local storage, pass in an empty string.  This is primarily
863           intended to mock a server for testing.
864
865         :num_retries:
866           The default number of times to retry failed requests.
867           This will be used as the default num_retries value when get() and
868           put() are called.  Default 10.
869         """
870         self.lock = threading.Lock()
871         if proxy is None:
872             if config.get('ARVADOS_KEEP_SERVICES'):
873                 proxy = config.get('ARVADOS_KEEP_SERVICES')
874             else:
875                 proxy = config.get('ARVADOS_KEEP_PROXY')
876         if api_token is None:
877             if api_client is None:
878                 api_token = config.get('ARVADOS_API_TOKEN')
879             else:
880                 api_token = api_client.api_token
881         elif api_client is not None:
882             raise ValueError(
883                 "can't build KeepClient with both API client and token")
884         if local_store is None:
885             local_store = os.environ.get('KEEP_LOCAL_STORE')
886
887         if api_client is None:
888             self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
889         else:
890             self.insecure = api_client.insecure
891
892         self.block_cache = block_cache if block_cache else KeepBlockCache()
893         self.timeout = timeout
894         self.proxy_timeout = proxy_timeout
895         self._user_agent_pool = queue.LifoQueue()
896         self.upload_counter = Counter()
897         self.download_counter = Counter()
898         self.put_counter = Counter()
899         self.get_counter = Counter()
900         self.hits_counter = Counter()
901         self.misses_counter = Counter()
902         self._storage_classes_unsupported_warning = False
903         self._default_classes = []
904         if num_prefetch_threads is not None:
905             self.num_prefetch_threads = num_prefetch_threads
906         else:
907             self.num_prefetch_threads = 2
908         self._prefetch_queue = None
909         self._prefetch_threads = None
910
911         if local_store:
912             self.local_store = local_store
913             self.head = self.local_store_head
914             self.get = self.local_store_get
915             self.put = self.local_store_put
916         else:
917             self.num_retries = num_retries
918             self.max_replicas_per_service = None
919             if proxy:
920                 proxy_uris = proxy.split()
921                 for i in range(len(proxy_uris)):
922                     if not proxy_uris[i].endswith('/'):
923                         proxy_uris[i] += '/'
924                     # URL validation
925                     url = urllib.parse.urlparse(proxy_uris[i])
926                     if not (url.scheme and url.netloc):
927                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
928                 self.api_token = api_token
929                 self._gateway_services = {}
930                 self._keep_services = [{
931                     'uuid': "00000-bi6l4-%015d" % idx,
932                     'service_type': 'proxy',
933                     '_service_root': uri,
934                     } for idx, uri in enumerate(proxy_uris)]
935                 self._writable_services = self._keep_services
936                 self.using_proxy = True
937                 self._static_services_list = True
938             else:
939                 # It's important to avoid instantiating an API client
940                 # unless we actually need one, for testing's sake.
941                 if api_client is None:
942                     api_client = arvados.api('v1')
943                 self.api_client = api_client
944                 self.api_token = api_client.api_token
945                 self._gateway_services = {}
946                 self._keep_services = None
947                 self._writable_services = None
948                 self.using_proxy = None
949                 self._static_services_list = False
950                 try:
951                     self._default_classes = [
952                         k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
953                 except KeyError:
954                     # We're talking to an old cluster
955                     pass
956
957     def current_timeout(self, attempt_number):
958         """Return the appropriate timeout to use for this client.
959
960         The proxy timeout setting if the backend service is currently a proxy,
961         the regular timeout setting otherwise.  The `attempt_number` indicates
962         how many times the operation has been tried already (starting from 0
963         for the first try), and scales the connection timeout portion of the
964         return value accordingly.
965
966         """
967         # TODO(twp): the timeout should be a property of a
968         # KeepService, not a KeepClient. See #4488.
969         t = self.proxy_timeout if self.using_proxy else self.timeout
970         if len(t) == 2:
971             return (t[0] * (1 << attempt_number), t[1])
972         else:
973             return (t[0] * (1 << attempt_number), t[1], t[2])
974     def _any_nondisk_services(self, service_list):
975         return any(ks.get('service_type', 'disk') != 'disk'
976                    for ks in service_list)
977
978     def build_services_list(self, force_rebuild=False):
979         if (self._static_services_list or
980               (self._keep_services and not force_rebuild)):
981             return
982         with self.lock:
983             try:
984                 keep_services = self.api_client.keep_services().accessible()
985             except Exception:  # API server predates Keep services.
986                 keep_services = self.api_client.keep_disks().list()
987
988             # Gateway services are only used when specified by UUID,
989             # so there's nothing to gain by filtering them by
990             # service_type.
991             self._gateway_services = {ks['uuid']: ks for ks in
992                                       keep_services.execute()['items']}
993             if not self._gateway_services:
994                 raise arvados.errors.NoKeepServersError()
995
996             # Precompute the base URI for each service.
997             for r in self._gateway_services.values():
998                 host = r['service_host']
999                 if not host.startswith('[') and host.find(':') >= 0:
1000                     # IPv6 URIs must be formatted like http://[::1]:80/...
1001                     host = '[' + host + ']'
1002                 r['_service_root'] = "{}://{}:{:d}/".format(
1003                     'https' if r['service_ssl_flag'] else 'http',
1004                     host,
1005                     r['service_port'])
1006
1007             _logger.debug(str(self._gateway_services))
1008             self._keep_services = [
1009                 ks for ks in self._gateway_services.values()
1010                 if not ks.get('service_type', '').startswith('gateway:')]
1011             self._writable_services = [ks for ks in self._keep_services
1012                                        if not ks.get('read_only')]
1013
1014             # For disk type services, max_replicas_per_service is 1
1015             # It is unknown (unlimited) for other service types.
1016             if self._any_nondisk_services(self._writable_services):
1017                 self.max_replicas_per_service = None
1018             else:
1019                 self.max_replicas_per_service = 1
1020
1021     def _service_weight(self, data_hash, service_uuid):
1022         """Compute the weight of a Keep service endpoint for a data
1023         block with a known hash.
1024
1025         The weight is md5(h + u) where u is the last 15 characters of
1026         the service endpoint's UUID.
1027         """
1028         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1029
1030     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1031         """Return an array of Keep service endpoints, in the order in
1032         which they should be probed when reading or writing data with
1033         the given hash+hints.
1034         """
1035         self.build_services_list(force_rebuild)
1036
1037         sorted_roots = []
1038         # Use the services indicated by the given +K@... remote
1039         # service hints, if any are present and can be resolved to a
1040         # URI.
1041         for hint in locator.hints:
1042             if hint.startswith('K@'):
1043                 if len(hint) == 7:
1044                     sorted_roots.append(
1045                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1046                 elif len(hint) == 29:
1047                     svc = self._gateway_services.get(hint[2:])
1048                     if svc:
1049                         sorted_roots.append(svc['_service_root'])
1050
1051         # Sort the available local services by weight (heaviest first)
1052         # for this locator, and return their service_roots (base URIs)
1053         # in that order.
1054         use_services = self._keep_services
1055         if need_writable:
1056             use_services = self._writable_services
1057         self.using_proxy = self._any_nondisk_services(use_services)
1058         sorted_roots.extend([
1059             svc['_service_root'] for svc in sorted(
1060                 use_services,
1061                 reverse=True,
1062                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1063         _logger.debug("{}: {}".format(locator, sorted_roots))
1064         return sorted_roots
1065
1066     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1067         # roots_map is a dictionary, mapping Keep service root strings
1068         # to KeepService objects.  Poll for Keep services, and add any
1069         # new ones to roots_map.  Return the current list of local
1070         # root strings.
1071         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1072         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1073         for root in local_roots:
1074             if root not in roots_map:
1075                 roots_map[root] = self.KeepService(
1076                     root, self._user_agent_pool,
1077                     upload_counter=self.upload_counter,
1078                     download_counter=self.download_counter,
1079                     headers=headers,
1080                     insecure=self.insecure)
1081         return local_roots
1082
1083     @staticmethod
1084     def _check_loop_result(result):
1085         # KeepClient RetryLoops should save results as a 2-tuple: the
1086         # actual result of the request, and the number of servers available
1087         # to receive the request this round.
1088         # This method returns True if there's a real result, False if
1089         # there are no more servers available, otherwise None.
1090         if isinstance(result, Exception):
1091             return None
1092         result, tried_server_count = result
1093         if (result is not None) and (result is not False):
1094             return True
1095         elif tried_server_count < 1:
1096             _logger.info("No more Keep services to try; giving up")
1097             return False
1098         else:
1099             return None
1100
1101     def get_from_cache(self, loc_s):
1102         """Fetch a block only if is in the cache, otherwise return None."""
1103         locator = KeepLocator(loc_s)
1104         slot = self.block_cache.get(locator.md5sum)
1105         if slot is not None and slot.ready.is_set():
1106             return slot.get()
1107         else:
1108             return None
1109
1110     def refresh_signature(self, loc):
1111         """Ask Keep to get the remote block and return its local signature"""
1112         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1113         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1114
1115     @retry.retry_method
1116     def head(self, loc_s, **kwargs):
1117         return self._get_or_head(loc_s, method="HEAD", **kwargs)
1118
1119     @retry.retry_method
1120     def get(self, loc_s, **kwargs):
1121         return self._get_or_head(loc_s, method="GET", **kwargs)
1122
1123     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1124         """Get data from Keep.
1125
1126         This method fetches one or more blocks of data from Keep.  It
1127         sends a request each Keep service registered with the API
1128         server (or the proxy provided when this client was
1129         instantiated), then each service named in location hints, in
1130         sequence.  As soon as one service provides the data, it's
1131         returned.
1132
1133         Arguments:
1134         * loc_s: A string of one or more comma-separated locators to fetch.
1135           This method returns the concatenation of these blocks.
1136         * num_retries: The number of times to retry GET requests to
1137           *each* Keep server if it returns temporary failures, with
1138           exponential backoff.  Note that, in each loop, the method may try
1139           to fetch data from every available Keep service, along with any
1140           that are named in location hints in the locator.  The default value
1141           is set when the KeepClient is initialized.
1142         """
1143         if ',' in loc_s:
1144             return ''.join(self.get(x) for x in loc_s.split(','))
1145
1146         self.get_counter.add(1)
1147
1148         request_id = (request_id or
1149                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1150                       arvados.util.new_request_id())
1151         if headers is None:
1152             headers = {}
1153         headers['X-Request-Id'] = request_id
1154
1155         slot = None
1156         blob = None
1157         try:
1158             locator = KeepLocator(loc_s)
1159             if method == "GET":
1160                 while slot is None:
1161                     slot, first = self.block_cache.reserve_cache(locator.md5sum)
1162                     if first:
1163                         # Fresh and empty "first time it is used" slot
1164                         break
1165                     if prefetch:
1166                         # this is request for a prefetch to fill in
1167                         # the cache, don't need to wait for the
1168                         # result, so if it is already in flight return
1169                         # immediately.  Clear 'slot' to prevent
1170                         # finally block from calling slot.set()
1171                         if slot.ready.is_set():
1172                             slot.get()
1173                         slot = None
1174                         return None
1175
1176                     blob = slot.get()
1177                     if blob is not None:
1178                         self.hits_counter.add(1)
1179                         return blob
1180
1181                     # If blob is None, this means either
1182                     #
1183                     # (a) another thread was fetching this block and
1184                     # failed with an error or
1185                     #
1186                     # (b) cache thrashing caused the slot to be
1187                     # evicted (content set to None) by another thread
1188                     # between the call to reserve_cache() and get().
1189                     #
1190                     # We'll handle these cases by reserving a new slot
1191                     # and then doing a full GET request.
1192                     slot = None
1193
1194             self.misses_counter.add(1)
1195
1196             # If the locator has hints specifying a prefix (indicating a
1197             # remote keepproxy) or the UUID of a local gateway service,
1198             # read data from the indicated service(s) instead of the usual
1199             # list of local disk services.
1200             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1201                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1202             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1203                                for hint in locator.hints if (
1204                                        hint.startswith('K@') and
1205                                        len(hint) == 29 and
1206                                        self._gateway_services.get(hint[2:])
1207                                        )])
1208             # Map root URLs to their KeepService objects.
1209             roots_map = {
1210                 root: self.KeepService(root, self._user_agent_pool,
1211                                        upload_counter=self.upload_counter,
1212                                        download_counter=self.download_counter,
1213                                        headers=headers,
1214                                        insecure=self.insecure)
1215                 for root in hint_roots
1216             }
1217
1218             # See #3147 for a discussion of the loop implementation.  Highlights:
1219             # * Refresh the list of Keep services after each failure, in case
1220             #   it's being updated.
1221             # * Retry until we succeed, we're out of retries, or every available
1222             #   service has returned permanent failure.
1223             sorted_roots = []
1224             roots_map = {}
1225             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1226                                    backoff_start=2)
1227             for tries_left in loop:
1228                 try:
1229                     sorted_roots = self.map_new_services(
1230                         roots_map, locator,
1231                         force_rebuild=(tries_left < num_retries),
1232                         need_writable=False,
1233                         headers=headers)
1234                 except Exception as error:
1235                     loop.save_result(error)
1236                     continue
1237
1238                 # Query KeepService objects that haven't returned
1239                 # permanent failure, in our specified shuffle order.
1240                 services_to_try = [roots_map[root]
1241                                    for root in sorted_roots
1242                                    if roots_map[root].usable()]
1243                 for keep_service in services_to_try:
1244                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1245                     if blob is not None:
1246                         break
1247                 loop.save_result((blob, len(services_to_try)))
1248
1249             # Always cache the result, then return it if we succeeded.
1250             if loop.success():
1251                 return blob
1252         finally:
1253             if slot is not None:
1254                 self.block_cache.set(slot, blob)
1255
1256         # Q: Including 403 is necessary for the Keep tests to continue
1257         # passing, but maybe they should expect KeepReadError instead?
1258         not_founds = sum(1 for key in sorted_roots
1259                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1260         service_errors = ((key, roots_map[key].last_result()['error'])
1261                           for key in sorted_roots)
1262         if not roots_map:
1263             raise arvados.errors.KeepReadError(
1264                 "[{}] failed to read {}: no Keep services available ({})".format(
1265                     request_id, loc_s, loop.last_result()))
1266         elif not_founds == len(sorted_roots):
1267             raise arvados.errors.NotFoundError(
1268                 "[{}] {} not found".format(request_id, loc_s), service_errors)
1269         else:
1270             raise arvados.errors.KeepReadError(
1271                 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1272
1273     @retry.retry_method
1274     def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1275         """Save data in Keep.
1276
1277         This method will get a list of Keep services from the API server, and
1278         send the data to each one simultaneously in a new thread.  Once the
1279         uploads are finished, if enough copies are saved, this method returns
1280         the most recent HTTP response body.  If requests fail to upload
1281         enough copies, this method raises KeepWriteError.
1282
1283         Arguments:
1284         * data: The string of data to upload.
1285         * copies: The number of copies that the user requires be saved.
1286           Default 2.
1287         * num_retries: The number of times to retry PUT requests to
1288           *each* Keep server if it returns temporary failures, with
1289           exponential backoff.  The default value is set when the
1290           KeepClient is initialized.
1291         * classes: An optional list of storage class names where copies should
1292           be written.
1293         """
1294
1295         classes = classes or self._default_classes
1296
1297         if not isinstance(data, bytes):
1298             data = data.encode()
1299
1300         self.put_counter.add(1)
1301
1302         data_hash = hashlib.md5(data).hexdigest()
1303         loc_s = data_hash + '+' + str(len(data))
1304         if copies < 1:
1305             return loc_s
1306         locator = KeepLocator(loc_s)
1307
1308         request_id = (request_id or
1309                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1310                       arvados.util.new_request_id())
1311         headers = {
1312             'X-Request-Id': request_id,
1313             'X-Keep-Desired-Replicas': str(copies),
1314         }
1315         roots_map = {}
1316         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1317                                backoff_start=2)
1318         done_copies = 0
1319         done_classes = []
1320         for tries_left in loop:
1321             try:
1322                 sorted_roots = self.map_new_services(
1323                     roots_map, locator,
1324                     force_rebuild=(tries_left < num_retries),
1325                     need_writable=True,
1326                     headers=headers)
1327             except Exception as error:
1328                 loop.save_result(error)
1329                 continue
1330
1331             pending_classes = []
1332             if done_classes is not None:
1333                 pending_classes = list(set(classes) - set(done_classes))
1334             writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1335                                                         data_hash=data_hash,
1336                                                         copies=copies - done_copies,
1337                                                         max_service_replicas=self.max_replicas_per_service,
1338                                                         timeout=self.current_timeout(num_retries - tries_left),
1339                                                         classes=pending_classes)
1340             for service_root, ks in [(root, roots_map[root])
1341                                      for root in sorted_roots]:
1342                 if ks.finished():
1343                     continue
1344                 writer_pool.add_task(ks, service_root)
1345             writer_pool.join()
1346             pool_copies, pool_classes = writer_pool.done()
1347             done_copies += pool_copies
1348             if (done_classes is not None) and (pool_classes is not None):
1349                 done_classes += pool_classes
1350                 loop.save_result(
1351                     (done_copies >= copies and set(done_classes) == set(classes),
1352                     writer_pool.total_task_nr))
1353             else:
1354                 # Old keepstore contacted without storage classes support:
1355                 # success is determined only by successful copies.
1356                 #
1357                 # Disable storage classes tracking from this point forward.
1358                 if not self._storage_classes_unsupported_warning:
1359                     self._storage_classes_unsupported_warning = True
1360                     _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1361                 done_classes = None
1362                 loop.save_result(
1363                     (done_copies >= copies, writer_pool.total_task_nr))
1364
1365         if loop.success():
1366             return writer_pool.response()
1367         if not roots_map:
1368             raise arvados.errors.KeepWriteError(
1369                 "[{}] failed to write {}: no Keep services available ({})".format(
1370                     request_id, data_hash, loop.last_result()))
1371         else:
1372             service_errors = ((key, roots_map[key].last_result()['error'])
1373                               for key in sorted_roots
1374                               if roots_map[key].last_result()['error'])
1375             raise arvados.errors.KeepWriteError(
1376                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1377                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1378
1379     def _block_prefetch_worker(self):
1380         """The background downloader thread."""
1381         while True:
1382             try:
1383                 b = self._prefetch_queue.get()
1384                 if b is None:
1385                     return
1386                 self.get(b, prefetch=True)
1387             except Exception:
1388                 _logger.exception("Exception doing block prefetch")
1389
1390     def _start_prefetch_threads(self):
1391         if self._prefetch_threads is None:
1392             with self.lock:
1393                 if self._prefetch_threads is not None:
1394                     return
1395                 self._prefetch_queue = queue.Queue()
1396                 self._prefetch_threads = []
1397                 for i in range(0, self.num_prefetch_threads):
1398                     thread = threading.Thread(target=self._block_prefetch_worker)
1399                     self._prefetch_threads.append(thread)
1400                     thread.daemon = True
1401                     thread.start()
1402
1403     def block_prefetch(self, locator):
1404         """
1405         This relies on the fact that KeepClient implements a block cache,
1406         so repeated requests for the same block will not result in repeated
1407         downloads (unless the block is evicted from the cache.)  This method
1408         does not block.
1409         """
1410
1411         if self.block_cache.get(locator) is not None:
1412             return
1413
1414         self._start_prefetch_threads()
1415         self._prefetch_queue.put(locator)
1416
1417     def stop_prefetch_threads(self):
1418         with self.lock:
1419             if self._prefetch_threads is not None:
1420                 for t in self._prefetch_threads:
1421                     self._prefetch_queue.put(None)
1422                 for t in self._prefetch_threads:
1423                     t.join()
1424             self._prefetch_threads = None
1425             self._prefetch_queue = None
1426
1427     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1428         """A stub for put().
1429
1430         This method is used in place of the real put() method when
1431         using local storage (see constructor's local_store argument).
1432
1433         copies and num_retries arguments are ignored: they are here
1434         only for the sake of offering the same call signature as
1435         put().
1436
1437         Data stored this way can be retrieved via local_store_get().
1438         """
1439         md5 = hashlib.md5(data).hexdigest()
1440         locator = '%s+%d' % (md5, len(data))
1441         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1442             f.write(data)
1443         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1444                   os.path.join(self.local_store, md5))
1445         return locator
1446
1447     def local_store_get(self, loc_s, num_retries=None):
1448         """Companion to local_store_put()."""
1449         try:
1450             locator = KeepLocator(loc_s)
1451         except ValueError:
1452             raise arvados.errors.NotFoundError(
1453                 "Invalid data locator: '%s'" % loc_s)
1454         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1455             return b''
1456         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1457             return f.read()
1458
1459     def local_store_head(self, loc_s, num_retries=None):
1460         """Companion to local_store_put()."""
1461         try:
1462             locator = KeepLocator(loc_s)
1463         except ValueError:
1464             raise arvados.errors.NotFoundError(
1465                 "Invalid data locator: '%s'" % loc_s)
1466         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1467             return True
1468         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1469             return True