"net/http/httptest"
"os"
"strings"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
- "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
"github.com/johannesboyne/gofakes3"
"github.com/johannesboyne/gofakes3/backend/s3mem"
check "gopkg.in/check.v1"
)
-const (
- s3TestBucketName = "testbucket"
-)
-
// s3fakeClock is a time source whose reading can be pinned by setting
// now; while now is nil it follows the real clock (see Now).
-type s3AWSFakeClock struct {
+type s3fakeClock struct {
// now, when non-nil, is the instant reported by Now.
now *time.Time
}
// Now returns the pinned time (c.now) converted to UTC, or the current
// wall-clock time in UTC when no time has been pinned.
-func (c *s3AWSFakeClock) Now() time.Time {
+func (c *s3fakeClock) Now() time.Time {
if c.now == nil {
// No override set: report the real clock.
return time.Now().UTC()
}
return c.now.UTC()
}
// Since returns the duration between t and this clock's Now(), so it
// honors a pinned time the same way Now does.
-func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
+func (c *s3fakeClock) Since(t time.Time) time.Duration {
return c.Now().Sub(t)
}
// NOTE(review): nothing visible in this section reads or writes this
// package-level srv; confirm whether it is still needed.
var srv httptest.Server
// stubbedS3Suite carries the fixtures shared by the stubbed-S3 tests.
// SetUpTest resets them before each test.
type stubbedS3Suite struct {
- s3server *httptest.Server
- metadata *httptest.Server
- cluster *arvados.Cluster
- volumes []*testableS3Volume
// s3server is the fake S3 endpoint. newTestableVolume starts one if
// this is nil, and TearDownTest closes it.
+ s3server *httptest.Server
// s3fakeClock is the time source handed to the fake S3 backend and
// server created by newTestableVolume.
+ s3fakeClock *s3fakeClock
// metadata, when non-nil, is the stub whose URL newTestableVolume
// passes to s3Volume.check instead of using static keys.
+ metadata *httptest.Server
+ cluster *arvados.Cluster
+ volumes []*testableS3Volume
}
func (s *stubbedS3Suite) SetUpTest(c *check.C) {
s.s3server = nil
+ s.s3fakeClock = &s3fakeClock{}
s.metadata = nil
s.cluster = testCluster(c)
s.cluster.Volumes = map[string]arvados.Volume{
}
}
// TearDownTest closes the fake S3 server if one was started during
// the test. It does not touch s.metadata.
+func (s *stubbedS3Suite) TearDownTest(c *check.C) {
+ if s.s3server != nil {
+ s.s3server.Close()
+ }
+}
+
func (s *stubbedS3Suite) TestGeneric(c *check.C) {
DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
// Use a negative raceWindow so s3test's 1-second
logger: ctxlog.TestLogger(c),
metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
}
- err := vol.check("")
// Our test S3 server uses the older 'Path Style'
- vol.bucket.svc.ForcePathStyle = true
+ vol.usePathStyle = true
+ err := vol.check("")
c.Check(err, check.IsNil)
err = vol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
}
// TestIAMRoleCredentials exercises credential lookup through a stub
// metadata service, in two scenarios:
//
//  1. The metadata stub serves a role name and a credentials document.
//     A ListBuckets call against a stub S3 endpoint must succeed, must
//     have hit the metadata stub, and must be signed with the access
//     key ID taken from the credentials document.
//  2. The metadata stub answers 404 to everything. ListBuckets must fail
//     with an error mentioning the missing IMDS role and the 404 status.
func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
// reqHeader records the headers of the most recent request received
// by the stub S3 endpoint, so the Authorization header can be checked.
+ var reqHeader http.Header
+ stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ reqHeader = r.Header
+ }))
+ defer stub.Close()
+
+ retrievedMetadata := false
s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ retrievedMetadata = true
upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
- // Literal example from
- // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
- // but with updated timestamps
- io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
+ c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path)
+ switch {
+ case r.URL.Path == "/latest/meta-data/iam/security-credentials/":
+ io.WriteString(w, "testcredential\n")
+ case r.URL.Path == "/latest/api/token",
+ r.URL.Path == "/latest/meta-data/iam/security-credentials/testcredential":
+ // Literal example from
+ // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
+ // but with updated timestamps
+ io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
+ default:
+ w.WriteHeader(http.StatusNotFound)
+ }
}))
defer s.metadata.Close()
v := &s3Volume{
S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
- IAMRole: s.metadata.URL + "/latest/api/token",
- Endpoint: "http://localhost:12345",
+ Endpoint: stub.URL,
Region: "test-region-1",
Bucket: "test-bucket-name",
},
}
err := v.check(s.metadata.URL + "/latest")
c.Check(err, check.IsNil)
- creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
+ resp, err := v.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{})
c.Check(err, check.IsNil)
- c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
- c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
// NOTE(review): c.Check does not stop the test, so if ListBuckets
// failed and returned a nil resp, the next line would presumably
// panic on resp.Buckets -- confirm, and consider c.Assert above.
+ c.Check(resp.Buckets, check.HasLen, 0)
+ c.Check(retrievedMetadata, check.Equals, true)
+ c.Check(reqHeader.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 Credential=ASIAIOSFODNN7EXAMPLE/\d+/test-region-1/s3/aws4_request, SignedHeaders=.*`)
// Second scenario: replace the metadata stub with one that always
// answers 404.
+ retrievedMetadata = false
s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ retrievedMetadata = true
+ c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path)
w.WriteHeader(http.StatusNotFound)
}))
// The earlier deferred Close was bound to the first metadata server
// when that defer statement ran, so the replacement stub needs its
// own deferred Close or it is leaked.
+ defer s.metadata.Close()
deadv := &s3Volume{
S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
- IAMRole: s.metadata.URL + "/fake-metadata/test-role",
- Endpoint: "http://localhost:12345",
+ Endpoint: "http://localhost:9",
Region: "test-region-1",
Bucket: "test-bucket-name",
},
}
err = deadv.check(s.metadata.URL + "/latest")
c.Check(err, check.IsNil)
- _, err = deadv.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
- c.Check(err, check.ErrorMatches, `(?s).*EC2RoleRequestError: no EC2 instance role found.*`)
+ _, err = deadv.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{})
+ c.Check(err, check.ErrorMatches, `(?s).*failed to refresh cached credentials, no EC2 IMDS role found.*`)
c.Check(err, check.ErrorMatches, `(?s).*404.*`)
+ c.Check(retrievedMetadata, check.Equals, true)
}
func (s *stubbedS3Suite) TestStats(c *check.C) {
err := v.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.NotNil)
c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
- c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
+ c.Check(stats(), check.Matches, `.*"\*smithy.OperationError 404 NoSuchKey":[^0].*`)
c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
err = v.BlockWrite(context.Background(), loc, []byte("foo"))
if t == none {
return
}
- v.serverClock.now = &t
- uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
- _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
+ s.s3fakeClock.now = &t
+ uploader := manager.NewUploader(v.bucket.svc)
+ _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(v.bucket.bucket),
Key: aws.String(key),
Body: bytes.NewReader(data),
if err != nil {
panic(err)
}
- v.serverClock.now = nil
+ s.s3fakeClock.now = nil
_, err = v.head(key)
if err != nil {
panic(err)
putS3Obj(scenario.dataT, key, blk)
putS3Obj(scenario.recentT, "recent/"+key, nil)
putS3Obj(scenario.trashT, "trash/"+key, blk)
- v.serverClock.now = &t0
+ v.s3fakeClock.now = &t0
return loc, blk
}
// Check canGet
loc, blk := setupScenario()
err := v.BlockRead(context.Background(), loc, brdiscard)
- c.Check(err == nil, check.Equals, scenario.canGet)
+ c.Check(err == nil, check.Equals, scenario.canGet, check.Commentf("err was %+v", err))
if err != nil {
c.Check(os.IsNotExist(err), check.Equals, true)
}
*s3Volume
server *httptest.Server
c *check.C
- serverClock *s3AWSFakeClock
+ s3fakeClock *s3fakeClock
}
// gofakes3logger embeds a logrus.FieldLogger and adds the Print method
// so it can be passed to gofakes3.WithLogger.
-type LogrusLog struct {
- log *logrus.FieldLogger
+type gofakes3logger struct {
+ logrus.FieldLogger
}
// Print writes v to the embedded logger using Errorln, Warnln or Infoln
// according to the gofakes3 level. Any other level causes a panic.
-func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
+func (l gofakes3logger) Print(level gofakes3.LogLevel, v ...interface{}) {
switch level {
case gofakes3.LogErr:
- (*l.log).Errorln(v...)
+ l.Errorln(v...)
case gofakes3.LogWarn:
- (*l.log).Warnln(v...)
+ l.Warnln(v...)
case gofakes3.LogInfo:
- (*l.log).Infoln(v...)
+ l.Infoln(v...)
default:
// Only the three levels above are handled.
panic("unknown level")
}
}
-func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
-
- clock := &s3AWSFakeClock{}
- // fake s3
- backend := s3mem.New(s3mem.WithTimeSource(clock))
// testBucketSerial is incremented once per newTestableVolume call to
// build the bucket name ("testbucket<N>") used by that volume.
+var testBucketSerial atomic.Int64
- // To enable GoFakeS3 debug logging, pass logger to gofakes3.WithLogger()
- /* logger := new(LogrusLog)
- ctxLogger := ctxlog.FromContext(context.Background())
- logger.log = &ctxLogger */
- faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(nil), gofakes3.WithTimeSkewLimit(0))
- srv := httptest.NewServer(faker.Server())
-
- endpoint := srv.URL
- if s.s3server != nil {
- endpoint = s.s3server.URL
+func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
+ if params.Logger == nil {
+ params.Logger = ctxlog.TestLogger(c)
}
+ if s.s3server == nil {
+ backend := s3mem.New(s3mem.WithTimeSource(s.s3fakeClock))
+ logger := ctxlog.TestLogger(c)
+ faker := gofakes3.New(backend,
+ gofakes3.WithTimeSource(s.s3fakeClock),
+ gofakes3.WithLogger(gofakes3logger{FieldLogger: logger}),
+ gofakes3.WithTimeSkewLimit(0))
+ s.s3server = httptest.NewServer(faker.Server())
+ }
+ endpoint := s.s3server.URL
+ bucketName := fmt.Sprintf("testbucket%d", testBucketSerial.Add(1))
- iamRole, accessKey, secretKey := "", "xxx", "xxx"
+ var metadataURL, accessKey, secretKey string
if s.metadata != nil {
- iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
+ metadataURL = s.metadata.URL
+ } else {
+ accessKey, secretKey = "xxx", "xxx"
}
v := &testableS3Volume{
s3Volume: &s3Volume{
S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
- IAMRole: iamRole,
AccessKeyID: accessKey,
SecretAccessKey: secretKey,
- Bucket: s3TestBucketName,
+ Bucket: bucketName,
Endpoint: endpoint,
Region: "test-region-1",
LocationConstraint: true,
UnsafeDelete: true,
IndexPageSize: 1000,
},
- cluster: params.Cluster,
- volume: params.ConfigVolume,
- logger: params.Logger,
- metrics: params.MetricsVecs,
- bufferPool: params.BufferPool,
+ cluster: params.Cluster,
+ volume: params.ConfigVolume,
+ logger: params.Logger,
+ metrics: params.MetricsVecs,
+ bufferPool: params.BufferPool,
+ usePathStyle: true,
},
c: c,
- server: srv,
- serverClock: clock,
+ s3fakeClock: s.s3fakeClock,
}
- c.Assert(v.s3Volume.check(""), check.IsNil)
- // Our test S3 server uses the older 'Path Style'
- v.s3Volume.bucket.svc.ForcePathStyle = true
+ c.Assert(v.s3Volume.check(metadataURL), check.IsNil)
// Create the testbucket
input := &s3.CreateBucketInput{
- Bucket: aws.String(s3TestBucketName),
+ Bucket: aws.String(bucketName),
}
- req := v.s3Volume.bucket.svc.CreateBucketRequest(input)
- _, err := req.Send(context.Background())
+ _, err := v.s3Volume.bucket.svc.CreateBucket(context.Background(), input)
c.Assert(err, check.IsNil)
// We couldn't set RaceWindow until now because check()
// rejects negative values.
key := v.key(loc)
r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
- uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
+ uploader := manager.NewUploader(v.bucket.svc, func(u *manager.Uploader) {
u.PartSize = 5 * 1024 * 1024
u.Concurrency = 13
})
- _, err := uploader.Upload(&s3manager.UploadInput{
+ _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(v.bucket.bucket),
Key: aws.String(key),
Body: r,
}
empty := bytes.NewReader([]byte{})
- _, err = uploader.Upload(&s3manager.UploadInput{
+ _, err = uploader.Upload(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(v.bucket.bucket),
Key: aws.String("recent/" + key),
Body: empty,
// there are no other operations happening on the same s3test server
// while we do this.
func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) {
- v.serverClock.now = &lastPut
+ v.s3fakeClock.now = &lastPut
- uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
+ uploader := manager.NewUploader(v.bucket.svc)
empty := bytes.NewReader([]byte{})
- _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
+ _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(v.bucket.bucket),
Key: aws.String("recent/" + v.key(loc)),
Body: empty,
panic(err)
}
- v.serverClock.now = nil
+ v.s3fakeClock.now = nil
}
// Teardown does nothing for this volume type: the per-volume server
// Close call is removed here, and the suite's TearDownTest closes the
// fake S3 server instead.
func (v *testableS3Volume) Teardown() {
- v.server.Close()
}
func (v *testableS3Volume) ReadWriteOperationLabelValues() (r, w string) {