1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/sdk/go/arvados"
21 "git.arvados.org/arvados.git/sdk/go/ctxlog"
23 "github.com/aws/aws-sdk-go-v2/aws"
24 "github.com/aws/aws-sdk-go-v2/service/s3"
25 "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
27 "github.com/johannesboyne/gofakes3"
28 "github.com/johannesboyne/gofakes3/backend/s3mem"
29 "github.com/prometheus/client_golang/prometheus"
30 "github.com/sirupsen/logrus"
31 check "gopkg.in/check.v1"
35 s3TestBucketName = "testbucket"
38 type s3AWSFakeClock struct {
42 func (c *s3AWSFakeClock) Now() time.Time {
44 return time.Now().UTC()
49 func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
53 var _ = check.Suite(&stubbedS3Suite{})
55 var srv httptest.Server
57 type stubbedS3Suite struct {
58 s3server *httptest.Server
59 metadata *httptest.Server
60 cluster *arvados.Cluster
61 volumes []*testableS3Volume
64 func (s *stubbedS3Suite) SetUpTest(c *check.C) {
67 s.cluster = testCluster(c)
68 s.cluster.Volumes = map[string]arvados.Volume{
69 "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
70 "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
74 func (s *stubbedS3Suite) TestGeneric(c *check.C) {
75 DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
76 // Use a negative raceWindow so the stub S3 server's 1-second
77 // timestamp precision doesn't confuse fixRace.
78 return s.newTestableVolume(c, params, -2*time.Second)
82 func (s *stubbedS3Suite) TestGenericReadOnly(c *check.C) {
83 DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
84 return s.newTestableVolume(c, params, -2*time.Second)
88 func (s *stubbedS3Suite) TestGenericWithPrefix(c *check.C) {
89 DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
90 v := s.newTestableVolume(c, params, -2*time.Second)
96 func (s *stubbedS3Suite) TestIndex(c *check.C) {
97 v := s.newTestableVolume(c, newVolumeParams{
99 ConfigVolume: arvados.Volume{Replication: 2},
100 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
101 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
104 for i := 0; i < 256; i++ {
105 err := v.blockWriteWithoutMD5Check(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
106 c.Assert(err, check.IsNil)
108 for _, spec := range []struct {
117 buf := new(bytes.Buffer)
118 err := v.Index(context.Background(), spec.prefix, buf)
119 c.Check(err, check.IsNil)
121 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
122 c.Check(len(idx), check.Equals, spec.expectMatch+1)
123 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
127 func (s *stubbedS3Suite) TestSignature(c *check.C) {
128 var header http.Header
129 stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
134 // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 v2 signatures are being phased out
135 // as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
137 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
139 SecretAccessKey: "xxx",
141 Region: "test-region-1",
142 Bucket: "test-bucket-name",
145 logger: ctxlog.TestLogger(c),
146 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
149 // Our test S3 server uses the older 'Path Style'
150 vol.bucket.svc.ForcePathStyle = true
152 c.Check(err, check.IsNil)
153 err = vol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
154 c.Check(err, check.IsNil)
155 c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
158 func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
159 s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
160 upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
161 exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
162 // Literal example from
163 // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
164 // but with updated timestamps
165 io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
167 defer s.metadata.Close()
170 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
171 IAMRole: s.metadata.URL + "/latest/api/token",
172 Endpoint: "http://localhost:12345",
173 Region: "test-region-1",
174 Bucket: "test-bucket-name",
177 logger: ctxlog.TestLogger(c),
178 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
180 err := v.check(s.metadata.URL + "/latest")
181 c.Check(err, check.IsNil)
182 creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
183 c.Check(err, check.IsNil)
184 c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
185 c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
187 s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
188 w.WriteHeader(http.StatusNotFound)
191 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
192 IAMRole: s.metadata.URL + "/fake-metadata/test-role",
193 Endpoint: "http://localhost:12345",
194 Region: "test-region-1",
195 Bucket: "test-bucket-name",
198 logger: ctxlog.TestLogger(c),
199 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
201 err = deadv.check(s.metadata.URL + "/latest")
202 c.Check(err, check.IsNil)
203 _, err = deadv.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
204 c.Check(err, check.ErrorMatches, `(?s).*EC2RoleRequestError: no EC2 instance role found.*`)
205 c.Check(err, check.ErrorMatches, `(?s).*404.*`)
208 func (s *stubbedS3Suite) TestStats(c *check.C) {
209 v := s.newTestableVolume(c, newVolumeParams{
211 ConfigVolume: arvados.Volume{Replication: 2},
212 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
213 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
215 stats := func() string {
216 buf, err := json.Marshal(v.InternalStats())
217 c.Check(err, check.IsNil)
221 c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
223 loc := "acbd18db4cc2f85cedef654fccc4a4d8"
224 _, err := v.BlockRead(context.Background(), loc, io.Discard)
225 c.Check(err, check.NotNil)
226 c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
227 c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
228 c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
230 err = v.BlockWrite(context.Background(), loc, []byte("foo"))
231 c.Check(err, check.IsNil)
232 c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
233 c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
235 _, err = v.BlockRead(context.Background(), loc, io.Discard)
236 c.Check(err, check.IsNil)
237 _, err = v.BlockRead(context.Background(), loc, io.Discard)
238 c.Check(err, check.IsNil)
239 c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
242 type s3AWSBlockingHandler struct {
243 requested chan *http.Request
244 unblock chan struct{}
247 func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
248 if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
249 // Accept PutBucket ("PUT /bucketname/"), called by
253 if h.requested != nil {
256 if h.unblock != nil {
259 http.Error(w, "nothing here", http.StatusNotFound)
262 func (s *stubbedS3Suite) TestGetContextCancel(c *check.C) {
263 s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
264 _, err := v.BlockRead(ctx, fooHash, io.Discard)
269 func (s *stubbedS3Suite) TestPutContextCancel(c *check.C) {
270 s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
271 return v.BlockWrite(ctx, fooHash, []byte("foo"))
275 func (s *stubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) {
276 handler := &s3AWSBlockingHandler{}
277 s.s3server = httptest.NewServer(handler)
278 defer s.s3server.Close()
280 v := s.newTestableVolume(c, newVolumeParams{
282 ConfigVolume: arvados.Volume{Replication: 2},
283 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
284 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
287 ctx, cancel := context.WithCancel(context.Background())
289 handler.requested = make(chan *http.Request)
290 handler.unblock = make(chan struct{})
291 defer close(handler.unblock)
293 doneFunc := make(chan struct{})
295 err := testFunc(ctx, v)
296 c.Check(err, check.Equals, context.Canceled)
300 timeout := time.After(10 * time.Second)
302 // Wait for the stub server to receive a request, meaning
303 // testFunc is waiting for an s3 operation.
306 c.Fatal("timed out waiting for test func to call our handler")
308 c.Fatal("test func finished without even calling our handler!")
309 case <-handler.requested:
321 func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
322 s.cluster.Collections.BlobTrashLifetime.Set("1h")
323 s.cluster.Collections.BlobSigningTTL.Set("1h")
325 v := s.newTestableVolume(c, newVolumeParams{
327 ConfigVolume: arvados.Volume{Replication: 2},
328 Logger: ctxlog.TestLogger(c),
329 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
330 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
334 putS3Obj := func(t time.Time, key string, data []byte) {
338 v.serverClock.now = &t
339 uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
340 _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
341 Bucket: aws.String(v.bucket.bucket),
342 Key: aws.String(key),
343 Body: bytes.NewReader(data),
348 v.serverClock.now = nil
357 for _, scenario := range []struct {
364 canGetAfterTrash bool
366 haveTrashAfterEmpty bool
370 "No related objects",
372 false, false, false, false, false, false,
375 // Stored by older version, or there was a
376 // race between EmptyTrash and Put: Trash is a
377 // no-op even though the data object is very
380 t0.Add(-48 * time.Hour), none, none,
381 true, true, true, false, false, false,
384 "Not trash, but old enough to be eligible for trash",
385 t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
386 true, true, false, false, false, false,
389 "Not trash, and not old enough to be eligible for trash",
390 t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
391 true, true, true, false, false, false,
394 "Trashed + untrashed copies exist, due to recent race between Trash and Put",
395 t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
396 true, true, true, true, true, false,
399 "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
400 t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
401 true, false, true, true, true, false,
404 "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
405 t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
406 true, false, true, true, false, false,
409 "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
410 t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
411 true, false, true, true, true, true,
414 "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
415 t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
416 true, true, true, true, false, false,
419 "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
420 t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
421 true, false, true, true, false, false,
424 "Trash, not yet eligible for deletion",
425 none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
426 false, false, false, true, true, false,
429 "Trash, not yet eligible for deletion, prone to races",
430 none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
431 false, false, false, true, true, false,
434 "Trash, eligible for deletion",
435 none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
436 false, false, false, true, false, false,
439 "Erroneously trashed during a race, detected before BlobTrashLifetime",
440 none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
441 true, false, true, true, true, false,
444 "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
445 none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
446 true, false, true, true, true, false,
449 "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
450 none, none, t0.Add(-time.Minute),
451 false, false, false, true, true, true,
454 for _, prefixLength := range []int{0, 3} {
455 v.PrefixLength = prefixLength
456 c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
458 // We have a few tests to run for each scenario, and
459 // the tests are expected to change state. By calling
460 // this setup func between tests, we (re)create the
461 // scenario as specified, using a new unique block
462 // locator to prevent interference from previous
465 setupScenario := func() (string, []byte) {
467 blk := []byte(fmt.Sprintf("%d", nextKey))
468 loc := fmt.Sprintf("%x", md5.Sum(blk))
470 if prefixLength > 0 {
471 key = loc[:prefixLength] + "/" + loc
473 c.Log("\t", loc, "\t", key)
474 putS3Obj(scenario.dataT, key, blk)
475 putS3Obj(scenario.recentT, "recent/"+key, nil)
476 putS3Obj(scenario.trashT, "trash/"+key, blk)
477 v.serverClock.now = &t0
482 loc, blk := setupScenario()
483 _, err := v.BlockRead(context.Background(), loc, io.Discard)
484 c.Check(err == nil, check.Equals, scenario.canGet)
486 c.Check(os.IsNotExist(err), check.Equals, true)
489 // Call Trash, then check canTrash and canGetAfterTrash
490 loc, _ = setupScenario()
491 err = v.BlockTrash(loc)
492 c.Check(err == nil, check.Equals, scenario.canTrash)
493 _, err = v.BlockRead(context.Background(), loc, io.Discard)
494 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
496 c.Check(os.IsNotExist(err), check.Equals, true)
499 // Call Untrash, then check canUntrash
500 loc, _ = setupScenario()
501 err = v.BlockUntrash(loc)
502 c.Check(err == nil, check.Equals, scenario.canUntrash)
503 if scenario.dataT != none || scenario.trashT != none {
504 // In all scenarios where the data exists, we
505 // should be able to Get after Untrash --
506 // regardless of timestamps, errors, race
508 _, err = v.BlockRead(context.Background(), loc, io.Discard)
509 c.Check(err, check.IsNil)
512 // Call EmptyTrash, then check haveTrashAfterEmpty and
514 loc, _ = setupScenario()
516 _, err = v.head("trash/" + v.key(loc))
517 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
518 if scenario.freshAfterEmpty {
519 t, err := v.Mtime(loc)
520 c.Check(err, check.IsNil)
521 // new mtime must be current (with an
522 // allowance for 1s timestamp precision)
523 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
526 // Check for current Mtime after Put (applies to all
528 loc, blk = setupScenario()
529 err = v.BlockWrite(context.Background(), loc, blk)
530 c.Check(err, check.IsNil)
531 t, err := v.Mtime(loc)
532 c.Check(err, check.IsNil)
533 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
538 type testableS3Volume struct {
540 server *httptest.Server
542 serverClock *s3AWSFakeClock
545 type LogrusLog struct {
546 log *logrus.FieldLogger
549 func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
551 case gofakes3.LogErr:
552 (*l.log).Errorln(v...)
553 case gofakes3.LogWarn:
554 (*l.log).Warnln(v...)
555 case gofakes3.LogInfo:
556 (*l.log).Infoln(v...)
558 panic("unknown level")
562 func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
564 clock := &s3AWSFakeClock{}
566 backend := s3mem.New(s3mem.WithTimeSource(clock))
568 // To enable GoFakeS3 debug logging, pass logger to gofakes3.WithLogger()
569 /* logger := new(LogrusLog)
570 ctxLogger := ctxlog.FromContext(context.Background())
571 logger.log = &ctxLogger */
572 faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(nil), gofakes3.WithTimeSkewLimit(0))
573 srv := httptest.NewServer(faker.Server())
576 if s.s3server != nil {
577 endpoint = s.s3server.URL
580 iamRole, accessKey, secretKey := "", "xxx", "xxx"
581 if s.metadata != nil {
582 iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
585 v := &testableS3Volume{
587 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
589 AccessKeyID: accessKey,
590 SecretAccessKey: secretKey,
591 Bucket: s3TestBucketName,
593 Region: "test-region-1",
594 LocationConstraint: true,
598 cluster: params.Cluster,
599 volume: params.ConfigVolume,
600 logger: params.Logger,
601 metrics: params.MetricsVecs,
602 bufferPool: params.BufferPool,
608 c.Assert(v.s3Volume.check(""), check.IsNil)
609 // Our test S3 server uses the older 'Path Style'
610 v.s3Volume.bucket.svc.ForcePathStyle = true
611 // Create the testbucket
612 input := &s3.CreateBucketInput{
613 Bucket: aws.String(s3TestBucketName),
615 req := v.s3Volume.bucket.svc.CreateBucketRequest(input)
616 _, err := req.Send(context.Background())
617 c.Assert(err, check.IsNil)
618 // We couldn't set RaceWindow until now because check()
619 // rejects negative values.
620 v.s3Volume.RaceWindow = arvados.Duration(raceWindow)
624 func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) error {
626 r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
628 uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
629 u.PartSize = 5 * 1024 * 1024
633 _, err := uploader.Upload(&s3manager.UploadInput{
634 Bucket: aws.String(v.bucket.bucket),
635 Key: aws.String(key),
642 empty := bytes.NewReader([]byte{})
643 _, err = uploader.Upload(&s3manager.UploadInput{
644 Bucket: aws.String(v.bucket.bucket),
645 Key: aws.String("recent/" + key),
651 // TouchWithDate turns back the clock while doing a Touch(). We assume
652 // there are no other operations happening on the same s3test server
654 func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) {
655 v.serverClock.now = &lastPut
657 uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
658 empty := bytes.NewReader([]byte{})
659 _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
660 Bucket: aws.String(v.bucket.bucket),
661 Key: aws.String("recent/" + v.key(loc)),
668 v.serverClock.now = nil
671 func (v *testableS3Volume) Teardown() {
675 func (v *testableS3Volume) ReadWriteOperationLabelValues() (r, w string) {