# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
import copy
from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object
import collections
import datetime
import hashlib
import io
import logging
import math
import os
import pycurl
import queue
import re
import socket
import ssl
import sys
import threading
import resource
from . import timer
import urllib.parse

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
import arvados.diskcache

_logger = logging.getLogger('arvados.keep')
global_client_object = None


# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            # split('@', 1) yields a single element when there is no
            # '@', so the failed unpacking raises ValueError (not
            # IndexError); re-raise with a friendlier message.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt

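# Illustrative KeepLocator round trip (a sketch; the hash, signature, and
# timestamp below are made-up placeholder values, not real Keep data):
#
#     sig = 'f' * 40   # placeholder 40-digit hex signature
#     loc = KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0+A%s@12ab3c4d' % sig)
#     loc.md5sum      # 'd41d8cd98f00b204e9800998ecf8427e'
#     loc.size        # 0
#     loc.stripped()  # 'd41d8cd98f00b204e9800998ecf8427e+0'
#     str(loc)        # reassembles the locator, permission hint included
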

class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)

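# As the docstring above notes, Keep is deprecated.  A minimal sketch of the
# preferred pattern, assuming a configured Arvados environment:
#
#     api = arvados.api('v1')
#     keep_client = KeepClient(api_client=api)
#     locator = keep_client.put(b'example data')
#     data = keep_client.get(locator)
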
class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # default max slots to half of maximum file handles
                # (integer division, so max_slots stays an int)
                self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] // 2
            else:
                self._max_slots = 1024

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize) // 2
                # Half the available space or max_slots * 64 MiB
                self.cache_max = min(avail, (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        if self._disk_cache:
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            self.cap_cache()


    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            return True

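    # CacheSlot behaves like a one-shot future: get() blocks on the Event
    # until some thread calls set(), after which get() returns the stored
    # content immediately.  A sketch (hash value is illustrative only):
    #
    #     slot = KeepBlockCache.CacheSlot('37b51d194a7513e45b56f6524f2d51f2')
    #     threading.Thread(target=lambda: slot.set(b'foo')).start()
    #     slot.get()   # blocks briefly, then returns b'foo'
    #     slot.size()  # 3
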
    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
                for i in range(len(self._cache)-1, -1, -1):
                    # start from the back, find a slot that is a candidate to evict
                    if self._cache[i].ready.is_set():
                        sz = self._cache[i].size()

                        # If evict returns false it means the
                        # underlying disk cache couldn't lock the file
                        # for deletion because another process was using
                        # it. Don't count it as reducing the amount
                        # of data in the cache, find something else to
                        # throw out.
                        if self._cache[i].evict():
                            sm -= sz

                        # either way we forget about it.  either the
                        # other process will delete it, or if we need
                        # it again and it is still there, we'll find
                        # it on disk.
                        del self._cache[i]
                        break

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True

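# A minimal sketch of how a reader is expected to use reserve_cache(); this
# mirrors KeepClient._get_or_head below (fetch_block is hypothetical):
#
#     cache = KeepBlockCache()
#     slot, first = cache.reserve_cache(locator_md5)
#     if first:
#         # We own the slot: fetch the data and publish it.
#         slot.set(fetch_block(locator_md5))
#         cache.cap_cache()
#     else:
#         # Another thread is already fetching; wait for it.
#         data = slot.get()
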
class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)


    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

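        # A sketch of one GET transaction against a service (the root URL
        # and token are illustrative placeholders, not real endpoints):
        #
        #     ks = KeepClient.KeepService(
        #         'https://keep0.example.arvadosapi.com/',
        #         headers={'Authorization': 'OAuth2 <token>'})
        #     body = ks.get(KeepLocator('37b51d194a7513e45b56f6524f2d51f2+3'))
        #     if body is None:
        #         err = ks.last_result()['error']   # why the request failed
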
        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] is False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def _socket_open(self, *args, **kwargs):
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            self._socket = s
            return s

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result['headers'].get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            if not ignore_bandwidth:
                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

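        # How the timeout argument maps onto cURL options, sketched with the
        # default server tuple (2, 256, 32768):
        #
        #     _setcurltimeouts(curl, (2, 256, 32768))
        #       -> CONNECTTIMEOUT_MS = 2000   # connection phase
        #       -> LOW_SPEED_TIME   = 256     # abort the transfer if slower
        #       -> LOW_SPEED_LIMIT  = 32768   # ...than 32768 B/s for 256s
        #
        # A bare number (e.g. timeouts=5) applies 5 seconds to both phases
        # with the default bandwidth floor.
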
        def _headerfunction(self, header_line):
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written


    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Notifies disabled storage classes expectation to
                    # the outer loop.
                    return None
            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()

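    # Example of the queue's replica accounting (hypothetical numbers): with
    # wanted_copies=2 and classes=['default'], pending_tries starts at 2.
    # If one PUT succeeds with x-keep-replicas-stored: 1 and one copy
    # confirmed for 'default', then pending_copies() == 1 and
    # pending_classes() == ['default'], so workers keep trying until both
    # reach zero or the queue of services is exhausted.
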

    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response

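    # A sketch of the intended pool lifecycle (argument values are
    # illustrative; sorted_roots/roots_map are built as in put()/get()):
    #
    #     pool = KeepClient.KeepWriterThreadPool(
    #         data, data_hash, copies=2, max_service_replicas=1)
    #     for root in sorted_roots:
    #         pool.add_task(roots_map[root], root)
    #     pool.join()                           # starts workers, waits
    #     copies_done, classes_done = pool.done()
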

    class KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout,
                                        headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed


    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass

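    # Two illustrative ways to construct a client (hostnames and token are
    # placeholders).  Direct service access via an API client:
    #
    #     kc = KeepClient(api_client=arvados.api('v1'), num_retries=3)
    #
    # Or proxy-only access with an explicit token and a longer connection
    # timeout:
    #
    #     kc = KeepClient(proxy='https://keep.example.arvadosapi.com/',
    #                     api_token='xyzzy',
    #                     proxy_timeout=(30, 256, 32768))
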
    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
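
    # With the default (2, 256, 32768) tuple, the connection timeout doubles
    # on each retry while the read timeout and bandwidth floor stay fixed:
    #
    #     current_timeout(0)  # (2, 256, 32768)
    #     current_timeout(1)  # (4, 256, 32768)
    #     current_timeout(2)  # (8, 256, 32768)
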
    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()

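    # Sketch of the rendezvous weighting this implements (values invented):
    # every service gets a deterministic pseudo-random weight per block, so
    # all clients probe services in the same order for the same block.
    #
    #     h = '37b51d194a7513e45b56f6524f2d51f2'     # block hash
    #     u = 'zzzzz-bi6l4-000000000000001'          # service UUID
    #     weight = hashlib.md5((h + u[-15:]).encode()).hexdigest()
    #
    # weighted_service_roots() sorts services by this weight, heaviest first.
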
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache, otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

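    # Illustrative read-side calls (locator values are placeholders):
    #
    #     kc.get('37b51d194a7513e45b56f6524f2d51f2+3')   # bytes, or raises
    #     kc.head('37b51d194a7513e45b56f6524f2d51f2+3')  # True on success
    #     kc.get_from_cache('37b51d194a7513e45b56f6524f2d51f2+3')  # cache only
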
1119     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1120         """Get data from Keep.
1121
1122         This method fetches one or more blocks of data from Keep.  It
1123         sends a request each Keep service registered with the API
1124         server (or the proxy provided when this client was
1125         instantiated), then each service named in location hints, in
1126         sequence.  As soon as one service provides the data, it's
1127         returned.
1128
1129         Arguments:
1130         * loc_s: A string of one or more comma-separated locators to fetch.
1131           This method returns the concatenation of these blocks.
1132         * num_retries: The number of times to retry GET requests to
1133           *each* Keep server if it returns temporary failures, with
1134           exponential backoff.  Note that, in each loop, the method may try
1135           to fetch data from every available Keep service, along with any
1136           that are named in location hints in the locator.  The default value
1137           is set when the KeepClient is initialized.
1138         """
1139         if ',' in loc_s:
1140             return ''.join(self.get(x) for x in loc_s.split(','))
1141
1142         self.get_counter.add(1)
1143
1144         request_id = (request_id or
1145                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1146                       arvados.util.new_request_id())
1147         if headers is None:
1148             headers = {}
1149         headers['X-Request-Id'] = request_id
1150
1151         slot = None
1152         blob = None
1153         try:
1154             locator = KeepLocator(loc_s)
1155             if method == "GET":
1156                 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1157                 if not first:
1158                     if prefetch:
1159                         # this is request for a prefetch, if it is
1160                         # already in flight, return immediately.
1161                         # clear 'slot' to prevent finally block from
1162                         # calling slot.set()
1163                         slot = None
1164                         return None
1165                     self.hits_counter.add(1)
1166                     blob = slot.get()
1167                     if blob is None:
1168                         raise arvados.errors.KeepReadError(
1169                             "failed to read {}".format(loc_s))
1170                     return blob
1171
1172             self.misses_counter.add(1)
1173
1174             # If the locator has hints specifying a prefix (indicating a
1175             # remote keepproxy) or the UUID of a local gateway service,
1176             # read data from the indicated service(s) instead of the usual
1177             # list of local disk services.
1178             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1179                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1180             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1181                                for hint in locator.hints if (
1182                                        hint.startswith('K@') and
1183                                        len(hint) == 29 and
1184                                        self._gateway_services.get(hint[2:])
1185                                        )])
1186             # Map root URLs to their KeepService objects.
1187             roots_map = {
1188                 root: self.KeepService(root, self._user_agent_pool,
1189                                        upload_counter=self.upload_counter,
1190                                        download_counter=self.download_counter,
1191                                        headers=headers,
1192                                        insecure=self.insecure)
1193                 for root in hint_roots
1194             }
1195
1196             # See #3147 for a discussion of the loop implementation.  Highlights:
1197             # * Refresh the list of Keep services after each failure, in case
1198             #   it's being updated.
1199             # * Retry until we succeed, we're out of retries, or every available
1200             #   service has returned permanent failure.
1201             sorted_roots = []
1202             roots_map = {}
1203             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1204                                    backoff_start=2)
1205             for tries_left in loop:
1206                 try:
1207                     sorted_roots = self.map_new_services(
1208                         roots_map, locator,
1209                         force_rebuild=(tries_left < num_retries),
1210                         need_writable=False,
1211                         headers=headers)
1212                 except Exception as error:
1213                     loop.save_result(error)
1214                     continue
1215
1216                 # Query KeepService objects that haven't returned
1217                 # permanent failure, in our specified shuffle order.
1218                 services_to_try = [roots_map[root]
1219                                    for root in sorted_roots
1220                                    if roots_map[root].usable()]
1221                 for keep_service in services_to_try:
1222                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1223                     if blob is not None:
1224                         break
1225                 loop.save_result((blob, len(services_to_try)))
1226
1227             # The finally block below always caches the result; return it if we succeeded.
1228             if loop.success():
1229                 return blob
1230         finally:
1231             if slot is not None:
1232                 slot.set(blob)
1233                 self.block_cache.cap_cache()
1234
1235         # Q: Including 403 is necessary for the Keep tests to continue
1236         # passing, but maybe they should expect KeepReadError instead?
1237         not_founds = sum(1 for key in sorted_roots
1238                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1239         service_errors = ((key, roots_map[key].last_result()['error'])
1240                           for key in sorted_roots)
1241         if not roots_map:
1242             raise arvados.errors.KeepReadError(
1243                 "[{}] failed to read {}: no Keep services available ({})".format(
1244                     request_id, loc_s, loop.last_result()))
1245         elif not_founds == len(sorted_roots):
1246             raise arvados.errors.NotFoundError(
1247                 "[{}] {} not found".format(request_id, loc_s), service_errors)
1248         else:
1249             raise arvados.errors.KeepReadError(
1250                 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1251
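    # Illustrative usage of the read path above (a sketch; `api` and the
    # locator value are hypothetical):
    #
    #     kc = arvados.KeepClient(api_client=api)
    #     data = kc.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
    #
    # head() takes the same path with method="HEAD", returning True
    # instead of the block contents.
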
1252     @retry.retry_method
1253     def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1254         """Save data in Keep.
1255
1256         This method will get a list of Keep services from the API server,
1257         and upload the data to writable services concurrently, each in its
1258         own thread, until enough copies are stored.  Once the uploads are
1259         finished, this method returns the most recent HTTP response body if
1260         enough copies were saved; otherwise it raises KeepWriteError.
1261
1262         Arguments:
1263         * data: The string of data to upload.
1264         * copies: The number of copies that the user requires be saved.
1265           Default 2.
1266         * num_retries: The number of times to retry PUT requests to
1267           *each* Keep server if it returns temporary failures, with
1268           exponential backoff.  The default value is set when the
1269           KeepClient is initialized.
1270         * classes: An optional list of storage class names where copies should
1271           be written.
1272         """
1273
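        # Illustrative sketch (hypothetical values): on success, put()
        # returns a signed locator for the stored block:
        #
        #     loc = kc.put(b'foo', copies=2)
        #     # e.g. 'acbd18db4cc2f85cedef654fccc4a4d8+3+A<signature>@<expiry>'
        #
        # where `kc` is an assumed KeepClient instance and the +A... suffix
        # is the permission signature returned by the cluster.
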
1274         classes = classes or self._default_classes
1275
1276         if not isinstance(data, bytes):
1277             data = data.encode()
1278
1279         self.put_counter.add(1)
1280
1281         data_hash = hashlib.md5(data).hexdigest()
1282         loc_s = data_hash + '+' + str(len(data))
1283         if copies < 1:
1284             return loc_s
1285         locator = KeepLocator(loc_s)
1286
1287         request_id = (request_id or
1288                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1289                       arvados.util.new_request_id())
1290         headers = {
1291             'X-Request-Id': request_id,
1292             'X-Keep-Desired-Replicas': str(copies),
1293         }
1294         roots_map = {}
1295         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1296                                backoff_start=2)
1297         done_copies = 0
1298         done_classes = []
1299         for tries_left in loop:
1300             try:
1301                 sorted_roots = self.map_new_services(
1302                     roots_map, locator,
1303                     force_rebuild=(tries_left < num_retries),
1304                     need_writable=True,
1305                     headers=headers)
1306             except Exception as error:
1307                 loop.save_result(error)
1308                 continue
1309
1310             pending_classes = []
1311             if done_classes is not None:
1312                 pending_classes = list(set(classes) - set(done_classes))
1313             writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1314                                                           data_hash=data_hash,
1315                                                           copies=copies - done_copies,
1316                                                           max_service_replicas=self.max_replicas_per_service,
1317                                                           timeout=self.current_timeout(num_retries - tries_left),
1318                                                           classes=pending_classes)
1319             for service_root, ks in [(root, roots_map[root])
1320                                      for root in sorted_roots]:
1321                 if ks.finished():
1322                     continue
1323                 writer_pool.add_task(ks, service_root)
1324             writer_pool.join()
1325             pool_copies, pool_classes = writer_pool.done()
1326             done_copies += pool_copies
1327             if (done_classes is not None) and (pool_classes is not None):
1328                 done_classes += pool_classes
1329                 loop.save_result(
1330                     (done_copies >= copies and set(done_classes) == set(classes),
1331                      writer_pool.total_task_nr))
1332             else:
1333                 # Old keepstore contacted without storage classes support:
1334                 # success is determined only by successful copies.
1335                 #
1336                 # Disable storage classes tracking from this point forward.
1337                 if not self._storage_classes_unsupported_warning:
1338                     self._storage_classes_unsupported_warning = True
1339                     _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1340                 done_classes = None
1341                 loop.save_result(
1342                     (done_copies >= copies, writer_pool.total_task_nr))
1343
1344         if loop.success():
1345             return writer_pool.response()
1346         if not roots_map:
1347             raise arvados.errors.KeepWriteError(
1348                 "[{}] failed to write {}: no Keep services available ({})".format(
1349                     request_id, data_hash, loop.last_result()))
1350         else:
1351             service_errors = ((key, roots_map[key].last_result()['error'])
1352                               for key in sorted_roots
1353                               if roots_map[key].last_result()['error'])
1354             raise arvados.errors.KeepWriteError(
1355                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1356                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1357
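    # Caller-side sketch of put() failure handling (illustrative; the
    # storage class name is hypothetical):
    #
    #     try:
    #         loc = kc.put(data, copies=2, classes=['default'])
    #     except arvados.errors.KeepWriteError as error:
    #         _logger.error("write failed: %s", error)
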
1358     def local_store_put(self, data, copies=1, num_retries=None, classes=None):
1359         """A stub for put().
1360
1361         This method is used in place of the real put() method when
1362         using local storage (see constructor's local_store argument).
1363
1364         The copies, num_retries, and classes arguments are ignored: they
1365         are here only for the sake of offering the same call signature
1366         as put().
1367
1368         Data stored this way can be retrieved via local_store_get().
1369         """
1370         md5 = hashlib.md5(data).hexdigest()
1371         locator = '%s+%d' % (md5, len(data))
1372         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1373             f.write(data)
1374         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1375                   os.path.join(self.local_store, md5))
1376         return locator
1377
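    # Illustrative round trip through local storage (a sketch; the
    # directory is hypothetical and must already exist):
    #
    #     kc = arvados.KeepClient(local_store='/tmp/keep')
    #     loc = kc.local_store_put(b'foo')
    #     kc.local_store_get(loc)   # returns b'foo'
    #     kc.local_store_head(loc)  # returns True
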
1378     def local_store_get(self, loc_s, num_retries=None):
1379         """Companion to local_store_put()."""
1380         try:
1381             locator = KeepLocator(loc_s)
1382         except ValueError:
1383             raise arvados.errors.NotFoundError(
1384                 "Invalid data locator: '%s'" % loc_s)
1385         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1386             return b''
1387         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1388             return f.read()
1389
1390     def local_store_head(self, loc_s, num_retries=None):
1391         """Companion to local_store_put()."""
1392         try:
1393             locator = KeepLocator(loc_s)
1394         except ValueError:
1395             raise arvados.errors.NotFoundError(
1396                 "Invalid data locator: '%s'" % loc_s)
1397         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1398             return True
1399         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1400             return True