16427: Merge branch 'master'
[arvados.git] / services / keepstore / s3_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "strings"
18         "time"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22         "github.com/AdRoll/goamz/s3"
23         "github.com/AdRoll/goamz/s3/s3test"
24         "github.com/prometheus/client_golang/prometheus"
25         "github.com/sirupsen/logrus"
26         check "gopkg.in/check.v1"
27 )
28
29 const (
30         TestBucketName = "testbucket"
31 )
32
33 type fakeClock struct {
34         now *time.Time
35 }
36
37 func (c *fakeClock) Now() time.Time {
38         if c.now == nil {
39                 return time.Now()
40         }
41         return *c.now
42 }
43
44 var _ = check.Suite(&StubbedS3Suite{})
45
46 type StubbedS3Suite struct {
47         s3server *httptest.Server
48         metadata *httptest.Server
49         cluster  *arvados.Cluster
50         handler  *handler
51         volumes  []*TestableS3Volume
52 }
53
54 func (s *StubbedS3Suite) SetUpTest(c *check.C) {
55         s.s3server = nil
56         s.metadata = nil
57         s.cluster = testCluster(c)
58         s.cluster.Volumes = map[string]arvados.Volume{
59                 "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
60                 "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
61         }
62         s.handler = &handler{}
63 }
64
65 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
66         DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
67                 // Use a negative raceWindow so s3test's 1-second
68                 // timestamp precision doesn't confuse fixRace.
69                 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
70         })
71 }
72
73 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
74         DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
75                 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
76         })
77 }
78
79 func (s *StubbedS3Suite) TestIndex(c *check.C) {
80         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
81         v.IndexPageSize = 3
82         for i := 0; i < 256; i++ {
83                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
84         }
85         for _, spec := range []struct {
86                 prefix      string
87                 expectMatch int
88         }{
89                 {"", 256},
90                 {"c", 16},
91                 {"bc", 1},
92                 {"abc", 0},
93         } {
94                 buf := new(bytes.Buffer)
95                 err := v.IndexTo(spec.prefix, buf)
96                 c.Check(err, check.IsNil)
97
98                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
99                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
100                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
101         }
102 }
103
104 func (s *StubbedS3Suite) TestSignatureVersion(c *check.C) {
105         var header http.Header
106         stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
107                 header = r.Header
108         }))
109         defer stub.Close()
110
111         // Default V4 signature
112         vol := S3Volume{
113                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
114                         AccessKey: "xxx",
115                         SecretKey: "xxx",
116                         Endpoint:  stub.URL,
117                         Region:    "test-region-1",
118                         Bucket:    "test-bucket-name",
119                 },
120                 cluster: s.cluster,
121                 logger:  ctxlog.TestLogger(c),
122                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
123         }
124         err := vol.check()
125         c.Check(err, check.IsNil)
126         err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
127         c.Check(err, check.IsNil)
128         c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
129
130         // Force V2 signature
131         vol = S3Volume{
132                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
133                         AccessKey:   "xxx",
134                         SecretKey:   "xxx",
135                         Endpoint:    stub.URL,
136                         Region:      "test-region-1",
137                         Bucket:      "test-bucket-name",
138                         V2Signature: true,
139                 },
140                 cluster: s.cluster,
141                 logger:  ctxlog.TestLogger(c),
142                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
143         }
144         err = vol.check()
145         c.Check(err, check.IsNil)
146         err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
147         c.Check(err, check.IsNil)
148         c.Check(header.Get("Authorization"), check.Matches, `AWS xxx:.*`)
149 }
150
151 func (s *StubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
152         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
153                 upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
154                 exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
155                 // Literal example from
156                 // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
157                 // but with updated timestamps
158                 io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
159         }))
160         defer s.metadata.Close()
161
162         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
163         c.Check(v.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
164         c.Check(v.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
165         c.Check(v.bucket.bucket.S3.Auth.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
166         c.Check(v.bucket.bucket.S3.Auth.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
167
168         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
169                 w.WriteHeader(http.StatusNotFound)
170         }))
171         deadv := &S3Volume{
172                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
173                         IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
174                         Endpoint: "http://localhost:12345",
175                         Region:   "test-region-1",
176                         Bucket:   "test-bucket-name",
177                 },
178                 cluster: s.cluster,
179                 logger:  ctxlog.TestLogger(c),
180                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
181         }
182         err := deadv.check()
183         c.Check(err, check.ErrorMatches, `.*/fake-metadata/test-role.*`)
184         c.Check(err, check.ErrorMatches, `.*404.*`)
185 }
186
187 func (s *StubbedS3Suite) TestStats(c *check.C) {
188         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
189         stats := func() string {
190                 buf, err := json.Marshal(v.InternalStats())
191                 c.Check(err, check.IsNil)
192                 return string(buf)
193         }
194
195         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
196
197         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
198         _, err := v.Get(context.Background(), loc, make([]byte, 3))
199         c.Check(err, check.NotNil)
200         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
201         c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
202         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
203
204         err = v.Put(context.Background(), loc, []byte("foo"))
205         c.Check(err, check.IsNil)
206         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
207         c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
208
209         _, err = v.Get(context.Background(), loc, make([]byte, 3))
210         c.Check(err, check.IsNil)
211         _, err = v.Get(context.Background(), loc, make([]byte, 3))
212         c.Check(err, check.IsNil)
213         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
214 }
215
216 type blockingHandler struct {
217         requested chan *http.Request
218         unblock   chan struct{}
219 }
220
221 func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
222         if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
223                 // Accept PutBucket ("PUT /bucketname/"), called by
224                 // newTestableVolume
225                 return
226         }
227         if h.requested != nil {
228                 h.requested <- r
229         }
230         if h.unblock != nil {
231                 <-h.unblock
232         }
233         http.Error(w, "nothing here", http.StatusNotFound)
234 }
235
236 func (s *StubbedS3Suite) TestGetContextCancel(c *check.C) {
237         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
238         buf := make([]byte, 3)
239
240         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
241                 _, err := v.Get(ctx, loc, buf)
242                 return err
243         })
244 }
245
246 func (s *StubbedS3Suite) TestCompareContextCancel(c *check.C) {
247         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
248         buf := []byte("bar")
249
250         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
251                 return v.Compare(ctx, loc, buf)
252         })
253 }
254
255 func (s *StubbedS3Suite) TestPutContextCancel(c *check.C) {
256         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
257         buf := []byte("foo")
258
259         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
260                 return v.Put(ctx, loc, buf)
261         })
262 }
263
264 func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3Volume) error) {
265         handler := &blockingHandler{}
266         s.s3server = httptest.NewServer(handler)
267         defer s.s3server.Close()
268
269         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
270
271         ctx, cancel := context.WithCancel(context.Background())
272
273         handler.requested = make(chan *http.Request)
274         handler.unblock = make(chan struct{})
275         defer close(handler.unblock)
276
277         doneFunc := make(chan struct{})
278         go func() {
279                 err := testFunc(ctx, v)
280                 c.Check(err, check.Equals, context.Canceled)
281                 close(doneFunc)
282         }()
283
284         timeout := time.After(10 * time.Second)
285
286         // Wait for the stub server to receive a request, meaning
287         // Get() is waiting for an s3 operation.
288         select {
289         case <-timeout:
290                 c.Fatal("timed out waiting for test func to call our handler")
291         case <-doneFunc:
292                 c.Fatal("test func finished without even calling our handler!")
293         case <-handler.requested:
294         }
295
296         cancel()
297
298         select {
299         case <-timeout:
300                 c.Fatal("timed out")
301         case <-doneFunc:
302         }
303 }
304
305 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
306         s.cluster.Collections.BlobTrashLifetime.Set("1h")
307         s.cluster.Collections.BlobSigningTTL.Set("1h")
308
309         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
310         var none time.Time
311
312         putS3Obj := func(t time.Time, key string, data []byte) {
313                 if t == none {
314                         return
315                 }
316                 v.serverClock.now = &t
317                 v.bucket.Bucket().Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
318         }
319
320         t0 := time.Now()
321         nextKey := 0
322         for _, scenario := range []struct {
323                 label               string
324                 dataT               time.Time
325                 recentT             time.Time
326                 trashT              time.Time
327                 canGet              bool
328                 canTrash            bool
329                 canGetAfterTrash    bool
330                 canUntrash          bool
331                 haveTrashAfterEmpty bool
332                 freshAfterEmpty     bool
333         }{
334                 {
335                         "No related objects",
336                         none, none, none,
337                         false, false, false, false, false, false,
338                 },
339                 {
340                         // Stored by older version, or there was a
341                         // race between EmptyTrash and Put: Trash is a
342                         // no-op even though the data object is very
343                         // old
344                         "No recent/X",
345                         t0.Add(-48 * time.Hour), none, none,
346                         true, true, true, false, false, false,
347                 },
348                 {
349                         "Not trash, but old enough to be eligible for trash",
350                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
351                         true, true, false, false, false, false,
352                 },
353                 {
354                         "Not trash, and not old enough to be eligible for trash",
355                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
356                         true, true, true, false, false, false,
357                 },
358                 {
359                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
360                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
361                         true, true, true, true, true, false,
362                 },
363                 {
364                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
365                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
366                         true, false, true, true, true, false,
367                 },
368                 {
369                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
370                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
371                         true, false, true, true, false, false,
372                 },
373                 {
374                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
375                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
376                         true, false, true, true, true, true,
377                 },
378                 {
379                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
380                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
381                         true, true, true, true, false, false,
382                 },
383                 {
384                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
385                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
386                         true, false, true, true, false, false,
387                 },
388                 {
389                         "Trash, not yet eligible for deletion",
390                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
391                         false, false, false, true, true, false,
392                 },
393                 {
394                         "Trash, not yet eligible for deletion, prone to races",
395                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
396                         false, false, false, true, true, false,
397                 },
398                 {
399                         "Trash, eligible for deletion",
400                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
401                         false, false, false, true, false, false,
402                 },
403                 {
404                         "Erroneously trashed during a race, detected before BlobTrashLifetime",
405                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
406                         true, false, true, true, true, false,
407                 },
408                 {
409                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
410                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
411                         true, false, true, true, true, false,
412                 },
413                 {
414                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
415                         none, none, t0.Add(-time.Minute),
416                         false, false, false, true, true, true,
417                 },
418         } {
419                 c.Log("Scenario: ", scenario.label)
420
421                 // We have a few tests to run for each scenario, and
422                 // the tests are expected to change state. By calling
423                 // this setup func between tests, we (re)create the
424                 // scenario as specified, using a new unique block
425                 // locator to prevent interference from previous
426                 // tests.
427
428                 setupScenario := func() (string, []byte) {
429                         nextKey++
430                         blk := []byte(fmt.Sprintf("%d", nextKey))
431                         loc := fmt.Sprintf("%x", md5.Sum(blk))
432                         c.Log("\t", loc)
433                         putS3Obj(scenario.dataT, loc, blk)
434                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
435                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
436                         v.serverClock.now = &t0
437                         return loc, blk
438                 }
439
440                 // Check canGet
441                 loc, blk := setupScenario()
442                 buf := make([]byte, len(blk))
443                 _, err := v.Get(context.Background(), loc, buf)
444                 c.Check(err == nil, check.Equals, scenario.canGet)
445                 if err != nil {
446                         c.Check(os.IsNotExist(err), check.Equals, true)
447                 }
448
449                 // Call Trash, then check canTrash and canGetAfterTrash
450                 loc, _ = setupScenario()
451                 err = v.Trash(loc)
452                 c.Check(err == nil, check.Equals, scenario.canTrash)
453                 _, err = v.Get(context.Background(), loc, buf)
454                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
455                 if err != nil {
456                         c.Check(os.IsNotExist(err), check.Equals, true)
457                 }
458
459                 // Call Untrash, then check canUntrash
460                 loc, _ = setupScenario()
461                 err = v.Untrash(loc)
462                 c.Check(err == nil, check.Equals, scenario.canUntrash)
463                 if scenario.dataT != none || scenario.trashT != none {
464                         // In all scenarios where the data exists, we
465                         // should be able to Get after Untrash --
466                         // regardless of timestamps, errors, race
467                         // conditions, etc.
468                         _, err = v.Get(context.Background(), loc, buf)
469                         c.Check(err, check.IsNil)
470                 }
471
472                 // Call EmptyTrash, then check haveTrashAfterEmpty and
473                 // freshAfterEmpty
474                 loc, _ = setupScenario()
475                 v.EmptyTrash()
476                 _, err = v.bucket.Head("trash/"+loc, nil)
477                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
478                 if scenario.freshAfterEmpty {
479                         t, err := v.Mtime(loc)
480                         c.Check(err, check.IsNil)
481                         // new mtime must be current (with an
482                         // allowance for 1s timestamp precision)
483                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
484                 }
485
486                 // Check for current Mtime after Put (applies to all
487                 // scenarios)
488                 loc, blk = setupScenario()
489                 err = v.Put(context.Background(), loc, blk)
490                 c.Check(err, check.IsNil)
491                 t, err := v.Mtime(loc)
492                 c.Check(err, check.IsNil)
493                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
494         }
495 }
496
497 type TestableS3Volume struct {
498         *S3Volume
499         server      *s3test.Server
500         c           *check.C
501         serverClock *fakeClock
502 }
503
504 func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3Volume {
505         clock := &fakeClock{}
506         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
507         c.Assert(err, check.IsNil)
508         endpoint := srv.URL()
509         if s.s3server != nil {
510                 endpoint = s.s3server.URL
511         }
512
513         iamRole, accessKey, secretKey := "", "xxx", "xxx"
514         if s.metadata != nil {
515                 iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
516         }
517
518         v := &TestableS3Volume{
519                 S3Volume: &S3Volume{
520                         S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
521                                 IAMRole:            iamRole,
522                                 AccessKey:          accessKey,
523                                 SecretKey:          secretKey,
524                                 Bucket:             TestBucketName,
525                                 Endpoint:           endpoint,
526                                 Region:             "test-region-1",
527                                 LocationConstraint: true,
528                                 UnsafeDelete:       true,
529                                 IndexPageSize:      1000,
530                         },
531                         cluster: cluster,
532                         volume:  volume,
533                         logger:  ctxlog.TestLogger(c),
534                         metrics: metrics,
535                 },
536                 c:           c,
537                 server:      srv,
538                 serverClock: clock,
539         }
540         c.Assert(v.S3Volume.check(), check.IsNil)
541         c.Assert(v.bucket.Bucket().PutBucket(s3.ACL("private")), check.IsNil)
542         // We couldn't set RaceWindow until now because check()
543         // rejects negative values.
544         v.S3Volume.RaceWindow = arvados.Duration(raceWindow)
545         return v
546 }
547
548 // PutRaw skips the ContentMD5 test
549 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
550         err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
551         if err != nil {
552                 v.logger.Printf("PutRaw: %s: %+v", loc, err)
553         }
554         err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
555         if err != nil {
556                 v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
557         }
558 }
559
560 // TouchWithDate turns back the clock while doing a Touch(). We assume
561 // there are no other operations happening on the same s3test server
562 // while we do this.
563 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
564         v.serverClock.now = &lastPut
565         err := v.bucket.Bucket().Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
566         if err != nil {
567                 panic(err)
568         }
569         v.serverClock.now = nil
570 }
571
572 func (v *TestableS3Volume) Teardown() {
573         v.server.Quit()
574 }
575
576 func (v *TestableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
577         return "get", "put"
578 }