Merge branch 'master' into 10477-upgrade-aws-s3-driver
[arvados.git] / services / keepstore / s3aws_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "strings"
18         "time"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22
23         "github.com/aws/aws-sdk-go-v2/aws"
24         "github.com/aws/aws-sdk-go-v2/service/s3"
25         "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
26
27         "github.com/johannesboyne/gofakes3"
28         "github.com/johannesboyne/gofakes3/backend/s3mem"
29         "github.com/prometheus/client_golang/prometheus"
30         "github.com/sirupsen/logrus"
31         check "gopkg.in/check.v1"
32 )
33
34 const (
35         S3AWSTestBucketName = "testbucket"
36 )
37
38 type s3AWSFakeClock struct {
39         now *time.Time
40 }
41
42 func (c *s3AWSFakeClock) Now() time.Time {
43         if c.now == nil {
44                 return time.Now().UTC()
45         }
46         return c.now.UTC()
47 }
48
49 func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
50         return c.Now().Sub(t)
51 }
52
53 var _ = check.Suite(&StubbedS3AWSSuite{})
54
55 var srv httptest.Server
56
57 type StubbedS3AWSSuite struct {
58         s3server *httptest.Server
59         metadata *httptest.Server
60         cluster  *arvados.Cluster
61         handler  *handler
62         volumes  []*TestableS3AWSVolume
63 }
64
65 func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) {
66         s.s3server = nil
67         s.metadata = nil
68         s.cluster = testCluster(c)
69         s.cluster.Volumes = map[string]arvados.Volume{
70                 "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
71                 "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
72         }
73         s.handler = &handler{}
74 }
75
76 func (s *StubbedS3AWSSuite) TestGeneric(c *check.C) {
77         DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
78                 // Use a negative raceWindow so s3test's 1-second
79                 // timestamp precision doesn't confuse fixRace.
80                 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
81         })
82 }
83
84 func (s *StubbedS3AWSSuite) TestGenericReadOnly(c *check.C) {
85         DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
86                 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
87         })
88 }
89
90 func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
91         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
92         v.IndexPageSize = 3
93         for i := 0; i < 256; i++ {
94                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
95         }
96         for _, spec := range []struct {
97                 prefix      string
98                 expectMatch int
99         }{
100                 {"", 256},
101                 {"c", 16},
102                 {"bc", 1},
103                 {"abc", 0},
104         } {
105                 buf := new(bytes.Buffer)
106                 err := v.IndexTo(spec.prefix, buf)
107                 c.Check(err, check.IsNil)
108
109                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
110                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
111                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
112         }
113 }
114
115 func (s *StubbedS3AWSSuite) TestSignature(c *check.C) {
116         var header http.Header
117         stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
118                 header = r.Header
119         }))
120         defer stub.Close()
121
122         // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 v2 signatures are being phased out
123         // as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
124         vol := S3AWSVolume{
125                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
126                         AccessKey: "xxx",
127                         SecretKey: "xxx",
128                         Endpoint:  stub.URL,
129                         Region:    "test-region-1",
130                         Bucket:    "test-bucket-name",
131                 },
132                 cluster: s.cluster,
133                 logger:  ctxlog.TestLogger(c),
134                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
135         }
136         err := vol.check("")
137         // Our test S3 server uses the older 'Path Style'
138         vol.bucket.svc.ForcePathStyle = true
139
140         c.Check(err, check.IsNil)
141         err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
142         c.Check(err, check.IsNil)
143         c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
144 }
145
146 func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) {
147         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
148                 upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
149                 exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
150                 // Literal example from
151                 // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
152                 // but with updated timestamps
153                 io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
154         }))
155         defer s.metadata.Close()
156
157         v := &S3AWSVolume{
158                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
159                         IAMRole:  s.metadata.URL + "/latest/api/token",
160                         Endpoint: "http://localhost:12345",
161                         Region:   "test-region-1",
162                         Bucket:   "test-bucket-name",
163                 },
164                 cluster: s.cluster,
165                 logger:  ctxlog.TestLogger(c),
166                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
167         }
168         err := v.check(s.metadata.URL + "/latest")
169         creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
170         c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
171         c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
172
173         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
174                 w.WriteHeader(http.StatusNotFound)
175         }))
176         deadv := &S3AWSVolume{
177                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
178                         IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
179                         Endpoint: "http://localhost:12345",
180                         Region:   "test-region-1",
181                         Bucket:   "test-bucket-name",
182                 },
183                 cluster: s.cluster,
184                 logger:  ctxlog.TestLogger(c),
185                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
186         }
187         err = deadv.check(s.metadata.URL + "/latest")
188         _, err = deadv.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
189         c.Check(err, check.ErrorMatches, `(?s).*EC2RoleRequestError: no EC2 instance role found.*`)
190         c.Check(err, check.ErrorMatches, `(?s).*404.*`)
191 }
192
193 func (s *StubbedS3AWSSuite) TestStats(c *check.C) {
194         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
195         stats := func() string {
196                 buf, err := json.Marshal(v.InternalStats())
197                 c.Check(err, check.IsNil)
198                 return string(buf)
199         }
200
201         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
202
203         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
204         _, err := v.Get(context.Background(), loc, make([]byte, 3))
205         c.Check(err, check.NotNil)
206         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
207         c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
208         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
209
210         err = v.Put(context.Background(), loc, []byte("foo"))
211         c.Check(err, check.IsNil)
212         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
213         c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
214
215         _, err = v.Get(context.Background(), loc, make([]byte, 3))
216         c.Check(err, check.IsNil)
217         _, err = v.Get(context.Background(), loc, make([]byte, 3))
218         c.Check(err, check.IsNil)
219         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
220 }
221
222 type s3AWSBlockingHandler struct {
223         requested chan *http.Request
224         unblock   chan struct{}
225 }
226
227 func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
228         if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
229                 // Accept PutBucket ("PUT /bucketname/"), called by
230                 // newTestableVolume
231                 return
232         }
233         if h.requested != nil {
234                 h.requested <- r
235         }
236         if h.unblock != nil {
237                 <-h.unblock
238         }
239         http.Error(w, "nothing here", http.StatusNotFound)
240 }
241
242 func (s *StubbedS3AWSSuite) TestGetContextCancel(c *check.C) {
243         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
244         buf := make([]byte, 3)
245
246         s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
247                 _, err := v.Get(ctx, loc, buf)
248                 return err
249         })
250 }
251
252 func (s *StubbedS3AWSSuite) TestCompareContextCancel(c *check.C) {
253         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
254         buf := []byte("bar")
255
256         s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
257                 return v.Compare(ctx, loc, buf)
258         })
259 }
260
261 func (s *StubbedS3AWSSuite) TestPutContextCancel(c *check.C) {
262         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
263         buf := []byte("foo")
264
265         s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
266                 return v.Put(ctx, loc, buf)
267         })
268 }
269
270 func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3AWSVolume) error) {
271         handler := &s3AWSBlockingHandler{}
272         s.s3server = httptest.NewServer(handler)
273         defer s.s3server.Close()
274
275         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
276
277         ctx, cancel := context.WithCancel(context.Background())
278
279         handler.requested = make(chan *http.Request)
280         handler.unblock = make(chan struct{})
281         defer close(handler.unblock)
282
283         doneFunc := make(chan struct{})
284         go func() {
285                 err := testFunc(ctx, v)
286                 c.Check(err, check.Equals, context.Canceled)
287                 close(doneFunc)
288         }()
289
290         timeout := time.After(10 * time.Second)
291
292         // Wait for the stub server to receive a request, meaning
293         // Get() is waiting for an s3 operation.
294         select {
295         case <-timeout:
296                 c.Fatal("timed out waiting for test func to call our handler")
297         case <-doneFunc:
298                 c.Fatal("test func finished without even calling our handler!")
299         case <-handler.requested:
300         }
301
302         cancel()
303
304         select {
305         case <-timeout:
306                 c.Fatal("timed out")
307         case <-doneFunc:
308         }
309 }
310
311 func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
312         s.cluster.Collections.BlobTrashLifetime.Set("1h")
313         s.cluster.Collections.BlobSigningTTL.Set("1h")
314
315         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
316         var none time.Time
317
318         putS3Obj := func(t time.Time, key string, data []byte) {
319                 if t == none {
320                         return
321                 }
322                 v.serverClock.now = &t
323                 uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
324                 _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
325                         Bucket: aws.String(v.bucket.bucket),
326                         Key:    aws.String(key),
327                         Body:   bytes.NewReader(data),
328                 })
329                 if err != nil {
330                         panic(err)
331                 }
332                 v.serverClock.now = nil
333                 _, err = v.Head(key)
334                 if err != nil {
335                         panic(err)
336                 }
337         }
338
339         t0 := time.Now()
340         nextKey := 0
341         for _, scenario := range []struct {
342                 label               string
343                 dataT               time.Time
344                 recentT             time.Time
345                 trashT              time.Time
346                 canGet              bool
347                 canTrash            bool
348                 canGetAfterTrash    bool
349                 canUntrash          bool
350                 haveTrashAfterEmpty bool
351                 freshAfterEmpty     bool
352         }{
353                 {
354                         "No related objects",
355                         none, none, none,
356                         false, false, false, false, false, false,
357                 },
358                 {
359                         // Stored by older version, or there was a
360                         // race between EmptyTrash and Put: Trash is a
361                         // no-op even though the data object is very
362                         // old
363                         "No recent/X",
364                         t0.Add(-48 * time.Hour), none, none,
365                         true, true, true, false, false, false,
366                 },
367                 {
368                         "Not trash, but old enough to be eligible for trash",
369                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
370                         true, true, false, false, false, false,
371                 },
372                 {
373                         "Not trash, and not old enough to be eligible for trash",
374                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
375                         true, true, true, false, false, false,
376                 },
377                 {
378                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
379                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
380                         true, true, true, true, true, false,
381                 },
382                 {
383                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
384                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
385                         true, false, true, true, true, false,
386                 },
387                 {
388                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
389                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
390                         true, false, true, true, false, false,
391                 },
392                 {
393                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
394                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
395                         true, false, true, true, true, true,
396                 },
397                 {
398                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
399                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
400                         true, true, true, true, false, false,
401                 },
402                 {
403                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
404                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
405                         true, false, true, true, false, false,
406                 },
407                 {
408                         "Trash, not yet eligible for deletion",
409                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
410                         false, false, false, true, true, false,
411                 },
412                 {
413                         "Trash, not yet eligible for deletion, prone to races",
414                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
415                         false, false, false, true, true, false,
416                 },
417                 {
418                         "Trash, eligible for deletion",
419                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
420                         false, false, false, true, false, false,
421                 },
422                 {
423                         "Erroneously trashed during a race, detected before BlobTrashLifetime",
424                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
425                         true, false, true, true, true, false,
426                 },
427                 {
428                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
429                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
430                         true, false, true, true, true, false,
431                 },
432                 {
433                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
434                         none, none, t0.Add(-time.Minute),
435                         false, false, false, true, true, true,
436                 },
437         } {
438                 c.Log("Scenario: ", scenario.label)
439
440                 // We have a few tests to run for each scenario, and
441                 // the tests are expected to change state. By calling
442                 // this setup func between tests, we (re)create the
443                 // scenario as specified, using a new unique block
444                 // locator to prevent interference from previous
445                 // tests.
446
447                 setupScenario := func() (string, []byte) {
448                         nextKey++
449                         blk := []byte(fmt.Sprintf("%d", nextKey))
450                         loc := fmt.Sprintf("%x", md5.Sum(blk))
451                         c.Log("\t", loc)
452                         putS3Obj(scenario.dataT, loc, blk)
453                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
454                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
455                         v.serverClock.now = &t0
456                         return loc, blk
457                 }
458
459                 // Check canGet
460                 loc, blk := setupScenario()
461                 buf := make([]byte, len(blk))
462                 _, err := v.Get(context.Background(), loc, buf)
463                 c.Check(err == nil, check.Equals, scenario.canGet)
464                 if err != nil {
465                         c.Check(os.IsNotExist(err), check.Equals, true)
466                 }
467
468                 // Call Trash, then check canTrash and canGetAfterTrash
469                 loc, _ = setupScenario()
470                 err = v.Trash(loc)
471                 c.Check(err == nil, check.Equals, scenario.canTrash)
472                 _, err = v.Get(context.Background(), loc, buf)
473                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
474                 if err != nil {
475                         c.Check(os.IsNotExist(err), check.Equals, true)
476                 }
477
478                 // Call Untrash, then check canUntrash
479                 loc, _ = setupScenario()
480                 err = v.Untrash(loc)
481                 c.Check(err == nil, check.Equals, scenario.canUntrash)
482                 if scenario.dataT != none || scenario.trashT != none {
483                         // In all scenarios where the data exists, we
484                         // should be able to Get after Untrash --
485                         // regardless of timestamps, errors, race
486                         // conditions, etc.
487                         _, err = v.Get(context.Background(), loc, buf)
488                         c.Check(err, check.IsNil)
489                 }
490
491                 // Call EmptyTrash, then check haveTrashAfterEmpty and
492                 // freshAfterEmpty
493                 loc, _ = setupScenario()
494                 v.EmptyTrash()
495                 _, err = v.Head("trash/" + loc)
496                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
497                 if scenario.freshAfterEmpty {
498                         t, err := v.Mtime(loc)
499                         c.Check(err, check.IsNil)
500                         // new mtime must be current (with an
501                         // allowance for 1s timestamp precision)
502                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
503                 }
504
505                 // Check for current Mtime after Put (applies to all
506                 // scenarios)
507                 loc, blk = setupScenario()
508                 err = v.Put(context.Background(), loc, blk)
509                 c.Check(err, check.IsNil)
510                 t, err := v.Mtime(loc)
511                 c.Check(err, check.IsNil)
512                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
513         }
514 }
515
516 type TestableS3AWSVolume struct {
517         *S3AWSVolume
518         server      *httptest.Server
519         c           *check.C
520         serverClock *s3AWSFakeClock
521 }
522
523 type LogrusLog struct {
524         log *logrus.FieldLogger
525 }
526
527 func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
528         switch level {
529         case gofakes3.LogErr:
530                 (*l.log).Errorln(v...)
531         case gofakes3.LogWarn:
532                 (*l.log).Warnln(v...)
533         case gofakes3.LogInfo:
534                 (*l.log).Infoln(v...)
535         default:
536                 panic("unknown level")
537         }
538 }
539
540 func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3AWSVolume {
541
542         clock := &s3AWSFakeClock{}
543         // fake s3
544         backend := s3mem.New(s3mem.WithTimeSource(clock))
545
546         // To enable GoFakeS3 debug logging, pass logger to gofakes3.WithLogger()
547         /* logger := new(LogrusLog)
548         ctxLogger := ctxlog.FromContext(context.Background())
549         logger.log = &ctxLogger */
550         faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(nil), gofakes3.WithTimeSkewLimit(0))
551         srv := httptest.NewServer(faker.Server())
552
553         endpoint := srv.URL
554         if s.s3server != nil {
555                 endpoint = s.s3server.URL
556         }
557
558         iamRole, accessKey, secretKey := "", "xxx", "xxx"
559         if s.metadata != nil {
560                 iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
561         }
562
563         v := &TestableS3AWSVolume{
564                 S3AWSVolume: &S3AWSVolume{
565                         S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
566                                 IAMRole:            iamRole,
567                                 AccessKey:          accessKey,
568                                 SecretKey:          secretKey,
569                                 Bucket:             S3AWSTestBucketName,
570                                 Endpoint:           endpoint,
571                                 Region:             "test-region-1",
572                                 LocationConstraint: true,
573                                 UnsafeDelete:       true,
574                                 IndexPageSize:      1000,
575                         },
576                         cluster: cluster,
577                         volume:  volume,
578                         logger:  ctxlog.TestLogger(c),
579                         metrics: metrics,
580                 },
581                 c:           c,
582                 server:      srv,
583                 serverClock: clock,
584         }
585         c.Assert(v.S3AWSVolume.check(""), check.IsNil)
586         // Our test S3 server uses the older 'Path Style'
587         v.S3AWSVolume.bucket.svc.ForcePathStyle = true
588         // Create the testbucket
589         input := &s3.CreateBucketInput{
590                 Bucket: aws.String(S3AWSTestBucketName),
591         }
592         req := v.S3AWSVolume.bucket.svc.CreateBucketRequest(input)
593         _, err := req.Send(context.Background())
594         c.Assert(err, check.IsNil)
595         // We couldn't set RaceWindow until now because check()
596         // rejects negative values.
597         v.S3AWSVolume.RaceWindow = arvados.Duration(raceWindow)
598         return v
599 }
600
601 // PutRaw skips the ContentMD5 test
602 func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
603
604         r := NewCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
605
606         uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
607                 u.PartSize = 5 * 1024 * 1024
608                 u.Concurrency = 13
609         })
610
611         _, err := uploader.Upload(&s3manager.UploadInput{
612                 Bucket: aws.String(v.bucket.bucket),
613                 Key:    aws.String(loc),
614                 Body:   r,
615         })
616         if err != nil {
617                 v.logger.Printf("PutRaw: %s: %+v", loc, err)
618         }
619
620         empty := bytes.NewReader([]byte{})
621         _, err = uploader.Upload(&s3manager.UploadInput{
622                 Bucket: aws.String(v.bucket.bucket),
623                 Key:    aws.String("recent/" + loc),
624                 Body:   empty,
625         })
626         if err != nil {
627                 v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
628         }
629 }
630
631 // TouchWithDate turns back the clock while doing a Touch(). We assume
632 // there are no other operations happening on the same s3test server
633 // while we do this.
634 func (v *TestableS3AWSVolume) TouchWithDate(locator string, lastPut time.Time) {
635         v.serverClock.now = &lastPut
636
637         uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
638         empty := bytes.NewReader([]byte{})
639         _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
640                 Bucket: aws.String(v.bucket.bucket),
641                 Key:    aws.String("recent/" + locator),
642                 Body:   empty,
643         })
644         if err != nil {
645                 panic(err)
646         }
647
648         v.serverClock.now = nil
649 }
650
651 func (v *TestableS3AWSVolume) Teardown() {
652         v.server.Close()
653 }
654
655 func (v *TestableS3AWSVolume) ReadWriteOperationLabelValues() (r, w string) {
656         return "get", "put"
657 }