7241: Add -azure-storage-replication flag.
authorTom Clegg <tom@curoverse.com>
Fri, 25 Sep 2015 18:56:27 +0000 (14:56 -0400)
committerTom Clegg <tom@curoverse.com>
Fri, 25 Sep 2015 19:31:55 +0000 (15:31 -0400)
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore_test.go
services/keepstore/pull_worker.go
services/keepstore/volume.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go

index 35b1dc79bfc28b3a2c336f26dd3732bee2f07afc..8f93c18fd811ebc2b6ed385a7be805d5b4ace942 100644 (file)
@@ -17,6 +17,7 @@ import (
 var (
        azureStorageAccountName    string
        azureStorageAccountKeyFile string
+       azureStorageReplication    int
 )
 
 func readKeyFromFile(file string) (string, error) {
@@ -50,7 +51,7 @@ func (s *azureVolumeAdder) Set(containerName string) error {
        if flagSerializeIO {
                log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
        }
-       v := NewAzureBlobVolume(azClient, containerName, flagReadonly)
+       v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
        if err := v.Check(); err != nil {
                return err
        }
@@ -72,6 +73,11 @@ func init() {
                "azure-storage-account-key-file",
                "",
                "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
+       flag.IntVar(
+               &azureStorageReplication,
+               "azure-storage-replication",
+               3,
+               "Replication level to report to clients when data is stored in an Azure container.")
 }
 
 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
@@ -81,14 +87,16 @@ type AzureBlobVolume struct {
        bsClient      storage.BlobStorageClient
        containerName string
        readonly      bool
+       replication   int
 }
 
-func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool) *AzureBlobVolume {
+func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
        return &AzureBlobVolume{
                azClient: client,
                bsClient: client.GetBlobService(),
                containerName: containerName,
                readonly: readonly,
+               replication: replication,
        }
 }
 
@@ -225,3 +233,7 @@ func (v *AzureBlobVolume) String() string {
 func (v *AzureBlobVolume) Writable() bool {
        return !v.readonly
 }
+
+func (v *AzureBlobVolume) Replication() int {
+       return v.replication
+}
index 619c013a3164aada97147fbe348aa954edaf2457..dc1a7e46e48f365001a27045729f040feaa21537 100644 (file)
@@ -232,7 +232,7 @@ type TestableAzureBlobVolume struct {
        t         *testing.T
 }
 
-func NewTestableAzureBlobVolume(t *testing.T, readonly bool) TestableVolume {
+func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) TestableVolume {
        azHandler := newAzStubHandler()
        azStub := httptest.NewServer(azHandler)
 
@@ -259,7 +259,7 @@ func NewTestableAzureBlobVolume(t *testing.T, readonly bool) TestableVolume {
                }
        }
 
-       v := NewAzureBlobVolume(azClient, container, readonly)
+       v := NewAzureBlobVolume(azClient, container, readonly, replication)
 
        return &TestableAzureBlobVolume{
                AzureBlobVolume: v,
@@ -277,7 +277,7 @@ func TestAzureBlobVolumeWithGeneric(t *testing.T) {
                Dial: (&azStubDialer{}).Dial,
        }
        DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
-               return NewTestableAzureBlobVolume(t, false)
+               return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
        })
 }
 
@@ -289,10 +289,20 @@ func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
                Dial: (&azStubDialer{}).Dial,
        }
        DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
-               return NewTestableAzureBlobVolume(t, true)
+               return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
        })
 }
 
+func TestAzureBlobVolumeReplication(t *testing.T) {
+       for r := 1; r <= 4; r++ {
+               v := NewTestableAzureBlobVolume(t, false, r)
+               defer v.Teardown()
+               if n := v.Replication(); n != r {
+                       t.Errorf("Got replication %d, expected %d", n, r)
+               }
+       }
+}
+
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
        v.azHandler.PutRaw(v.containerName, locator, data)
 }
