2960: Finish renaming s3aws_volume to s3_volume.
[arvados.git] / services / keepstore / s3_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "strings"
18         "time"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22
23         "github.com/aws/aws-sdk-go-v2/aws"
24         "github.com/aws/aws-sdk-go-v2/service/s3"
25         "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
26
27         "github.com/johannesboyne/gofakes3"
28         "github.com/johannesboyne/gofakes3/backend/s3mem"
29         "github.com/prometheus/client_golang/prometheus"
30         "github.com/sirupsen/logrus"
31         check "gopkg.in/check.v1"
32 )
33
const (
	// s3TestBucketName is the bucket created on the stub S3 server
	// by newTestableVolume.
	s3TestBucketName = "testbucket"
)
37
// s3AWSFakeClock is a controllable time source for the stub S3
// server. While now is nil it reports real wall-clock time; tests pin
// now to control the timestamps the server assigns to objects.
type s3AWSFakeClock struct {
	now *time.Time
}

// Now returns the pinned time in UTC if one is set, otherwise the
// current wall-clock time in UTC.
func (c *s3AWSFakeClock) Now() time.Time {
	if pinned := c.now; pinned != nil {
		return pinned.UTC()
	}
	return time.Now().UTC()
}

// Since reports the duration elapsed from t to the clock's current
// (possibly pinned) time.
func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
	return c.Now().Sub(t)
}
52
var _ = check.Suite(&stubbedS3Suite{})

