f05cbee848742a436073230eac00b767930dd9b5
[arvados.git] / services / keepstore / s3aws_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "strings"
18         "time"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22
23         "github.com/aws/aws-sdk-go-v2/aws"
24         "github.com/aws/aws-sdk-go-v2/service/s3"
25         "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
26
27         "github.com/johannesboyne/gofakes3"
28         "github.com/johannesboyne/gofakes3/backend/s3mem"
29         "github.com/prometheus/client_golang/prometheus"
30         "github.com/sirupsen/logrus"
31         check "gopkg.in/check.v1"
32 )
33
const (
	// S3AWSTestBucketName is the bucket created on the embedded fake
	// S3 server for each testable volume.
	S3AWSTestBucketName = "testbucket"
)
37
// s3AWSFakeClock is a controllable time source for the fake S3 server
// and backend: while now is nil it follows the real clock; when a test
// sets now, it reports that fixed instant instead.
type s3AWSFakeClock struct {
	now *time.Time
}
41
42 func (c *s3AWSFakeClock) Now() time.Time {
43         if c.now == nil {
44                 return time.Now().UTC()
45         }
46         return c.now.UTC()
47 }
48
49 func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
50         return c.Now().Sub(t)
51 }
52
// Register the suite with gocheck.
var _ = check.Suite(&StubbedS3AWSSuite{})

