2960: Refactor keepstore into a streaming server.
[arvados.git] / services / keepstore / s3aws_volume_test.go
index c7e2d485dfc6f793f107947a3340606c993f63ac..f05cbee848742a436073230eac00b767930dd9b5 100644 (file)
@@ -58,7 +58,6 @@ type StubbedS3AWSSuite struct {
        s3server *httptest.Server
        metadata *httptest.Server
        cluster  *arvados.Cluster
-       handler  *handler
        volumes  []*TestableS3AWSVolume
 }
 
@@ -70,33 +69,37 @@ func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) {
                "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
                "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
        }
-       s.handler = &handler{}
 }
 
 func (s *StubbedS3AWSSuite) TestGeneric(c *check.C) {
-       DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+       DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
                // Use a negative raceWindow so s3test's 1-second
                // timestamp precision doesn't confuse fixRace.
-               return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+               return s.newTestableVolume(c, params, -2*time.Second)
        })
 }
 
 func (s *StubbedS3AWSSuite) TestGenericReadOnly(c *check.C) {
-       DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
-               return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+       DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
+               return s.newTestableVolume(c, params, -2*time.Second)
        })
 }
 
 func (s *StubbedS3AWSSuite) TestGenericWithPrefix(c *check.C) {
-       DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
-               v := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+       DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
+               v := s.newTestableVolume(c, params, -2*time.Second)
                v.PrefixLength = 3
                return v
        })
 }
 
 func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
-       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
+       v := s.newTestableVolume(c, newVolumeParams{
+               Cluster:      s.cluster,
+               ConfigVolume: arvados.Volume{Replication: 2},
+               MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+       }, 0)
        v.IndexPageSize = 3
        for i := 0; i < 256; i++ {
                v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
@@ -111,7 +114,7 @@ func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
                {"abc", 0},
        } {
                buf := new(bytes.Buffer)
-               err := v.IndexTo(spec.prefix, buf)
+               err := v.Index(context.Background(), spec.prefix, buf)
                c.Check(err, check.IsNil)
 
                idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
@@ -146,7 +149,7 @@ func (s *StubbedS3AWSSuite) TestSignature(c *check.C) {
        vol.bucket.svc.ForcePathStyle = true
 
        c.Check(err, check.IsNil)
-       err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
+       err = vol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
        c.Check(err, check.IsNil)
        c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
 }
@@ -202,7 +205,12 @@ func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) {
 }
 
 func (s *StubbedS3AWSSuite) TestStats(c *check.C) {
-       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+       v := s.newTestableVolume(c, newVolumeParams{
+               Cluster:      s.cluster,
+               ConfigVolume: arvados.Volume{Replication: 2},
+               MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+       }, 5*time.Minute)
        stats := func() string {
                buf, err := json.Marshal(v.InternalStats())
                c.Check(err, check.IsNil)
@@ -212,20 +220,20 @@ func (s *StubbedS3AWSSuite) TestStats(c *check.C) {
        c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
 
        loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       _, err := v.Get(context.Background(), loc, make([]byte, 3))
+       _, err := v.BlockRead(context.Background(), loc, io.Discard)
        c.Check(err, check.NotNil)
        c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
        c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
        c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
 
-       err = v.Put(context.Background(), loc, []byte("foo"))
+       err = v.BlockWrite(context.Background(), loc, []byte("foo"))
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
        c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
 
-       _, err = v.Get(context.Background(), loc, make([]byte, 3))
+       _, err = v.BlockRead(context.Background(), loc, io.Discard)
        c.Check(err, check.IsNil)
-       _, err = v.Get(context.Background(), loc, make([]byte, 3))
+       _, err = v.BlockRead(context.Background(), loc, io.Discard)
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
 }
@@ -251,30 +259,15 @@ func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
 }
 
 func (s *StubbedS3AWSSuite) TestGetContextCancel(c *check.C) {
-       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       buf := make([]byte, 3)
-
        s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
-               _, err := v.Get(ctx, loc, buf)
+               _, err := v.BlockRead(ctx, fooHash, io.Discard)
                return err
        })
 }
 
-func (s *StubbedS3AWSSuite) TestCompareContextCancel(c *check.C) {
-       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       buf := []byte("bar")
-
-       s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
-               return v.Compare(ctx, loc, buf)
-       })
-}
-
 func (s *StubbedS3AWSSuite) TestPutContextCancel(c *check.C) {
-       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       buf := []byte("foo")
-
        s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
-               return v.Put(ctx, loc, buf)
+               return v.BlockWrite(ctx, fooHash, []byte("foo"))
        })
 }
 