// NOTE(review): srv appears unused in this file (newTestableVolume
// declares its own local srv), but it is package-scoped; confirm no
// other file in the package references it before removing.
var srv httptest.Server
56
// stubbedS3Suite runs the keepstore volume tests against an
// in-process stub S3 server instead of real S3.
type stubbedS3Suite struct {
	s3server *httptest.Server      // if non-nil, newTestableVolume points the volume here instead of a fresh gofakes3 server
	metadata *httptest.Server      // if non-nil, stands in for the EC2 instance-metadata (IAM credentials) service
	cluster  *arvados.Cluster      // test cluster config, rebuilt for each test
	volumes  []*testableS3Volume   // volumes created during the test
}
63
// SetUpTest resets the stub servers and builds a fresh test cluster
// configured with two S3-backed volumes before each test.
func (s *stubbedS3Suite) SetUpTest(c *check.C) {
	s.s3server = nil
	s.metadata = nil
	s.cluster = testCluster(c)
	s.cluster.Volumes = map[string]arvados.Volume{
		"zzzzz-nyw5e-000000000000000": {Driver: "S3"},
		"zzzzz-nyw5e-111111111111111": {Driver: "S3"},
	}
}
73
74 func (s *stubbedS3Suite) TestGeneric(c *check.C) {
75         DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
76                 // Use a negative raceWindow so s3test's 1-second
77                 // timestamp precision doesn't confuse fixRace.
78                 return s.newTestableVolume(c, params, -2*time.Second)
79         })
80 }
81
82 func (s *stubbedS3Suite) TestGenericReadOnly(c *check.C) {
83         DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
84                 return s.newTestableVolume(c, params, -2*time.Second)
85         })
86 }
87
88 func (s *stubbedS3Suite) TestGenericWithPrefix(c *check.C) {
89         DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
90                 v := s.newTestableVolume(c, params, -2*time.Second)
91                 v.PrefixLength = 3
92                 return v
93         })
94 }
95
96 func (s *stubbedS3Suite) TestIndex(c *check.C) {
97         v := s.newTestableVolume(c, newVolumeParams{
98                 Cluster:      s.cluster,
99                 ConfigVolume: arvados.Volume{Replication: 2},
100                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
101                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
102         }, 0)
103         v.IndexPageSize = 3
104         for i := 0; i < 256; i++ {
105                 err := v.blockWriteWithoutMD5Check(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
106                 c.Assert(err, check.IsNil)
107         }
108         for _, spec := range []struct {
109                 prefix      string
110                 expectMatch int
111         }{
112                 {"", 256},
113                 {"c", 16},
114                 {"bc", 1},
115                 {"abc", 0},
116         } {
117                 buf := new(bytes.Buffer)
118                 err := v.Index(context.Background(), spec.prefix, buf)
119                 c.Check(err, check.IsNil)
120
121                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
122                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
123                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
124         }
125 }
126
// TestSignature verifies that requests made with static credentials
// carry an AWS signature V4 Authorization header.
func (s *stubbedS3Suite) TestSignature(c *check.C) {
	// Stub server that records the headers of the last request it
	// received.
	var header http.Header
	stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		header = r.Header
	}))
	defer stub.Close()

	// The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 v2 signatures are being phased out
	// as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
	vol := s3Volume{
		S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
			AccessKeyID:     "xxx",
			SecretAccessKey: "xxx",
			Endpoint:        stub.URL,
			Region:          "test-region-1",
			Bucket:          "test-bucket-name",
		},
		cluster: s.cluster,
		logger:  ctxlog.TestLogger(c),
		metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
	}
	err := vol.check("")
	// Our test S3 server uses the older 'Path Style'
	vol.bucket.svc.ForcePathStyle = true

	c.Check(err, check.IsNil)
	err = vol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
	c.Check(err, check.IsNil)
	// A V4-signed request has an Authorization header beginning
	// with AWS4-HMAC-SHA256.
	c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
}
157
158 func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
159         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
160                 upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
161                 exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
162                 // Literal example from
163                 // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
164                 // but with updated timestamps
165                 io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
166         }))
167         defer s.metadata.Close()
168
169         v := &s3Volume{
170                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
171                         IAMRole:  s.metadata.URL + "/latest/api/token",
172                         Endpoint: "http://localhost:12345",
173                         Region:   "test-region-1",
174                         Bucket:   "test-bucket-name",
175                 },
176                 cluster: s.cluster,
177                 logger:  ctxlog.TestLogger(c),
178                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
179         }
180         err := v.check(s.metadata.URL + "/latest")
181         c.Check(err, check.IsNil)
182         creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
183         c.Check(err, check.IsNil)
184         c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
185         c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
186
187         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
188                 w.WriteHeader(http.StatusNotFound)
189         }))
190         deadv := &s3Volume{
191                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
192                         IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
193                         Endpoint: "http://localhost:12345",
194                         Region:   "test-region-1",
195                         Bucket:   "test-bucket-name",
196                 },
197                 cluster: s.cluster,
198                 logger:  ctxlog.TestLogger(c),
199                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
200         }
201         err = deadv.check(s.metadata.URL + "/latest")
202         c.Check(err, check.IsNil)
203         _, err = deadv.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
204         c.Check(err, check.ErrorMatches, `(?s).*EC2RoleRequestError: no EC2 instance role found.*`)
205         c.Check(err, check.ErrorMatches, `(?s).*404.*`)
206 }
207
// TestStats checks that the volume's internal stats counters (op
// counts, per-error-type counts, bytes in/out) track a read miss, a
// write, and two successful reads.
func (s *stubbedS3Suite) TestStats(c *check.C) {
	v := s.newTestableVolume(c, newVolumeParams{
		Cluster:      s.cluster,
		ConfigVolume: arvados.Volume{Replication: 2},
		MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
		BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
	}, 5*time.Minute)
	// stats renders the current counters as JSON so they can be
	// checked with regexps.
	stats := func() string {
		buf, err := json.Marshal(v.InternalStats())
		c.Check(err, check.IsNil)
		return string(buf)
	}

	c.Check(stats(), check.Matches, `.*"Ops":0,.*`)

	// Reading a nonexistent block counts an op and a 404
	// (NoSuchKey) error, but no incoming bytes.
	loc := "acbd18db4cc2f85cedef654fccc4a4d8"
	_, err := v.BlockRead(context.Background(), loc, io.Discard)
	c.Check(err, check.NotNil)
	c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
	c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
	c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)

	// Writing a 3-byte block counts 3 outgoing bytes and two PUT
	// ops (presumably the data object plus its recent/* marker —
	// see blockWriteWithoutMD5Check for the same pattern).
	err = v.BlockWrite(context.Background(), loc, []byte("foo"))
	c.Check(err, check.IsNil)
	c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
	c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)

	// Two successful reads of the 3-byte block accumulate 6
	// incoming bytes.
	_, err = v.BlockRead(context.Background(), loc, io.Discard)
	c.Check(err, check.IsNil)
	_, err = v.BlockRead(context.Background(), loc, io.Discard)
	c.Check(err, check.IsNil)
	c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}
