//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepstore
import (
"bytes"
})
}
+func (s *StubbedS3AWSSuite) TestGenericWithPrefix(c *check.C) { // runs DoGenericVolumeTests against a volume whose PrefixLength is set to 3
+ DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+ v := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+ v.PrefixLength = 3 // NOTE(review): presumably object keys become loc[:3] + "/" + loc, matching the keys the trash-state test in this file builds by hand -- confirm in v.key
+ return v
+ })
+}
+
func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
v.IndexPageSize = 3
panic(err)
}
v.serverClock.now = nil
- _, err = v.Head(key)
+ _, err = v.head(key)
if err != nil {
panic(err)
}
false, false, false, true, true, true,
},
} {
- c.Log("Scenario: ", scenario.label)
-
- // We have a few tests to run for each scenario, and
- // the tests are expected to change state. By calling
- // this setup func between tests, we (re)create the
- // scenario as specified, using a new unique block
- // locator to prevent interference from previous
- // tests.
-
- setupScenario := func() (string, []byte) {
- nextKey++
- blk := []byte(fmt.Sprintf("%d", nextKey))
- loc := fmt.Sprintf("%x", md5.Sum(blk))
- c.Log("\t", loc)
- putS3Obj(scenario.dataT, loc, blk)
- putS3Obj(scenario.recentT, "recent/"+loc, nil)
- putS3Obj(scenario.trashT, "trash/"+loc, blk)
- v.serverClock.now = &t0
- return loc, blk
- }
-
- // Check canGet
- loc, blk := setupScenario()
- buf := make([]byte, len(blk))
- _, err := v.Get(context.Background(), loc, buf)
- c.Check(err == nil, check.Equals, scenario.canGet)
- if err != nil {
- c.Check(os.IsNotExist(err), check.Equals, true)
- }
-
- // Call Trash, then check canTrash and canGetAfterTrash
- loc, _ = setupScenario()
- err = v.Trash(loc)
- c.Check(err == nil, check.Equals, scenario.canTrash)
- _, err = v.Get(context.Background(), loc, buf)
- c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
- if err != nil {
- c.Check(os.IsNotExist(err), check.Equals, true)
- }
-
- // Call Untrash, then check canUntrash
- loc, _ = setupScenario()
- err = v.Untrash(loc)
- c.Check(err == nil, check.Equals, scenario.canUntrash)
- if scenario.dataT != none || scenario.trashT != none {
- // In all scenarios where the data exists, we
- // should be able to Get after Untrash --
- // regardless of timestamps, errors, race
- // conditions, etc.
+ for _, prefixLength := range []int{0, 3} {
+ v.PrefixLength = prefixLength
+ c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
+
+ // We have a few tests to run for each scenario, and
+ // the tests are expected to change state. By calling
+ // this setup func between tests, we (re)create the
+ // scenario as specified, using a new unique block
+ // locator to prevent interference from previous
+ // tests.
+
+ setupScenario := func() (string, []byte) {
+ nextKey++
+ blk := []byte(fmt.Sprintf("%d", nextKey))
+ loc := fmt.Sprintf("%x", md5.Sum(blk))
+ key := loc
+ if prefixLength > 0 {
+ key = loc[:prefixLength] + "/" + loc
+ }
+ c.Log("\t", loc, "\t", key)
+ putS3Obj(scenario.dataT, key, blk)
+ putS3Obj(scenario.recentT, "recent/"+key, nil)
+ putS3Obj(scenario.trashT, "trash/"+key, blk)
+ v.serverClock.now = &t0
+ return loc, blk
+ }
+
+ // Check canGet
+ loc, blk := setupScenario()
+ buf := make([]byte, len(blk))
+ _, err := v.Get(context.Background(), loc, buf)
+ c.Check(err == nil, check.Equals, scenario.canGet)
+ if err != nil {
+ c.Check(os.IsNotExist(err), check.Equals, true)
+ }
+
+ // Call Trash, then check canTrash and canGetAfterTrash
+ loc, _ = setupScenario()
+ err = v.Trash(loc)
+ c.Check(err == nil, check.Equals, scenario.canTrash)
_, err = v.Get(context.Background(), loc, buf)
+ c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
+ if err != nil {
+ c.Check(os.IsNotExist(err), check.Equals, true)
+ }
+
+ // Call Untrash, then check canUntrash
+ loc, _ = setupScenario()
+ err = v.Untrash(loc)
+ c.Check(err == nil, check.Equals, scenario.canUntrash)
+ if scenario.dataT != none || scenario.trashT != none {
+ // In all scenarios where the data exists, we
+ // should be able to Get after Untrash --
+ // regardless of timestamps, errors, race
+ // conditions, etc.
+ _, err = v.Get(context.Background(), loc, buf)
+ c.Check(err, check.IsNil)
+ }
+
+ // Call EmptyTrash, then check haveTrashAfterEmpty and
+ // freshAfterEmpty
+ loc, _ = setupScenario()
+ v.EmptyTrash()
+ _, err = v.head("trash/" + v.key(loc))
+ c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
+ if scenario.freshAfterEmpty {
+ t, err := v.Mtime(loc)
+ c.Check(err, check.IsNil)
+ // new mtime must be current (with an
+ // allowance for 1s timestamp precision)
+ c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+ }
+
+ // Check for current Mtime after Put (applies to all
+ // scenarios)
+ loc, blk = setupScenario()
+ err = v.Put(context.Background(), loc, blk)
c.Check(err, check.IsNil)
- }
-
- // Call EmptyTrash, then check haveTrashAfterEmpty and
- // freshAfterEmpty
- loc, _ = setupScenario()
- v.EmptyTrash()
- _, err = v.Head("trash/" + loc)
- c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
- if scenario.freshAfterEmpty {
t, err := v.Mtime(loc)
c.Check(err, check.IsNil)
- // new mtime must be current (with an
- // allowance for 1s timestamp precision)
c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
}
-
- // Check for current Mtime after Put (applies to all
- // scenarios)
- loc, blk = setupScenario()
- err = v.Put(context.Background(), loc, blk)
- c.Check(err, check.IsNil)
- t, err := v.Mtime(loc)
- c.Check(err, check.IsNil)
- c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
}
}
// PutRaw skips the ContentMD5 test
func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
-
+ key := v.key(loc) // object key for loc as computed by v.key (defined outside this view); used for both uploads below
r := NewCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
_, err := uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(v.bucket.bucket),
- Key: aws.String(loc),
+ Key: aws.String(key), // block data is stored under the derived key
Body: r,
})
if err != nil {
- v.logger.Printf("PutRaw: %s: %+v", loc, err)
+ v.logger.Printf("PutRaw: %s: %+v", key, err) // failure is only logged; PutRaw has no return value
}
empty := bytes.NewReader([]byte{})
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(v.bucket.bucket),
- Key: aws.String("recent/" + loc),
+ Key: aws.String("recent/" + key), // empty marker object stored under "recent/" + key
Body: empty,
})
if err != nil {
- v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
+ v.logger.Printf("PutRaw: recent/%s: %+v", key, err) // likewise only logged
}
}
// TouchWithDate turns back the clock while doing a Touch(). We assume
// there are no other operations happening on the same s3test server
// while we do this.
-func (v *TestableS3AWSVolume) TouchWithDate(locator string, lastPut time.Time) {
+func (v *TestableS3AWSVolume) TouchWithDate(loc string, lastPut time.Time) {
v.serverClock.now = &lastPut
uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
empty := bytes.NewReader([]byte{})
_, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
Bucket: aws.String(v.bucket.bucket),
- Key: aws.String("recent/" + locator),
+ Key: aws.String("recent/" + v.key(loc)),
Body: empty,
})
if err != nil {