@@ -283,7 +276,12 @@ func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.
        s.s3server = httptest.NewServer(handler)
        defer s.s3server.Close()
 
-       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+       v := s.newTestableVolume(c, newVolumeParams{
+               Cluster:      s.cluster,
+               ConfigVolume: arvados.Volume{Replication: 2},
+               MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+       }, 5*time.Minute)
 
        ctx, cancel := context.WithCancel(context.Background())
 
@@ -323,7 +321,13 @@ func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
        s.cluster.Collections.BlobTrashLifetime.Set("1h")
        s.cluster.Collections.BlobSigningTTL.Set("1h")
 
-       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+       v := s.newTestableVolume(c, newVolumeParams{
+               Cluster:      s.cluster,
+               ConfigVolume: arvados.Volume{Replication: 2},
+               Logger:       ctxlog.TestLogger(c),
+               MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
+       }, 5*time.Minute)
        var none time.Time
 
        putS3Obj := func(t time.Time, key string, data []byte) {
@@ -475,8 +479,7 @@ func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
 
                        // Check canGet
                        loc, blk := setupScenario()
-                       buf := make([]byte, len(blk))
-                       _, err := v.Get(context.Background(), loc, buf)
+                       _, err := v.BlockRead(context.Background(), loc, io.Discard)
                        c.Check(err == nil, check.Equals, scenario.canGet)
                        if err != nil {
                                c.Check(os.IsNotExist(err), check.Equals, true)
@@ -484,9 +487,9 @@ func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
 
                        // Call Trash, then check canTrash and canGetAfterTrash
                        loc, _ = setupScenario()
-                       err = v.Trash(loc)
+                       err = v.BlockTrash(loc)
                        c.Check(err == nil, check.Equals, scenario.canTrash)
-                       _, err = v.Get(context.Background(), loc, buf)
+                       _, err = v.BlockRead(context.Background(), loc, io.Discard)
                        c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
                        if err != nil {
                                c.Check(os.IsNotExist(err), check.Equals, true)
@@ -494,14 +497,14 @@ func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
 
                        // Call Untrash, then check canUntrash
                        loc, _ = setupScenario()
-                       err = v.Untrash(loc)
+                       err = v.BlockUntrash(loc)
                        c.Check(err == nil, check.Equals, scenario.canUntrash)
                        if scenario.dataT != none || scenario.trashT != none {
                                // In all scenarios where the data exists, we
                                // should be able to Get after Untrash --
                                // regardless of timestamps, errors, race
                                // conditions, etc.
-                               _, err = v.Get(context.Background(), loc, buf)
+                               _, err = v.BlockRead(context.Background(), loc, io.Discard)
                                c.Check(err, check.IsNil)
                        }
 
@@ -522,7 +525,7 @@ func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
                        // Check for current Mtime after Put (applies to all
                        // scenarios)
                        loc, blk = setupScenario()
-                       err = v.Put(context.Background(), loc, blk)
+                       err = v.BlockWrite(context.Background(), loc, blk)
                        c.Check(err, check.IsNil)
                        t, err := v.Mtime(loc)
                        c.Check(err, check.IsNil)
@@ -555,7 +558,7 @@ func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
        }
 }
 
-func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3AWSVolume {
+func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *TestableS3AWSVolume {
 
        clock := &s3AWSFakeClock{}
        // fake s3
@@ -591,10 +594,11 @@ func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, cluster *arvados.Clust
                                UnsafeDelete:       true,
                                IndexPageSize:      1000,
                        },
-                       cluster: cluster,
-                       volume:  volume,
-                       logger:  ctxlog.TestLogger(c),
-                       metrics: metrics,
+                       cluster:    params.Cluster,
+                       volume:     params.ConfigVolume,
+                       logger:     params.Logger,
+                       metrics:    params.MetricsVecs,
+                       bufferPool: params.BufferPool,
                },
                c:           c,
                server:      srv,
@@ -619,7 +623,7 @@ func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, cluster *arvados.Clust
 // PutRaw skips the ContentMD5 test
 func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
        key := v.key(loc)
-       r := NewCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
+       r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
 
        uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
                u.PartSize = 5 * 1024 * 1024