17857: Fix bsub args so containers can use multiple CPU cores.
[arvados.git] / services / keepstore / s3aws_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "strings"
18         "time"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22
23         "github.com/aws/aws-sdk-go-v2/aws"
24         "github.com/aws/aws-sdk-go-v2/service/s3"
25         "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
26
27         "github.com/johannesboyne/gofakes3"
28         "github.com/johannesboyne/gofakes3/backend/s3mem"
29         "github.com/prometheus/client_golang/prometheus"
30         "github.com/sirupsen/logrus"
31         check "gopkg.in/check.v1"
32 )
33
34 const (
35         S3AWSTestBucketName = "testbucket"
36 )
37
38 type s3AWSFakeClock struct {
39         now *time.Time
40 }
41
42 func (c *s3AWSFakeClock) Now() time.Time {
43         if c.now == nil {
44                 return time.Now().UTC()
45         }
46         return c.now.UTC()
47 }
48
49 func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
50         return c.Now().Sub(t)
51 }
52
53 var _ = check.Suite(&StubbedS3AWSSuite{})
54
55 var srv httptest.Server
56
57 type StubbedS3AWSSuite struct {
58         s3server *httptest.Server
59         metadata *httptest.Server
60         cluster  *arvados.Cluster
61         handler  *handler
62         volumes  []*TestableS3AWSVolume
63 }
64
65 func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) {
66         s.s3server = nil
67         s.metadata = nil
68         s.cluster = testCluster(c)
69         s.cluster.Volumes = map[string]arvados.Volume{
70                 "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
71                 "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
72         }
73         s.handler = &handler{}
74 }
75
76 func (s *StubbedS3AWSSuite) TestGeneric(c *check.C) {
77         DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
78                 // Use a negative raceWindow so s3test's 1-second
79                 // timestamp precision doesn't confuse fixRace.
80                 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
81         })
82 }
83
84 func (s *StubbedS3AWSSuite) TestGenericReadOnly(c *check.C) {
85         DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
86                 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
87         })
88 }
89
90 func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
91         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
92         v.IndexPageSize = 3
93         for i := 0; i < 256; i++ {
94                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
95         }
96         for _, spec := range []struct {
97                 prefix      string
98                 expectMatch int
99         }{
100                 {"", 256},
101                 {"c", 16},
102                 {"bc", 1},
103                 {"abc", 0},
104         } {
105                 buf := new(bytes.Buffer)
106                 err := v.IndexTo(spec.prefix, buf)
107                 c.Check(err, check.IsNil)
108
109                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
110                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
111                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
112         }
113 }
114
115 func (s *StubbedS3AWSSuite) TestSignature(c *check.C) {
116         var header http.Header
117         stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
118                 header = r.Header
119         }))
120         defer stub.Close()
121
122         // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 v2 signatures are being phased out
123         // as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
124         vol := S3AWSVolume{
125                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
126                         AccessKeyID:     "xxx",
127                         SecretAccessKey: "xxx",
128                         Endpoint:        stub.URL,
129                         Region:          "test-region-1",
130                         Bucket:          "test-bucket-name",
131                 },
132                 cluster: s.cluster,
133                 logger:  ctxlog.TestLogger(c),
134                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
135         }
136         err := vol.check("")
137         // Our test S3 server uses the older 'Path Style'
138         vol.bucket.svc.ForcePathStyle = true
139
140         c.Check(err, check.IsNil)
141         err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
142         c.Check(err, check.IsNil)
143         c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
144 }
145
146 func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) {
147         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
148                 upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
149                 exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
150                 // Literal example from
151                 // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
152                 // but with updated timestamps
153                 io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
154         }))
155         defer s.metadata.Close()
156
157         v := &S3AWSVolume{
158                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
159                         IAMRole:  s.metadata.URL + "/latest/api/token",
160                         Endpoint: "http://localhost:12345",
161                         Region:   "test-region-1",
162                         Bucket:   "test-bucket-name",
163                 },
164                 cluster: s.cluster,
165                 logger:  ctxlog.TestLogger(c),
166                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
167         }
168         err := v.check(s.metadata.URL + "/latest")
169         c.Check(err, check.IsNil)
170         creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
171         c.Check(err, check.IsNil)
172         c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
173         c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
174
175         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
176                 w.WriteHeader(http.StatusNotFound)
177         }))
178         deadv := &S3AWSVolume{
179                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
180                         IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
181                         Endpoint: "http://localhost:12345",
182                         Region:   "test-region-1",
183                         Bucket:   "test-bucket-name",
184                 },
185                 cluster: s.cluster,
186                 logger:  ctxlog.TestLogger(c),
187                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
188         }
189         err = deadv.check(s.metadata.URL + "/latest")
190         c.Check(err, check.IsNil)
191         _, err = deadv.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
192         c.Check(err, check.ErrorMatches, `(?s).*EC2RoleRequestError: no EC2 instance role found.*`)
193         c.Check(err, check.ErrorMatches, `(?s).*404.*`)
194 }
195
196 func (s *StubbedS3AWSSuite) TestStats(c *check.C) {
197         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
198         stats := func() string {
199                 buf, err := json.Marshal(v.InternalStats())
200                 c.Check(err, check.IsNil)
201                 return string(buf)
202         }
203
204         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
205
206         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
207         _, err := v.Get(context.Background(), loc, make([]byte, 3))
208         c.Check(err, check.NotNil)
209         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
210         c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
211         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
212
213         err = v.Put(context.Background(), loc, []byte("foo"))
214         c.Check(err, check.IsNil)
215         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
216         c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
217
218         _, err = v.Get(context.Background(), loc, make([]byte, 3))
219         c.Check(err, check.IsNil)
220         _, err = v.Get(context.Background(), loc, make([]byte, 3))
221         c.Check(err, check.IsNil)
222         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
223 }
224
225 type s3AWSBlockingHandler struct {
226         requested chan *http.Request
227         unblock   chan struct{}
228 }
229
230 func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
231         if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
232                 // Accept PutBucket ("PUT /bucketname/"), called by
233                 // newTestableVolume
234                 return
235         }
236         if h.requested != nil {
237                 h.requested <- r
238         }
239         if h.unblock != nil {
240                 <-h.unblock
241         }
242         http.Error(w, "nothing here", http.StatusNotFound)
243 }
244
245 func (s *StubbedS3AWSSuite) TestGetContextCancel(c *check.C) {
246         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
247         buf := make([]byte, 3)
248
249         s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
250                 _, err := v.Get(ctx, loc, buf)
251                 return err
252         })
253 }
254
255 func (s *StubbedS3AWSSuite) TestCompareContextCancel(c *check.C) {
256         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
257         buf := []byte("bar")
258
259         s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
260                 return v.Compare(ctx, loc, buf)
261         })
262 }
263
264 func (s *StubbedS3AWSSuite) TestPutContextCancel(c *check.C) {
265         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
266         buf := []byte("foo")
267
268         s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
269                 return v.Put(ctx, loc, buf)
270         })
271 }
272
273 func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3AWSVolume) error) {
274         handler := &s3AWSBlockingHandler{}
275         s.s3server = httptest.NewServer(handler)
276         defer s.s3server.Close()
277
278         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
279
280         ctx, cancel := context.WithCancel(context.Background())
281
282         handler.requested = make(chan *http.Request)
283         handler.unblock = make(chan struct{})
284         defer close(handler.unblock)
285
286         doneFunc := make(chan struct{})
287         go func() {
288                 err := testFunc(ctx, v)
289                 c.Check(err, check.Equals, context.Canceled)
290                 close(doneFunc)
291         }()
292
293         timeout := time.After(10 * time.Second)
294
295         // Wait for the stub server to receive a request, meaning
296         // Get() is waiting for an s3 operation.
297         select {
298         case <-timeout:
299                 c.Fatal("timed out waiting for test func to call our handler")
300         case <-doneFunc:
301                 c.Fatal("test func finished without even calling our handler!")
302         case <-handler.requested:
303         }
304
305         cancel()
306
307         select {
308         case <-timeout:
309                 c.Fatal("timed out")
310         case <-doneFunc:
311         }
312 }
313
314 func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
315         s.cluster.Collections.BlobTrashLifetime.Set("1h")
316         s.cluster.Collections.BlobSigningTTL.Set("1h")
317
318         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
319         var none time.Time
320
321         putS3Obj := func(t time.Time, key string, data []byte) {
322                 if t == none {
323                         return
324                 }
325                 v.serverClock.now = &t
326                 uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
327                 _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
328                         Bucket: aws.String(v.bucket.bucket),
329                         Key:    aws.String(key),
330                         Body:   bytes.NewReader(data),
331                 })
332                 if err != nil {
333                         panic(err)
334                 }
335                 v.serverClock.now = nil
336                 _, err = v.Head(key)
337                 if err != nil {
338                         panic(err)
339                 }
340         }
341
342         t0 := time.Now()
343         nextKey := 0
344         for _, scenario := range []struct {
345                 label               string
346                 dataT               time.Time
347                 recentT             time.Time
348                 trashT              time.Time
349                 canGet              bool
350                 canTrash            bool
351                 canGetAfterTrash    bool
352                 canUntrash          bool
353                 haveTrashAfterEmpty bool
354                 freshAfterEmpty     bool
355         }{
356                 {
357                         "No related objects",
358                         none, none, none,
359                         false, false, false, false, false, false,
360                 },
361                 {
362                         // Stored by older version, or there was a
363                         // race between EmptyTrash and Put: Trash is a
364                         // no-op even though the data object is very
365                         // old
366                         "No recent/X",
367                         t0.Add(-48 * time.Hour), none, none,
368                         true, true, true, false, false, false,
369                 },
370                 {
371                         "Not trash, but old enough to be eligible for trash",
372                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
373                         true, true, false, false, false, false,
374                 },
375                 {
376                         "Not trash, and not old enough to be eligible for trash",
377                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
378                         true, true, true, false, false, false,
379                 },
380                 {
381                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
382                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
383                         true, true, true, true, true, false,
384                 },
385                 {
386                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
387                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
388                         true, false, true, true, true, false,
389                 },
390                 {
391                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
392                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
393                         true, false, true, true, false, false,
394                 },
395                 {
396                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
397                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
398                         true, false, true, true, true, true,
399                 },
400                 {
401                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
402                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
403                         true, true, true, true, false, false,
404                 },
405                 {
406                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
407                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
408                         true, false, true, true, false, false,
409                 },
410                 {
411                         "Trash, not yet eligible for deletion",
412                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
413                         false, false, false, true, true, false,
414                 },
415                 {
416                         "Trash, not yet eligible for deletion, prone to races",
417                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
418                         false, false, false, true, true, false,
419                 },
420                 {
421                         "Trash, eligible for deletion",
422                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
423                         false, false, false, true, false, false,
424                 },
425                 {
426                         "Erroneously trashed during a race, detected before BlobTrashLifetime",
427                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
428                         true, false, true, true, true, false,
429                 },
430                 {
431                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
432                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
433                         true, false, true, true, true, false,
434                 },
435                 {
436                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
437                         none, none, t0.Add(-time.Minute),
438                         false, false, false, true, true, true,
439                 },
440         } {
441                 c.Log("Scenario: ", scenario.label)
442
443                 // We have a few tests to run for each scenario, and
444                 // the tests are expected to change state. By calling
445                 // this setup func between tests, we (re)create the
446                 // scenario as specified, using a new unique block
447                 // locator to prevent interference from previous
448                 // tests.
449
450                 setupScenario := func() (string, []byte) {
451                         nextKey++
452                         blk := []byte(fmt.Sprintf("%d", nextKey))
453                         loc := fmt.Sprintf("%x", md5.Sum(blk))
454                         c.Log("\t", loc)
455                         putS3Obj(scenario.dataT, loc, blk)
456                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
457                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
458                         v.serverClock.now = &t0
459                         return loc, blk
460                 }
461
462                 // Check canGet
463                 loc, blk := setupScenario()
464                 buf := make([]byte, len(blk))
465                 _, err := v.Get(context.Background(), loc, buf)
466                 c.Check(err == nil, check.Equals, scenario.canGet)
467                 if err != nil {
468                         c.Check(os.IsNotExist(err), check.Equals, true)
469                 }
470
471                 // Call Trash, then check canTrash and canGetAfterTrash
472                 loc, _ = setupScenario()
473                 err = v.Trash(loc)
474                 c.Check(err == nil, check.Equals, scenario.canTrash)
475                 _, err = v.Get(context.Background(), loc, buf)
476                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
477                 if err != nil {
478                         c.Check(os.IsNotExist(err), check.Equals, true)
479                 }
480
481                 // Call Untrash, then check canUntrash
482                 loc, _ = setupScenario()
483                 err = v.Untrash(loc)
484                 c.Check(err == nil, check.Equals, scenario.canUntrash)
485                 if scenario.dataT != none || scenario.trashT != none {
486                         // In all scenarios where the data exists, we
487                         // should be able to Get after Untrash --
488                         // regardless of timestamps, errors, race
489                         // conditions, etc.
490                         _, err = v.Get(context.Background(), loc, buf)
491                         c.Check(err, check.IsNil)
492                 }
493
494                 // Call EmptyTrash, then check haveTrashAfterEmpty and
495                 // freshAfterEmpty
496                 loc, _ = setupScenario()
497                 v.EmptyTrash()
498                 _, err = v.Head("trash/" + loc)
499                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
500                 if scenario.freshAfterEmpty {
501                         t, err := v.Mtime(loc)
502                         c.Check(err, check.IsNil)
503                         // new mtime must be current (with an
504                         // allowance for 1s timestamp precision)
505                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
506                 }
507
508                 // Check for current Mtime after Put (applies to all
509                 // scenarios)
510                 loc, blk = setupScenario()
511                 err = v.Put(context.Background(), loc, blk)
512                 c.Check(err, check.IsNil)
513                 t, err := v.Mtime(loc)
514                 c.Check(err, check.IsNil)
515                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
516         }
517 }
518
519 type TestableS3AWSVolume struct {
520         *S3AWSVolume
521         server      *httptest.Server
522         c           *check.C
523         serverClock *s3AWSFakeClock
524 }
525
526 type LogrusLog struct {
527         log *logrus.FieldLogger
528 }
529
530 func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
531         switch level {
532         case gofakes3.LogErr:
533                 (*l.log).Errorln(v...)
534         case gofakes3.LogWarn:
535                 (*l.log).Warnln(v...)
536         case gofakes3.LogInfo:
537                 (*l.log).Infoln(v...)
538         default:
539                 panic("unknown level")
540         }
541 }
542
543 func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3AWSVolume {
544
545         clock := &s3AWSFakeClock{}
546         // fake s3
547         backend := s3mem.New(s3mem.WithTimeSource(clock))
548
549         // To enable GoFakeS3 debug logging, pass logger to gofakes3.WithLogger()
550         /* logger := new(LogrusLog)
551         ctxLogger := ctxlog.FromContext(context.Background())
552         logger.log = &ctxLogger */
553         faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(nil), gofakes3.WithTimeSkewLimit(0))
554         srv := httptest.NewServer(faker.Server())
555
556         endpoint := srv.URL
557         if s.s3server != nil {
558                 endpoint = s.s3server.URL
559         }
560
561         iamRole, accessKey, secretKey := "", "xxx", "xxx"
562         if s.metadata != nil {
563                 iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
564         }
565
566         v := &TestableS3AWSVolume{
567                 S3AWSVolume: &S3AWSVolume{
568                         S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
569                                 IAMRole:            iamRole,
570                                 AccessKeyID:        accessKey,
571                                 SecretAccessKey:    secretKey,
572                                 Bucket:             S3AWSTestBucketName,
573                                 Endpoint:           endpoint,
574                                 Region:             "test-region-1",
575                                 LocationConstraint: true,
576                                 UnsafeDelete:       true,
577                                 IndexPageSize:      1000,
578                         },
579                         cluster: cluster,
580                         volume:  volume,
581                         logger:  ctxlog.TestLogger(c),
582                         metrics: metrics,
583                 },
584                 c:           c,
585                 server:      srv,
586                 serverClock: clock,
587         }
588         c.Assert(v.S3AWSVolume.check(""), check.IsNil)
589         // Our test S3 server uses the older 'Path Style'
590         v.S3AWSVolume.bucket.svc.ForcePathStyle = true
591         // Create the testbucket
592         input := &s3.CreateBucketInput{
593                 Bucket: aws.String(S3AWSTestBucketName),
594         }
595         req := v.S3AWSVolume.bucket.svc.CreateBucketRequest(input)
596         _, err := req.Send(context.Background())
597         c.Assert(err, check.IsNil)
598         // We couldn't set RaceWindow until now because check()
599         // rejects negative values.
600         v.S3AWSVolume.RaceWindow = arvados.Duration(raceWindow)
601         return v
602 }
603
604 // PutRaw skips the ContentMD5 test
605 func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
606
607         r := NewCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
608
609         uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
610                 u.PartSize = 5 * 1024 * 1024
611                 u.Concurrency = 13
612         })
613
614         _, err := uploader.Upload(&s3manager.UploadInput{
615                 Bucket: aws.String(v.bucket.bucket),
616                 Key:    aws.String(loc),
617                 Body:   r,
618         })
619         if err != nil {
620                 v.logger.Printf("PutRaw: %s: %+v", loc, err)
621         }
622
623         empty := bytes.NewReader([]byte{})
624         _, err = uploader.Upload(&s3manager.UploadInput{
625                 Bucket: aws.String(v.bucket.bucket),
626                 Key:    aws.String("recent/" + loc),
627                 Body:   empty,
628         })
629         if err != nil {
630                 v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
631         }
632 }
633
634 // TouchWithDate turns back the clock while doing a Touch(). We assume
635 // there are no other operations happening on the same s3test server
636 // while we do this.
637 func (v *TestableS3AWSVolume) TouchWithDate(locator string, lastPut time.Time) {
638         v.serverClock.now = &lastPut
639
640         uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
641         empty := bytes.NewReader([]byte{})
642         _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
643                 Bucket: aws.String(v.bucket.bucket),
644                 Key:    aws.String("recent/" + locator),
645                 Body:   empty,
646         })
647         if err != nil {
648                 panic(err)
649         }
650
651         v.serverClock.now = nil
652 }
653
654 func (v *TestableS3AWSVolume) Teardown() {
655         v.server.Close()
656 }
657
658 func (v *TestableS3AWSVolume) ReadWriteOperationLabelValues() (r, w string) {
659         return "get", "put"
660 }