import (
"bytes"
+ "context"
"errors"
"flag"
"fmt"
// If the block is younger than azureWriteRaceInterval and is
// unexpectedly empty, assume a PutBlob operation is in progress, and
// wait for it to finish writing.
-func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) {
+func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
trashed, _, err := v.checkTrashed(loc)
if err != nil {
return 0, err
import (
"bytes"
+ "context"
"crypto/md5"
"encoding/base64"
"encoding/xml"
t.Error(err)
}
gotData := make([]byte, len(data))
- gotLen, err := v.Get(hash, gotData)
+ gotLen, err := v.Get(context.TODO(), hash, gotData)
if err != nil {
t.Error(err)
}
v.azHandler.race <- continuePut
go func() {
buf := make([]byte, len(TestBlock))
- _, err := v.Get(TestHash, buf)
+ _, err := v.Get(context.TODO(), TestHash, buf)
if err != nil {
t.Error(err)
}
go func() {
defer close(allDone)
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.TODO(), TestHash, buf)
if err != nil {
t.Error(err)
return
import (
"bytes"
+ "context"
"encoding/json"
"fmt"
"net/http"
}
// Confirm the block has been deleted
buf := make([]byte, BlockSize)
- _, err := vols[0].Get(TestHash, buf)
+ _, err := vols[0].Get(context.TODO(), TestHash, buf)
var blockDeleted = os.IsNotExist(err)
if !blockDeleted {
t.Error("superuserExistingBlockReq: block not deleted")
expectedDc, responseDc)
}
// Confirm the block has NOT been deleted.
- _, err = vols[0].Get(TestHash, buf)
+ _, err = vols[0].Get(context.TODO(), TestHash, buf)
if err != nil {
t.Errorf("testing delete on new block: %s\n", err)
}
import (
"container/list"
+ "context"
"crypto/md5"
"encoding/json"
"fmt"
}
defer bufs.Put(buf)
- size, err := GetBlock(mux.Vars(req)["hash"], buf, resp)
+ ctx, cancel := context.WithCancel(context.TODO())
+ if resp, ok := resp.(http.CloseNotifier); ok {
+ go func() {
+ <-resp.CloseNotify()
+ cancel()
+ }()
+ }
+ size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
if err != nil {
code := http.StatusInternalServerError
if err, ok := err.(*KeepError); ok {
// If the block found does not have the correct MD5 hash, returns
// DiskHashError.
//
-func GetBlock(hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
// Attempt to read the requested hash from a keep volume.
errorToCaller := NotFoundError
for _, vol := range KeepVM.AllReadable() {
- size, err := vol.Get(hash, buf)
+ size, err := vol.Get(ctx, hash, buf)
if err != nil {
// IsNotExist is an expected error and may be
// ignored. All other errors are logged. In
import (
"bytes"
+ "context"
)
// A TestableVolumeManagerFactory creates a volume manager with at least two TestableVolume instances.
// Get should pass
buf := make([]byte, len(testBlock))
- n, err := GetBlock(testHash, buf, nil)
+ n, err := GetBlock(context.TODO(), testHash, buf, nil)
if err != nil {
t.Fatalf("Error while getting block %s", err)
}
// Get should fail
buf := make([]byte, BlockSize)
- size, err := GetBlock(testHash, buf, nil)
+ size, err := GetBlock(context.TODO(), testHash, buf, nil)
if err == nil {
t.Fatalf("Got %+q, expected error while getting corrupt block %v", buf[:size], testHash)
}
// Check that PutBlock stored the data as expected
buf := make([]byte, BlockSize)
- size, err := GetBlock(testHash, buf, nil)
+ size, err := GetBlock(context.TODO(), testHash, buf, nil)
if err != nil {
t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
} else if bytes.Compare(buf[:size], testBlock) != 0 {
// Put succeeded and overwrote the badData in one volume,
// and Get should return the testBlock now, ignoring the bad data.
buf := make([]byte, BlockSize)
- size, err := GetBlock(testHash, buf, nil)
+ size, err := GetBlock(context.TODO(), testHash, buf, nil)
if err != nil {
t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
} else if bytes.Compare(buf[:size], testBlock) != 0 {
import (
"bytes"
+ "context"
"fmt"
"io/ioutil"
"os"
// Check that GetBlock returns success.
buf := make([]byte, BlockSize)
- size, err := GetBlock(TestHash, buf, nil)
+ size, err := GetBlock(context.TODO(), TestHash, buf, nil)
if err != nil {
t.Errorf("GetBlock error: %s", err)
}
// Check that GetBlock returns failure.
buf := make([]byte, BlockSize)
- size, err := GetBlock(TestHash, buf, nil)
+ size, err := GetBlock(context.TODO(), TestHash, buf, nil)
if err != NotFoundError {
t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err)
}
// Check that GetBlock returns failure.
buf := make([]byte, BlockSize)
- size, err := GetBlock(TestHash, buf, nil)
+ size, err := GetBlock(context.TODO(), TestHash, buf, nil)
if err != DiskHashError {
t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size])
}
vols := KeepVM.AllReadable()
buf := make([]byte, BlockSize)
- n, err := vols[1].Get(TestHash, buf)
+ n, err := vols[1].Get(context.TODO(), TestHash, buf)
if err != nil {
t.Fatalf("Volume #0 Get returned error: %v", err)
}
}
buf := make([]byte, BlockSize)
- size, err := GetBlock(TestHash, buf, nil)
+ size, err := GetBlock(context.TODO(), TestHash, buf, nil)
if err != nil {
t.Fatalf("GetBlock: %v", err)
}
}
// Confirm that GetBlock fails to return anything.
- if result, err := GetBlock(TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
+ if result, err := GetBlock(context.TODO(), TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
string(result), err)
}
// The block on disk should now match TestBlock.
buf := make([]byte, BlockSize)
- if size, err := GetBlock(TestHash, buf, nil); err != nil {
+ if size, err := GetBlock(context.TODO(), TestHash, buf, nil); err != nil {
t.Errorf("GetBlock: %v", err)
} else if bytes.Compare(buf[:size], TestBlock) != 0 {
t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock)
oldMtime, newMtime)
}
buf := make([]byte, BlockSize)
- n, err := vols[1].Get(TestHash, buf)
+ n, err := vols[1].Get(context.TODO(), TestHash, buf)
if err != nil {
t.Fatalf("vols[1]: %v", err)
}
package main
import (
+ "context"
"encoding/base64"
"encoding/hex"
"flag"
// Get a block: copy the block data into buf, and return the number of
// bytes copied.
+//
+// If ctx is done before the data has been read, Get returns 0 and
+// ctx.Err(). It never returns while a read into buf is still in
+// progress, and it does not leave an abandoned reader open.
-func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
- rdr, err := v.getReader(loc)
- if err != nil {
- return 0, err
+func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+	// Phase 1: obtain a reader in a separate goroutine so that a
+	// cancelled context can be noticed while getReader is blocked.
+	ready := make(chan bool)
+	var rdr io.ReadCloser
+	var err error
+	go func() {
+		rdr, err = v.getReader(loc)
+		close(ready)
+	}()
+	select {
+	case <-ctx.Done():
+		// Client hung up before we could even send our S3 request.
+		// The getReader goroutine is still running and may yet
+		// hand back an open reader; reap it once it arrives so
+		// the reader is not leaked. rdr and err are only read
+		// here after ready is closed, i.e. after the goroutine
+		// above has finished writing them.
+		go func() {
+			<-ready
+			if err == nil && rdr != nil {
+				rdr.Close()
+			}
+		}()
+		return 0, ctx.Err()
+	case <-ready:
+		if err != nil {
+			return 0, err
+		}
}
- defer rdr.Close()
- n, err := io.ReadFull(rdr, buf)
- switch err {
- case nil, io.EOF, io.ErrUnexpectedEOF:
- return n, nil
- default:
- return 0, v.translateError(err)
+
+	// Phase 2: read the block in a separate goroutine so that a
+	// cancelled context can be noticed while the read is blocked.
+	var n int
+	ready = make(chan bool)
+	go func() {
+		defer close(ready)
+
+		defer rdr.Close()
+		n, err = io.ReadFull(rdr, buf)
+
+		// A short read (io.EOF / io.ErrUnexpectedEOF) is not an
+		// error here: the caller gets the number of bytes copied.
+		switch err {
+		case nil, io.EOF, io.ErrUnexpectedEOF:
+			err = nil
+		default:
+			err = v.translateError(err)
+		}
+	}()
+	select {
+	case <-ctx.Done():
+		// Close the reader out from under the in-flight ReadFull;
+		// this is intended to make it return promptly.
+		rdr.Close()
+		// Must wait for ReadFull to return, to ensure it
+		// doesn't write to buf after we return.
+		<-ready
+		return 0, ctx.Err()
+	case <-ready:
+		return n, err
}
}
import (
"bytes"
+ "context"
"crypto/md5"
"fmt"
"io/ioutil"
// Check canGet
loc, blk := setupScenario()
buf := make([]byte, len(blk))
- _, err := v.Get(loc, buf)
+ _, err := v.Get(context.TODO(), loc, buf)
c.Check(err == nil, check.Equals, scenario.canGet)
if err != nil {
c.Check(os.IsNotExist(err), check.Equals, true)
loc, blk = setupScenario()
err = v.Trash(loc)
c.Check(err == nil, check.Equals, scenario.canTrash)
- _, err = v.Get(loc, buf)
+ _, err = v.Get(context.TODO(), loc, buf)
c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
if err != nil {
c.Check(os.IsNotExist(err), check.Equals, true)
// should be able to Get after Untrash --
// regardless of timestamps, errors, race
// conditions, etc.
- _, err = v.Get(loc, buf)
+ _, err = v.Get(context.TODO(), loc, buf)
c.Check(err, check.IsNil)
}
import (
"container/list"
+ "context"
"testing"
"time"
)
// Verify Locator1 to be un/deleted as expected
buf := make([]byte, BlockSize)
- size, err := GetBlock(testData.Locator1, buf, nil)
+ size, err := GetBlock(context.TODO(), testData.Locator1, buf, nil)
if testData.ExpectLocator1 {
if size == 0 || err != nil {
t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
// Verify Locator2 to be un/deleted as expected
if testData.Locator1 != testData.Locator2 {
- size, err = GetBlock(testData.Locator2, buf, nil)
+ size, err = GetBlock(context.TODO(), testData.Locator2, buf, nil)
if testData.ExpectLocator2 {
if size == 0 || err != nil {
t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
locatorFoundIn := 0
for _, volume := range KeepVM.AllReadable() {
buf := make([]byte, BlockSize)
- if _, err := volume.Get(testData.Locator1, buf); err == nil {
+ if _, err := volume.Get(context.TODO(), testData.Locator1, buf); err == nil {
locatorFoundIn = locatorFoundIn + 1
}
}
package main
import (
+ "context"
"io"
"sync/atomic"
"time"
// any of the data.
//
// len(buf) will not exceed BlockSize.
- Get(loc string, buf []byte) (int, error)
+ Get(ctx context.Context, loc string, buf []byte) (int, error)
// Compare the given data with the stored data (i.e., what Get
// would return). If equal, return nil. If not, return
import (
"bytes"
+ "context"
"crypto/md5"
"fmt"
"os"
v.PutRaw(TestHash, TestBlock)
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.TODO(), TestHash, buf)
if err != nil {
t.Fatal(err)
}
defer v.Teardown()
buf := make([]byte, BlockSize)
- if _, err := v.Get(TestHash2, buf); err == nil {
+ if _, err := v.Get(context.TODO(), TestHash2, buf); err == nil {
t.Errorf("Expected error while getting non-existing block %v", TestHash2)
}
}
putErr := v.Put(testHash, testDataB)
buf := make([]byte, BlockSize)
- n, getErr := v.Get(testHash, buf)
+ n, getErr := v.Get(context.TODO(), testHash, buf)
if putErr == nil {
// Put must not return a nil error unless it has
// overwritten the existing data.
}
data := make([]byte, BlockSize)
- n, err := v.Get(TestHash, data)
+ n, err := v.Get(context.TODO(), TestHash, data)
if err != nil {
t.Error(err)
} else {
}
}
- n, err = v.Get(TestHash2, data)
+ n, err = v.Get(context.TODO(), TestHash2, data)
if err != nil {
t.Error(err)
} else {
}
}
- n, err = v.Get(TestHash3, data)
+ n, err = v.Get(context.TODO(), TestHash3, data)
if err != nil {
t.Error(err)
} else {
t.Error(err)
}
data := make([]byte, BlockSize)
- n, err := v.Get(TestHash, data)
+ n, err := v.Get(context.TODO(), TestHash, data)
if err != nil {
t.Error(err)
} else if bytes.Compare(data[:n], TestBlock) != 0 {
t.Error(err)
}
data := make([]byte, BlockSize)
- if _, err := v.Get(TestHash, data); err == nil || !os.IsNotExist(err) {
+ if _, err := v.Get(context.TODO(), TestHash, data); err == nil || !os.IsNotExist(err) {
t.Errorf("os.IsNotExist(%v) should have been true", err)
}
buf := make([]byte, BlockSize)
// Get from read-only volume should succeed
- _, err := v.Get(TestHash, buf)
+ _, err := v.Get(context.TODO(), TestHash, buf)
if err != nil {
t.Errorf("got err %v, expected nil", err)
}
if err == nil {
t.Errorf("Expected error when putting block in a read-only volume")
}
- _, err = v.Get(TestHash2, buf)
+ _, err = v.Get(context.TODO(), TestHash2, buf)
if err == nil {
t.Errorf("Expected error when getting block whose put in read-only volume failed")
}
sem := make(chan int)
go func() {
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.TODO(), TestHash, buf)
if err != nil {
t.Errorf("err1: %v", err)
}
go func() {
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash2, buf)
+ n, err := v.Get(context.TODO(), TestHash2, buf)
if err != nil {
t.Errorf("err2: %v", err)
}
go func() {
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash3, buf)
+ n, err := v.Get(context.TODO(), TestHash3, buf)
if err != nil {
t.Errorf("err3: %v", err)
}
// Double check that we actually wrote the blocks we expected to write.
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.TODO(), TestHash, buf)
if err != nil {
t.Errorf("Get #1: %v", err)
}
t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n]))
}
- n, err = v.Get(TestHash2, buf)
+ n, err = v.Get(context.TODO(), TestHash2, buf)
if err != nil {
t.Errorf("Get #2: %v", err)
}
t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n]))
}
- n, err = v.Get(TestHash3, buf)
+ n, err = v.Get(context.TODO(), TestHash3, buf)
if err != nil {
t.Errorf("Get #3: %v", err)
}
t.Fatal(err)
}
buf := make([]byte, BlockSize)
- n, err := v.Get(hash, buf)
+ n, err := v.Get(context.TODO(), hash, buf)
if err != nil {
t.Error(err)
}
v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.TODO(), TestHash, buf)
if err != nil {
t.Fatal(err)
}
t.Fatal(err)
}
} else {
- _, err = v.Get(TestHash, buf)
+ _, err = v.Get(context.TODO(), TestHash, buf)
if err == nil || !os.IsNotExist(err) {
t.Errorf("os.IsNotExist(%v) should have been true", err)
}
}
// Get the block - after trash and untrash sequence
- n, err = v.Get(TestHash, buf)
+ n, err = v.Get(context.TODO(), TestHash, buf)
if err != nil {
t.Fatal(err)
}
checkGet := func() error {
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.TODO(), TestHash, buf)
if err != nil {
return err
}
import (
"bytes"
+ "context"
"crypto/md5"
"errors"
"fmt"
}
}
-func (v *MockVolume) Get(loc string, buf []byte) (int, error) {
+func (v *MockVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
v.gotCall("Get")
<-v.Gate
if v.Bad {
import (
"bufio"
+ "context"
"flag"
"fmt"
"io"
// Get retrieves a block, copies it to the given slice, and returns
// the number of bytes copied.
-func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
+func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
path := v.blockPath(loc)
stat, err := v.stat(path)
if err != nil {
import (
"bytes"
+ "context"
"errors"
"fmt"
"io"
v.Put(TestHash, TestBlock)
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash2, buf)
+ n, err := v.Get(context.TODO(), TestHash2, buf)
switch {
case os.IsNotExist(err):
break
v.PutRaw(TestHash, TestBlock)
buf := make([]byte, BlockSize)
- _, err := v.Get(TestHash, buf)
+ _, err := v.Get(context.TODO(), TestHash, buf)
if err != nil {
t.Errorf("got err %v, expected nil", err)
}