Merge branch '13023-wb-log-newlines'
[arvados.git] / services / keepstore / s3_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io/ioutil"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "github.com/AdRoll/goamz/s3"
21         "github.com/AdRoll/goamz/s3/s3test"
22         check "gopkg.in/check.v1"
23 )
24
25 const (
26         TestBucketName = "testbucket"
27 )
28
29 type fakeClock struct {
30         now *time.Time
31 }
32
33 func (c *fakeClock) Now() time.Time {
34         if c.now == nil {
35                 return time.Now()
36         }
37         return *c.now
38 }
39
40 func init() {
41         // Deleting isn't safe from races, but if it's turned on
42         // anyway we do expect it to pass the generic volume tests.
43         s3UnsafeDelete = true
44 }
45
46 var _ = check.Suite(&StubbedS3Suite{})
47
48 type StubbedS3Suite struct {
49         volumes []*TestableS3Volume
50 }
51
52 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
53         DoGenericVolumeTests(c, func(t TB) TestableVolume {
54                 // Use a negative raceWindow so s3test's 1-second
55                 // timestamp precision doesn't confuse fixRace.
56                 return s.newTestableVolume(c, -2*time.Second, false, 2)
57         })
58 }
59
60 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
61         DoGenericVolumeTests(c, func(t TB) TestableVolume {
62                 return s.newTestableVolume(c, -2*time.Second, true, 2)
63         })
64 }
65
66 func (s *StubbedS3Suite) TestIndex(c *check.C) {
67         v := s.newTestableVolume(c, 0, false, 2)
68         v.IndexPageSize = 3
69         for i := 0; i < 256; i++ {
70                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
71         }
72         for _, spec := range []struct {
73                 prefix      string
74                 expectMatch int
75         }{
76                 {"", 256},
77                 {"c", 16},
78                 {"bc", 1},
79                 {"abc", 0},
80         } {
81                 buf := new(bytes.Buffer)
82                 err := v.IndexTo(spec.prefix, buf)
83                 c.Check(err, check.IsNil)
84
85                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
86                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
87                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
88         }
89 }
90
91 func (s *StubbedS3Suite) TestStats(c *check.C) {
92         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
93         stats := func() string {
94                 buf, err := json.Marshal(v.InternalStats())
95                 c.Check(err, check.IsNil)
96                 return string(buf)
97         }
98
99         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
100
101         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
102         _, err := v.Get(context.Background(), loc, make([]byte, 3))
103         c.Check(err, check.NotNil)
104         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
105         c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
106         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
107
108         err = v.Put(context.Background(), loc, []byte("foo"))
109         c.Check(err, check.IsNil)
110         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
111         c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
112
113         _, err = v.Get(context.Background(), loc, make([]byte, 3))
114         c.Check(err, check.IsNil)
115         _, err = v.Get(context.Background(), loc, make([]byte, 3))
116         c.Check(err, check.IsNil)
117         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
118 }
119
120 type blockingHandler struct {
121         requested chan *http.Request
122         unblock   chan struct{}
123 }
124
125 func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
126         if h.requested != nil {
127                 h.requested <- r
128         }
129         if h.unblock != nil {
130                 <-h.unblock
131         }
132         http.Error(w, "nothing here", http.StatusNotFound)
133 }
134
135 func (s *StubbedS3Suite) TestGetContextCancel(c *check.C) {
136         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
137         buf := make([]byte, 3)
138
139         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
140                 _, err := v.Get(ctx, loc, buf)
141                 return err
142         })
143 }
144
145 func (s *StubbedS3Suite) TestCompareContextCancel(c *check.C) {
146         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
147         buf := []byte("bar")
148
149         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
150                 return v.Compare(ctx, loc, buf)
151         })
152 }
153
154 func (s *StubbedS3Suite) TestPutContextCancel(c *check.C) {
155         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
156         buf := []byte("foo")
157
158         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
159                 return v.Put(ctx, loc, buf)
160         })
161 }
162
163 func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3Volume) error) {
164         handler := &blockingHandler{}
165         srv := httptest.NewServer(handler)
166         defer srv.Close()
167
168         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
169         vol := *v.S3Volume
170         vol.Endpoint = srv.URL
171         v = &TestableS3Volume{S3Volume: &vol}
172         v.Start()
173
174         ctx, cancel := context.WithCancel(context.Background())
175
176         handler.requested = make(chan *http.Request)
177         handler.unblock = make(chan struct{})
178         defer close(handler.unblock)
179
180         doneFunc := make(chan struct{})
181         go func() {
182                 err := testFunc(ctx, v)
183                 c.Check(err, check.Equals, context.Canceled)
184                 close(doneFunc)
185         }()
186
187         timeout := time.After(10 * time.Second)
188
189         // Wait for the stub server to receive a request, meaning
190         // Get() is waiting for an s3 operation.
191         select {
192         case <-timeout:
193                 c.Fatal("timed out waiting for test func to call our handler")
194         case <-doneFunc:
195                 c.Fatal("test func finished without even calling our handler!")
196         case <-handler.requested:
197         }
198
199         cancel()
200
201         select {
202         case <-timeout:
203                 c.Fatal("timed out")
204         case <-doneFunc:
205         }
206 }
207
208 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
209         defer func(tl, bs arvados.Duration) {
210                 theConfig.TrashLifetime = tl
211                 theConfig.BlobSignatureTTL = bs
212         }(theConfig.TrashLifetime, theConfig.BlobSignatureTTL)
213         theConfig.TrashLifetime.Set("1h")
214         theConfig.BlobSignatureTTL.Set("1h")
215
216         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
217         var none time.Time
218
219         putS3Obj := func(t time.Time, key string, data []byte) {
220                 if t == none {
221                         return
222                 }
223                 v.serverClock.now = &t
224                 v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
225         }
226
227         t0 := time.Now()
228         nextKey := 0
229         for _, scenario := range []struct {
230                 label               string
231                 dataT               time.Time
232                 recentT             time.Time
233                 trashT              time.Time
234                 canGet              bool
235                 canTrash            bool
236                 canGetAfterTrash    bool
237                 canUntrash          bool
238                 haveTrashAfterEmpty bool
239                 freshAfterEmpty     bool
240         }{
241                 {
242                         "No related objects",
243                         none, none, none,
244                         false, false, false, false, false, false,
245                 },
246                 {
247                         // Stored by older version, or there was a
248                         // race between EmptyTrash and Put: Trash is a
249                         // no-op even though the data object is very
250                         // old
251                         "No recent/X",
252                         t0.Add(-48 * time.Hour), none, none,
253                         true, true, true, false, false, false,
254                 },
255                 {
256                         "Not trash, but old enough to be eligible for trash",
257                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
258                         true, true, false, false, false, false,
259                 },
260                 {
261                         "Not trash, and not old enough to be eligible for trash",
262                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
263                         true, true, true, false, false, false,
264                 },
265                 {
266                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
267                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
268                         true, true, true, true, true, false,
269                 },
270                 {
271                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
272                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
273                         true, false, true, true, true, false,
274                 },
275                 {
276                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
277                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
278                         true, false, true, true, false, false,
279                 },
280                 {
281                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
282                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
283                         true, false, true, true, true, true,
284                 },
285                 {
286                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
287                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
288                         true, true, true, true, false, false,
289                 },
290                 {
291                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
292                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
293                         true, false, true, true, false, false,
294                 },
295                 {
296                         "Trash, not yet eligible for deletion",
297                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
298                         false, false, false, true, true, false,
299                 },
300                 {
301                         "Trash, not yet eligible for deletion, prone to races",
302                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
303                         false, false, false, true, true, false,
304                 },
305                 {
306                         "Trash, eligible for deletion",
307                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
308                         false, false, false, true, false, false,
309                 },
310                 {
311                         "Erroneously trashed during a race, detected before TrashLifetime",
312                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
313                         true, false, true, true, true, false,
314                 },
315                 {
316                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching TrashLifetime",
317                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
318                         true, false, true, true, true, false,
319                 },
320                 {
321                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
322                         none, none, t0.Add(-time.Minute),
323                         false, false, false, true, true, true,
324                 },
325         } {
326                 c.Log("Scenario: ", scenario.label)
327
328                 // We have a few tests to run for each scenario, and
329                 // the tests are expected to change state. By calling
330                 // this setup func between tests, we (re)create the
331                 // scenario as specified, using a new unique block
332                 // locator to prevent interference from previous
333                 // tests.
334
335                 setupScenario := func() (string, []byte) {
336                         nextKey++
337                         blk := []byte(fmt.Sprintf("%d", nextKey))
338                         loc := fmt.Sprintf("%x", md5.Sum(blk))
339                         c.Log("\t", loc)
340                         putS3Obj(scenario.dataT, loc, blk)
341                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
342                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
343                         v.serverClock.now = &t0
344                         return loc, blk
345                 }
346
347                 // Check canGet
348                 loc, blk := setupScenario()
349                 buf := make([]byte, len(blk))
350                 _, err := v.Get(context.Background(), loc, buf)
351                 c.Check(err == nil, check.Equals, scenario.canGet)
352                 if err != nil {
353                         c.Check(os.IsNotExist(err), check.Equals, true)
354                 }
355
356                 // Call Trash, then check canTrash and canGetAfterTrash
357                 loc, blk = setupScenario()
358                 err = v.Trash(loc)
359                 c.Check(err == nil, check.Equals, scenario.canTrash)
360                 _, err = v.Get(context.Background(), loc, buf)
361                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
362                 if err != nil {
363                         c.Check(os.IsNotExist(err), check.Equals, true)
364                 }
365
366                 // Call Untrash, then check canUntrash
367                 loc, blk = setupScenario()
368                 err = v.Untrash(loc)
369                 c.Check(err == nil, check.Equals, scenario.canUntrash)
370                 if scenario.dataT != none || scenario.trashT != none {
371                         // In all scenarios where the data exists, we
372                         // should be able to Get after Untrash --
373                         // regardless of timestamps, errors, race
374                         // conditions, etc.
375                         _, err = v.Get(context.Background(), loc, buf)
376                         c.Check(err, check.IsNil)
377                 }
378
379                 // Call EmptyTrash, then check haveTrashAfterEmpty and
380                 // freshAfterEmpty
381                 loc, blk = setupScenario()
382                 v.EmptyTrash()
383                 _, err = v.bucket.Head("trash/"+loc, nil)
384                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
385                 if scenario.freshAfterEmpty {
386                         t, err := v.Mtime(loc)
387                         c.Check(err, check.IsNil)
388                         // new mtime must be current (with an
389                         // allowance for 1s timestamp precision)
390                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
391                 }
392
393                 // Check for current Mtime after Put (applies to all
394                 // scenarios)
395                 loc, blk = setupScenario()
396                 err = v.Put(context.Background(), loc, blk)
397                 c.Check(err, check.IsNil)
398                 t, err := v.Mtime(loc)
399                 c.Check(err, check.IsNil)
400                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
401         }
402 }
403
404 type TestableS3Volume struct {
405         *S3Volume
406         server      *s3test.Server
407         c           *check.C
408         serverClock *fakeClock
409 }
410
411 func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
412         clock := &fakeClock{}
413         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
414         c.Assert(err, check.IsNil)
415
416         v := &TestableS3Volume{
417                 S3Volume: &S3Volume{
418                         Bucket:             TestBucketName,
419                         Endpoint:           srv.URL(),
420                         Region:             "test-region-1",
421                         LocationConstraint: true,
422                         RaceWindow:         arvados.Duration(raceWindow),
423                         S3Replication:      replication,
424                         UnsafeDelete:       s3UnsafeDelete,
425                         ReadOnly:           readonly,
426                         IndexPageSize:      1000,
427                 },
428                 c:           c,
429                 server:      srv,
430                 serverClock: clock,
431         }
432         v.Start()
433         err = v.bucket.PutBucket(s3.ACL("private"))
434         c.Assert(err, check.IsNil)
435         return v
436 }
437
438 func (v *TestableS3Volume) Start() error {
439         tmp, err := ioutil.TempFile("", "keepstore")
440         v.c.Assert(err, check.IsNil)
441         defer os.Remove(tmp.Name())
442         _, err = tmp.Write([]byte("xxx\n"))
443         v.c.Assert(err, check.IsNil)
444         v.c.Assert(tmp.Close(), check.IsNil)
445
446         v.S3Volume.AccessKeyFile = tmp.Name()
447         v.S3Volume.SecretKeyFile = tmp.Name()
448
449         v.c.Assert(v.S3Volume.Start(), check.IsNil)
450         return nil
451 }
452
453 // PutRaw skips the ContentMD5 test
454 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
455         err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
456         if err != nil {
457                 log.Printf("PutRaw: %s: %+v", loc, err)
458         }
459         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
460         if err != nil {
461                 log.Printf("PutRaw: recent/%s: %+v", loc, err)
462         }
463 }
464
465 // TouchWithDate turns back the clock while doing a Touch(). We assume
466 // there are no other operations happening on the same s3test server
467 // while we do this.
468 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
469         v.serverClock.now = &lastPut
470         err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
471         if err != nil {
472                 panic(err)
473         }
474         v.serverClock.now = nil
475 }
476
477 func (v *TestableS3Volume) Teardown() {
478         v.server.Quit()
479 }