-import cStringIO
+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+standard_library.install_aliases()
+from builtins import next
+from builtins import str
+from builtins import range
+from builtins import object
+import io
import datetime
import hashlib
import logging
import math
import os
import pycurl
-import Queue
+import queue
import re
import socket
import ssl
import sys
import threading
-import timer
-import urlparse
+from . import timer
+import urllib.parse
+
+if sys.version_info >= (3, 0):
+ from io import BytesIO
+else:
+ from cStringIO import StringIO as BytesIO
import arvados
import arvados.config as config
self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
sm = sum([slot.size() for slot in self._cache])
while len(self._cache) > 0 and sm > self.cache_max:
- for i in xrange(len(self._cache)-1, -1, -1):
+ for i in range(len(self._cache)-1, -1, -1):
if self._cache[i].ready.is_set():
del self._cache[i]
break
def _get(self, locator):
# Test if the locator is already in the cache
- for i in xrange(0, len(self._cache)):
+ for i in range(0, len(self._cache)):
if self._cache[i].locator == locator:
n = self._cache[i]
if i != 0:
arvados.errors.HttpError,
)
- def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
+ def __init__(self, root, user_agent_pool=queue.LifoQueue(),
upload_counter=None,
download_counter=None, **headers):
self.root = root
def _get_user_agent(self):
try:
return self._user_agent_pool.get(block=False)
- except Queue.Empty:
+ except queue.Empty:
return pycurl.Curl()
def _put_user_agent(self, ua):
try:
with timer.Timer() as t:
self._headers = {}
- response_body = cStringIO.StringIO()
+ response_body = BytesIO()
curl.setopt(pycurl.NOSIGNAL, 1)
curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
curl.setopt(pycurl.URL, url.encode('utf-8'))
curl.setopt(pycurl.HTTPHEADER, [
- '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
+ '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
if method == "HEAD":
self._result['status_code'],
len(self._result['body']),
t.msecs,
- (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+ 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
if self.download_counter:
self.download_counter.add(len(self._result['body']))
try:
with timer.Timer() as t:
self._headers = {}
- body_reader = cStringIO.StringIO(body)
- response_body = cStringIO.StringIO()
+ body_reader = BytesIO(body)
+ response_body = BytesIO()
curl.setopt(pycurl.NOSIGNAL, 1)
curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
curl.setopt(pycurl.URL, url.encode('utf-8'))
curl.setopt(pycurl.INFILESIZE, len(body))
curl.setopt(pycurl.READFUNCTION, body_reader.read)
curl.setopt(pycurl.HTTPHEADER, [
- '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+ '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
self._setcurltimeouts(curl, timeout)
self._result['status_code'],
len(body),
t.msecs,
- (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+ 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
if self.upload_counter:
self.upload_counter.add(len(body))
return True
curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
def _headerfunction(self, header_line):
- header_line = header_line.decode('iso-8859-1')
+ if isinstance(header_line, bytes):
+ header_line = header_line.decode('iso-8859-1')
if ':' in header_line:
name, value = header_line.split(':', 1)
name = name.strip().lower()
# Returning None implies all bytes were written
- class KeepWriterQueue(Queue.Queue):
+ class KeepWriterQueue(queue.Queue):
def __init__(self, copies):
- Queue.Queue.__init__(self) # Old-style superclass
+ queue.Queue.__init__(self) # Old-style superclass
self.wanted_copies = copies
self.successful_copies = 0
self.response = None
return service, service_root
elif self.empty():
self.pending_tries_notification.notify_all()
- raise Queue.Empty
+ raise queue.Empty
else:
self.pending_tries_notification.wait()
if (not max_service_replicas) or (max_service_replicas >= copies):
num_threads = 1
else:
- num_threads = int(math.ceil(float(copies) / max_service_replicas))
+ num_threads = int(math.ceil(1.0*copies/max_service_replicas))
_logger.debug("Pool max threads is %d", num_threads)
self.workers = []
self.queue = KeepClient.KeepWriterQueue(copies)
while True:
try:
service, service_root = self.queue.get_next_task()
- except Queue.Empty:
+ except queue.Empty:
return
try:
locator, copies = self.do_task(service, service_root)
self.block_cache = block_cache if block_cache else KeepBlockCache()
self.timeout = timeout
self.proxy_timeout = proxy_timeout
- self._user_agent_pool = Queue.LifoQueue()
+ self._user_agent_pool = queue.LifoQueue()
self.upload_counter = Counter()
self.download_counter = Counter()
self.put_counter = Counter()
if not proxy_uris[i].endswith('/'):
proxy_uris[i] += '/'
# URL validation
- url = urlparse.urlparse(proxy_uris[i])
+ url = urllib.parse.urlparse(proxy_uris[i])
if not (url.scheme and url.netloc):
raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
self.api_token = api_token
raise arvados.errors.NoKeepServersError()
# Precompute the base URI for each service.
- for r in self._gateway_services.itervalues():
+ for r in self._gateway_services.values():
host = r['service_host']
if not host.startswith('[') and host.find(':') >= 0:
# IPv6 URIs must be formatted like http://[::1]:80/...
_logger.debug(str(self._gateway_services))
self._keep_services = [
- ks for ks in self._gateway_services.itervalues()
+ ks for ks in self._gateway_services.values()
if not ks.get('service_type', '').startswith('gateway:')]
self._writable_services = [ks for ks in self._keep_services
if not ks.get('read_only')]
KeepClient is initialized.
"""
- if isinstance(data, unicode):
- data = data.encode("ascii")
- elif not isinstance(data, str):
- raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
+ if not isinstance(data, bytes):
+ data = data.encode()
self.put_counter.add(1)