7393: Add S3 volume type.
authorTom Clegg <tom@curoverse.com>
Wed, 9 Dec 2015 17:43:50 +0000 (12:43 -0500)
committerTom Clegg <tom@curoverse.com>
Thu, 10 Dec 2015 21:11:56 +0000 (16:11 -0500)
13 files changed:
sdk/go/arvadostest/fixtures.go
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/bufferpool_test.go
services/keepstore/collision_test.go
services/keepstore/gocheck_test.go [new file with mode: 0644]
services/keepstore/handlers_with_generic_volume_test.go
services/keepstore/keepstore_test.go
services/keepstore/pull_worker_test.go
services/keepstore/s3_volume.go [new file with mode: 0644]
services/keepstore/s3_volume_test.go [new file with mode: 0644]
services/keepstore/volume_generic_test.go
services/keepstore/volume_unix_test.go

index 3256ec27a2572c0d9889ab1067dc43845075c540..47b75b384577e50a244355b8ef3dd35ba92c20ab 100644 (file)
@@ -24,3 +24,12 @@ const PathologicalManifest = ". acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a751
        `./foo\040b\141r acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:b\141z\040w\141z` + "\n" +
        "./foo acbd18db4cc2f85cedef654fccc4a4d8+3 0:0:zero 0:3:foo\n" +
        ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:0:foo/zero 0:3:foo/foo\n"
+
+// An MD5 collision.
+var (
+       MD5CollisionData = [][]byte{
+               []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9epO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\\\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef"),
+               []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9etO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\xdc\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef"),
+       }
+       MD5CollisionMD5 = "cee9a457e790cf20d4bdaa6d69f01e41"
+)
index e9fda2ab76dd41c6cdde36cb139e3f3f030c273b..0f98e6e3e238e2d4e4b97c140506760509d37c34 100644 (file)
@@ -189,7 +189,7 @@ func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
        return compareReaderWithBuf(rdr, expect, loc[:32])
 }
 
-// Put sotres a Keep block as a block blob in the container.
+// Put stores a Keep block as a block blob in the container.
 func (v *AzureBlobVolume) Put(loc string, block []byte) error {
        if v.readonly {
                return MethodDisabledError
index a240c23e1622b525f62a6957a23c025651f94190..b8bf5cb4a1829b01a13d0cbf90164d1a1a5a6c56 100644 (file)
@@ -292,10 +292,10 @@ type TestableAzureBlobVolume struct {
        *AzureBlobVolume
        azHandler *azStubHandler
        azStub    *httptest.Server
-       t         *testing.T
+       t         TB
 }
 
-func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) *TestableAzureBlobVolume {
+func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableAzureBlobVolume {
        azHandler := newAzStubHandler()
        azStub := httptest.NewServer(azHandler)
 
@@ -341,7 +341,7 @@ func TestAzureBlobVolumeWithGeneric(t *testing.T) {
        }
        azureWriteRaceInterval = time.Millisecond
        azureWriteRacePollTime = time.Nanosecond
-       DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+       DoGenericVolumeTests(t, func(t TB) TestableVolume {
                return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
        })
 }
@@ -355,7 +355,7 @@ func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
        }
        azureWriteRaceInterval = time.Millisecond
        azureWriteRacePollTime = time.Nanosecond
-       DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+       DoGenericVolumeTests(t, func(t TB) TestableVolume {
                return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
        })
 }
index 8726a19150c7faf2f309cc6b3ba647d9aa40dd54..7b51b643cea54ba0b5643a2bf84c4ccfed9bcd5f 100644 (file)
@@ -2,15 +2,9 @@ package main
 
 import (
        . "gopkg.in/check.v1"
-       "testing"
        "time"
 )
 
-// Gocheck boilerplate
-func TestBufferPool(t *testing.T) {
-       TestingT(t)
-}
-
 var _ = Suite(&BufferPoolSuite{})
 
 type BufferPoolSuite struct{}
index 379daddd9803b945b68b6cbf290660c282a2bb2c..d9b7e614fc8aded50990ce41591dcb3a7685be09 100644 (file)
@@ -2,17 +2,11 @@ package main
 
 import (
        "bytes"
-       "testing"
        "testing/iotest"
 
        check "gopkg.in/check.v1"
 )
 
-// Gocheck boilerplate
-func Test(t *testing.T) {
-       check.TestingT(t)
-}
-
 var _ = check.Suite(&CollisionSuite{})
 
 type CollisionSuite struct{}
