13647: Merge branch 'master' into 13647-keepstore-config
[arvados.git] / services / keepstore / s3_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "log"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "strings"
18         "time"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
22         "github.com/AdRoll/goamz/s3"
23         "github.com/AdRoll/goamz/s3/s3test"
24         "github.com/prometheus/client_golang/prometheus"
25         "github.com/sirupsen/logrus"
26         check "gopkg.in/check.v1"
27 )
28
29 const (
30         TestBucketName = "testbucket"
31 )
32
33 type fakeClock struct {
34         now *time.Time
35 }
36
37 func (c *fakeClock) Now() time.Time {
38         if c.now == nil {
39                 return time.Now()
40         }
41         return *c.now
42 }
43
44 var _ = check.Suite(&StubbedS3Suite{})
45
46 type StubbedS3Suite struct {
47         s3server *httptest.Server
48         cluster  *arvados.Cluster
49         handler  *handler
50         volumes  []*TestableS3Volume
51 }
52
53 func (s *StubbedS3Suite) SetUpTest(c *check.C) {
54         s.s3server = nil
55         s.cluster = testCluster(c)
56         s.cluster.Volumes = map[string]arvados.Volume{
57                 "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
58                 "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
59         }
60         s.handler = &handler{}
61 }
62
63 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
64         DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
65                 // Use a negative raceWindow so s3test's 1-second
66                 // timestamp precision doesn't confuse fixRace.
67                 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
68         })
69 }
70
71 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
72         DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
73                 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
74         })
75 }
76
77 func (s *StubbedS3Suite) TestIndex(c *check.C) {
78         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
79         v.IndexPageSize = 3
80         for i := 0; i < 256; i++ {
81                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
82         }
83         for _, spec := range []struct {
84                 prefix      string
85                 expectMatch int
86         }{
87                 {"", 256},
88                 {"c", 16},
89                 {"bc", 1},
90                 {"abc", 0},
91         } {
92                 buf := new(bytes.Buffer)
93                 err := v.IndexTo(spec.prefix, buf)
94                 c.Check(err, check.IsNil)
95
96                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
97                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
98                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
99         }
100 }
101
102 func (s *StubbedS3Suite) TestStats(c *check.C) {
103         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
104         stats := func() string {
105                 buf, err := json.Marshal(v.InternalStats())
106                 c.Check(err, check.IsNil)
107                 return string(buf)
108         }
109
110         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
111
112         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
113         _, err := v.Get(context.Background(), loc, make([]byte, 3))
114         c.Check(err, check.NotNil)
115         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
116         c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
117         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
118
119         err = v.Put(context.Background(), loc, []byte("foo"))
120         c.Check(err, check.IsNil)
121         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
122         c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
123
124         _, err = v.Get(context.Background(), loc, make([]byte, 3))
125         c.Check(err, check.IsNil)
126         _, err = v.Get(context.Background(), loc, make([]byte, 3))
127         c.Check(err, check.IsNil)
128         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
129 }
130
131 type blockingHandler struct {
132         requested chan *http.Request
133         unblock   chan struct{}
134 }
135
136 func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
137         if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
138                 // Accept PutBucket ("PUT /bucketname/"), called by
139                 // newTestableVolume
140                 return
141         }
142         if h.requested != nil {
143                 h.requested <- r
144         }
145         if h.unblock != nil {
146                 <-h.unblock
147         }
148         http.Error(w, "nothing here", http.StatusNotFound)
149 }
150
151 func (s *StubbedS3Suite) TestGetContextCancel(c *check.C) {
152         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
153         buf := make([]byte, 3)
154
155         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
156                 _, err := v.Get(ctx, loc, buf)
157                 return err
158         })
159 }
160
161 func (s *StubbedS3Suite) TestCompareContextCancel(c *check.C) {
162         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
163         buf := []byte("bar")
164
165         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
166                 return v.Compare(ctx, loc, buf)
167         })
168 }
169
170 func (s *StubbedS3Suite) TestPutContextCancel(c *check.C) {
171         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
172         buf := []byte("foo")
173
174         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
175                 return v.Put(ctx, loc, buf)
176         })
177 }
178
179 func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3Volume) error) {
180         handler := &blockingHandler{}
181         s.s3server = httptest.NewServer(handler)
182         defer s.s3server.Close()
183
184         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
185
186         ctx, cancel := context.WithCancel(context.Background())
187
188         handler.requested = make(chan *http.Request)
189         handler.unblock = make(chan struct{})
190         defer close(handler.unblock)
191
192         doneFunc := make(chan struct{})
193         go func() {
194                 err := testFunc(ctx, v)
195                 c.Check(err, check.Equals, context.Canceled)
196                 close(doneFunc)
197         }()
198
199         timeout := time.After(10 * time.Second)
200
201         // Wait for the stub server to receive a request, meaning
202         // Get() is waiting for an s3 operation.
203         select {
204         case <-timeout:
205                 c.Fatal("timed out waiting for test func to call our handler")
206         case <-doneFunc:
207                 c.Fatal("test func finished without even calling our handler!")
208         case <-handler.requested:
209         }
210
211         cancel()
212
213         select {
214         case <-timeout:
215                 c.Fatal("timed out")
216         case <-doneFunc:
217         }
218 }
219
220 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
221         s.cluster.Collections.BlobTrashLifetime.Set("1h")
222         s.cluster.Collections.BlobSigningTTL.Set("1h")
223
224         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
225         var none time.Time
226
227         putS3Obj := func(t time.Time, key string, data []byte) {
228                 if t == none {
229                         return
230                 }
231                 v.serverClock.now = &t
232                 v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
233         }
234
235         t0 := time.Now()
236         nextKey := 0
237         for _, scenario := range []struct {
238                 label               string
239                 dataT               time.Time
240                 recentT             time.Time
241                 trashT              time.Time
242                 canGet              bool
243                 canTrash            bool
244                 canGetAfterTrash    bool
245                 canUntrash          bool
246                 haveTrashAfterEmpty bool
247                 freshAfterEmpty     bool
248         }{
249                 {
250                         "No related objects",
251                         none, none, none,
252                         false, false, false, false, false, false,
253                 },
254                 {
255                         // Stored by older version, or there was a
256                         // race between EmptyTrash and Put: Trash is a
257                         // no-op even though the data object is very
258                         // old
259                         "No recent/X",
260                         t0.Add(-48 * time.Hour), none, none,
261                         true, true, true, false, false, false,
262                 },
263                 {
264                         "Not trash, but old enough to be eligible for trash",
265                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
266                         true, true, false, false, false, false,
267                 },
268                 {
269                         "Not trash, and not old enough to be eligible for trash",
270                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
271                         true, true, true, false, false, false,
272                 },
273                 {
274                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
275                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
276                         true, true, true, true, true, false,
277                 },
278                 {
279                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
280                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
281                         true, false, true, true, true, false,
282                 },
283                 {
284                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
285                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
286                         true, false, true, true, false, false,
287                 },
288                 {
289                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
290                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
291                         true, false, true, true, true, true,
292                 },
293                 {
294                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
295                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
296                         true, true, true, true, false, false,
297                 },
298                 {
299                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
300                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
301                         true, false, true, true, false, false,
302                 },
303                 {
304                         "Trash, not yet eligible for deletion",
305                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
306                         false, false, false, true, true, false,
307                 },
308                 {
309                         "Trash, not yet eligible for deletion, prone to races",
310                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
311                         false, false, false, true, true, false,
312                 },
313                 {
314                         "Trash, eligible for deletion",
315                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
316                         false, false, false, true, false, false,
317                 },
318                 {
319                         "Erroneously trashed during a race, detected before BlobTrashLifetime",
320                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
321                         true, false, true, true, true, false,
322                 },
323                 {
324                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
325                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
326                         true, false, true, true, true, false,
327                 },
328                 {
329                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
330                         none, none, t0.Add(-time.Minute),
331                         false, false, false, true, true, true,
332                 },
333         } {
334                 c.Log("Scenario: ", scenario.label)
335
336                 // We have a few tests to run for each scenario, and
337                 // the tests are expected to change state. By calling
338                 // this setup func between tests, we (re)create the
339                 // scenario as specified, using a new unique block
340                 // locator to prevent interference from previous
341                 // tests.
342
343                 setupScenario := func() (string, []byte) {
344                         nextKey++
345                         blk := []byte(fmt.Sprintf("%d", nextKey))
346                         loc := fmt.Sprintf("%x", md5.Sum(blk))
347                         c.Log("\t", loc)
348                         putS3Obj(scenario.dataT, loc, blk)
349                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
350                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
351                         v.serverClock.now = &t0
352                         return loc, blk
353                 }
354
355                 // Check canGet
356                 loc, blk := setupScenario()
357                 buf := make([]byte, len(blk))
358                 _, err := v.Get(context.Background(), loc, buf)
359                 c.Check(err == nil, check.Equals, scenario.canGet)
360                 if err != nil {
361                         c.Check(os.IsNotExist(err), check.Equals, true)
362                 }
363
364                 // Call Trash, then check canTrash and canGetAfterTrash
365                 loc, _ = setupScenario()
366                 err = v.Trash(loc)
367                 c.Check(err == nil, check.Equals, scenario.canTrash)
368                 _, err = v.Get(context.Background(), loc, buf)
369                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
370                 if err != nil {
371                         c.Check(os.IsNotExist(err), check.Equals, true)
372                 }
373
374                 // Call Untrash, then check canUntrash
375                 loc, _ = setupScenario()
376                 err = v.Untrash(loc)
377                 c.Check(err == nil, check.Equals, scenario.canUntrash)
378                 if scenario.dataT != none || scenario.trashT != none {
379                         // In all scenarios where the data exists, we
380                         // should be able to Get after Untrash --
381                         // regardless of timestamps, errors, race
382                         // conditions, etc.
383                         _, err = v.Get(context.Background(), loc, buf)
384                         c.Check(err, check.IsNil)
385                 }
386
387                 // Call EmptyTrash, then check haveTrashAfterEmpty and
388                 // freshAfterEmpty
389                 loc, _ = setupScenario()
390                 v.EmptyTrash()
391                 _, err = v.bucket.Head("trash/"+loc, nil)
392                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
393                 if scenario.freshAfterEmpty {
394                         t, err := v.Mtime(loc)
395                         c.Check(err, check.IsNil)
396                         // new mtime must be current (with an
397                         // allowance for 1s timestamp precision)
398                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
399                 }
400
401                 // Check for current Mtime after Put (applies to all
402                 // scenarios)
403                 loc, blk = setupScenario()
404                 err = v.Put(context.Background(), loc, blk)
405                 c.Check(err, check.IsNil)
406                 t, err := v.Mtime(loc)
407                 c.Check(err, check.IsNil)
408                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
409         }
410 }
411
412 type TestableS3Volume struct {
413         *S3Volume
414         server      *s3test.Server
415         c           *check.C
416         serverClock *fakeClock
417 }
418
419 func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3Volume {
420         clock := &fakeClock{}
421         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
422         c.Assert(err, check.IsNil)
423         endpoint := srv.URL()
424         if s.s3server != nil {
425                 endpoint = s.s3server.URL
426         }
427
428         v := &TestableS3Volume{
429                 S3Volume: &S3Volume{
430                         AccessKey:          "xxx",
431                         SecretKey:          "xxx",
432                         Bucket:             TestBucketName,
433                         Endpoint:           endpoint,
434                         Region:             "test-region-1",
435                         LocationConstraint: true,
436                         UnsafeDelete:       true,
437                         IndexPageSize:      1000,
438                         cluster:            cluster,
439                         volume:             volume,
440                         logger:             ctxlog.TestLogger(c),
441                         metrics:            metrics,
442                 },
443                 c:           c,
444                 server:      srv,
445                 serverClock: clock,
446         }
447         c.Assert(v.S3Volume.check(), check.IsNil)
448         c.Assert(v.bucket.PutBucket(s3.ACL("private")), check.IsNil)
449         // We couldn't set RaceWindow until now because check()
450         // rejects negative values.
451         v.S3Volume.RaceWindow = arvados.Duration(raceWindow)
452         return v
453 }
454
455 // PutRaw skips the ContentMD5 test
456 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
457         err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
458         if err != nil {
459                 log.Printf("PutRaw: %s: %+v", loc, err)
460         }
461         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
462         if err != nil {
463                 log.Printf("PutRaw: recent/%s: %+v", loc, err)
464         }
465 }
466
467 // TouchWithDate turns back the clock while doing a Touch(). We assume
468 // there are no other operations happening on the same s3test server
469 // while we do this.
470 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
471         v.serverClock.now = &lastPut
472         err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
473         if err != nil {
474                 panic(err)
475         }
476         v.serverClock.now = nil
477 }
478
479 func (v *TestableS3Volume) Teardown() {
480         v.server.Quit()
481 }
482
483 func (v *TestableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
484         return "get", "put"
485 }