241
242 type s3AWSBlockingHandler struct {
243         requested chan *http.Request
244         unblock   chan struct{}
245 }
246
247 func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
248         if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
249                 // Accept PutBucket ("PUT /bucketname/"), called by
250                 // newTestableVolume
251                 return
252         }
253         if h.requested != nil {
254                 h.requested <- r
255         }
256         if h.unblock != nil {
257                 <-h.unblock
258         }
259         http.Error(w, "nothing here", http.StatusNotFound)
260 }
261
// TestGetContextCancel verifies that a BlockRead blocked on the S3
// server returns promptly when its context is canceled.
func (s *stubbedS3Suite) TestGetContextCancel(c *check.C) {
	s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
		_, err := v.BlockRead(ctx, fooHash, io.Discard)
		return err
	})
}
268
// TestPutContextCancel verifies that a BlockWrite blocked on the S3
// server returns promptly when its context is canceled.
func (s *stubbedS3Suite) TestPutContextCancel(c *check.C) {
	s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
		return v.BlockWrite(ctx, fooHash, []byte("foo"))
	})
}
274
// testContextCancel starts testFunc against a stub server that holds
// every request open, cancels testFunc's context while the request is
// in flight, and verifies testFunc returns context.Canceled without
// waiting for the server to respond.
func (s *stubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) {
	handler := &s3AWSBlockingHandler{}
	s.s3server = httptest.NewServer(handler)
	defer s.s3server.Close()

	v := s.newTestableVolume(c, newVolumeParams{
		Cluster:      s.cluster,
		ConfigVolume: arvados.Volume{Replication: 2},
		MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
		BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
	}, 5*time.Minute)

	ctx, cancel := context.WithCancel(context.Background())

	handler.requested = make(chan *http.Request)
	handler.unblock = make(chan struct{})
	// Closing unblock at exit releases the stalled handler so the
	// stub server can shut down cleanly.
	defer close(handler.unblock)

	doneFunc := make(chan struct{})
	go func() {
		err := testFunc(ctx, v)
		c.Check(err, check.Equals, context.Canceled)
		close(doneFunc)
	}()

	timeout := time.After(10 * time.Second)

	// Wait for the stub server to receive a request, meaning
	// Get() is waiting for an s3 operation.
	select {
	case <-timeout:
		c.Fatal("timed out waiting for test func to call our handler")
	case <-doneFunc:
		c.Fatal("test func finished without even calling our handler!")
	case <-handler.requested:
	}

	cancel()

	// After cancel, testFunc must return promptly -- well before
	// the handler is unblocked by the deferred close above.
	select {
	case <-timeout:
		c.Fatal("timed out")
	case <-doneFunc:
	}
}
320
// TestBackendStates exercises Get/Trash/Untrash/EmptyTrash against
// every meaningful combination of data, recent/*, and trash/* objects
// on the backend, including states left behind by races and
// interrupted operations. Each scenario lists the timestamps of the
// three objects (the zero time `none` means the object is absent) and
// the expected outcome of each sub-test.
func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
	s.cluster.Collections.BlobTrashLifetime.Set("1h")
	s.cluster.Collections.BlobSigningTTL.Set("1h")

	v := s.newTestableVolume(c, newVolumeParams{
		Cluster:      s.cluster,
		ConfigVolume: arvados.Volume{Replication: 2},
		Logger:       ctxlog.TestLogger(c),
		MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
		BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
	}, 5*time.Minute)
	var none time.Time

	// putS3Obj stores an object directly on the stub server with
	// the given (fake-clock) timestamp; t==none means "object
	// should not exist", so it does nothing.
	putS3Obj := func(t time.Time, key string, data []byte) {
		if t == none {
			return
		}
		v.serverClock.now = &t
		uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
		_, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
			Bucket: aws.String(v.bucket.bucket),
			Key:    aws.String(key),
			Body:   bytes.NewReader(data),
		})
		if err != nil {
			panic(err)
		}
		v.serverClock.now = nil
		// Sanity-check that the object is really there.
		_, err = v.head(key)
		if err != nil {
			panic(err)
		}
	}

	t0 := time.Now()
	nextKey := 0
	for _, scenario := range []struct {
		label               string
		dataT               time.Time // mtime of the data object (none = absent)
		recentT             time.Time // mtime of the recent/* marker (none = absent)
		trashT              time.Time // mtime of the trash/* copy (none = absent)
		canGet              bool      // expected: BlockRead succeeds
		canTrash            bool      // expected: BlockTrash succeeds
		canGetAfterTrash    bool      // expected: BlockRead succeeds after BlockTrash
		canUntrash          bool      // expected: BlockUntrash succeeds
		haveTrashAfterEmpty bool      // expected: trash/* still present after EmptyTrash
		freshAfterEmpty     bool      // expected: EmptyTrash refreshed the data's mtime
	}{
		{
			"No related objects",
			none, none, none,
			false, false, false, false, false, false,
		},
		{
			// Stored by older version, or there was a
			// race between EmptyTrash and Put: Trash is a
			// no-op even though the data object is very
			// old
			"No recent/X",
			t0.Add(-48 * time.Hour), none, none,
			true, true, true, false, false, false,
		},
		{
			"Not trash, but old enough to be eligible for trash",
			t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
			true, true, false, false, false, false,
		},
		{
			"Not trash, and not old enough to be eligible for trash",
			t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
			true, true, true, false, false, false,
		},
		{
			"Trashed + untrashed copies exist, due to recent race between Trash and Put",
			t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
			true, true, true, true, true, false,
		},
		{
			"Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
			t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
			true, false, true, true, true, false,
		},
		{
			"Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
			t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
			true, false, true, true, false, false,
		},
		{
			"Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
			t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
			true, false, true, true, true, true,
		},
		{
			"Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
			t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
			true, true, true, true, false, false,
		},
		{
			"Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
			t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
			true, false, true, true, false, false,
		},
		{
			"Trash, not yet eligible for deletion",
			none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
			false, false, false, true, true, false,
		},
		{
			"Trash, not yet eligible for deletion, prone to races",
			none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
			false, false, false, true, true, false,
		},
		{
			"Trash, eligible for deletion",
			none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
			false, false, false, true, false, false,
		},
		{
			"Erroneously trashed during a race, detected before BlobTrashLifetime",
			none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
			true, false, true, true, true, false,
		},
		{
			"Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
			none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
			true, false, true, true, true, false,
		},
		{
			"Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
			none, none, t0.Add(-time.Minute),
			false, false, false, true, true, true,
		},
	} {
		// Run every scenario both without and with a key prefix.
		for _, prefixLength := range []int{0, 3} {
			v.PrefixLength = prefixLength
			c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)

			// We have a few tests to run for each scenario, and
			// the tests are expected to change state. By calling
			// this setup func between tests, we (re)create the
			// scenario as specified, using a new unique block
			// locator to prevent interference from previous
			// tests.

			setupScenario := func() (string, []byte) {
				nextKey++
				blk := []byte(fmt.Sprintf("%d", nextKey))
				loc := fmt.Sprintf("%x", md5.Sum(blk))
				key := loc
				if prefixLength > 0 {
					key = loc[:prefixLength] + "/" + loc
				}
				c.Log("\t", loc, "\t", key)
				putS3Obj(scenario.dataT, key, blk)
				putS3Obj(scenario.recentT, "recent/"+key, nil)
				putS3Obj(scenario.trashT, "trash/"+key, blk)
				v.serverClock.now = &t0
				return loc, blk
			}

			// Check canGet
			loc, blk := setupScenario()
			_, err := v.BlockRead(context.Background(), loc, io.Discard)
			c.Check(err == nil, check.Equals, scenario.canGet)
			if err != nil {
				c.Check(os.IsNotExist(err), check.Equals, true)
			}

			// Call Trash, then check canTrash and canGetAfterTrash
			loc, _ = setupScenario()
			err = v.BlockTrash(loc)
			c.Check(err == nil, check.Equals, scenario.canTrash)
			_, err = v.BlockRead(context.Background(), loc, io.Discard)
			c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
			if err != nil {
				c.Check(os.IsNotExist(err), check.Equals, true)
			}

			// Call Untrash, then check canUntrash
			loc, _ = setupScenario()
			err = v.BlockUntrash(loc)
			c.Check(err == nil, check.Equals, scenario.canUntrash)
			if scenario.dataT != none || scenario.trashT != none {
				// In all scenarios where the data exists, we
				// should be able to Get after Untrash --
				// regardless of timestamps, errors, race
				// conditions, etc.
				_, err = v.BlockRead(context.Background(), loc, io.Discard)
				c.Check(err, check.IsNil)
			}

			// Call EmptyTrash, then check haveTrashAfterEmpty and
			// freshAfterEmpty
			loc, _ = setupScenario()
			v.EmptyTrash()
			_, err = v.head("trash/" + v.key(loc))
			c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
			if scenario.freshAfterEmpty {
				t, err := v.Mtime(loc)
				c.Check(err, check.IsNil)
				// new mtime must be current (with an
				// allowance for 1s timestamp precision)
				c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
			}

			// Check for current Mtime after Put (applies to all
			// scenarios)
			loc, blk = setupScenario()
			err = v.BlockWrite(context.Background(), loc, blk)
			c.Check(err, check.IsNil)
			t, err := v.Mtime(loc)
			c.Check(err, check.IsNil)
			c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
		}
	}
}
537
// testableS3Volume wraps an s3Volume together with the stub S3 server
// and fake clock that back it, for use by the generic volume tests.
type testableS3Volume struct {
	*s3Volume
	server      *httptest.Server // the gofakes3 stub server
	c           *check.C
	serverClock *s3AWSFakeClock // controls the timestamps the stub server assigns
}
544
// LogrusLog adapts a logrus FieldLogger to gofakes3's logging
// interface, for optional gofakes3 debug logging (see the
// commented-out setup in newTestableVolume).
type LogrusLog struct {
	// NOTE(review): pointer-to-interface is usually unnecessary in
	// Go; a plain logrus.FieldLogger field would do. Kept as-is
	// because the commented-out setup code assigns &ctxLogger.
	log *logrus.FieldLogger
}

