//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepstore
import (
"bytes"
func (c *s3AWSFakeClock) Now() time.Time {
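+ // Return the canned test time when one is set; otherwise fall back to the real clock, normalized to UTC.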
if c.now == nil {
- return time.Now()
+ return time.Now().UTC()
}
- return *c.now
+ return c.now.UTC()
}
func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
})
}
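+// TestGenericWithPrefix repeats the generic volume tests with a non-zero
+// PrefixLength, so blocks are stored under prefixed keys (for example
+// "abc/abc123...") rather than at the top level of the bucket.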
+func (s *StubbedS3AWSSuite) TestGenericWithPrefix(c *check.C) {
+ DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+ v := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+ v.PrefixLength = 3
+ return v
+ })
+}
+
func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
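+ // Use a small page size so the index listing spans multiple pages.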
v.IndexPageSize = 3
// as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
vol := S3AWSVolume{
S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
- AccessKey: "xxx",
- SecretKey: "xxx",
- Endpoint: stub.URL,
- Region: "test-region-1",
- Bucket: "test-bucket-name",
+ AccessKeyID: "xxx",
+ SecretAccessKey: "xxx",
+ Endpoint: stub.URL,
+ Region: "test-region-1",
+ Bucket: "test-bucket-name",
},
cluster: s.cluster,
logger: ctxlog.TestLogger(c),
metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
}
err := v.check(s.metadata.URL + "/latest")
+ c.Check(err, check.IsNil)
creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
- fmt.Printf("%+v, %s\n", creds, err)
+ c.Check(err, check.IsNil)
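+ // The stub metadata service is expected to return AWS's documented example credentials.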
c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
}
err = deadv.check(s.metadata.URL + "/latest")
+ c.Check(err, check.IsNil)
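+ // With no instance role available from the stub metadata service, credential retrieval should fail.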
_, err = deadv.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
c.Check(err, check.ErrorMatches, `(?s).*EC2RoleRequestError: no EC2 instance role found.*`)
c.Check(err, check.ErrorMatches, `(?s).*404.*`)
return
}
v.serverClock.now = &t
- fmt.Printf("USING TIMESTAMP %s to write key %s", t, key)
uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
- resp, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
+ _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
Bucket: aws.String(v.bucket.bucket),
Key: aws.String(key),
Body: bytes.NewReader(data),
if err != nil {
panic(err)
}
- fmt.Println(resp)
v.serverClock.now = nil
- resp2, err := v.Head(key)
- fmt.Printf("KEY: %s\n%s", key, resp2)
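+ // Confirm the stub now has the object before proceeding.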
+ _, err = v.head(key)
+ if err != nil {
+ panic(err)
+ }
}
t0 := time.Now()
false, false, false, true, true, true,
},
} {
- c.Log("Scenario: ", scenario.label)
-
- // We have a few tests to run for each scenario, and
- // the tests are expected to change state. By calling
- // this setup func between tests, we (re)create the
- // scenario as specified, using a new unique block
- // locator to prevent interference from previous
- // tests.
-
- setupScenario := func() (string, []byte) {
- nextKey++
- blk := []byte(fmt.Sprintf("%d", nextKey))
- loc := fmt.Sprintf("%x", md5.Sum(blk))
- c.Log("\t", loc)
- putS3Obj(scenario.dataT, loc, blk)
- putS3Obj(scenario.recentT, "recent/"+loc, nil)
- putS3Obj(scenario.trashT, "trash/"+loc, blk)
- v.serverClock.now = &t0
- return loc, blk
- }
-
- // Check canGet
- loc, blk := setupScenario()
- buf := make([]byte, len(blk))
- _, err := v.Get(context.Background(), loc, buf)
- c.Check(err == nil, check.Equals, scenario.canGet)
- if err != nil {
- c.Check(os.IsNotExist(err), check.Equals, true)
- }
-
- // Call Trash, then check canTrash and canGetAfterTrash
- loc, _ = setupScenario()
- err = v.Trash(loc)
- c.Check(err == nil, check.Equals, scenario.canTrash)
- _, err = v.Get(context.Background(), loc, buf)
- c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
- if err != nil {
- c.Check(os.IsNotExist(err), check.Equals, true)
- }
-
- // Call Untrash, then check canUntrash
- loc, _ = setupScenario()
- err = v.Untrash(loc)
- c.Check(err == nil, check.Equals, scenario.canUntrash)
- if scenario.dataT != none || scenario.trashT != none {
- // In all scenarios where the data exists, we
- // should be able to Get after Untrash --
- // regardless of timestamps, errors, race
- // conditions, etc.
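+ // Repeat each scenario with both key layouts: flat keys
+ // (prefixLength=0) and hashed-prefix keys (prefixLength=3).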
+ for _, prefixLength := range []int{0, 3} {
+ v.PrefixLength = prefixLength
+ c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
+
+ // We have a few tests to run for each scenario, and
+ // the tests are expected to change state. By calling
+ // this setup func between tests, we (re)create the
+ // scenario as specified, using a new unique block
+ // locator to prevent interference from previous
+ // tests.
+
+ setupScenario := func() (string, []byte) {
+ nextKey++
+ blk := []byte(fmt.Sprintf("%d", nextKey))
+ loc := fmt.Sprintf("%x", md5.Sum(blk))
+ key := loc
+ if prefixLength > 0 {
+ key = loc[:prefixLength] + "/" + loc
+ }
+ c.Log("\t", loc, "\t", key)
+ putS3Obj(scenario.dataT, key, blk)
+ putS3Obj(scenario.recentT, "recent/"+key, nil)
+ putS3Obj(scenario.trashT, "trash/"+key, blk)
+ v.serverClock.now = &t0
+ return loc, blk
+ }
+
+ // Check canGet
+ loc, blk := setupScenario()
+ buf := make([]byte, len(blk))
+ _, err := v.Get(context.Background(), loc, buf)
+ c.Check(err == nil, check.Equals, scenario.canGet)
+ if err != nil {
+ c.Check(os.IsNotExist(err), check.Equals, true)
+ }
+
+ // Call Trash, then check canTrash and canGetAfterTrash
+ loc, _ = setupScenario()
+ err = v.Trash(loc)
+ c.Check(err == nil, check.Equals, scenario.canTrash)
_, err = v.Get(context.Background(), loc, buf)
+ c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
+ if err != nil {
+ c.Check(os.IsNotExist(err), check.Equals, true)
+ }
+
+ // Call Untrash, then check canUntrash
+ loc, _ = setupScenario()
+ err = v.Untrash(loc)
+ c.Check(err == nil, check.Equals, scenario.canUntrash)
+ if scenario.dataT != none || scenario.trashT != none {
+ // In all scenarios where the data exists, we
+ // should be able to Get after Untrash --
+ // regardless of timestamps, errors, race
+ // conditions, etc.
+ _, err = v.Get(context.Background(), loc, buf)
+ c.Check(err, check.IsNil)
+ }
+
+ // Call EmptyTrash, then check haveTrashAfterEmpty and
+ // freshAfterEmpty
+ loc, _ = setupScenario()
+ v.EmptyTrash()
+ _, err = v.head("trash/" + v.key(loc))
+ c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
+ if scenario.freshAfterEmpty {
+ t, err := v.Mtime(loc)
+ c.Check(err, check.IsNil)
+ // new mtime must be current (with an
+ // allowance for 1s timestamp precision)
+ c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+ }
+
+ // Check for current Mtime after Put (applies to all
+ // scenarios)
+ loc, blk = setupScenario()
+ err = v.Put(context.Background(), loc, blk)
c.Check(err, check.IsNil)
- }
-
- // Call EmptyTrash, then check haveTrashAfterEmpty and
- // freshAfterEmpty
- loc, _ = setupScenario()
- v.EmptyTrash()
- _, err = v.Head("trash/" + loc)
- c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
- if scenario.freshAfterEmpty {
t, err := v.Mtime(loc)
c.Check(err, check.IsNil)
- // new mtime must be current (with an
- // allowance for 1s timestamp precision)
c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
}
-
- // Check for current Mtime after Put (applies to all
- // scenarios)
- loc, blk = setupScenario()
- err = v.Put(context.Background(), loc, blk)
- c.Check(err, check.IsNil)
- t, err := v.Mtime(loc)
- c.Check(err, check.IsNil)
- c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
}
}
// fake s3
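+ // gofakes3 backed by an in-memory store, sharing the fake clock with the volume under test.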
backend := s3mem.New(s3mem.WithTimeSource(clock))
- logger := new(LogrusLog)
+ // To enable GoFakeS3 debug logging, pass logger to gofakes3.WithLogger()
+ /* logger := new(LogrusLog)
ctxLogger := ctxlog.FromContext(context.Background())
- logger.log = &ctxLogger
- faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(logger), gofakes3.WithTimeSkewLimit(0))
+ logger.log = &ctxLogger */
+ faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(nil), gofakes3.WithTimeSkewLimit(0))
srv := httptest.NewServer(faker.Server())
endpoint := srv.URL
S3AWSVolume: &S3AWSVolume{
S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
IAMRole: iamRole,
- AccessKey: accessKey,
- SecretKey: secretKey,
+ AccessKeyID: accessKey,
+ SecretAccessKey: secretKey,
Bucket: S3AWSTestBucketName,
Endpoint: endpoint,
Region: "test-region-1",
// PutRaw skips the ContentMD5 test
func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
-
+ key := v.key(loc)
r := NewCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
_, err := uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(v.bucket.bucket),
- Key: aws.String(loc),
+ Key: aws.String(key),
Body: r,
})
if err != nil {
- v.logger.Printf("PutRaw: %s: %+v", loc, err)
+ v.logger.Printf("PutRaw: %s: %+v", key, err)
}
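+ // Also write an empty "recent/" marker, mirroring the volume's modification-time bookkeeping.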
empty := bytes.NewReader([]byte{})
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(v.bucket.bucket),
- Key: aws.String("recent/" + loc),
+ Key: aws.String("recent/" + key),
Body: empty,
})
if err != nil {
- v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
+ v.logger.Printf("PutRaw: recent/%s: %+v", key, err)
}
}
// TouchWithDate turns back the clock while doing a Touch(). We assume
// there are no other operations happening on the same s3test server
// while we do this.
-func (v *TestableS3AWSVolume) TouchWithDate(locator string, lastPut time.Time) {
+func (v *TestableS3AWSVolume) TouchWithDate(loc string, lastPut time.Time) {
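+ // Backdate the fake clock so the rewritten "recent/" marker is stamped with lastPut.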
v.serverClock.now = &lastPut
uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
empty := bytes.NewReader([]byte{})
_, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
Bucket: aws.String(v.bucket.bucket),
- Key: aws.String("recent/" + locator),
+ Key: aws.String("recent/" + v.key(loc)),
Body: empty,
})
if err != nil {