// NOTE(review): this package-level srv appears unused — newTestableVolume
// declares a local srv that shadows it. Confirm it is unreferenced
// elsewhere in the package before removing.
var srv httptest.Server
56
// StubbedS3AWSSuite tests the S3 volume driver against stub servers:
// an optional blocking S3 server and an optional instance-metadata
// server, both set per-test, plus a fresh cluster config per test.
type StubbedS3AWSSuite struct {
	s3server *httptest.Server // when non-nil, used as the volume's S3 endpoint instead of gofakes3
	metadata *httptest.Server // when non-nil, used as the IAM instance-metadata endpoint
	cluster  *arvados.Cluster
	volumes  []*TestableS3AWSVolume
}
63
64 func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) {
65         s.s3server = nil
66         s.metadata = nil
67         s.cluster = testCluster(c)
68         s.cluster.Volumes = map[string]arvados.Volume{
69                 "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
70                 "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
71         }
72 }
73
// TestGeneric runs the shared generic volume test suite against a
// writable stubbed S3 volume.
func (s *StubbedS3AWSSuite) TestGeneric(c *check.C) {
	DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
		// Use a negative raceWindow so s3test's 1-second
		// timestamp precision doesn't confuse fixRace.
		return s.newTestableVolume(c, params, -2*time.Second)
	})
}
81
// TestGenericReadOnly runs the shared generic volume test suite in
// read-only mode against a stubbed S3 volume.
func (s *StubbedS3AWSSuite) TestGenericReadOnly(c *check.C) {
	DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
		// Negative raceWindow for the same reason as TestGeneric.
		return s.newTestableVolume(c, params, -2*time.Second)
	})
}
87
// TestGenericWithPrefix runs the shared generic volume test suite on a
// volume that stores keys under a 3-character prefix directory.
func (s *StubbedS3AWSSuite) TestGenericWithPrefix(c *check.C) {
	DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
		v := s.newTestableVolume(c, params, -2*time.Second)
		v.PrefixLength = 3
		return v
	})
}
95
// TestIndex stores 256 blocks (one per possible leading byte value)
// and checks that Index returns exactly the entries matching each
// requested prefix, paging through the bucket listing 3 keys at a time.
func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
	v := s.newTestableVolume(c, newVolumeParams{
		Cluster:      s.cluster,
		ConfigVolume: arvados.Volume{Replication: 2},
		MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
		BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
	}, 0)
	// Small page size forces Index to issue multiple list requests
	// (256 keys, 3 per page).
	v.IndexPageSize = 3
	for i := 0; i < 256; i++ {
		// Locator "00…0" through "ff…ff"; payload is "foo".
		v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
	}
	for _, spec := range []struct {
		prefix      string
		expectMatch int
	}{
		{"", 256},
		{"c", 16},
		{"bc", 1},
		{"abc", 0},
	} {
		buf := new(bytes.Buffer)
		err := v.Index(context.Background(), spec.prefix, buf)
		c.Check(err, check.IsNil)

		// Index output is newline-terminated, so SplitAfter yields
		// expectMatch entries plus one trailing empty element.
		idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
		c.Check(len(idx), check.Equals, spec.expectMatch+1)
		c.Check(len(idx[len(idx)-1]), check.Equals, 0)
	}
}
125
126 func (s *StubbedS3AWSSuite) TestSignature(c *check.C) {
127         var header http.Header
128         stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
129                 header = r.Header
130         }))
131         defer stub.Close()
132
133         // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 v2 signatures are being phased out
134         // as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
135         vol := S3AWSVolume{
136                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
137                         AccessKeyID:     "xxx",
138                         SecretAccessKey: "xxx",
139                         Endpoint:        stub.URL,
140                         Region:          "test-region-1",
141                         Bucket:          "test-bucket-name",
142                 },
143                 cluster: s.cluster,
144                 logger:  ctxlog.TestLogger(c),
145                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
146         }
147         err := vol.check("")
148         // Our test S3 server uses the older 'Path Style'
149         vol.bucket.svc.ForcePathStyle = true
150
151         c.Check(err, check.IsNil)
152         err = vol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
153         c.Check(err, check.IsNil)
154         c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
155 }
156
157 func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) {
158         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
159                 upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
160                 exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
161                 // Literal example from
162                 // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
163                 // but with updated timestamps
164                 io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
165         }))
166         defer s.metadata.Close()
167
168         v := &S3AWSVolume{
169                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
170                         IAMRole:  s.metadata.URL + "/latest/api/token",
171                         Endpoint: "http://localhost:12345",
172                         Region:   "test-region-1",
173                         Bucket:   "test-bucket-name",
174                 },
175                 cluster: s.cluster,
176                 logger:  ctxlog.TestLogger(c),
177                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
178         }
179         err := v.check(s.metadata.URL + "/latest")
180         c.Check(err, check.IsNil)
181         creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
182         c.Check(err, check.IsNil)
183         c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
184         c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
185
186         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
187                 w.WriteHeader(http.StatusNotFound)
188         }))
189         deadv := &S3AWSVolume{
190                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
191                         IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
192                         Endpoint: "http://localhost:12345",
193                         Region:   "test-region-1",
194                         Bucket:   "test-bucket-name",
195                 },
196                 cluster: s.cluster,
197                 logger:  ctxlog.TestLogger(c),
198                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
199         }
200         err = deadv.check(s.metadata.URL + "/latest")
201         c.Check(err, check.IsNil)
202         _, err = deadv.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
203         c.Check(err, check.ErrorMatches, `(?s).*EC2RoleRequestError: no EC2 instance role found.*`)
204         c.Check(err, check.ErrorMatches, `(?s).*404.*`)
205 }
206
// TestStats verifies the volume's InternalStats counters: operation
// counts, per-error-type failure counts, and input/output byte totals.
func (s *StubbedS3AWSSuite) TestStats(c *check.C) {
	v := s.newTestableVolume(c, newVolumeParams{
		Cluster:      s.cluster,
		ConfigVolume: arvados.Volume{Replication: 2},
		MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
		BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
	}, 5*time.Minute)
	// stats renders the current InternalStats as JSON so expectations
	// can be expressed as regexps.
	stats := func() string {
		buf, err := json.Marshal(v.InternalStats())
		c.Check(err, check.IsNil)
		return string(buf)
	}

	c.Check(stats(), check.Matches, `.*"Ops":0,.*`)

	loc := "acbd18db4cc2f85cedef654fccc4a4d8"
	// Reading a nonexistent block counts an op and a 404 NoSuchKey
	// failure, but transfers no payload bytes.
	_, err := v.BlockRead(context.Background(), loc, io.Discard)
	c.Check(err, check.NotNil)
	c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
	c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
	c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)

	// One write of "foo": 3 bytes out. PutOps is expected to be 2 —
	// presumably the data object plus an auxiliary object; confirm
	// against the driver's BlockWrite implementation.
	err = v.BlockWrite(context.Background(), loc, []byte("foo"))
	c.Check(err, check.IsNil)
	c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
	c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)

	// Two successful reads of the 3-byte block: 6 bytes in.
	_, err = v.BlockRead(context.Background(), loc, io.Discard)
	c.Check(err, check.IsNil)
	_, err = v.BlockRead(context.Background(), loc, io.Discard)
	c.Check(err, check.IsNil)
	c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}
