Merge branch 'master' into 13937-keepstore-prometheus
[arvados.git] / services / keepstore / s3_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io/ioutil"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "github.com/AdRoll/goamz/s3"
21         "github.com/AdRoll/goamz/s3/s3test"
22         "github.com/ghodss/yaml"
23         "github.com/prometheus/client_golang/prometheus"
24         check "gopkg.in/check.v1"
25 )
26
27 const (
28         TestBucketName = "testbucket"
29 )
30
31 type fakeClock struct {
32         now *time.Time
33 }
34
35 func (c *fakeClock) Now() time.Time {
36         if c.now == nil {
37                 return time.Now()
38         }
39         return *c.now
40 }
41
42 func init() {
43         // Deleting isn't safe from races, but if it's turned on
44         // anyway we do expect it to pass the generic volume tests.
45         s3UnsafeDelete = true
46 }
47
48 var _ = check.Suite(&StubbedS3Suite{})
49
50 type StubbedS3Suite struct {
51         volumes []*TestableS3Volume
52 }
53
54 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
55         DoGenericVolumeTests(c, func(t TB) TestableVolume {
56                 // Use a negative raceWindow so s3test's 1-second
57                 // timestamp precision doesn't confuse fixRace.
58                 return s.newTestableVolume(c, -2*time.Second, false, 2)
59         })
60 }
61
62 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
63         DoGenericVolumeTests(c, func(t TB) TestableVolume {
64                 return s.newTestableVolume(c, -2*time.Second, true, 2)
65         })
66 }
67
68 func (s *StubbedS3Suite) TestIndex(c *check.C) {
69         v := s.newTestableVolume(c, 0, false, 2)
70         v.IndexPageSize = 3
71         for i := 0; i < 256; i++ {
72                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
73         }
74         for _, spec := range []struct {
75                 prefix      string
76                 expectMatch int
77         }{
78                 {"", 256},
79                 {"c", 16},
80                 {"bc", 1},
81                 {"abc", 0},
82         } {
83                 buf := new(bytes.Buffer)
84                 err := v.IndexTo(spec.prefix, buf)
85                 c.Check(err, check.IsNil)
86
87                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
88                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
89                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
90         }
91 }
92
93 func (s *StubbedS3Suite) TestStats(c *check.C) {
94         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
95         stats := func() string {
96                 buf, err := json.Marshal(v.InternalStats())
97                 c.Check(err, check.IsNil)
98                 return string(buf)
99         }
100
101         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
102
103         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
104         _, err := v.Get(context.Background(), loc, make([]byte, 3))
105         c.Check(err, check.NotNil)
106         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
107         c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
108         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
109
110         err = v.Put(context.Background(), loc, []byte("foo"))
111         c.Check(err, check.IsNil)
112         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
113         c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
114
115         _, err = v.Get(context.Background(), loc, make([]byte, 3))
116         c.Check(err, check.IsNil)
117         _, err = v.Get(context.Background(), loc, make([]byte, 3))
118         c.Check(err, check.IsNil)
119         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
120 }
121
122 type blockingHandler struct {
123         requested chan *http.Request
124         unblock   chan struct{}
125 }
126
127 func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
128         if h.requested != nil {
129                 h.requested <- r
130         }
131         if h.unblock != nil {
132                 <-h.unblock
133         }
134         http.Error(w, "nothing here", http.StatusNotFound)
135 }
136
137 func (s *StubbedS3Suite) TestGetContextCancel(c *check.C) {
138         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
139         buf := make([]byte, 3)
140
141         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
142                 _, err := v.Get(ctx, loc, buf)
143                 return err
144         })
145 }
146
147 func (s *StubbedS3Suite) TestCompareContextCancel(c *check.C) {
148         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
149         buf := []byte("bar")
150
151         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
152                 return v.Compare(ctx, loc, buf)
153         })
154 }
155
156 func (s *StubbedS3Suite) TestPutContextCancel(c *check.C) {
157         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
158         buf := []byte("foo")
159
160         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
161                 return v.Put(ctx, loc, buf)
162         })
163 }
164
165 func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3Volume) error) {
166         handler := &blockingHandler{}
167         srv := httptest.NewServer(handler)
168         defer srv.Close()
169
170         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
171         vol := *v.S3Volume
172         vol.Endpoint = srv.URL
173         v = &TestableS3Volume{S3Volume: &vol}
174         metrics := newVolumeMetricsVecs(prometheus.NewRegistry()).curryWith(
175                 v.String(), v.Status().MountPoint, fmt.Sprintf("%d", v.Status().DeviceNum))
176         v.Start(metrics)
177
178         ctx, cancel := context.WithCancel(context.Background())
179
180         handler.requested = make(chan *http.Request)
181         handler.unblock = make(chan struct{})
182         defer close(handler.unblock)
183
184         doneFunc := make(chan struct{})
185         go func() {
186                 err := testFunc(ctx, v)
187                 c.Check(err, check.Equals, context.Canceled)
188                 close(doneFunc)
189         }()
190
191         timeout := time.After(10 * time.Second)
192
193         // Wait for the stub server to receive a request, meaning
194         // Get() is waiting for an s3 operation.
195         select {
196         case <-timeout:
197                 c.Fatal("timed out waiting for test func to call our handler")
198         case <-doneFunc:
199                 c.Fatal("test func finished without even calling our handler!")
200         case <-handler.requested:
201         }
202
203         cancel()
204
205         select {
206         case <-timeout:
207                 c.Fatal("timed out")
208         case <-doneFunc:
209         }
210 }
211
212 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
213         defer func(tl, bs arvados.Duration) {
214                 theConfig.TrashLifetime = tl
215                 theConfig.BlobSignatureTTL = bs
216         }(theConfig.TrashLifetime, theConfig.BlobSignatureTTL)
217         theConfig.TrashLifetime.Set("1h")
218         theConfig.BlobSignatureTTL.Set("1h")
219
220         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
221         var none time.Time
222
223         putS3Obj := func(t time.Time, key string, data []byte) {
224                 if t == none {
225                         return
226                 }
227                 v.serverClock.now = &t
228                 v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
229         }
230
231         t0 := time.Now()
232         nextKey := 0
233         for _, scenario := range []struct {
234                 label               string
235                 dataT               time.Time
236                 recentT             time.Time
237                 trashT              time.Time
238                 canGet              bool
239                 canTrash            bool
240                 canGetAfterTrash    bool
241                 canUntrash          bool
242                 haveTrashAfterEmpty bool
243                 freshAfterEmpty     bool
244         }{
245                 {
246                         "No related objects",
247                         none, none, none,
248                         false, false, false, false, false, false,
249                 },
250                 {
251                         // Stored by older version, or there was a
252                         // race between EmptyTrash and Put: Trash is a
253                         // no-op even though the data object is very
254                         // old
255                         "No recent/X",
256                         t0.Add(-48 * time.Hour), none, none,
257                         true, true, true, false, false, false,
258                 },
259                 {
260                         "Not trash, but old enough to be eligible for trash",
261                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
262                         true, true, false, false, false, false,
263                 },
264                 {
265                         "Not trash, and not old enough to be eligible for trash",
266                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
267                         true, true, true, false, false, false,
268                 },
269                 {
270                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
271                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
272                         true, true, true, true, true, false,
273                 },
274                 {
275                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
276                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
277                         true, false, true, true, true, false,
278                 },
279                 {
280                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
281                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
282                         true, false, true, true, false, false,
283                 },
284                 {
285                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
286                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
287                         true, false, true, true, true, true,
288                 },
289                 {
290                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
291                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
292                         true, true, true, true, false, false,
293                 },
294                 {
295                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
296                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
297                         true, false, true, true, false, false,
298                 },
299                 {
300                         "Trash, not yet eligible for deletion",
301                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
302                         false, false, false, true, true, false,
303                 },
304                 {
305                         "Trash, not yet eligible for deletion, prone to races",
306                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
307                         false, false, false, true, true, false,
308                 },
309                 {
310                         "Trash, eligible for deletion",
311                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
312                         false, false, false, true, false, false,
313                 },
314                 {
315                         "Erroneously trashed during a race, detected before TrashLifetime",
316                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
317                         true, false, true, true, true, false,
318                 },
319                 {
320                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching TrashLifetime",
321                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
322                         true, false, true, true, true, false,
323                 },
324                 {
325                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
326                         none, none, t0.Add(-time.Minute),
327                         false, false, false, true, true, true,
328                 },
329         } {
330                 c.Log("Scenario: ", scenario.label)
331
332                 // We have a few tests to run for each scenario, and
333                 // the tests are expected to change state. By calling
334                 // this setup func between tests, we (re)create the
335                 // scenario as specified, using a new unique block
336                 // locator to prevent interference from previous
337                 // tests.
338
339                 setupScenario := func() (string, []byte) {
340                         nextKey++
341                         blk := []byte(fmt.Sprintf("%d", nextKey))
342                         loc := fmt.Sprintf("%x", md5.Sum(blk))
343                         c.Log("\t", loc)
344                         putS3Obj(scenario.dataT, loc, blk)
345                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
346                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
347                         v.serverClock.now = &t0
348                         return loc, blk
349                 }
350
351                 // Check canGet
352                 loc, blk := setupScenario()
353                 buf := make([]byte, len(blk))
354                 _, err := v.Get(context.Background(), loc, buf)
355                 c.Check(err == nil, check.Equals, scenario.canGet)
356                 if err != nil {
357                         c.Check(os.IsNotExist(err), check.Equals, true)
358                 }
359
360                 // Call Trash, then check canTrash and canGetAfterTrash
361                 loc, _ = setupScenario()
362                 err = v.Trash(loc)
363                 c.Check(err == nil, check.Equals, scenario.canTrash)
364                 _, err = v.Get(context.Background(), loc, buf)
365                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
366                 if err != nil {
367                         c.Check(os.IsNotExist(err), check.Equals, true)
368                 }
369
370                 // Call Untrash, then check canUntrash
371                 loc, _ = setupScenario()
372                 err = v.Untrash(loc)
373                 c.Check(err == nil, check.Equals, scenario.canUntrash)
374                 if scenario.dataT != none || scenario.trashT != none {
375                         // In all scenarios where the data exists, we
376                         // should be able to Get after Untrash --
377                         // regardless of timestamps, errors, race
378                         // conditions, etc.
379                         _, err = v.Get(context.Background(), loc, buf)
380                         c.Check(err, check.IsNil)
381                 }
382
383                 // Call EmptyTrash, then check haveTrashAfterEmpty and
384                 // freshAfterEmpty
385                 loc, _ = setupScenario()
386                 v.EmptyTrash()
387                 _, err = v.bucket.Head("trash/"+loc, nil)
388                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
389                 if scenario.freshAfterEmpty {
390                         t, err := v.Mtime(loc)
391                         c.Check(err, check.IsNil)
392                         // new mtime must be current (with an
393                         // allowance for 1s timestamp precision)
394                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
395                 }
396
397                 // Check for current Mtime after Put (applies to all
398                 // scenarios)
399                 loc, blk = setupScenario()
400                 err = v.Put(context.Background(), loc, blk)
401                 c.Check(err, check.IsNil)
402                 t, err := v.Mtime(loc)
403                 c.Check(err, check.IsNil)
404                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
405         }
406 }
407
408 type TestableS3Volume struct {
409         *S3Volume
410         server      *s3test.Server
411         c           *check.C
412         serverClock *fakeClock
413 }
414
415 func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
416         clock := &fakeClock{}
417         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
418         c.Assert(err, check.IsNil)
419
420         v := &TestableS3Volume{
421                 S3Volume: &S3Volume{
422                         Bucket:             TestBucketName,
423                         Endpoint:           srv.URL(),
424                         Region:             "test-region-1",
425                         LocationConstraint: true,
426                         RaceWindow:         arvados.Duration(raceWindow),
427                         S3Replication:      replication,
428                         UnsafeDelete:       s3UnsafeDelete,
429                         ReadOnly:           readonly,
430                         IndexPageSize:      1000,
431                 },
432                 c:           c,
433                 server:      srv,
434                 serverClock: clock,
435         }
436         metrics := newVolumeMetricsVecs(prometheus.NewRegistry()).curryWith(
437                 v.String(), v.Status().MountPoint, fmt.Sprintf("%d", v.Status().DeviceNum))
438         v.Start(metrics)
439         err = v.bucket.PutBucket(s3.ACL("private"))
440         c.Assert(err, check.IsNil)
441         return v
442 }
443
444 func (s *StubbedS3Suite) TestConfig(c *check.C) {
445         var cfg Config
446         err := yaml.Unmarshal([]byte(`
447 Volumes:
448   - Type: S3
449     StorageClasses: ["class_a", "class_b"]
450 `), &cfg)
451
452         c.Check(err, check.IsNil)
453         c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
454 }
455
456 func (v *TestableS3Volume) Start(m *volumeMetrics) error {
457         tmp, err := ioutil.TempFile("", "keepstore")
458         v.c.Assert(err, check.IsNil)
459         defer os.Remove(tmp.Name())
460         _, err = tmp.Write([]byte("xxx\n"))
461         v.c.Assert(err, check.IsNil)
462         v.c.Assert(tmp.Close(), check.IsNil)
463
464         v.S3Volume.AccessKeyFile = tmp.Name()
465         v.S3Volume.SecretKeyFile = tmp.Name()
466
467         v.c.Assert(v.S3Volume.Start(m), check.IsNil)
468         return nil
469 }
470
471 // PutRaw skips the ContentMD5 test
472 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
473         err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
474         if err != nil {
475                 log.Printf("PutRaw: %s: %+v", loc, err)
476         }
477         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
478         if err != nil {
479                 log.Printf("PutRaw: recent/%s: %+v", loc, err)
480         }
481 }
482
483 // TouchWithDate turns back the clock while doing a Touch(). We assume
484 // there are no other operations happening on the same s3test server
485 // while we do this.
486 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
487         v.serverClock.now = &lastPut
488         err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
489         if err != nil {
490                 panic(err)
491         }
492         v.serverClock.now = nil
493 }
494
495 func (v *TestableS3Volume) Teardown() {
496         v.server.Quit()
497 }