diff --git a/services/keepstore/gocheck_test.go b/services/keepstore/gocheck_test.go
new file mode 100644 (file)
index 0000000..133ed6e
--- /dev/null
@@ -0,0 +1,10 @@
+package main
+
+import (
+       "gopkg.in/check.v1"
+       "testing"
+)
+
+func TestGocheck(t *testing.T) {
+       check.TestingT(t)
+}
index 9f31f5f6fe5f31d38745046f7a771982f6b2ad00..c5349d399c32ebc5692d0a39d4cc8c9c3ad83e1a 100644 (file)
@@ -2,19 +2,18 @@ package main
 
 import (
        "bytes"
-       "testing"
 )
 
 // A TestableVolumeManagerFactory creates a volume manager with at least two TestableVolume instances.
 // The factory function, and the TestableVolume instances it returns, can use "t" to write
 // logs, fail the current test, etc.
-type TestableVolumeManagerFactory func(t *testing.T) (*RRVolumeManager, []TestableVolume)
+type TestableVolumeManagerFactory func(t TB) (*RRVolumeManager, []TestableVolume)
 
 // DoHandlersWithGenericVolumeTests runs a set of handler tests with a
 // Volume Manager comprised of TestableVolume instances.
 // It calls factory to create a volume manager with TestableVolume
 // instances for each test case, to avoid leaking state between tests.