240
// s3AWSBlockingHandler is an http.Handler that reports each incoming
// request on the requested channel (if set) and then blocks until the
// unblock channel is fed or closed (if set), letting tests hold an S3
// operation in flight.
type s3AWSBlockingHandler struct {
	requested chan *http.Request
	unblock   chan struct{}
}
245
246 func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
247         if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
248                 // Accept PutBucket ("PUT /bucketname/"), called by
249                 // newTestableVolume
250                 return
251         }
252         if h.requested != nil {
253                 h.requested <- r
254         }
255         if h.unblock != nil {
256                 <-h.unblock
257         }
258         http.Error(w, "nothing here", http.StatusNotFound)
259 }
260
// TestGetContextCancel verifies that canceling the context aborts an
// in-flight BlockRead.
func (s *StubbedS3AWSSuite) TestGetContextCancel(c *check.C) {
	s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
		_, err := v.BlockRead(ctx, fooHash, io.Discard)
		return err
	})
}
267
// TestPutContextCancel verifies that canceling the context aborts an
// in-flight BlockWrite.
func (s *StubbedS3AWSSuite) TestPutContextCancel(c *check.C) {
	s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
		return v.BlockWrite(ctx, fooHash, []byte("foo"))
	})
}
273
// testContextCancel runs testFunc against a volume whose S3 server
// never responds, then cancels the context and verifies testFunc
// returns context.Canceled promptly instead of hanging.
func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3AWSVolume) error) {
	handler := &s3AWSBlockingHandler{}
	s.s3server = httptest.NewServer(handler)
	defer s.s3server.Close()

	v := s.newTestableVolume(c, newVolumeParams{
		Cluster:      s.cluster,
		ConfigVolume: arvados.Volume{Replication: 2},
		MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
		BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
	}, 5*time.Minute)

	ctx, cancel := context.WithCancel(context.Background())

	handler.requested = make(chan *http.Request)
	handler.unblock = make(chan struct{})
	// Closing unblock at exit releases the blocked handler goroutine
	// even if the test fails early.
	defer close(handler.unblock)

	doneFunc := make(chan struct{})
	go func() {
		err := testFunc(ctx, v)
		c.Check(err, check.Equals, context.Canceled)
		close(doneFunc)
	}()

	timeout := time.After(10 * time.Second)

	// Wait for the stub server to receive a request, meaning
	// Get() is waiting for an s3 operation.
	select {
	case <-timeout:
		c.Fatal("timed out waiting for test func to call our handler")
	case <-doneFunc:
		c.Fatal("test func finished without even calling our handler!")
	case <-handler.requested:
	}

	cancel()

	// The operation must now fail with context.Canceled without
	// waiting for the (still-blocked) server.
	select {
	case <-timeout:
		c.Fatal("timed out")
	case <-doneFunc:
	}
}
319
// TestBackendStates exercises BlockRead, BlockTrash, BlockUntrash,
// EmptyTrash, and BlockWrite against every meaningful combination of
// timestamps on a block's data object, its recent/* marker, and its
// trash/* copy — with and without key prefixes — and checks each
// operation's outcome against the scenario's expectations.
func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
	s.cluster.Collections.BlobTrashLifetime.Set("1h")
	s.cluster.Collections.BlobSigningTTL.Set("1h")

	v := s.newTestableVolume(c, newVolumeParams{
		Cluster:      s.cluster,
		ConfigVolume: arvados.Volume{Replication: 2},
		Logger:       ctxlog.TestLogger(c),
		MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
		BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
	}, 5*time.Minute)
	// none (the zero time.Time) means "this object does not exist".
	var none time.Time

	// putS3Obj stores an object directly on the fake S3 backend with
	// the given fake modification time, bypassing the volume driver.
	// A zero t means the object should not exist, so nothing is stored.
	putS3Obj := func(t time.Time, key string, data []byte) {
		if t == none {
			return
		}
		v.serverClock.now = &t
		uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
		_, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
			Bucket: aws.String(v.bucket.bucket),
			Key:    aws.String(key),
			Body:   bytes.NewReader(data),
		})
		if err != nil {
			panic(err)
		}
		v.serverClock.now = nil
		// Sanity check: the object must now be visible via head().
		_, err = v.head(key)
		if err != nil {
			panic(err)
		}
	}

	t0 := time.Now()
	nextKey := 0
	for _, scenario := range []struct {
		label               string
		dataT               time.Time // mtime of the data object (none = absent)
		recentT             time.Time // mtime of the recent/* marker (none = absent)
		trashT              time.Time // mtime of the trash/* copy (none = absent)
		canGet              bool
		canTrash            bool
		canGetAfterTrash    bool
		canUntrash          bool
		haveTrashAfterEmpty bool
		freshAfterEmpty     bool
	}{
		{
			"No related objects",
			none, none, none,
			false, false, false, false, false, false,
		},
		{
			// Stored by older version, or there was a
			// race between EmptyTrash and Put: Trash is a
			// no-op even though the data object is very
			// old
			"No recent/X",
			t0.Add(-48 * time.Hour), none, none,
			true, true, true, false, false, false,
		},
		{
			"Not trash, but old enough to be eligible for trash",
			t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
			true, true, false, false, false, false,
		},
		{
			"Not trash, and not old enough to be eligible for trash",
			t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
			true, true, true, false, false, false,
		},
		{
			"Trashed + untrashed copies exist, due to recent race between Trash and Put",
			t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
			true, true, true, true, true, false,
		},
		{
			"Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
			t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
			true, false, true, true, true, false,
		},
		{
			"Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
			t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
			true, false, true, true, false, false,
		},
		{
			"Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
			t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
			true, false, true, true, true, true,
		},
		{
			"Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
			t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
			true, true, true, true, false, false,
		},
		{
			"Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
			t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
			true, false, true, true, false, false,
		},
		{
			"Trash, not yet eligible for deletion",
			none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
			false, false, false, true, true, false,
		},
		{
			"Trash, not yet eligible for deletion, prone to races",
			none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
			false, false, false, true, true, false,
		},
		{
			"Trash, eligible for deletion",
			none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
			false, false, false, true, false, false,
		},
		{
			"Erroneously trashed during a race, detected before BlobTrashLifetime",
			none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
			true, false, true, true, true, false,
		},
		{
			"Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
			none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
			true, false, true, true, true, false,
		},
		{
			"Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
			none, none, t0.Add(-time.Minute),
			false, false, false, true, true, true,
		},
	} {
		for _, prefixLength := range []int{0, 3} {
			v.PrefixLength = prefixLength
			c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)

			// We have a few tests to run for each scenario, and
			// the tests are expected to change state. By calling
			// this setup func between tests, we (re)create the
			// scenario as specified, using a new unique block
			// locator to prevent interference from previous
			// tests.

			setupScenario := func() (string, []byte) {
				nextKey++
				blk := []byte(fmt.Sprintf("%d", nextKey))
				loc := fmt.Sprintf("%x", md5.Sum(blk))
				key := loc
				if prefixLength > 0 {
					key = loc[:prefixLength] + "/" + loc
				}
				c.Log("\t", loc, "\t", key)
				putS3Obj(scenario.dataT, key, blk)
				putS3Obj(scenario.recentT, "recent/"+key, nil)
				putS3Obj(scenario.trashT, "trash/"+key, blk)
				// Pin the fake clock to t0 for the checks below.
				v.serverClock.now = &t0
				return loc, blk
			}

			// Check canGet
			loc, blk := setupScenario()
			_, err := v.BlockRead(context.Background(), loc, io.Discard)
			c.Check(err == nil, check.Equals, scenario.canGet)
			if err != nil {
				c.Check(os.IsNotExist(err), check.Equals, true)
			}

			// Call Trash, then check canTrash and canGetAfterTrash
			loc, _ = setupScenario()
			err = v.BlockTrash(loc)
			c.Check(err == nil, check.Equals, scenario.canTrash)
			_, err = v.BlockRead(context.Background(), loc, io.Discard)
			c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
			if err != nil {
				c.Check(os.IsNotExist(err), check.Equals, true)
			}

			// Call Untrash, then check canUntrash
			loc, _ = setupScenario()
			err = v.BlockUntrash(loc)
			c.Check(err == nil, check.Equals, scenario.canUntrash)
			if scenario.dataT != none || scenario.trashT != none {
				// In all scenarios where the data exists, we
				// should be able to Get after Untrash --
				// regardless of timestamps, errors, race
				// conditions, etc.
				_, err = v.BlockRead(context.Background(), loc, io.Discard)
				c.Check(err, check.IsNil)
			}

			// Call EmptyTrash, then check haveTrashAfterEmpty and
			// freshAfterEmpty
			loc, _ = setupScenario()
			v.EmptyTrash()
			_, err = v.head("trash/" + v.key(loc))
			c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
			if scenario.freshAfterEmpty {
				t, err := v.Mtime(loc)
				c.Check(err, check.IsNil)
				// new mtime must be current (with an
				// allowance for 1s timestamp precision)
				c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
			}

			// Check for current Mtime after Put (applies to all
			// scenarios)
			loc, blk = setupScenario()
			err = v.BlockWrite(context.Background(), loc, blk)
			c.Check(err, check.IsNil)
			t, err := v.Mtime(loc)
			c.Check(err, check.IsNil)
			c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
		}
	}
}
536
// TestableS3AWSVolume wraps an S3AWSVolume with the fake S3 server and
// controllable clock backing it, for use by the generic volume tests.
type TestableS3AWSVolume struct {
	*S3AWSVolume
	server      *httptest.Server // the embedded gofakes3 server
	c           *check.C
	serverClock *s3AWSFakeClock // time source shared with the fake server
}
543
// LogrusLog adapts a logrus logger to gofakes3's Logger interface
// (used only by the commented-out debug-logging setup in
// newTestableVolume).
// NOTE(review): log is a pointer to an interface, which is usually an
// anti-pattern in Go; a plain logrus.FieldLogger field would likely
// suffice — confirm no in-package caller relies on the pointer.
type LogrusLog struct {
	log *logrus.FieldLogger
}
547
548 func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
549         switch level {
550         case gofakes3.LogErr:
551                 (*l.log).Errorln(v...)
552         case gofakes3.LogWarn:
553                 (*l.log).Warnln(v...)
554         case gofakes3.LogInfo:
555                 (*l.log).Infoln(v...)
556         default:
557                 panic("unknown level")
558         }
559 }
560
// newTestableVolume builds an S3AWSVolume backed by a fresh in-memory
// gofakes3 server (or by s.s3server / s.metadata stubs when those are
// set), creates the test bucket, and applies the requested raceWindow.
func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *TestableS3AWSVolume {

	clock := &s3AWSFakeClock{}
	// fake s3
	backend := s3mem.New(s3mem.WithTimeSource(clock))

	// To enable GoFakeS3 debug logging, pass logger to gofakes3.WithLogger()
	/* logger := new(LogrusLog)
	ctxLogger := ctxlog.FromContext(context.Background())
	logger.log = &ctxLogger */
	faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(nil), gofakes3.WithTimeSkewLimit(0))
	// NOTE(review): this local srv shadows the package-level srv var.
	srv := httptest.NewServer(faker.Server())

	// A test-provided blocking server takes precedence over the
	// gofakes3 endpoint.
	endpoint := srv.URL
	if s.s3server != nil {
		endpoint = s.s3server.URL
	}

	// With a metadata stub, use IAM-role credentials instead of
	// static keys.
	iamRole, accessKey, secretKey := "", "xxx", "xxx"
	if s.metadata != nil {
		iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
	}

	v := &TestableS3AWSVolume{
		S3AWSVolume: &S3AWSVolume{
			S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
				IAMRole:            iamRole,
				AccessKeyID:        accessKey,
				SecretAccessKey:    secretKey,
				Bucket:             S3AWSTestBucketName,
				Endpoint:           endpoint,
				Region:             "test-region-1",
				LocationConstraint: true,
				UnsafeDelete:       true,
				IndexPageSize:      1000,
			},
			cluster:    params.Cluster,
			volume:     params.ConfigVolume,
			logger:     params.Logger,
			metrics:    params.MetricsVecs,
			bufferPool: params.BufferPool,
		},
		c:           c,
		server:      srv,
		serverClock: clock,
	}
	c.Assert(v.S3AWSVolume.check(""), check.IsNil)
	// Our test S3 server uses the older 'Path Style'
	v.S3AWSVolume.bucket.svc.ForcePathStyle = true
	// Create the testbucket
	input := &s3.CreateBucketInput{
		Bucket: aws.String(S3AWSTestBucketName),
	}
	req := v.S3AWSVolume.bucket.svc.CreateBucketRequest(input)
	_, err := req.Send(context.Background())
	c.Assert(err, check.IsNil)
	// We couldn't set RaceWindow until now because check()
	// rejects negative values.
	v.S3AWSVolume.RaceWindow = arvados.Duration(raceWindow)
	return v
}
622
// PutRaw skips the ContentMD5 test. It uploads the block's data object
// and its recent/* marker directly to the backing bucket; upload
// errors are logged rather than failing the test.
func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
	key := v.key(loc)
	// Count uploaded bytes in the volume's OutBytes stat.
	r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)

	uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
		u.PartSize = 5 * 1024 * 1024
		u.Concurrency = 13
	})

	_, err := uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String(v.bucket.bucket),
		Key:    aws.String(key),
		Body:   r,
	})
	if err != nil {
		v.logger.Printf("PutRaw: %s: %+v", key, err)
	}

	// Write the empty recent/* marker alongside the data object.
	empty := bytes.NewReader([]byte{})
	_, err = uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String(v.bucket.bucket),
		Key:    aws.String("recent/" + key),
		Body:   empty,
	})
	if err != nil {
		v.logger.Printf("PutRaw: recent/%s: %+v", key, err)
	}
}
652
653 // TouchWithDate turns back the clock while doing a Touch(). We assume
654 // there are no other operations happening on the same s3test server
655 // while we do this.
656 func (v *TestableS3AWSVolume) TouchWithDate(loc string, lastPut time.Time) {
657         v.serverClock.now = &lastPut
658
659         uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
660         empty := bytes.NewReader([]byte{})
661         _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
662                 Bucket: aws.String(v.bucket.bucket),
663                 Key:    aws.String("recent/" + v.key(loc)),
664                 Body:   empty,
665         })
666         if err != nil {
667                 panic(err)
668         }
669
670         v.serverClock.now = nil
671 }
672
// Teardown shuts down the fake S3 server backing this volume.
func (v *TestableS3AWSVolume) Teardown() {
	v.server.Close()
}
676
677 func (v *TestableS3AWSVolume) ReadWriteOperationLabelValues() (r, w string) {
678         return "get", "put"
679 }