"git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/keepclient"
- "git.arvados.org/arvados.git/services/keepstore"
. "gopkg.in/check.v1"
)
volume.Replication = 2
cluster.Volumes[uuid] = volume
- var v keepstore.UnixVolume
+ var v struct {
+ Root string
+ }
err = json.Unmarshal(volume.DriverParameters, &v)
c.Assert(err, IsNil)
err = os.Mkdir(v.Root, 0777)
}
func newAzureBlobVolume(params newVolumeParams) (volume, error) {
- v := &AzureBlobVolume{
+ v := &azureBlobVolume{
RequestTimeout: azureDefaultRequestTimeout,
WriteRaceInterval: azureDefaultWriteRaceInterval,
WriteRacePollTime: azureDefaultWriteRacePollTime,
return v, v.check()
}
-func (v *AzureBlobVolume) check() error {
+func (v *azureBlobVolume) check() error {
lbls := prometheus.Labels{"device_id": v.DeviceID()}
v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
return nil
azureDefaultWriteRacePollTime = arvados.Duration(time.Second)
)
-// An AzureBlobVolume stores and retrieves blocks in an Azure Blob
+// An azureBlobVolume stores and retrieves blocks in an Azure Blob
// container.
-type AzureBlobVolume struct {
+type azureBlobVolume struct {
StorageAccountName string
StorageAccountKey string
StorageBaseURL string // "" means default, "core.windows.net"
}
// DeviceID returns a globally unique ID for the storage container.
-func (v *AzureBlobVolume) DeviceID() string {
+func (v *azureBlobVolume) DeviceID() string {
return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName
}
// Return true if expires_at metadata attribute is found on the block
-func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
+func (v *azureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
metadata, err := v.container.GetBlobMetadata(loc)
if err != nil {
return false, metadata, v.translateError(err)
// If the block is younger than azureWriteRaceInterval and is
// unexpectedly empty, assume a BlockWrite operation is in progress,
// and wait for it to finish writing.
-func (v *AzureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
trashed, _, err := v.checkTrashed(hash)
if err != nil {
return 0, err
return streamer.Wrote(), err
}
-func (v *AzureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) (int, error) {
+func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) (int, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// BlockWrite stores a block on the volume. If it already exists, its
// timestamp is updated.
-func (v *AzureBlobVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
+func (v *azureBlobVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
// Send the block data through a pipe, so that (if we need to)
// we can close the pipe early and abandon our
// CreateBlockBlobFromReader() goroutine, without worrying
}
// BlockTouch updates the last-modified property of a block blob.
-func (v *AzureBlobVolume) BlockTouch(hash string) error {
+func (v *azureBlobVolume) BlockTouch(hash string) error {
trashed, metadata, err := v.checkTrashed(hash)
if err != nil {
return err
}
// Mtime returns the last-modified property of a block blob.
-func (v *AzureBlobVolume) Mtime(hash string) (time.Time, error) {
+func (v *azureBlobVolume) Mtime(hash string) (time.Time, error) {
trashed, _, err := v.checkTrashed(hash)
if err != nil {
return time.Time{}, err
// Index writes a list of Keep blocks that are stored in the
// container.
-func (v *AzureBlobVolume) Index(ctx context.Context, prefix string, writer io.Writer) error {
+func (v *azureBlobVolume) Index(ctx context.Context, prefix string, writer io.Writer) error {
params := storage.ListBlobsParameters{
Prefix: prefix,
Include: &storage.IncludeBlobDataset{Metadata: true},
}
// call v.container.ListBlobs, retrying if needed.
-func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) {
+func (v *azureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) {
for i := 0; i < v.ListBlobsMaxAttempts; i++ {
resp, err = v.container.ListBlobs(params)
err = v.translateError(err)
}
// Trash a Keep block.
-func (v *AzureBlobVolume) BlockTrash(loc string) error {
+func (v *azureBlobVolume) BlockTrash(loc string) error {
// Ideally we would use If-Unmodified-Since, but that
// particular condition seems to be ignored by Azure. Instead,
// we get the Etag before checking Mtime, and use If-Match to
// BlockUntrash deletes the expires_at metadata attribute for the
// specified block blob.
-func (v *AzureBlobVolume) BlockUntrash(hash string) error {
+func (v *azureBlobVolume) BlockUntrash(hash string) error {
// if expires_at does not exist, return NotFoundError
metadata, err := v.container.GetBlobMetadata(hash)
if err != nil {
// If possible, translate an Azure SDK error to a recognizable error
// like os.ErrNotExist.
-func (v *AzureBlobVolume) translateError(err error) error {
+func (v *azureBlobVolume) translateError(err error) error {
switch {
case err == nil:
return err
var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
-func (v *AzureBlobVolume) isKeepBlock(s string) bool {
+func (v *azureBlobVolume) isKeepBlock(s string) bool {
return keepBlockRegexp.MatchString(s)
}
// EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
// and deletes them from the volume.
-func (v *AzureBlobVolume) EmptyTrash() {
+func (v *azureBlobVolume) EmptyTrash() {
var bytesDeleted, bytesInTrash int64
var blocksDeleted, blocksInTrash int64
}
// InternalStats returns bucket I/O and API call counters.
-func (v *AzureBlobVolume) InternalStats() interface{} {
+func (v *azureBlobVolume) InternalStats() interface{} {
return &v.container.stats
}
rw.WriteHeader(http.StatusCreated)
case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
// "Set Metadata Headers" API. We don't bother
- // stubbing "Get Metadata Headers": AzureBlobVolume
+ // stubbing "Get Metadata Headers": azureBlobVolume
// sets metadata headers only as a way to bump Etag
// and Last-Modified.
if !blobExists {
}
type testableAzureBlobVolume struct {
- *AzureBlobVolume
+ *azureBlobVolume
azHandler *azStubHandler
azStub *httptest.Server
t TB
azClient.Sender = &singleSender{}
bs := azClient.GetBlobService()
- v := &AzureBlobVolume{
+ v := &azureBlobVolume{
ContainerName: container,
WriteRaceInterval: arvados.Duration(time.Millisecond),
WriteRacePollTime: arvados.Duration(time.Nanosecond),
}
return &testableAzureBlobVolume{
- AzureBlobVolume: v,
+ azureBlobVolume: v,
azHandler: azHandler,
azStub: azStub,
t: t,
MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
})
- v.AzureBlobVolume.WriteRaceInterval.Set("2s")
- v.AzureBlobVolume.WriteRacePollTime.Set("5ms")
+ v.azureBlobVolume.WriteRaceInterval.Set("2s")
+ v.azureBlobVolume.WriteRacePollTime.Set("5ms")
defer v.Teardown()
v.BlockWriteRaw(TestHash, nil)
//
// SPDX-License-Identifier: AGPL-3.0
+// Package keepstore implements the keepstore service component and
+// back-end storage drivers.
+//
+// It is an internal module, only intended to be imported by
+// /cmd/arvados-server and other server-side components in this
+// repository.
package keepstore
import (
driver = make(map[string]volumeDriver)
)
-type IndexOptions struct {
+type indexOptions struct {
MountUUID string
Prefix string
WriteTo io.Writer
return ks.mountsR
}
-func (ks *keepstore) Index(ctx context.Context, opts IndexOptions) error {
+func (ks *keepstore) Index(ctx context.Context, opts indexOptions) error {
mounts := ks.mountsR
if opts.MountUUID != "" {
mnt, ok := ks.mounts[opts.MountUUID]
prefix = mux.Vars(req)["prefix"]
}
cw := &countingWriter{writer: w}
- err := rtr.keepstore.Index(req.Context(), IndexOptions{
+ err := rtr.keepstore.Index(req.Context(), indexOptions{
MountUUID: mux.Vars(req)["uuid"],
Prefix: prefix,
WriteTo: cw,
"GET /mounts/blocks/123",
"GET /trash",
"GET /pull",
- "GET /debug.json",
- "GET /status.json",
+ "GET /debug.json", // old endpoint, no longer exists
+ "GET /status.json", // old endpoint, no longer exists
"POST /",
"POST /aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"POST /trash",
)
func init() {
- driver["S3"] = newS3AWSVolume
+ driver["S3"] = news3Volume
}
const (
)
var (
- ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
+ errS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
+ s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+ s3AWSZeroTime time.Time
)
-// S3AWSVolume implements Volume using an S3 bucket.
-type S3AWSVolume struct {
+// s3Volume implements Volume using an S3 bucket.
+type s3Volume struct {
arvados.S3VolumeDriverParameters
AuthToken string // populated automatically when IAMRole is used
AuthExpiration time.Time // populated automatically when IAMRole is used
mu sync.Mutex
}
-const ()
-
-var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
-var s3AWSZeroTime time.Time
-
-func (v *S3AWSVolume) isKeepBlock(s string) (string, bool) {
+func (v *s3Volume) isKeepBlock(s string) (string, bool) {
if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
s = s[v.PrefixLength+1:]
}
// Return the key used for a given loc. If PrefixLength==0 then
// key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
// "abc/abcdef0123", etc.
-func (v *S3AWSVolume) key(loc string) string {
+func (v *s3Volume) key(loc string) string {
if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
return loc[:v.PrefixLength] + "/" + loc
} else {
}
}
-func newS3AWSVolume(params newVolumeParams) (volume, error) {
- v := &S3AWSVolume{
+func news3Volume(params newVolumeParams) (volume, error) {
+ v := &s3Volume{
cluster: params.Cluster,
volume: params.ConfigVolume,
metrics: params.MetricsVecs,
return v, v.check("")
}
-func (v *S3AWSVolume) translateError(err error) error {
+func (v *s3Volume) translateError(err error) error {
if _, ok := err.(*aws.RequestCanceledError); ok {
return context.Canceled
} else if aerr, ok := err.(awserr.Error); ok {
//
// (If something goes wrong during the copy, the error will be
// embedded in the 200 OK response)
-func (v *S3AWSVolume) safeCopy(dst, src string) error {
+func (v *s3Volume) safeCopy(dst, src string) error {
input := &s3.CopyObjectInput{
Bucket: aws.String(v.bucket.bucket),
ContentType: aws.String("application/octet-stream"),
return nil
}
-func (v *S3AWSVolume) check(ec2metadataHostname string) error {
+func (v *s3Volume) check(ec2metadataHostname string) error {
if v.Bucket == "" {
return errors.New("DriverParameters: Bucket must be provided")
}
}
// DeviceID returns a globally unique ID for the storage bucket.
-func (v *S3AWSVolume) DeviceID() string {
+func (v *s3Volume) DeviceID() string {
return "s3://" + v.Endpoint + "/" + v.Bucket
}
// EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
// and deletes them from the volume.
-func (v *S3AWSVolume) EmptyTrash() {
+func (v *s3Volume) EmptyTrash() {
var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
// Define "ready to delete" as "...when EmptyTrash started".
// exist. If the timestamps on "recent/X" and "trash/X" indicate there
// was a race between Put and Trash, fixRace recovers from the race by
// Untrashing the block.
-func (v *S3AWSVolume) fixRace(key string) bool {
+func (v *s3Volume) fixRace(key string) bool {
trash, err := v.head("trash/" + key)
if err != nil {
if !os.IsNotExist(v.translateError(err)) {
return true
}
-func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error) {
+func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) {
input := &s3.HeadObjectInput{
Bucket: aws.String(v.bucket.bucket),
Key: aws.String(key),
// BlockRead reads a Keep block that has been stored as a block blob
// in the S3 bucket.
-func (v *S3AWSVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
key := v.key(hash)
buf, err := v.bufferPool.GetContext(ctx)
if err != nil {
return streamer.Wrote(), nil
}
-func (v *S3AWSVolume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
+func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
u.PartSize = s3downloaderPartSize
u.Concurrency = s3downloaderReadConcurrency
return v.translateError(err)
}
-func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error {
+func (v *s3Volume) writeObject(ctx context.Context, key string, r io.Reader) error {
if r == nil {
// r == nil leads to a memory violation in func readFillBuf in
// aws-sdk-go-v2@v0.23.0/service/s3/s3manager/upload.go
}
// Put writes a block.
-func (v *S3AWSVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
+func (v *s3Volume) BlockWrite(ctx context.Context, hash string, data []byte) error {
// Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3
// sdk to avoid memory allocation there. See #17339 for more information.
rdr := bytes.NewReader(data)
// Index writes a complete list of locators with the given prefix
// for which Get() can retrieve data.
-func (v *S3AWSVolume) Index(ctx context.Context, prefix string, writer io.Writer) error {
+func (v *s3Volume) Index(ctx context.Context, prefix string, writer io.Writer) error {
prefix = v.key(prefix)
// Use a merge sort to find matching sets of X and recent/X.
dataL := s3awsLister{
}
// Mtime returns the stored timestamp for the given locator.
-func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
+func (v *s3Volume) Mtime(loc string) (time.Time, error) {
key := v.key(loc)
_, err := v.head(key)
if err != nil {
}
// InternalStats returns bucket I/O and API call counters.
-func (v *S3AWSVolume) InternalStats() interface{} {
+func (v *s3Volume) InternalStats() interface{} {
return &v.bucket.stats
}
// BlockTouch sets the timestamp for the given locator to the current time.
-func (v *S3AWSVolume) BlockTouch(hash string) error {
+func (v *s3Volume) BlockTouch(hash string) error {
key := v.key(hash)
_, err := v.head(key)
err = v.translateError(err)
// checkRaceWindow returns a non-nil error if trash/key is, or might
// be, in the race window (i.e., it's not safe to trash key).
-func (v *S3AWSVolume) checkRaceWindow(key string) error {
+func (v *s3Volume) checkRaceWindow(key string) error {
resp, err := v.head("trash/" + key)
err = v.translateError(err)
if os.IsNotExist(err) {
}
// Trash a Keep block.
-func (v *S3AWSVolume) BlockTrash(loc string) error {
+func (v *s3Volume) BlockTrash(loc string) error {
if t, err := v.Mtime(loc); err != nil {
return err
} else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
key := v.key(loc)
if v.cluster.Collections.BlobTrashLifetime == 0 {
if !v.UnsafeDelete {
- return ErrS3TrashDisabled
+ return errS3TrashDisabled
}
return v.translateError(v.bucket.Del(key))
}
}
// BlockUntrash moves block from trash back into store
-func (v *S3AWSVolume) BlockUntrash(hash string) error {
+func (v *s3Volume) BlockUntrash(hash string) error {
key := v.key(hash)
err := v.safeCopy(key, "trash/"+key)
if err != nil {
s3server *httptest.Server
metadata *httptest.Server
cluster *arvados.Cluster
- volumes []*TestableS3AWSVolume
+ volumes []*testableS3Volume
}
func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) {
}, 0)
v.IndexPageSize = 3
for i := 0; i < 256; i++ {
- v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
+ err := v.blockWriteWithoutMD5Check(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
+ c.Assert(err, check.IsNil)
}
for _, spec := range []struct {
prefix string
// The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 v2 signatures are being phased out
// as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
- vol := S3AWSVolume{
+ vol := s3Volume{
S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
AccessKeyID: "xxx",
SecretAccessKey: "xxx",
}))
defer s.metadata.Close()
- v := &S3AWSVolume{
+ v := &s3Volume{
S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
IAMRole: s.metadata.URL + "/latest/api/token",
Endpoint: "http://localhost:12345",
s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}))
- deadv := &S3AWSVolume{
+ deadv := &s3Volume{
S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
IAMRole: s.metadata.URL + "/fake-metadata/test-role",
Endpoint: "http://localhost:12345",
}
func (s *StubbedS3AWSSuite) TestGetContextCancel(c *check.C) {
- s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
+ s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
_, err := v.BlockRead(ctx, fooHash, io.Discard)
return err
})
}
func (s *StubbedS3AWSSuite) TestPutContextCancel(c *check.C) {
- s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
+ s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
return v.BlockWrite(ctx, fooHash, []byte("foo"))
})
}
-func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3AWSVolume) error) {
+func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) {
handler := &s3AWSBlockingHandler{}
s.s3server = httptest.NewServer(handler)
defer s.s3server.Close()
}
}
-type TestableS3AWSVolume struct {
- *S3AWSVolume
+type testableS3Volume struct {
+ *s3Volume
server *httptest.Server
c *check.C
serverClock *s3AWSFakeClock
}
}
-func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *TestableS3AWSVolume {
+func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
clock := &s3AWSFakeClock{}
// fake s3
iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
}
- v := &TestableS3AWSVolume{
- S3AWSVolume: &S3AWSVolume{
+ v := &testableS3Volume{
+ s3Volume: &s3Volume{
S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
IAMRole: iamRole,
AccessKeyID: accessKey,
server: srv,
serverClock: clock,
}
- c.Assert(v.S3AWSVolume.check(""), check.IsNil)
+ c.Assert(v.s3Volume.check(""), check.IsNil)
// Our test S3 server uses the older 'Path Style'
- v.S3AWSVolume.bucket.svc.ForcePathStyle = true
+ v.s3Volume.bucket.svc.ForcePathStyle = true
// Create the testbucket
input := &s3.CreateBucketInput{
Bucket: aws.String(S3AWSTestBucketName),
}
- req := v.S3AWSVolume.bucket.svc.CreateBucketRequest(input)
+ req := v.s3Volume.bucket.svc.CreateBucketRequest(input)
_, err := req.Send(context.Background())
c.Assert(err, check.IsNil)
// We couldn't set RaceWindow until now because check()
// rejects negative values.
- v.S3AWSVolume.RaceWindow = arvados.Duration(raceWindow)
+ v.s3Volume.RaceWindow = arvados.Duration(raceWindow)
return v
}
-// PutRaw skips the ContentMD5 test
-func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
+func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) error {
key := v.key(loc)
r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
Body: r,
})
if err != nil {
- v.logger.Printf("PutRaw: %s: %+v", key, err)
+ return err
}
empty := bytes.NewReader([]byte{})
Key: aws.String("recent/" + key),
Body: empty,
})
- if err != nil {
- v.logger.Printf("PutRaw: recent/%s: %+v", key, err)
- }
+ return err
}
// TouchWithDate turns back the clock while doing a Touch(). We assume
// there are no other operations happening on the same s3test server
// while we do this.
-func (v *TestableS3AWSVolume) TouchWithDate(loc string, lastPut time.Time) {
+func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) {
v.serverClock.now = &lastPut
uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
v.serverClock.now = nil
}
-func (v *TestableS3AWSVolume) Teardown() {
+func (v *testableS3Volume) Teardown() {
v.server.Close()
}
-func (v *TestableS3AWSVolume) ReadWriteOperationLabelValues() (r, w string) {
+func (v *testableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
return "get", "put"
}
}
func newUnixVolume(params newVolumeParams) (volume, error) {
- v := &UnixVolume{
+ v := &unixVolume{
uuid: params.UUID,
cluster: params.Cluster,
volume: params.ConfigVolume,
return v, v.check()
}
-func (v *UnixVolume) check() error {
+func (v *unixVolume) check() error {
if v.Root == "" {
return errors.New("DriverParameters.Root was not provided")
}
return err
}
-// A UnixVolume stores and retrieves blocks in a local directory.
-type UnixVolume struct {
+// A unixVolume stores and retrieves blocks in a local directory.
+type unixVolume struct {
Root string // path to the volume's root directory
Serialize bool
// filesystem root to storage directory, joined by "/". For example,
// the device ID for a local directory "/mnt/xvda1/keep" might be
// "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
-func (v *UnixVolume) DeviceID() string {
+func (v *unixVolume) DeviceID() string {
giveup := func(f string, args ...interface{}) string {
v.logger.Infof(f+"; using hostname:path for volume %s", append(args, v.uuid)...)
host, _ := os.Hostname()
}
// BlockTouch sets the timestamp for the given locator to the current time
-func (v *UnixVolume) BlockTouch(hash string) error {
+func (v *unixVolume) BlockTouch(hash string) error {
p := v.blockPath(hash)
f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
}
// Mtime returns the stored timestamp for the given locator.
-func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
+func (v *unixVolume) Mtime(loc string) (time.Time, error) {
p := v.blockPath(loc)
fi, err := v.os.Stat(p)
if err != nil {
// Lock the locker (if one is in use), open the file for reading, and
// call the given function if and when the file is ready to read.
-func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
+func (v *unixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
if err := v.lock(ctx); err != nil {
return err
}
}
// stat is os.Stat() with some extra sanity checks.
-func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
+func (v *unixVolume) stat(path string) (os.FileInfo, error) {
stat, err := v.os.Stat(path)
if err == nil {
if stat.Size() < 0 {
}
// BlockRead reads a block from the volume.
-func (v *UnixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) {
+func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) {
path := v.blockPath(hash)
stat, err := v.stat(path)
if err != nil {
// BlockWrite stores a block on the volume. If it already exists, its
// timestamp is updated.
-func (v *UnixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
+func (v *unixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
if v.isFull() {
return errFull
}
var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
-func (v *UnixVolume) Index(ctx context.Context, prefix string, w io.Writer) error {
+func (v *unixVolume) Index(ctx context.Context, prefix string, w io.Writer) error {
rootdir, err := v.os.Open(v.Root)
if err != nil {
return err
// BlobTrashLifetime == 0, the block is deleted; otherwise, the block
// is renamed as path/{loc}.trash.{deadline}, where deadline = now +
// BlobTrashLifetime.
-func (v *UnixVolume) BlockTrash(loc string) error {
+func (v *unixVolume) BlockTrash(loc string) error {
// Touch() must be called before calling Write() on a block. Touch()
// also uses lockfile(). This avoids a race condition between Write()
// and Trash() because either (a) the file will be trashed and Touch()
// BlockUntrash moves block from trash back into store
// Look for path/{loc}.trash.{deadline} in storage,
// and rename the first such file as path/{loc}
-func (v *UnixVolume) BlockUntrash(hash string) error {
+func (v *unixVolume) BlockUntrash(hash string) error {
v.os.stats.TickOps("readdir")
v.os.stats.Tick(&v.os.stats.ReaddirOps)
files, err := ioutil.ReadDir(v.blockDir(hash))
// blockDir returns the fully qualified directory name for the directory
// where loc is (or would be) stored on this volume.
-func (v *UnixVolume) blockDir(loc string) string {
+func (v *unixVolume) blockDir(loc string) string {
return filepath.Join(v.Root, loc[0:3])
}
// blockPath returns the fully qualified pathname for the path to loc
// on this volume.
-func (v *UnixVolume) blockPath(loc string) string {
+func (v *unixVolume) blockPath(loc string) string {
return filepath.Join(v.blockDir(loc), loc)
}
// isFull returns true if the free space on the volume is less than
// MinFreeKilobytes.
-func (v *UnixVolume) isFull() (isFull bool) {
+func (v *unixVolume) isFull() (isFull bool) {
fullSymlink := v.Root + "/full"
// Check if the volume has been marked as full in the last hour.
// FreeDiskSpace returns the number of unused 1k blocks available on
// the volume.
-func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
+func (v *unixVolume) FreeDiskSpace() (free uint64, err error) {
var fs syscall.Statfs_t
err = syscall.Statfs(v.Root, &fs)
if err == nil {
}
// InternalStats returns I/O and filesystem ops counters.
-func (v *UnixVolume) InternalStats() interface{} {
+func (v *unixVolume) InternalStats() interface{} {
return &v.os.stats
}
// lock acquires the serialize lock, if one is in use. If ctx is done
// before the lock is acquired, lock returns ctx.Err() instead of
// acquiring the lock.
-func (v *UnixVolume) lock(ctx context.Context) error {
+func (v *unixVolume) lock(ctx context.Context) error {
if v.locker == nil {
return nil
}
}
// unlock releases the serialize lock, if one is in use.
-func (v *UnixVolume) unlock() {
+func (v *unixVolume) unlock() {
if v.locker == nil {
return
}
}
// lockfile and unlockfile use flock(2) to manage kernel file locks.
-func (v *UnixVolume) lockfile(f *os.File) error {
+func (v *unixVolume) lockfile(f *os.File) error {
v.os.stats.TickOps("flock")
v.os.stats.Tick(&v.os.stats.FlockOps)
err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
return err
}
-func (v *UnixVolume) unlockfile(f *os.File) error {
+func (v *unixVolume) unlockfile(f *os.File) error {
err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
v.os.stats.TickErr(err)
return err
// Where appropriate, translate a more specific filesystem error to an
// error recognized by handlers, like os.ErrNotExist.
-func (v *UnixVolume) translateError(err error) error {
+func (v *unixVolume) translateError(err error) error {
switch err.(type) {
case *os.PathError:
// stat() returns a PathError if the parent directory
// EmptyTrash walks hierarchy looking for {hash}.trash.*
// and deletes those with deadline < now.
-func (v *UnixVolume) EmptyTrash() {
+func (v *unixVolume) EmptyTrash() {
var bytesDeleted, bytesInTrash int64
var blocksDeleted, blocksInTrash int64
)
type testableUnixVolume struct {
- UnixVolume
+ unixVolume
t TB
}
locker = &sync.Mutex{}
}
v := &testableUnixVolume{
- UnixVolume: UnixVolume{
+ unixVolume: unixVolume{
Root: d,
locker: locker,
uuid: params.UUID,
return string(buf)
}
- c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*UnixVolume)check() calls Stat() once
+ c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*unixVolume)check() calls Stat() once
c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
_, err := vol.BlockRead(context.Background(), fooHash, io.Discard)
func (s *unixVolumeSuite) TestSkipUnusedDirs(c *check.C) {
vol := s.newTestableUnixVolume(c, s.params, false)
- err := os.Mkdir(vol.UnixVolume.Root+"/aaa", 0777)
+ err := os.Mkdir(vol.unixVolume.Root+"/aaa", 0777)
c.Assert(err, check.IsNil)
- err = os.Mkdir(vol.UnixVolume.Root+"/.aaa", 0777) // EmptyTrash should not look here
+ err = os.Mkdir(vol.unixVolume.Root+"/.aaa", 0777) // EmptyTrash should not look here
c.Assert(err, check.IsNil)
- deleteme := vol.UnixVolume.Root + "/aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
+ deleteme := vol.unixVolume.Root + "/aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
err = ioutil.WriteFile(deleteme, []byte{1, 2, 3}, 0777)
c.Assert(err, check.IsNil)
- skipme := vol.UnixVolume.Root + "/.aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
+ skipme := vol.unixVolume.Root + "/.aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
err = ioutil.WriteFile(skipme, []byte{1, 2, 3}, 0777)
c.Assert(err, check.IsNil)
vol.EmptyTrash()
InBytes uint64
OutBytes uint64
}
-
-type InternalStatser interface {
- InternalStats() interface{}
-}