c18941: fix cache_slot_get behavior
[arvados.git] / services / keepstore / s3aws_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "strings"
18         "time"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22
23         "github.com/aws/aws-sdk-go-v2/aws"
24         "github.com/aws/aws-sdk-go-v2/service/s3"
25         "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
26
27         "github.com/johannesboyne/gofakes3"
28         "github.com/johannesboyne/gofakes3/backend/s3mem"
29         "github.com/prometheus/client_golang/prometheus"
30         "github.com/sirupsen/logrus"
31         check "gopkg.in/check.v1"
32 )
33
// S3AWSTestBucketName is the name of the bucket that test volumes
// are configured to use and that newTestableVolume creates on the
// stub S3 server.
const S3AWSTestBucketName = "testbucket"
37
// s3AWSFakeClock is a time source whose current time can be pinned
// by tests. While now is nil the clock follows the real wall clock.
type s3AWSFakeClock struct {
	// now, when non-nil, is the instant reported by Now().
	now *time.Time
}

// Now returns the pinned time if one is set, otherwise the current
// wall-clock time. The result is always expressed in UTC.
func (c *s3AWSFakeClock) Now() time.Time {
	if pinned := c.now; pinned != nil {
		return pinned.UTC()
	}
	return time.Now().UTC()
}

// Since returns the time elapsed between t and the clock's current
// time (which may be negative if the clock has been turned back).
func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
	current := c.Now()
	return current.Sub(t)
}
52
53 var _ = check.Suite(&StubbedS3AWSSuite{})
54
55 var srv httptest.Server
56
57 type StubbedS3AWSSuite struct {
58         s3server *httptest.Server
59         metadata *httptest.Server
60         cluster  *arvados.Cluster
61         handler  *handler
62         volumes  []*TestableS3AWSVolume
63 }
64
65 func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) {
66         s.s3server = nil
67         s.metadata = nil
68         s.cluster = testCluster(c)
69         s.cluster.Volumes = map[string]arvados.Volume{
70                 "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
71                 "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
72         }
73         s.handler = &handler{}
74 }
75
76 func (s *StubbedS3AWSSuite) TestGeneric(c *check.C) {
77         DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
78                 // Use a negative raceWindow so s3test's 1-second
79                 // timestamp precision doesn't confuse fixRace.
80                 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
81         })
82 }
83
84 func (s *StubbedS3AWSSuite) TestGenericReadOnly(c *check.C) {
85         DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
86                 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
87         })
88 }
89
90 func (s *StubbedS3AWSSuite) TestGenericWithPrefix(c *check.C) {
91         DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
92                 v := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
93                 v.PrefixLength = 3
94                 return v
95         })
96 }
97
98 func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
99         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
100         v.IndexPageSize = 3
101         for i := 0; i < 256; i++ {
102                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
103         }
104         for _, spec := range []struct {
105                 prefix      string
106                 expectMatch int
107         }{
108                 {"", 256},
109                 {"c", 16},
110                 {"bc", 1},
111                 {"abc", 0},
112         } {
113                 buf := new(bytes.Buffer)
114                 err := v.IndexTo(spec.prefix, buf)
115                 c.Check(err, check.IsNil)
116
117                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
118                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
119                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
120         }
121 }
122
123 func (s *StubbedS3AWSSuite) TestSignature(c *check.C) {
124         var header http.Header
125         stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
126                 header = r.Header
127         }))
128         defer stub.Close()
129
130         // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 v2 signatures are being phased out
131         // as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
132         vol := S3AWSVolume{
133                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
134                         AccessKeyID:     "xxx",
135                         SecretAccessKey: "xxx",
136                         Endpoint:        stub.URL,
137                         Region:          "test-region-1",
138                         Bucket:          "test-bucket-name",
139                 },
140                 cluster: s.cluster,
141                 logger:  ctxlog.TestLogger(c),
142                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
143         }
144         err := vol.check("")
145         // Our test S3 server uses the older 'Path Style'
146         vol.bucket.svc.ForcePathStyle = true
147
148         c.Check(err, check.IsNil)
149         err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
150         c.Check(err, check.IsNil)
151         c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
152 }
153
154 func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) {
155         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
156                 upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
157                 exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
158                 // Literal example from
159                 // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
160                 // but with updated timestamps
161                 io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
162         }))
163         defer s.metadata.Close()
164
165         v := &S3AWSVolume{
166                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
167                         IAMRole:  s.metadata.URL + "/latest/api/token",
168                         Endpoint: "http://localhost:12345",
169                         Region:   "test-region-1",
170                         Bucket:   "test-bucket-name",
171                 },
172                 cluster: s.cluster,
173                 logger:  ctxlog.TestLogger(c),
174                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
175         }
176         err := v.check(s.metadata.URL + "/latest")
177         c.Check(err, check.IsNil)
178         creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
179         c.Check(err, check.IsNil)
180         c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
181         c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
182
183         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
184                 w.WriteHeader(http.StatusNotFound)
185         }))
186         deadv := &S3AWSVolume{
187                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
188                         IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
189                         Endpoint: "http://localhost:12345",
190                         Region:   "test-region-1",
191                         Bucket:   "test-bucket-name",
192                 },
193                 cluster: s.cluster,
194                 logger:  ctxlog.TestLogger(c),
195                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
196         }
197         err = deadv.check(s.metadata.URL + "/latest")
198         c.Check(err, check.IsNil)
199         _, err = deadv.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
200         c.Check(err, check.ErrorMatches, `(?s).*EC2RoleRequestError: no EC2 instance role found.*`)
201         c.Check(err, check.ErrorMatches, `(?s).*404.*`)
202 }
203
204 func (s *StubbedS3AWSSuite) TestStats(c *check.C) {
205         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
206         stats := func() string {
207                 buf, err := json.Marshal(v.InternalStats())
208                 c.Check(err, check.IsNil)
209                 return string(buf)
210         }
211
212         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
213
214         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
215         _, err := v.Get(context.Background(), loc, make([]byte, 3))
216         c.Check(err, check.NotNil)
217         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
218         c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
219         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
220
221         err = v.Put(context.Background(), loc, []byte("foo"))
222         c.Check(err, check.IsNil)
223         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
224         c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
225
226         _, err = v.Get(context.Background(), loc, make([]byte, 3))
227         c.Check(err, check.IsNil)
228         _, err = v.Get(context.Background(), loc, make([]byte, 3))
229         c.Check(err, check.IsNil)
230         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
231 }
232
// s3AWSBlockingHandler is an http.Handler that reports each incoming
// object-level request on a channel, optionally holds it until told
// to continue, and then answers 404.
type s3AWSBlockingHandler struct {
	// requested, when non-nil, receives every request that is not
	// a bucket-level PUT.
	requested chan *http.Request
	// unblock, when non-nil, holds each reported request until a
	// value can be received from it (e.g. because it was closed).
	unblock chan struct{}
}

