# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
import copy
from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object
import collections
import datetime
import hashlib
import io
import logging
import math
import os
import pycurl
import queue
import re
import socket
import ssl
import sys
import threading
import resource
from . import timer
import urllib.parse

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
import arvados.diskcache

_logger = logging.getLogger('arvados.keep')
global_client_object = None


# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

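    # Anatomy sketch: a locator is '+'-separated tokens -- the MD5 hash, an
    # optional size, then hints.  The hash below is md5("foo"), shown
    # purely for illustration:
    #
    #     loc = KeepLocator("acbd18db4cc2f85cedef654fccc4a4d8+3+Ksite1")
    #     assert loc.md5sum == "acbd18db4cc2f85cedef654fccc4a4d8"
    #     assert loc.size == 3 and loc.hints == ["Ksite1"]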
    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            # A hint with no '@' makes the tuple unpacking above fail.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt

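    # Permission hints look like "A<signature>@<hex expiry>".  A sketch of
    # round-tripping one (the all-zero 40-digit signature is a placeholder,
    # not a real signature):
    #
    #     loc = KeepLocator("d41d8cd98f00b204e9800998ecf8427e+0")
    #     loc.parse_permission_hint("A" + "0" * 40 + "@ffffffff")
    #     assert loc.permission_hint() == "A" + "0" * 40 + "@ffffffff"
    #     assert not loc.permission_expired()  # 0xffffffff is in 2106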

class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)

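# A sketch of the preferred replacement for the deprecated Keep wrapper
# above (assumes a configured Arvados environment):
#
#     api = arvados.api('v1')
#     keep = KeepClient(api_client=api)
#     locator = keep.put(b'foo')
#     data = keep.get(locator)
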
class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # Default max slots to half of maximum file handles.
                # NOFILE typically defaults to 1024 on Linux, so this
                # will be 512 slots.
                self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] // 2
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # Pick the smallest of:
                # - 10% of total disk size
                # - 25% of available space
                # - max_slots * 64 MiB
                self.cache_max = min(maxdisk, avail, self._max_slots * 64 * 1024 * 1024)
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        if self._disk_cache:
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            self.cap_cache()


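    # Worked example of the defaults above, assuming the typical Linux
    # RLIMIT_NOFILE of 1024: _max_slots = 1024 // 2 = 512, so a disk
    # cache is capped at min(10% of the whole disk, 25% of free space,
    # 512 * 64 MiB = 32 GiB), and never drops below the 64 MiB floor.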
    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            return True

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
                for i in range(len(self._cache)-1, -1, -1):
                    # Start from the back and find a slot that is a candidate to evict.
                    if self._cache[i].ready.is_set():
                        sz = self._cache[i].size()

                        # If evict returns False it means the
                        # underlying disk cache couldn't lock the file
                        # for deletion because another process was using
                        # it. Don't count it as reducing the amount
                        # of data in the cache; find something else to
                        # throw out.
                        if self._cache[i].evict():
                            sm -= sz

                        # Either way we forget about it: either the
                        # other process will delete it, or if we need
                        # it again and it is still there, we'll find
                        # it on disk.
                        del self._cache[i]
                        break

    def _get(self, locator):
        # Test if the locator is already in the cache.
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # Move it to the front.
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        if self._disk_cache:
            # See if it exists on disk.
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator.
                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True

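    # Readers use the reserve/set protocol sketched below (see
    # KeepClient._get_or_head for the real call site; fetch_block is
    # hypothetical):
    #
    #     slot, first = cache.reserve_cache(locator.md5sum)
    #     if first:
    #         slot.set(fetch_block(locator))
    #         cache.cap_cache()
    #     content = slot.get()   # blocks until set() is called
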
class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)


    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

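        # Lifecycle sketch (root URL and token are illustrative): create
        # one KeepService per service root for a single transaction, then
        # inspect last_result() and discard it.
        #
        #     ks = KeepClient.KeepService("https://keep0.example/",
        #                                 headers={'Authorization': 'OAuth2 t'})
        #     body = ks.get(locator)       # locator is a KeepLocator object
        #     result = ks.last_result()    # status_code/body/headers/error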
        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] is False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except Exception:
                ua.close()

        def _socket_open(self, *args, **kwargs):
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            self._socket = s
            return s

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok is not False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result['headers'].get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request; return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok is not False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            if not ignore_bandwidth:
                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

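        # Sketch of how the timeout argument maps onto cURL options: with
        # the default (2, 256, 32768), the connect phase may take up to 2s,
        # and the transfer is aborted if throughput stays under 32768
        # bytes/s for 256s.  A bare number applies to both phases:
        #
        #     self._setcurltimeouts(curl, (2, 256, 32768))  # full tuple
        #     self._setcurltimeouts(curl, 300)              # 300s for both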
        def _headerfunction(self, header_line):
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written


    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Signal to the outer loop that storage class
                    # tracking is disabled.
                    return None
            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()

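    # Bookkeeping sketch: with copies=2 and classes=['default', 'archive'],
    # pending_tries starts at max(2, 2) == 2.  A response confirming
    # 'default=2' leaves pending_classes() == ['archive'], so tasks keep
    # being scheduled until every wanted class reports >= 2 copies.
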
    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response

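    # Thread-count sketch: writing copies=3 to disk services
    # (max_service_replicas == 1) starts ceil(3/1) == 3 workers; a proxy
    # (max_service_replicas == None) gets a single worker because one
    # request can store all copies.
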
    class KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                       self.data,
                                       timeout=self.timeout,
                                       headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed

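        # Response-header sketch: a reply carrying
        #
        #     x-keep-replicas-stored: 2
        #     x-keep-storage-classes-confirmed: default=2, archive=1
        #
        # parses above to replicas_stored == 2 and
        # classes_confirmed == {'default': 2, 'archive': 1}.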

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
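
    # Backoff sketch: with the default non-proxy timeout (2, 256, 32768),
    # current_timeout(0) returns (2, 256, 32768) and current_timeout(2)
    # returns (8, 256, 32768); only the connect timeout doubles with each
    # attempt, while the read timeout and bandwidth floor stay fixed.
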
    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1.
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()

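    # Rendezvous-hashing sketch (UUIDs are illustrative): every client
    # derives the same per-service weights, so all clients probe services
    # in the same order for a given block without any coordination:
    #
    #     weights = {u: self._service_weight(data_hash, u)
    #                for u in ('zzzzz-bi6l4-000000000000000',
    #                          'zzzzz-bi6l4-000000000000001')}
    #     probe_order = sorted(weights, key=weights.get, reverse=True)
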
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache, otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
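        # Sketch (locators are placeholders): a call such as
        # get("hashA+3,hashB+5") recurses into one get() per locator and
        # returns the concatenation of the two blocks, as the first
        # statement below implements.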
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                slot, first = self.block_cache.reserve_cache(locator.md5sum)
                if not first:
                    if prefetch:
                        # This is a prefetch request; if the block is
                        # already in flight, return immediately.
                        # Clear 'slot' to prevent the finally block
                        # from calling slot.set().
                        slot = None
                        return None
                    self.hits_counter.add(1)
                    blob = slot.get()
                    if blob is None:
                        raise arvados.errors.KeepReadError(
                            "failed to read {}".format(loc_s))
                    return blob

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                       hint.startswith('K@') and
                                       len(hint) == 29 and
                                       self._gateway_services.get(hint[2:])
                                       )])
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation.  Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            sorted_roots = []
            roots_map = {}
1210             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1211                                    backoff_start=2)
1212             for tries_left in loop:
1213                 try:
1214                     sorted_roots = self.map_new_services(
1215                         roots_map, locator,
1216                         force_rebuild=(tries_left < num_retries),
1217                         need_writable=False,
1218                         headers=headers)
1219                 except Exception as error:
1220                     loop.save_result(error)
1221                     continue
1222
1223                 # Query KeepService objects that haven't returned
1224                 # permanent failure, in our specified shuffle order.
1225                 services_to_try = [roots_map[root]
1226                                    for root in sorted_roots
1227                                    if roots_map[root].usable()]
1228                 for keep_service in services_to_try:
1229                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1230                     if blob is not None:
1231                         break
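                # Loop results are saved as (value, number of services
                # tried this round); _check_loop_result() treats a real
                # value as success, zero tried services as permanent
                # failure, and anything else as grounds for another retry.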
1232                 loop.save_result((blob, len(services_to_try)))
1233
1234             # Always cache the result, then return it if we succeeded.
1235             if loop.success():
1236                 return blob
1237         finally:
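            # Always complete the reserved cache slot -- with None on
            # failure -- so that other threads blocked in slot.get()
            # wake up instead of waiting forever.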
1238             if slot is not None:
1239                 slot.set(blob)
1240                 self.block_cache.cap_cache()
1241
1242         # Q: Including 403 is necessary for the Keep tests to continue
1243         # passing, but maybe they should expect KeepReadError instead?
1244         not_founds = sum(1 for key in sorted_roots
1245                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1246         service_errors = ((key, roots_map[key].last_result()['error'])
1247                           for key in sorted_roots)
1248         if not roots_map:
1249             raise arvados.errors.KeepReadError(
1250                 "[{}] failed to read {}: no Keep services available ({})".format(
1251                     request_id, loc_s, loop.last_result()))
1252         elif not_founds == len(sorted_roots):
1253             raise arvados.errors.NotFoundError(
1254                 "[{}] {} not found".format(request_id, loc_s), service_errors)
1255         else:
1256             raise arvados.errors.KeepReadError(
1257                 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1258
1259     @retry.retry_method
1260     def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1261         """Save data in Keep.
1262
1263         This method will get a list of Keep services from the API server, and
1264         send the data to each one simultaneously in a new thread.  Once the
1265         uploads are finished, if enough copies are saved, this method returns
1266         the most recent HTTP response body.  If requests fail to upload
1267         enough copies, this method raises KeepWriteError.
1268
1269         Arguments:
        * data: The data to upload, as bytes or str.  A str value is
          encoded to bytes (UTF-8) before upload.
1271         * copies: The number of copies that the user requires be saved.
1272           Default 2.
1273         * num_retries: The number of times to retry PUT requests to
1274           *each* Keep server if it returns temporary failures, with
1275           exponential backoff.  The default value is set when the
1276           KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        * request_id: An optional X-Request-Id header value.  If not given,
          the API client's request ID is used, or a fresh one is generated.
1279         """
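        # Example (illustrative): kc.put(b'foo', copies=2) returns a
        # signed locator such as
        # 'acbd18db4cc2f85cedef654fccc4a4d8+3+A<signature>@<expiry>'.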
1280
1281         classes = classes or self._default_classes
1282
1283         if not isinstance(data, bytes):
1284             data = data.encode()
1285
1286         self.put_counter.add(1)
1287
1288         data_hash = hashlib.md5(data).hexdigest()
1289         loc_s = data_hash + '+' + str(len(data))
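        # With copies < 1 there is nothing to write, so return the bare
        # (unsigned) locator immediately.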
1290         if copies < 1:
1291             return loc_s
1292         locator = KeepLocator(loc_s)
1293
1294         request_id = (request_id or
1295                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1296                       arvados.util.new_request_id())
1297         headers = {
1298             'X-Request-Id': request_id,
1299             'X-Keep-Desired-Replicas': str(copies),
1300         }
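        # X-Keep-Desired-Replicas tells each server how many replicas
        # the client wants; a keepproxy may satisfy more than one
        # requested replica with a single request.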
1301         roots_map = {}
1302         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1303                                backoff_start=2)
1304         done_copies = 0
1305         done_classes = []
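        # done_copies and done_classes accumulate progress across retry
        # rounds, so each later round only has to write whatever is
        # still missing.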
1306         for tries_left in loop:
1307             try:
1308                 sorted_roots = self.map_new_services(
1309                     roots_map, locator,
1310                     force_rebuild=(tries_left < num_retries),
1311                     need_writable=True,
1312                     headers=headers)
1313             except Exception as error:
1314                 loop.save_result(error)
1315                 continue
1316
1317             pending_classes = []
1318             if done_classes is not None:
1319                 pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(
                data=data,
                data_hash=data_hash,
                copies=copies - done_copies,
                max_service_replicas=self.max_replicas_per_service,
                timeout=self.current_timeout(num_retries - tries_left),
                classes=pending_classes)
            for service_root in sorted_roots:
                ks = roots_map[service_root]
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
1331             writer_pool.join()
1332             pool_copies, pool_classes = writer_pool.done()
1333             done_copies += pool_copies
1334             if (done_classes is not None) and (pool_classes is not None):
1335                 done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
1339             else:
                # We contacted an old keepstore with no storage-class
                # support: success is determined only by the number of
                # successful copies.
                #
                # Disable storage-class tracking from this point forward.
1344                 if not self._storage_classes_unsupported_warning:
1345                     self._storage_classes_unsupported_warning = True
1346                     _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1347                 done_classes = None
1348                 loop.save_result(
1349                     (done_copies >= copies, writer_pool.total_task_nr))
1350
1351         if loop.success():
1352             return writer_pool.response()
1353         if not roots_map:
1354             raise arvados.errors.KeepWriteError(
1355                 "[{}] failed to write {}: no Keep services available ({})".format(
1356                     request_id, data_hash, loop.last_result()))
1357         else:
1358             service_errors = ((key, roots_map[key].last_result()['error'])
1359                               for key in sorted_roots
1360                               if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies in classes {} but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), copies, classes, writer_pool.done()),
                service_errors, label="service")
1364
    def local_store_put(self, data, copies=1, num_retries=None, classes=None):
1366         """A stub for put().
1367
1368         This method is used in place of the real put() method when
1369         using local storage (see constructor's local_store argument).
1370
        The copies, num_retries, and classes arguments are ignored: they
        are accepted only so this method offers the same call signature
        as put().
1374
1375         Data stored this way can be retrieved via local_store_get().
1376         """
1377         md5 = hashlib.md5(data).hexdigest()
1378         locator = '%s+%d' % (md5, len(data))
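        # Write to a '.tmp' file and then rename it into place: rename()
        # is atomic on POSIX when source and destination are on the same
        # filesystem, so readers never observe a partially written block.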
1379         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1380             f.write(data)
1381         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1382                   os.path.join(self.local_store, md5))
1383         return locator
1384
1385     def local_store_get(self, loc_s, num_retries=None):
1386         """Companion to local_store_put()."""
1387         try:
1388             locator = KeepLocator(loc_s)
1389         except ValueError:
1390             raise arvados.errors.NotFoundError(
1391                 "Invalid data locator: '%s'" % loc_s)
1392         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1393             return b''
1394         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1395             return f.read()
1396
1397     def local_store_head(self, loc_s, num_retries=None):
1398         """Companion to local_store_put()."""
1399         try:
1400             locator = KeepLocator(loc_s)
1401         except ValueError:
1402             raise arvados.errors.NotFoundError(
1403                 "Invalid data locator: '%s'" % loc_s)
1404         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1405             return True
1406         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1407             return True