1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/sdk/go/arvados"
21 "git.arvados.org/arvados.git/sdk/go/ctxlog"
23 "github.com/aws/aws-sdk-go-v2/aws"
24 "github.com/aws/aws-sdk-go-v2/service/s3"
25 "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
27 "github.com/johannesboyne/gofakes3"
28 "github.com/johannesboyne/gofakes3/backend/s3mem"
29 "github.com/prometheus/client_golang/prometheus"
30 "github.com/sirupsen/logrus"
31 check "gopkg.in/check.v1"
35 S3AWSTestBucketName = "testbucket"
38 type s3AWSFakeClock struct {
42 func (c *s3AWSFakeClock) Now() time.Time {
44 return time.Now().UTC()
49 func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
53 var _ = check.Suite(&StubbedS3AWSSuite{})
55 var srv httptest.Server
57 type StubbedS3AWSSuite struct {
58 s3server *httptest.Server
59 metadata *httptest.Server
60 cluster *arvados.Cluster
62 volumes []*TestableS3AWSVolume
65 func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) {
68 s.cluster = testCluster(c)
69 s.cluster.Volumes = map[string]arvados.Volume{
70 "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
71 "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
73 s.handler = &handler{}
76 func (s *StubbedS3AWSSuite) TestGeneric(c *check.C) {
77 DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
78 // Use a negative raceWindow so s3test's 1-second
79 // timestamp precision doesn't confuse fixRace.
80 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
84 func (s *StubbedS3AWSSuite) TestGenericReadOnly(c *check.C) {
85 DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
86 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
90 func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
91 v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
93 for i := 0; i < 256; i++ {
94 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
96 for _, spec := range []struct {
105 buf := new(bytes.Buffer)
106 err := v.IndexTo(spec.prefix, buf)
107 c.Check(err, check.IsNil)
109 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
110 c.Check(len(idx), check.Equals, spec.expectMatch+1)
111 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
115 func (s *StubbedS3AWSSuite) TestSignature(c *check.C) {
116 var header http.Header
117 stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
122 // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 v2 signatures are being phased out
123 // as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
125 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
127 SecretAccessKey: "xxx",
129 Region: "test-region-1",
130 Bucket: "test-bucket-name",
133 logger: ctxlog.TestLogger(c),
134 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
137 // Our test S3 server uses the older 'Path Style'
138 vol.bucket.svc.ForcePathStyle = true
140 c.Check(err, check.IsNil)
141 err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
142 c.Check(err, check.IsNil)
143 c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
146 func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) {
147 s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
148 upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
149 exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
150 // Literal example from
151 // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
152 // but with updated timestamps
153 io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
155 defer s.metadata.Close()
158 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
159 IAMRole: s.metadata.URL + "/latest/api/token",
160 Endpoint: "http://localhost:12345",
161 Region: "test-region-1",
162 Bucket: "test-bucket-name",
165 logger: ctxlog.TestLogger(c),
166 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
168 err := v.check(s.metadata.URL + "/latest")
169 c.Check(err, check.IsNil)
170 creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
171 c.Check(err, check.IsNil)
172 c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
173 c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
175 s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
176 w.WriteHeader(http.StatusNotFound)
178 deadv := &S3AWSVolume{
179 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
180 IAMRole: s.metadata.URL + "/fake-metadata/test-role",
181 Endpoint: "http://localhost:12345",
182 Region: "test-region-1",
183 Bucket: "test-bucket-name",
186 logger: ctxlog.TestLogger(c),
187 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
189 err = deadv.check(s.metadata.URL + "/latest")
190 c.Check(err, check.IsNil)
191 _, err = deadv.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
192 c.Check(err, check.ErrorMatches, `(?s).*EC2RoleRequestError: no EC2 instance role found.*`)
193 c.Check(err, check.ErrorMatches, `(?s).*404.*`)
196 func (s *StubbedS3AWSSuite) TestStats(c *check.C) {
197 v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
198 stats := func() string {
199 buf, err := json.Marshal(v.InternalStats())
200 c.Check(err, check.IsNil)
204 c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
206 loc := "acbd18db4cc2f85cedef654fccc4a4d8"
207 _, err := v.Get(context.Background(), loc, make([]byte, 3))
208 c.Check(err, check.NotNil)
209 c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
210 c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
211 c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
213 err = v.Put(context.Background(), loc, []byte("foo"))
214 c.Check(err, check.IsNil)
215 c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
216 c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
218 _, err = v.Get(context.Background(), loc, make([]byte, 3))
219 c.Check(err, check.IsNil)
220 _, err = v.Get(context.Background(), loc, make([]byte, 3))
221 c.Check(err, check.IsNil)
222 c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
225 type s3AWSBlockingHandler struct {
226 requested chan *http.Request
227 unblock chan struct{}
230 func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
231 if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
232 // Accept PutBucket ("PUT /bucketname/"), called by
236 if h.requested != nil {
239 if h.unblock != nil {
242 http.Error(w, "nothing here", http.StatusNotFound)
245 func (s *StubbedS3AWSSuite) TestGetContextCancel(c *check.C) {
246 loc := "acbd18db4cc2f85cedef654fccc4a4d8"
247 buf := make([]byte, 3)
249 s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
250 _, err := v.Get(ctx, loc, buf)
255 func (s *StubbedS3AWSSuite) TestCompareContextCancel(c *check.C) {
256 loc := "acbd18db4cc2f85cedef654fccc4a4d8"
259 s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
260 return v.Compare(ctx, loc, buf)
264 func (s *StubbedS3AWSSuite) TestPutContextCancel(c *check.C) {
265 loc := "acbd18db4cc2f85cedef654fccc4a4d8"
268 s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
269 return v.Put(ctx, loc, buf)
273 func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3AWSVolume) error) {
274 handler := &s3AWSBlockingHandler{}
275 s.s3server = httptest.NewServer(handler)
276 defer s.s3server.Close()
278 v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
280 ctx, cancel := context.WithCancel(context.Background())
282 handler.requested = make(chan *http.Request)
283 handler.unblock = make(chan struct{})
284 defer close(handler.unblock)
286 doneFunc := make(chan struct{})
288 err := testFunc(ctx, v)
289 c.Check(err, check.Equals, context.Canceled)
293 timeout := time.After(10 * time.Second)
295 // Wait for the stub server to receive a request, meaning
296 // Get() is waiting for an s3 operation.
299 c.Fatal("timed out waiting for test func to call our handler")
301 c.Fatal("test func finished without even calling our handler!")
302 case <-handler.requested:
314 func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
315 s.cluster.Collections.BlobTrashLifetime.Set("1h")
316 s.cluster.Collections.BlobSigningTTL.Set("1h")
318 v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
321 putS3Obj := func(t time.Time, key string, data []byte) {
325 v.serverClock.now = &t
326 uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
327 _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
328 Bucket: aws.String(v.bucket.bucket),
329 Key: aws.String(key),
330 Body: bytes.NewReader(data),
335 v.serverClock.now = nil
344 for _, scenario := range []struct {
351 canGetAfterTrash bool
353 haveTrashAfterEmpty bool
357 "No related objects",
359 false, false, false, false, false, false,
362 // Stored by older version, or there was a
363 // race between EmptyTrash and Put: Trash is a
364 // no-op even though the data object is very
367 t0.Add(-48 * time.Hour), none, none,
368 true, true, true, false, false, false,
371 "Not trash, but old enough to be eligible for trash",
372 t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
373 true, true, false, false, false, false,
376 "Not trash, and not old enough to be eligible for trash",
377 t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
378 true, true, true, false, false, false,
381 "Trashed + untrashed copies exist, due to recent race between Trash and Put",
382 t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
383 true, true, true, true, true, false,
386 "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
387 t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
388 true, false, true, true, true, false,
391 "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
392 t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
393 true, false, true, true, false, false,
396 "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
397 t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
398 true, false, true, true, true, true,
401 "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
402 t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
403 true, true, true, true, false, false,
406 "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
407 t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
408 true, false, true, true, false, false,
411 "Trash, not yet eligible for deletion",
412 none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
413 false, false, false, true, true, false,
416 "Trash, not yet eligible for deletion, prone to races",
417 none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
418 false, false, false, true, true, false,
421 "Trash, eligible for deletion",
422 none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
423 false, false, false, true, false, false,
426 "Erroneously trashed during a race, detected before BlobTrashLifetime",
427 none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
428 true, false, true, true, true, false,
431 "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
432 none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
433 true, false, true, true, true, false,
436 "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
437 none, none, t0.Add(-time.Minute),
438 false, false, false, true, true, true,
441 c.Log("Scenario: ", scenario.label)
443 // We have a few tests to run for each scenario, and
444 // the tests are expected to change state. By calling
445 // this setup func between tests, we (re)create the
446 // scenario as specified, using a new unique block
447 // locator to prevent interference from previous
450 setupScenario := func() (string, []byte) {
452 blk := []byte(fmt.Sprintf("%d", nextKey))
453 loc := fmt.Sprintf("%x", md5.Sum(blk))
455 putS3Obj(scenario.dataT, loc, blk)
456 putS3Obj(scenario.recentT, "recent/"+loc, nil)
457 putS3Obj(scenario.trashT, "trash/"+loc, blk)
458 v.serverClock.now = &t0
463 loc, blk := setupScenario()
464 buf := make([]byte, len(blk))
465 _, err := v.Get(context.Background(), loc, buf)
466 c.Check(err == nil, check.Equals, scenario.canGet)
468 c.Check(os.IsNotExist(err), check.Equals, true)
471 // Call Trash, then check canTrash and canGetAfterTrash
472 loc, _ = setupScenario()
474 c.Check(err == nil, check.Equals, scenario.canTrash)
475 _, err = v.Get(context.Background(), loc, buf)
476 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
478 c.Check(os.IsNotExist(err), check.Equals, true)
481 // Call Untrash, then check canUntrash
482 loc, _ = setupScenario()
484 c.Check(err == nil, check.Equals, scenario.canUntrash)
485 if scenario.dataT != none || scenario.trashT != none {
486 // In all scenarios where the data exists, we
487 // should be able to Get after Untrash --
488 // regardless of timestamps, errors, race
490 _, err = v.Get(context.Background(), loc, buf)
491 c.Check(err, check.IsNil)
494 // Call EmptyTrash, then check haveTrashAfterEmpty and
496 loc, _ = setupScenario()
498 _, err = v.Head("trash/" + loc)
499 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
500 if scenario.freshAfterEmpty {
501 t, err := v.Mtime(loc)
502 c.Check(err, check.IsNil)
503 // new mtime must be current (with an
504 // allowance for 1s timestamp precision)
505 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
508 // Check for current Mtime after Put (applies to all
510 loc, blk = setupScenario()
511 err = v.Put(context.Background(), loc, blk)
512 c.Check(err, check.IsNil)
513 t, err := v.Mtime(loc)
514 c.Check(err, check.IsNil)
515 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
519 type TestableS3AWSVolume struct {
521 server *httptest.Server
523 serverClock *s3AWSFakeClock
526 type LogrusLog struct {
527 log *logrus.FieldLogger
530 func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
532 case gofakes3.LogErr:
533 (*l.log).Errorln(v...)
534 case gofakes3.LogWarn:
535 (*l.log).Warnln(v...)
536 case gofakes3.LogInfo:
537 (*l.log).Infoln(v...)
539 panic("unknown level")
543 func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3AWSVolume {
545 clock := &s3AWSFakeClock{}
547 backend := s3mem.New(s3mem.WithTimeSource(clock))
549 // To enable GoFakeS3 debug logging, pass logger to gofakes3.WithLogger()
550 /* logger := new(LogrusLog)
551 ctxLogger := ctxlog.FromContext(context.Background())
552 logger.log = &ctxLogger */
553 faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(nil), gofakes3.WithTimeSkewLimit(0))
554 srv := httptest.NewServer(faker.Server())
557 if s.s3server != nil {
558 endpoint = s.s3server.URL
561 iamRole, accessKey, secretKey := "", "xxx", "xxx"
562 if s.metadata != nil {
563 iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
566 v := &TestableS3AWSVolume{
567 S3AWSVolume: &S3AWSVolume{
568 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
570 AccessKeyID: accessKey,
571 SecretAccessKey: secretKey,
572 Bucket: S3AWSTestBucketName,
574 Region: "test-region-1",
575 LocationConstraint: true,
581 logger: ctxlog.TestLogger(c),
588 c.Assert(v.S3AWSVolume.check(""), check.IsNil)
589 // Our test S3 server uses the older 'Path Style'
590 v.S3AWSVolume.bucket.svc.ForcePathStyle = true
591 // Create the testbucket
592 input := &s3.CreateBucketInput{
593 Bucket: aws.String(S3AWSTestBucketName),
595 req := v.S3AWSVolume.bucket.svc.CreateBucketRequest(input)
596 _, err := req.Send(context.Background())
597 c.Assert(err, check.IsNil)
598 // We couldn't set RaceWindow until now because check()
599 // rejects negative values.
600 v.S3AWSVolume.RaceWindow = arvados.Duration(raceWindow)
604 // PutRaw skips the ContentMD5 test
605 func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
607 r := NewCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
609 uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
610 u.PartSize = 5 * 1024 * 1024
614 _, err := uploader.Upload(&s3manager.UploadInput{
615 Bucket: aws.String(v.bucket.bucket),
616 Key: aws.String(loc),
620 v.logger.Printf("PutRaw: %s: %+v", loc, err)
623 empty := bytes.NewReader([]byte{})
624 _, err = uploader.Upload(&s3manager.UploadInput{
625 Bucket: aws.String(v.bucket.bucket),
626 Key: aws.String("recent/" + loc),
630 v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
634 // TouchWithDate turns back the clock while doing a Touch(). We assume
635 // there are no other operations happening on the same s3test server
637 func (v *TestableS3AWSVolume) TouchWithDate(locator string, lastPut time.Time) {
638 v.serverClock.now = &lastPut
640 uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
641 empty := bytes.NewReader([]byte{})
642 _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
643 Bucket: aws.String(v.bucket.bucket),
644 Key: aws.String("recent/" + locator),
651 v.serverClock.now = nil
654 func (v *TestableS3AWSVolume) Teardown() {
658 func (v *TestableS3AWSVolume) ReadWriteOperationLabelValues() (r, w string) {