21705: Migrate keepstore s3 driver to latest aws-sdk-go-v2.
[arvados.git] / services / keepstore / s3_volume_test.go
index fb68e1c0574c338e9c016404e456f623acdcb477..50010b3bef4832ccdec7942c8e66a48be5cfc12f 100644 (file)
@@ -15,14 +15,15 @@ import (
        "net/http/httptest"
        "os"
        "strings"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
 
        "github.com/aws/aws-sdk-go-v2/aws"
+       "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
        "github.com/aws/aws-sdk-go-v2/service/s3"
-       "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
 
        "github.com/johannesboyne/gofakes3"
        "github.com/johannesboyne/gofakes3/backend/s3mem"
@@ -31,22 +32,18 @@ import (
        check "gopkg.in/check.v1"
 )
 
-const (
-       s3TestBucketName = "testbucket"
-)
-
-type s3AWSFakeClock struct {
+// s3fakeClock is a time source for the fake S3 server. While now is
+// nil it reports the real current time; pointing now at a time.Time
+// pins the reported time to that value.
+type s3fakeClock struct {
        now *time.Time
 }
 
-func (c *s3AWSFakeClock) Now() time.Time {
+// Now returns the pinned time if one has been set, otherwise the
+// current wall-clock time. The result is always in UTC.
+func (c *s3fakeClock) Now() time.Time {
        if c.now == nil {
                return time.Now().UTC()
        }
        return c.now.UTC()
 }
 
-func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
+// Since returns the time elapsed from t to c.Now(), so it follows
+// the pinned time when one is set.
+func (c *s3fakeClock) Since(t time.Time) time.Duration {
        return c.Now().Sub(t)
 }
 
@@ -55,14 +52,16 @@ var _ = check.Suite(&stubbedS3Suite{})
 var srv httptest.Server
 
 type stubbedS3Suite struct {
-       s3server *httptest.Server
-       metadata *httptest.Server
-       cluster  *arvados.Cluster
-       volumes  []*testableS3Volume
+       s3server    *httptest.Server // fake S3 endpoint; started on demand by newTestableVolume, closed by TearDownTest
+       s3fakeClock *s3fakeClock     // time source handed to the fake S3 backend and server
+       metadata    *httptest.Server // stub metadata/credentials server; nil unless a test sets one up
+       cluster     *arvados.Cluster
+       volumes     []*testableS3Volume
 }
 
 func (s *stubbedS3Suite) SetUpTest(c *check.C) {
        s.s3server = nil
+       s.s3fakeClock = &s3fakeClock{}
        s.metadata = nil
        s.cluster = testCluster(c)
        s.cluster.Volumes = map[string]arvados.Volume{
@@ -71,6 +70,12 @@ func (s *stubbedS3Suite) SetUpTest(c *check.C) {
        }
 }
 
+// TearDownTest shuts down the fake S3 server if one was started
+// during the test. SetUpTest resets s.s3server to nil, so a nil
+// value here means no server was created.
+func (s *stubbedS3Suite) TearDownTest(c *check.C) {
+       if s.s3server != nil {
+               s.s3server.Close()
+       }
+}
+
 func (s *stubbedS3Suite) TestGeneric(c *check.C) {
        DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
                // Use a negative raceWindow so s3test's 1-second
@@ -145,9 +150,9 @@ func (s *stubbedS3Suite) TestSignature(c *check.C) {
                logger:  ctxlog.TestLogger(c),
                metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
        }
-       err := vol.check("")
        // Our test S3 server uses the older 'Path Style'
-       vol.bucket.svc.ForcePathStyle = true
+       vol.usePathStyle = true
+       err := vol.check("")
 
        c.Check(err, check.IsNil)
        err = vol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
@@ -156,20 +161,37 @@ func (s *stubbedS3Suite) TestSignature(c *check.C) {
 }
 
 func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
+       var reqHeader http.Header
+       stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               reqHeader = r.Header
+       }))
+       defer stub.Close()
+
+       retrievedMetadata := false
        s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               retrievedMetadata = true
                upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
                exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
-               // Literal example from
-               // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
-               // but with updated timestamps
-               io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
+               c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path)
+               switch {
+               case r.URL.Path == "/latest/meta-data/iam/security-credentials/":
+                       io.WriteString(w, "testcredential\n")
+               case r.URL.Path == "/latest/api/token",
+                       r.URL.Path == "/latest/meta-data/iam/security-credentials/testcredential":
+                       // Literal example from
+                       // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
+                       // but with updated timestamps
+                       io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
+               default:
+                       w.WriteHeader(http.StatusNotFound)
+               }
        }))
        defer s.metadata.Close()
 
        v := &s3Volume{
                S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
                        IAMRole:  s.metadata.URL + "/latest/api/token",
-                       Endpoint: "http://localhost:12345",
+                       Endpoint: stub.URL,
                        Region:   "test-region-1",
                        Bucket:   "test-bucket-name",
                },
