X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/19f96717d0a7f26c28f8e5c61417c4246cfcffe1..093ec98e4a065acfc537ea22c08c337c115fe273:/services/keepstore/s3aws_volume_test.go diff --git a/services/keepstore/s3aws_volume_test.go b/services/keepstore/s3aws_volume_test.go index 0387d52f18..c7e2d485df 100644 --- a/services/keepstore/s3aws_volume_test.go +++ b/services/keepstore/s3aws_volume_test.go @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepstore import ( "bytes" @@ -87,6 +87,14 @@ func (s *StubbedS3AWSSuite) TestGenericReadOnly(c *check.C) { }) } +func (s *StubbedS3AWSSuite) TestGenericWithPrefix(c *check.C) { + DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume { + v := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second) + v.PrefixLength = 3 + return v + }) +} + func (s *StubbedS3AWSSuite) TestIndex(c *check.C) { v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0) v.IndexPageSize = 3 @@ -333,7 +341,7 @@ func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) { panic(err) } v.serverClock.now = nil - _, err = v.Head(key) + _, err = v.head(key) if err != nil { panic(err) } @@ -438,81 +446,88 @@ func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) { false, false, false, true, true, true, }, } { - c.Log("Scenario: ", scenario.label) - - // We have a few tests to run for each scenario, and - // the tests are expected to change state. By calling - // this setup func between tests, we (re)create the - // scenario as specified, using a new unique block - // locator to prevent interference from previous - // tests. - - setupScenario := func() (string, []byte) { - nextKey++ - blk := []byte(fmt.Sprintf("%d", nextKey)) - loc := fmt.Sprintf("%x", md5.Sum(blk)) - c.Log("\t", loc) - putS3Obj(scenario.dataT, loc, blk) - putS3Obj(scenario.recentT, "recent/"+loc, nil) - putS3Obj(scenario.trashT, "trash/"+loc, blk) - v.serverClock.now = &t0 - return loc, blk - } - - // Check canGet - loc, blk := setupScenario() - buf := make([]byte, len(blk)) - _, err := v.Get(context.Background(), loc, buf) - c.Check(err == nil, check.Equals, scenario.canGet) - if err != nil { - c.Check(os.IsNotExist(err), check.Equals, true) - } - - // Call Trash, then check canTrash and canGetAfterTrash - loc, _ = setupScenario() - err = v.Trash(loc) - c.Check(err == nil, check.Equals, scenario.canTrash) - _, err = v.Get(context.Background(), loc, buf) - c.Check(err == nil, check.Equals, scenario.canGetAfterTrash) - if err != nil { - c.Check(os.IsNotExist(err), check.Equals, true) - } - - // Call Untrash, then check canUntrash - loc, _ = setupScenario() - err = v.Untrash(loc) - c.Check(err == nil, check.Equals, scenario.canUntrash) - if scenario.dataT != none || scenario.trashT != none { - // In all scenarios where the data exists, we - // should be able to Get after Untrash -- - // regardless of timestamps, errors, race - // conditions, etc. + for _, prefixLength := range []int{0, 3} { + v.PrefixLength = prefixLength + c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength) + + // We have a few tests to run for each scenario, and + // the tests are expected to change state. By calling + // this setup func between tests, we (re)create the + // scenario as specified, using a new unique block + // locator to prevent interference from previous + // tests. + + setupScenario := func() (string, []byte) { + nextKey++ + blk := []byte(fmt.Sprintf("%d", nextKey)) + loc := fmt.Sprintf("%x", md5.Sum(blk)) + key := loc + if prefixLength > 0 { + key = loc[:prefixLength] + "/" + loc + } + c.Log("\t", loc, "\t", key) + putS3Obj(scenario.dataT, key, blk) + putS3Obj(scenario.recentT, "recent/"+key, nil) + putS3Obj(scenario.trashT, "trash/"+key, blk) + v.serverClock.now = &t0 + return loc, blk + } + + // Check canGet + loc, blk := setupScenario() + buf := make([]byte, len(blk)) + _, err := v.Get(context.Background(), loc, buf) + c.Check(err == nil, check.Equals, scenario.canGet) + if err != nil { + c.Check(os.IsNotExist(err), check.Equals, true) + } + + // Call Trash, then check canTrash and canGetAfterTrash + loc, _ = setupScenario() + err = v.Trash(loc) + c.Check(err == nil, check.Equals, scenario.canTrash) _, err = v.Get(context.Background(), loc, buf) + c.Check(err == nil, check.Equals, scenario.canGetAfterTrash) + if err != nil { + c.Check(os.IsNotExist(err), check.Equals, true) + } + + // Call Untrash, then check canUntrash + loc, _ = setupScenario() + err = v.Untrash(loc) + c.Check(err == nil, check.Equals, scenario.canUntrash) + if scenario.dataT != none || scenario.trashT != none { + // In all scenarios where the data exists, we + // should be able to Get after Untrash -- + // regardless of timestamps, errors, race + // conditions, etc. + _, err = v.Get(context.Background(), loc, buf) + c.Check(err, check.IsNil) + } + + // Call EmptyTrash, then check haveTrashAfterEmpty and + // freshAfterEmpty + loc, _ = setupScenario() + v.EmptyTrash() + _, err = v.head("trash/" + v.key(loc)) + c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty) + if scenario.freshAfterEmpty { + t, err := v.Mtime(loc) + c.Check(err, check.IsNil) + // new mtime must be current (with an + // allowance for 1s timestamp precision) + c.Check(t.After(t0.Add(-time.Second)), check.Equals, true) + } + + // Check for current Mtime after Put (applies to all + // scenarios) + loc, blk = setupScenario() + err = v.Put(context.Background(), loc, blk) c.Check(err, check.IsNil) - } - - // Call EmptyTrash, then check haveTrashAfterEmpty and - // freshAfterEmpty - loc, _ = setupScenario() - v.EmptyTrash() - _, err = v.Head("trash/" + loc) - c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty) - if scenario.freshAfterEmpty { t, err := v.Mtime(loc) c.Check(err, check.IsNil) - // new mtime must be current (with an - // allowance for 1s timestamp precision) c.Check(t.After(t0.Add(-time.Second)), check.Equals, true) } - - // Check for current Mtime after Put (applies to all - // scenarios) - loc, blk = setupScenario() - err = v.Put(context.Background(), loc, blk) - c.Check(err, check.IsNil) - t, err := v.Mtime(loc) - c.Check(err, check.IsNil) - c.Check(t.After(t0.Add(-time.Second)), check.Equals, true) } } @@ -603,7 +618,7 @@ func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, cluster *arvados.Clust // PutRaw skips the ContentMD5 test func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) { - + key := v.key(loc) r := NewCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes) uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) { @@ -613,35 +628,35 @@ func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) { _, err := uploader.Upload(&s3manager.UploadInput{ Bucket: aws.String(v.bucket.bucket), - Key: aws.String(loc), + Key: aws.String(key), Body: r, }) if err != nil { - v.logger.Printf("PutRaw: %s: %+v", loc, err) + v.logger.Printf("PutRaw: %s: %+v", key, err) } empty := bytes.NewReader([]byte{}) _, err = uploader.Upload(&s3manager.UploadInput{ Bucket: aws.String(v.bucket.bucket), - Key: aws.String("recent/" + loc), + Key: aws.String("recent/" + key), Body: empty, }) if err != nil { - v.logger.Printf("PutRaw: recent/%s: %+v", loc, err) + v.logger.Printf("PutRaw: recent/%s: %+v", key, err) } } // TouchWithDate turns back the clock while doing a Touch(). We assume // there are no other operations happening on the same s3test server // while we do this. -func (v *TestableS3AWSVolume) TouchWithDate(locator string, lastPut time.Time) { +func (v *TestableS3AWSVolume) TouchWithDate(loc string, lastPut time.Time) { v.serverClock.now = &lastPut uploader := s3manager.NewUploaderWithClient(v.bucket.svc) empty := bytes.NewReader([]byte{}) _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{ Bucket: aws.String(v.bucket.bucket), - Key: aws.String("recent/" + locator), + Key: aws.String("recent/" + v.key(loc)), Body: empty, }) if err != nil {