Merge branch '8784-dir-listings'
[arvados.git] / sdk / python / arvados / keep.py
index db7835be3746f8f67eddd61d2aac505356e601f4..e6e93f080659abf9a51cec9d4425cafe8bdc976b 100644 (file)
@@ -1,18 +1,37 @@
-import cStringIO
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+from future.utils import native_str
+standard_library.install_aliases()
+from builtins import next
+from builtins import str
+from builtins import range
+from builtins import object
+import collections
 import datetime
 import hashlib
+import io
 import logging
 import math
 import os
 import pycurl
-import Queue
+import queue
 import re
 import socket
 import ssl
 import sys
 import threading
-import timer
-import urlparse
+from . import timer
+import urllib.parse
+
+if sys.version_info >= (3, 0):
+    from io import BytesIO
+else:
+    from cStringIO import StringIO as BytesIO
 
 import arvados
 import arvados.config as config
@@ -59,8 +78,9 @@ class KeepLocator(object):
 
     def __str__(self):
         return '+'.join(
-            str(s) for s in [self.md5sum, self.size,
-                             self.permission_hint()] + self.hints
+            native_str(s)
+            for s in [self.md5sum, self.size,
+                      self.permission_hint()] + self.hints
             if s is not None)
 
     def stripped(self):
@@ -77,7 +97,7 @@ class KeepLocator(object):
             return getattr(self, data_name)
         def setter(self, hex_str):
             if not arvados.util.is_hex(hex_str, length):
-                raise ValueError("{} is not a {}-digit hex string: {}".
+                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                  format(name, length, hex_str))
             setattr(self, data_name, hex_str)
         return property(getter, setter)
@@ -190,7 +210,7 @@ class KeepBlockCache(object):
             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
             sm = sum([slot.size() for slot in self._cache])
             while len(self._cache) > 0 and sm > self.cache_max:
-                for i in xrange(len(self._cache)-1, -1, -1):
+                for i in range(len(self._cache)-1, -1, -1):
                     if self._cache[i].ready.is_set():
                         del self._cache[i]
                         break
@@ -198,7 +218,7 @@ class KeepBlockCache(object):
 
     def _get(self, locator):
         # Test if the locator is already in the cache
-        for i in xrange(0, len(self._cache)):
+        for i in range(0, len(self._cache)):
             if self._cache[i].locator == locator:
                 n = self._cache[i]
                 if i != 0:
@@ -269,7 +289,7 @@ class KeepClient(object):
             arvados.errors.HttpError,
         )
 
-        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
+        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                      upload_counter=None,
                      download_counter=None, **headers):
             self.root = root
@@ -277,6 +297,7 @@ class KeepClient(object):
             self._result = {'error': None}
             self._usable = True
             self._session = None
+            self._socket = None
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
@@ -296,26 +317,39 @@ class KeepClient(object):
 
         def _get_user_agent(self):
             try:
-                return self._user_agent_pool.get(False)
-            except Queue.Empty:
+                return self._user_agent_pool.get(block=False)
+            except queue.Empty:
                 return pycurl.Curl()
 
         def _put_user_agent(self, ua):
             try:
                 ua.reset()
-                self._user_agent_pool.put(ua, False)
+                self._user_agent_pool.put(ua, block=False)
             except:
                 ua.close()
 
-        @staticmethod
-        def _socket_open(family, socktype, protocol, address=None):
+        def _socket_open(self, *args, **kwargs):
+            if len(args) + len(kwargs) == 2:
+                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
+            else:
+                return self._socket_open_pycurl_7_19_3(*args, **kwargs)
+
+        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
+            return self._socket_open_pycurl_7_21_5(
+                purpose=None,
+                address=collections.namedtuple(
+                    'Address', ['family', 'socktype', 'protocol', 'addr'],
+                )(family, socktype, protocol, address))
+
+        def _socket_open_pycurl_7_21_5(self, purpose, address):
             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
