// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package keepstore

import (
        "bytes"
        "context"
        "crypto/md5"
        "encoding/json"
        "fmt"
        "io"
        "net/http"
        "net/http/httptest"
        "os"
        "strings"
        "time"

        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"

        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/service/s3"
        "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"

        "github.com/johannesboyne/gofakes3"
        "github.com/johannesboyne/gofakes3/backend/s3mem"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
)

const (
        s3TestBucketName = "testbucket"
)

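// s3AWSFakeClock is a time source for gofakes3 and the tests: when
// now is set, it reports that fixed time (so tests can backdate
// object timestamps); when now is nil, it falls back to the real clock.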
type s3AWSFakeClock struct {
        now *time.Time
}

func (c *s3AWSFakeClock) Now() time.Time {
        if c.now == nil {
                return time.Now().UTC()
        }
        return c.now.UTC()
}

func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
        return c.Now().Sub(t)
}

var _ = check.Suite(&stubbedS3Suite{})

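// stubbedS3Suite runs the s3Volume tests against in-process stub
// servers: a gofakes3 S3 backend and, when set, httptest servers
// standing in for a blocking S3 endpoint (s3server) and the EC2
// instance metadata service (metadata).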
type stubbedS3Suite struct {
        s3server *httptest.Server
        metadata *httptest.Server
        cluster  *arvados.Cluster
        volumes  []*testableS3Volume
}

func (s *stubbedS3Suite) SetUpTest(c *check.C) {
        s.s3server = nil
        s.metadata = nil
        s.cluster = testCluster(c)
        s.cluster.Volumes = map[string]arvados.Volume{
                "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
                "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
        }
}

func (s *stubbedS3Suite) TestGeneric(c *check.C) {
        DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
                // Use a negative raceWindow so s3test's 1-second
                // timestamp precision doesn't confuse fixRace.
                return s.newTestableVolume(c, params, -2*time.Second)
        })
}

func (s *stubbedS3Suite) TestGenericReadOnly(c *check.C) {
        DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
                return s.newTestableVolume(c, params, -2*time.Second)
        })
}

func (s *stubbedS3Suite) TestGenericWithPrefix(c *check.C) {
        DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
                v := s.newTestableVolume(c, params, -2*time.Second)
                v.PrefixLength = 3
                return v
        })
}

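// TestIndex stores 256 blocks with known locators, then verifies that
// Index lists exactly the entries matching each prefix. Every index
// line is newline-terminated, so splitting N entries on \n yields N+1
// pieces, the last one empty.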
func (s *stubbedS3Suite) TestIndex(c *check.C) {
        v := s.newTestableVolume(c, newVolumeParams{
                Cluster:      s.cluster,
                ConfigVolume: arvados.Volume{Replication: 2},
                MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
                BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
        }, 0)
        v.IndexPageSize = 3
        for i := 0; i < 256; i++ {
                err := v.blockWriteWithoutMD5Check(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
                c.Assert(err, check.IsNil)
        }
        for _, spec := range []struct {
                prefix      string
                expectMatch int
        }{
                {"", 256},
                {"c", 16},
                {"bc", 1},
                {"abc", 0},
        } {
                buf := new(bytes.Buffer)
                err := v.Index(context.Background(), spec.prefix, buf)
                c.Check(err, check.IsNil)

                idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
                c.Check(len(idx), check.Equals, spec.expectMatch+1)
                c.Check(len(idx[len(idx)-1]), check.Equals, 0)
        }
}

func (s *stubbedS3Suite) TestSignature(c *check.C) {
        var header http.Header
        stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                header = r.Header
        }))
        defer stub.Close()

        // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 V2
        // signatures are being phased out as of June 24, 2020. Cf.
        // https://forums.aws.amazon.com/ann.jspa?annID=5816
        vol := s3Volume{
                S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
                        AccessKeyID:     "xxx",
                        SecretAccessKey: "xxx",
                        Endpoint:        stub.URL,
                        Region:          "test-region-1",
                        Bucket:          "test-bucket-name",
                },
                cluster: s.cluster,
                logger:  ctxlog.TestLogger(c),
                metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
        }
        err := vol.check("")
        c.Assert(err, check.IsNil)
        // Our test S3 server uses the older 'Path Style'
        vol.bucket.svc.ForcePathStyle = true

        err = vol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
        c.Check(err, check.IsNil)
        c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
}

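// TestIAMRoleCredentials stubs the EC2 instance metadata service and
// checks that the driver fetches temporary credentials from it, and
// that a 404 from the metadata endpoint surfaces as a credentials error.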
func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
        s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
                exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
                // Literal example from
                // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
                // but with updated timestamps
                io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
        }))
        defer s.metadata.Close()

        v := &s3Volume{
                S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
                        IAMRole:  s.metadata.URL + "/latest/api/token",
                        Endpoint: "http://localhost:12345",
                        Region:   "test-region-1",
                        Bucket:   "test-bucket-name",
                },
                cluster: s.cluster,
                logger:  ctxlog.TestLogger(c),
                metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
        }
        err := v.check(s.metadata.URL + "/latest")
        c.Check(err, check.IsNil)
        creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
        c.Check(err, check.IsNil)
        c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
        c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")

        s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                w.WriteHeader(http.StatusNotFound)
        }))
        deadv := &s3Volume{
                S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
                        IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
                        Endpoint: "http://localhost:12345",
                        Region:   "test-region-1",
                        Bucket:   "test-bucket-name",
                },
                cluster: s.cluster,
                logger:  ctxlog.TestLogger(c),
                metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
        }
        err = deadv.check(s.metadata.URL + "/latest")
        c.Check(err, check.IsNil)
        _, err = deadv.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
        c.Check(err, check.ErrorMatches, `(?s).*EC2RoleRequestError: no EC2 instance role found.*`)
        c.Check(err, check.ErrorMatches, `(?s).*404.*`)
}

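// TestStats checks the volume's InternalStats counters: a failed read
// counts as an op but adds no InBytes, a block write ticks OutBytes
// and two PutOps (the data object plus its "recent/" marker), and
// successful reads accumulate InBytes.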
func (s *stubbedS3Suite) TestStats(c *check.C) {
        v := s.newTestableVolume(c, newVolumeParams{
                Cluster:      s.cluster,
                ConfigVolume: arvados.Volume{Replication: 2},
                MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
                BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
        }, 5*time.Minute)
        stats := func() string {
                buf, err := json.Marshal(v.InternalStats())
                c.Check(err, check.IsNil)
                return string(buf)
        }

        c.Check(stats(), check.Matches, `.*"Ops":0,.*`)

        loc := "acbd18db4cc2f85cedef654fccc4a4d8"
        err := v.BlockRead(context.Background(), loc, brdiscard)
        c.Check(err, check.NotNil)
        c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
        c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
        c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)

        err = v.BlockWrite(context.Background(), loc, []byte("foo"))
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
        c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)

        err = v.BlockRead(context.Background(), loc, brdiscard)
        c.Check(err, check.IsNil)
        err = v.BlockRead(context.Background(), loc, brdiscard)
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}

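// s3AWSBlockingHandler is a stub S3 server that accepts bucket
// creation, announces each other request on the requested channel,
// and then blocks until unblock is closed -- letting tests cancel a
// context while an S3 operation is in flight.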
type s3AWSBlockingHandler struct {
        requested chan *http.Request
        unblock   chan struct{}
}

func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
                // Accept PutBucket ("PUT /bucketname/"), called by
                // newTestableVolume
                return
        }
        if h.requested != nil {
                h.requested <- r
        }
        if h.unblock != nil {
                <-h.unblock
        }
        http.Error(w, "nothing here", http.StatusNotFound)
}

func (s *stubbedS3Suite) TestGetContextCancel(c *check.C) {
        s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
                return v.BlockRead(ctx, fooHash, brdiscard)
        })
}

func (s *stubbedS3Suite) TestPutContextCancel(c *check.C) {
        s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
                return v.BlockWrite(ctx, fooHash, []byte("foo"))
        })
}

func (s *stubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) {
        handler := &s3AWSBlockingHandler{}
        s.s3server = httptest.NewServer(handler)
        defer s.s3server.Close()

        v := s.newTestableVolume(c, newVolumeParams{
                Cluster:      s.cluster,
                ConfigVolume: arvados.Volume{Replication: 2},
                MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
                BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
        }, 5*time.Minute)

        ctx, cancel := context.WithCancel(context.Background())

        handler.requested = make(chan *http.Request)
        handler.unblock = make(chan struct{})
        defer close(handler.unblock)

        doneFunc := make(chan struct{})
        go func() {
                err := testFunc(ctx, v)
                c.Check(err, check.Equals, context.Canceled)
                close(doneFunc)
        }()

        timeout := time.After(10 * time.Second)

        // Wait for the stub server to receive a request, meaning
        // the operation under test (Get or Put) is blocked on an
        // S3 call.
        select {
        case <-timeout:
                c.Fatal("timed out waiting for test func to call our handler")
        case <-doneFunc:
                c.Fatal("test func finished without even calling our handler!")
        case <-handler.requested:
        }

        cancel()

        select {
        case <-timeout:
                c.Fatal("timed out")
        case <-doneFunc:
        }
}

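// TestBackendStates covers how Get, Trash, Untrash, and EmptyTrash
// behave for every meaningful combination of the three backend
// objects involved: the data object itself, its zero-length "recent/"
// timestamp marker, and its "trash/" copy. Each scenario specifies
// the objects' timestamps (none means the object is absent) and the
// expected outcome of each operation.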
func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
        s.cluster.Collections.BlobTrashLifetime.Set("1h")
        s.cluster.Collections.BlobSigningTTL.Set("1h")

        v := s.newTestableVolume(c, newVolumeParams{
                Cluster:      s.cluster,
                ConfigVolume: arvados.Volume{Replication: 2},
                Logger:       ctxlog.TestLogger(c),
                MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
                BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
        }, 5*time.Minute)
        var none time.Time

        putS3Obj := func(t time.Time, key string, data []byte) {
                if t == none {
                        return
                }
                v.serverClock.now = &t
                uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
                _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
                        Bucket: aws.String(v.bucket.bucket),
                        Key:    aws.String(key),
                        Body:   bytes.NewReader(data),
                })
                if err != nil {
                        panic(err)
                }
                v.serverClock.now = nil
                _, err = v.head(key)
                if err != nil {
                        panic(err)
                }
        }

        t0 := time.Now()
        nextKey := 0
        for _, scenario := range []struct {
                label               string
                dataT               time.Time
                recentT             time.Time
                trashT              time.Time
                canGet              bool
                canTrash            bool
                canGetAfterTrash    bool
                canUntrash          bool
                haveTrashAfterEmpty bool
                freshAfterEmpty     bool
        }{
                {
                        "No related objects",
                        none, none, none,
                        false, false, false, false, false, false,
                },
                {
                        // Stored by older version, or there was a
                        // race between EmptyTrash and Put: Trash is a
                        // no-op even though the data object is very
                        // old
                        "No recent/X",
                        t0.Add(-48 * time.Hour), none, none,
                        true, true, true, false, false, false,
                },
                {
                        "Not trash, but old enough to be eligible for trash",
                        t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
                        true, true, false, false, false, false,
                },
                {
                        "Not trash, and not old enough to be eligible for trash",
                        t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
                        true, true, true, false, false, false,
                },
                {
                        "Trashed + untrashed copies exist, due to recent race between Trash and Put",
                        t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
                        true, true, true, true, true, false,
                },
                {
                        "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
                        t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
                        true, false, true, true, true, false,
                },
                {
                        "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
                        t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
                        true, false, true, true, false, false,
                },
                {
                        "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
                        t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
                        true, false, true, true, true, true,
                },
                {
                        "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
                        t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
                        true, true, true, true, false, false,
                },
                {
                        "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
                        t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
                        true, false, true, true, false, false,
                },
                {
                        "Trash, not yet eligible for deletion",
                        none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
                        false, false, false, true, true, false,
                },
                {
                        "Trash, not yet eligible for deletion, prone to races",
                        none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
                        false, false, false, true, true, false,
                },
                {
                        "Trash, eligible for deletion",
                        none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
                        false, false, false, true, false, false,
                },
                {
                        "Erroneously trashed during a race, detected before BlobTrashLifetime",
                        none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
                        true, false, true, true, true, false,
                },
                {
                        "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
                        none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
                        true, false, true, true, true, false,
                },
                {
                        "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
                        none, none, t0.Add(-time.Minute),
                        false, false, false, true, true, true,
                },
        } {
                for _, prefixLength := range []int{0, 3} {
                        v.PrefixLength = prefixLength
                        c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)

                        // We have a few tests to run for each scenario, and
                        // the tests are expected to change state. By calling
                        // this setup func between tests, we (re)create the
                        // scenario as specified, using a new unique block
                        // locator to prevent interference from previous
                        // tests.

                        setupScenario := func() (string, []byte) {
                                nextKey++
                                blk := []byte(fmt.Sprintf("%d", nextKey))
                                loc := fmt.Sprintf("%x", md5.Sum(blk))
                                key := loc
                                if prefixLength > 0 {
                                        key = loc[:prefixLength] + "/" + loc
                                }
                                c.Log("\t", loc, "\t", key)
                                putS3Obj(scenario.dataT, key, blk)
                                putS3Obj(scenario.recentT, "recent/"+key, nil)
                                putS3Obj(scenario.trashT, "trash/"+key, blk)
                                v.serverClock.now = &t0
                                return loc, blk
                        }

                        // Check canGet
                        loc, blk := setupScenario()
                        err := v.BlockRead(context.Background(), loc, brdiscard)
                        c.Check(err == nil, check.Equals, scenario.canGet)
                        if err != nil {
                                c.Check(os.IsNotExist(err), check.Equals, true)
                        }

                        // Call Trash, then check canTrash and canGetAfterTrash
                        loc, _ = setupScenario()
                        err = v.BlockTrash(loc)
                        c.Check(err == nil, check.Equals, scenario.canTrash)
                        err = v.BlockRead(context.Background(), loc, brdiscard)
                        c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
                        if err != nil {
                                c.Check(os.IsNotExist(err), check.Equals, true)
                        }

                        // Call Untrash, then check canUntrash
                        loc, _ = setupScenario()
                        err = v.BlockUntrash(loc)
                        c.Check(err == nil, check.Equals, scenario.canUntrash)
                        if scenario.dataT != none || scenario.trashT != none {
                                // In all scenarios where the data exists, we
                                // should be able to Get after Untrash --
                                // regardless of timestamps, errors, race
                                // conditions, etc.
                                err = v.BlockRead(context.Background(), loc, brdiscard)
                                c.Check(err, check.IsNil)
                        }

                        // Call EmptyTrash, then check haveTrashAfterEmpty and
                        // freshAfterEmpty
                        loc, _ = setupScenario()
                        v.EmptyTrash()
                        _, err = v.head("trash/" + v.key(loc))
                        c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
                        if scenario.freshAfterEmpty {
                                t, err := v.Mtime(loc)
                                c.Check(err, check.IsNil)
                                // new mtime must be current (with an
                                // allowance for 1s timestamp precision)
                                c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
                        }

                        // Check for current Mtime after Put (applies to all
                        // scenarios)
                        loc, blk = setupScenario()
                        err = v.BlockWrite(context.Background(), loc, blk)
                        c.Check(err, check.IsNil)
                        t, err := v.Mtime(loc)
                        c.Check(err, check.IsNil)
                        c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
                }
        }
}

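// testableS3Volume wraps an s3Volume together with the stub S3 server
// and fake clock it talks to, so tests can manipulate backend state
// and timestamps directly.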
type testableS3Volume struct {
        *s3Volume
        server      *httptest.Server
        c           *check.C
        serverClock *s3AWSFakeClock
}

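// LogrusLog adapts a logrus FieldLogger to the gofakes3.Logger
// interface, for use with gofakes3.WithLogger() when debugging.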
type LogrusLog struct {
        log logrus.FieldLogger
}

func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
        switch level {
        case gofakes3.LogErr:
                l.log.Errorln(v...)
        case gofakes3.LogWarn:
                l.log.Warnln(v...)
        case gofakes3.LogInfo:
                l.log.Infoln(v...)
        default:
                panic("unknown level")
        }
}

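// newTestableVolume starts a gofakes3 server driven by the fake
// clock, configures an s3Volume to use it (or the suite's s3server
// and metadata stubs when those are set), creates the test bucket,
// and finally applies the requested raceWindow.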
func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
        clock := &s3AWSFakeClock{}
        // fake s3
        backend := s3mem.New(s3mem.WithTimeSource(clock))

        // To enable GoFakeS3 debug logging, pass logger to gofakes3.WithLogger()
        /*
                logger := new(LogrusLog)
                ctxLogger := ctxlog.FromContext(context.Background())
                logger.log = ctxLogger
        */
        faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(nil), gofakes3.WithTimeSkewLimit(0))
        srv := httptest.NewServer(faker.Server())

        endpoint := srv.URL
        if s.s3server != nil {
                endpoint = s.s3server.URL
        }

        iamRole, accessKey, secretKey := "", "xxx", "xxx"
        if s.metadata != nil {
                iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
        }

        v := &testableS3Volume{
                s3Volume: &s3Volume{
                        S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
                                IAMRole:            iamRole,
                                AccessKeyID:        accessKey,
                                SecretAccessKey:    secretKey,
                                Bucket:             s3TestBucketName,
                                Endpoint:           endpoint,
                                Region:             "test-region-1",
                                LocationConstraint: true,
                                UnsafeDelete:       true,
                                IndexPageSize:      1000,
                        },
                        cluster:    params.Cluster,
                        volume:     params.ConfigVolume,
                        logger:     params.Logger,
                        metrics:    params.MetricsVecs,
                        bufferPool: params.BufferPool,
                },
                c:           c,
                server:      srv,
                serverClock: clock,
        }
        c.Assert(v.s3Volume.check(""), check.IsNil)
        // Our test S3 server uses the older 'Path Style'
        v.s3Volume.bucket.svc.ForcePathStyle = true
        // Create the testbucket
        input := &s3.CreateBucketInput{
                Bucket: aws.String(s3TestBucketName),
        }
        req := v.s3Volume.bucket.svc.CreateBucketRequest(input)
        _, err := req.Send(context.Background())
        c.Assert(err, check.IsNil)
        // We couldn't set RaceWindow until now because check()
        // rejects negative values.
        v.s3Volume.RaceWindow = arvados.Duration(raceWindow)
        return v
}

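// blockWriteWithoutMD5Check stores a block and its "recent/" marker
// via the S3 API directly, bypassing BlockWrite's verification that
// the data matches the locator's MD5 hash.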
func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) error {
        key := v.key(loc)
        r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)

        uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
                u.PartSize = 5 * 1024 * 1024
                u.Concurrency = 13
        })

        _, err := uploader.Upload(&s3manager.UploadInput{
                Bucket: aws.String(v.bucket.bucket),
                Key:    aws.String(key),
                Body:   r,
        })
        if err != nil {
                return err
        }

        empty := bytes.NewReader([]byte{})
        _, err = uploader.Upload(&s3manager.UploadInput{
                Bucket: aws.String(v.bucket.bucket),
                Key:    aws.String("recent/" + key),
                Body:   empty,
        })
        return err
}

// TouchWithDate turns back the clock while doing a Touch(). We assume
// there are no other operations happening on the same s3test server
// while we do this.
func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) {
        v.serverClock.now = &lastPut

        uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
        empty := bytes.NewReader([]byte{})
        _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
                Bucket: aws.String(v.bucket.bucket),
                Key:    aws.String("recent/" + v.key(loc)),
                Body:   empty,
        })
        if err != nil {
                panic(err)
        }

        v.serverClock.now = nil
}

func (v *testableS3Volume) Teardown() {
        v.server.Close()
}

func (v *testableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
        return "get", "put"
}