// ServeHTTP accepts bucket-level PUT requests with an empty
// response. Any other request is announced on h.requested, waits on
// h.unblock, and is answered with a 404 "nothing here" error.
func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// A path with no interior slash (e.g. "/bucketname/") names a
	// bucket rather than an object.
	trimmed := strings.Trim(r.URL.Path, "/")
	bucketLevel := !strings.Contains(trimmed, "/")
	if bucketLevel && r.Method == "PUT" {
		// Accept PutBucket ("PUT /bucketname/"), called by
		// newTestableVolume
		return
	}

	if notify := h.requested; notify != nil {
		notify <- r
	}
	if gate := h.unblock; gate != nil {
		<-gate
	}
	http.Error(w, "nothing here", http.StatusNotFound)
}
252
253 func (s *StubbedS3AWSSuite) TestGetContextCancel(c *check.C) {
254         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
255         buf := make([]byte, 3)
256
257         s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
258                 _, err := v.Get(ctx, loc, buf)
259                 return err
260         })
261 }
262
263 func (s *StubbedS3AWSSuite) TestCompareContextCancel(c *check.C) {
264         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
265         buf := []byte("bar")
266
267         s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
268                 return v.Compare(ctx, loc, buf)
269         })
270 }
271
272 func (s *StubbedS3AWSSuite) TestPutContextCancel(c *check.C) {
273         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
274         buf := []byte("foo")
275
276         s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
277                 return v.Put(ctx, loc, buf)
278         })
279 }
280
281 func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3AWSVolume) error) {
282         handler := &s3AWSBlockingHandler{}
283         s.s3server = httptest.NewServer(handler)
284         defer s.s3server.Close()
285
286         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
287
288         ctx, cancel := context.WithCancel(context.Background())
289
290         handler.requested = make(chan *http.Request)
291         handler.unblock = make(chan struct{})
292         defer close(handler.unblock)
293
294         doneFunc := make(chan struct{})
295         go func() {
296                 err := testFunc(ctx, v)
297                 c.Check(err, check.Equals, context.Canceled)
298                 close(doneFunc)
299         }()
300
301         timeout := time.After(10 * time.Second)
302
303         // Wait for the stub server to receive a request, meaning
304         // Get() is waiting for an s3 operation.
305         select {
306         case <-timeout:
307                 c.Fatal("timed out waiting for test func to call our handler")
308         case <-doneFunc:
309                 c.Fatal("test func finished without even calling our handler!")
310         case <-handler.requested:
311         }
312
313         cancel()
314
315         select {
316         case <-timeout:
317                 c.Fatal("timed out")
318         case <-doneFunc:
319         }
320 }
321
322 func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
323         s.cluster.Collections.BlobTrashLifetime.Set("1h")
324         s.cluster.Collections.BlobSigningTTL.Set("1h")
325
326         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
327         var none time.Time
328
329         putS3Obj := func(t time.Time, key string, data []byte) {
330                 if t == none {
331                         return
332                 }
333                 v.serverClock.now = &t
334                 uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
335                 _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
336                         Bucket: aws.String(v.bucket.bucket),
337                         Key:    aws.String(key),
338                         Body:   bytes.NewReader(data),
339                 })
340                 if err != nil {
341                         panic(err)
342                 }
343                 v.serverClock.now = nil
344                 _, err = v.head(key)
345                 if err != nil {
346                         panic(err)
347                 }
348         }
349
350         t0 := time.Now()
351         nextKey := 0
352         for _, scenario := range []struct {
353                 label               string
354                 dataT               time.Time
355                 recentT             time.Time
356                 trashT              time.Time
357                 canGet              bool
358                 canTrash            bool
359                 canGetAfterTrash    bool
360                 canUntrash          bool
361                 haveTrashAfterEmpty bool
362                 freshAfterEmpty     bool
363         }{
364                 {
365                         "No related objects",
366                         none, none, none,
367                         false, false, false, false, false, false,
368                 },
369                 {
370                         // Stored by older version, or there was a
371                         // race between EmptyTrash and Put: Trash is a
372                         // no-op even though the data object is very
373                         // old
374                         "No recent/X",
375                         t0.Add(-48 * time.Hour), none, none,
376                         true, true, true, false, false, false,
377                 },
378                 {
379                         "Not trash, but old enough to be eligible for trash",
380                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
381                         true, true, false, false, false, false,
382                 },
383                 {
384                         "Not trash, and not old enough to be eligible for trash",
385                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
386                         true, true, true, false, false, false,
387                 },
388                 {
389                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
390                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
391                         true, true, true, true, true, false,
392                 },
393                 {
394                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
395                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
396                         true, false, true, true, true, false,
397                 },
398                 {
399                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
400                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
401                         true, false, true, true, false, false,
402                 },
403                 {
404                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
405                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
406                         true, false, true, true, true, true,
407                 },
408                 {
409                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
410                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
411                         true, true, true, true, false, false,
412                 },
413                 {
414                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
415                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
416                         true, false, true, true, false, false,
417                 },
418                 {
419                         "Trash, not yet eligible for deletion",
420                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
421                         false, false, false, true, true, false,
422                 },
423                 {
424                         "Trash, not yet eligible for deletion, prone to races",
425                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
426                         false, false, false, true, true, false,
427                 },
428                 {
429                         "Trash, eligible for deletion",
430                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
431                         false, false, false, true, false, false,
432                 },
433                 {
434                         "Erroneously trashed during a race, detected before BlobTrashLifetime",
435                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
436                         true, false, true, true, true, false,
437                 },
438                 {
439                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
440                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
441                         true, false, true, true, true, false,
442                 },
443                 {
444                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
445                         none, none, t0.Add(-time.Minute),
446                         false, false, false, true, true, true,
447                 },
448         } {
449                 for _, prefixLength := range []int{0, 3} {
450                         v.PrefixLength = prefixLength
451                         c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
452
453                         // We have a few tests to run for each scenario, and
454                         // the tests are expected to change state. By calling
455                         // this setup func between tests, we (re)create the
456                         // scenario as specified, using a new unique block
457                         // locator to prevent interference from previous
458                         // tests.
459
460                         setupScenario := func() (string, []byte) {
461                                 nextKey++
462                                 blk := []byte(fmt.Sprintf("%d", nextKey))
463                                 loc := fmt.Sprintf("%x", md5.Sum(blk))
464                                 key := loc
465                                 if prefixLength > 0 {
466                                         key = loc[:prefixLength] + "/" + loc
467                                 }
468                                 c.Log("\t", loc, "\t", key)
469                                 putS3Obj(scenario.dataT, key, blk)
470                                 putS3Obj(scenario.recentT, "recent/"+key, nil)
471                                 putS3Obj(scenario.trashT, "trash/"+key, blk)
472                                 v.serverClock.now = &t0
473                                 return loc, blk
474                         }
475
476                         // Check canGet
477                         loc, blk := setupScenario()
478                         buf := make([]byte, len(blk))
479                         _, err := v.Get(context.Background(), loc, buf)
480                         c.Check(err == nil, check.Equals, scenario.canGet)
481                         if err != nil {
482                                 c.Check(os.IsNotExist(err), check.Equals, true)
483                         }
484
485                         // Call Trash, then check canTrash and canGetAfterTrash
486                         loc, _ = setupScenario()
487                         err = v.Trash(loc)
488                         c.Check(err == nil, check.Equals, scenario.canTrash)
489                         _, err = v.Get(context.Background(), loc, buf)
490                         c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
491                         if err != nil {
492                                 c.Check(os.IsNotExist(err), check.Equals, true)
493                         }
494
495                         // Call Untrash, then check canUntrash
496                         loc, _ = setupScenario()
497                         err = v.Untrash(loc)
498                         c.Check(err == nil, check.Equals, scenario.canUntrash)
499                         if scenario.dataT != none || scenario.trashT != none {
500                                 // In all scenarios where the data exists, we
501                                 // should be able to Get after Untrash --
502                                 // regardless of timestamps, errors, race
503                                 // conditions, etc.
504                                 _, err = v.Get(context.Background(), loc, buf)
505                                 c.Check(err, check.IsNil)
506                         }
507
508                         // Call EmptyTrash, then check haveTrashAfterEmpty and
509                         // freshAfterEmpty
510                         loc, _ = setupScenario()
511                         v.EmptyTrash()
512                         _, err = v.head("trash/" + v.key(loc))
513                         c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
514                         if scenario.freshAfterEmpty {
515                                 t, err := v.Mtime(loc)
516                                 c.Check(err, check.IsNil)
517                                 // new mtime must be current (with an
518                                 // allowance for 1s timestamp precision)
519                                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
520                         }
521
522                         // Check for current Mtime after Put (applies to all
523                         // scenarios)
524                         loc, blk = setupScenario()
525                         err = v.Put(context.Background(), loc, blk)
526                         c.Check(err, check.IsNil)
527                         t, err := v.Mtime(loc)
528                         c.Check(err, check.IsNil)
529                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
530                 }
531         }
532 }
533
// TestableS3AWSVolume wraps an S3AWSVolume together with the stub
// server and fake clock that newTestableVolume sets up for it.
type TestableS3AWSVolume struct {
	*S3AWSVolume
	// server is the fake S3 HTTP server started by
	// newTestableVolume; Teardown closes it.
	server      *httptest.Server
	// c is the check.C of the test that created the volume.
	c           *check.C
	// serverClock is the time source given to the fake S3 backend;
	// tests set serverClock.now to control object timestamps.
	serverClock *s3AWSFakeClock
}
540
541 type LogrusLog struct {
542         log *logrus.FieldLogger
543 }
544
545 func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
546         switch level {
547         case gofakes3.LogErr:
548                 (*l.log).Errorln(v...)
549         case gofakes3.LogWarn:
550                 (*l.log).Warnln(v...)
551         case gofakes3.LogInfo:
552                 (*l.log).Infoln(v...)
553         default:
554                 panic("unknown level")
555         }
556 }
557
558 func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3AWSVolume {
559
560         clock := &s3AWSFakeClock{}
561         // fake s3
562         backend := s3mem.New(s3mem.WithTimeSource(clock))
563
564         // To enable GoFakeS3 debug logging, pass logger to gofakes3.WithLogger()
565         /* logger := new(LogrusLog)
566         ctxLogger := ctxlog.FromContext(context.Background())
567         logger.log = &ctxLogger */
568         faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(nil), gofakes3.WithTimeSkewLimit(0))
569         srv := httptest.NewServer(faker.Server())
570
571         endpoint := srv.URL
572         if s.s3server != nil {
573                 endpoint = s.s3server.URL
574         }
575
576         iamRole, accessKey, secretKey := "", "xxx", "xxx"
577         if s.metadata != nil {
578                 iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
579         }
580
581         v := &TestableS3AWSVolume{
582                 S3AWSVolume: &S3AWSVolume{
583                         S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
584                                 IAMRole:            iamRole,
585                                 AccessKeyID:        accessKey,
586                                 SecretAccessKey:    secretKey,
587                                 Bucket:             S3AWSTestBucketName,
588                                 Endpoint:           endpoint,
589                                 Region:             "test-region-1",
590                                 LocationConstraint: true,
591                                 UnsafeDelete:       true,
592                                 IndexPageSize:      1000,
593                         },
594                         cluster: cluster,
595                         volume:  volume,
596                         logger:  ctxlog.TestLogger(c),
597                         metrics: metrics,
598                 },
599                 c:           c,
600                 server:      srv,
601                 serverClock: clock,
602         }
603         c.Assert(v.S3AWSVolume.check(""), check.IsNil)
604         // Our test S3 server uses the older 'Path Style'
605         v.S3AWSVolume.bucket.svc.ForcePathStyle = true
606         // Create the testbucket
607         input := &s3.CreateBucketInput{
608                 Bucket: aws.String(S3AWSTestBucketName),
609         }
610         req := v.S3AWSVolume.bucket.svc.CreateBucketRequest(input)
611         _, err := req.Send(context.Background())
612         c.Assert(err, check.IsNil)
613         // We couldn't set RaceWindow until now because check()
614         // rejects negative values.
615         v.S3AWSVolume.RaceWindow = arvados.Duration(raceWindow)
616         return v
617 }
618
619 // PutRaw skips the ContentMD5 test
620 func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
621         key := v.key(loc)
622         r := NewCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
623
624         uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
625                 u.PartSize = 5 * 1024 * 1024
626                 u.Concurrency = 13
627         })
628
629         _, err := uploader.Upload(&s3manager.UploadInput{
630                 Bucket: aws.String(v.bucket.bucket),
631                 Key:    aws.String(key),
632                 Body:   r,
633         })
634         if err != nil {
635                 v.logger.Printf("PutRaw: %s: %+v", key, err)
636         }
637
638         empty := bytes.NewReader([]byte{})
639         _, err = uploader.Upload(&s3manager.UploadInput{
640                 Bucket: aws.String(v.bucket.bucket),
641                 Key:    aws.String("recent/" + key),
642                 Body:   empty,
643         })
644         if err != nil {
645                 v.logger.Printf("PutRaw: recent/%s: %+v", key, err)
646         }
647 }
648
649 // TouchWithDate turns back the clock while doing a Touch(). We assume
650 // there are no other operations happening on the same s3test server
651 // while we do this.
652 func (v *TestableS3AWSVolume) TouchWithDate(loc string, lastPut time.Time) {
653         v.serverClock.now = &lastPut
654
655         uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
656         empty := bytes.NewReader([]byte{})
657         _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
658                 Bucket: aws.String(v.bucket.bucket),
659                 Key:    aws.String("recent/" + v.key(loc)),
660                 Body:   empty,
661         })
662         if err != nil {
663                 panic(err)
664         }
665
666         v.serverClock.now = nil
667 }
668
669 func (v *TestableS3AWSVolume) Teardown() {
670         v.server.Close()
671 }
672
673 func (v *TestableS3AWSVolume) ReadWriteOperationLabelValues() (r, w string) {
674         return "get", "put"
675 }