// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package keepstore

import (
        "bytes"
        "context"
        "crypto/md5"
        "encoding/json"
        "fmt"
        "io"
        "net/http"
        "net/http/httptest"
        "os"
        "strings"
        "sync/atomic"
        "time"

        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"

        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
        "github.com/aws/aws-sdk-go-v2/service/s3"

        "github.com/johannesboyne/gofakes3"
        "github.com/johannesboyne/gofakes3/backend/s3mem"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
)

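// s3fakeClock is a time source for the fake S3 backend. It returns
// the real current time unless now is set, which lets tests pin
// object timestamps to an arbitrary moment.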
type s3fakeClock struct {
        now *time.Time
}

func (c *s3fakeClock) Now() time.Time {
        if c.now == nil {
                return time.Now().UTC()
        }
        return c.now.UTC()
}

func (c *s3fakeClock) Since(t time.Time) time.Duration {
        return c.Now().Sub(t)
}

var _ = check.Suite(&stubbedS3Suite{})

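// stubbedS3Suite runs the S3 volume tests against an in-process fake
// S3 server (gofakes3), optionally with a stubbed instance metadata
// server for the IAM credential tests.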
type stubbedS3Suite struct {
        s3server    *httptest.Server
        s3fakeClock *s3fakeClock
        metadata    *httptest.Server
        cluster     *arvados.Cluster
        volumes     []*testableS3Volume
}

func (s *stubbedS3Suite) SetUpTest(c *check.C) {
        s.s3server = nil
        s.s3fakeClock = &s3fakeClock{}
        s.metadata = nil
        s.cluster = testCluster(c)
        s.cluster.Volumes = map[string]arvados.Volume{
                "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
                "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
        }
}

func (s *stubbedS3Suite) TearDownTest(c *check.C) {
        if s.s3server != nil {
                s.s3server.Close()
        }
}

func (s *stubbedS3Suite) TestGeneric(c *check.C) {
        DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
                // Use a negative raceWindow so the fake S3 server's
                // 1-second timestamp precision doesn't confuse fixRace.
                return s.newTestableVolume(c, params, -2*time.Second)
        })
}

func (s *stubbedS3Suite) TestGenericReadOnly(c *check.C) {
        DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
                return s.newTestableVolume(c, params, -2*time.Second)
        })
}

func (s *stubbedS3Suite) TestGenericWithPrefix(c *check.C) {
        DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
                v := s.newTestableVolume(c, params, -2*time.Second)
                v.PrefixLength = 3
                return v
        })
}

func (s *stubbedS3Suite) TestIndex(c *check.C) {
        v := s.newTestableVolume(c, newVolumeParams{
                Cluster:      s.cluster,
                ConfigVolume: arvados.Volume{Replication: 2},
                MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
                BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
        }, 0)
        v.IndexPageSize = 3
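        // Write 256 3-byte blocks ("foo") whose locators cover every
        // 2-hex-digit prefix; IndexPageSize=3 forces the index
        // listing to paginate.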
        for i := 0; i < 256; i++ {
                err := v.blockWriteWithoutMD5Check(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
                c.Assert(err, check.IsNil)
        }
        for _, spec := range []struct {
                prefix      string
                expectMatch int
        }{
                {"", 256},
                {"c", 16},
                {"bc", 1},
                {"abc", 0},
        } {
                buf := new(bytes.Buffer)
                err := v.Index(context.Background(), spec.prefix, buf)
                c.Check(err, check.IsNil)

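                // Each matching block contributes one newline-terminated
                // index line, so SplitAfter yields expectMatch+1 elements,
                // the last of which is empty.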
                idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
                c.Check(len(idx), check.Equals, spec.expectMatch+1)
                c.Check(len(idx[len(idx)-1]), check.Equals, 0)
        }
}

func (s *stubbedS3Suite) TestSignature(c *check.C) {
        var header http.Header
        stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                header = r.Header
        }))
        defer stub.Close()

        // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 V2 signatures
        // were phased out as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
        vol := s3Volume{
                S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
                        AccessKeyID:     "xxx",
                        SecretAccessKey: "xxx",
                        Endpoint:        stub.URL,
                        Region:          "test-region-1",
                        Bucket:          "test-bucket-name",
                },
                cluster: s.cluster,
                logger:  ctxlog.TestLogger(c),
                metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
        }
        // Our test S3 server uses the older 'Path Style'
        vol.usePathStyle = true
        err := vol.check("")

        c.Check(err, check.IsNil)
        err = vol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
        c.Check(err, check.IsNil)
        c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
}

func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
        var reqHeader http.Header
        stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                reqHeader = r.Header
        }))
        defer stub.Close()

        retrievedMetadata := false
        s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                retrievedMetadata = true
                upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
                exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
                c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path)
                switch {
                case r.URL.Path == "/latest/meta-data/iam/security-credentials/":
                        io.WriteString(w, "testcredential\n")
                case r.URL.Path == "/latest/api/token",
                        r.URL.Path == "/latest/meta-data/iam/security-credentials/testcredential":
                        // Literal example from
                        // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
                        // but with updated timestamps
                        io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
                default:
                        w.WriteHeader(http.StatusNotFound)
                }
        }))
        defer s.metadata.Close()

        v := &s3Volume{
                S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
                        IAMRole:  s.metadata.URL + "/latest/api/token",
                        Endpoint: stub.URL,
                        Region:   "test-region-1",
                        Bucket:   "test-bucket-name",
                },
                cluster: s.cluster,
                logger:  ctxlog.TestLogger(c),
                metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
        }
        err := v.check(s.metadata.URL + "/latest")
        c.Check(err, check.IsNil)
        resp, err := v.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{})
        c.Check(err, check.IsNil)
        c.Check(resp.Buckets, check.HasLen, 0)
        c.Check(retrievedMetadata, check.Equals, true)
        c.Check(reqHeader.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 Credential=ASIAIOSFODNN7EXAMPLE/\d+/test-region-1/s3/aws4_request, SignedHeaders=.*`)

        retrievedMetadata = false
        s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                retrievedMetadata = true
                c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path)
                w.WriteHeader(http.StatusNotFound)
        }))
        deadv := &s3Volume{
                S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
                        Endpoint: "http://localhost:9",
                        Region:   "test-region-1",
                        Bucket:   "test-bucket-name",
                },
                cluster: s.cluster,
                logger:  ctxlog.TestLogger(c),
                metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
        }
        err = deadv.check(s.metadata.URL + "/latest")
        c.Check(err, check.IsNil)
        _, err = deadv.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{})
        c.Check(err, check.ErrorMatches, `(?s).*failed to refresh cached credentials, no EC2 IMDS role found.*`)
        c.Check(err, check.ErrorMatches, `(?s).*404.*`)
        c.Check(retrievedMetadata, check.Equals, true)
}

func (s *stubbedS3Suite) TestStats(c *check.C) {
        v := s.newTestableVolume(c, newVolumeParams{
                Cluster:      s.cluster,
                ConfigVolume: arvados.Volume{Replication: 2},
                MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
                BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
        }, 5*time.Minute)
        stats := func() string {
                buf, err := json.Marshal(v.InternalStats())
                c.Check(err, check.IsNil)
                return string(buf)
        }

        c.Check(stats(), check.Matches, `.*"Ops":0,.*`)

        loc := "acbd18db4cc2f85cedef654fccc4a4d8"
        err := v.BlockRead(context.Background(), loc, brdiscard)
        c.Check(err, check.NotNil)
        c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
        c.Check(stats(), check.Matches, `.*"\*smithy.OperationError 404 NoSuchKey":[^0].*`)
        c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)

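        // A single BlockWrite stores the data object plus an empty
        // recent/ marker, so it counts as two PUT operations.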
        err = v.BlockWrite(context.Background(), loc, []byte("foo"))
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
        c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)

        err = v.BlockRead(context.Background(), loc, brdiscard)
        c.Check(err, check.IsNil)
        err = v.BlockRead(context.Background(), loc, brdiscard)
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}

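// s3AWSBlockingHandler accepts bucket-creation requests, reports any
// other incoming request on the requested channel, and then blocks
// until unblock is closed, so tests can cancel a context while an S3
// operation is in flight.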
type s3AWSBlockingHandler struct {
        requested chan *http.Request
        unblock   chan struct{}
}

func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
                // Accept PutBucket ("PUT /bucketname/"), called by
                // newTestableVolume
                return
        }
        if h.requested != nil {
                h.requested <- r
        }
        if h.unblock != nil {
                <-h.unblock
        }
        http.Error(w, "nothing here", http.StatusNotFound)
}

func (s *stubbedS3Suite) TestGetContextCancel(c *check.C) {
        s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
                return v.BlockRead(ctx, fooHash, brdiscard)
        })
}

func (s *stubbedS3Suite) TestPutContextCancel(c *check.C) {
        s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
                return v.BlockWrite(ctx, fooHash, []byte("foo"))
        })
}

func (s *stubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) {
        handler := &s3AWSBlockingHandler{}
        s.s3server = httptest.NewServer(handler)
        defer s.s3server.Close()

        v := s.newTestableVolume(c, newVolumeParams{
                Cluster:      s.cluster,
                ConfigVolume: arvados.Volume{Replication: 2},
                MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
                BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
        }, 5*time.Minute)

        ctx, cancel := context.WithCancel(context.Background())

        handler.requested = make(chan *http.Request)
        handler.unblock = make(chan struct{})
        defer close(handler.unblock)

        doneFunc := make(chan struct{})
        go func() {
                err := testFunc(ctx, v)
                c.Check(err, check.Equals, context.Canceled)
                close(doneFunc)
        }()

        timeout := time.After(10 * time.Second)

        // Wait for the stub server to receive a request, meaning
        // Get() is waiting for an S3 operation.
        select {
        case <-timeout:
                c.Fatal("timed out waiting for test func to call our handler")
        case <-doneFunc:
                c.Fatal("test func finished without even calling our handler!")
        case <-handler.requested:
        }

        cancel()

        select {
        case <-timeout:
                c.Fatal("timed out")
        case <-doneFunc:
        }
}

func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
        s.cluster.Collections.BlobTrashLifetime.Set("1h")
        s.cluster.Collections.BlobSigningTTL.Set("1h")

        v := s.newTestableVolume(c, newVolumeParams{
                Cluster:      s.cluster,
                ConfigVolume: arvados.Volume{Replication: 2},
                Logger:       ctxlog.TestLogger(c),
                MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
                BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
        }, 5*time.Minute)
        var none time.Time

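        // putS3Obj stores an object directly in the fake S3 backend
        // with the fake clock pinned to t, so the object's
        // Last-Modified time matches the scenario being staged. A
        // zero t means the object should not exist.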
        putS3Obj := func(t time.Time, key string, data []byte) {
                if t == none {
                        return
                }
                s.s3fakeClock.now = &t
                uploader := manager.NewUploader(v.bucket.svc)
                _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
                        Bucket: aws.String(v.bucket.bucket),
                        Key:    aws.String(key),
                        Body:   bytes.NewReader(data),
                })
                if err != nil {
                        panic(err)
                }
                s.s3fakeClock.now = nil
                _, err = v.head(key)
                if err != nil {
                        panic(err)
                }
        }

        t0 := time.Now()
        nextKey := 0
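        // Each scenario gives the Last-Modified times of the data
        // object, its recent/ marker, and its trash/ copy (none means
        // the object is absent), followed by the expected outcome of
        // each operation on a block staged in that state.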
        for _, scenario := range []struct {
                label               string
                dataT               time.Time
                recentT             time.Time
                trashT              time.Time
                canGet              bool
                canTrash            bool
                canGetAfterTrash    bool
                canUntrash          bool
                haveTrashAfterEmpty bool
                freshAfterEmpty     bool
        }{
                {
                        "No related objects",
                        none, none, none,
                        false, false, false, false, false, false,
                },
                {
                        // Stored by older version, or there was a
                        // race between EmptyTrash and Put: Trash is a
                        // no-op even though the data object is very
                        // old
                        "No recent/X",
                        t0.Add(-48 * time.Hour), none, none,
                        true, true, true, false, false, false,
                },
                {
                        "Not trash, but old enough to be eligible for trash",
                        t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
                        true, true, false, false, false, false,
                },
                {
                        "Not trash, and not old enough to be eligible for trash",
                        t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
                        true, true, true, false, false, false,
                },
                {
                        "Trashed + untrashed copies exist, due to recent race between Trash and Put",
                        t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
                        true, true, true, true, true, false,
                },
                {
                        "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
                        t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
                        true, false, true, true, true, false,
                },
                {
                        "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
                        t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
                        true, false, true, true, false, false,
                },
                {
                        "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
                        t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
                        true, false, true, true, true, true,
                },
                {
                        "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
                        t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
                        true, true, true, true, false, false,
                },
                {
                        "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
                        t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
                        true, false, true, true, false, false,
                },
                {
                        "Trash, not yet eligible for deletion",
                        none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
                        false, false, false, true, true, false,
                },
                {
                        "Trash, not yet eligible for deletion, prone to races",
                        none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
                        false, false, false, true, true, false,
                },
                {
                        "Trash, eligible for deletion",
                        none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
                        false, false, false, true, false, false,
                },
                {
                        "Erroneously trashed during a race, detected before BlobTrashLifetime",
                        none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
                        true, false, true, true, true, false,
                },
                {
                        "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
                        none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
                        true, false, true, true, true, false,
                },
                {
                        "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
                        none, none, t0.Add(-time.Minute),
                        false, false, false, true, true, true,
                },
        } {
                for _, prefixLength := range []int{0, 3} {
                        v.PrefixLength = prefixLength
                        c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)

                        // We have a few tests to run for each scenario, and
                        // the tests are expected to change state. By calling
                        // this setup func between tests, we (re)create the
                        // scenario as specified, using a new unique block
                        // locator to prevent interference from previous
                        // tests.

                        setupScenario := func() (string, []byte) {
                                nextKey++
                                blk := []byte(fmt.Sprintf("%d", nextKey))
                                loc := fmt.Sprintf("%x", md5.Sum(blk))
                                key := loc
                                if prefixLength > 0 {
                                        key = loc[:prefixLength] + "/" + loc
                                }
                                c.Log("\t", loc, "\t", key)
                                putS3Obj(scenario.dataT, key, blk)
                                putS3Obj(scenario.recentT, "recent/"+key, nil)
                                putS3Obj(scenario.trashT, "trash/"+key, blk)
                                v.s3fakeClock.now = &t0
                                return loc, blk
                        }

                        // Check canGet
                        loc, blk := setupScenario()
                        err := v.BlockRead(context.Background(), loc, brdiscard)
                        c.Check(err == nil, check.Equals, scenario.canGet, check.Commentf("err was %+v", err))
                        if err != nil {
                                c.Check(os.IsNotExist(err), check.Equals, true)
                        }

                        // Call Trash, then check canTrash and canGetAfterTrash
                        loc, _ = setupScenario()
                        err = v.BlockTrash(loc)
                        c.Check(err == nil, check.Equals, scenario.canTrash)
                        err = v.BlockRead(context.Background(), loc, brdiscard)
                        c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
                        if err != nil {
                                c.Check(os.IsNotExist(err), check.Equals, true)
                        }

                        // Call Untrash, then check canUntrash
                        loc, _ = setupScenario()
                        err = v.BlockUntrash(loc)
                        c.Check(err == nil, check.Equals, scenario.canUntrash)
                        if scenario.dataT != none || scenario.trashT != none {
                                // In all scenarios where the data exists, we
                                // should be able to Get after Untrash --
                                // regardless of timestamps, errors, race
                                // conditions, etc.
                                err = v.BlockRead(context.Background(), loc, brdiscard)
                                c.Check(err, check.IsNil)
                        }

                        // Call EmptyTrash, then check haveTrashAfterEmpty and
                        // freshAfterEmpty
                        loc, _ = setupScenario()
                        v.EmptyTrash()
                        _, err = v.head("trash/" + v.key(loc))
                        c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
                        if scenario.freshAfterEmpty {
                                t, err := v.Mtime(loc)
                                c.Check(err, check.IsNil)
                                // new mtime must be current (with an
                                // allowance for 1s timestamp precision)
                                c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
                        }

                        // Check for current Mtime after Put (applies to all
                        // scenarios)
                        loc, blk = setupScenario()
                        err = v.BlockWrite(context.Background(), loc, blk)
                        c.Check(err, check.IsNil)
                        t, err := v.Mtime(loc)
                        c.Check(err, check.IsNil)
                        c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
                }
        }
}

type testableS3Volume struct {
        *s3Volume
        server      *httptest.Server
        c           *check.C
        s3fakeClock *s3fakeClock
}

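// gofakes3logger adapts a logrus.FieldLogger to the gofakes3 logging
// interface.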
type gofakes3logger struct {
        logrus.FieldLogger
}

func (l gofakes3logger) Print(level gofakes3.LogLevel, v ...interface{}) {
        switch level {
        case gofakes3.LogErr:
                l.Errorln(v...)
        case gofakes3.LogWarn:
                l.Warnln(v...)
        case gofakes3.LogInfo:
                l.Infoln(v...)
        default:
                panic("unknown level")
        }
}

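// testBucketSerial generates a distinct bucket name for each test
// volume, so volumes sharing the same fake S3 server don't share
// state.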
var testBucketSerial atomic.Int64

func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
        if params.Logger == nil {
                params.Logger = ctxlog.TestLogger(c)
        }
        if s.s3server == nil {
                backend := s3mem.New(s3mem.WithTimeSource(s.s3fakeClock))
                logger := ctxlog.TestLogger(c)
                faker := gofakes3.New(backend,
                        gofakes3.WithTimeSource(s.s3fakeClock),
                        gofakes3.WithLogger(gofakes3logger{FieldLogger: logger}),
                        gofakes3.WithTimeSkewLimit(0))
                s.s3server = httptest.NewServer(faker.Server())
        }
        endpoint := s.s3server.URL
        bucketName := fmt.Sprintf("testbucket%d", testBucketSerial.Add(1))

        var metadataURL, iamRole, accessKey, secretKey string
        if s.metadata != nil {
                metadataURL, iamRole = s.metadata.URL, s.metadata.URL+"/fake-metadata/test-role"
        } else {
                accessKey, secretKey = "xxx", "xxx"
        }

        v := &testableS3Volume{
                s3Volume: &s3Volume{
                        S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
                                IAMRole:            iamRole,
                                AccessKeyID:        accessKey,
                                SecretAccessKey:    secretKey,
                                Bucket:             bucketName,
                                Endpoint:           endpoint,
                                Region:             "test-region-1",
                                LocationConstraint: true,
                                UnsafeDelete:       true,
                                IndexPageSize:      1000,
                        },
                        cluster:      params.Cluster,
                        volume:       params.ConfigVolume,
                        logger:       params.Logger,
                        metrics:      params.MetricsVecs,
                        bufferPool:   params.BufferPool,
                        usePathStyle: true,
                },
                c:           c,
                s3fakeClock: s.s3fakeClock,
        }
        c.Assert(v.s3Volume.check(metadataURL), check.IsNil)
        // Create the test bucket
        input := &s3.CreateBucketInput{
                Bucket: aws.String(bucketName),
        }
        _, err := v.s3Volume.bucket.svc.CreateBucket(context.Background(), input)
        c.Assert(err, check.IsNil)
        // We couldn't set RaceWindow until now because check()
        // rejects negative values.
        v.s3Volume.RaceWindow = arvados.Duration(raceWindow)
        return v
}

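// blockWriteWithoutMD5Check stores a block (and its empty recent/
// marker) directly via the S3 upload manager, bypassing BlockWrite's
// MD5 verification so tests can store blocks under locators that
// don't match the content's MD5 hash.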
func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) error {
        key := v.key(loc)
        r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)

        uploader := manager.NewUploader(v.bucket.svc, func(u *manager.Uploader) {
                u.PartSize = 5 * 1024 * 1024
                u.Concurrency = 13
        })

        _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
                Bucket: aws.String(v.bucket.bucket),
                Key:    aws.String(key),
                Body:   r,
        })
        if err != nil {
                return err
        }

        empty := bytes.NewReader([]byte{})
        _, err = uploader.Upload(context.Background(), &s3.PutObjectInput{
                Bucket: aws.String(v.bucket.bucket),
                Key:    aws.String("recent/" + key),
                Body:   empty,
        })
        return err
}

// TouchWithDate turns back the clock while doing a Touch(). We assume
// no other operations are happening on the same fake S3 server while
// we do this.
func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) {
        v.s3fakeClock.now = &lastPut

        uploader := manager.NewUploader(v.bucket.svc)
        empty := bytes.NewReader([]byte{})
        _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
                Bucket: aws.String(v.bucket.bucket),
                Key:    aws.String("recent/" + v.key(loc)),
                Body:   empty,
        })
        if err != nil {
                panic(err)
        }

        v.s3fakeClock.now = nil
}

// Teardown is a no-op; the shared stub server is closed by the
// suite's TearDownTest.
func (v *testableS3Volume) Teardown() {
}

func (v *testableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
        return "get", "put"
}