-import bz2
+import cStringIO
import datetime
-import fcntl
-import functools
-import gflags
import hashlib
-import json
import logging
import os
-import pprint
import pycurl
import Queue
import re
import socket
import ssl
-import string
-import cStringIO
-import subprocess
-import sys
import threading
-import time
import timer
-import types
-import UserDict
-import util
-import zlib
import arvados
import arvados.config as config
Should be used in a "with" block.
"""
def __init__(self, todo):
    """Set up the counters and synchronization primitives.

    Arguments:
    * todo: stored as self._todo and used as the initial value of
      the semaphore that __enter__ acquires, i.e. how many threads
      may be inside the "with" block at the same time.
    """
    # Number of threads that have passed through __enter__ so far.
    self._started = 0
    self._todo = todo
    self._done = 0
    self._response = None
    # Guards _started; threads that called set_sequence() wait on it
    # in __enter__ until enough other threads have started.
    self._start_lock = threading.Condition()
    self._todo_lock = threading.Semaphore(todo)
    self._done_lock = threading.Lock()
    # Per-thread storage for the value given to set_sequence().
    self._local = threading.local()
def __enter__(self):
    """Wait for this thread's turn, take a semaphore slot, return self.

    If the calling thread has used set_sequence(N), block until N
    other threads have started. Then acquire the semaphore, count
    this thread as started, and wake every thread waiting on the
    start condition so it can re-check its own sequence number.
    """
    # Using "with" guarantees the condition's lock is released even
    # if waiting or acquiring the semaphore is interrupted by an
    # exception; a bare acquire()/release() pair would leave it held
    # and block every later caller.
    with self._start_lock:
        sequence = getattr(self._local, 'sequence', None)
        if sequence is not None:
            while self._started < sequence:
                self._start_lock.wait()
        # The semaphore is taken while still holding the start lock,
        # so no other thread can be counted as started in between.
        self._todo_lock.acquire()
        self._started += 1
        self._start_lock.notify_all()
    return self
def __exit__(self, type, value, traceback):
    """Give back the semaphore slot taken by __enter__.

    The exception details are ignored and nothing is returned, so
    an exception raised inside the "with" block propagates to the
    caller.
    """
    self._todo_lock.release()
def set_sequence(self, sequence):
    """Record the calling thread's position in the start order.

    The value is kept in thread-local storage, so it only affects
    the thread that calls this method. __enter__ reads it and
    waits until that many other threads have started.
    """
    self._local.sequence = sequence
+
def shall_i_proceed(self):
"""
Return true if the current thread should do stuff. Return
return self._success
def run(self):
- with self.args['thread_limiter'] as limiter:
+ limiter = self.args['thread_limiter']
+ sequence = self.args['thread_sequence']
+ if sequence is not None:
+ limiter.set_sequence(sequence)
+ with limiter:
if not limiter.shall_i_proceed():
# My turn arrived, but the job has been done without
# me.
self._gateway_services = {}
self._keep_services = [{
'uuid': 'proxy',
+ 'service_type': 'proxy',
'_service_root': proxy,
}]
self._writable_services = self._keep_services
self.using_proxy = True
self._static_services_list = True
- self.thread_count = None
+ self.max_replicas_per_service = None
else:
# It's important to avoid instantiating an API client
# unless we actually need one, for testing's sake.
self._writable_services = None
self.using_proxy = None
self._static_services_list = False
- self.thread_count = None
+ self.max_replicas_per_service = 1
def current_timeout(self, attempt_number):
"""Return the appropriate timeout to use for this client.
t = self.proxy_timeout if self.using_proxy else self.timeout
return (t[0] * (1 << attempt_number), t[1])
def _any_nondisk_services(self, service_list):
    """Return True if any service in service_list is not a disk.

    Arguments:
    * service_list: an iterable of dict-like service records. A
      record with no 'service_type' key is treated as type 'disk'.
    """
    return any(ks.get('service_type', 'disk') != 'disk'
               for ks in service_list)
+
def build_services_list(self, force_rebuild=False):
if (self._static_services_list or
(self._keep_services and not force_rebuild)):
except Exception: # API server predates Keep services.
keep_services = self.api_client.keep_disks().list()
- accessible = keep_services.execute().get('items')
- if not accessible:
+ # Gateway services are only used when specified by UUID,
+ # so there's nothing to gain by filtering them by
+ # service_type.
+ self._gateway_services = {ks['uuid']: ks for ks in
+ keep_services.execute()['items']}
+ if not self._gateway_services:
raise arvados.errors.NoKeepServersError()
# Precompute the base URI for each service.
- for r in accessible:
+ for r in self._gateway_services.itervalues():
host = r['service_host']
if not host.startswith('[') and host.find(':') >= 0:
# IPv6 URIs must be formatted like http://[::1]:80/...
host,
r['service_port'])
- # Gateway services are only used when specified by UUID,
- # so there's nothing to gain by filtering them by
- # service_type.
- self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
_logger.debug(str(self._gateway_services))
-
self._keep_services = [
- ks for ks in accessible
- if ks.get('service_type') in ['disk', 'proxy']]
- self._writable_services = [
- ks for ks in accessible
- if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
- _logger.debug(str(self._keep_services))
-
- self.using_proxy = any(ks.get('service_type') == 'proxy'
- for ks in self._keep_services)
- # Use a thread_count of 1 if the service is not a disk
- for ks in accessible:
- if ('disk' != ks.get('service_type')) and (True != ks.get('read_only')):
- self.thread_count = 1
+ ks for ks in self._gateway_services.itervalues()
+ if not ks.get('service_type', '').startswith('gateway:')]
+ self._writable_services = [ks for ks in self._keep_services
+ if not ks.get('read_only')]
+
+ # For disk type services, max_replicas_per_service is 1
+ # It is unknown (unlimited) for other service types.
+ if self._any_nondisk_services(self._writable_services):
+ self.max_replicas_per_service = None
+ else:
+ self.max_replicas_per_service = 1
def _service_weight(self, data_hash, service_uuid):
"""Compute the weight of a Keep service endpoint for a data
self.build_services_list(force_rebuild)
sorted_roots = []
-
- # Use the services indicated by the given hints that are
- # not size or authorization hints.
- # If it is a K@ hint of size 7, it is a keepproxy
- # Otherwise, expect the hint to be of len 29 and a uuid
- # of a remote service that can be resolved to a URI.
+ # Use the services indicated by the given +K@... remote
+ # service hints, if any are present and can be resolved to a
+ # URI.
for hint in locator.hints:
- if not hint.startswith('A') and not hint[0].isdigit():
- if len(hint) == 7 and hint.startswith('K@'):
+ if hint.startswith('K@'):
+ if len(hint) == 7:
sorted_roots.append(
"https://keep.{}.arvadosapi.com/".format(hint[2:]))
- elif len(hint) == 29 and re.match(util.uuid_pattern, hint[2:]):
+ elif len(hint) == 29:
svc = self._gateway_services.get(hint[2:])
if svc:
sorted_roots.append(svc['_service_root'])
# in that order.
use_services = self._keep_services
if need_writable:
- use_services = self._writable_services
+ use_services = self._writable_services
+ self.using_proxy = self._any_nondisk_services(use_services)
sorted_roots.extend([
svc['_service_root'] for svc in sorted(
use_services,
# Tell the proxy how many copies we want it to store
headers['X-Keep-Desired-Replication'] = str(copies)
roots_map = {}
- thread_limiter = KeepClient.ThreadLimiter(1 if 1 == self.thread_count else copies)
+ thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
+ thread_sequence = 0
for tries_left in loop:
try:
- local_roots = self.map_new_services(
+ sorted_roots = self.map_new_services(
roots_map, locator,
force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
except Exception as error:
continue
threads = []
- for service_root, ks in roots_map.iteritems():
+ for service_root, ks in [(root, roots_map[root])
+ for root in sorted_roots]:
if ks.finished():
continue
t = KeepClient.KeepWriterThread(
data_hash=data_hash,
service_root=service_root,
thread_limiter=thread_limiter,
- timeout=self.current_timeout(num_retries-tries_left))
+ timeout=self.current_timeout(num_retries-tries_left),
+ thread_sequence=thread_sequence)
t.start()
threads.append(t)
+ thread_sequence += 1
for t in threads:
t.join()
loop.save_result((thread_limiter.done() >= copies, len(threads)))
data_hash, loop.last_result()))
else:
service_errors = ((key, roots_map[key].last_result()['error'])
- for key in local_roots
+ for key in sorted_roots
if roots_map[key].last_result()['error'])
raise arvados.errors.KeepWriteError(
"failed to write {} (wanted {} copies but wrote {})".format(