}
// Put stores a Keep block as a block blob in the container.
-func (v *AzureBlobVolume) Put(loc string, block []byte) error {
+func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
if v.ReadOnly {
return MethodDisabledError
}
data[i] = byte((i + 7) & 0xff)
}
hash := fmt.Sprintf("%x", md5.Sum(data))
- err := v.Put(hash, data)
+ err := v.Put(context.TODO(), hash, data)
if err != nil {
t.Error(err)
}
allDone := make(chan struct{})
v.azHandler.race = make(chan chan struct{})
go func() {
- err := v.Put(TestHash, TestBlock)
+ err := v.Put(context.TODO(), TestHash, TestBlock)
if err != nil {
t.Error(err)
}
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- if err := vols[0].Put(TestHash, TestBlock); err != nil {
+ if err := vols[0].Put(context.TODO(), TestHash, TestBlock); err != nil {
t.Error(err)
}
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- vols[0].Put(TestHash, TestBlock)
- vols[1].Put(TestHash2, TestBlock2)
- vols[0].Put(TestHash+".meta", []byte("metadata"))
- vols[1].Put(TestHash2+".meta", []byte("metadata"))
+ vols[0].Put(context.TODO(), TestHash, TestBlock)
+ vols[1].Put(context.TODO(), TestHash2, TestBlock2)
+ vols[0].Put(context.TODO(), TestHash+".meta", []byte("metadata"))
+ vols[1].Put(context.TODO(), TestHash2+".meta", []byte("metadata"))
theConfig.systemAuthToken = "DATA MANAGER TOKEN"
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- vols[0].Put(TestHash, TestBlock)
+ vols[0].Put(context.TODO(), TestHash, TestBlock)
// Explicitly set the BlobSignatureTTL to 0 for these
// tests, to ensure the MockVolume deletes the blocks
// A DELETE request on a block newer than BlobSignatureTTL
// should return success but leave the block on the volume.
- vols[0].Put(TestHash, TestBlock)
+ vols[0].Put(context.TODO(), TestHash, TestBlock)
theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
response = IssueRequest(superuserExistingBlockReq)
KeepVM = MakeTestVolumeManager(2)
defer KeepVM.Close()
- if err := KeepVM.AllWritable()[0].Put(TestHash, TestBlock); err != nil {
+ if err := KeepVM.AllWritable()[0].Put(context.TODO(), TestHash, TestBlock); err != nil {
t.Error(err)
}
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- if err := vols[0].Put(TestHash, TestBlock); err != nil {
+ if err := vols[0].Put(context.TODO(), TestHash, TestBlock); err != nil {
t.Error(err)
}
KeepVM = MakeTestVolumeManager(2)
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- vols[0].Put(TestHash, TestBlock)
+ vols[0].Put(context.TODO(), TestHash, TestBlock)
theConfig.systemAuthToken = "DATA MANAGER TOKEN"
// PutBlockHandler is a HandleFunc to address Put block requests.
func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
+ ctx := contextForResponse(context.TODO(), resp)
+
hash := mux.Vars(req)["hash"]
// Detect as many error conditions as possible before reading
return
}
- replication, err := PutBlock(buf, hash)
+ replication, err := PutBlock(ctx, buf, hash)
bufs.Put(buf)
if err != nil {
// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
//
-// PutBlock(block, hash)
+// PutBlock(ctx, block, hash)
// Stores the BLOCK (identified by the content id HASH) in Keep.
//
// The MD5 checksum of the block must be identical to the content id HASH.
// all writes failed). The text of the error message should
// provide as much detail as possible.
//
-func PutBlock(block []byte, hash string) (int, error) {
+func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
// Check that BLOCK's checksum matches HASH.
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
if vol := KeepVM.NextWritable(); vol != nil {
- if err := vol.Put(hash, block); err == nil {
+ // Pass the caller's ctx (not context.TODO()) so this first
+ // attempt can be cancelled just like the attempts made on the
+ // remaining writable volumes.
+ if err := vol.Put(ctx, hash, block); err == nil {
return vol.Replication(), nil // success!
}
+ // Do not go on to try more volumes once the caller has given up.
+ select {
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ default:
+ }
}
allFull := true
for _, vol := range writables {
- err := vol.Put(hash, block)
+ err := vol.Put(ctx, hash, block)
+ select {
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ default:
+ }
if err == nil {
return vol.Replication(), nil // success!
}
setupHandlersWithGenericVolumeTest(t, factory)
// PutBlock
- if _, err := PutBlock(testBlock, testHash); err != nil {
+ if _, err := PutBlock(context.TODO(), testBlock, testHash); err != nil {
t.Fatalf("Error during PutBlock: %s", err)
}
// Check that PutBlock succeeds again even after CompareAndTouch
- if _, err := PutBlock(testBlock, testHash); err != nil {
+ if _, err := PutBlock(context.TODO(), testBlock, testHash); err != nil {
t.Fatalf("Error during PutBlock: %s", err)
}
testableVolumes[1].PutRaw(testHash, badData)
// Check that PutBlock with good data succeeds
- if _, err := PutBlock(testBlock, testHash); err != nil {
+ if _, err := PutBlock(context.TODO(), testBlock, testHash); err != nil {
t.Fatalf("Error during PutBlock for %q: %s", testHash, err)
}
defer KeepVM.Close()
vols := KeepVM.AllReadable()
- if err := vols[1].Put(TestHash, TestBlock); err != nil {
+ if err := vols[1].Put(context.TODO(), TestHash, TestBlock); err != nil {
t.Error(err)
}
defer KeepVM.Close()
vols := KeepVM.AllReadable()
- vols[0].Put(TestHash, BadBlock)
+ vols[0].Put(context.TODO(), TestHash, BadBlock)
// Check that GetBlock returns failure.
buf := make([]byte, BlockSize)
defer KeepVM.Close()
// Check that PutBlock stores the data as expected.
- if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ if n, err := PutBlock(context.TODO(), TestBlock, TestHash); err != nil || n < 1 {
t.Fatalf("PutBlock: n %d err %v", n, err)
}
vols[0].(*MockVolume).Bad = true
// Check that PutBlock stores the data as expected.
- if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ if n, err := PutBlock(context.TODO(), TestBlock, TestHash); err != nil || n < 1 {
t.Fatalf("PutBlock: n %d err %v", n, err)
}
// Check that PutBlock returns the expected error when the hash does
// not match the block.
- if _, err := PutBlock(BadBlock, TestHash); err != RequestHashError {
+ if _, err := PutBlock(context.TODO(), BadBlock, TestHash); err != RequestHashError {
t.Errorf("Expected RequestHashError, got %v", err)
}
// Store a corrupted block under TestHash.
vols := KeepVM.AllWritable()
- vols[0].Put(TestHash, BadBlock)
- if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ vols[0].Put(context.TODO(), TestHash, BadBlock)
+ if n, err := PutBlock(context.TODO(), TestBlock, TestHash); err != nil || n < 1 {
t.Errorf("PutBlock: n %d err %v", n, err)
}
// Store one block, then attempt to store the other. Confirm that
// PutBlock reported a CollisionError.
- if _, err := PutBlock(b1, locator); err != nil {
+ if _, err := PutBlock(context.TODO(), b1, locator); err != nil {
t.Error(err)
}
- if _, err := PutBlock(b2, locator); err == nil {
+ if _, err := PutBlock(context.TODO(), b2, locator); err == nil {
t.Error("PutBlock did not report a collision")
} else if err != CollisionError {
t.Errorf("PutBlock returned %v", err)
// Store a block and then make the underlying volume bad,
// so a subsequent attempt to update the file timestamp
// will fail.
- vols[0].Put(TestHash, BadBlock)
+ vols[0].Put(context.TODO(), TestHash, BadBlock)
oldMtime, err := vols[0].Mtime(TestHash)
if err != nil {
t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
// vols[0].Touch will fail on the next call, so the volume
// manager will store a copy on vols[1] instead.
vols[0].(*MockVolume).Touchable = false
- if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ if n, err := PutBlock(context.TODO(), TestBlock, TestHash); err != nil || n < 1 {
t.Fatalf("PutBlock: n %d err %v", n, err)
}
vols[0].(*MockVolume).Touchable = true
defer KeepVM.Close()
vols := KeepVM.AllReadable()
- vols[0].Put(TestHash, TestBlock)
- vols[1].Put(TestHash2, TestBlock2)
- vols[0].Put(TestHash3, TestBlock3)
- vols[0].Put(TestHash+".meta", []byte("metadata"))
- vols[1].Put(TestHash2+".meta", []byte("metadata"))
+ vols[0].Put(context.TODO(), TestHash, TestBlock)
+ vols[1].Put(context.TODO(), TestHash2, TestBlock2)
+ vols[0].Put(context.TODO(), TestHash3, TestBlock3)
+ vols[0].Put(context.TODO(), TestHash+".meta", []byte("metadata"))
+ vols[1].Put(context.TODO(), TestHash2+".meta", []byte("metadata"))
buf := new(bytes.Buffer)
vols[0].IndexTo("", buf)
package main
import (
+ "context"
"crypto/rand"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
// Put block
var PutContent = func(content []byte, locator string) (err error) {
- _, err = PutBlock(content, locator)
+ _, err = PutBlock(context.TODO(), content, locator)
return
}
package main
import (
+ "bytes"
"context"
"encoding/base64"
"encoding/hex"
"flag"
"fmt"
"io"
+ "io/ioutil"
"log"
"net/http"
"os"
}
-// Put writes a block.
-func (v *S3Volume) Put(loc string, block []byte) error {
+// Put writes a block to the bucket under the key loc, then writes
+// a "recent/"+loc object with no data.
+//
+// If ctx is done before the upload goroutine finishes, Put returns
+// ctx.Err() and leaves that goroutine to finish on its own.
+// Otherwise it returns the upload's error after translateError.
+// On a read-only volume it returns MethodDisabledError.
+func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
if v.ReadOnly {
return MethodDisabledError
}
var opts s3.Options
- if len(block) > 0 {
+ size := len(block)
+ if size > 0 {
md5, err := hex.DecodeString(loc)
if err != nil {
return err
}
opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
}
- err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
- if err != nil {
- return v.translateError(err)
+
+ // Send the block data through a pipe, so that (if we need to)
+ // we can close the pipe early and abandon our PutReader()
+ // goroutine, without worrying about PutReader() accessing our
+ // block buffer after we release it.
+ bufr, bufw := io.Pipe()
+ go func() {
+ // A copy error only means the pipe was closed early; the
+ // outcome is reported by the PutReader goroutine below.
+ io.Copy(bufw, bytes.NewReader(block))
+ bufw.Close()
+ }()
+
+ var err error
+ ready := make(chan bool)
+ go func() {
+ defer func() {
+ select {
+ case <-ctx.Done():
+ // %v rather than %s: err may be nil here.
+ theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %v", v, err)
+ default:
+ }
+ }()
+ defer close(ready)
+ err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
+ if err != nil {
+ err = v.translateError(err)
+ return
+ }
+ err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.translateError(err)
+ }()
+ select {
+ case <-ctx.Done():
+ theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
+ // Our pipe might be stuck in Write(), waiting for
+ // PutReader() to read. If so, un-stick it. This means
+ // PutReader will get corrupt data, but that's OK: the
+ // size and MD5 won't match, so the write will fail.
+ go io.Copy(ioutil.Discard, bufr)
+ // CloseWithError() will return once pending I/O is done.
+ bufw.CloseWithError(ctx.Err())
+ theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
+ return ctx.Err()
+ case <-ready:
+ // PutReader may have returned (e.g., on an early error)
+ // without reading everything from the pipe. Drain the rest
+ // so the copying goroutine above can finish instead of
+ // blocking in Write() forever while holding on to block.
+ io.Copy(ioutil.Discard, bufr)
+ return err
}
- err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
- return v.translateError(err)
}
// Touch sets the timestamp for the given locator to the current time.
// Check for current Mtime after Put (applies to all
// scenarios)
loc, blk = setupScenario()
- err = v.Put(loc, blk)
+ err = v.Put(context.TODO(), loc, blk)
c.Check(err, check.IsNil)
t, err := v.Mtime(loc)
c.Check(err, check.IsNil)
// Put test content
vols := KeepVM.AllWritable()
if testData.CreateData {
- vols[0].Put(testData.Locator1, testData.Block1)
- vols[0].Put(testData.Locator1+".meta", []byte("metadata"))
+ vols[0].Put(context.TODO(), testData.Locator1, testData.Block1)
+ vols[0].Put(context.TODO(), testData.Locator1+".meta", []byte("metadata"))
if testData.CreateInVolume1 {
- vols[0].Put(testData.Locator2, testData.Block2)
- vols[0].Put(testData.Locator2+".meta", []byte("metadata"))
+ vols[0].Put(context.TODO(), testData.Locator2, testData.Block2)
+ vols[0].Put(context.TODO(), testData.Locator2+".meta", []byte("metadata"))
} else {
- vols[1].Put(testData.Locator2, testData.Block2)
- vols[1].Put(testData.Locator2+".meta", []byte("metadata"))
+ vols[1].Put(context.TODO(), testData.Locator2, testData.Block2)
+ vols[1].Put(context.TODO(), testData.Locator2+".meta", []byte("metadata"))
}
}
//
// Put should not verify that loc==hash(block): this is the
// caller's responsibility.
- Put(loc string, block []byte) error
+ Put(ctx context.Context, loc string, block []byte) error
// Touch sets the timestamp for the given locator to the
// current time.
return
}
- err := v.Put(testHash, testData)
+ err := v.Put(context.TODO(), testHash, testData)
if err != nil {
t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
}
- err = v.Put(testHash, testData)
+ err = v.Put(context.TODO(), testHash, testData)
if err != nil {
t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
}
v.PutRaw(testHash, testDataA)
- putErr := v.Put(testHash, testDataB)
+ putErr := v.Put(context.TODO(), testHash, testDataB)
buf := make([]byte, BlockSize)
n, getErr := v.Get(context.TODO(), testHash, buf)
if putErr == nil {
return
}
- err := v.Put(TestHash, TestBlock)
+ err := v.Put(context.TODO(), TestHash, TestBlock)
if err != nil {
t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
}
- err = v.Put(TestHash2, TestBlock2)
+ err = v.Put(context.TODO(), TestHash2, TestBlock2)
if err != nil {
t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
}
- err = v.Put(TestHash3, TestBlock3)
+ err = v.Put(context.TODO(), TestHash3, TestBlock3)
if err != nil {
t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
}
return
}
- if err := v.Put(TestHash, TestBlock); err != nil {
+ if err := v.Put(context.TODO(), TestHash, TestBlock); err != nil {
t.Error(err)
}
}
// Write the same block again.
- if err := v.Put(TestHash, TestBlock); err != nil {
+ if err := v.Put(context.TODO(), TestHash, TestBlock); err != nil {
t.Error(err)
}
return
}
- v.Put(TestHash, TestBlock)
+ v.Put(context.TODO(), TestHash, TestBlock)
if err := v.Trash(TestHash); err != nil {
t.Error(err)
return
}
- v.Put(TestHash, TestBlock)
+ v.Put(context.TODO(), TestHash, TestBlock)
v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
if err := v.Trash(TestHash); err != nil {
}
// Put a new block to read-only volume should result in error
- err = v.Put(TestHash2, TestBlock2)
+ err = v.Put(context.TODO(), TestHash2, TestBlock2)
if err == nil {
t.Errorf("Expected error when putting block in a read-only volume")
}
}
// Overwriting an existing block in read-only volume should result in error
- err = v.Put(TestHash, TestBlock)
+ err = v.Put(context.TODO(), TestHash, TestBlock)
if err == nil {
t.Errorf("Expected error when putting block in a read-only volume")
}
sem := make(chan int)
go func(sem chan int) {
- err := v.Put(TestHash, TestBlock)
+ err := v.Put(context.TODO(), TestHash, TestBlock)
if err != nil {
t.Errorf("err1: %v", err)
}
}(sem)
go func(sem chan int) {
- err := v.Put(TestHash2, TestBlock2)
+ err := v.Put(context.TODO(), TestHash2, TestBlock2)
if err != nil {
t.Errorf("err2: %v", err)
}
}(sem)
go func(sem chan int) {
- err := v.Put(TestHash3, TestBlock3)
+ err := v.Put(context.TODO(), TestHash3, TestBlock3)
if err != nil {
t.Errorf("err3: %v", err)
}
wdata[0] = 'a'
wdata[BlockSize-1] = 'z'
hash := fmt.Sprintf("%x", md5.Sum(wdata))
- err := v.Put(hash, wdata)
+ err := v.Put(context.TODO(), hash, wdata)
if err != nil {
t.Fatal(err)
}
return 0, os.ErrNotExist
}
-func (v *MockVolume) Put(loc string, block []byte) error {
+func (v *MockVolume) Put(ctx context.Context, loc string, block []byte) error {
v.gotCall("Put")
<-v.Gate
if v.Bad {
// "loc". It returns nil on success. If the volume is full, it
// returns a FullError. If the write fails due to some other error,
// that error is returned.
-func (v *UnixVolume) Put(loc string, block []byte) error {
+func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
if v.ReadOnly {
return MethodDisabledError
}
v.locker.Lock()
defer v.locker.Unlock()
}
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
if _, err := tmpfile.Write(block); err != nil {
log.Printf("%s: writing to %s: %s\n", v, bpath, err)
tmpfile.Close()
v.ReadOnly = orig
}(v.ReadOnly)
v.ReadOnly = false
- err := v.Put(locator, data)
+ err := v.Put(context.TODO(), locator, data)
if err != nil {
v.t.Fatal(err)
}
func TestGetNotFound(t *testing.T) {
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- v.Put(TestHash, TestBlock)
+ v.Put(context.TODO(), TestHash, TestBlock)
buf := make([]byte, BlockSize)
n, err := v.Get(context.TODO(), TestHash2, buf)
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- err := v.Put(TestHash, TestBlock)
+ err := v.Put(context.TODO(), TestHash, TestBlock)
if err != nil {
t.Error(err)
}
defer v.Teardown()
os.Chmod(v.Root, 000)
- err := v.Put(TestHash, TestBlock)
+ err := v.Put(context.TODO(), TestHash, TestBlock)
if err == nil {
t.Error("Write should have failed")
}
t.Errorf("got err %v, expected nil", err)
}
- err = v.Put(TestHash, TestBlock)
+ err = v.Put(context.TODO(), TestHash, TestBlock)
if err != MethodDisabledError {
t.Errorf("got err %v, expected MethodDisabledError", err)
}
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- v.Put(TestHash, TestBlock)
+ v.Put(context.TODO(), TestHash, TestBlock)
mockErr := errors.New("Mock error")
err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
return mockErr
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- v.Put(TestHash, TestBlock)
+ v.Put(context.TODO(), TestHash, TestBlock)
mtx := NewMockMutex()
v.locker = mtx
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- v.Put(TestHash, TestBlock)
+ v.Put(context.TODO(), TestHash, TestBlock)
err := v.Compare(TestHash, TestBlock)
if err != nil {
t.Errorf("Got err %q, expected nil", err)
t.Errorf("Got err %q, expected %q", err, CollisionError)
}
- v.Put(TestHash, []byte("baddata"))
+ v.Put(context.TODO(), TestHash, []byte("baddata"))
err = v.Compare(TestHash, TestBlock)
if err != DiskHashError {
t.Errorf("Got err %q, expected %q", err, DiskHashError)