// Print forwards a gofakes3 log message at the corresponding logrus
// level; an unknown level is a programmer error and panics.
func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
	switch level {
	case gofakes3.LogErr:
		(*l.log).Errorln(v...)
	case gofakes3.LogWarn:
		(*l.log).Warnln(v...)
	case gofakes3.LogInfo:
		(*l.log).Infoln(v...)
	default:
		panic("unknown level")
	}
}
561
// newTestableVolume builds an s3Volume backed by a fresh gofakes3
// stub server (or by s.s3server / s.metadata if the test has set
// them), creates the test bucket, and returns the wrapped volume.
func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {

	clock := &s3AWSFakeClock{}
	// fake s3
	backend := s3mem.New(s3mem.WithTimeSource(clock))

	// To enable GoFakeS3 debug logging, pass logger to gofakes3.WithLogger()
	/* logger := new(LogrusLog)
	ctxLogger := ctxlog.FromContext(context.Background())
	logger.log = &ctxLogger */
	faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(nil), gofakes3.WithTimeSkewLimit(0))
	srv := httptest.NewServer(faker.Server())

	// Tests that supply their own stub (e.g. the blocking handler)
	// override the gofakes3 endpoint.
	endpoint := srv.URL
	if s.s3server != nil {
		endpoint = s.s3server.URL
	}

	// With a metadata stub in play, use IAM-role credentials
	// instead of static keys.
	iamRole, accessKey, secretKey := "", "xxx", "xxx"
	if s.metadata != nil {
		iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
	}

	v := &testableS3Volume{
		s3Volume: &s3Volume{
			S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
				IAMRole:            iamRole,
				AccessKeyID:        accessKey,
				SecretAccessKey:    secretKey,
				Bucket:             s3TestBucketName,
				Endpoint:           endpoint,
				Region:             "test-region-1",
				LocationConstraint: true,
				UnsafeDelete:       true,
				IndexPageSize:      1000,
			},
			cluster:    params.Cluster,
			volume:     params.ConfigVolume,
			logger:     params.Logger,
			metrics:    params.MetricsVecs,
			bufferPool: params.BufferPool,
		},
		c:           c,
		server:      srv,
		serverClock: clock,
	}
	c.Assert(v.s3Volume.check(""), check.IsNil)
	// Our test S3 server uses the older 'Path Style'
	v.s3Volume.bucket.svc.ForcePathStyle = true
	// Create the testbucket
	input := &s3.CreateBucketInput{
		Bucket: aws.String(s3TestBucketName),
	}
	req := v.s3Volume.bucket.svc.CreateBucketRequest(input)
	_, err := req.Send(context.Background())
	c.Assert(err, check.IsNil)
	// We couldn't set RaceWindow until now because check()
	// rejects negative values.
	v.s3Volume.RaceWindow = arvados.Duration(raceWindow)
	return v
}
623
624 func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) error {
625         key := v.key(loc)
626         r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
627
628         uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
629                 u.PartSize = 5 * 1024 * 1024
630                 u.Concurrency = 13
631         })
632
633         _, err := uploader.Upload(&s3manager.UploadInput{
634                 Bucket: aws.String(v.bucket.bucket),
635                 Key:    aws.String(key),
636                 Body:   r,
637         })
638         if err != nil {
639                 return err
640         }
641
642         empty := bytes.NewReader([]byte{})
643         _, err = uploader.Upload(&s3manager.UploadInput{
644                 Bucket: aws.String(v.bucket.bucket),
645                 Key:    aws.String("recent/" + key),
646                 Body:   empty,
647         })
648         return err
649 }
650
// TouchWithDate turns back the clock while doing a Touch(). We assume
// there are no other operations happening on the same s3test server
// while we do this.
func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) {
	// Pin the fake server clock so the recent/* marker is stored
	// with the requested timestamp.
	v.serverClock.now = &lastPut

	uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
	empty := bytes.NewReader([]byte{})
	_, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
		Bucket: aws.String(v.bucket.bucket),
		Key:    aws.String("recent/" + v.key(loc)),
		Body:   empty,
	})
	if err != nil {
		panic(err)
	}

	// Restore real time for subsequent operations.
	v.serverClock.now = nil
}
670
// Teardown shuts down the stub S3 server backing this volume.
func (v *testableS3Volume) Teardown() {
	v.server.Close()
}
674
675 func (v *testableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
676         return "get", "put"
677 }