+ def put(data, **kwargs):
+ return Keep.global_client_object().put(data, **kwargs)
+
+class KeepClient(object):
+
+ class ThreadLimiter(object):
+ """
+ Limit the number of threads running at a given time to
+ {desired successes} minus {successes reported}. When successes
+ reported == desired, wake up the remaining threads and tell
+ them to quit.
+
+ Should be used in a "with" block.
+ """
+ def __init__(self, todo):
+ self._todo = todo
+ self._done = 0
+ self._todo_lock = threading.Semaphore(todo)
+ self._done_lock = threading.Lock()
+ def __enter__(self):
+ self._todo_lock.acquire()
+ return self
+ def __exit__(self, type, value, traceback):
+ self._todo_lock.release()
+ def shall_i_proceed(self):
+ """
+ Return true if the current thread should do stuff. Return
+ false if the current thread should just stop.
+ """
+ with self._done_lock:
+ return (self._done < self._todo)
+ def increment_done(self):
+ """
+ Report that the current thread was successful.
+ """
+ with self._done_lock:
+ self._done += 1
+ def done(self):
+ """
+ Return how many successes were reported.
+ """
+ with self._done_lock:
+ return self._done
+
+ class KeepWriterThread(threading.Thread):
+ """
+ Write a blob of data to the given Keep server. Call
+ increment_done() of the given ThreadLimiter if the write
+ succeeds.
+ """
+ def __init__(self, **kwargs):
+ super(KeepClient.KeepWriterThread, self).__init__()
+ self.args = kwargs
+ def run(self):
+ with self.args['thread_limiter'] as limiter:
+ if not limiter.shall_i_proceed():
+ # My turn arrived, but the job has been done without
+ # me.
+ return
+ logging.debug("KeepWriterThread %s proceeding %s %s" %
+ (str(threading.current_thread()),
+ self.args['data_hash'],
+ self.args['service_root']))
+ h = httplib2.Http()
+ url = self.args['service_root'] + self.args['data_hash']
+ api_token = os.environ['ARVADOS_API_TOKEN']
+ headers = {'Authorization': "OAuth2 %s" % api_token}
+ try:
+ resp, content = h.request(url.encode('utf-8'), 'PUT',
+ headers=headers,
+ body=self.args['data'])
+ if (resp['status'] == '401' and
+ re.match(r'Timestamp verification failed', content)):
+ body = KeepClient.sign_for_old_server(
+ self.args['data_hash'],
+ self.args['data'])
+ h = httplib2.Http()
+ resp, content = h.request(url.encode('utf-8'), 'PUT',
+ headers=headers,
+ body=body)
+ if re.match(r'^2\d\d$', resp['status']):
+ logging.debug("KeepWriterThread %s succeeded %s %s" %
+ (str(threading.current_thread()),
+ self.args['data_hash'],
+ self.args['service_root']))
+ return limiter.increment_done()
+ logging.warning("Request fail: PUT %s => %s %s" %
+ (url, resp['status'], content))
+ except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
+ logging.warning("Request fail: PUT %s => %s: %s" %
+ (url, type(e), str(e)))