1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
21 "git.arvados.org/arvados.git/sdk/go/arvados"
22 "git.arvados.org/arvados.git/sdk/go/ctxlog"
24 "github.com/aws/aws-sdk-go-v2/aws"
25 "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
26 "github.com/aws/aws-sdk-go-v2/service/s3"
28 "github.com/johannesboyne/gofakes3"
29 "github.com/johannesboyne/gofakes3/backend/s3mem"
30 "github.com/prometheus/client_golang/prometheus"
31 "github.com/sirupsen/logrus"
32 check "gopkg.in/check.v1"
// s3fakeClock is the time source handed to the in-memory S3 backend and
// to the gofakes3 server in newTestableVolume. Tests point its now field
// at a fixed time to pin the clock, and set the field back to nil
// afterwards (see putS3Obj in TestBackendStates and TouchWithDate).
35 type s3fakeClock struct {
// Now returns the fake clock's idea of the current time.
// NOTE(review): only the real-time (UTC) return is visible in this
// excerpt; presumably a non-nil c.now takes precedence -- confirm.
39 func (c *s3fakeClock) Now() time.Time {
41 return time.Now().UTC()
// Since takes a time and returns a duration.
// NOTE(review): the body is not visible in this excerpt; presumably it
// measures elapsed time against c.Now() rather than the wall clock --
// confirm.
46 func (c *s3fakeClock) Since(t time.Time) time.Duration {
// Register stubbedS3Suite with gocheck so that its Test* methods run.
50 var _ = check.Suite(&stubbedS3Suite{})
// NOTE(review): srv is not referenced anywhere in the visible part of
// this file -- confirm whether it is still needed.
52 var srv httptest.Server
// stubbedS3Suite holds the per-test fixtures for S3 volume tests that
// talk to local stub HTTP servers.
54 type stubbedS3Suite struct {
// s3server is the stub S3 endpoint. newTestableVolume starts a gofakes3
// server here when it is nil; testContextCancel installs a blocking
// handler instead.
55 s3server *httptest.Server
// s3fakeClock is the time source shared with the fake S3 backend.
56 s3fakeClock *s3fakeClock
// metadata, when non-nil, is a stub metadata server; newTestableVolume
// derives the metadata URL and IAM role from its URL.
57 metadata *httptest.Server
// cluster is the test cluster config built in SetUpTest.
58 cluster *arvados.Cluster
59 volumes []*testableS3Volume
// SetUpTest gives each test a fresh fake clock and a test cluster whose
// volume map contains two volumes using the "S3" driver.
62 func (s *stubbedS3Suite) SetUpTest(c *check.C) {
64 s.s3fakeClock = &s3fakeClock{}
66 s.cluster = testCluster(c)
67 s.cluster.Volumes = map[string]arvados.Volume{
68 "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
69 "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
// TearDownTest acts on the stub S3 server only if one was started
// during the test.
// NOTE(review): the body of the if is not visible in this excerpt;
// presumably it closes the server -- confirm.
73 func (s *stubbedS3Suite) TearDownTest(c *check.C) {
74 if s.s3server != nil {
// TestGeneric runs the shared generic volume tests against volumes
// produced by newTestableVolume. It passes false as the second argument
// to DoGenericVolumeTests; TestGenericReadOnly passes true.
79 func (s *stubbedS3Suite) TestGeneric(c *check.C) {
80 DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
81 // Use a negative raceWindow so s3test's 1-second
82 // timestamp precision doesn't confuse fixRace.
83 return s.newTestableVolume(c, params, -2*time.Second)
// TestGenericReadOnly runs the generic volume tests with the second
// argument of DoGenericVolumeTests set to true (presumably the
// read-only flag, going by the test name -- confirm).
87 func (s *stubbedS3Suite) TestGenericReadOnly(c *check.C) {
88 DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
// Same negative race window as in TestGeneric.
89 return s.newTestableVolume(c, params, -2*time.Second)
// TestGenericWithPrefix runs the generic volume tests on a volume that
// is kept in a local variable after construction instead of being
// returned directly.
// NOTE(review): what is done with v before it is returned is not
// visible in this excerpt; presumably a non-zero PrefixLength is set --
// confirm.
93 func (s *stubbedS3Suite) TestGenericWithPrefix(c *check.C) {
94 DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
95 v := s.newTestableVolume(c, params, -2*time.Second)
// TestIndex stores 256 blocks whose locators start with the hex bytes
// 00..ff, then checks how many index lines Index returns for each
// prefix in a table of test specs.
101 func (s *stubbedS3Suite) TestIndex(c *check.C) {
102 v := s.newTestableVolume(c, newVolumeParams{
104 ConfigVolume: arvados.Volume{Replication: 2},
105 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
106 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
109 for i := 0; i < 256; i++ {
// The locator is 2 hex digits followed by 30 zero-padded hex digits
// (32 characters in total); the data is the three bytes "foo". The
// locator is not the MD5 of the data, hence the write helper that
// bypasses the MD5 check.
110 err := v.blockWriteWithoutMD5Check(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
111 c.Assert(err, check.IsNil)
113 for _, spec := range []struct {
122 buf := new(bytes.Buffer)
123 err := v.Index(context.Background(), spec.prefix, buf)
124 c.Check(err, check.IsNil)
// Splitting after each newline (byte 10) yields one element per index
// line plus a final empty element, so the expected length is
// expectMatch+1 and the last element must be empty.
126 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
127 c.Check(len(idx), check.Equals, spec.expectMatch+1)
128 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
// TestSignature writes one block through a volume pointed at a stub
// HTTP server and checks that the Authorization header uses the
// AWS4-HMAC-SHA256 (signature V4) scheme.
132 func (s *stubbedS3Suite) TestSignature(c *check.C) {
// header is inspected after the write.
// NOTE(review): the stub handler body is not visible in this excerpt;
// presumably it stores the request headers here -- confirm.
133 var header http.Header
134 stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
139 // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 v2 signatures are being phased out
140 // as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
142 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
144 SecretAccessKey: "xxx",
146 Region: "test-region-1",
147 Bucket: "test-bucket-name",
150 logger: ctxlog.TestLogger(c),
151 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
153 // Our test S3 server uses the older 'Path Style'
154 vol.usePathStyle = true
157 c.Check(err, check.IsNil)
// The locator below is the MD5 digest of "foo", matching the data.
158 err = vol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
159 c.Check(err, check.IsNil)
160 c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
// TestIAMRoleCredentials covers two cases of obtaining credentials from
// a stub metadata server: one where the stub serves credentials, which
// must then show up in the signed S3 request, and one where the stub
// answers every request with 404, which must surface as a credential
// refresh error.
163 func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
164 var reqHeader http.Header
165 stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Case 1: metadata stub that hands out credentials.
170 retrievedMetadata := false
171 s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
172 retrievedMetadata = true
// Timestamps one hour either side of now, so the credentials served
// below are currently valid.
173 upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
174 exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
175 c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path)
// The role listing endpoint returns the single role name.
177 case r.URL.Path == "/latest/meta-data/iam/security-credentials/":
178 io.WriteString(w, "testcredential\n")
// The token endpoint and the per-role credentials endpoint both get
// the same JSON credentials document.
179 case r.URL.Path == "/latest/api/token",
180 r.URL.Path == "/latest/meta-data/iam/security-credentials/testcredential":
181 // Literal example from
182 // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
183 // but with updated timestamps
184 io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
186 w.WriteHeader(http.StatusNotFound)
// The deferred call's receiver is evaluated here, so this closes the
// first metadata stub even though s.metadata is reassigned below.
189 defer s.metadata.Close()
192 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
193 IAMRole: s.metadata.URL + "/latest/api/token",
195 Region: "test-region-1",
196 Bucket: "test-bucket-name",
199 logger: ctxlog.TestLogger(c),
200 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
202 err := v.check(s.metadata.URL + "/latest")
203 c.Check(err, check.IsNil)
204 resp, err := v.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{})
205 c.Check(err, check.IsNil)
206 c.Check(resp.Buckets, check.HasLen, 0)
207 c.Check(retrievedMetadata, check.Equals, true)
// The access key ID served by the metadata stub must appear in the
// V4 signature's credential scope.
208 c.Check(reqHeader.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 Credential=ASIAIOSFODNN7EXAMPLE/\d+/test-region-1/s3/aws4_request, SignedHeaders=.*`)
// Case 2: metadata stub that answers every request with 404.
// NOTE(review): no Close call for this second stub is visible in this
// excerpt -- confirm that it is shut down.
210 retrievedMetadata = false
211 s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
212 retrievedMetadata = true
213 c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path)
214 w.WriteHeader(http.StatusNotFound)
217 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
218 Endpoint: "http://localhost:9",
219 Region: "test-region-1",
220 Bucket: "test-bucket-name",
223 logger: ctxlog.TestLogger(c),
224 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
// check itself is expected to succeed; the failure only appears when
// an S3 request is actually attempted.
226 err = deadv.check(s.metadata.URL + "/latest")
227 c.Check(err, check.IsNil)
228 _, err = deadv.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{})
229 c.Check(err, check.ErrorMatches, `(?s).*failed to refresh cached credentials, no EC2 IMDS role found.*`)
230 c.Check(err, check.ErrorMatches, `(?s).*404.*`)
231 c.Check(retrievedMetadata, check.Equals, true)
// TestStats checks the JSON form of the volume's InternalStats after a
// failed read, a write, and two successful reads.
234 func (s *stubbedS3Suite) TestStats(c *check.C) {
235 v := s.newTestableVolume(c, newVolumeParams{
237 ConfigVolume: arvados.Volume{Replication: 2},
238 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
239 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
// stats marshals the current internal stats to JSON so the checks
// below can match on it with regular expressions.
241 stats := func() string {
242 buf, err := json.Marshal(v.InternalStats())
243 c.Check(err, check.IsNil)
// No operations have been counted before any read or write.
247 c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
// Reading a block that was never written must fail, count as an
// operation, be tallied under the 404 NoSuchKey error, and transfer
// no bytes in.
249 loc := "acbd18db4cc2f85cedef654fccc4a4d8"
250 err := v.BlockRead(context.Background(), loc, brdiscard)
251 c.Check(err, check.NotNil)
252 c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
253 c.Check(stats(), check.Matches, `.*"\*smithy.OperationError 404 NoSuchKey":[^0].*`)
254 c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
// Writing the 3-byte block is expected to count 3 bytes out and 2 put
// operations (presumably the data object plus its recent/ marker --
// confirm against BlockWrite).
256 err = v.BlockWrite(context.Background(), loc, []byte("foo"))
257 c.Check(err, check.IsNil)
258 c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
259 c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
// Two successful reads of the 3-byte block give 6 bytes in.
261 err = v.BlockRead(context.Background(), loc, brdiscard)
262 c.Check(err, check.IsNil)
263 err = v.BlockRead(context.Background(), loc, brdiscard)
264 c.Check(err, check.IsNil)
265 c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
// s3AWSBlockingHandler is the HTTP handler behind the stub S3 server in
// testContextCancel.
268 type s3AWSBlockingHandler struct {
// requested is the channel testContextCancel receives from to learn
// that the handler has been called. ServeHTTP ignores it when nil.
269 requested chan *http.Request
// unblock is closed by testContextCancel when the test function
// returns. ServeHTTP ignores it when nil.
270 unblock chan struct{}
// ServeHTTP special-cases bucket-level PUT requests, consults the
// requested and unblock channels when they are set, and finally answers
// with 404 "nothing here".
// NOTE(review): the bodies of the three if statements are not visible
// in this excerpt; presumably the handler announces the request on
// h.requested and then waits on h.unblock -- confirm.
273 func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// A PUT whose path, with leading and trailing slashes trimmed, has no
// further slash addresses a bucket rather than an object.
274 if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
275 // Accept PutBucket ("PUT /bucketname/"), called by
279 if h.requested != nil {
282 if h.unblock != nil {
285 http.Error(w, "nothing here", http.StatusNotFound)
// TestGetContextCancel runs the shared context-cancellation scenario
// (testContextCancel) with BlockRead as the operation under test.
288 func (s *stubbedS3Suite) TestGetContextCancel(c *check.C) {
289 s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
290 return v.BlockRead(ctx, fooHash, brdiscard)
// TestPutContextCancel runs the shared context-cancellation scenario
// (testContextCancel) with BlockWrite as the operation under test.
294 func (s *stubbedS3Suite) TestPutContextCancel(c *check.C) {
295 s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
296 return v.BlockWrite(ctx, fooHash, []byte("foo"))
// testContextCancel runs testFunc against a volume backed by an
// s3AWSBlockingHandler stub and expects it to return context.Canceled.
// It fails the test if the handler is not called within 10 seconds or
// if testFunc finishes without calling the handler at all.
300 func (s *stubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) {
// The handler's channels are still nil while the volume is created
// below; they are only installed afterwards.
301 handler := &s3AWSBlockingHandler{}
302 s.s3server = httptest.NewServer(handler)
303 defer s.s3server.Close()
305 v := s.newTestableVolume(c, newVolumeParams{
307 ConfigVolume: arvados.Volume{Replication: 2},
308 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
309 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
312 ctx, cancel := context.WithCancel(context.Background())
314 handler.requested = make(chan *http.Request)
315 handler.unblock = make(chan struct{})
// Closing unblock on return signals any handler still waiting on it.
316 defer close(handler.unblock)
318 doneFunc := make(chan struct{})
320 err := testFunc(ctx, v)
321 c.Check(err, check.Equals, context.Canceled)
325 timeout := time.After(10 * time.Second)
327 // Wait for the stub server to receive a request, meaning
328 // Get() is waiting for an s3 operation.
331 c.Fatal("timed out waiting for test func to call our handler")
333 c.Fatal("test func finished without even calling our handler!")
334 case <-handler.requested:
// TestBackendStates walks through a table of backend states. Each state
// is defined by the timestamps of the data object, its recent/ marker
// and its trash/ copy. For every state, and for prefix lengths 0 and 3,
// it checks the outcome of BlockRead, BlockTrash, BlockUntrash,
// emptying the trash, and BlockWrite.
346 func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
// Trash lifetime and signing TTL are both one hour; the scenario
// timestamps below are chosen relative to these limits.
347 s.cluster.Collections.BlobTrashLifetime.Set("1h")
348 s.cluster.Collections.BlobSigningTTL.Set("1h")
350 v := s.newTestableVolume(c, newVolumeParams{
352 ConfigVolume: arvados.Volume{Replication: 2},
353 Logger: ctxlog.TestLogger(c),
354 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
355 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
// putS3Obj uploads data under key while the fake clock is pinned to t,
// then releases the clock again.
// NOTE(review): presumably it does nothing when t is the "none"
// sentinel; that guard is not visible in this excerpt -- confirm.
359 putS3Obj := func(t time.Time, key string, data []byte) {
363 s.s3fakeClock.now = &t
364 uploader := manager.NewUploader(v.bucket.svc)
365 _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
366 Bucket: aws.String(v.bucket.bucket),
367 Key: aws.String(key),
368 Body: bytes.NewReader(data),
373 s.s3fakeClock.now = nil
// Scenario table. Each entry gives a label, three timestamps and six
// booleans.
// NOTE(review): the struct's field list is only partly visible here;
// going by how the fields are used below, the timestamps are presumably
// dataT, recentT, trashT and the booleans canGet, canTrash,
// canGetAfterTrash, canUntrash, haveTrashAfterEmpty, freshAfterEmpty,
// in that order -- confirm.
382 for _, scenario := range []struct {
389 canGetAfterTrash bool
391 haveTrashAfterEmpty bool
395 "No related objects",
397 false, false, false, false, false, false,
400 // Stored by older version, or there was a
401 // race between EmptyTrash and Put: Trash is a
402 // no-op even though the data object is very
405 t0.Add(-48 * time.Hour), none, none,
406 true, true, true, false, false, false,
409 "Not trash, but old enough to be eligible for trash",
410 t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
411 true, true, false, false, false, false,
414 "Not trash, and not old enough to be eligible for trash",
415 t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
416 true, true, true, false, false, false,
419 "Trashed + untrashed copies exist, due to recent race between Trash and Put",
420 t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
421 true, true, true, true, true, false,
424 "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
425 t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
426 true, false, true, true, true, false,
429 "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
430 t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
431 true, false, true, true, false, false,
434 "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
435 t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
436 true, false, true, true, true, true,
439 "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
440 t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
441 true, true, true, true, false, false,
444 "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
445 t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
446 true, false, true, true, false, false,
449 "Trash, not yet eligible for deletion",
450 none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
451 false, false, false, true, true, false,
454 "Trash, not yet eligible for deletion, prone to races",
455 none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
456 false, false, false, true, true, false,
459 "Trash, eligible for deletion",
460 none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
461 false, false, false, true, false, false,
464 "Erroneously trashed during a race, detected before BlobTrashLifetime",
465 none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
466 true, false, true, true, true, false,
469 "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
470 none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
471 true, false, true, true, true, false,
474 "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
475 none, none, t0.Add(-time.Minute),
476 false, false, false, true, true, true,
// Every scenario is exercised with both prefix lengths: 0 and 3.
479 for _, prefixLength := range []int{0, 3} {
480 v.PrefixLength = prefixLength
481 c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
483 // We have a few tests to run for each scenario, and
484 // the tests are expected to change state. By calling
485 // this setup func between tests, we (re)create the
486 // scenario as specified, using a new unique block
487 // locator to prevent interference from previous
490 setupScenario := func() (string, []byte) {
// The block content is the decimal form of nextKey and the locator is
// the hex MD5 of that content.
492 blk := []byte(fmt.Sprintf("%d", nextKey))
493 loc := fmt.Sprintf("%x", md5.Sum(blk))
// With a non-zero prefix length the object key is the first
// prefixLength characters of the locator, a slash, then the locator.
495 if prefixLength > 0 {
496 key = loc[:prefixLength] + "/" + loc
498 c.Log("\t", loc, "\t", key)
// Create the data object, the (empty) recent/ marker and the trash/
// copy at the timestamps the scenario specifies, then pin the fake
// clock to t0 for the operations under test.
499 putS3Obj(scenario.dataT, key, blk)
500 putS3Obj(scenario.recentT, "recent/"+key, nil)
501 putS3Obj(scenario.trashT, "trash/"+key, blk)
502 v.s3fakeClock.now = &t0
507 loc, blk := setupScenario()
508 err := v.BlockRead(context.Background(), loc, brdiscard)
509 c.Check(err == nil, check.Equals, scenario.canGet, check.Commentf("err was %+v", err))
511 c.Check(os.IsNotExist(err), check.Equals, true)
514 // Call Trash, then check canTrash and canGetAfterTrash
515 loc, _ = setupScenario()
516 err = v.BlockTrash(loc)
517 c.Check(err == nil, check.Equals, scenario.canTrash)
518 err = v.BlockRead(context.Background(), loc, brdiscard)
519 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
521 c.Check(os.IsNotExist(err), check.Equals, true)
524 // Call Untrash, then check canUntrash
525 loc, _ = setupScenario()
526 err = v.BlockUntrash(loc)
527 c.Check(err == nil, check.Equals, scenario.canUntrash)
528 if scenario.dataT != none || scenario.trashT != none {
529 // In all scenarios where the data exists, we
530 // should be able to Get after Untrash --
531 // regardless of timestamps, errors, race
533 err = v.BlockRead(context.Background(), loc, brdiscard)
534 c.Check(err, check.IsNil)
537 // Call EmptyTrash, then check haveTrashAfterEmpty and
539 loc, _ = setupScenario()
// A successful head request on the trash/ key means the trashed copy
// still exists.
541 _, err = v.head("trash/" + v.key(loc))
542 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
543 if scenario.freshAfterEmpty {
544 t, err := v.Mtime(loc)
545 c.Check(err, check.IsNil)
546 // new mtime must be current (with an
547 // allowance for 1s timestamp precision)
548 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
551 // Check for current Mtime after Put (applies to all
553 loc, blk = setupScenario()
554 err = v.BlockWrite(context.Background(), loc, blk)
555 c.Check(err, check.IsNil)
556 t, err := v.Mtime(loc)
557 c.Check(err, check.IsNil)
558 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
// testableS3Volume is the volume type returned by newTestableVolume and
// handed to the generic volume tests as a TestableVolume. Code in this
// file reaches the underlying volume as v.s3Volume.
563 type testableS3Volume struct {
565 server *httptest.Server
// s3fakeClock is the suite's fake clock; TouchWithDate and
// TestBackendStates pin it through this field.
567 s3fakeClock *s3fakeClock
// gofakes3logger wraps the FieldLogger given to it in newTestableVolume
// so that it can be passed to gofakes3.WithLogger.
570 type gofakes3logger struct {
// Print dispatches on the gofakes3 log level (LogErr, LogWarn, LogInfo)
// and panics with "unknown level" otherwise.
// NOTE(review): the per-level actions are not visible in this excerpt;
// presumably each forwards v to the wrapped logger at the matching
// level -- confirm.
574 func (l gofakes3logger) Print(level gofakes3.LogLevel, v ...interface{}) {
576 case gofakes3.LogErr:
578 case gofakes3.LogWarn:
580 case gofakes3.LogInfo:
583 panic("unknown level")
// testBucketSerial is incremented atomically by newTestableVolume to
// give every test volume its own bucket name ("testbucket<N>").
587 var testBucketSerial atomic.Int64
// newTestableVolume builds an S3 volume wired to the suite's stub S3
// server, creates a uniquely named bucket on it, and then applies
// raceWindow.
//
// If the suite has no stub S3 server yet, a gofakes3 server with an
// in-memory backend is started, using the suite's fake clock as its
// time source. If params.Logger is nil, a test logger is substituted.
589 func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
590 if params.Logger == nil {
591 params.Logger = ctxlog.TestLogger(c)
593 if s.s3server == nil {
// Both the storage backend and the HTTP front end take their time
// from the fake clock.
594 backend := s3mem.New(s3mem.WithTimeSource(s.s3fakeClock))
595 logger := ctxlog.TestLogger(c)
596 faker := gofakes3.New(backend,
597 gofakes3.WithTimeSource(s.s3fakeClock),
598 gofakes3.WithLogger(gofakes3logger{FieldLogger: logger}),
599 gofakes3.WithTimeSkewLimit(0))
600 s.s3server = httptest.NewServer(faker.Server())
602 endpoint := s.s3server.URL
// Each call gets a fresh bucket name from the package-level counter.
603 bucketName := fmt.Sprintf("testbucket%d", testBucketSerial.Add(1))
// With a metadata stub configured, the metadata URL and IAM role are
// derived from its URL.
// NOTE(review): the static "xxx" keys are presumably the else branch
// (the branch keyword is not visible in this excerpt) -- confirm.
605 var metadataURL, iamRole, accessKey, secretKey string
606 if s.metadata != nil {
607 metadataURL, iamRole = s.metadata.URL, s.metadata.URL+"/fake-metadata/test-role"
609 accessKey, secretKey = "xxx", "xxx"
612 v := &testableS3Volume{
614 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
616 AccessKeyID: accessKey,
617 SecretAccessKey: secretKey,
620 Region: "test-region-1",
621 LocationConstraint: true,
625 cluster: params.Cluster,
626 volume: params.ConfigVolume,
627 logger: params.Logger,
628 metrics: params.MetricsVecs,
629 bufferPool: params.BufferPool,
633 s3fakeClock: s.s3fakeClock,
// Construction aborts the test if the volume's check fails.
635 c.Assert(v.s3Volume.check(metadataURL), check.IsNil)
636 // Create the testbucket
637 input := &s3.CreateBucketInput{
638 Bucket: aws.String(bucketName),
640 _, err := v.s3Volume.bucket.svc.CreateBucket(context.Background(), input)
641 c.Assert(err, check.IsNil)
642 // We couldn't set RaceWindow until now because check()
643 // rejects negative values.
644 v.s3Volume.RaceWindow = arvados.Duration(raceWindow)
// blockWriteWithoutMD5Check uploads block directly with the S3 upload
// manager, then uploads an empty "recent/" marker for the same key.
// TestIndex uses it to store blocks whose locators are not the MD5 of
// their content.
648 func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) error {
// Bytes read from the block are reported to the bucket's stats via
// TickOutBytes.
650 r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
652 uploader := manager.NewUploader(v.bucket.svc, func(u *manager.Uploader) {
// Upload part size: 5 MiB.
653 u.PartSize = 5 * 1024 * 1024
657 _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
658 Bucket: aws.String(v.bucket.bucket),
659 Key: aws.String(key),
// Second upload: the empty recent/ marker object.
666 empty := bytes.NewReader([]byte{})
667 _, err = uploader.Upload(context.Background(), &s3.PutObjectInput{
668 Bucket: aws.String(v.bucket.bucket),
669 Key: aws.String("recent/" + key),
675 // TouchWithDate turns back the clock while doing a Touch(). We assume
676 // there are no other operations happening on the same s3test server
678 func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) {
// Pin the shared fake clock to lastPut for the duration of the upload.
679 v.s3fakeClock.now = &lastPut
// The "touch" is done by uploading an empty object under the recent/
// key for loc.
681 uploader := manager.NewUploader(v.bucket.svc)
682 empty := bytes.NewReader([]byte{})
683 _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
684 Bucket: aws.String(v.bucket.bucket),
685 Key: aws.String("recent/" + v.key(loc)),
// Release the clock again.
692 v.s3fakeClock.now = nil
// Teardown takes no arguments and returns nothing.
// NOTE(review): the body is not visible in this excerpt; presumably
// this method exists to satisfy the TestableVolume interface used by
// DoGenericVolumeTests -- confirm.
695 func (v *testableS3Volume) Teardown() {
698 func (v *testableS3Volume) ReadWriteOperationLabelValues() (r, w string) {