Merge branch '11645-keepstore-storageclasses' closes #11645
[arvados.git] / services / keepstore / s3_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io/ioutil"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "github.com/AdRoll/goamz/s3"
21         "github.com/AdRoll/goamz/s3/s3test"
22         "github.com/ghodss/yaml"
23         check "gopkg.in/check.v1"
24 )
25
26 const (
27         TestBucketName = "testbucket"
28 )
29
30 type fakeClock struct {
31         now *time.Time
32 }
33
34 func (c *fakeClock) Now() time.Time {
35         if c.now == nil {
36                 return time.Now()
37         }
38         return *c.now
39 }
40
41 func init() {
42         // Deleting isn't safe from races, but if it's turned on
43         // anyway we do expect it to pass the generic volume tests.
44         s3UnsafeDelete = true
45 }
46
47 var _ = check.Suite(&StubbedS3Suite{})
48
49 type StubbedS3Suite struct {
50         volumes []*TestableS3Volume
51 }
52
53 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
54         DoGenericVolumeTests(c, func(t TB) TestableVolume {
55                 // Use a negative raceWindow so s3test's 1-second
56                 // timestamp precision doesn't confuse fixRace.
57                 return s.newTestableVolume(c, -2*time.Second, false, 2)
58         })
59 }
60
61 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
62         DoGenericVolumeTests(c, func(t TB) TestableVolume {
63                 return s.newTestableVolume(c, -2*time.Second, true, 2)
64         })
65 }
66
67 func (s *StubbedS3Suite) TestIndex(c *check.C) {
68         v := s.newTestableVolume(c, 0, false, 2)
69         v.IndexPageSize = 3
70         for i := 0; i < 256; i++ {
71                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
72         }
73         for _, spec := range []struct {
74                 prefix      string
75                 expectMatch int
76         }{
77                 {"", 256},
78                 {"c", 16},
79                 {"bc", 1},
80                 {"abc", 0},
81         } {
82                 buf := new(bytes.Buffer)
83                 err := v.IndexTo(spec.prefix, buf)
84                 c.Check(err, check.IsNil)
85
86                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
87                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
88                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
89         }
90 }
91
92 func (s *StubbedS3Suite) TestStats(c *check.C) {
93         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
94         stats := func() string {
95                 buf, err := json.Marshal(v.InternalStats())
96                 c.Check(err, check.IsNil)
97                 return string(buf)
98         }
99
100         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
101
102         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
103         _, err := v.Get(context.Background(), loc, make([]byte, 3))
104         c.Check(err, check.NotNil)
105         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
106         c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
107         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
108
109         err = v.Put(context.Background(), loc, []byte("foo"))
110         c.Check(err, check.IsNil)
111         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
112         c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
113
114         _, err = v.Get(context.Background(), loc, make([]byte, 3))
115         c.Check(err, check.IsNil)
116         _, err = v.Get(context.Background(), loc, make([]byte, 3))
117         c.Check(err, check.IsNil)
118         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
119 }
120
121 type blockingHandler struct {
122         requested chan *http.Request
123         unblock   chan struct{}
124 }
125
126 func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
127         if h.requested != nil {
128                 h.requested <- r
129         }
130         if h.unblock != nil {
131                 <-h.unblock
132         }
133         http.Error(w, "nothing here", http.StatusNotFound)
134 }
135
136 func (s *StubbedS3Suite) TestGetContextCancel(c *check.C) {
137         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
138         buf := make([]byte, 3)
139
140         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
141                 _, err := v.Get(ctx, loc, buf)
142                 return err
143         })
144 }
145
146 func (s *StubbedS3Suite) TestCompareContextCancel(c *check.C) {
147         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
148         buf := []byte("bar")
149
150         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
151                 return v.Compare(ctx, loc, buf)
152         })
153 }
154
155 func (s *StubbedS3Suite) TestPutContextCancel(c *check.C) {
156         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
157         buf := []byte("foo")
158
159         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
160                 return v.Put(ctx, loc, buf)
161         })
162 }
163
164 func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3Volume) error) {
165         handler := &blockingHandler{}
166         srv := httptest.NewServer(handler)
167         defer srv.Close()
168
169         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
170         vol := *v.S3Volume
171         vol.Endpoint = srv.URL
172         v = &TestableS3Volume{S3Volume: &vol}
173         v.Start()
174
175         ctx, cancel := context.WithCancel(context.Background())
176
177         handler.requested = make(chan *http.Request)
178         handler.unblock = make(chan struct{})
179         defer close(handler.unblock)
180
181         doneFunc := make(chan struct{})
182         go func() {
183                 err := testFunc(ctx, v)
184                 c.Check(err, check.Equals, context.Canceled)
185                 close(doneFunc)
186         }()
187
188         timeout := time.After(10 * time.Second)
189
190         // Wait for the stub server to receive a request, meaning
191         // Get() is waiting for an s3 operation.
192         select {
193         case <-timeout:
194                 c.Fatal("timed out waiting for test func to call our handler")
195         case <-doneFunc:
196                 c.Fatal("test func finished without even calling our handler!")
197         case <-handler.requested:
198         }
199
200         cancel()
201
202         select {
203         case <-timeout:
204                 c.Fatal("timed out")
205         case <-doneFunc:
206         }
207 }
208
209 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
210         defer func(tl, bs arvados.Duration) {
211                 theConfig.TrashLifetime = tl
212                 theConfig.BlobSignatureTTL = bs
213         }(theConfig.TrashLifetime, theConfig.BlobSignatureTTL)
214         theConfig.TrashLifetime.Set("1h")
215         theConfig.BlobSignatureTTL.Set("1h")
216
217         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
218         var none time.Time
219
220         putS3Obj := func(t time.Time, key string, data []byte) {
221                 if t == none {
222                         return
223                 }
224                 v.serverClock.now = &t
225                 v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
226         }
227
228         t0 := time.Now()
229         nextKey := 0
230         for _, scenario := range []struct {
231                 label               string
232                 dataT               time.Time
233                 recentT             time.Time
234                 trashT              time.Time
235                 canGet              bool
236                 canTrash            bool
237                 canGetAfterTrash    bool
238                 canUntrash          bool
239                 haveTrashAfterEmpty bool
240                 freshAfterEmpty     bool
241         }{
242                 {
243                         "No related objects",
244                         none, none, none,
245                         false, false, false, false, false, false,
246                 },
247                 {
248                         // Stored by older version, or there was a
249                         // race between EmptyTrash and Put: Trash is a
250                         // no-op even though the data object is very
251                         // old
252                         "No recent/X",
253                         t0.Add(-48 * time.Hour), none, none,
254                         true, true, true, false, false, false,
255                 },
256                 {
257                         "Not trash, but old enough to be eligible for trash",
258                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
259                         true, true, false, false, false, false,
260                 },
261                 {
262                         "Not trash, and not old enough to be eligible for trash",
263                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
264                         true, true, true, false, false, false,
265                 },
266                 {
267                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
268                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
269                         true, true, true, true, true, false,
270                 },
271                 {
272                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
273                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
274                         true, false, true, true, true, false,
275                 },
276                 {
277                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
278                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
279                         true, false, true, true, false, false,
280                 },
281                 {
282                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
283                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
284                         true, false, true, true, true, true,
285                 },
286                 {
287                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
288                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
289                         true, true, true, true, false, false,
290                 },
291                 {
292                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
293                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
294                         true, false, true, true, false, false,
295                 },
296                 {
297                         "Trash, not yet eligible for deletion",
298                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
299                         false, false, false, true, true, false,
300                 },
301                 {
302                         "Trash, not yet eligible for deletion, prone to races",
303                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
304                         false, false, false, true, true, false,
305                 },
306                 {
307                         "Trash, eligible for deletion",
308                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
309                         false, false, false, true, false, false,
310                 },
311                 {
312                         "Erroneously trashed during a race, detected before TrashLifetime",
313                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
314                         true, false, true, true, true, false,
315                 },
316                 {
317                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching TrashLifetime",
318                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
319                         true, false, true, true, true, false,
320                 },
321                 {
322                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
323                         none, none, t0.Add(-time.Minute),
324                         false, false, false, true, true, true,
325                 },
326         } {
327                 c.Log("Scenario: ", scenario.label)
328
329                 // We have a few tests to run for each scenario, and
330                 // the tests are expected to change state. By calling
331                 // this setup func between tests, we (re)create the
332                 // scenario as specified, using a new unique block
333                 // locator to prevent interference from previous
334                 // tests.
335
336                 setupScenario := func() (string, []byte) {
337                         nextKey++
338                         blk := []byte(fmt.Sprintf("%d", nextKey))
339                         loc := fmt.Sprintf("%x", md5.Sum(blk))
340                         c.Log("\t", loc)
341                         putS3Obj(scenario.dataT, loc, blk)
342                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
343                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
344                         v.serverClock.now = &t0
345                         return loc, blk
346                 }
347
348                 // Check canGet
349                 loc, blk := setupScenario()
350                 buf := make([]byte, len(blk))
351                 _, err := v.Get(context.Background(), loc, buf)
352                 c.Check(err == nil, check.Equals, scenario.canGet)
353                 if err != nil {
354                         c.Check(os.IsNotExist(err), check.Equals, true)
355                 }
356
357                 // Call Trash, then check canTrash and canGetAfterTrash
358                 loc, blk = setupScenario()
359                 err = v.Trash(loc)
360                 c.Check(err == nil, check.Equals, scenario.canTrash)
361                 _, err = v.Get(context.Background(), loc, buf)
362                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
363                 if err != nil {
364                         c.Check(os.IsNotExist(err), check.Equals, true)
365                 }
366
367                 // Call Untrash, then check canUntrash
368                 loc, blk = setupScenario()
369                 err = v.Untrash(loc)
370                 c.Check(err == nil, check.Equals, scenario.canUntrash)
371                 if scenario.dataT != none || scenario.trashT != none {
372                         // In all scenarios where the data exists, we
373                         // should be able to Get after Untrash --
374                         // regardless of timestamps, errors, race
375                         // conditions, etc.
376                         _, err = v.Get(context.Background(), loc, buf)
377                         c.Check(err, check.IsNil)
378                 }
379
380                 // Call EmptyTrash, then check haveTrashAfterEmpty and
381                 // freshAfterEmpty
382                 loc, blk = setupScenario()
383                 v.EmptyTrash()
384                 _, err = v.bucket.Head("trash/"+loc, nil)
385                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
386                 if scenario.freshAfterEmpty {
387                         t, err := v.Mtime(loc)
388                         c.Check(err, check.IsNil)
389                         // new mtime must be current (with an
390                         // allowance for 1s timestamp precision)
391                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
392                 }
393
394                 // Check for current Mtime after Put (applies to all
395                 // scenarios)
396                 loc, blk = setupScenario()
397                 err = v.Put(context.Background(), loc, blk)
398                 c.Check(err, check.IsNil)
399                 t, err := v.Mtime(loc)
400                 c.Check(err, check.IsNil)
401                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
402         }
403 }
404
405 type TestableS3Volume struct {
406         *S3Volume
407         server      *s3test.Server
408         c           *check.C
409         serverClock *fakeClock
410 }
411
412 func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
413         clock := &fakeClock{}
414         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
415         c.Assert(err, check.IsNil)
416
417         v := &TestableS3Volume{
418                 S3Volume: &S3Volume{
419                         Bucket:             TestBucketName,
420                         Endpoint:           srv.URL(),
421                         Region:             "test-region-1",
422                         LocationConstraint: true,
423                         RaceWindow:         arvados.Duration(raceWindow),
424                         S3Replication:      replication,
425                         UnsafeDelete:       s3UnsafeDelete,
426                         ReadOnly:           readonly,
427                         IndexPageSize:      1000,
428                 },
429                 c:           c,
430                 server:      srv,
431                 serverClock: clock,
432         }
433         v.Start()
434         err = v.bucket.PutBucket(s3.ACL("private"))
435         c.Assert(err, check.IsNil)
436         return v
437 }
438
439 func (s *StubbedS3Suite) TestConfig(c *check.C) {
440         var cfg Config
441         err := yaml.Unmarshal([]byte(`
442 Volumes:
443   - Type: S3
444     StorageClasses: ["class_a", "class_b"]
445 `), &cfg)
446
447         c.Check(err, check.IsNil)
448         c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
449 }
450
451 func (v *TestableS3Volume) Start() error {
452         tmp, err := ioutil.TempFile("", "keepstore")
453         v.c.Assert(err, check.IsNil)
454         defer os.Remove(tmp.Name())
455         _, err = tmp.Write([]byte("xxx\n"))
456         v.c.Assert(err, check.IsNil)
457         v.c.Assert(tmp.Close(), check.IsNil)
458
459         v.S3Volume.AccessKeyFile = tmp.Name()
460         v.S3Volume.SecretKeyFile = tmp.Name()
461
462         v.c.Assert(v.S3Volume.Start(), check.IsNil)
463         return nil
464 }
465
466 // PutRaw skips the ContentMD5 test
467 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
468         err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
469         if err != nil {
470                 log.Printf("PutRaw: %s: %+v", loc, err)
471         }
472         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
473         if err != nil {
474                 log.Printf("PutRaw: recent/%s: %+v", loc, err)
475         }
476 }
477
478 // TouchWithDate turns back the clock while doing a Touch(). We assume
479 // there are no other operations happening on the same s3test server
480 // while we do this.
481 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
482         v.serverClock.now = &lastPut
483         err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
484         if err != nil {
485                 panic(err)
486         }
487         v.serverClock.now = nil
488 }
489
490 func (v *TestableS3Volume) Teardown() {
491         v.server.Quit()
492 }