-            s = socket.socket(family, socktype, protocol)
+            s = socket.socket(address.family, address.socktype, address.protocol)
             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
             # Will throw invalid protocol error on mac. This test prevents that.
             if hasattr(socket, 'TCP_KEEPIDLE'):
                 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+            self._socket = s
             return s
 
         def get(self, locator, method="GET", timeout=None):
@@ -327,12 +361,13 @@ class KeepClient(object):
             try:
                 with timer.Timer() as t:
                     self._headers = {}
-                    response_body = cStringIO.StringIO()
+                    response_body = BytesIO()
                     curl.setopt(pycurl.NOSIGNAL, 1)
-                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
+                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                     curl.setopt(pycurl.URL, url.encode('utf-8'))
                     curl.setopt(pycurl.HTTPHEADER, [
-                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
+                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if method == "HEAD":
@@ -343,6 +378,10 @@ class KeepClient(object):
                         curl.perform()
                     except Exception as e:
                         raise arvados.errors.HttpError(0, str(e))
+                    finally:
+                        if self._socket:
+                            self._socket.close()
+                            self._socket = None
                     self._result = {
                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                         'body': response_body.getvalue(),
@@ -383,7 +422,7 @@ class KeepClient(object):
                          self._result['status_code'],
                          len(self._result['body']),
                          t.msecs,
-                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
 
             if self.download_counter:
                 self.download_counter.add(len(self._result['body']))
@@ -404,10 +443,11 @@ class KeepClient(object):
             try:
                 with timer.Timer() as t:
                     self._headers = {}
-                    body_reader = cStringIO.StringIO(body)
-                    response_body = cStringIO.StringIO()
+                    body_reader = BytesIO(body)
+                    response_body = BytesIO()
                     curl.setopt(pycurl.NOSIGNAL, 1)
-                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
+                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                     curl.setopt(pycurl.URL, url.encode('utf-8'))
                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
@@ -419,7 +459,7 @@ class KeepClient(object):
                     curl.setopt(pycurl.INFILESIZE, len(body))
                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
                     curl.setopt(pycurl.HTTPHEADER, [
-                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+                        '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     self._setcurltimeouts(curl, timeout)
@@ -427,9 +467,13 @@ class KeepClient(object):
                         curl.perform()
                     except Exception as e:
                         raise arvados.errors.HttpError(0, str(e))
+                    finally:
+                        if self._socket:
+                            self._socket.close()
+                            self._socket = None
                     self._result = {
                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
-                        'body': response_body.getvalue(),
+                        'body': response_body.getvalue().decode('utf-8'),
                         'headers': self._headers,
                         'error': False,
                     }
@@ -456,7 +500,7 @@ class KeepClient(object):
                          self._result['status_code'],
                          len(body),
                          t.msecs,
-                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
             if self.upload_counter:
                 self.upload_counter.add(len(body))
             return True
@@ -478,7 +522,8 @@ class KeepClient(object):
             curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
 
         def _headerfunction(self, header_line):
-            header_line = header_line.decode('iso-8859-1')
+            if isinstance(header_line, bytes):
+                header_line = header_line.decode('iso-8859-1')
             if ':' in header_line:
                 name, value = header_line.split(':', 1)
                 name = name.strip().lower()
@@ -497,9 +542,9 @@ class KeepClient(object):
             # Returning None implies all bytes were written
     
 
-    class KeepWriterQueue(Queue.Queue):
+    class KeepWriterQueue(queue.Queue):
         def __init__(self, copies):
-            Queue.Queue.__init__(self) # Old-style superclass
+            queue.Queue.__init__(self) # Old-style superclass
             self.wanted_copies = copies
             self.successful_copies = 0
             self.response = None
@@ -511,8 +556,10 @@ class KeepClient(object):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
+            with self.pending_tries_notification:
+                self.pending_tries_notification.notify_all()
         
-        def write_fail(self, ks, status_code):
+        def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
@@ -520,8 +567,36 @@ class KeepClient(object):
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
-    
-    
+
+        def get_next_task(self):
+            with self.pending_tries_notification:
+                while True:
+                    if self.pending_copies() < 1:
+                        # This notify_all() is unnecessary --
+                        # write_success() already called notify_all()
+                        # when pending<1 became true, so it's not
+                        # possible for any other thread to be in
+                        # wait() now -- but it's cheap insurance
+                        # against deadlock so we do it anyway:
+                        self.pending_tries_notification.notify_all()
+                        # Drain the queue and then raise Queue.Empty
+                        while True:
+                            self.get_nowait()
+                            self.task_done()
+                    elif self.pending_tries > 0:
+                        service, service_root = self.get_nowait()
+                        if service.finished():
+                            self.task_done()
+                            continue
+                        self.pending_tries -= 1
+                        return service, service_root
+                    elif self.empty():
+                        self.pending_tries_notification.notify_all()
+                        raise queue.Empty
+                    else:
+                        self.pending_tries_notification.wait()
+
+
     class KeepWriterThreadPool(object):
         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
             self.total_task_nr = 0
@@ -529,7 +604,7 @@ class KeepClient(object):
             if (not max_service_replicas) or (max_service_replicas >= copies):
                 num_threads = 1
             else:
-                num_threads = int(math.ceil(float(copies) / max_service_replicas))
+                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
             _logger.debug("Pool max threads is %d", num_threads)
             self.workers = []
             self.queue = KeepClient.KeepWriterQueue(copies)
@@ -551,74 +626,64 @@ class KeepClient(object):
                 worker.start()
             # Wait for finished work
             self.queue.join()
-            with self.queue.pending_tries_notification:
-                self.queue.pending_tries_notification.notify_all()
-            for worker in self.workers:
-                worker.join()
         
         def response(self):
             return self.queue.response
     
     
     class KeepWriterThread(threading.Thread):
+        TaskFailed = RuntimeError()
+
         def __init__(self, queue, data, data_hash, timeout=None):
             super(KeepClient.KeepWriterThread, self).__init__()
             self.timeout = timeout
             self.queue = queue
             self.data = data
             self.data_hash = data_hash
-        
+            self.daemon = True
+
         def run(self):
-            while not self.queue.empty():
-                if self.queue.pending_copies() > 0:
-                    # Avoid overreplication, wait for some needed re-attempt
-                    with self.queue.pending_tries_notification:
-                        if self.queue.pending_tries <= 0:
-                            self.queue.pending_tries_notification.wait()
-                            continue # try again when awake
-                        self.queue.pending_tries -= 1
-
-                    # Get to work
-                    try:
-                        service, service_root = self.queue.get_nowait()
-                    except Queue.Empty:
-                        continue
-                    if service.finished():
-                        self.queue.task_done()
-                        continue
-                    success = bool(service.put(self.data_hash,
-                                                self.data,
-                                                timeout=self.timeout))
-                    result = service.last_result()
-                    if success:
-                        _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
-                                      str(threading.current_thread()),
-                                      self.data_hash,
-                                      len(self.data),
-                                      service_root)
-                        try:
-                            replicas_stored = int(result['headers']['x-keep-replicas-stored'])
-                        except (KeyError, ValueError):
-                            replicas_stored = 1
-                        
-                        self.queue.write_success(result['body'].strip(), replicas_stored)
-                    else:
-                        if result.get('status_code', None):
-                            _logger.debug("Request fail: PUT %s => %s %s",
-                                          self.data_hash,
-                                          result['status_code'],
-                                          result['body'])
-                        self.queue.write_fail(service, result.get('status_code', None)) # Schedule a re-attempt with next service
-                    # Mark as done so the queue can be join()ed
-                    self.queue.task_done()
+            while True:
+                try:
+                    service, service_root = self.queue.get_next_task()
+                except queue.Empty:
+                    return
+                try:
+                    locator, copies = self.do_task(service, service_root)
+                except Exception as e:
+                    if e is not self.TaskFailed:
+                        _logger.exception("Exception in KeepWriterThread")
+                    self.queue.write_fail(service)
                 else:
-                    # Remove the task from the queue anyways
-                    try:
-                        self.queue.get_nowait()
-                        # Mark as done so the queue can be join()ed
-                        self.queue.task_done()
-                    except Queue.Empty:
-                        continue
+                    self.queue.write_success(locator, copies)
+                finally:
+                    self.queue.task_done()
+
+        def do_task(self, service, service_root):
+            success = bool(service.put(self.data_hash,
+                                        self.data,
+                                        timeout=self.timeout))
+            result = service.last_result()
+
+            if not success:
+                if result.get('status_code', None):
+                    _logger.debug("Request fail: PUT %s => %s %s",
+                                  self.data_hash,
+                                  result['status_code'],
+                                  result['body'])
+                raise self.TaskFailed
+
+            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
+                          str(threading.current_thread()),
+                          self.data_hash,
+                          len(self.data),
+                          service_root)
+            try:
+                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
+            except (KeyError, ValueError):
+                replicas_stored = 1
+
+            return result['body'].strip(), replicas_stored
 
 
     def __init__(self, api_client=None, proxy=None,
@@ -699,7 +764,7 @@ class KeepClient(object):
         self.block_cache = block_cache if block_cache else KeepBlockCache()
         self.timeout = timeout
         self.proxy_timeout = proxy_timeout
-        self._user_agent_pool = Queue.LifoQueue()
+        self._user_agent_pool = queue.LifoQueue()
         self.upload_counter = Counter()
         self.download_counter = Counter()
         self.put_counter = Counter()
@@ -720,7 +785,7 @@ class KeepClient(object):
                     if not proxy_uris[i].endswith('/'):
                         proxy_uris[i] += '/'
                     # URL validation
-                    url = urlparse.urlparse(proxy_uris[i])
+                    url = urllib.parse.urlparse(proxy_uris[i])
                     if not (url.scheme and url.netloc):
                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                 self.api_token = api_token
@@ -786,7 +851,7 @@ class KeepClient(object):
                 raise arvados.errors.NoKeepServersError()
 
             # Precompute the base URI for each service.
-            for r in self._gateway_services.itervalues():
+            for r in self._gateway_services.values():
                 host = r['service_host']
                 if not host.startswith('[') and host.find(':') >= 0:
                     # IPv6 URIs must be formatted like http://[::1]:80/...
@@ -798,7 +863,7 @@ class KeepClient(object):
 
             _logger.debug(str(self._gateway_services))
             self._keep_services = [
-                ks for ks in self._gateway_services.itervalues()
+                ks for ks in self._gateway_services.values()
                 if not ks.get('service_type', '').startswith('gateway:')]
             self._writable_services = [ks for ks in self._keep_services
                                        if not ks.get('read_only')]
@@ -817,7 +882,7 @@ class KeepClient(object):
         The weight is md5(h + u) where u is the last 15 characters of
         the service endpoint's UUID.
         """
-        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
+        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
 
     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
         """Return an array of Keep service endpoints, in the order in
@@ -1038,10 +1103,8 @@ class KeepClient(object):
           KeepClient is initialized.
         """
 
-        if isinstance(data, unicode):
-            data = data.encode("ascii")
-        elif not isinstance(data, str):
-            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
+        if not isinstance(data, bytes):
+            data = data.encode()
 
         self.put_counter.add(1)
 
@@ -1109,7 +1172,7 @@ class KeepClient(object):
         """
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
-        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
+        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
             f.write(data)
         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                   os.path.join(self.local_store, md5))
@@ -1123,8 +1186,8 @@ class KeepClient(object):
             raise arvados.errors.NotFoundError(
                 "Invalid data locator: '%s'" % loc_s)
         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
-            return ''
-        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
+            return b''
+        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
             return f.read()
 
     def is_cached(self, locator):