@@ -179,18 +201,21 @@ func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
        }
        err := v.check(s.metadata.URL + "/latest")
        c.Check(err, check.IsNil)
-       creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
+       resp, err := v.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{})
        c.Check(err, check.IsNil)
-       c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
-       c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+       c.Check(resp.Buckets, check.HasLen, 0)
+       c.Check(retrievedMetadata, check.Equals, true)
+       c.Check(reqHeader.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 Credential=ASIAIOSFODNN7EXAMPLE/\d+/test-region-1/s3/aws4_request, SignedHeaders=.*`)
 
+       retrievedMetadata = false
        s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               retrievedMetadata = true
+               c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path)
                w.WriteHeader(http.StatusNotFound)
        }))
        deadv := &s3Volume{
                S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
-                       IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
-                       Endpoint: "http://localhost:12345",
+                       Endpoint: "http://localhost:9",
                        Region:   "test-region-1",
                        Bucket:   "test-bucket-name",
                },
@@ -200,9 +225,10 @@ func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
        }
        err = deadv.check(s.metadata.URL + "/latest")
        c.Check(err, check.IsNil)
-       _, err = deadv.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
-       c.Check(err, check.ErrorMatches, `(?s).*EC2RoleRequestError: no EC2 instance role found.*`)
+       _, err = deadv.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{})
+       c.Check(err, check.ErrorMatches, `(?s).*failed to refresh cached credentials, no EC2 IMDS role found.*`)
        c.Check(err, check.ErrorMatches, `(?s).*404.*`)
+       c.Check(retrievedMetadata, check.Equals, true)
 }
 
 func (s *stubbedS3Suite) TestStats(c *check.C) {
@@ -224,7 +250,7 @@ func (s *stubbedS3Suite) TestStats(c *check.C) {
        err := v.BlockRead(context.Background(), loc, brdiscard)
        c.Check(err, check.NotNil)
        c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
-       c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
+       c.Check(stats(), check.Matches, `.*"\*smithy.OperationError 404 NoSuchKey":[^0].*`)
        c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
 
        err = v.BlockWrite(context.Background(), loc, []byte("foo"))
@@ -334,9 +360,9 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
                if t == none {
                        return
                }
-               v.serverClock.now = &t
-               uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
-               _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
+               s.s3fakeClock.now = &t
+               uploader := manager.NewUploader(v.bucket.svc)
+               _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
                        Bucket: aws.String(v.bucket.bucket),
                        Key:    aws.String(key),
                        Body:   bytes.NewReader(data),
@@ -344,7 +370,7 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
                if err != nil {
                        panic(err)
                }
-               v.serverClock.now = nil
+               s.s3fakeClock.now = nil
                _, err = v.head(key)
                if err != nil {
                        panic(err)
@@ -473,14 +499,14 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
                                putS3Obj(scenario.dataT, key, blk)
                                putS3Obj(scenario.recentT, "recent/"+key, nil)
                                putS3Obj(scenario.trashT, "trash/"+key, blk)
-                               v.serverClock.now = &t0
+                               v.s3fakeClock.now = &t0
                                return loc, blk
                        }
 
                        // Check canGet
                        loc, blk := setupScenario()
                        err := v.BlockRead(context.Background(), loc, brdiscard)
-                       c.Check(err == nil, check.Equals, scenario.canGet)
+                       c.Check(err == nil, check.Equals, scenario.canGet, check.Commentf("err was %+v", err))
                        if err != nil {
                                c.Check(os.IsNotExist(err), check.Equals, true)
                        }
@@ -538,47 +564,49 @@ type testableS3Volume struct {
        *s3Volume
        server      *httptest.Server
        c           *check.C
-       serverClock *s3AWSFakeClock
+       s3fakeClock *s3fakeClock
 }
 
-type LogrusLog struct {
-       log *logrus.FieldLogger
+// gofakes3logger wraps a logrus.FieldLogger so it can be passed to
+// gofakes3.WithLogger. The logger is embedded, so its methods are
+// callable directly on the wrapper.
+type gofakes3logger struct {
+       logrus.FieldLogger
 }
 
-func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
+// Print forwards v to the embedded logger at the severity matching
+// the given gofakes3 level (error, warning or info). It panics if
+// level is none of those three.
+func (l gofakes3logger) Print(level gofakes3.LogLevel, v ...interface{}) {
        switch level {
        case gofakes3.LogErr:
-               (*l.log).Errorln(v...)
+               l.Errorln(v...)
        case gofakes3.LogWarn:
-               (*l.log).Warnln(v...)
+               l.Warnln(v...)
        case gofakes3.LogInfo:
-               (*l.log).Infoln(v...)
+               l.Infoln(v...)
        default:
                panic("unknown level")
        }
 }
 
-func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
-
-       clock := &s3AWSFakeClock{}
-       // fake s3
-       backend := s3mem.New(s3mem.WithTimeSource(clock))
+// testBucketSerial is incremented once per newTestableVolume call and
+// appended to "testbucket" to form that volume's bucket name, so
+// volumes that reuse the suite's fake S3 server get distinct buckets.
+var testBucketSerial atomic.Int64
 
-       // To enable GoFakeS3 debug logging, pass logger to gofakes3.WithLogger()
-       /* logger := new(LogrusLog)
-       ctxLogger := ctxlog.FromContext(context.Background())
-       logger.log = &ctxLogger */
-       faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(nil), gofakes3.WithTimeSkewLimit(0))
-       srv := httptest.NewServer(faker.Server())
-
-       endpoint := srv.URL
-       if s.s3server != nil {
-               endpoint = s.s3server.URL
+func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
+       if params.Logger == nil {
+               params.Logger = ctxlog.TestLogger(c)
        }
+       if s.s3server == nil {
+               backend := s3mem.New(s3mem.WithTimeSource(s.s3fakeClock))
+               logger := ctxlog.TestLogger(c)
+               faker := gofakes3.New(backend,
+                       gofakes3.WithTimeSource(s.s3fakeClock),
+                       gofakes3.WithLogger(gofakes3logger{FieldLogger: logger}),
+                       gofakes3.WithTimeSkewLimit(0))
+               s.s3server = httptest.NewServer(faker.Server())
+       }
+       endpoint := s.s3server.URL
+       bucketName := fmt.Sprintf("testbucket%d", testBucketSerial.Add(1))
 
-       iamRole, accessKey, secretKey := "", "xxx", "xxx"
+       var metadataURL, iamRole, accessKey, secretKey string
        if s.metadata != nil {
-               iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
+               metadataURL, iamRole = s.metadata.URL, s.metadata.URL+"/fake-metadata/test-role"
+       } else {
+               accessKey, secretKey = "xxx", "xxx"
        }
 
        v := &testableS3Volume{
@@ -587,32 +615,29 @@ func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, r
                                IAMRole:            iamRole,
                                AccessKeyID:        accessKey,
                                SecretAccessKey:    secretKey,
-                               Bucket:             s3TestBucketName,
+                               Bucket:             bucketName,
                                Endpoint:           endpoint,
                                Region:             "test-region-1",
                                LocationConstraint: true,
                                UnsafeDelete:       true,
                                IndexPageSize:      1000,
                        },
-                       cluster:    params.Cluster,
-                       volume:     params.ConfigVolume,
-                       logger:     params.Logger,
-                       metrics:    params.MetricsVecs,
-                       bufferPool: params.BufferPool,
+                       cluster:      params.Cluster,
+                       volume:       params.ConfigVolume,
+                       logger:       params.Logger,
+                       metrics:      params.MetricsVecs,
+                       bufferPool:   params.BufferPool,
+                       usePathStyle: true,
                },
                c:           c,
-               server:      srv,
-               serverClock: clock,
+               s3fakeClock: s.s3fakeClock,
        }
-       c.Assert(v.s3Volume.check(""), check.IsNil)
-       // Our test S3 server uses the older 'Path Style'
-       v.s3Volume.bucket.svc.ForcePathStyle = true
+       c.Assert(v.s3Volume.check(metadataURL), check.IsNil)
        // Create the testbucket
        input := &s3.CreateBucketInput{
-               Bucket: aws.String(s3TestBucketName),
+               Bucket: aws.String(bucketName),
        }
-       req := v.s3Volume.bucket.svc.CreateBucketRequest(input)
-       _, err := req.Send(context.Background())
+       _, err := v.s3Volume.bucket.svc.CreateBucket(context.Background(), input)
        c.Assert(err, check.IsNil)
        // We couldn't set RaceWindow until now because check()
        // rejects negative values.
@@ -624,12 +649,12 @@ func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) e
        key := v.key(loc)
        r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
 
-       uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
+       uploader := manager.NewUploader(v.bucket.svc, func(u *manager.Uploader) {
                u.PartSize = 5 * 1024 * 1024
                u.Concurrency = 13
        })
 
-       _, err := uploader.Upload(&s3manager.UploadInput{
+       _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
                Bucket: aws.String(v.bucket.bucket),
                Key:    aws.String(key),
                Body:   r,
@@ -639,7 +664,7 @@ func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) e
        }
 
        empty := bytes.NewReader([]byte{})
-       _, err = uploader.Upload(&s3manager.UploadInput{
+       _, err = uploader.Upload(context.Background(), &s3.PutObjectInput{
                Bucket: aws.String(v.bucket.bucket),
                Key:    aws.String("recent/" + key),
                Body:   empty,
@@ -651,11 +676,11 @@ func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) e
 // there are no other operations happening on the same s3test server
 // while we do this.
 func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) {
-       v.serverClock.now = &lastPut
+       v.s3fakeClock.now = &lastPut
 
-       uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
+       uploader := manager.NewUploader(v.bucket.svc)
        empty := bytes.NewReader([]byte{})
-       _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
+       _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
                Bucket: aws.String(v.bucket.bucket),
                Key:    aws.String("recent/" + v.key(loc)),
                Body:   empty,
@@ -664,11 +689,10 @@ func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) {
                panic(err)
        }
 
-       v.serverClock.now = nil
+       v.s3fakeClock.now = nil
 }
 
 func (v *testableS3Volume) Teardown() {
-       v.server.Close()
+       // Nothing to do here: the fake S3 server is held by the test
+       // suite (stubbedS3Suite.s3server) and is closed in
+       // stubbedS3Suite.TearDownTest, not per volume.
 }
 
 func (v *testableS3Volume) ReadWriteOperationLabelValues() (r, w string) {