-func DoHandlersWithGenericVolumeTests(t *testing.T, factory TestableVolumeManagerFactory) {
+func DoHandlersWithGenericVolumeTests(t TB, factory TestableVolumeManagerFactory) {
        testGetBlock(t, factory, TestHash, TestBlock)
        testGetBlock(t, factory, EmptyHash, EmptyBlock)
        testPutRawBadDataGetBlock(t, factory, TestHash, TestBlock, []byte("baddata"))
@@ -26,7 +25,7 @@ func DoHandlersWithGenericVolumeTests(t *testing.T, factory TestableVolumeManage
 }
 
 // Setup RRVolumeManager with TestableVolumes
-func setupHandlersWithGenericVolumeTest(t *testing.T, factory TestableVolumeManagerFactory) []TestableVolume {
+func setupHandlersWithGenericVolumeTest(t TB, factory TestableVolumeManagerFactory) []TestableVolume {
        vm, testableVolumes := factory(t)
        KeepVM = vm
 
@@ -39,7 +38,7 @@ func setupHandlersWithGenericVolumeTest(t *testing.T, factory TestableVolumeMana
 }
 
 // Put a block using PutRaw in just one volume and Get it using GetBlock
-func testGetBlock(t *testing.T, factory TestableVolumeManagerFactory, testHash string, testBlock []byte) {
+func testGetBlock(t TB, factory TestableVolumeManagerFactory, testHash string, testBlock []byte) {
        testableVolumes := setupHandlersWithGenericVolumeTest(t, factory)
 
        // Put testBlock in one volume
@@ -56,7 +55,7 @@ func testGetBlock(t *testing.T, factory TestableVolumeManagerFactory, testHash s
 }
 
 // Put a bad block using PutRaw and get it.
-func testPutRawBadDataGetBlock(t *testing.T, factory TestableVolumeManagerFactory,
+func testPutRawBadDataGetBlock(t TB, factory TestableVolumeManagerFactory,
        testHash string, testBlock []byte, badData []byte) {
        testableVolumes := setupHandlersWithGenericVolumeTest(t, factory)
 
@@ -72,7 +71,7 @@ func testPutRawBadDataGetBlock(t *testing.T, factory TestableVolumeManagerFactor
 }
 
 // Invoke PutBlock twice to ensure CompareAndTouch path is tested.
-func testPutBlock(t *testing.T, factory TestableVolumeManagerFactory, testHash string, testBlock []byte) {
+func testPutBlock(t TB, factory TestableVolumeManagerFactory, testHash string, testBlock []byte) {
        setupHandlersWithGenericVolumeTest(t, factory)
 
        // PutBlock
@@ -95,7 +94,7 @@ func testPutBlock(t *testing.T, factory TestableVolumeManagerFactory, testHash s
 }
 
 // Put a bad block using PutRaw, overwrite it using PutBlock and get it.
-func testPutBlockCorrupt(t *testing.T, factory TestableVolumeManagerFactory,
+func testPutBlockCorrupt(t TB, factory TestableVolumeManagerFactory,
        testHash string, testBlock []byte, badData []byte) {
        testableVolumes := setupHandlersWithGenericVolumeTest(t, factory)
 
index 8a004b73d44913b562574170b21b19dc780b8ff2..2a1c3d243ab922855b2bf6344f69631a78272662 100644 (file)
@@ -10,6 +10,8 @@ import (
        "sort"
        "strings"
        "testing"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
 )
 
 var TestBlock = []byte("The quick brown fox jumps over the lazy dog.")
@@ -229,9 +231,9 @@ func TestPutBlockCollision(t *testing.T) {
        defer teardown()
 
        // These blocks both hash to the MD5 digest cee9a457e790cf20d4bdaa6d69f01e41.
-       var b1 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9epO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\\\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
-       var b2 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9etO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\xdc\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
-       var locator = "cee9a457e790cf20d4bdaa6d69f01e41"
+       b1 := arvadostest.MD5CollisionData[0]
+       b2 := arvadostest.MD5CollisionData[1]
+       locator := arvadostest.MD5CollisionMD5
 
        // Prepare two test Keep volumes.
        KeepVM = MakeTestVolumeManager(2)
index c6a41953842b6c2ab9403c84c04d17cee839fabd..5076b85e20703e8305b74ab4ce5a048cd17a81b0 100644 (file)
@@ -8,20 +8,13 @@ import (
        . "gopkg.in/check.v1"
        "io"
        "net/http"
-       "testing"
        "time"
 )
 
-type PullWorkerTestSuite struct{}
-
-// Gocheck boilerplate
-func TestPullWorker(t *testing.T) {
-       TestingT(t)
-}
-
-// Gocheck boilerplate
 var _ = Suite(&PullWorkerTestSuite{})
 
+type PullWorkerTestSuite struct{}
+
 var testPullLists map[string]string
 var readContent string
 var readError error
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
new file mode 100644 (file)
index 0000000..572ee46
--- /dev/null
@@ -0,0 +1,312 @@
+package main
+
+import (
+       "encoding/base64"
+       "encoding/hex"
+       "flag"
+       "fmt"
+       "io"
+       "log"
+       "net/http"
+       "os"
+       "regexp"
+       "time"
+
+       "github.com/AdRoll/goamz/aws"
+       "github.com/AdRoll/goamz/s3"
+)
+
+var (
+       ErrS3DeleteNotAvailable = fmt.Errorf("delete without -s3-unsafe-delete is not implemented")
+
+       s3AccessKeyFile string
+       s3SecretKeyFile string
+       s3RegionName    string
+       s3Endpoint      string
+       s3Replication   int
+       s3UnsafeDelete  bool
+
+       s3ACL = s3.Private
+)
+
+const (
+       maxClockSkew  = 600 * time.Second
+       nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
+)
+
+type s3VolumeAdder struct {
+       *volumeSet
+}
+
+func (s *s3VolumeAdder) Set(bucketName string) error {
+       if bucketName == "" {
+               return fmt.Errorf("no container name given")
+       }
+       if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
+               return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
+       }
+       region, ok := aws.Regions[s3RegionName]
+       if s3Endpoint == "" {
+               if !ok {
+                       return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
+               }
+       } else {
+               if ok {
+                       return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
+                               "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
+               }
+               region = aws.Region{
+                       Name:       s3RegionName,
+                       S3Endpoint: s3Endpoint,
+               }
+       }
+       var err error
+       var auth aws.Auth
+       auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
+       if err != nil {
+               return err
+       }
+       auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
+       if err != nil {
+               return err
+       }
+       if flagSerializeIO {
+               log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
+       }
+       v := NewS3Volume(auth, region, bucketName, flagReadonly, s3Replication)
+       if err := v.Check(); err != nil {
+               return err
+       }
+       *s.volumeSet = append(*s.volumeSet, v)
+       return nil
+}
+
+func s3regions() (okList []string) {
+       for r, _ := range aws.Regions {
+               okList = append(okList, r)
+       }
+       return
+}
+
+func init() {
+       flag.Var(&s3VolumeAdder{&volumes},
+               "s3-bucket-volume",
+               "Use the given bucket as a storage volume. Can be given multiple times.")
+       flag.StringVar(
+               &s3RegionName,
+               "s3-region",
+               "",
+               fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
+       flag.StringVar(
+               &s3Endpoint,
+               "s3-endpoint",
+               "",
+               "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
+       flag.StringVar(
+               &s3AccessKeyFile,
+               "s3-access-key-file",
+               "",
+               "File containing the access key used for subsequent -s3-bucket-volume arguments.")
+       flag.StringVar(
+               &s3SecretKeyFile,
+               "s3-secret-key-file",
+               "",
+               "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
+       flag.IntVar(
+               &s3Replication,
+               "s3-replication",
+               2,
+               "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
+       flag.BoolVar(
+               &s3UnsafeDelete,
+               "s3-unsafe-delete",
+               false,
+               "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
+}
+
+type S3Volume struct {
+       *s3.Bucket
+       readonly      bool
+       replication   int
+       indexPageSize int
+}
+
+// NewS3Volume returns a new S3Volume using the given auth, region,
+// and bucket name. The replication argument specifies the replication
+// level to report when writing data.
+func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, readonly bool, replication int) *S3Volume {
+       return &S3Volume{
+               Bucket: &s3.Bucket{
+                       S3:   s3.New(auth, region),
+                       Name: bucket,
+               },
+               readonly:      readonly,
+               replication:   replication,
+               indexPageSize: 1000,
+       }
+}
+
+func (v *S3Volume) Check() error {
+       return nil
+}
+
+func (v *S3Volume) Get(loc string) ([]byte, error) {
+       rdr, err := v.Bucket.GetReader(loc)
+       if err != nil {
+               return nil, v.translateError(err)
+       }
+       defer rdr.Close()
+       buf := bufs.Get(BlockSize)
+       n, err := io.ReadFull(rdr, buf)
+       switch err {
+       case nil, io.EOF, io.ErrUnexpectedEOF:
+               return buf[:n], nil
+       default:
+               bufs.Put(buf)
+               return nil, v.translateError(err)
+       }
+}
+
+func (v *S3Volume) Compare(loc string, expect []byte) error {
+       rdr, err := v.Bucket.GetReader(loc)
+       if err != nil {
+               return v.translateError(err)
+       }
+       defer rdr.Close()
+       return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
+}
+
+func (v *S3Volume) Put(loc string, block []byte) error {
+       if v.readonly {
+               return MethodDisabledError
+       }
+       var opts s3.Options
+       if len(block) > 0 {
+               md5, err := hex.DecodeString(loc)
+               if err != nil {
+                       return err
+               }
+               opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
+       }
+       return v.translateError(
+               v.Bucket.Put(
+                       loc, block, "application/octet-stream", s3ACL, opts))
+}
+
+func (v *S3Volume) Touch(loc string) error {
+       if v.readonly {
+               return MethodDisabledError
+       }
+       result, err := v.Bucket.PutCopy(loc, s3ACL, s3.CopyOptions{
+               ContentType:       "application/octet-stream",
+               MetadataDirective: "REPLACE",
+       }, v.Bucket.Name+"/"+loc)
+       if err != nil {
+               return v.translateError(err)
+       }
+       t, err := time.Parse(time.RFC3339, result.LastModified)
+       if err != nil {
+               return err
+       }
+       if time.Since(t) > maxClockSkew {
+               return fmt.Errorf("PutCopy returned old LastModified %s => %s (%s ago)", result.LastModified, t, time.Since(t))
+       }
+       return nil
+}
+
+func (v *S3Volume) Mtime(loc string) (time.Time, error) {
+       resp, err := v.Bucket.Head(loc, nil)
+       if err != nil {
+               return zeroTime, v.translateError(err)
+       }
+       hdr := resp.Header.Get("Last-Modified")
+       t, err := time.Parse(time.RFC1123, hdr)
+       if err != nil && hdr != "" {
+               // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
+               // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
+               // as required by HTTP spec. If it's not a valid HTTP
+               // header value, it's probably AWS (or s3test) giving
+               // us a nearly-RFC1123 timestamp.
+               t, err = time.Parse(nearlyRFC1123, hdr)
+       }
+       return t, err
+}
+
+func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
+       nextMarker := ""
+       for {
+               listResp, err := v.Bucket.List(prefix, "", nextMarker, v.indexPageSize)
+               if err != nil {
+                       return err
+               }
+               for _, key := range listResp.Contents {
+                       t, err := time.Parse(time.RFC3339, key.LastModified)
+                       if err != nil {
+                               return err
+                       }
+                       if !v.isKeepBlock(key.Key) {
+                               continue
+                       }
+                       fmt.Fprintf(writer, "%s+%d %d\n", key.Key, key.Size, t.Unix())
+               }
+               if !listResp.IsTruncated {
+                       break
+               }
+               nextMarker = listResp.NextMarker
+       }
+       return nil
+}
+
+func (v *S3Volume) Delete(loc string) error {
+       if v.readonly {
+               return MethodDisabledError
+       }
+       if t, err := v.Mtime(loc); err != nil {
+               return err
+       } else if time.Since(t) < blobSignatureTTL {
+               return nil
+       }
+       if !s3UnsafeDelete {
+               return ErrS3DeleteNotAvailable
+       }
+       return v.Bucket.Del(loc)
+}
+
+func (v *S3Volume) Status() *VolumeStatus {
+       return &VolumeStatus{
+               DeviceNum: 1,
+               BytesFree: BlockSize * 1000,
+               BytesUsed: 1,
+       }
+}
+
+func (v *S3Volume) String() string {
+       return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
+}
+
+func (v *S3Volume) Writable() bool {
+       return !v.readonly
+}
+func (v *S3Volume) Replication() int {
+       return v.replication
+}
+
+var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+
+func (v *S3Volume) isKeepBlock(s string) bool {
+       return s3KeepBlockRegexp.MatchString(s)
+}
+
+func (v *S3Volume) translateError(err error) error {
+       switch err := err.(type) {
+       case *s3.Error:
+               if err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey" {
+                       return os.ErrNotExist
+               }
+               // Other 404 errors like NoSuchVersion and
+               // NoSuchBucket are different problems which should
+               // get called out downstream, so we don't convert them
+               // to os.ErrNotExist.
+       }
+       return err
+}
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
new file mode 100644 (file)
index 0000000..0c2cd49
--- /dev/null
@@ -0,0 +1,133 @@
+package main
+
+import (
+       "bytes"
+       "fmt"
+       "log"
+       "strings"
+       "time"
+
+       "github.com/AdRoll/goamz/aws"
+       "github.com/AdRoll/goamz/s3"
+       "github.com/AdRoll/goamz/s3/s3test"
+       check "gopkg.in/check.v1"
+)
+
+type TestableS3Volume struct {
+       *S3Volume
+       server      *s3test.Server
+       c           *check.C
+       serverClock *fakeClock
+}
+
+const (
+       TestBucketName = "testbucket"
+)
+
+type fakeClock struct {
+       now *time.Time
+}
+
+func (c *fakeClock) Now() time.Time {
+       if c.now == nil {
+               return time.Now()
+       }
+       return *c.now
+}
+
+func init() {
+       // Deleting isn't safe from races, but if it's turned on
+       // anyway we do expect it to pass the generic volume tests.
+       s3UnsafeDelete = true
+}
+
+func NewTestableS3Volume(c *check.C, readonly bool, replication int) *TestableS3Volume {
+       clock := &fakeClock{}
+       srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
+       c.Assert(err, check.IsNil)
+       auth := aws.Auth{}
+       region := aws.Region{
+               Name:                 "test-region-1",
+               S3Endpoint:           srv.URL(),
+               S3LocationConstraint: true,
+       }
+       bucket := &s3.Bucket{
+               S3:   s3.New(auth, region),
+               Name: TestBucketName,
+       }
+       err = bucket.PutBucket(s3.ACL("private"))
+       c.Assert(err, check.IsNil)
+
+       return &TestableS3Volume{
+               S3Volume:    NewS3Volume(auth, region, TestBucketName, readonly, replication),
+               server:      srv,
+               serverClock: clock,
+       }
+}
+
+var _ = check.Suite(&StubbedS3Suite{})
+
+type StubbedS3Suite struct {
+       volumes []*TestableS3Volume
+}
+
+func (s *StubbedS3Suite) TestGeneric(c *check.C) {
+       DoGenericVolumeTests(c, func(t TB) TestableVolume {
+               return NewTestableS3Volume(c, false, 2)
+       })
+}
+
+func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
+       DoGenericVolumeTests(c, func(t TB) TestableVolume {
+               return NewTestableS3Volume(c, true, 2)
+       })
+}
+
+func (s *StubbedS3Suite) TestIndex(c *check.C) {
+       v := NewTestableS3Volume(c, false, 2)
+       v.indexPageSize = 3
+       for i := 0; i < 256; i++ {
+               v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
+       }
+       for _, spec := range []struct {
+               prefix      string
+               expectMatch int
+       }{
+               {"", 256},
+               {"c", 16},
+               {"bc", 1},
+               {"abc", 0},
+       } {
+               buf := new(bytes.Buffer)
+               err := v.IndexTo(spec.prefix, buf)
+               c.Check(err, check.IsNil)
+
+               idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
+               c.Check(len(idx), check.Equals, spec.expectMatch+1)
+               c.Check(len(idx[len(idx)-1]), check.Equals, 0)
+       }
+}
+
+// PutRaw skips the ContentMD5 test
+func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
+       err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
+       if err != nil {
+               log.Printf("PutRaw: %+v", err)
+       }
+}
+
+// TouchWithDate turns back the clock while doing a Touch(). We assume
+// there are no other operations happening on the same s3test server
+// while we do this.
+func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
+       v.serverClock.now = &lastPut
+       err := v.Touch(locator)
+       if err != nil && !strings.Contains(err.Error(), "PutCopy returned old LastModified") {
+               log.Printf("Touch: %+v", err)
+       }
+       v.serverClock.now = nil
+}
+
+func (v *TestableS3Volume) Teardown() {
+       v.server.Quit()
+}
index 61088f10fa2d4ef30e969a77e107b824073684e6..fae4a9ee807d5c799b5b27b61bee5ddc1bc21bab 100644 (file)
@@ -8,19 +8,32 @@ import (
        "regexp"
        "sort"
        "strings"
-       "testing"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
 )
 
+type TB interface {
+        Error(args ...interface{})
+        Errorf(format string, args ...interface{})
+        Fail()
+        FailNow()
+        Failed() bool
+        Fatal(args ...interface{})
+        Fatalf(format string, args ...interface{})
+        Log(args ...interface{})
+        Logf(format string, args ...interface{})
+}
+
 // A TestableVolumeFactory returns a new TestableVolume. The factory
 // function, and the TestableVolume it returns, can use "t" to write
 // logs, fail the current test, etc.
-type TestableVolumeFactory func(t *testing.T) TestableVolume
+type TestableVolumeFactory func(t TB) TestableVolume
 
 // DoGenericVolumeTests runs a set of tests that every TestableVolume
 // is expected to pass. It calls factory to create a new TestableVolume
 // for each test case, to avoid leaking state between tests.
-func DoGenericVolumeTests(t *testing.T, factory TestableVolumeFactory) {
+func DoGenericVolumeTests(t TB, factory TestableVolumeFactory) {
        testGet(t, factory)
        testGetNoSuchBlock(t, factory)
 
@@ -36,10 +49,10 @@ func DoGenericVolumeTests(t *testing.T, factory TestableVolumeFactory) {
 
        testPutBlockWithSameContent(t, factory, TestHash, TestBlock)
        testPutBlockWithSameContent(t, factory, EmptyHash, EmptyBlock)
-       testPutBlockWithDifferentContent(t, factory, TestHash, TestBlock, TestBlock2)
-       testPutBlockWithDifferentContent(t, factory, TestHash, EmptyBlock, TestBlock)
-       testPutBlockWithDifferentContent(t, factory, TestHash, TestBlock, EmptyBlock)
-       testPutBlockWithDifferentContent(t, factory, EmptyHash, EmptyBlock, TestBlock)
+       testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], arvadostest.MD5CollisionData[1])
+       testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, EmptyBlock, arvadostest.MD5CollisionData[0])
+       testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], EmptyBlock)
+       testPutBlockWithDifferentContent(t, factory, EmptyHash, EmptyBlock, arvadostest.MD5CollisionData[0])
        testPutMultipleBlocks(t, factory)
 
        testPutAndTouch(t, factory)
@@ -67,7 +80,7 @@ func DoGenericVolumeTests(t *testing.T, factory TestableVolumeFactory) {
 
 // Put a test block, get it and verify content
 // Test should pass for both writable and read-only volumes
-func testGet(t *testing.T, factory TestableVolumeFactory) {
+func testGet(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
@@ -87,7 +100,7 @@ func testGet(t *testing.T, factory TestableVolumeFactory) {
 
 // Invoke get on a block that does not exist in volume; should result in error
 // Test should pass for both writable and read-only volumes
-func testGetNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
+func testGetNoSuchBlock(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
@@ -99,7 +112,7 @@ func testGetNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
 // Compare() should return os.ErrNotExist if the block does not exist.
 // Otherwise, writing new data causes CompareAndTouch() to generate
 // error logs even though everything is working fine.
-func testCompareNonexistent(t *testing.T, factory TestableVolumeFactory) {
+func testCompareNonexistent(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
@@ -111,7 +124,7 @@ func testCompareNonexistent(t *testing.T, factory TestableVolumeFactory) {
 
 // Put a test block and compare the locator with same content
 // Test should pass for both writable and read-only volumes
-func testCompareSameContent(t *testing.T, factory TestableVolumeFactory, testHash string, testData []byte) {
+func testCompareSameContent(t TB, factory TestableVolumeFactory, testHash string, testData []byte) {
        v := factory(t)
        defer v.Teardown()
 
@@ -129,7 +142,7 @@ func testCompareSameContent(t *testing.T, factory TestableVolumeFactory, testHas
 // testHash = md5(testDataA).
 //
 // Test should pass for both writable and read-only volumes
-func testCompareWithCollision(t *testing.T, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
+func testCompareWithCollision(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
        v := factory(t)
        defer v.Teardown()
 
@@ -146,7 +159,7 @@ func testCompareWithCollision(t *testing.T, factory TestableVolumeFactory, testH
 // corrupted. Requires testHash = md5(testDataA) != md5(testDataB).
 //
 // Test should pass for both writable and read-only volumes
-func testCompareWithCorruptStoredData(t *testing.T, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
+func testCompareWithCorruptStoredData(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
        v := factory(t)
        defer v.Teardown()
 
@@ -160,7 +173,7 @@ func testCompareWithCorruptStoredData(t *testing.T, factory TestableVolumeFactor
 
 // Put a block and put again with same content
 // Test is intended for only writable volumes
-func testPutBlockWithSameContent(t *testing.T, factory TestableVolumeFactory, testHash string, testData []byte) {
+func testPutBlockWithSameContent(t TB, factory TestableVolumeFactory, testHash string, testData []byte) {
        v := factory(t)
        defer v.Teardown()
 
@@ -181,7 +194,7 @@ func testPutBlockWithSameContent(t *testing.T, factory TestableVolumeFactory, te
 
 // Put a block and put again with different content
 // Test is intended for only writable volumes
-func testPutBlockWithDifferentContent(t *testing.T, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
+func testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
        v := factory(t)
        defer v.Teardown()
 
@@ -189,10 +202,7 @@ func testPutBlockWithDifferentContent(t *testing.T, factory TestableVolumeFactor
                return
        }
 
-       err := v.Put(testHash, testDataA)
-       if err != nil {
-               t.Errorf("Got err putting block %q: %q, expected nil", testDataA, err)
-       }
+       v.PutRaw(testHash, testDataA)
 
        putErr := v.Put(testHash, testDataB)
        buf, getErr := v.Get(testHash)
@@ -217,7 +227,7 @@ func testPutBlockWithDifferentContent(t *testing.T, factory TestableVolumeFactor
 
 // Put and get multiple blocks
 // Test is intended for only writable volumes
-func testPutMultipleBlocks(t *testing.T, factory TestableVolumeFactory) {
+func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
@@ -275,7 +285,7 @@ func testPutMultipleBlocks(t *testing.T, factory TestableVolumeFactory) {
 //   Test that when applying PUT to a block that already exists,
 //   the block's modification time is updated.
 // Test is intended for only writable volumes
-func testPutAndTouch(t *testing.T, factory TestableVolumeFactory) {
+func testPutAndTouch(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
@@ -317,7 +327,7 @@ func testPutAndTouch(t *testing.T, factory TestableVolumeFactory) {
 
 // Touching a non-existing block should result in error.
 // Test should pass for both writable and read-only volumes
-func testTouchNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
+func testTouchNoSuchBlock(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
@@ -328,7 +338,7 @@ func testTouchNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
 
 // Invoking Mtime on a non-existing block should result in error.
 // Test should pass for both writable and read-only volumes
-func testMtimeNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
+func testMtimeNoSuchBlock(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
@@ -342,7 +352,7 @@ func testMtimeNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
 // * with a prefix
 // * with no such prefix
 // Test should pass for both writable and read-only volumes
-func testIndexTo(t *testing.T, factory TestableVolumeFactory) {
+func testIndexTo(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
@@ -399,7 +409,7 @@ func testIndexTo(t *testing.T, factory TestableVolumeFactory) {
 // Calling Delete() for a block immediately after writing it (not old enough)
 // should neither delete the data nor return an error.
 // Test is intended for only writable volumes
-func testDeleteNewBlock(t *testing.T, factory TestableVolumeFactory) {
+func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
        blobSignatureTTL = 300 * time.Second
@@ -427,7 +437,7 @@ func testDeleteNewBlock(t *testing.T, factory TestableVolumeFactory) {
 // Calling Delete() for a block with a timestamp older than
 // blobSignatureTTL seconds in the past should delete the data.
 // Test is intended for only writable volumes
-func testDeleteOldBlock(t *testing.T, factory TestableVolumeFactory) {
+func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
        blobSignatureTTL = 300 * time.Second
@@ -449,7 +459,7 @@ func testDeleteOldBlock(t *testing.T, factory TestableVolumeFactory) {
 
 // Calling Delete() for a block that does not exist should result in error.
 // Test should pass for both writable and read-only volumes
-func testDeleteNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
+func testDeleteNoSuchBlock(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
@@ -460,7 +470,7 @@ func testDeleteNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
 
 // Invoke Status and verify that VolumeStatus is returned
 // Test should pass for both writable and read-only volumes
-func testStatus(t *testing.T, factory TestableVolumeFactory) {
+func testStatus(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
@@ -481,7 +491,7 @@ func testStatus(t *testing.T, factory TestableVolumeFactory) {
 
 // Invoke String for the volume; expect non-empty result
 // Test should pass for both writable and read-only volumes
-func testString(t *testing.T, factory TestableVolumeFactory) {
+func testString(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
@@ -492,7 +502,7 @@ func testString(t *testing.T, factory TestableVolumeFactory) {
 
 // Putting, updating, touching, and deleting blocks from a read-only volume result in error.
 // Test is intended for only read-only volumes
-func testUpdateReadOnly(t *testing.T, factory TestableVolumeFactory) {
+func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
@@ -539,7 +549,7 @@ func testUpdateReadOnly(t *testing.T, factory TestableVolumeFactory) {
 
 // Launch concurrent Gets
 // Test should pass for both writable and read-only volumes
-func testGetConcurrent(t *testing.T, factory TestableVolumeFactory) {
+func testGetConcurrent(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
@@ -592,7 +602,7 @@ func testGetConcurrent(t *testing.T, factory TestableVolumeFactory) {
 
 // Launch concurrent Puts
 // Test is intended for only writable volumes
-func testPutConcurrent(t *testing.T, factory TestableVolumeFactory) {
+func testPutConcurrent(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
@@ -660,7 +670,7 @@ func testPutConcurrent(t *testing.T, factory TestableVolumeFactory) {
 }
 
 // Write and read back a full size block
-func testPutFullBlock(t *testing.T, factory TestableVolumeFactory) {
+func testPutFullBlock(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
index 924637f58e5004f1cec307266c87c0b53ae81d03..b216810f8cb0fc1008e4bb7bc99b3c306728d70a 100644 (file)
@@ -16,10 +16,10 @@ import (
 
 type TestableUnixVolume struct {
        UnixVolume
-       t *testing.T
+       t TB
 }
 
-func NewTestableUnixVolume(t *testing.T, serialize bool, readonly bool) *TestableUnixVolume {
+func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVolume {
        d, err := ioutil.TempDir("", "volume_test")
        if err != nil {
                t.Fatal(err)
@@ -66,28 +66,28 @@ func (v *TestableUnixVolume) Teardown() {
 
 // serialize = false; readonly = false
 func TestUnixVolumeWithGenericTests(t *testing.T) {
-       DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+       DoGenericVolumeTests(t, func(t TB) TestableVolume {
                return NewTestableUnixVolume(t, false, false)
        })
 }
 
 // serialize = false; readonly = true
 func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) {
-       DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+       DoGenericVolumeTests(t, func(t TB) TestableVolume {
                return NewTestableUnixVolume(t, false, true)
        })
 }
 
 // serialize = true; readonly = false
 func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) {
-       DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+       DoGenericVolumeTests(t, func(t TB) TestableVolume {
                return NewTestableUnixVolume(t, true, false)
        })
 }
 
 // serialize = false; readonly = false
 func TestUnixVolumeHandlersWithGenericVolumeTests(t *testing.T) {
-       DoHandlersWithGenericVolumeTests(t, func(t *testing.T) (*RRVolumeManager, []TestableVolume) {
+       DoHandlersWithGenericVolumeTests(t, func(t TB) (*RRVolumeManager, []TestableVolume) {
                vols := make([]Volume, 2)
                testableUnixVols := make([]TestableVolume, 2)