21720:
[arvados.git] / services / keepstore / s3_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "strings"
18         "sync/atomic"
19         "time"
20
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "git.arvados.org/arvados.git/sdk/go/ctxlog"
23
24         "github.com/aws/aws-sdk-go-v2/aws"
25         "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
26         "github.com/aws/aws-sdk-go-v2/service/s3"
27
28         "github.com/johannesboyne/gofakes3"
29         "github.com/johannesboyne/gofakes3/backend/s3mem"
30         "github.com/prometheus/client_golang/prometheus"
31         "github.com/sirupsen/logrus"
32         check "gopkg.in/check.v1"
33 )
34
35 type s3fakeClock struct {
36         now *time.Time
37 }
38
39 func (c *s3fakeClock) Now() time.Time {
40         if c.now == nil {
41                 return time.Now().UTC()
42         }
43         return c.now.UTC()
44 }
45
46 func (c *s3fakeClock) Since(t time.Time) time.Duration {
47         return c.Now().Sub(t)
48 }
49
50 var _ = check.Suite(&stubbedS3Suite{})
51
52 var srv httptest.Server
53
54 type stubbedS3Suite struct {
55         s3server    *httptest.Server
56         s3fakeClock *s3fakeClock
57         metadata    *httptest.Server
58         cluster     *arvados.Cluster
59         volumes     []*testableS3Volume
60 }
61
62 func (s *stubbedS3Suite) SetUpTest(c *check.C) {
63         s.s3server = nil
64         s.s3fakeClock = &s3fakeClock{}
65         s.metadata = nil
66         s.cluster = testCluster(c)
67         s.cluster.Volumes = map[string]arvados.Volume{
68                 "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
69                 "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
70         }
71 }
72
73 func (s *stubbedS3Suite) TearDownTest(c *check.C) {
74         if s.s3server != nil {
75                 s.s3server.Close()
76         }
77 }
78
79 func (s *stubbedS3Suite) TestGeneric(c *check.C) {
80         DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
81                 // Use a negative raceWindow so s3test's 1-second
82                 // timestamp precision doesn't confuse fixRace.
83                 return s.newTestableVolume(c, params, -2*time.Second)
84         })
85 }
86
87 func (s *stubbedS3Suite) TestGenericReadOnly(c *check.C) {
88         DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
89                 return s.newTestableVolume(c, params, -2*time.Second)
90         })
91 }
92
93 func (s *stubbedS3Suite) TestGenericWithPrefix(c *check.C) {
94         DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
95                 v := s.newTestableVolume(c, params, -2*time.Second)
96                 v.PrefixLength = 3
97                 return v
98         })
99 }
100
101 func (s *stubbedS3Suite) TestIndex(c *check.C) {
102         v := s.newTestableVolume(c, newVolumeParams{
103                 Cluster:      s.cluster,
104                 ConfigVolume: arvados.Volume{Replication: 2},
105                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
106                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
107         }, 0)
108         v.IndexPageSize = 3
109         for i := 0; i < 256; i++ {
110                 err := v.blockWriteWithoutMD5Check(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
111                 c.Assert(err, check.IsNil)
112         }
113         for _, spec := range []struct {
114                 prefix      string
115                 expectMatch int
116         }{
117                 {"", 256},
118                 {"c", 16},
119                 {"bc", 1},
120                 {"abc", 0},
121         } {
122                 buf := new(bytes.Buffer)
123                 err := v.Index(context.Background(), spec.prefix, buf)
124                 c.Check(err, check.IsNil)
125
126                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
127                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
128                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
129         }
130 }
131
132 func (s *stubbedS3Suite) TestSignature(c *check.C) {
133         var header http.Header
134         stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
135                 header = r.Header
136         }))
137         defer stub.Close()
138
139         // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 v2 signatures are being phased out
140         // as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
141         vol := s3Volume{
142                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
143                         AccessKeyID:     "xxx",
144                         SecretAccessKey: "xxx",
145                         Endpoint:        stub.URL,
146                         Region:          "test-region-1",
147                         Bucket:          "test-bucket-name",
148                 },
149                 cluster: s.cluster,
150                 logger:  ctxlog.TestLogger(c),
151                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
152         }
153         // Our test S3 server uses the older 'Path Style'
154         vol.usePathStyle = true
155         err := vol.check("")
156
157         c.Check(err, check.IsNil)
158         err = vol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
159         c.Check(err, check.IsNil)
160         c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
161 }
162
163 func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
164         var reqHeader http.Header
165         stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
166                 reqHeader = r.Header
167         }))
168         defer stub.Close()
169
170         retrievedMetadata := false
171         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
172                 retrievedMetadata = true
173                 upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
174                 exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
175                 c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path)
176                 switch {
177                 case r.URL.Path == "/latest/meta-data/iam/security-credentials/":
178                         io.WriteString(w, "testcredential\n")
179                 case r.URL.Path == "/latest/api/token",
180                         r.URL.Path == "/latest/meta-data/iam/security-credentials/testcredential":
181                         // Literal example from
182                         // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
183                         // but with updated timestamps
184                         io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
185                 default:
186                         w.WriteHeader(http.StatusNotFound)
187                 }
188         }))
189         defer s.metadata.Close()
190
191         v := &s3Volume{
192                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
193                         Endpoint: stub.URL,
194                         Region:   "test-region-1",
195                         Bucket:   "test-bucket-name",
196                 },
197                 cluster: s.cluster,
198                 logger:  ctxlog.TestLogger(c),
199                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
200         }
201         err := v.check(s.metadata.URL + "/latest")
202         c.Check(err, check.IsNil)
203         resp, err := v.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{})
204         c.Check(err, check.IsNil)
205         c.Check(resp.Buckets, check.HasLen, 0)
206         c.Check(retrievedMetadata, check.Equals, true)
207         c.Check(reqHeader.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 Credential=ASIAIOSFODNN7EXAMPLE/\d+/test-region-1/s3/aws4_request, SignedHeaders=.*`)
208
209         retrievedMetadata = false
210         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
211                 retrievedMetadata = true
212                 c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path)
213                 w.WriteHeader(http.StatusNotFound)
214         }))
215         deadv := &s3Volume{
216                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
217                         Endpoint: "http://localhost:9",
218                         Region:   "test-region-1",
219                         Bucket:   "test-bucket-name",
220                 },
221                 cluster: s.cluster,
222                 logger:  ctxlog.TestLogger(c),
223                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
224         }
225         err = deadv.check(s.metadata.URL + "/latest")
226         c.Check(err, check.IsNil)
227         _, err = deadv.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{})
228         c.Check(err, check.ErrorMatches, `(?s).*failed to refresh cached credentials, no EC2 IMDS role found.*`)
229         c.Check(err, check.ErrorMatches, `(?s).*404.*`)
230         c.Check(retrievedMetadata, check.Equals, true)
231 }
232
233 func (s *stubbedS3Suite) TestStats(c *check.C) {
234         v := s.newTestableVolume(c, newVolumeParams{
235                 Cluster:      s.cluster,
236                 ConfigVolume: arvados.Volume{Replication: 2},
237                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
238                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
239         }, 5*time.Minute)
240         stats := func() string {
241                 buf, err := json.Marshal(v.InternalStats())
242                 c.Check(err, check.IsNil)
243                 return string(buf)
244         }
245
246         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
247
248         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
249         err := v.BlockRead(context.Background(), loc, brdiscard)
250         c.Check(err, check.NotNil)
251         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
252         c.Check(stats(), check.Matches, `.*"\*smithy.OperationError 404 NoSuchKey":[^0].*`)
253         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
254
255         err = v.BlockWrite(context.Background(), loc, []byte("foo"))
256         c.Check(err, check.IsNil)
257         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
258         c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
259
260         err = v.BlockRead(context.Background(), loc, brdiscard)
261         c.Check(err, check.IsNil)
262         err = v.BlockRead(context.Background(), loc, brdiscard)
263         c.Check(err, check.IsNil)
264         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
265 }
266
267 type s3AWSBlockingHandler struct {
268         requested chan *http.Request
269         unblock   chan struct{}
270 }
271
272 func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
273         if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
274                 // Accept PutBucket ("PUT /bucketname/"), called by
275                 // newTestableVolume
276                 return
277         }
278         if h.requested != nil {
279                 h.requested <- r
280         }
281         if h.unblock != nil {
282                 <-h.unblock
283         }
284         http.Error(w, "nothing here", http.StatusNotFound)
285 }
286
287 func (s *stubbedS3Suite) TestGetContextCancel(c *check.C) {
288         s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
289                 return v.BlockRead(ctx, fooHash, brdiscard)
290         })
291 }
292
293 func (s *stubbedS3Suite) TestPutContextCancel(c *check.C) {
294         s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
295                 return v.BlockWrite(ctx, fooHash, []byte("foo"))
296         })
297 }
298
299 func (s *stubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) {
300         handler := &s3AWSBlockingHandler{}
301         s.s3server = httptest.NewServer(handler)
302         defer s.s3server.Close()
303
304         v := s.newTestableVolume(c, newVolumeParams{
305                 Cluster:      s.cluster,
306                 ConfigVolume: arvados.Volume{Replication: 2},
307                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
308                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
309         }, 5*time.Minute)
310
311         ctx, cancel := context.WithCancel(context.Background())
312
313         handler.requested = make(chan *http.Request)
314         handler.unblock = make(chan struct{})
315         defer close(handler.unblock)
316
317         doneFunc := make(chan struct{})
318         go func() {
319                 err := testFunc(ctx, v)
320                 c.Check(err, check.Equals, context.Canceled)
321                 close(doneFunc)
322         }()
323
324         timeout := time.After(10 * time.Second)
325
326         // Wait for the stub server to receive a request, meaning
327         // Get() is waiting for an s3 operation.
328         select {
329         case <-timeout:
330                 c.Fatal("timed out waiting for test func to call our handler")
331         case <-doneFunc:
332                 c.Fatal("test func finished without even calling our handler!")
333         case <-handler.requested:
334         }
335
336         cancel()
337
338         select {
339         case <-timeout:
340                 c.Fatal("timed out")
341         case <-doneFunc:
342         }
343 }
344
345 func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
346         s.cluster.Collections.BlobTrashLifetime.Set("1h")
347         s.cluster.Collections.BlobSigningTTL.Set("1h")
348
349         v := s.newTestableVolume(c, newVolumeParams{
350                 Cluster:      s.cluster,
351                 ConfigVolume: arvados.Volume{Replication: 2},
352                 Logger:       ctxlog.TestLogger(c),
353                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
354                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
355         }, 5*time.Minute)
356         var none time.Time
357
358         putS3Obj := func(t time.Time, key string, data []byte) {
359                 if t == none {
360                         return
361                 }
362                 s.s3fakeClock.now = &t
363                 uploader := manager.NewUploader(v.bucket.svc)
364                 _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
365                         Bucket: aws.String(v.bucket.bucket),
366                         Key:    aws.String(key),
367                         Body:   bytes.NewReader(data),
368                 })
369                 if err != nil {
370                         panic(err)
371                 }
372                 s.s3fakeClock.now = nil
373                 _, err = v.head(key)
374                 if err != nil {
375                         panic(err)
376                 }
377         }
378
379         t0 := time.Now()
380         nextKey := 0
381         for _, scenario := range []struct {
382                 label               string
383                 dataT               time.Time
384                 recentT             time.Time
385                 trashT              time.Time
386                 canGet              bool
387                 canTrash            bool
388                 canGetAfterTrash    bool
389                 canUntrash          bool
390                 haveTrashAfterEmpty bool
391                 freshAfterEmpty     bool
392         }{
393                 {
394                         "No related objects",
395                         none, none, none,
396                         false, false, false, false, false, false,
397                 },
398                 {
399                         // Stored by older version, or there was a
400                         // race between EmptyTrash and Put: Trash is a
401                         // no-op even though the data object is very
402                         // old
403                         "No recent/X",
404                         t0.Add(-48 * time.Hour), none, none,
405                         true, true, true, false, false, false,
406                 },
407                 {
408                         "Not trash, but old enough to be eligible for trash",
409                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
410                         true, true, false, false, false, false,
411                 },
412                 {
413                         "Not trash, and not old enough to be eligible for trash",
414                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
415                         true, true, true, false, false, false,
416                 },
417                 {
418                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
419                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
420                         true, true, true, true, true, false,
421                 },
422                 {
423                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
424                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
425                         true, false, true, true, true, false,
426                 },
427                 {
428                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
429                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
430                         true, false, true, true, false, false,
431                 },
432                 {
433                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
434                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
435                         true, false, true, true, true, true,
436                 },
437                 {
438                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
439                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
440                         true, true, true, true, false, false,
441                 },
442                 {
443                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
444                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
445                         true, false, true, true, false, false,
446                 },
447                 {
448                         "Trash, not yet eligible for deletion",
449                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
450                         false, false, false, true, true, false,
451                 },
452                 {
453                         "Trash, not yet eligible for deletion, prone to races",
454                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
455                         false, false, false, true, true, false,
456                 },
457                 {
458                         "Trash, eligible for deletion",
459                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
460                         false, false, false, true, false, false,
461                 },
462                 {
463                         "Erroneously trashed during a race, detected before BlobTrashLifetime",
464                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
465                         true, false, true, true, true, false,
466                 },
467                 {
468                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
469                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
470                         true, false, true, true, true, false,
471                 },
472                 {
473                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
474                         none, none, t0.Add(-time.Minute),
475                         false, false, false, true, true, true,
476                 },
477         } {
478                 for _, prefixLength := range []int{0, 3} {
479                         v.PrefixLength = prefixLength
480                         c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
481
482                         // We have a few tests to run for each scenario, and
483                         // the tests are expected to change state. By calling
484                         // this setup func between tests, we (re)create the
485                         // scenario as specified, using a new unique block
486                         // locator to prevent interference from previous
487                         // tests.
488
489                         setupScenario := func() (string, []byte) {
490                                 nextKey++
491                                 blk := []byte(fmt.Sprintf("%d", nextKey))
492                                 loc := fmt.Sprintf("%x", md5.Sum(blk))
493                                 key := loc
494                                 if prefixLength > 0 {
495                                         key = loc[:prefixLength] + "/" + loc
496                                 }
497                                 c.Log("\t", loc, "\t", key)
498                                 putS3Obj(scenario.dataT, key, blk)
499                                 putS3Obj(scenario.recentT, "recent/"+key, nil)
500                                 putS3Obj(scenario.trashT, "trash/"+key, blk)
501                                 v.s3fakeClock.now = &t0
502                                 return loc, blk
503                         }
504
505                         // Check canGet
506                         loc, blk := setupScenario()
507                         err := v.BlockRead(context.Background(), loc, brdiscard)
508                         c.Check(err == nil, check.Equals, scenario.canGet, check.Commentf("err was %+v", err))
509                         if err != nil {
510                                 c.Check(os.IsNotExist(err), check.Equals, true)
511                         }
512
513                         // Call Trash, then check canTrash and canGetAfterTrash
514                         loc, _ = setupScenario()
515                         err = v.BlockTrash(loc)
516                         c.Check(err == nil, check.Equals, scenario.canTrash)
517                         err = v.BlockRead(context.Background(), loc, brdiscard)
518                         c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
519                         if err != nil {
520                                 c.Check(os.IsNotExist(err), check.Equals, true)
521                         }
522
523                         // Call Untrash, then check canUntrash
524                         loc, _ = setupScenario()
525                         err = v.BlockUntrash(loc)
526                         c.Check(err == nil, check.Equals, scenario.canUntrash)
527                         if scenario.dataT != none || scenario.trashT != none {
528                                 // In all scenarios where the data exists, we
529                                 // should be able to Get after Untrash --
530                                 // regardless of timestamps, errors, race
531                                 // conditions, etc.
532                                 err = v.BlockRead(context.Background(), loc, brdiscard)
533                                 c.Check(err, check.IsNil)
534                         }
535
536                         // Call EmptyTrash, then check haveTrashAfterEmpty and
537                         // freshAfterEmpty
538                         loc, _ = setupScenario()
539                         v.EmptyTrash()
540                         _, err = v.head("trash/" + v.key(loc))
541                         c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
542                         if scenario.freshAfterEmpty {
543                                 t, err := v.Mtime(loc)
544                                 c.Check(err, check.IsNil)
545                                 // new mtime must be current (with an
546                                 // allowance for 1s timestamp precision)
547                                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
548                         }
549
550                         // Check for current Mtime after Put (applies to all
551                         // scenarios)
552                         loc, blk = setupScenario()
553                         err = v.BlockWrite(context.Background(), loc, blk)
554                         c.Check(err, check.IsNil)
555                         t, err := v.Mtime(loc)
556                         c.Check(err, check.IsNil)
557                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
558                 }
559         }
560 }
561
562 type testableS3Volume struct {
563         *s3Volume
564         server      *httptest.Server
565         c           *check.C
566         s3fakeClock *s3fakeClock
567 }
568
569 type gofakes3logger struct {
570         logrus.FieldLogger
571 }
572
573 func (l gofakes3logger) Print(level gofakes3.LogLevel, v ...interface{}) {
574         switch level {
575         case gofakes3.LogErr:
576                 l.Errorln(v...)
577         case gofakes3.LogWarn:
578                 l.Warnln(v...)
579         case gofakes3.LogInfo:
580                 l.Infoln(v...)
581         default:
582                 panic("unknown level")
583         }
584 }
585
586 var testBucketSerial atomic.Int64
587
588 func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
589         if params.Logger == nil {
590                 params.Logger = ctxlog.TestLogger(c)
591         }
592         if s.s3server == nil {
593                 backend := s3mem.New(s3mem.WithTimeSource(s.s3fakeClock))
594                 logger := ctxlog.TestLogger(c)
595                 faker := gofakes3.New(backend,
596                         gofakes3.WithTimeSource(s.s3fakeClock),
597                         gofakes3.WithLogger(gofakes3logger{FieldLogger: logger}),
598                         gofakes3.WithTimeSkewLimit(0))
599                 s.s3server = httptest.NewServer(faker.Server())
600         }
601         endpoint := s.s3server.URL
602         bucketName := fmt.Sprintf("testbucket%d", testBucketSerial.Add(1))
603
604         var metadataURL, accessKey, secretKey string
605         if s.metadata != nil {
606                 metadataURL = s.metadata.URL
607         } else {
608                 accessKey, secretKey = "xxx", "xxx"
609         }
610
611         v := &testableS3Volume{
612                 s3Volume: &s3Volume{
613                         S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
614                                 AccessKeyID:        accessKey,
615                                 SecretAccessKey:    secretKey,
616                                 Bucket:             bucketName,
617                                 Endpoint:           endpoint,
618                                 Region:             "test-region-1",
619                                 LocationConstraint: true,
620                                 UnsafeDelete:       true,
621                                 IndexPageSize:      1000,
622                         },
623                         cluster:      params.Cluster,
624                         volume:       params.ConfigVolume,
625                         logger:       params.Logger,
626                         metrics:      params.MetricsVecs,
627                         bufferPool:   params.BufferPool,
628                         usePathStyle: true,
629                 },
630                 c:           c,
631                 s3fakeClock: s.s3fakeClock,
632         }
633         c.Assert(v.s3Volume.check(metadataURL), check.IsNil)
634         // Create the testbucket
635         input := &s3.CreateBucketInput{
636                 Bucket: aws.String(bucketName),
637         }
638         _, err := v.s3Volume.bucket.svc.CreateBucket(context.Background(), input)
639         c.Assert(err, check.IsNil)
640         // We couldn't set RaceWindow until now because check()
641         // rejects negative values.
642         v.s3Volume.RaceWindow = arvados.Duration(raceWindow)
643         return v
644 }
645
646 func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) error {
647         key := v.key(loc)
648         r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
649
650         uploader := manager.NewUploader(v.bucket.svc, func(u *manager.Uploader) {
651                 u.PartSize = 5 * 1024 * 1024
652                 u.Concurrency = 13
653         })
654
655         _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
656                 Bucket: aws.String(v.bucket.bucket),
657                 Key:    aws.String(key),
658                 Body:   r,
659         })
660         if err != nil {
661                 return err
662         }
663
664         empty := bytes.NewReader([]byte{})
665         _, err = uploader.Upload(context.Background(), &s3.PutObjectInput{
666                 Bucket: aws.String(v.bucket.bucket),
667                 Key:    aws.String("recent/" + key),
668                 Body:   empty,
669         })
670         return err
671 }
672
673 // TouchWithDate turns back the clock while doing a Touch(). We assume
674 // there are no other operations happening on the same s3test server
675 // while we do this.
676 func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) {
677         v.s3fakeClock.now = &lastPut
678
679         uploader := manager.NewUploader(v.bucket.svc)
680         empty := bytes.NewReader([]byte{})
681         _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
682                 Bucket: aws.String(v.bucket.bucket),
683                 Key:    aws.String("recent/" + v.key(loc)),
684                 Body:   empty,
685         })
686         if err != nil {
687                 panic(err)
688         }
689
690         v.s3fakeClock.now = nil
691 }
692
693 func (v *testableS3Volume) Teardown() {
694 }
695
696 func (v *testableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
697         return "get", "put"
698 }