22217: Make apt less quiet
[arvados.git] / services / keepstore / s3_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "strings"
18         "sync/atomic"
19         "time"
20
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "git.arvados.org/arvados.git/sdk/go/ctxlog"
23
24         "github.com/aws/aws-sdk-go-v2/aws"
25         "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
26         "github.com/aws/aws-sdk-go-v2/service/s3"
27
28         "github.com/johannesboyne/gofakes3"
29         "github.com/johannesboyne/gofakes3/backend/s3mem"
30         "github.com/prometheus/client_golang/prometheus"
31         "github.com/sirupsen/logrus"
32         check "gopkg.in/check.v1"
33 )
34
// s3fakeClock is a time source whose reading can be pinned by tests.
// While now is nil it reports the real wall-clock time; once now
// points at a time value, that value is reported instead.
type s3fakeClock struct {
	now *time.Time
}

// Now returns the pinned time if one is set, and the current time
// otherwise. Either way the result is expressed in UTC.
func (c *s3fakeClock) Now() time.Time {
	pinned := c.now
	if pinned != nil {
		return pinned.UTC()
	}
	return time.Now().UTC()
}

// Since returns the duration elapsed between t and c.Now().
func (c *s3fakeClock) Since(t time.Time) time.Duration {
	current := c.Now()
	return current.Sub(t)
}
49
50 var _ = check.Suite(&stubbedS3Suite{})
51
52 var srv httptest.Server
53
54 type stubbedS3Suite struct {
55         s3server    *httptest.Server
56         s3fakeClock *s3fakeClock
57         metadata    *httptest.Server
58         cluster     *arvados.Cluster
59         volumes     []*testableS3Volume
60 }
61
62 func (s *stubbedS3Suite) SetUpTest(c *check.C) {
63         s.s3server = nil
64         s.s3fakeClock = &s3fakeClock{}
65         s.metadata = nil
66         s.cluster = testCluster(c)
67         s.cluster.Volumes = map[string]arvados.Volume{
68                 "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
69                 "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
70         }
71 }
72
73 func (s *stubbedS3Suite) TearDownTest(c *check.C) {
74         if s.s3server != nil {
75                 s.s3server.Close()
76         }
77 }
78
79 func (s *stubbedS3Suite) TestGeneric(c *check.C) {
80         DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
81                 // Use a negative raceWindow so s3test's 1-second
82                 // timestamp precision doesn't confuse fixRace.
83                 return s.newTestableVolume(c, params, -2*time.Second)
84         })
85 }
86
87 func (s *stubbedS3Suite) TestGenericReadOnly(c *check.C) {
88         DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
89                 return s.newTestableVolume(c, params, -2*time.Second)
90         })
91 }
92
93 func (s *stubbedS3Suite) TestGenericWithPrefix(c *check.C) {
94         DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
95                 v := s.newTestableVolume(c, params, -2*time.Second)
96                 v.PrefixLength = 3
97                 return v
98         })
99 }
100
101 func (s *stubbedS3Suite) TestIndex(c *check.C) {
102         v := s.newTestableVolume(c, newVolumeParams{
103                 Cluster:      s.cluster,
104                 ConfigVolume: arvados.Volume{Replication: 2},
105                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
106                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
107         }, 0)
108         v.IndexPageSize = 3
109         for i := 0; i < 256; i++ {
110                 err := v.blockWriteWithoutMD5Check(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
111                 c.Assert(err, check.IsNil)
112         }
113         for _, spec := range []struct {
114                 prefix      string
115                 expectMatch int
116         }{
117                 {"", 256},
118                 {"c", 16},
119                 {"bc", 1},
120                 {"abc", 0},
121         } {
122                 buf := new(bytes.Buffer)
123                 err := v.Index(context.Background(), spec.prefix, buf)
124                 c.Check(err, check.IsNil)
125
126                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
127                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
128                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
129         }
130 }
131
132 func (s *stubbedS3Suite) TestSignature(c *check.C) {
133         var header http.Header
134         stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
135                 header = r.Header
136         }))
137         defer stub.Close()
138
139         // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 v2 signatures are being phased out
140         // as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
141         vol := s3Volume{
142                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
143                         AccessKeyID:     "xxx",
144                         SecretAccessKey: "xxx",
145                         Endpoint:        stub.URL,
146                         Region:          "test-region-1",
147                         Bucket:          "test-bucket-name",
148                         UsePathStyle:    true,
149                 },
150                 cluster: s.cluster,
151                 logger:  ctxlog.TestLogger(c),
152                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
153         }
154         err := vol.check("")
155
156         c.Check(err, check.IsNil)
157         err = vol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
158         c.Check(err, check.IsNil)
159         c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
160 }
161
162 func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
163         var reqHeader http.Header
164         stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
165                 reqHeader = r.Header
166         }))
167         defer stub.Close()
168
169         retrievedMetadata := false
170         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
171                 retrievedMetadata = true
172                 upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
173                 exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
174                 c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path)
175                 switch {
176                 case r.URL.Path == "/latest/meta-data/iam/security-credentials/":
177                         io.WriteString(w, "testcredential\n")
178                 case r.URL.Path == "/latest/api/token",
179                         r.URL.Path == "/latest/meta-data/iam/security-credentials/testcredential":
180                         // Literal example from
181                         // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
182                         // but with updated timestamps
183                         io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
184                 default:
185                         w.WriteHeader(http.StatusNotFound)
186                 }
187         }))
188         defer s.metadata.Close()
189
190         v := &s3Volume{
191                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
192                         Endpoint: stub.URL,
193                         Region:   "test-region-1",
194                         Bucket:   "test-bucket-name",
195                 },
196                 cluster: s.cluster,
197                 logger:  ctxlog.TestLogger(c),
198                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
199         }
200         err := v.check(s.metadata.URL + "/latest")
201         c.Check(err, check.IsNil)
202         resp, err := v.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{})
203         c.Check(err, check.IsNil)
204         c.Check(resp.Buckets, check.HasLen, 0)
205         c.Check(retrievedMetadata, check.Equals, true)
206         c.Check(reqHeader.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 Credential=ASIAIOSFODNN7EXAMPLE/\d+/test-region-1/s3/aws4_request, SignedHeaders=.*`)
207
208         retrievedMetadata = false
209         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
210                 retrievedMetadata = true
211                 c.Logf("metadata stub received request: %s %s", r.Method, r.URL.Path)
212                 w.WriteHeader(http.StatusNotFound)
213         }))
214         deadv := &s3Volume{
215                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
216                         Endpoint: "http://localhost:9",
217                         Region:   "test-region-1",
218                         Bucket:   "test-bucket-name",
219                 },
220                 cluster: s.cluster,
221                 logger:  ctxlog.TestLogger(c),
222                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
223         }
224         err = deadv.check(s.metadata.URL + "/latest")
225         c.Check(err, check.IsNil)
226         _, err = deadv.bucket.svc.ListBuckets(context.Background(), &s3.ListBucketsInput{})
227         c.Check(err, check.ErrorMatches, `(?s).*failed to refresh cached credentials, no EC2 IMDS role found.*`)
228         c.Check(err, check.ErrorMatches, `(?s).*404.*`)
229         c.Check(retrievedMetadata, check.Equals, true)
230 }
231
232 func (s *stubbedS3Suite) TestStats(c *check.C) {
233         v := s.newTestableVolume(c, newVolumeParams{
234                 Cluster:      s.cluster,
235                 ConfigVolume: arvados.Volume{Replication: 2},
236                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
237                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
238         }, 5*time.Minute)
239         stats := func() string {
240                 buf, err := json.Marshal(v.InternalStats())
241                 c.Check(err, check.IsNil)
242                 return string(buf)
243         }
244
245         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
246
247         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
248         err := v.BlockRead(context.Background(), loc, brdiscard)
249         c.Check(err, check.NotNil)
250         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
251         c.Check(stats(), check.Matches, `.*"\*smithy.OperationError 404 NoSuchKey":[^0].*`)
252         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
253
254         err = v.BlockWrite(context.Background(), loc, []byte("foo"))
255         c.Check(err, check.IsNil)
256         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
257         c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
258
259         err = v.BlockRead(context.Background(), loc, brdiscard)
260         c.Check(err, check.IsNil)
261         err = v.BlockRead(context.Background(), loc, brdiscard)
262         c.Check(err, check.IsNil)
263         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
264 }
265
// s3AWSBlockingHandler is an HTTP handler that can report each
// incoming request on a channel and then hold the request open until
// it is released. Either channel may be left nil to skip that step.
type s3AWSBlockingHandler struct {
	// requested, if non-nil, receives every request other than a
	// bucket-level PUT.
	requested chan *http.Request
	// unblock, if non-nil, is received from before responding.
	unblock chan struct{}
}

// ServeHTTP lets a bucket-level PUT through with an empty default
// response. Any other request is announced on requested, held until
// unblock yields, and then answered with 404.
func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// PutBucket ("PUT /bucketname/") is called by
	// newTestableVolume and must be accepted: its path has no
	// slash once the surrounding ones are trimmed.
	bucketLevel := !strings.Contains(strings.Trim(r.URL.Path, "/"), "/")
	if bucketLevel && r.Method == http.MethodPut {
		return
	}
	if announce := h.requested; announce != nil {
		announce <- r
	}
	if gate := h.unblock; gate != nil {
		<-gate
	}
	http.Error(w, "nothing here", http.StatusNotFound)
}
285
286 func (s *stubbedS3Suite) TestGetContextCancel(c *check.C) {
287         s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
288                 return v.BlockRead(ctx, fooHash, brdiscard)
289         })
290 }
291
292 func (s *stubbedS3Suite) TestPutContextCancel(c *check.C) {
293         s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
294                 return v.BlockWrite(ctx, fooHash, []byte("foo"))
295         })
296 }
297
298 func (s *stubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) {
299         handler := &s3AWSBlockingHandler{}
300         s.s3server = httptest.NewServer(handler)
301         defer s.s3server.Close()
302
303         v := s.newTestableVolume(c, newVolumeParams{
304                 Cluster:      s.cluster,
305                 ConfigVolume: arvados.Volume{Replication: 2},
306                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
307                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
308         }, 5*time.Minute)
309
310         ctx, cancel := context.WithCancel(context.Background())
311
312         handler.requested = make(chan *http.Request)
313         handler.unblock = make(chan struct{})
314         defer close(handler.unblock)
315
316         doneFunc := make(chan struct{})
317         go func() {
318                 err := testFunc(ctx, v)
319                 c.Check(err, check.Equals, context.Canceled)
320                 close(doneFunc)
321         }()
322
323         timeout := time.After(10 * time.Second)
324
325         // Wait for the stub server to receive a request, meaning
326         // Get() is waiting for an s3 operation.
327         select {
328         case <-timeout:
329                 c.Fatal("timed out waiting for test func to call our handler")
330         case <-doneFunc:
331                 c.Fatal("test func finished without even calling our handler!")
332         case <-handler.requested:
333         }
334
335         cancel()
336
337         select {
338         case <-timeout:
339                 c.Fatal("timed out")
340         case <-doneFunc:
341         }
342 }
343
344 func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
345         s.cluster.Collections.BlobTrashLifetime.Set("1h")
346         s.cluster.Collections.BlobSigningTTL.Set("1h")
347
348         v := s.newTestableVolume(c, newVolumeParams{
349                 Cluster:      s.cluster,
350                 ConfigVolume: arvados.Volume{Replication: 2},
351                 Logger:       ctxlog.TestLogger(c),
352                 MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
353                 BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
354         }, 5*time.Minute)
355         var none time.Time
356
357         putS3Obj := func(t time.Time, key string, data []byte) {
358                 if t == none {
359                         return
360                 }
361                 s.s3fakeClock.now = &t
362                 uploader := manager.NewUploader(v.bucket.svc)
363                 _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
364                         Bucket: aws.String(v.bucket.bucket),
365                         Key:    aws.String(key),
366                         Body:   bytes.NewReader(data),
367                 })
368                 if err != nil {
369                         panic(err)
370                 }
371                 s.s3fakeClock.now = nil
372                 _, err = v.head(key)
373                 if err != nil {
374                         panic(err)
375                 }
376         }
377
378         t0 := time.Now()
379         nextKey := 0
380         for _, scenario := range []struct {
381                 label               string
382                 dataT               time.Time
383                 recentT             time.Time
384                 trashT              time.Time
385                 canGet              bool
386                 canTrash            bool
387                 canGetAfterTrash    bool
388                 canUntrash          bool
389                 haveTrashAfterEmpty bool
390                 freshAfterEmpty     bool
391         }{
392                 {
393                         "No related objects",
394                         none, none, none,
395                         false, false, false, false, false, false,
396                 },
397                 {
398                         // Stored by older version, or there was a
399                         // race between EmptyTrash and Put: Trash is a
400                         // no-op even though the data object is very
401                         // old
402                         "No recent/X",
403                         t0.Add(-48 * time.Hour), none, none,
404                         true, true, true, false, false, false,
405                 },
406                 {
407                         "Not trash, but old enough to be eligible for trash",
408                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
409                         true, true, false, false, false, false,
410                 },
411                 {
412                         "Not trash, and not old enough to be eligible for trash",
413                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
414                         true, true, true, false, false, false,
415                 },
416                 {
417                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
418                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
419                         true, true, true, true, true, false,
420                 },
421                 {
422                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
423                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
424                         true, false, true, true, true, false,
425                 },
426                 {
427                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
428                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
429                         true, false, true, true, false, false,
430                 },
431                 {
432                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
433                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
434                         true, false, true, true, true, true,
435                 },
436                 {
437                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
438                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
439                         true, true, true, true, false, false,
440                 },
441                 {
442                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
443                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
444                         true, false, true, true, false, false,
445                 },
446                 {
447                         "Trash, not yet eligible for deletion",
448                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
449                         false, false, false, true, true, false,
450                 },
451                 {
452                         "Trash, not yet eligible for deletion, prone to races",
453                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
454                         false, false, false, true, true, false,
455                 },
456                 {
457                         "Trash, eligible for deletion",
458                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
459                         false, false, false, true, false, false,
460                 },
461                 {
462                         "Erroneously trashed during a race, detected before BlobTrashLifetime",
463                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
464                         true, false, true, true, true, false,
465                 },
466                 {
467                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
468                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
469                         true, false, true, true, true, false,
470                 },
471                 {
472                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
473                         none, none, t0.Add(-time.Minute),
474                         false, false, false, true, true, true,
475                 },
476         } {
477                 for _, prefixLength := range []int{0, 3} {
478                         v.PrefixLength = prefixLength
479                         c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
480
481                         // We have a few tests to run for each scenario, and
482                         // the tests are expected to change state. By calling
483                         // this setup func between tests, we (re)create the
484                         // scenario as specified, using a new unique block
485                         // locator to prevent interference from previous
486                         // tests.
487
488                         setupScenario := func() (string, []byte) {
489                                 nextKey++
490                                 blk := []byte(fmt.Sprintf("%d", nextKey))
491                                 loc := fmt.Sprintf("%x", md5.Sum(blk))
492                                 key := loc
493                                 if prefixLength > 0 {
494                                         key = loc[:prefixLength] + "/" + loc
495                                 }
496                                 c.Log("\t", loc, "\t", key)
497                                 putS3Obj(scenario.dataT, key, blk)
498                                 putS3Obj(scenario.recentT, "recent/"+key, nil)
499                                 putS3Obj(scenario.trashT, "trash/"+key, blk)
500                                 v.s3fakeClock.now = &t0
501                                 return loc, blk
502                         }
503
504                         // Check canGet
505                         loc, blk := setupScenario()
506                         err := v.BlockRead(context.Background(), loc, brdiscard)
507                         c.Check(err == nil, check.Equals, scenario.canGet, check.Commentf("err was %+v", err))
508                         if err != nil {
509                                 c.Check(os.IsNotExist(err), check.Equals, true)
510                         }
511
512                         // Call Trash, then check canTrash and canGetAfterTrash
513                         loc, _ = setupScenario()
514                         err = v.BlockTrash(loc)
515                         c.Check(err == nil, check.Equals, scenario.canTrash)
516                         err = v.BlockRead(context.Background(), loc, brdiscard)
517                         c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
518                         if err != nil {
519                                 c.Check(os.IsNotExist(err), check.Equals, true)
520                         }
521
522                         // Call Untrash, then check canUntrash
523                         loc, _ = setupScenario()
524                         err = v.BlockUntrash(loc)
525                         c.Check(err == nil, check.Equals, scenario.canUntrash)
526                         if scenario.dataT != none || scenario.trashT != none {
527                                 // In all scenarios where the data exists, we
528                                 // should be able to Get after Untrash --
529                                 // regardless of timestamps, errors, race
530                                 // conditions, etc.
531                                 err = v.BlockRead(context.Background(), loc, brdiscard)
532                                 c.Check(err, check.IsNil)
533                         }
534
535                         // Call EmptyTrash, then check haveTrashAfterEmpty and
536                         // freshAfterEmpty
537                         loc, _ = setupScenario()
538                         v.EmptyTrash()
539                         _, err = v.head("trash/" + v.key(loc))
540                         c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
541                         if scenario.freshAfterEmpty {
542                                 t, err := v.Mtime(loc)
543                                 c.Check(err, check.IsNil)
544                                 // new mtime must be current (with an
545                                 // allowance for 1s timestamp precision)
546                                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
547                         }
548
549                         // Check for current Mtime after Put (applies to all
550                         // scenarios)
551                         loc, blk = setupScenario()
552                         err = v.BlockWrite(context.Background(), loc, blk)
553                         c.Check(err, check.IsNil)
554                         t, err := v.Mtime(loc)
555                         c.Check(err, check.IsNil)
556                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
557                 }
558         }
559 }
560
561 type testableS3Volume struct {
562         *s3Volume
563         server      *httptest.Server
564         c           *check.C
565         s3fakeClock *s3fakeClock
566 }
567
568 type gofakes3logger struct {
569         logrus.FieldLogger
570 }
571
572 func (l gofakes3logger) Print(level gofakes3.LogLevel, v ...interface{}) {
573         switch level {
574         case gofakes3.LogErr:
575                 l.Errorln(v...)
576         case gofakes3.LogWarn:
577                 l.Warnln(v...)
578         case gofakes3.LogInfo:
579                 l.Infoln(v...)
580         default:
581                 panic("unknown level")
582         }
583 }
584
585 var testBucketSerial atomic.Int64
586
587 func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
588         if params.Logger == nil {
589                 params.Logger = ctxlog.TestLogger(c)
590         }
591         if s.s3server == nil {
592                 backend := s3mem.New(s3mem.WithTimeSource(s.s3fakeClock))
593                 logger := ctxlog.TestLogger(c)
594                 faker := gofakes3.New(backend,
595                         gofakes3.WithTimeSource(s.s3fakeClock),
596                         gofakes3.WithLogger(gofakes3logger{FieldLogger: logger}),
597                         gofakes3.WithTimeSkewLimit(0))
598                 s.s3server = httptest.NewServer(faker.Server())
599         }
600         endpoint := s.s3server.URL
601         bucketName := fmt.Sprintf("testbucket%d", testBucketSerial.Add(1))
602
603         var metadataURL, accessKey, secretKey string
604         if s.metadata != nil {
605                 metadataURL = s.metadata.URL
606         } else {
607                 accessKey, secretKey = "xxx", "xxx"
608         }
609
610         v := &testableS3Volume{
611                 s3Volume: &s3Volume{
612                         S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
613                                 AccessKeyID:        accessKey,
614                                 SecretAccessKey:    secretKey,
615                                 Bucket:             bucketName,
616                                 Endpoint:           endpoint,
617                                 Region:             "test-region-1",
618                                 LocationConstraint: true,
619                                 UnsafeDelete:       true,
620                                 IndexPageSize:      1000,
621                                 UsePathStyle:       true,
622                         },
623                         cluster:    params.Cluster,
624                         volume:     params.ConfigVolume,
625                         logger:     params.Logger,
626                         metrics:    params.MetricsVecs,
627                         bufferPool: params.BufferPool,
628                 },
629                 c:           c,
630                 s3fakeClock: s.s3fakeClock,
631         }
632         c.Assert(v.s3Volume.check(metadataURL), check.IsNil)
633         // Create the testbucket
634         input := &s3.CreateBucketInput{
635                 Bucket: aws.String(bucketName),
636         }
637         _, err := v.s3Volume.bucket.svc.CreateBucket(context.Background(), input)
638         c.Assert(err, check.IsNil)
639         // We couldn't set RaceWindow until now because check()
640         // rejects negative values.
641         v.s3Volume.RaceWindow = arvados.Duration(raceWindow)
642         return v
643 }
644
645 func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) error {
646         key := v.key(loc)
647         r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
648
649         uploader := manager.NewUploader(v.bucket.svc, func(u *manager.Uploader) {
650                 u.PartSize = 5 * 1024 * 1024
651                 u.Concurrency = 13
652         })
653
654         _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
655                 Bucket: aws.String(v.bucket.bucket),
656                 Key:    aws.String(key),
657                 Body:   r,
658         })
659         if err != nil {
660                 return err
661         }
662
663         empty := bytes.NewReader([]byte{})
664         _, err = uploader.Upload(context.Background(), &s3.PutObjectInput{
665                 Bucket: aws.String(v.bucket.bucket),
666                 Key:    aws.String("recent/" + key),
667                 Body:   empty,
668         })
669         return err
670 }
671
672 // TouchWithDate turns back the clock while doing a Touch(). We assume
673 // there are no other operations happening on the same s3test server
674 // while we do this.
675 func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) {
676         v.s3fakeClock.now = &lastPut
677
678         uploader := manager.NewUploader(v.bucket.svc)
679         empty := bytes.NewReader([]byte{})
680         _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{
681                 Bucket: aws.String(v.bucket.bucket),
682                 Key:    aws.String("recent/" + v.key(loc)),
683                 Body:   empty,
684         })
685         if err != nil {
686                 panic(err)
687         }
688
689         v.s3fakeClock.now = nil
690 }
691
692 func (v *testableS3Volume) Teardown() {
693 }
694
695 func (v *testableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
696         return "get", "put"
697 }