Merge branch '15531-logincluster-migrate' refs #15531
[arvados.git] / services / keepstore / s3_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "log"
15         "net/http"
16         "net/http/httptest"
17         "os"
18         "strings"
19         "time"
20
21         "git.curoverse.com/arvados.git/sdk/go/arvados"
22         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
23         "github.com/AdRoll/goamz/s3"
24         "github.com/AdRoll/goamz/s3/s3test"
25         "github.com/prometheus/client_golang/prometheus"
26         "github.com/sirupsen/logrus"
27         check "gopkg.in/check.v1"
28 )
29
30 const (
31         TestBucketName = "testbucket"
32 )
33
34 type fakeClock struct {
35         now *time.Time
36 }
37
38 func (c *fakeClock) Now() time.Time {
39         if c.now == nil {
40                 return time.Now()
41         }
42         return *c.now
43 }
44
45 var _ = check.Suite(&StubbedS3Suite{})
46
47 type StubbedS3Suite struct {
48         s3server *httptest.Server
49         metadata *httptest.Server
50         cluster  *arvados.Cluster
51         handler  *handler
52         volumes  []*TestableS3Volume
53 }
54
55 func (s *StubbedS3Suite) SetUpTest(c *check.C) {
56         s.s3server = nil
57         s.metadata = nil
58         s.cluster = testCluster(c)
59         s.cluster.Volumes = map[string]arvados.Volume{
60                 "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
61                 "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
62         }
63         s.handler = &handler{}
64 }
65
66 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
67         DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
68                 // Use a negative raceWindow so s3test's 1-second
69                 // timestamp precision doesn't confuse fixRace.
70                 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
71         })
72 }
73
74 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
75         DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
76                 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
77         })
78 }
79
80 func (s *StubbedS3Suite) TestIndex(c *check.C) {
81         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
82         v.IndexPageSize = 3
83         for i := 0; i < 256; i++ {
84                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
85         }
86         for _, spec := range []struct {
87                 prefix      string
88                 expectMatch int
89         }{
90                 {"", 256},
91                 {"c", 16},
92                 {"bc", 1},
93                 {"abc", 0},
94         } {
95                 buf := new(bytes.Buffer)
96                 err := v.IndexTo(spec.prefix, buf)
97                 c.Check(err, check.IsNil)
98
99                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
100                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
101                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
102         }
103 }
104
105 func (s *StubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
106         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
107                 upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
108                 exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
109                 // Literal example from
110                 // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
111                 // but with updated timestamps
112                 io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
113         }))
114         defer s.metadata.Close()
115
116         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
117         c.Check(v.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
118         c.Check(v.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
119         c.Check(v.bucket.bucket.S3.Auth.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
120         c.Check(v.bucket.bucket.S3.Auth.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
121
122         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
123                 w.WriteHeader(http.StatusNotFound)
124         }))
125         deadv := &S3Volume{
126                 IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
127                 Endpoint: "http://localhost:12345",
128                 Region:   "test-region-1",
129                 Bucket:   "test-bucket-name",
130                 cluster:  s.cluster,
131                 logger:   ctxlog.TestLogger(c),
132                 metrics:  newVolumeMetricsVecs(prometheus.NewRegistry()),
133         }
134         err := deadv.check()
135         c.Check(err, check.ErrorMatches, `.*/fake-metadata/test-role.*`)
136         c.Check(err, check.ErrorMatches, `.*404.*`)
137 }
138
139 func (s *StubbedS3Suite) TestStats(c *check.C) {
140         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
141         stats := func() string {
142                 buf, err := json.Marshal(v.InternalStats())
143                 c.Check(err, check.IsNil)
144                 return string(buf)
145         }
146
147         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
148
149         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
150         _, err := v.Get(context.Background(), loc, make([]byte, 3))
151         c.Check(err, check.NotNil)
152         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
153         c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
154         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
155
156         err = v.Put(context.Background(), loc, []byte("foo"))
157         c.Check(err, check.IsNil)
158         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
159         c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
160
161         _, err = v.Get(context.Background(), loc, make([]byte, 3))
162         c.Check(err, check.IsNil)
163         _, err = v.Get(context.Background(), loc, make([]byte, 3))
164         c.Check(err, check.IsNil)
165         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
166 }
167
168 type blockingHandler struct {
169         requested chan *http.Request
170         unblock   chan struct{}
171 }
172
173 func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
174         if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
175                 // Accept PutBucket ("PUT /bucketname/"), called by
176                 // newTestableVolume
177                 return
178         }
179         if h.requested != nil {
180                 h.requested <- r
181         }
182         if h.unblock != nil {
183                 <-h.unblock
184         }
185         http.Error(w, "nothing here", http.StatusNotFound)
186 }
187
188 func (s *StubbedS3Suite) TestGetContextCancel(c *check.C) {
189         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
190         buf := make([]byte, 3)
191
192         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
193                 _, err := v.Get(ctx, loc, buf)
194                 return err
195         })
196 }
197
198 func (s *StubbedS3Suite) TestCompareContextCancel(c *check.C) {
199         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
200         buf := []byte("bar")
201
202         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
203                 return v.Compare(ctx, loc, buf)
204         })
205 }
206
207 func (s *StubbedS3Suite) TestPutContextCancel(c *check.C) {
208         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
209         buf := []byte("foo")
210
211         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
212                 return v.Put(ctx, loc, buf)
213         })
214 }
215
216 func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3Volume) error) {
217         handler := &blockingHandler{}
218         s.s3server = httptest.NewServer(handler)
219         defer s.s3server.Close()
220
221         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
222
223         ctx, cancel := context.WithCancel(context.Background())
224
225         handler.requested = make(chan *http.Request)
226         handler.unblock = make(chan struct{})
227         defer close(handler.unblock)
228
229         doneFunc := make(chan struct{})
230         go func() {
231                 err := testFunc(ctx, v)
232                 c.Check(err, check.Equals, context.Canceled)
233                 close(doneFunc)
234         }()
235
236         timeout := time.After(10 * time.Second)
237
238         // Wait for the stub server to receive a request, meaning
239         // Get() is waiting for an s3 operation.
240         select {
241         case <-timeout:
242                 c.Fatal("timed out waiting for test func to call our handler")
243         case <-doneFunc:
244                 c.Fatal("test func finished without even calling our handler!")
245         case <-handler.requested:
246         }
247
248         cancel()
249
250         select {
251         case <-timeout:
252                 c.Fatal("timed out")
253         case <-doneFunc:
254         }
255 }
256
257 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
258         s.cluster.Collections.BlobTrashLifetime.Set("1h")
259         s.cluster.Collections.BlobSigningTTL.Set("1h")
260
261         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
262         var none time.Time
263
264         putS3Obj := func(t time.Time, key string, data []byte) {
265                 if t == none {
266                         return
267                 }
268                 v.serverClock.now = &t
269                 v.bucket.Bucket().Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
270         }
271
272         t0 := time.Now()
273         nextKey := 0
274         for _, scenario := range []struct {
275                 label               string
276                 dataT               time.Time
277                 recentT             time.Time
278                 trashT              time.Time
279                 canGet              bool
280                 canTrash            bool
281                 canGetAfterTrash    bool
282                 canUntrash          bool
283                 haveTrashAfterEmpty bool
284                 freshAfterEmpty     bool
285         }{
286                 {
287                         "No related objects",
288                         none, none, none,
289                         false, false, false, false, false, false,
290                 },
291                 {
292                         // Stored by older version, or there was a
293                         // race between EmptyTrash and Put: Trash is a
294                         // no-op even though the data object is very
295                         // old
296                         "No recent/X",
297                         t0.Add(-48 * time.Hour), none, none,
298                         true, true, true, false, false, false,
299                 },
300                 {
301                         "Not trash, but old enough to be eligible for trash",
302                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
303                         true, true, false, false, false, false,
304                 },
305                 {
306                         "Not trash, and not old enough to be eligible for trash",
307                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
308                         true, true, true, false, false, false,
309                 },
310                 {
311                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
312                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
313                         true, true, true, true, true, false,
314                 },
315                 {
316                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
317                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
318                         true, false, true, true, true, false,
319                 },
320                 {
321                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
322                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
323                         true, false, true, true, false, false,
324                 },
325                 {
326                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
327                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
328                         true, false, true, true, true, true,
329                 },
330                 {
331                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
332                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
333                         true, true, true, true, false, false,
334                 },
335                 {
336                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
337                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
338                         true, false, true, true, false, false,
339                 },
340                 {
341                         "Trash, not yet eligible for deletion",
342                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
343                         false, false, false, true, true, false,
344                 },
345                 {
346                         "Trash, not yet eligible for deletion, prone to races",
347                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
348                         false, false, false, true, true, false,
349                 },
350                 {
351                         "Trash, eligible for deletion",
352                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
353                         false, false, false, true, false, false,
354                 },
355                 {
356                         "Erroneously trashed during a race, detected before BlobTrashLifetime",
357                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
358                         true, false, true, true, true, false,
359                 },
360                 {
361                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
362                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
363                         true, false, true, true, true, false,
364                 },
365                 {
366                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
367                         none, none, t0.Add(-time.Minute),
368                         false, false, false, true, true, true,
369                 },
370         } {
371                 c.Log("Scenario: ", scenario.label)
372
373                 // We have a few tests to run for each scenario, and
374                 // the tests are expected to change state. By calling
375                 // this setup func between tests, we (re)create the
376                 // scenario as specified, using a new unique block
377                 // locator to prevent interference from previous
378                 // tests.
379
380                 setupScenario := func() (string, []byte) {
381                         nextKey++
382                         blk := []byte(fmt.Sprintf("%d", nextKey))
383                         loc := fmt.Sprintf("%x", md5.Sum(blk))
384                         c.Log("\t", loc)
385                         putS3Obj(scenario.dataT, loc, blk)
386                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
387                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
388                         v.serverClock.now = &t0
389                         return loc, blk
390                 }
391
392                 // Check canGet
393                 loc, blk := setupScenario()
394                 buf := make([]byte, len(blk))
395                 _, err := v.Get(context.Background(), loc, buf)
396                 c.Check(err == nil, check.Equals, scenario.canGet)
397                 if err != nil {
398                         c.Check(os.IsNotExist(err), check.Equals, true)
399                 }
400
401                 // Call Trash, then check canTrash and canGetAfterTrash
402                 loc, _ = setupScenario()
403                 err = v.Trash(loc)
404                 c.Check(err == nil, check.Equals, scenario.canTrash)
405                 _, err = v.Get(context.Background(), loc, buf)
406                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
407                 if err != nil {
408                         c.Check(os.IsNotExist(err), check.Equals, true)
409                 }
410
411                 // Call Untrash, then check canUntrash
412                 loc, _ = setupScenario()
413                 err = v.Untrash(loc)
414                 c.Check(err == nil, check.Equals, scenario.canUntrash)
415                 if scenario.dataT != none || scenario.trashT != none {
416                         // In all scenarios where the data exists, we
417                         // should be able to Get after Untrash --
418                         // regardless of timestamps, errors, race
419                         // conditions, etc.
420                         _, err = v.Get(context.Background(), loc, buf)
421                         c.Check(err, check.IsNil)
422                 }
423
424                 // Call EmptyTrash, then check haveTrashAfterEmpty and
425                 // freshAfterEmpty
426                 loc, _ = setupScenario()
427                 v.EmptyTrash()
428                 _, err = v.bucket.Head("trash/"+loc, nil)
429                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
430                 if scenario.freshAfterEmpty {
431                         t, err := v.Mtime(loc)
432                         c.Check(err, check.IsNil)
433                         // new mtime must be current (with an
434                         // allowance for 1s timestamp precision)
435                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
436                 }
437
438                 // Check for current Mtime after Put (applies to all
439                 // scenarios)
440                 loc, blk = setupScenario()
441                 err = v.Put(context.Background(), loc, blk)
442                 c.Check(err, check.IsNil)
443                 t, err := v.Mtime(loc)
444                 c.Check(err, check.IsNil)
445                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
446         }
447 }
448
449 type TestableS3Volume struct {
450         *S3Volume
451         server      *s3test.Server
452         c           *check.C
453         serverClock *fakeClock
454 }
455
456 func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3Volume {
457         clock := &fakeClock{}
458         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
459         c.Assert(err, check.IsNil)
460         endpoint := srv.URL()
461         if s.s3server != nil {
462                 endpoint = s.s3server.URL
463         }
464
465         iamRole, accessKey, secretKey := "", "xxx", "xxx"
466         if s.metadata != nil {
467                 iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
468         }
469
470         v := &TestableS3Volume{
471                 S3Volume: &S3Volume{
472                         AccessKey:          accessKey,
473                         SecretKey:          secretKey,
474                         IAMRole:            iamRole,
475                         Bucket:             TestBucketName,
476                         Endpoint:           endpoint,
477                         Region:             "test-region-1",
478                         LocationConstraint: true,
479                         UnsafeDelete:       true,
480                         IndexPageSize:      1000,
481                         cluster:            cluster,
482                         volume:             volume,
483                         logger:             ctxlog.TestLogger(c),
484                         metrics:            metrics,
485                 },
486                 c:           c,
487                 server:      srv,
488                 serverClock: clock,
489         }
490         c.Assert(v.S3Volume.check(), check.IsNil)
491         c.Assert(v.bucket.Bucket().PutBucket(s3.ACL("private")), check.IsNil)
492         // We couldn't set RaceWindow until now because check()
493         // rejects negative values.
494         v.S3Volume.RaceWindow = arvados.Duration(raceWindow)
495         return v
496 }
497
498 // PutRaw skips the ContentMD5 test
499 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
500         err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
501         if err != nil {
502                 log.Printf("PutRaw: %s: %+v", loc, err)
503         }
504         err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
505         if err != nil {
506                 log.Printf("PutRaw: recent/%s: %+v", loc, err)
507         }
508 }
509
510 // TouchWithDate turns back the clock while doing a Touch(). We assume
511 // there are no other operations happening on the same s3test server
512 // while we do this.
513 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
514         v.serverClock.now = &lastPut
515         err := v.bucket.Bucket().Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
516         if err != nil {
517                 panic(err)
518         }
519         v.serverClock.now = nil
520 }
521
522 func (v *TestableS3Volume) Teardown() {
523         v.server.Quit()
524 }
525
526 func (v *TestableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
527         return "get", "put"
528 }