X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e1953022010bc0679a2d79baf5c040b8312c5d8b..90ce981c11ba7812f722727d6d06225b91b7b9f9:/services/keepstore/s3_volume_test.go diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go index fb68e1c057..df9fe72683 100644 --- a/services/keepstore/s3_volume_test.go +++ b/services/keepstore/s3_volume_test.go @@ -15,14 +15,15 @@ import ( "net/http/httptest" "os" "strings" + "sync/atomic" "time" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/ctxlog" "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/aws/aws-sdk-go-v2/service/s3/s3manager" "github.com/johannesboyne/gofakes3" "github.com/johannesboyne/gofakes3/backend/s3mem" @@ -31,22 +32,18 @@ import ( check "gopkg.in/check.v1" ) -const ( - s3TestBucketName = "testbucket" -) - -type s3AWSFakeClock struct { +type s3fakeClock struct { now *time.Time } -func (c *s3AWSFakeClock) Now() time.Time { +func (c *s3fakeClock) Now() time.Time { if c.now == nil { return time.Now().UTC() } return c.now.UTC() } -func (c *s3AWSFakeClock) Since(t time.Time) time.Duration { +func (c *s3fakeClock) Since(t time.Time) time.Duration { return c.Now().Sub(t) } @@ -55,14 +52,16 @@ var _ = check.Suite(&stubbedS3Suite{}) var srv httptest.Server type stubbedS3Suite struct { - s3server *httptest.Server - metadata *httptest.Server - cluster *arvados.Cluster - volumes []*testableS3Volume + s3server *httptest.Server + s3fakeClock *s3fakeClock + metadata *httptest.Server + cluster *arvados.Cluster + volumes []*testableS3Volume } func (s *stubbedS3Suite) SetUpTest(c *check.C) { s.s3server = nil + s.s3fakeClock = &s3fakeClock{} s.metadata = nil s.cluster = testCluster(c) s.cluster.Volumes = map[string]arvados.Volume{ @@ -71,6 +70,12 @@ func (s *stubbedS3Suite) SetUpTest(c *check.C) { } } +func (s *stubbedS3Suite) TearDownTest(c *check.C) { + if s.s3server != nil { + s.s3server.Close() + } +} + func (s *stubbedS3Suite) TestGeneric(c *check.C) { DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume { // Use a negative raceWindow so s3test's 1-second @@ -145,9 +150,9 @@ func (s *stubbedS3Suite) TestSignature(c *check.C) { logger: ctxlog.TestLogger(c), metrics: newVolumeMetricsVecs(prometheus.NewRegistry()), } - err := vol.check("") // Our test S3 server uses the older 'Path Style' - vol.bucket.svc.ForcePathStyle = true + vol.usePathStyle = true + err := vol.check("") c.Check(err, check.IsNil) err = vol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo")) @@ -156,20 +161,36 @@ func (s *stubbedS3Suite) TestSignature(c *check.C) { } func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) { + var reqHeader http.Header + stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + reqHeader = r.Header + })) + defer stub.Close() + + retrievedMetadata := false s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + retrievedMetadata = true upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339) exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339) - // Literal example from - // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials - // but with updated timestamps - io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`) + c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path) + switch { + case r.URL.Path == "/latest/meta-data/iam/security-credentials/": + io.WriteString(w, "testcredential\n") + case r.URL.Path == "/latest/api/token", + r.URL.Path == "/latest/meta-data/iam/security-credentials/testcredential": + // Literal example from + // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials + // but with updated timestamps + io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`) + default: + w.WriteHeader(http.StatusNotFound) + } })) defer s.metadata.Close() v := &s3Volume{ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{ - IAMRole: s.metadata.URL + "/latest/api/token", - Endpoint: "http://localhost:12345", + Endpoint: stub.URL, Region: "test-region-1", Bucket: "test-bucket-name", }, @@ -179,18 +200,21 @@ func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) { } err := v.check(s.metadata.URL + "/latest") c.Check(err, check.IsNil) - creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background()) + resp, err := v.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{}) c.Check(err, check.IsNil) - c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE") - c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY") + c.Check(resp.Buckets, check.HasLen, 0) + c.Check(retrievedMetadata, check.Equals, true) + c.Check(reqHeader.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 Credential=ASIAIOSFODNN7EXAMPLE/\d+/test-region-1/s3/aws4_request, SignedHeaders=.*`) + retrievedMetadata = false s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + retrievedMetadata = true + c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path) w.WriteHeader(http.StatusNotFound) })) deadv := &s3Volume{ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{ - IAMRole: s.metadata.URL + "/fake-metadata/test-role", - Endpoint: "http://localhost:12345", + Endpoint: "http://localhost:9", Region: "test-region-1", Bucket: "test-bucket-name", }, @@ -200,9 +224,10 @@ func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) { } err = deadv.check(s.metadata.URL + "/latest") c.Check(err, check.IsNil) - _, err = deadv.bucket.svc.Client.Config.Credentials.Retrieve(context.Background()) - c.Check(err, check.ErrorMatches, `(?s).*EC2RoleRequestError: no EC2 instance role found.*`) + _, err = deadv.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{}) + c.Check(err, check.ErrorMatches, `(?s).*failed to refresh cached credentials, no EC2 IMDS role found.*`) c.Check(err, check.ErrorMatches, `(?s).*404.*`) + c.Check(retrievedMetadata, check.Equals, true) } func (s *stubbedS3Suite) TestStats(c *check.C) { @@ -224,7 +249,7 @@ func (s *stubbedS3Suite) TestStats(c *check.C) { err := v.BlockRead(context.Background(), loc, brdiscard) c.Check(err, check.NotNil) c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`) - c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`) + c.Check(stats(), check.Matches, `.*"\*smithy.OperationError 404 NoSuchKey":[^0].*`) c.Check(stats(), check.Matches, `.*"InBytes":0,.*`) err = v.BlockWrite(context.Background(), loc, []byte("foo")) @@ -334,9 +359,9 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) { if t == none { return } - v.serverClock.now = &t - uploader := s3manager.NewUploaderWithClient(v.bucket.svc) - _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{ + s.s3fakeClock.now = &t + uploader := manager.NewUploader(v.bucket.svc) + _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{ Bucket: aws.String(v.bucket.bucket), Key: aws.String(key), Body: bytes.NewReader(data), @@ -344,7 +369,7 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) { if err != nil { panic(err) } - v.serverClock.now = nil + s.s3fakeClock.now = nil _, err = v.head(key) if err != nil { panic(err) @@ -473,14 +498,14 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) { putS3Obj(scenario.dataT, key, blk) putS3Obj(scenario.recentT, "recent/"+key, nil) putS3Obj(scenario.trashT, "trash/"+key, blk) - v.serverClock.now = &t0 + v.s3fakeClock.now = &t0 return loc, blk } // Check canGet loc, blk := setupScenario() err := v.BlockRead(context.Background(), loc, brdiscard) - c.Check(err == nil, check.Equals, scenario.canGet) + c.Check(err == nil, check.Equals, scenario.canGet, check.Commentf("err was %+v", err)) if err != nil { c.Check(os.IsNotExist(err), check.Equals, true) } @@ -538,81 +563,79 @@ type testableS3Volume struct { *s3Volume server *httptest.Server c *check.C - serverClock *s3AWSFakeClock + s3fakeClock *s3fakeClock } -type LogrusLog struct { - log *logrus.FieldLogger +type gofakes3logger struct { + logrus.FieldLogger } -func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) { +func (l gofakes3logger) Print(level gofakes3.LogLevel, v ...interface{}) { switch level { case gofakes3.LogErr: - (*l.log).Errorln(v...) + l.Errorln(v...) case gofakes3.LogWarn: - (*l.log).Warnln(v...) + l.Warnln(v...) case gofakes3.LogInfo: - (*l.log).Infoln(v...) + l.Infoln(v...) default: panic("unknown level") } } -func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume { - - clock := &s3AWSFakeClock{} - // fake s3 - backend := s3mem.New(s3mem.WithTimeSource(clock)) +var testBucketSerial atomic.Int64 - // To enable GoFakeS3 debug logging, pass logger to gofakes3.WithLogger() - /* logger := new(LogrusLog) - ctxLogger := ctxlog.FromContext(context.Background()) - logger.log = &ctxLogger */ - faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(nil), gofakes3.WithTimeSkewLimit(0)) - srv := httptest.NewServer(faker.Server()) - - endpoint := srv.URL - if s.s3server != nil { - endpoint = s.s3server.URL +func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume { + if params.Logger == nil { + params.Logger = ctxlog.TestLogger(c) } + if s.s3server == nil { + backend := s3mem.New(s3mem.WithTimeSource(s.s3fakeClock)) + logger := ctxlog.TestLogger(c) + faker := gofakes3.New(backend, + gofakes3.WithTimeSource(s.s3fakeClock), + gofakes3.WithLogger(gofakes3logger{FieldLogger: logger}), + gofakes3.WithTimeSkewLimit(0)) + s.s3server = httptest.NewServer(faker.Server()) + } + endpoint := s.s3server.URL + bucketName := fmt.Sprintf("testbucket%d", testBucketSerial.Add(1)) - iamRole, accessKey, secretKey := "", "xxx", "xxx" + var metadataURL, accessKey, secretKey string if s.metadata != nil { - iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", "" + metadataURL = s.metadata.URL + } else { + accessKey, secretKey = "xxx", "xxx" } v := &testableS3Volume{ s3Volume: &s3Volume{ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{ - IAMRole: iamRole, AccessKeyID: accessKey, SecretAccessKey: secretKey, - Bucket: s3TestBucketName, + Bucket: bucketName, Endpoint: endpoint, Region: "test-region-1", LocationConstraint: true, UnsafeDelete: true, IndexPageSize: 1000, }, - cluster: params.Cluster, - volume: params.ConfigVolume, - logger: params.Logger, - metrics: params.MetricsVecs, - bufferPool: params.BufferPool, + cluster: params.Cluster, + volume: params.ConfigVolume, + logger: params.Logger, + metrics: params.MetricsVecs, + bufferPool: params.BufferPool, + usePathStyle: true, }, c: c, - server: srv, - serverClock: clock, + s3fakeClock: s.s3fakeClock, } - c.Assert(v.s3Volume.check(""), check.IsNil) - // Our test S3 server uses the older 'Path Style' - v.s3Volume.bucket.svc.ForcePathStyle = true + c.Assert(v.s3Volume.check(metadataURL), check.IsNil) // Create the testbucket input := &s3.CreateBucketInput{ - Bucket: aws.String(s3TestBucketName), + Bucket: aws.String(bucketName), } - req := v.s3Volume.bucket.svc.CreateBucketRequest(input) - _, err := req.Send(context.Background()) + _, err := v.s3Volume.bucket.svc.CreateBucket(context.Background(), input) c.Assert(err, check.IsNil) // We couldn't set RaceWindow until now because check() // rejects negative values. @@ -624,12 +647,12 @@ func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) e key := v.key(loc) r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes) - uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) { + uploader := manager.NewUploader(v.bucket.svc, func(u *manager.Uploader) { u.PartSize = 5 * 1024 * 1024 u.Concurrency = 13 }) - _, err := uploader.Upload(&s3manager.UploadInput{ + _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{ Bucket: aws.String(v.bucket.bucket), Key: aws.String(key), Body: r, @@ -639,7 +662,7 @@ func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) e } empty := bytes.NewReader([]byte{}) - _, err = uploader.Upload(&s3manager.UploadInput{ + _, err = uploader.Upload(context.Background(), &s3.PutObjectInput{ Bucket: aws.String(v.bucket.bucket), Key: aws.String("recent/" + key), Body: empty, @@ -651,11 +674,11 @@ func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) e // there are no other operations happening on the same s3test server // while we do this. func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) { - v.serverClock.now = &lastPut + v.s3fakeClock.now = &lastPut - uploader := s3manager.NewUploaderWithClient(v.bucket.svc) + uploader := manager.NewUploader(v.bucket.svc) empty := bytes.NewReader([]byte{}) - _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{ + _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{ Bucket: aws.String(v.bucket.bucket), Key: aws.String("recent/" + v.key(loc)), Body: empty, @@ -664,11 +687,10 @@ func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) { panic(err) } - v.serverClock.now = nil + v.s3fakeClock.now = nil } func (v *testableS3Volume) Teardown() { - v.server.Close() } func (v *testableS3Volume) ReadWriteOperationLabelValues() (r, w string) {