index dd8366671b6b056f8e4161cfae8169cd6a086cc6..38c06acacf4a7d647bffba7c23850cb4383ae6d6 100644 (file)
@@ -924,3 +924,19 @@ func TestGetHandlerNoBufferleak(t *testing.T) {
        case <-ok:
        }
 }
+
+func TestPutReplicationHeader(t *testing.T) {
+       defer teardown()
+
+       KeepVM = MakeTestVolumeManager(2)
+       defer KeepVM.Close()
+
+       resp := IssueRequest(&RequestTester{
+               method:      "PUT",
+               uri:         "/" + TestHash,
+               requestBody: TestBlock,
+       })
+       if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" {
+               t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
+       }
+}
index 2b96dbc582f8b584e731401f3e467dae635ea837..c44bfb0c0336f77cf28efee3f367067a855b44cb 100644 (file)
@@ -120,7 +120,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       err = PutBlock(buf, hash)
+       replication, err := PutBlock(buf, hash)
        bufs.Put(buf)
 
        if err != nil {
@@ -137,6 +137,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                expiry := time.Now().Add(blobSignatureTTL)
                returnHash = SignLocator(returnHash, apiToken, expiry)
        }
+       resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
        resp.Write([]byte(returnHash + "\n"))
 }
 
@@ -517,40 +518,40 @@ func GetBlock(hash string) ([]byte, error) {
 //          all writes failed). The text of the error message should
 //          provide as much detail as possible.
 //
-func PutBlock(block []byte, hash string) error {
+func PutBlock(block []byte, hash string) (int, error) {
        // Check that BLOCK's checksum matches HASH.
        blockhash := fmt.Sprintf("%x", md5.Sum(block))
        if blockhash != hash {
                log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
-               return RequestHashError
+               return 0, RequestHashError
        }
 
        // If we already have this data, it's intact on disk, and we
        // can update its timestamp, return success. If we have
        // different data with the same hash, return failure.
        if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
-               return err
+               return 0, err
        }
 
        // Choose a Keep volume to write to.
        // If this volume fails, try all of the volumes in order.
        if vol := KeepVM.NextWritable(); vol != nil {
                if err := vol.Put(hash, block); err == nil {
-                       return nil // success!
+                       return vol.Replication(), nil // success!
                }
        }
 
        writables := KeepVM.AllWritable()
        if len(writables) == 0 {
                log.Print("No writable volumes.")
-               return FullError
+               return 0, FullError
        }
 
        allFull := true
        for _, vol := range writables {
                err := vol.Put(hash, block)
                if err == nil {
-                       return nil // success!
+                       return vol.Replication(), nil // success!
                }
                if err != FullError {
                        // The volume is not full but the
@@ -563,10 +564,10 @@ func PutBlock(block []byte, hash string) error {
 
        if allFull {
                log.Print("All volumes are full.")
-               return FullError
+               return 0, FullError
        }
        // Already logged the non-full errors.
-       return GenericError
+       return 0, GenericError
 }
 
 // CompareAndTouch returns nil if one of the volumes already has the
index daa9199795ce370007c5caa799232ba614811254..8682e23f56d82061d013b52b6cf916a233127591 100644 (file)
@@ -122,8 +122,8 @@ func TestPutBlockOK(t *testing.T) {
        defer KeepVM.Close()
 
        // Check that PutBlock stores the data as expected.
-       if err := PutBlock(TestBlock, TestHash); err != nil {
-               t.Fatalf("PutBlock: %v", err)
+       if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+               t.Fatalf("PutBlock: n %d err %v", n, err)
        }
 
        vols := KeepVM.AllReadable()
@@ -152,8 +152,8 @@ func TestPutBlockOneVol(t *testing.T) {
        vols[0].(*MockVolume).Bad = true
 
        // Check that PutBlock stores the data as expected.
-       if err := PutBlock(TestBlock, TestHash); err != nil {
-               t.Fatalf("PutBlock: %v", err)
+       if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+               t.Fatalf("PutBlock: n %d err %v", n, err)
        }
 
        result, err := GetBlock(TestHash)
@@ -180,7 +180,7 @@ func TestPutBlockMD5Fail(t *testing.T) {
 
        // Check that PutBlock returns the expected error when the hash does
        // not match the block.
-       if err := PutBlock(BadBlock, TestHash); err != RequestHashError {
+       if _, err := PutBlock(BadBlock, TestHash); err != RequestHashError {
                t.Error("Expected RequestHashError, got %v", err)
        }
 
@@ -205,8 +205,8 @@ func TestPutBlockCorrupt(t *testing.T) {
        // Store a corrupted block under TestHash.
        vols := KeepVM.AllWritable()
        vols[0].Put(TestHash, BadBlock)
-       if err := PutBlock(TestBlock, TestHash); err != nil {
-               t.Errorf("PutBlock: %v", err)
+       if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+               t.Errorf("PutBlock: n %d err %v", n, err)
        }
 
        // The block on disk should now match TestBlock.
@@ -235,10 +235,10 @@ func TestPutBlockCollision(t *testing.T) {
 
        // Store one block, then attempt to store the other. Confirm that
        // PutBlock reported a CollisionError.
-       if err := PutBlock(b1, locator); err != nil {
+       if _, err := PutBlock(b1, locator); err != nil {
                t.Error(err)
        }
-       if err := PutBlock(b2, locator); err == nil {
+       if _, err := PutBlock(b2, locator); err == nil {
                t.Error("PutBlock did not report a collision")
        } else if err != CollisionError {
                t.Errorf("PutBlock returned %v", err)
@@ -269,8 +269,8 @@ func TestPutBlockTouchFails(t *testing.T) {
        // vols[0].Touch will fail on the next call, so the volume
        // manager will store a copy on vols[1] instead.
        vols[0].(*MockVolume).Touchable = false
-       if err := PutBlock(TestBlock, TestHash); err != nil {
-               t.Fatalf("PutBlock: %v", err)
+       if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+               t.Fatalf("PutBlock: n %d err %v", n, err)
        }
        vols[0].(*MockVolume).Touchable = true
 
index 9f0b96fa35b1c32af4b06be92ea02632547ef249..2626d4bf68e1594f394ad4539f0f32a90fe00339 100644 (file)
@@ -95,6 +95,6 @@ func GenerateRandomAPIToken() string {
 
 // Put block
 var PutContent = func(content []byte, locator string) (err error) {
-       err = PutBlock(content, locator)
+       _, err = PutBlock(content, locator)
        return
 }
index 9bf291bdea28dce8b05baae738c8d42fbca7481c..7966c41b92bd89958308ec77765f0b7a5a1f0fd9 100644 (file)
@@ -195,6 +195,11 @@ type Volume interface {
        // will fail because it is full, but Mtime or Delete can
        // succeed -- then Writable should return false.
        Writable() bool
+
+       // Replication returns the storage redundancy of the
+       // underlying device. It will be passed on to clients in
+       // responses to PUT requests.
+       Replication() int
 }
 
 // A VolumeManager tells callers which volumes can read, which volumes
index f272c84c837676b12cc45ad3b2e962f635e746b1..d6714365de5bef98ad082b93f595231993bafa48 100644 (file)
@@ -214,3 +214,7 @@ func (v *MockVolume) String() string {
 func (v *MockVolume) Writable() bool {
        return !v.Readonly
 }
+
+func (v *MockVolume) Replication() int {
+       return 1
+}
index f498c3c32d3d1fad71609061d32a7ea8c9a222e8..98c31d1eab6d0c18f3d242daf898c3d25345e490 100644 (file)
@@ -467,6 +467,10 @@ func (v *UnixVolume) Writable() bool {
        return !v.readonly
 }
 
+func (v *UnixVolume) Replication() int {
+       return 1
+}
+
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
 func lockfile(f *os.File) error {
        return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)