curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
if self.insecure:
curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+ curl.setopt(pycurl.SSL_VERIFYHOST, 0)
else:
curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
if method == "HEAD":
curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
if self.insecure:
curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+ curl.setopt(pycurl.SSL_VERIFYHOST, 0)
else:
curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
self._setcurltimeouts(curl, timeout)
result = service.last_result()
if not success:
- if result.get('status_code', None):
+ if result.get('status_code'):
_logger.debug("Request fail: PUT %s => %s %s",
self.data_hash,
- result['status_code'],
- result['body'])
+ result.get('status_code'),
+ result.get('body'))
raise self.TaskFailed()
_logger.debug("KeepWriterThread %s succeeded %s+%i %s",
else:
return None
- def get_from_cache(self, loc):
+ def get_from_cache(self, loc_s):
"""Fetch a block only if is in the cache, otherwise return None."""
- slot = self.block_cache.get(loc)
+ locator = KeepLocator(loc_s)
+ slot = self.block_cache.get(locator.md5sum)
if slot is not None and slot.ready.is_set():
return slot.get()
else:
def get(self, loc_s, **kwargs):
return self._get_or_head(loc_s, method="GET", **kwargs)
- def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
+ def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
self.get_counter.add(1)
+ request_id = (request_id or
+ (hasattr(self, 'api_client') and self.api_client.request_id) or
+ arvados.util.new_request_id())
+ if headers is None:
+ headers = {}
+ headers['X-Request-Id'] = request_id
+
slot = None
blob = None
try:
if method == "GET":
slot, first = self.block_cache.reserve_cache(locator.md5sum)
if not first:
+ if prefetch:
+ # this is request for a prefetch, if it is
+ # already in flight, return immediately.
+ # clear 'slot' to prevent finally block from
+ # calling slot.set()
+ slot = None
+ return None
self.hits_counter.add(1)
blob = slot.get()
if blob is None:
self.misses_counter.add(1)
- if headers is None:
- headers = {}
- headers['X-Request-Id'] = (request_id or
- (hasattr(self, 'api_client') and self.api_client.request_id) or
- arvados.util.new_request_id())
-
# If the locator has hints specifying a prefix (indicating a
# remote keepproxy) or the UUID of a local gateway service,
# read data from the indicated service(s) instead of the usual
for key in sorted_roots)
if not roots_map:
raise arvados.errors.KeepReadError(
- "failed to read {}: no Keep services available ({})".format(
- loc_s, loop.last_result()))
+ "[{}] failed to read {}: no Keep services available ({})".format(
+ request_id, loc_s, loop.last_result()))
elif not_founds == len(sorted_roots):
raise arvados.errors.NotFoundError(
- "{} not found".format(loc_s), service_errors)
+ "[{}] {} not found".format(request_id, loc_s), service_errors)
else:
raise arvados.errors.KeepReadError(
- "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
+ "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
@retry.retry_method
def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
return loc_s
locator = KeepLocator(loc_s)
+ request_id = (request_id or
+ (hasattr(self, 'api_client') and self.api_client.request_id) or
+ arvados.util.new_request_id())
headers = {
- 'X-Request-Id': (request_id or
- (hasattr(self, 'api_client') and self.api_client.request_id) or
- arvados.util.new_request_id()),
+ 'X-Request-Id': request_id,
'X-Keep-Desired-Replicas': str(copies),
}
roots_map = {}
return writer_pool.response()
if not roots_map:
raise arvados.errors.KeepWriteError(
- "failed to write {}: no Keep services available ({})".format(
- data_hash, loop.last_result()))
+ "[{}] failed to write {}: no Keep services available ({})".format(
+ request_id, data_hash, loop.last_result()))
else:
service_errors = ((key, roots_map[key].last_result()['error'])
for key in sorted_roots
if roots_map[key].last_result()['error'])
raise arvados.errors.KeepWriteError(
- "failed to write {} after {} (wanted {} copies but wrote {})".format(
- data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
+ "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
+ request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
"""A stub for put().
return True
if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
return True
-
- def is_cached(self, locator):
- return self.block_cache.reserve_cache(expect_hash)