Warn if MaxKeepBlobBuffers > MaxConcurrentRequests.
[arvados.git] / services / keepstore / s3_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "strings"
18         "time"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22         "github.com/AdRoll/goamz/s3"
23         "github.com/AdRoll/goamz/s3/s3test"
24         "github.com/prometheus/client_golang/prometheus"
25         "github.com/sirupsen/logrus"
26         check "gopkg.in/check.v1"
27 )
28
29 const (
30         TestBucketName = "testbucket"
31 )
32
33 type fakeClock struct {
34         now *time.Time
35 }
36
37 func (c *fakeClock) Now() time.Time {
38         if c.now == nil {
39                 return time.Now()
40         }
41         return *c.now
42 }
43
44 var _ = check.Suite(&StubbedS3Suite{})
45
46 type StubbedS3Suite struct {
47         s3server *httptest.Server
48         metadata *httptest.Server
49         cluster  *arvados.Cluster
50         handler  *handler
51         volumes  []*TestableS3Volume
52 }
53
54 func (s *StubbedS3Suite) SetUpTest(c *check.C) {
55         s.s3server = nil
56         s.metadata = nil
57         s.cluster = testCluster(c)
58         s.cluster.Volumes = map[string]arvados.Volume{
59                 "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
60                 "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
61         }
62         s.handler = &handler{}
63 }
64
65 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
66         DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
67                 // Use a negative raceWindow so s3test's 1-second
68                 // timestamp precision doesn't confuse fixRace.
69                 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
70         })
71 }
72
73 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
74         DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
75                 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
76         })
77 }
78
79 func (s *StubbedS3Suite) TestIndex(c *check.C) {
80         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
81         v.IndexPageSize = 3
82         for i := 0; i < 256; i++ {
83                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
84         }
85         for _, spec := range []struct {
86                 prefix      string
87                 expectMatch int
88         }{
89                 {"", 256},
90                 {"c", 16},
91                 {"bc", 1},
92                 {"abc", 0},
93         } {
94                 buf := new(bytes.Buffer)
95                 err := v.IndexTo(spec.prefix, buf)
96                 c.Check(err, check.IsNil)
97
98                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
99                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
100                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
101         }
102 }
103
104 func (s *StubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
105         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
106                 upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
107                 exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
108                 // Literal example from
109                 // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
110                 // but with updated timestamps
111                 io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
112         }))
113         defer s.metadata.Close()
114
115         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
116         c.Check(v.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
117         c.Check(v.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
118         c.Check(v.bucket.bucket.S3.Auth.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
119         c.Check(v.bucket.bucket.S3.Auth.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
120
121         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
122                 w.WriteHeader(http.StatusNotFound)
123         }))
124         deadv := &S3Volume{
125                 IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
126                 Endpoint: "http://localhost:12345",
127                 Region:   "test-region-1",
128                 Bucket:   "test-bucket-name",
129                 cluster:  s.cluster,
130                 logger:   ctxlog.TestLogger(c),
131                 metrics:  newVolumeMetricsVecs(prometheus.NewRegistry()),
132         }
133         err := deadv.check()
134         c.Check(err, check.ErrorMatches, `.*/fake-metadata/test-role.*`)
135         c.Check(err, check.ErrorMatches, `.*404.*`)
136 }
137
138 func (s *StubbedS3Suite) TestStats(c *check.C) {
139         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
140         stats := func() string {
141                 buf, err := json.Marshal(v.InternalStats())
142                 c.Check(err, check.IsNil)
143                 return string(buf)
144         }
145
146         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
147
148         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
149         _, err := v.Get(context.Background(), loc, make([]byte, 3))
150         c.Check(err, check.NotNil)
151         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
152         c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
153         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
154
155         err = v.Put(context.Background(), loc, []byte("foo"))
156         c.Check(err, check.IsNil)
157         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
158         c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
159
160         _, err = v.Get(context.Background(), loc, make([]byte, 3))
161         c.Check(err, check.IsNil)
162         _, err = v.Get(context.Background(), loc, make([]byte, 3))
163         c.Check(err, check.IsNil)
164         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
165 }
166
167 type blockingHandler struct {
168         requested chan *http.Request
169         unblock   chan struct{}
170 }
171
172 func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
173         if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
174                 // Accept PutBucket ("PUT /bucketname/"), called by
175                 // newTestableVolume
176                 return
177         }
178         if h.requested != nil {
179                 h.requested <- r
180         }
181         if h.unblock != nil {
182                 <-h.unblock
183         }
184         http.Error(w, "nothing here", http.StatusNotFound)
185 }
186
187 func (s *StubbedS3Suite) TestGetContextCancel(c *check.C) {
188         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
189         buf := make([]byte, 3)
190
191         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
192                 _, err := v.Get(ctx, loc, buf)
193                 return err
194         })
195 }
196
197 func (s *StubbedS3Suite) TestCompareContextCancel(c *check.C) {
198         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
199         buf := []byte("bar")
200
201         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
202                 return v.Compare(ctx, loc, buf)
203         })
204 }
205
206 func (s *StubbedS3Suite) TestPutContextCancel(c *check.C) {
207         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
208         buf := []byte("foo")
209
210         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
211                 return v.Put(ctx, loc, buf)
212         })
213 }
214
215 func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3Volume) error) {
216         handler := &blockingHandler{}
217         s.s3server = httptest.NewServer(handler)
218         defer s.s3server.Close()
219
220         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
221
222         ctx, cancel := context.WithCancel(context.Background())
223
224         handler.requested = make(chan *http.Request)
225         handler.unblock = make(chan struct{})
226         defer close(handler.unblock)
227
228         doneFunc := make(chan struct{})
229         go func() {
230                 err := testFunc(ctx, v)
231                 c.Check(err, check.Equals, context.Canceled)
232                 close(doneFunc)
233         }()
234
235         timeout := time.After(10 * time.Second)
236
237         // Wait for the stub server to receive a request, meaning
238         // Get() is waiting for an s3 operation.
239         select {
240         case <-timeout:
241                 c.Fatal("timed out waiting for test func to call our handler")
242         case <-doneFunc:
243                 c.Fatal("test func finished without even calling our handler!")
244         case <-handler.requested:
245         }
246
247         cancel()
248
249         select {
250         case <-timeout:
251                 c.Fatal("timed out")
252         case <-doneFunc:
253         }
254 }
255
256 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
257         s.cluster.Collections.BlobTrashLifetime.Set("1h")
258         s.cluster.Collections.BlobSigningTTL.Set("1h")
259
260         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
261         var none time.Time
262
263         putS3Obj := func(t time.Time, key string, data []byte) {
264                 if t == none {
265                         return
266                 }
267                 v.serverClock.now = &t
268                 v.bucket.Bucket().Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
269         }
270
271         t0 := time.Now()
272         nextKey := 0
273         for _, scenario := range []struct {
274                 label               string
275                 dataT               time.Time
276                 recentT             time.Time
277                 trashT              time.Time
278                 canGet              bool
279                 canTrash            bool
280                 canGetAfterTrash    bool
281                 canUntrash          bool
282                 haveTrashAfterEmpty bool
283                 freshAfterEmpty     bool
284         }{
285                 {
286                         "No related objects",
287                         none, none, none,
288                         false, false, false, false, false, false,
289                 },
290                 {
291                         // Stored by older version, or there was a
292                         // race between EmptyTrash and Put: Trash is a
293                         // no-op even though the data object is very
294                         // old
295                         "No recent/X",
296                         t0.Add(-48 * time.Hour), none, none,
297                         true, true, true, false, false, false,
298                 },
299                 {
300                         "Not trash, but old enough to be eligible for trash",
301                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
302                         true, true, false, false, false, false,
303                 },
304                 {
305                         "Not trash, and not old enough to be eligible for trash",
306                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
307                         true, true, true, false, false, false,
308                 },
309                 {
310                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
311                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
312                         true, true, true, true, true, false,
313                 },
314                 {
315                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
316                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
317                         true, false, true, true, true, false,
318                 },
319                 {
320                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
321                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
322                         true, false, true, true, false, false,
323                 },
324                 {
325                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
326                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
327                         true, false, true, true, true, true,
328                 },
329                 {
330                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
331                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
332                         true, true, true, true, false, false,
333                 },
334                 {
335                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
336                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
337                         true, false, true, true, false, false,
338                 },
339                 {
340                         "Trash, not yet eligible for deletion",
341                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
342                         false, false, false, true, true, false,
343                 },
344                 {
345                         "Trash, not yet eligible for deletion, prone to races",
346                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
347                         false, false, false, true, true, false,
348                 },
349                 {
350                         "Trash, eligible for deletion",
351                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
352                         false, false, false, true, false, false,
353                 },
354                 {
355                         "Erroneously trashed during a race, detected before BlobTrashLifetime",
356                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
357                         true, false, true, true, true, false,
358                 },
359                 {
360                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
361                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
362                         true, false, true, true, true, false,
363                 },
364                 {
365                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
366                         none, none, t0.Add(-time.Minute),
367                         false, false, false, true, true, true,
368                 },
369         } {
370                 c.Log("Scenario: ", scenario.label)
371
372                 // We have a few tests to run for each scenario, and
373                 // the tests are expected to change state. By calling
374                 // this setup func between tests, we (re)create the
375                 // scenario as specified, using a new unique block
376                 // locator to prevent interference from previous
377                 // tests.
378
379                 setupScenario := func() (string, []byte) {
380                         nextKey++
381                         blk := []byte(fmt.Sprintf("%d", nextKey))
382                         loc := fmt.Sprintf("%x", md5.Sum(blk))
383                         c.Log("\t", loc)
384                         putS3Obj(scenario.dataT, loc, blk)
385                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
386                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
387                         v.serverClock.now = &t0
388                         return loc, blk
389                 }
390
391                 // Check canGet
392                 loc, blk := setupScenario()
393                 buf := make([]byte, len(blk))
394                 _, err := v.Get(context.Background(), loc, buf)
395                 c.Check(err == nil, check.Equals, scenario.canGet)
396                 if err != nil {
397                         c.Check(os.IsNotExist(err), check.Equals, true)
398                 }
399
400                 // Call Trash, then check canTrash and canGetAfterTrash
401                 loc, _ = setupScenario()
402                 err = v.Trash(loc)
403                 c.Check(err == nil, check.Equals, scenario.canTrash)
404                 _, err = v.Get(context.Background(), loc, buf)
405                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
406                 if err != nil {
407                         c.Check(os.IsNotExist(err), check.Equals, true)
408                 }
409
410                 // Call Untrash, then check canUntrash
411                 loc, _ = setupScenario()
412                 err = v.Untrash(loc)
413                 c.Check(err == nil, check.Equals, scenario.canUntrash)
414                 if scenario.dataT != none || scenario.trashT != none {
415                         // In all scenarios where the data exists, we
416                         // should be able to Get after Untrash --
417                         // regardless of timestamps, errors, race
418                         // conditions, etc.
419                         _, err = v.Get(context.Background(), loc, buf)
420                         c.Check(err, check.IsNil)
421                 }
422
423                 // Call EmptyTrash, then check haveTrashAfterEmpty and
424                 // freshAfterEmpty
425                 loc, _ = setupScenario()
426                 v.EmptyTrash()
427                 _, err = v.bucket.Head("trash/"+loc, nil)
428                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
429                 if scenario.freshAfterEmpty {
430                         t, err := v.Mtime(loc)
431                         c.Check(err, check.IsNil)
432                         // new mtime must be current (with an
433                         // allowance for 1s timestamp precision)
434                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
435                 }
436
437                 // Check for current Mtime after Put (applies to all
438                 // scenarios)
439                 loc, blk = setupScenario()
440                 err = v.Put(context.Background(), loc, blk)
441                 c.Check(err, check.IsNil)
442                 t, err := v.Mtime(loc)
443                 c.Check(err, check.IsNil)
444                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
445         }
446 }
447
448 type TestableS3Volume struct {
449         *S3Volume
450         server      *s3test.Server
451         c           *check.C
452         serverClock *fakeClock
453 }
454
455 func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3Volume {
456         clock := &fakeClock{}
457         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
458         c.Assert(err, check.IsNil)
459         endpoint := srv.URL()
460         if s.s3server != nil {
461                 endpoint = s.s3server.URL
462         }
463
464         iamRole, accessKey, secretKey := "", "xxx", "xxx"
465         if s.metadata != nil {
466                 iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
467         }
468
469         v := &TestableS3Volume{
470                 S3Volume: &S3Volume{
471                         AccessKey:          accessKey,
472                         SecretKey:          secretKey,
473                         IAMRole:            iamRole,
474                         Bucket:             TestBucketName,
475                         Endpoint:           endpoint,
476                         Region:             "test-region-1",
477                         LocationConstraint: true,
478                         UnsafeDelete:       true,
479                         IndexPageSize:      1000,
480                         cluster:            cluster,
481                         volume:             volume,
482                         logger:             ctxlog.TestLogger(c),
483                         metrics:            metrics,
484                 },
485                 c:           c,
486                 server:      srv,
487                 serverClock: clock,
488         }
489         c.Assert(v.S3Volume.check(), check.IsNil)
490         c.Assert(v.bucket.Bucket().PutBucket(s3.ACL("private")), check.IsNil)
491         // We couldn't set RaceWindow until now because check()
492         // rejects negative values.
493         v.S3Volume.RaceWindow = arvados.Duration(raceWindow)
494         return v
495 }
496
497 // PutRaw skips the ContentMD5 test
498 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
499         err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
500         if err != nil {
501                 v.logger.Printf("PutRaw: %s: %+v", loc, err)
502         }
503         err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
504         if err != nil {
505                 v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
506         }
507 }
508
509 // TouchWithDate turns back the clock while doing a Touch(). We assume
510 // there are no other operations happening on the same s3test server
511 // while we do this.
512 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
513         v.serverClock.now = &lastPut
514         err := v.bucket.Bucket().Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
515         if err != nil {
516                 panic(err)
517         }
518         v.serverClock.now = nil
519 }
520
521 func (v *TestableS3Volume) Teardown() {
522         v.server.Quit()
523 }
524
525 func (v *TestableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
526         return "get", "put"
527 }