install_R_sdk() {
cd "$WORKSPACE/sdk/R" \
- && R --quiet --vanilla <<EOF
-options(repos=structure(c(CRAN="http://cran.wustl.edu/")))
-if (!requireNamespace("devtools")) {
- install.packages("devtools")
-}
-if (!requireNamespace("roxygen2")) {
- install.packages("roxygen2")
-}
-if (!requireNamespace("pkgdown")) {
- devtools::install_github("hadley/pkgdown")
-}
-devtools::install_dev_deps()
-EOF
+ && R --quiet --vanilla --file=install_deps.R
}
do_install sdk/R R_sdk
--- /dev/null
+options(repos=structure(c(CRAN="http://cran.wustl.edu/")))
+if (!requireNamespace("devtools")) {
+ install.packages("devtools")
+}
+if (!requireNamespace("roxygen2")) {
+ install.packages("roxygen2")
+}
+
+# These packages install from GitHub, so pin known-good revisions
+# instead of letting any push to master break our build.
+if (!requireNamespace("pkgload")) {
+ devtools::install_github("r-lib/pkgload", ref="7a97de62adf1793c03e73095937e4655baad79c9")
+}
+if (!requireNamespace("pkgdown")) {
+ devtools::install_github("r-lib/pkgdown", ref="897ffbc016549c11c4263cb5d1f6e9f5c99efb45")
+}
+
+devtools::install_dev_deps()
"success")
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
+ self.eval_timeout = kwargs.get("eval_timeout")
kwargs["make_fs_access"] = make_fs_access
kwargs["enable_reuse"] = kwargs.get("enable_reuse")
kwargs["compute_checksum"] = kwargs.get("compute_checksum")
if self.work_api == "containers":
+ if self.ignore_docker_for_reuse:
+ raise validate.ValidationException("--ignore-docker-for-reuse not supported with containers API.")
kwargs["outdir"] = "/var/spool/cwl"
kwargs["docker_outdir"] = "/var/spool/cwl"
kwargs["tmpdir"] = "/tmp"
if self.arvrunner.project_uuid:
command.append("--project-uuid="+self.arvrunner.project_uuid)
+ command.append("--eval-timeout=%s" % self.arvrunner.eval_timeout)
+
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
'state': 'Committed',
'owner_uuid': None,
'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=continue',
+ '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
'name': 'submit_wf.cwl',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = [
'arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--disable-reuse', '--on-error=continue',
+ '--disable-reuse', '--on-error=continue', '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["use_existing"] = False
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = [
'arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--disable-reuse', '--on-error=continue',
+ '--disable-reuse', '--on-error=continue', '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["use_existing"] = False
expect_container["name"] = "submit_wf_no_reuse.cwl"
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=stop',
+ '--enable-reuse', '--on-error=stop', '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- "--output-name="+output_name, '--enable-reuse', '--on-error=continue',
+ "--output-name="+output_name, '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["output_name"] = output_name
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--enable-reuse', '--on-error=continue',
- "--intermediate-output-ttl=3600",
+ "--intermediate-output-ttl=3600", '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--enable-reuse', '--on-error=continue',
- "--trash-intermediate",
+ "--trash-intermediate", '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- "--output-tags="+output_tags, '--enable-reuse', '--on-error=continue',
+ "--output-tags="+output_tags, '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
'name': 'expect_arvworkflow.cwl#main',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=continue',
+ '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
'/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
'cwd': '/var/spool/cwl',
'runtime_constraints': {
'name': 'a test workflow',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=continue',
+ '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
'cwd': '/var/spool/cwl',
'runtime_constraints': {
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["owner_uuid"] = project_uuid
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=continue', '--project-uuid='+project_uuid,
+ '--enable-reuse', '--on-error=continue', '--project-uuid='+project_uuid, '--eval-timeout=20',
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container))
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
+ @stubs
+ def test_submit_container_eval_timeout(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+ capture_stdout = cStringIO.StringIO()
+ try:
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--eval-timeout=60",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ '--enable-reuse', '--on-error=continue', '--eval-timeout=60.0',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
err = s.fs.Remove("foo/bar")
c.Check(err, check.IsNil)
- // mkdir succeds after the file is deleted
+ // mkdir succeeds after the file is deleted
err = s.fs.Mkdir("foo/bar", 0755)
c.Check(err, check.IsNil)
"""
+ __slots__ = ('parent', 'name', '_writers', '_committed',
+ '_segments', 'lock', '_current_bblock', 'fuse_entry')
+
def __init__(self, parent, name, stream=[], segments=[]):
"""
ArvadosFile constructor.
return text
+ _token_re = re.compile(r'(\S+)(\s+|$)')
+ _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
+ _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
+
@synchronized
def _import_manifest(self, manifest_text):
"""Import a manifest into a `Collection`.
stream_name = None
state = STREAM_NAME
- for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
+ for token_and_separator in self._token_re.finditer(manifest_text):
tok = token_and_separator.group(1)
sep = token_and_separator.group(2)
continue
if state == BLOCKS:
- block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+ block_locator = self._block_re.match(tok)
if block_locator:
blocksize = int(block_locator.group(1))
blocks.append(Range(tok, streamoffset, blocksize, 0))
state = SEGMENTS
if state == SEGMENTS:
- file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
+ file_segment = self._segment_re.match(tok)
if file_segment:
pos = int(file_segment.group(1))
size = int(file_segment.group(2))
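Hoisting the three patterns to class attributes means they are compiled once at import time rather than on every _import_manifest() call. A minimal self-contained sketch of the same idea, with illustrative names rather than the real Collection API:

    import re

    class ManifestScanner(object):
        # Compiled once, at class-definition time, and shared by all instances.
        _token_re = re.compile(r'(\S+)(\s+|$)')

        def tokens(self, manifest_text):
            # finditer() reuses the precompiled pattern; no per-call re.compile.
            return [m.group(1) for m in self._token_re.finditer(manifest_text)]

    scanner = ManifestScanner()
    print(scanner.tokens(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n"))

Note that re caches recently used patterns internally, but the explicit compile skips even the cache lookup and makes the hot loop's dependencies obvious.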
self._lastheadername = name
self._headers[name] = value
# Returning None implies all bytes were written
-
+
class KeepWriterQueue(queue.Queue):
def __init__(self, copies):
self.successful_copies_lock = threading.Lock()
self.pending_tries = copies
self.pending_tries_notification = threading.Condition()
-
+
def write_success(self, response, replicas_nr):
with self.successful_copies_lock:
self.successful_copies += replicas_nr
self.response = response
with self.pending_tries_notification:
self.pending_tries_notification.notify_all()
-
+
def write_fail(self, ks):
with self.pending_tries_notification:
self.pending_tries += 1
self.pending_tries_notification.notify()
-
+
def pending_copies(self):
with self.successful_copies_lock:
return self.wanted_copies - self.successful_copies
for _ in range(num_threads):
w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
self.workers.append(w)
-
+
def add_task(self, ks, service_root):
self.queue.put((ks, service_root))
self.total_task_nr += 1
-
+
def done(self):
return self.queue.successful_copies
-
+
def join(self):
# Start workers
for worker in self.workers:
worker.start()
# Wait for finished work
self.queue.join()
-
+
def response(self):
return self.queue.response
-
-
+
+
class KeepWriterThread(threading.Thread):
TaskFailed = RuntimeError()
self.get_counter.add(1)
- locator = KeepLocator(loc_s)
- if method == "GET":
- slot, first = self.block_cache.reserve_cache(locator.md5sum)
- if not first:
- self.hits_counter.add(1)
- v = slot.get()
- return v
-
- self.misses_counter.add(1)
-
- headers = {
- 'X-Request-Id': (request_id or
- (hasattr(self, 'api_client') and self.api_client.request_id) or
- arvados.util.new_request_id()),
- }
-
- # If the locator has hints specifying a prefix (indicating a
- # remote keepproxy) or the UUID of a local gateway service,
- # read data from the indicated service(s) instead of the usual
- # list of local disk services.
- hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
- for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
- hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
- for hint in locator.hints if (
- hint.startswith('K@') and
- len(hint) == 29 and
- self._gateway_services.get(hint[2:])
- )])
- # Map root URLs to their KeepService objects.
- roots_map = {
- root: self.KeepService(root, self._user_agent_pool,
- upload_counter=self.upload_counter,
- download_counter=self.download_counter,
- headers=headers)
- for root in hint_roots
- }
-
- # See #3147 for a discussion of the loop implementation. Highlights:
- # * Refresh the list of Keep services after each failure, in case
- # it's being updated.
- # * Retry until we succeed, we're out of retries, or every available
- # service has returned permanent failure.
- sorted_roots = []
- roots_map = {}
+ slot = None
blob = None
- loop = retry.RetryLoop(num_retries, self._check_loop_result,
- backoff_start=2)
- for tries_left in loop:
- try:
- sorted_roots = self.map_new_services(
- roots_map, locator,
- force_rebuild=(tries_left < num_retries),
- need_writable=False,
- headers=headers)
- except Exception as error:
- loop.save_result(error)
- continue
+ try:
+ locator = KeepLocator(loc_s)
+ if method == "GET":
+ slot, first = self.block_cache.reserve_cache(locator.md5sum)
+ if not first:
+ self.hits_counter.add(1)
+ blob = slot.get()
+ if blob is None:
+ raise arvados.errors.KeepReadError(
+ "failed to read {}".format(loc_s))
+ return blob
+
+ self.misses_counter.add(1)
+
+ headers = {
+ 'X-Request-Id': (request_id or
+ (hasattr(self, 'api_client') and self.api_client.request_id) or
+ arvados.util.new_request_id()),
+ }
+
+ # If the locator has hints specifying a prefix (indicating a
+ # remote keepproxy) or the UUID of a local gateway service,
+ # read data from the indicated service(s) instead of the usual
+ # list of local disk services.
+ hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+ for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+ hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+ for hint in locator.hints if (
+ hint.startswith('K@') and
+ len(hint) == 29 and
+ self._gateway_services.get(hint[2:])
+ )])
+ # Map root URLs to their KeepService objects.
+ roots_map = {
+ root: self.KeepService(root, self._user_agent_pool,
+ upload_counter=self.upload_counter,
+ download_counter=self.download_counter,
+ headers=headers)
+ for root in hint_roots
+ }
+
+ # See #3147 for a discussion of the loop implementation. Highlights:
+ # * Refresh the list of Keep services after each failure, in case
+ # it's being updated.
+ # * Retry until we succeed, we're out of retries, or every available
+ # service has returned permanent failure.
+ sorted_roots = []
+ roots_map = {}
+ loop = retry.RetryLoop(num_retries, self._check_loop_result,
+ backoff_start=2)
+ for tries_left in loop:
+ try:
+ sorted_roots = self.map_new_services(
+ roots_map, locator,
+ force_rebuild=(tries_left < num_retries),
+ need_writable=False,
+ headers=headers)
+ except Exception as error:
+ loop.save_result(error)
+ continue
- # Query KeepService objects that haven't returned
- # permanent failure, in our specified shuffle order.
- services_to_try = [roots_map[root]
- for root in sorted_roots
- if roots_map[root].usable()]
- for keep_service in services_to_try:
- blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
- if blob is not None:
- break
- loop.save_result((blob, len(services_to_try)))
-
- # Always cache the result, then return it if we succeeded.
- if method == "GET":
- slot.set(blob)
- self.block_cache.cap_cache()
- if loop.success():
- if method == "HEAD":
- return True
- else:
- return blob
+ # Query KeepService objects that haven't returned
+ # permanent failure, in our specified shuffle order.
+ services_to_try = [roots_map[root]
+ for root in sorted_roots
+ if roots_map[root].usable()]
+ for keep_service in services_to_try:
+ blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
+ if blob is not None:
+ break
+ loop.save_result((blob, len(services_to_try)))
+
+ # Always cache the result, then return it if we succeeded.
+ if loop.success():
+ if method == "HEAD":
+ return True
+ else:
+ return blob
+ finally:
+ if slot is not None:
+ slot.set(blob)
+ self.block_cache.cap_cache()
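The heart of this refactor is the finally clause: once reserve_cache() has handed out a slot, slot.set() must run on every exit path, including when KeepLocator() or map_new_services() raises; otherwise the next reader of the same locator blocks forever in slot.get(). A simplified sketch of the discipline, assuming a slot with set()/get() semantics like the block cache's (this is not the real BlockCache API):

    import threading

    class Slot(object):
        """One cache entry; get() blocks until some thread calls set()."""
        def __init__(self):
            self._ready = threading.Event()
            self.content = None

        def set(self, value):
            self.content = value
            self._ready.set()

        def get(self):
            self._ready.wait()
            return self.content

    def fetch(slot, download):
        blob = None
        try:
            blob = download()  # may raise before the slot is filled
            return blob
        finally:
            # Fill the slot unconditionally: None means "fetch failed",
            # which later readers turn into an error instead of a hang.
            slot.set(blob)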
# Q: Including 403 is necessary for the Keep tests to continue
# passing, but maybe they should expect KeepReadError instead?
loop.save_result(error)
continue
- writer_pool = KeepClient.KeepWriterThreadPool(data=data,
+ writer_pool = KeepClient.KeepWriterThreadPool(data=data,
data_hash=data_hash,
copies=copies - done,
max_service_replicas=self.max_replicas_per_service,
def finished(self):
return False
-
+
def setUp(self):
self.copies = 3
self.pool = arvados.KeepClient.KeepWriterThreadPool(
self.pool.add_task(ks, None)
self.pool.join()
self.assertEqual(self.pool.done(), self.copies-1)
-
+
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
with self.assertRaises(arvados.errors.KeepWriteError):
self.keep_client.put('foo', num_retries=1, copies=2)
self.assertEqual(2, req_mock.call_count)
+
+class KeepClientAPIErrorTest(unittest.TestCase):
+ def test_api_fail(self):
+ class ApiMock(object):
+ def __getattr__(self, r):
+ if r == "api_token":
+ return "abc"
+ else:
+ raise arvados.errors.KeepReadError()
+ keep_client = arvados.KeepClient(api_client=ApiMock(),
+ proxy='', local_store='')
+
+ # The bug this is testing for is that if an API (not
+ # keepstore) exception is thrown as part of a get(), the next
+ # attempt to get that same block will result in a deadlock.
+ # This is why there are two get()s in a row. Unfortunately,
+ # the failure mode for this test is that the test suite
+ # deadlocks; there isn't a good way to avoid that without
+ # adding a special case that has no use except for this test.
+
+ with self.assertRaises(arvados.errors.KeepReadError):
+ keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
+ with self.assertRaises(arvados.errors.KeepReadError):
+ keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
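ApiMock works by abusing __getattr__: any attribute other than api_token raises, so the first internal API call inside get() fails at an arbitrary point, exercising the cleanup path tested above. The trick in isolation, with toy names:

    class ExplodingClient(object):
        """Any attribute access except 'token' simulates a failing API client."""
        def __getattr__(self, attr):
            if attr == "token":
                return "abc"
            raise RuntimeError("simulated API failure on %r" % attr)

    client = ExplodingClient()
    print(client.token)     # "abc"
    try:
        client.users()      # raises during attribute lookup, before the call
    except RuntimeError as exc:
        print(exc)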
}
@object.update_attributes!(attrs_to_update)
@object.assign_slot if params[:assign_slot]
+ @object.save!
show
end
assert_operator 0, :<, json_response['slot_number']
n = json_response['slot_number']
assert_equal "compute#{n}", json_response['hostname']
+
+ node = Node.where(uuid: json_response['uuid']).first
+ assert_equal n, node.slot_number
+ assert_equal "compute#{n}", node.hostname
end
test "update node and assign slot" do
assert_operator 0, :<, json_response['slot_number']
n = json_response['slot_number']
assert_equal "compute#{n}", json_response['hostname']
+
+ node.reload
+ assert_equal n, node.slot_number
+ assert_equal "compute#{n}", node.hostname
end
test "update node and assign slot, don't clobber hostname" do
def _remove(self, obj, clear):
if clear:
- if obj.in_use():
- _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
- return
+ # Kernel behavior seems to be that if a file is
+ # referenced, its parents remain referenced too. This
+ # means has_ref() exits early when a collection is not a
+ # candidate for eviction.
+ #
+ # By contrast, in_use() doesn't increment references on
+ # parents, so it requires a full tree walk to determine if
+ # a collection is a candidate for eviction. This takes
+ # .07s for 240000 files, which becomes a major drag when
+ # cap_cache is being called several times a second and
+ # there are multiple non-evictable collections in the
+ # cache.
+ #
+ # So it is important for performance that we do the
+ # has_ref() check first.
+
if obj.has_ref(True):
_logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
return
+
+ if obj.in_use():
+ _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
+ return
+
obj.kernel_invalidate()
_logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
obj.clear()
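The asymmetry the comment describes can be pictured with a toy inode tree: reference counts propagate to ancestors, so has_ref() is a constant-time counter check, while use counts do not, so in_use() has to walk the subtree. A hypothetical model, not the real fusedir classes:

    class Node(object):
        def __init__(self, children=()):
            self.children = list(children)
            self.ref_count = 0  # kernel bumps this on the node and all ancestors
            self.use_count = 0  # bumped only on the node actually held open

        def has_ref(self):
            # O(1): ancestors of any referenced file are referenced themselves.
            return self.ref_count > 0

        def in_use(self):
            # O(subtree): use counts don't propagate, so the whole tree is walked.
            return self.use_count > 0 or any(c.in_use() for c in self.children)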
if obj not in self._by_uuid[obj.cache_uuid]:
self._by_uuid[obj.cache_uuid].append(obj)
self._total += obj.objsize()
- _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
+ _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
+ obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
self.cap_cache()
def touch(self, obj):
* Clear the object contents (invalidates the object)
"""
+
+ __slots__ = ("_stale", "_poll", "_last_update", "_atime", "_poll_time", "use_count",
+ "ref_count", "dead", "cache_size", "cache_uuid", "allow_attr_cache")
+
def __init__(self):
self._stale = True
self._poll = False
class File(FreshBase):
"""Base for file objects."""
+ __slots__ = ("inode", "parent_inode", "_mtime")
+
def __init__(self, parent_inode, _mtime=0):
super(File, self).__init__()
self.inode = None
class FuseArvadosFile(File):
"""Wraps a ArvadosFile."""
+ __slots__ = ('arvfile',)
+
def __init__(self, parent_inode, arvfile, _mtime):
super(FuseArvadosFile, self).__init__(parent_inode, _mtime)
self.arvfile = arvfile
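These __slots__ declarations remove the per-instance __dict__, which adds up when a FUSE mount holds hundreds of thousands of file objects; note that every class in the chain (FreshBase, File, FuseArvadosFile, and ArvadosFile above) declares its own slots, since one slotless base class would reintroduce the dict. A quick standalone comparison with illustrative classes:

    import sys

    class WithDict(object):
        def __init__(self):
            self.inode = 1
            self.parent_inode = 2

    class WithSlots(object):
        __slots__ = ('inode', 'parent_inode')
        def __init__(self):
            self.inode = 1
            self.parent_inode = 2

    a, b = WithDict(), WithSlots()
    # The slotted instance carries no per-object __dict__ at all.
    print(sys.getsizeof(a) + sys.getsizeof(a.__dict__), sys.getsizeof(b))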
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/curoverse/azure-sdk-for-go/storage"
+ "github.com/Azure/azure-sdk-for-go/storage"
)
const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
AzureReplication int
ReadOnly bool
RequestTimeout arvados.Duration
+ StorageClasses []string
- azClient storage.Client
- bsClient *azureBlobClient
+ azClient storage.Client
+ container *azureContainer
+}
+
+// singleSender is a single-attempt storage.Sender.
+type singleSender struct{}
+
+// Send performs req exactly once.
+func (*singleSender) Send(c *storage.Client, req *http.Request) (resp *http.Response, err error) {
+ return c.HTTPClient.Do(req)
}
// Examples implements VolumeWithExamples.
if err != nil {
return fmt.Errorf("creating Azure storage client: %s", err)
}
+ v.azClient.Sender = &singleSender{}
if v.RequestTimeout == 0 {
v.RequestTimeout = azureDefaultRequestTimeout
Timeout: time.Duration(v.RequestTimeout),
}
bs := v.azClient.GetBlobService()
- v.bsClient = &azureBlobClient{
- client: &bs,
+ v.container = &azureContainer{
+ ctr: bs.GetContainerReference(v.ContainerName),
}
- ok, err := v.bsClient.ContainerExists(v.ContainerName)
- if err != nil {
+ if ok, err := v.container.Exists(); err != nil {
return err
- }
- if !ok {
+ } else if !ok {
return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
}
return nil
// Return true if the expires_at metadata attribute is found on the block
func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
- metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
+ metadata, err := v.container.GetBlobMetadata(loc)
if err != nil {
return false, metadata, v.translateError(err)
}
if azureMaxGetBytes < BlockSize {
// Unfortunately the handler doesn't tell us how long the blob
// is expected to be, so we have to ask Azure.
- props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
+ props, err := v.container.GetBlobProperties(loc)
if err != nil {
return 0, v.translateError(err)
}
go func() {
defer close(gotRdr)
if startPos == 0 && endPos == expectSize {
- rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
+ rdr, err = v.container.GetBlob(loc)
} else {
- rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
+ rdr, err = v.container.GetBlobRange(loc, startPos, endPos-1, nil)
}
}()
select {
gotRdr := make(chan struct{})
go func() {
defer close(gotRdr)
- rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
+ rdr, err = v.container.GetBlob(loc)
}()
select {
case <-ctx.Done():
body = http.NoBody
bufr.Close()
}
- errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), body, nil)
+ errChan <- v.container.CreateBlockBlobFromReader(loc, len(block), body, nil)
}()
select {
case <-ctx.Done():
}
metadata["touch"] = fmt.Sprintf("%d", time.Now())
- return v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
+ return v.container.SetBlobMetadata(loc, metadata, nil)
}
// Mtime returns the last-modified property of a block blob.
return time.Time{}, os.ErrNotExist
}
- props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
+ props, err := v.container.GetBlobProperties(loc)
if err != nil {
return time.Time{}, err
}
- return time.Parse(time.RFC1123, props.LastModified)
+ return time.Time(props.LastModified), nil
}
// IndexTo writes a list of Keep blocks that are stored in the
func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
params := storage.ListBlobsParameters{
Prefix: prefix,
- Include: "metadata",
+ Include: &storage.IncludeBlobDataset{Metadata: true},
}
for {
- resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
+ resp, err := v.container.ListBlobs(params)
if err != nil {
return err
}
for _, b := range resp.Blobs {
- t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
- if err != nil {
- return err
- }
if !v.isKeepBlock(b.Name) {
continue
}
- if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
+ modtime := time.Time(b.Properties.LastModified)
+ if b.Properties.ContentLength == 0 && modtime.Add(azureWriteRaceInterval).After(time.Now()) {
// A new zero-length blob is probably
// just a new non-empty blob that
// hasn't committed its data yet (see
// Trashed blob; exclude it from response
continue
}
- fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.UnixNano())
+ fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, modtime.UnixNano())
}
if resp.NextMarker == "" {
return nil
// we get the Etag before checking Mtime, and use If-Match to
// ensure we don't delete data if Put() or Touch() happens
// between our calls to Mtime() and DeleteBlob().
- props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
+ props, err := v.container.GetBlobProperties(loc)
if err != nil {
return err
}
// If TrashLifetime == 0, just delete it
if theConfig.TrashLifetime == 0 {
- return v.bsClient.DeleteBlob(v.ContainerName, loc, map[string]string{
- "If-Match": props.Etag,
+ return v.container.DeleteBlob(loc, &storage.DeleteBlobOptions{
+ IfMatch: props.Etag,
})
}
// Otherwise, mark as trash
- return v.bsClient.SetBlobMetadata(v.ContainerName, loc, map[string]string{
+ return v.container.SetBlobMetadata(loc, storage.BlobMetadata{
"expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
- }, map[string]string{
- "If-Match": props.Etag,
+ }, &storage.SetBlobMetadataOptions{
+ IfMatch: props.Etag,
})
}
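The Etag/If-Match pairing above is standard HTTP optimistic concurrency: fetch the blob's current Etag, then make the destructive request conditional on it, so a Put() or Touch() that lands in between changes the Etag and the delete fails with 412 Precondition Failed instead of discarding fresh data. The same pattern over generic HTTP, sketched with the requests library and a hypothetical endpoint:

    import requests

    def delete_if_unchanged(url):
        # Read the current state and remember its validator.
        etag = requests.head(url).headers['ETag']
        # Conditional delete: succeeds only if nobody wrote in between.
        resp = requests.delete(url, headers={'If-Match': etag})
        if resp.status_code == 412:
            return False  # precondition failed: object changed, leave it alone
        resp.raise_for_status()
        return True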
// Delete the expires_at metadata attribute
func (v *AzureBlobVolume) Untrash(loc string) error {
// if expires_at does not exist, return NotFoundError
- metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
+ metadata, err := v.container.GetBlobMetadata(loc)
if err != nil {
return v.translateError(err)
}
// reset expires_at metadata attribute
metadata["expires_at"] = ""
- err = v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
+ err = v.container.SetBlobMetadata(loc, metadata, nil)
return v.translateError(err)
}
return v.AzureReplication
}
+// GetStorageClasses implements Volume
+func (v *AzureBlobVolume) GetStorageClasses() []string {
+ return v.StorageClasses
+}
+
// If possible, translate an Azure SDK error to a recognizable error
// like os.ErrNotExist.
func (v *AzureBlobVolume) translateError(err error) error {
func (v *AzureBlobVolume) EmptyTrash() {
var bytesDeleted, bytesInTrash int64
var blocksDeleted, blocksInTrash int
- params := storage.ListBlobsParameters{Include: "metadata"}
+ params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
for {
- resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
+ resp, err := v.container.ListBlobs(params)
if err != nil {
log.Printf("EmptyTrash: ListBlobs: %v", err)
break
continue
}
- err = v.bsClient.DeleteBlob(v.ContainerName, b.Name, map[string]string{
- "If-Match": b.Properties.Etag,
+ err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
+ IfMatch: b.Properties.Etag,
})
if err != nil {
log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
// InternalStats returns bucket I/O and API call counters.
func (v *AzureBlobVolume) InternalStats() interface{} {
- return &v.bsClient.stats
+ return &v.container.stats
}
type azureBlobStats struct {
s.statsTicker.TickErr(err, errType)
}
-// azureBlobClient wraps storage.BlobStorageClient in order to count
-// I/O and API usage stats.
-type azureBlobClient struct {
- client *storage.BlobStorageClient
- stats azureBlobStats
+// azureContainer wraps storage.Container in order to count I/O and
+// API usage stats.
+type azureContainer struct {
+ ctr *storage.Container
+ stats azureBlobStats
}
-func (c *azureBlobClient) ContainerExists(cname string) (bool, error) {
+func (c *azureContainer) Exists() (bool, error) {
c.stats.Tick(&c.stats.Ops)
- ok, err := c.client.ContainerExists(cname)
+ ok, err := c.ctr.Exists()
c.stats.TickErr(err)
return ok, err
}
-func (c *azureBlobClient) GetBlobMetadata(cname, bname string) (map[string]string, error) {
+func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
- m, err := c.client.GetBlobMetadata(cname, bname)
+ b := c.ctr.GetBlobReference(bname)
+ err := b.GetMetadata(nil)
c.stats.TickErr(err)
- return m, err
+ return b.Metadata, err
}
-func (c *azureBlobClient) GetBlobProperties(cname, bname string) (*storage.BlobProperties, error) {
+func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
- p, err := c.client.GetBlobProperties(cname, bname)
+ b := c.ctr.GetBlobReference(bname)
+ err := b.GetProperties(nil)
c.stats.TickErr(err)
- return p, err
+ return &b.Properties, err
}
-func (c *azureBlobClient) GetBlob(cname, bname string) (io.ReadCloser, error) {
+func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
- rdr, err := c.client.GetBlob(cname, bname)
+ b := c.ctr.GetBlobReference(bname)
+ rdr, err := b.Get(nil)
c.stats.TickErr(err)
return NewCountingReader(rdr, c.stats.TickInBytes), err
}
-func (c *azureBlobClient) GetBlobRange(cname, bname, byterange string, hdrs map[string]string) (io.ReadCloser, error) {
+func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
- rdr, err := c.client.GetBlobRange(cname, bname, byterange, hdrs)
+ b := c.ctr.GetBlobReference(bname)
+ rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
+ Range: &storage.BlobRange{
+ Start: uint64(start),
+ End: uint64(end),
+ },
+ GetBlobOptions: opts,
+ })
c.stats.TickErr(err)
return NewCountingReader(rdr, c.stats.TickInBytes), err
}
-func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error {
+// If we give the Azure SDK an io.Reader that doesn't also have a
+// Len() int method, it determines the data size by copying the data
+// into a new buffer, which is not a good use of memory.
+type readerWithAzureLen struct {
+ io.Reader
+ len int
+}
+
+// Len satisfies the private lener interface in azure-sdk-for-go.
+func (r *readerWithAzureLen) Len() int {
+ return r.len
+}
+
+func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
if size != 0 {
- rdr = NewCountingReader(rdr, c.stats.TickOutBytes)
+ rdr = &readerWithAzureLen{
+ Reader: NewCountingReader(rdr, c.stats.TickOutBytes),
+ len: size,
+ }
}
- err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs)
+ b := c.ctr.GetBlobReference(bname)
+ err := b.CreateBlockBlobFromReader(rdr, opts)
c.stats.TickErr(err)
return err
}
-func (c *azureBlobClient) SetBlobMetadata(cname, bname string, m, hdrs map[string]string) error {
+func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
- err := c.client.SetBlobMetadata(cname, bname, m, hdrs)
+ b := c.ctr.GetBlobReference(bname)
+ b.Metadata = m
+ err := b.SetMetadata(opts)
c.stats.TickErr(err)
return err
}
-func (c *azureBlobClient) ListBlobs(cname string, params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
- resp, err := c.client.ListBlobs(cname, params)
+ resp, err := c.ctr.ListBlobs(params)
c.stats.TickErr(err)
return resp, err
}
-func (c *azureBlobClient) DeleteBlob(cname, bname string, hdrs map[string]string) error {
+func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
- err := c.client.DeleteBlob(cname, bname, hdrs)
+ b := c.ctr.GetBlobReference(bname)
+ err := b.Delete(opts)
c.stats.TickErr(err)
return err
}
"testing"
"time"
- "github.com/curoverse/azure-sdk-for-go/storage"
+ "github.com/Azure/azure-sdk-for-go/storage"
+ "github.com/ghodss/yaml"
check "gopkg.in/check.v1"
)
// used by Microsoft's Azure emulator: the Azure SDK
// recognizes that magic string and changes its behavior to
// cater to the Azure SDK's own test suite.
- fakeAccountName = "fakeAccountName"
+ fakeAccountName = "fakeaccountname"
fakeAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
)
b := storage.Blob{
Name: hash,
Properties: storage.BlobProperties{
- LastModified: blob.Mtime.Format(time.RFC1123),
+ LastModified: storage.TimeRFC1123(blob.Mtime),
ContentLength: int64(len(blob.Data)),
Etag: blob.Etag,
},
ReadOnly: readonly,
AzureReplication: replication,
azClient: azClient,
- bsClient: &azureBlobClient{client: &bs},
+ container: &azureContainer{ctr: bs.GetContainerReference(container)},
}
return &TestableAzureBlobVolume{
c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}
+func (s *StubbedAzureBlobSuite) TestConfig(c *check.C) {
+ var cfg Config
+ err := yaml.Unmarshal([]byte(`
+Volumes:
+ - Type: Azure
+ StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+ c.Check(err, check.IsNil)
+ c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
+
func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
v.azHandler.PutRaw(v.ContainerName, locator, data)
}
resp := s.call("GET", "/mounts", "", nil)
c.Check(resp.Code, check.Equals, http.StatusOK)
var mntList []struct {
- UUID string
- DeviceID string
- ReadOnly bool
- Replication int
- Tier int
+ UUID string
+ DeviceID string
+ ReadOnly bool
+ Replication int
+ StorageClasses []string
}
err := json.Unmarshal(resp.Body.Bytes(), &mntList)
c.Assert(err, check.IsNil)
c.Check(m.DeviceID, check.Equals, "mock-device-id")
c.Check(m.ReadOnly, check.Equals, false)
c.Check(m.Replication, check.Equals, 1)
- c.Check(m.Tier, check.Equals, 1)
+ c.Check(m.StorageClasses, check.DeepEquals, []string{"default"})
}
c.Check(mntList[0].UUID, check.Not(check.Equals), mntList[1].UUID)
RaceWindow arvados.Duration
ReadOnly bool
UnsafeDelete bool
+ StorageClasses []string
bucket *s3bucket
return v.S3Replication
}
+// GetStorageClasses implements Volume
+func (v *S3Volume) GetStorageClasses() []string {
+ return v.StorageClasses
+}
+
var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
func (v *S3Volume) isKeepBlock(s string) bool {
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/AdRoll/goamz/s3"
"github.com/AdRoll/goamz/s3/s3test"
+ "github.com/ghodss/yaml"
check "gopkg.in/check.v1"
)
return v
}
+func (s *StubbedS3Suite) TestConfig(c *check.C) {
+ var cfg Config
+ err := yaml.Unmarshal([]byte(`
+Volumes:
+ - Type: S3
+ StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+ c.Check(err, check.IsNil)
+ c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
+
func (v *TestableS3Volume) Start() error {
tmp, err := ioutil.TempFile("", "keepstore")
v.c.Assert(err, check.IsNil)
// Return a globally unique ID of the underlying storage
// device if possible, otherwise "".
DeviceID() string
+
+ // Get the storage classes associated with this volume
+ GetStorageClasses() []string
}
// A VolumeWithExamples provides example configs to display in the
// A VolumeMount is an attachment of a Volume to a VolumeManager.
type VolumeMount struct {
- UUID string
- DeviceID string
- ReadOnly bool
- Replication int
- Tier int
- volume Volume
+ UUID string
+ DeviceID string
+ ReadOnly bool
+ Replication int
+ StorageClasses []string
+ volume Volume
}
// Generate a UUID the way API server would for a "KeepVolumeMount"
}
vm.mountMap = make(map[string]*VolumeMount)
for _, v := range volumes {
+ sc := v.GetStorageClasses()
+ if len(sc) == 0 {
+ sc = []string{"default"}
+ }
mnt := &VolumeMount{
- UUID: (*VolumeMount)(nil).generateUUID(),
- DeviceID: v.DeviceID(),
- ReadOnly: !v.Writable(),
- Replication: v.Replication(),
- Tier: 1,
- volume: v,
+ UUID: (*VolumeMount)(nil).generateUUID(),
+ DeviceID: v.DeviceID(),
+ ReadOnly: !v.Writable(),
+ Replication: v.Replication(),
+ StorageClasses: sc,
+ volume: v,
}
vm.iostats[v] = &ioStats{}
vm.mounts = append(vm.mounts, mnt)
func (v *MockVolume) EmptyTrash() {
}
+
+func (v *MockVolume) GetStorageClasses() []string {
+ return nil
+}
ReadOnly bool
Serialize bool
DirectoryReplication int
+ StorageClasses []string
// something to lock during IO, typically a sync.Mutex (or nil
// to skip locking)
return v.DirectoryReplication
}
+// GetStorageClasses implements Volume
+func (v *UnixVolume) GetStorageClasses() []string {
+ return v.StorageClasses
+}
+
// InternalStats returns I/O and filesystem ops counters.
func (v *UnixVolume) InternalStats() interface{} {
return &v.os.stats
"testing"
"time"
+ "github.com/ghodss/yaml"
check "gopkg.in/check.v1"
)
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
}
+
+func (s *UnixVolumeSuite) TestConfig(c *check.C) {
+ var cfg Config
+ err := yaml.Unmarshal([]byte(`
+Volumes:
+ - Type: Directory
+ StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+ c.Check(err, check.IsNil)
+ c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
def prepare_arvados_node(self, node):
self._clean_arvados_node(node, "Prepared by Node Manager")
self.arvados_node = self._arvados.nodes().update(
- body={}, assign_slot=True).execute()
+ uuid=node['uuid'], body={}, assign_slot=True).execute()
self._later.create_cloud_node()
@ComputeNodeStateChangeBase._finish_on_exception
run_bundler --without=development
cd /usr/src/arvados/sdk/R
-R --quiet --vanilla <<EOF
-options(repos=structure(c(CRAN="http://cran.wustl.edu/")))
-if (!requireNamespace("devtools")) {
- install.packages("devtools")
-}
-if (!requireNamespace("roxygen2")) {
- install.packages("roxygen2")
-}
-if (!requireNamespace("pkgdown")) {
- devtools::install_github("hadley/pkgdown")
-}
-devtools::install_dev_deps()
-EOF
+R --quiet --vanilla --file=install_deps.R
if test "$1" = "--only-deps" ; then
exit
return fmt.Errorf("error searching for parent group: %s", err)
}
if len(gl.Items) == 0 {
- // Default parent group not existant, create one.
+ // Default parent group does not exist, create it.
if cfg.Verbose {
log.Println("Default parent group not found, creating...")
}
"revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
"revisionTime": "2017-07-27T13:52:37Z"
},
+ {
+ "checksumSHA1": "xHZe/h/tyrqmS9qiR03bLfRv5FI=",
+ "path": "github.com/Azure/azure-sdk-for-go/storage",
+ "revision": "f8eeb65a1a1f969696b49aada9d24073f2c2acd1",
+ "revisionTime": "2018-02-15T19:19:13Z"
+ },
+ {
+ "checksumSHA1": "PfyfOXsPbGEWmdh54cguqzdwloY=",
+ "path": "github.com/Azure/azure-sdk-for-go/version",
+ "revision": "471256ff7c6c93b96131845cef5309d20edd313d",
+ "revisionTime": "2018-02-14T01:17:07Z"
+ },
+ {
+ "checksumSHA1": "LQWU/2M2E4L/hVzT9BVW1SkLrpA=",
+ "path": "github.com/Azure/go-autorest/autorest",
+ "revision": "a91c94d19d5efcb398b3aab64b8766e724aa7442",
+ "revisionTime": "2017-11-30T17:00:06Z"
+ },
+ {
+ "checksumSHA1": "nBQ7cdhoeYUur6G6HG97uueoDmE=",
+ "path": "github.com/Azure/go-autorest/autorest/adal",
+ "revision": "a91c94d19d5efcb398b3aab64b8766e724aa7442",
+ "revisionTime": "2017-11-30T17:00:06Z"
+ },
+ {
+ "checksumSHA1": "zXyLmDVpkYkIsL0yinNLoW82IZc=",
+ "path": "github.com/Azure/go-autorest/autorest/azure",
+ "revision": "a91c94d19d5efcb398b3aab64b8766e724aa7442",
+ "revisionTime": "2017-11-30T17:00:06Z"
+ },
+ {
+ "checksumSHA1": "9nXCi9qQsYjxCeajJKWttxgEt0I=",
+ "path": "github.com/Azure/go-autorest/autorest/date",
+ "revision": "a91c94d19d5efcb398b3aab64b8766e724aa7442",
+ "revisionTime": "2017-11-30T17:00:06Z"
+ },
{
"checksumSHA1": "o/3cn04KAiwC7NqNVvmfVTD+hgA=",
"path": "github.com/Microsoft/go-winio",
"revisionTime": "2018-01-08T08:51:32Z"
},
{
- "checksumSHA1": "pAu+do4x7E5SFLfIqJeGwhcOd6E=",
- "path": "github.com/curoverse/azure-sdk-for-go/storage",
- "revision": "1620af6b32398bfc91827ceae54a8cc1f55df04d",
- "revisionTime": "2016-12-14T20:08:43Z"
+ "checksumSHA1": "+TKtBzv23ywvmmqRiGEjUba4YmI=",
+ "path": "github.com/dgrijalva/jwt-go",
+ "revision": "dbeaa9332f19a944acb5736b4456cfcc02140e29",
+ "revisionTime": "2017-10-19T21:57:19Z"
},
{
"checksumSHA1": "Gj+xR1VgFKKmFXYOJMnAczC3Znk=",
"revision": "83612a56d3dd153a94a629cd64925371c9adad78",
"revisionTime": "2017-11-26T05:04:59Z"
},
+ {
+ "checksumSHA1": "T9E+5mKBQ/BX4wlNxgaPfetxdeI=",
+ "path": "github.com/marstr/guid",
+ "revision": "8bdf7d1a087ccc975cf37dd6507da50698fd19ca",
+ "revisionTime": "2017-04-27T23:51:15Z"
+ },
{
"checksumSHA1": "bKMZjd2wPw13VwoE7mBeSv5djFA=",
"path": "github.com/matttproud/golang_protobuf_extensions/pbutil",
"revision": "cb4147076ac75738c9a7d279075a253c0cc5acbd",
"revisionTime": "2018-01-25T13:30:57Z"
},
+ {
+ "checksumSHA1": "eDQ6f1EsNf+frcRO/9XukSEchm8=",
+ "path": "github.com/satori/go.uuid",
+ "revision": "36e9d2ebbde5e3f13ab2e25625fd453271d6522e",
+ "revisionTime": "2018-01-03T17:44:51Z"
+ },
{
"checksumSHA1": "UwtyqB7CaUWPlw0DVJQvw0IFQZs=",
"path": "github.com/sergi/go-diff/diffmatchpatch",