curl.setopt(pycurl.SSL_VERIFYPEER, 0)
if method == "HEAD":
curl.setopt(pycurl.NOBODY, True)
- self._setcurltimeouts(curl, timeout)
+ self._setcurltimeouts(curl, timeout, method=="HEAD")
try:
curl.perform()
self.upload_counter.add(len(body))
return True
- def _setcurltimeouts(self, curl, timeouts):
+ def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
if not timeouts:
return
elif isinstance(timeouts, tuple):
conn_t, xfer_t = (timeouts, timeouts)
bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
- curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
- curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
+ if not ignore_bandwidth:
+ curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+ curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
def _headerfunction(self, header_line):
if isinstance(header_line, bytes):
if local_store:
self.local_store = local_store
+ self.head = self.local_store_head
self.get = self.local_store_get
self.put = self.local_store_put
else:
# Always cache the result, then return it if we succeeded.
if loop.success():
- if method == "HEAD":
- return blob or True
- else:
- return blob
+ return blob
finally:
if slot is not None:
slot.set(blob)
"{} not found".format(loc_s), service_errors)
else:
raise arvados.errors.KeepReadError(
- "failed to read {}".format(loc_s), service_errors, label="service")
+ "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
@retry.retry_method
def put(self, data, copies=2, num_retries=None, request_id=None):
for key in sorted_roots
if roots_map[key].last_result()['error'])
raise arvados.errors.KeepWriteError(
- "failed to write {} (wanted {} copies but wrote {})".format(
- data_hash, copies, writer_pool.done()), service_errors, label="service")
+ "failed to write {} after {} (wanted {} copies but wrote {})".format(
+ data_hash, loop.attempts_str(), copies, writer_pool.done()), service_errors, label="service")
def local_store_put(self, data, copies=1, num_retries=None):
"""A stub for put().
with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
return f.read()
+ def local_store_head(self, loc_s, num_retries=None):
+     """Stub for head() that consults the local store directory.
+
+     Companion to local_store_put().
+
+     Returns True when loc_s names the empty block, or when a file named
+     after the locator's md5sum exists under self.local_store; returns
+     False otherwise. num_retries is accepted but not used here.
+
+     Raises arvados.errors.NotFoundError if loc_s cannot be parsed as a
+     KeepLocator.
+     """
+     try:
+         locator = KeepLocator(loc_s)
+     except ValueError:
+         raise arvados.errors.NotFoundError(
+             "Invalid data locator: '%s'" % loc_s)
+     # The empty block is treated as always present; no file is needed.
+     if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
+         return True
+     if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
+         return True
+     # Explicit falsy result instead of falling off the end (implicit
+     # None), so every path returns a bool.
+     return False
+
def is_cached(self, locator):
return self.block_cache.reserve_cache(expect_hash)