Merge remote-tracking branch 'origin/master' into 14484-collection-record-update
[arvados.git] / services / keepstore / s3_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io/ioutil"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "github.com/AdRoll/goamz/s3"
21         "github.com/AdRoll/goamz/s3/s3test"
22         "github.com/ghodss/yaml"
23         "github.com/prometheus/client_golang/prometheus"
24         check "gopkg.in/check.v1"
25 )
26
27 const (
28         TestBucketName = "testbucket"
29 )
30
31 type fakeClock struct {
32         now *time.Time
33 }
34
35 func (c *fakeClock) Now() time.Time {
36         if c.now == nil {
37                 return time.Now()
38         }
39         return *c.now
40 }
41
42 func init() {
43         // Deleting isn't safe from races, but if it's turned on
44         // anyway we do expect it to pass the generic volume tests.
45         s3UnsafeDelete = true
46 }
47
48 var _ = check.Suite(&StubbedS3Suite{})
49
50 type StubbedS3Suite struct {
51         volumes []*TestableS3Volume
52 }
53
54 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
55         DoGenericVolumeTests(c, func(t TB) TestableVolume {
56                 // Use a negative raceWindow so s3test's 1-second
57                 // timestamp precision doesn't confuse fixRace.
58                 return s.newTestableVolume(c, -2*time.Second, false, 2)
59         })
60 }
61
62 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
63         DoGenericVolumeTests(c, func(t TB) TestableVolume {
64                 return s.newTestableVolume(c, -2*time.Second, true, 2)
65         })
66 }
67
68 func (s *StubbedS3Suite) TestIndex(c *check.C) {
69         v := s.newTestableVolume(c, 0, false, 2)
70         v.IndexPageSize = 3
71         for i := 0; i < 256; i++ {
72                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
73         }
74         for _, spec := range []struct {
75                 prefix      string
76                 expectMatch int
77         }{
78                 {"", 256},
79                 {"c", 16},
80                 {"bc", 1},
81                 {"abc", 0},
82         } {
83                 buf := new(bytes.Buffer)
84                 err := v.IndexTo(spec.prefix, buf)
85                 c.Check(err, check.IsNil)
86
87                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
88                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
89                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
90         }
91 }
92
93 func (s *StubbedS3Suite) TestStats(c *check.C) {
94         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
95         stats := func() string {
96                 buf, err := json.Marshal(v.InternalStats())
97                 c.Check(err, check.IsNil)
98                 return string(buf)
99         }
100
101         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
102
103         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
104         _, err := v.Get(context.Background(), loc, make([]byte, 3))
105         c.Check(err, check.NotNil)
106         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
107         c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
108         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
109
110         err = v.Put(context.Background(), loc, []byte("foo"))
111         c.Check(err, check.IsNil)
112         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
113         c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
114
115         _, err = v.Get(context.Background(), loc, make([]byte, 3))
116         c.Check(err, check.IsNil)
117         _, err = v.Get(context.Background(), loc, make([]byte, 3))
118         c.Check(err, check.IsNil)
119         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
120 }
121
122 type blockingHandler struct {
123         requested chan *http.Request
124         unblock   chan struct{}
125 }
126
127 func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
128         if h.requested != nil {
129                 h.requested <- r
130         }
131         if h.unblock != nil {
132                 <-h.unblock
133         }
134         http.Error(w, "nothing here", http.StatusNotFound)
135 }
136
137 func (s *StubbedS3Suite) TestGetContextCancel(c *check.C) {
138         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
139         buf := make([]byte, 3)
140
141         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
142                 _, err := v.Get(ctx, loc, buf)
143                 return err
144         })
145 }
146
147 func (s *StubbedS3Suite) TestCompareContextCancel(c *check.C) {
148         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
149         buf := []byte("bar")
150
151         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
152                 return v.Compare(ctx, loc, buf)
153         })
154 }
155
156 func (s *StubbedS3Suite) TestPutContextCancel(c *check.C) {
157         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
158         buf := []byte("foo")
159
160         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
161                 return v.Put(ctx, loc, buf)
162         })
163 }
164
165 func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3Volume) error) {
166         handler := &blockingHandler{}
167         srv := httptest.NewServer(handler)
168         defer srv.Close()
169
170         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
171         vol := *v.S3Volume
172         vol.Endpoint = srv.URL
173         v = &TestableS3Volume{S3Volume: &vol}
174         metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
175         v.Start(metrics)
176
177         ctx, cancel := context.WithCancel(context.Background())
178
179         handler.requested = make(chan *http.Request)
180         handler.unblock = make(chan struct{})
181         defer close(handler.unblock)
182
183         doneFunc := make(chan struct{})
184         go func() {
185                 err := testFunc(ctx, v)
186                 c.Check(err, check.Equals, context.Canceled)
187                 close(doneFunc)
188         }()
189
190         timeout := time.After(10 * time.Second)
191
192         // Wait for the stub server to receive a request, meaning
193         // Get() is waiting for an s3 operation.
194         select {
195         case <-timeout:
196                 c.Fatal("timed out waiting for test func to call our handler")
197         case <-doneFunc:
198                 c.Fatal("test func finished without even calling our handler!")
199         case <-handler.requested:
200         }
201
202         cancel()
203
204         select {
205         case <-timeout:
206                 c.Fatal("timed out")
207         case <-doneFunc:
208         }
209 }
210
211 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
212         defer func(tl, bs arvados.Duration) {
213                 theConfig.TrashLifetime = tl
214                 theConfig.BlobSignatureTTL = bs
215         }(theConfig.TrashLifetime, theConfig.BlobSignatureTTL)
216         theConfig.TrashLifetime.Set("1h")
217         theConfig.BlobSignatureTTL.Set("1h")
218
219         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
220         var none time.Time
221
222         putS3Obj := func(t time.Time, key string, data []byte) {
223                 if t == none {
224                         return
225                 }
226                 v.serverClock.now = &t
227                 v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
228         }
229
230         t0 := time.Now()
231         nextKey := 0
232         for _, scenario := range []struct {
233                 label               string
234                 dataT               time.Time
235                 recentT             time.Time
236                 trashT              time.Time
237                 canGet              bool
238                 canTrash            bool
239                 canGetAfterTrash    bool
240                 canUntrash          bool
241                 haveTrashAfterEmpty bool
242                 freshAfterEmpty     bool
243         }{
244                 {
245                         "No related objects",
246                         none, none, none,
247                         false, false, false, false, false, false,
248                 },
249                 {
250                         // Stored by older version, or there was a
251                         // race between EmptyTrash and Put: Trash is a
252                         // no-op even though the data object is very
253                         // old
254                         "No recent/X",
255                         t0.Add(-48 * time.Hour), none, none,
256                         true, true, true, false, false, false,
257                 },
258                 {
259                         "Not trash, but old enough to be eligible for trash",
260                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
261                         true, true, false, false, false, false,
262                 },
263                 {
264                         "Not trash, and not old enough to be eligible for trash",
265                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
266                         true, true, true, false, false, false,
267                 },
268                 {
269                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
270                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
271                         true, true, true, true, true, false,
272                 },
273                 {
274                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
275                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
276                         true, false, true, true, true, false,
277                 },
278                 {
279                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
280                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
281                         true, false, true, true, false, false,
282                 },
283                 {
284                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
285                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
286                         true, false, true, true, true, true,
287                 },
288                 {
289                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
290                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
291                         true, true, true, true, false, false,
292                 },
293                 {
294                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
295                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
296                         true, false, true, true, false, false,
297                 },
298                 {
299                         "Trash, not yet eligible for deletion",
300                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
301                         false, false, false, true, true, false,
302                 },
303                 {
304                         "Trash, not yet eligible for deletion, prone to races",
305                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
306                         false, false, false, true, true, false,
307                 },
308                 {
309                         "Trash, eligible for deletion",
310                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
311                         false, false, false, true, false, false,
312                 },
313                 {
314                         "Erroneously trashed during a race, detected before TrashLifetime",
315                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
316                         true, false, true, true, true, false,
317                 },
318                 {
319                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching TrashLifetime",
320                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
321                         true, false, true, true, true, false,
322                 },
323                 {
324                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
325                         none, none, t0.Add(-time.Minute),
326                         false, false, false, true, true, true,
327                 },
328         } {
329                 c.Log("Scenario: ", scenario.label)
330
331                 // We have a few tests to run for each scenario, and
332                 // the tests are expected to change state. By calling
333                 // this setup func between tests, we (re)create the
334                 // scenario as specified, using a new unique block
335                 // locator to prevent interference from previous
336                 // tests.
337
338                 setupScenario := func() (string, []byte) {
339                         nextKey++
340                         blk := []byte(fmt.Sprintf("%d", nextKey))
341                         loc := fmt.Sprintf("%x", md5.Sum(blk))
342                         c.Log("\t", loc)
343                         putS3Obj(scenario.dataT, loc, blk)
344                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
345                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
346                         v.serverClock.now = &t0
347                         return loc, blk
348                 }
349
350                 // Check canGet
351                 loc, blk := setupScenario()
352                 buf := make([]byte, len(blk))
353                 _, err := v.Get(context.Background(), loc, buf)
354                 c.Check(err == nil, check.Equals, scenario.canGet)
355                 if err != nil {
356                         c.Check(os.IsNotExist(err), check.Equals, true)
357                 }
358
359                 // Call Trash, then check canTrash and canGetAfterTrash
360                 loc, _ = setupScenario()
361                 err = v.Trash(loc)
362                 c.Check(err == nil, check.Equals, scenario.canTrash)
363                 _, err = v.Get(context.Background(), loc, buf)
364                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
365                 if err != nil {
366                         c.Check(os.IsNotExist(err), check.Equals, true)
367                 }
368
369                 // Call Untrash, then check canUntrash
370                 loc, _ = setupScenario()
371                 err = v.Untrash(loc)
372                 c.Check(err == nil, check.Equals, scenario.canUntrash)
373                 if scenario.dataT != none || scenario.trashT != none {
374                         // In all scenarios where the data exists, we
375                         // should be able to Get after Untrash --
376                         // regardless of timestamps, errors, race
377                         // conditions, etc.
378                         _, err = v.Get(context.Background(), loc, buf)
379                         c.Check(err, check.IsNil)
380                 }
381
382                 // Call EmptyTrash, then check haveTrashAfterEmpty and
383                 // freshAfterEmpty
384                 loc, _ = setupScenario()
385                 v.EmptyTrash()
386                 _, err = v.bucket.Head("trash/"+loc, nil)
387                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
388                 if scenario.freshAfterEmpty {
389                         t, err := v.Mtime(loc)
390                         c.Check(err, check.IsNil)
391                         // new mtime must be current (with an
392                         // allowance for 1s timestamp precision)
393                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
394                 }
395
396                 // Check for current Mtime after Put (applies to all
397                 // scenarios)
398                 loc, blk = setupScenario()
399                 err = v.Put(context.Background(), loc, blk)
400                 c.Check(err, check.IsNil)
401                 t, err := v.Mtime(loc)
402                 c.Check(err, check.IsNil)
403                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
404         }
405 }
406
407 type TestableS3Volume struct {
408         *S3Volume
409         server      *s3test.Server
410         c           *check.C
411         serverClock *fakeClock
412 }
413
414 func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
415         clock := &fakeClock{}
416         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
417         c.Assert(err, check.IsNil)
418
419         v := &TestableS3Volume{
420                 S3Volume: &S3Volume{
421                         Bucket:             TestBucketName,
422                         Endpoint:           srv.URL(),
423                         Region:             "test-region-1",
424                         LocationConstraint: true,
425                         RaceWindow:         arvados.Duration(raceWindow),
426                         S3Replication:      replication,
427                         UnsafeDelete:       s3UnsafeDelete,
428                         ReadOnly:           readonly,
429                         IndexPageSize:      1000,
430                 },
431                 c:           c,
432                 server:      srv,
433                 serverClock: clock,
434         }
435         metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
436         v.Start(metrics)
437         err = v.bucket.PutBucket(s3.ACL("private"))
438         c.Assert(err, check.IsNil)
439         return v
440 }
441
442 func (s *StubbedS3Suite) TestConfig(c *check.C) {
443         var cfg Config
444         err := yaml.Unmarshal([]byte(`
445 Volumes:
446   - Type: S3
447     StorageClasses: ["class_a", "class_b"]
448 `), &cfg)
449
450         c.Check(err, check.IsNil)
451         c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
452 }
453
454 func (v *TestableS3Volume) Start(vm *volumeMetricsVecs) error {
455         tmp, err := ioutil.TempFile("", "keepstore")
456         v.c.Assert(err, check.IsNil)
457         defer os.Remove(tmp.Name())
458         _, err = tmp.Write([]byte("xxx\n"))
459         v.c.Assert(err, check.IsNil)
460         v.c.Assert(tmp.Close(), check.IsNil)
461
462         v.S3Volume.AccessKeyFile = tmp.Name()
463         v.S3Volume.SecretKeyFile = tmp.Name()
464
465         v.c.Assert(v.S3Volume.Start(vm), check.IsNil)
466         return nil
467 }
468
469 // PutRaw skips the ContentMD5 test
470 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
471         err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
472         if err != nil {
473                 log.Printf("PutRaw: %s: %+v", loc, err)
474         }
475         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
476         if err != nil {
477                 log.Printf("PutRaw: recent/%s: %+v", loc, err)
478         }
479 }
480
481 // TouchWithDate turns back the clock while doing a Touch(). We assume
482 // there are no other operations happening on the same s3test server
483 // while we do this.
484 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
485         v.serverClock.now = &lastPut
486         err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
487         if err != nil {
488                 panic(err)
489         }
490         v.serverClock.now = nil
491 }
492
493 func (v *TestableS3Volume) Teardown() {
494         v.server.Quit()
495 }
496
497 func (v *TestableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
498         return "get", "put"
499 }