var (
azureStorageAccountName string
azureStorageAccountKeyFile string
+ azureStorageReplication int
)
func readKeyFromFile(file string) (string, error) {
if flagSerializeIO {
log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
}
- v := NewAzureBlobVolume(azClient, containerName, flagReadonly)
+ v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
if err := v.Check(); err != nil {
return err
}
"azure-storage-account-key-file",
"",
"File containing the account key used for subsequent --azure-storage-container-volume arguments.")
+ flag.IntVar(
+ &azureStorageReplication,
+ "azure-storage-replication",
+ 3,
+ "Replication level to report to clients when data is stored in an Azure container.")
}
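For illustration only, not part of the patch: once this flag is registered, a
server pointed at an Azure container with 3x redundancy might be started along
these lines. The binary name and the -azure-storage-account-name spelling are
assumptions; the other flags appear in the surrounding code.

    keepstore -azure-storage-account-name=myaccount \
        -azure-storage-account-key-file=/secrets/azure.key \
        -azure-storage-container-volume=keepblocks \
        -azure-storage-replication=3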
// An AzureBlobVolume stores and retrieves blocks in an Azure Blob
// container.
bsClient storage.BlobStorageClient
containerName string
readonly bool
+ replication int
}
-func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool) *AzureBlobVolume {
+func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
return &AzureBlobVolume{
azClient: client,
bsClient: client.GetBlobService(),
containerName: containerName,
readonly: readonly,
+ replication: replication,
}
}
func (v *AzureBlobVolume) Writable() bool {
return !v.readonly
}
+
+// Replication returns the replication level reported to clients for
+// blocks stored on this volume, as given to NewAzureBlobVolume.
+func (v *AzureBlobVolume) Replication() int {
+ return v.replication
+}
t *testing.T
}
-func NewTestableAzureBlobVolume(t *testing.T, readonly bool) TestableVolume {
+func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) TestableVolume {
azHandler := newAzStubHandler()
azStub := httptest.NewServer(azHandler)
}
}
- v := NewAzureBlobVolume(azClient, container, readonly)
+ v := NewAzureBlobVolume(azClient, container, readonly, replication)
return &TestableAzureBlobVolume{
AzureBlobVolume: v,
Dial: (&azStubDialer{}).Dial,
}
DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
- return NewTestableAzureBlobVolume(t, false)
+ return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
})
}
Dial: (&azStubDialer{}).Dial,
}
DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
- return NewTestableAzureBlobVolume(t, true)
+ return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
})
}
+func TestAzureBlobVolumeReplication(t *testing.T) {
+ for r := 1; r <= 4; r++ {
+ v := NewTestableAzureBlobVolume(t, false, r)
+ defer v.Teardown()
+ if n := v.Replication(); n != r {
+ t.Errorf("Got replication %d, expected %d", n, r)
+ }
+ }
+}
+
func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
v.azHandler.PutRaw(v.containerName, locator, data)
}
case <-ok:
}
}
+
+func TestPutReplicationHeader(t *testing.T) {
+ defer teardown()
+
+ KeepVM = MakeTestVolumeManager(2)
+ defer KeepVM.Close()
+
+ resp := IssueRequest(&RequestTester{
+ method: "PUT",
+ uri: "/" + TestHash,
+ requestBody: TestBlock,
+ })
+ if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" {
+ t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
+ }
+}
return
}
- err = PutBlock(buf, hash)
+ replication, err := PutBlock(buf, hash)
bufs.Put(buf)
if err != nil {
expiry := time.Now().Add(blobSignatureTTL)
returnHash = SignLocator(returnHash, apiToken, expiry)
}
+ resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
resp.Write([]byte(returnHash + "\n"))
}
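For illustration only, not part of the patch: a client-side sketch of
consuming the new header. Only the PUT /{hash} route and the
X-Keep-Replicas-Stored header name come from this diff; the package name,
helper name, and absence of authentication are assumptions.

package example

import (
	"bytes"
	"net/http"
	"strconv"
)

// putAndGetReplicas PUTs a block to a Keep server and parses the
// replication level the server reports for it.
func putAndGetReplicas(server, hash string, block []byte) (int, error) {
	req, err := http.NewRequest("PUT", server+"/"+hash, bytes.NewReader(block))
	if err != nil {
		return 0, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	// The handler above sets this header from PutBlock's return value.
	return strconv.Atoi(resp.Header.Get("X-Keep-Replicas-Stored"))
}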
// all writes failed). The text of the error message should
// provide as much detail as possible.
//
-func PutBlock(block []byte, hash string) error {
+// On success, PutBlock also returns the replication level reported by
+// the volume that stored the block (0 if the block was already
+// present and only its timestamp needed updating).
+func PutBlock(block []byte, hash string) (int, error) {
// Check that BLOCK's checksum matches HASH.
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
- return RequestHashError
+ return 0, RequestHashError
}
// If we already have this data, it's intact on disk, and we
// can update its timestamp, return success. If we have
// different data with the same hash, return failure.
if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
- return err
+ return 0, err
}
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
if vol := KeepVM.NextWritable(); vol != nil {
if err := vol.Put(hash, block); err == nil {
- return nil // success!
+ return vol.Replication(), nil // success!
}
}
writables := KeepVM.AllWritable()
if len(writables) == 0 {
log.Print("No writable volumes.")
- return FullError
+ return 0, FullError
}
allFull := true
for _, vol := range writables {
err := vol.Put(hash, block)
if err == nil {
- return nil // success!
+ return vol.Replication(), nil // success!
}
if err != FullError {
// The volume is not full but the
if allFull {
log.Print("All volumes are full.")
- return FullError
+ return 0, FullError
}
// Already logged the non-full errors.
- return GenericError
+ return 0, GenericError
}
// CompareAndTouch returns nil if one of the volumes already has the
defer KeepVM.Close()
// Check that PutBlock stores the data as expected.
- if err := PutBlock(TestBlock, TestHash); err != nil {
- t.Fatalf("PutBlock: %v", err)
+ if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ t.Fatalf("PutBlock: n %d err %v", n, err)
}
vols := KeepVM.AllReadable()
vols[0].(*MockVolume).Bad = true
// Check that PutBlock stores the data as expected.
- if err := PutBlock(TestBlock, TestHash); err != nil {
- t.Fatalf("PutBlock: %v", err)
+ if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ t.Fatalf("PutBlock: n %d err %v", n, err)
}
result, err := GetBlock(TestHash)
// Check that PutBlock returns the expected error when the hash does
// not match the block.
- if err := PutBlock(BadBlock, TestHash); err != RequestHashError {
+ if _, err := PutBlock(BadBlock, TestHash); err != RequestHashError {
t.Error("Expected RequestHashError, got %v", err)
}
// Store a corrupted block under TestHash.
vols := KeepVM.AllWritable()
vols[0].Put(TestHash, BadBlock)
- if err := PutBlock(TestBlock, TestHash); err != nil {
- t.Errorf("PutBlock: %v", err)
+ if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ t.Errorf("PutBlock: n %d err %v", n, err)
}
// The block on disk should now match TestBlock.
// Store one block, then attempt to store the other. Confirm that
// PutBlock reported a CollisionError.
- if err := PutBlock(b1, locator); err != nil {
+ if _, err := PutBlock(b1, locator); err != nil {
t.Error(err)
}
- if err := PutBlock(b2, locator); err == nil {
+ if _, err := PutBlock(b2, locator); err == nil {
t.Error("PutBlock did not report a collision")
} else if err != CollisionError {
t.Errorf("PutBlock returned %v", err)
// vols[0].Touch will fail on the next call, so the volume
// manager will store a copy on vols[1] instead.
vols[0].(*MockVolume).Touchable = false
- if err := PutBlock(TestBlock, TestHash); err != nil {
- t.Fatalf("PutBlock: %v", err)
+ if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ t.Fatalf("PutBlock: n %d err %v", n, err)
}
vols[0].(*MockVolume).Touchable = true
// Put block
var PutContent = func(content []byte, locator string) (err error) {
- err = PutBlock(content, locator)
+ _, err = PutBlock(content, locator)
return
}
// will fail because it is full, but Mtime or Delete can
// succeed -- then Writable should return false.
Writable() bool
+
+ // Replication returns the storage redundancy of the
+ // underlying device. It will be passed on to clients in
+ // responses to PUT requests.
+ Replication() int
}
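Every Volume implementation now has to provide Replication(). The
implementations below report a constant 1; a hypothetical volume backed by a
mirrored device (a sketch, not part of the patch) could report its mirror
count instead:

// mirroredVolume is a hypothetical example: it reuses UnixVolume's
// storage behavior but reports a configurable redundancy level.
type mirroredVolume struct {
	UnixVolume
	mirrors int
}

func (v *mirroredVolume) Replication() int {
	return v.mirrors
}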
// A VolumeManager tells callers which volumes can read, which volumes
func (v *MockVolume) Writable() bool {
return !v.Readonly
}
+
+// Replication returns 1: MockVolume stores a single copy of each
+// block.
+func (v *MockVolume) Replication() int {
+ return 1
+}
return !v.readonly
}
+
+// Replication returns 1: UnixVolume keeps one copy of each block on
+// the local filesystem, and any redundancy in the underlying device
+// is not visible at this layer.
+func (v *UnixVolume) Replication() int {
+ return 1
+}
+
// lockfile and unlockfile use flock(2) to manage kernel file locks.
func lockfile(f *os.File) error {
return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)