10467: Add "client disconnect" test.
[arvados.git] / services / keepstore / s3_volume_test.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "crypto/md5"
7         "encoding/json"
8         "fmt"
9         "io/ioutil"
10         "net/http"
11         "net/http/httptest"
12         "os"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16         "github.com/AdRoll/goamz/s3"
17         "github.com/AdRoll/goamz/s3/s3test"
18         log "github.com/Sirupsen/logrus"
19         check "gopkg.in/check.v1"
20 )
21
22 const (
23         TestBucketName = "testbucket"
24 )
25
26 type fakeClock struct {
27         now *time.Time
28 }
29
30 func (c *fakeClock) Now() time.Time {
31         if c.now == nil {
32                 return time.Now()
33         }
34         return *c.now
35 }
36
37 func init() {
38         // Deleting isn't safe from races, but if it's turned on
39         // anyway we do expect it to pass the generic volume tests.
40         s3UnsafeDelete = true
41 }
42
43 var _ = check.Suite(&StubbedS3Suite{})
44
45 type StubbedS3Suite struct {
46         volumes []*TestableS3Volume
47 }
48
49 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
50         DoGenericVolumeTests(c, func(t TB) TestableVolume {
51                 // Use a negative raceWindow so s3test's 1-second
52                 // timestamp precision doesn't confuse fixRace.
53                 return s.newTestableVolume(c, -2*time.Second, false, 2)
54         })
55 }
56
57 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
58         DoGenericVolumeTests(c, func(t TB) TestableVolume {
59                 return s.newTestableVolume(c, -2*time.Second, true, 2)
60         })
61 }
62
63 func (s *StubbedS3Suite) TestIndex(c *check.C) {
64         v := s.newTestableVolume(c, 0, false, 2)
65         v.IndexPageSize = 3
66         for i := 0; i < 256; i++ {
67                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
68         }
69         for _, spec := range []struct {
70                 prefix      string
71                 expectMatch int
72         }{
73                 {"", 256},
74                 {"c", 16},
75                 {"bc", 1},
76                 {"abc", 0},
77         } {
78                 buf := new(bytes.Buffer)
79                 err := v.IndexTo(spec.prefix, buf)
80                 c.Check(err, check.IsNil)
81
82                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
83                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
84                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
85         }
86 }
87
88 func (s *StubbedS3Suite) TestStats(c *check.C) {
89         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
90         stats := func() string {
91                 buf, err := json.Marshal(v.InternalStats())
92                 c.Check(err, check.IsNil)
93                 return string(buf)
94         }
95
96         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
97
98         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
99         _, err := v.Get(context.Background(), loc, make([]byte, 3))
100         c.Check(err, check.NotNil)
101         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
102         c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
103         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
104
105         err = v.Put(context.Background(), loc, []byte("foo"))
106         c.Check(err, check.IsNil)
107         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
108         c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
109
110         _, err = v.Get(context.Background(), loc, make([]byte, 3))
111         c.Check(err, check.IsNil)
112         _, err = v.Get(context.Background(), loc, make([]byte, 3))
113         c.Check(err, check.IsNil)
114         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
115 }
116
117 type blockingHandler struct {
118         requested chan *http.Request
119         unblock   chan struct{}
120 }
121
122 func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
123         if h.requested != nil {
124                 h.requested <- r
125         }
126         if h.unblock != nil {
127                 <-h.unblock
128         }
129         http.Error(w, "nothing here", http.StatusNotFound)
130 }
131
132 func (s *StubbedS3Suite) TestClientDisconnect(c *check.C) {
133         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
134         buf := make([]byte, 3)
135
136         handler := &blockingHandler{}
137         srv := httptest.NewServer(handler)
138         defer srv.Close()
139
140         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
141         vol := *v.S3Volume
142         vol.Endpoint = srv.URL
143         v = &TestableS3Volume{S3Volume: &vol}
144         v.Start()
145
146         ctx, cancel := context.WithCancel(context.Background())
147
148         handler.requested = make(chan *http.Request)
149         handler.unblock = make(chan struct{})
150         defer close(handler.unblock)
151
152         var n int
153         var err error
154         doneGet := make(chan struct{})
155         go func() {
156                 n, err = v.Get(ctx, loc, buf)
157                 close(doneGet)
158         }()
159
160         timeout := time.After(10 * time.Second)
161
162         // Wait for the stub server to receive a request, meaning
163         // Get() is waiting for an s3 operation.
164         select {
165         case <-timeout:
166                 c.Fatal("timed out waiting for Get to call our handler")
167         case <-doneGet:
168                 c.Fatal("Get finished without calling our handler!")
169         case <-handler.requested:
170         }
171
172         cancel()
173
174         select {
175         case <-timeout:
176                 c.Fatal("timed out")
177         case <-doneGet:
178                 c.Check(n, check.Equals, 0)
179                 c.Check(err, check.NotNil)
180                 c.Check(err, check.Equals, context.Canceled)
181         }
182 }
183
184 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
185         defer func(tl, bs arvados.Duration) {
186                 theConfig.TrashLifetime = tl
187                 theConfig.BlobSignatureTTL = bs
188         }(theConfig.TrashLifetime, theConfig.BlobSignatureTTL)
189         theConfig.TrashLifetime.Set("1h")
190         theConfig.BlobSignatureTTL.Set("1h")
191
192         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
193         var none time.Time
194
195         putS3Obj := func(t time.Time, key string, data []byte) {
196                 if t == none {
197                         return
198                 }
199                 v.serverClock.now = &t
200                 v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
201         }
202
203         t0 := time.Now()
204         nextKey := 0
205         for _, scenario := range []struct {
206                 label               string
207                 dataT               time.Time
208                 recentT             time.Time
209                 trashT              time.Time
210                 canGet              bool
211                 canTrash            bool
212                 canGetAfterTrash    bool
213                 canUntrash          bool
214                 haveTrashAfterEmpty bool
215                 freshAfterEmpty     bool
216         }{
217                 {
218                         "No related objects",
219                         none, none, none,
220                         false, false, false, false, false, false,
221                 },
222                 {
223                         // Stored by older version, or there was a
224                         // race between EmptyTrash and Put: Trash is a
225                         // no-op even though the data object is very
226                         // old
227                         "No recent/X",
228                         t0.Add(-48 * time.Hour), none, none,
229                         true, true, true, false, false, false,
230                 },
231                 {
232                         "Not trash, but old enough to be eligible for trash",
233                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
234                         true, true, false, false, false, false,
235                 },
236                 {
237                         "Not trash, and not old enough to be eligible for trash",
238                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
239                         true, true, true, false, false, false,
240                 },
241                 {
242                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
243                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
244                         true, true, true, true, true, false,
245                 },
246                 {
247                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
248                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
249                         true, false, true, true, true, false,
250                 },
251                 {
252                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
253                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
254                         true, false, true, true, false, false,
255                 },
256                 {
257                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
258                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
259                         true, false, true, true, true, true,
260                 },
261                 {
262                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
263                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
264                         true, true, true, true, false, false,
265                 },
266                 {
267                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
268                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
269                         true, false, true, true, false, false,
270                 },
271                 {
272                         "Trash, not yet eligible for deletion",
273                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
274                         false, false, false, true, true, false,
275                 },
276                 {
277                         "Trash, not yet eligible for deletion, prone to races",
278                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
279                         false, false, false, true, true, false,
280                 },
281                 {
282                         "Trash, eligible for deletion",
283                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
284                         false, false, false, true, false, false,
285                 },
286                 {
287                         "Erroneously trashed during a race, detected before TrashLifetime",
288                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
289                         true, false, true, true, true, false,
290                 },
291                 {
292                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching TrashLifetime",
293                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
294                         true, false, true, true, true, false,
295                 },
296                 {
297                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
298                         none, none, t0.Add(-time.Minute),
299                         false, false, false, true, true, true,
300                 },
301         } {
302                 c.Log("Scenario: ", scenario.label)
303
304                 // We have a few tests to run for each scenario, and
305                 // the tests are expected to change state. By calling
306                 // this setup func between tests, we (re)create the
307                 // scenario as specified, using a new unique block
308                 // locator to prevent interference from previous
309                 // tests.
310
311                 setupScenario := func() (string, []byte) {
312                         nextKey++
313                         blk := []byte(fmt.Sprintf("%d", nextKey))
314                         loc := fmt.Sprintf("%x", md5.Sum(blk))
315                         c.Log("\t", loc)
316                         putS3Obj(scenario.dataT, loc, blk)
317                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
318                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
319                         v.serverClock.now = &t0
320                         return loc, blk
321                 }
322
323                 // Check canGet
324                 loc, blk := setupScenario()
325                 buf := make([]byte, len(blk))
326                 _, err := v.Get(context.Background(), loc, buf)
327                 c.Check(err == nil, check.Equals, scenario.canGet)
328                 if err != nil {
329                         c.Check(os.IsNotExist(err), check.Equals, true)
330                 }
331
332                 // Call Trash, then check canTrash and canGetAfterTrash
333                 loc, blk = setupScenario()
334                 err = v.Trash(loc)
335                 c.Check(err == nil, check.Equals, scenario.canTrash)
336                 _, err = v.Get(context.Background(), loc, buf)
337                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
338                 if err != nil {
339                         c.Check(os.IsNotExist(err), check.Equals, true)
340                 }
341
342                 // Call Untrash, then check canUntrash
343                 loc, blk = setupScenario()
344                 err = v.Untrash(loc)
345                 c.Check(err == nil, check.Equals, scenario.canUntrash)
346                 if scenario.dataT != none || scenario.trashT != none {
347                         // In all scenarios where the data exists, we
348                         // should be able to Get after Untrash --
349                         // regardless of timestamps, errors, race
350                         // conditions, etc.
351                         _, err = v.Get(context.Background(), loc, buf)
352                         c.Check(err, check.IsNil)
353                 }
354
355                 // Call EmptyTrash, then check haveTrashAfterEmpty and
356                 // freshAfterEmpty
357                 loc, blk = setupScenario()
358                 v.EmptyTrash()
359                 _, err = v.bucket.Head("trash/"+loc, nil)
360                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
361                 if scenario.freshAfterEmpty {
362                         t, err := v.Mtime(loc)
363                         c.Check(err, check.IsNil)
364                         // new mtime must be current (with an
365                         // allowance for 1s timestamp precision)
366                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
367                 }
368
369                 // Check for current Mtime after Put (applies to all
370                 // scenarios)
371                 loc, blk = setupScenario()
372                 err = v.Put(context.Background(), loc, blk)
373                 c.Check(err, check.IsNil)
374                 t, err := v.Mtime(loc)
375                 c.Check(err, check.IsNil)
376                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
377         }
378 }
379
380 type TestableS3Volume struct {
381         *S3Volume
382         server      *s3test.Server
383         c           *check.C
384         serverClock *fakeClock
385 }
386
387 func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
388         clock := &fakeClock{}
389         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
390         c.Assert(err, check.IsNil)
391
392         v := &TestableS3Volume{
393                 S3Volume: &S3Volume{
394                         Bucket:             TestBucketName,
395                         Endpoint:           srv.URL(),
396                         Region:             "test-region-1",
397                         LocationConstraint: true,
398                         RaceWindow:         arvados.Duration(raceWindow),
399                         S3Replication:      replication,
400                         UnsafeDelete:       s3UnsafeDelete,
401                         ReadOnly:           readonly,
402                         IndexPageSize:      1000,
403                 },
404                 c:           c,
405                 server:      srv,
406                 serverClock: clock,
407         }
408         v.Start()
409         err = v.bucket.PutBucket(s3.ACL("private"))
410         c.Assert(err, check.IsNil)
411         return v
412 }
413
414 func (v *TestableS3Volume) Start() error {
415         tmp, err := ioutil.TempFile("", "keepstore")
416         v.c.Assert(err, check.IsNil)
417         defer os.Remove(tmp.Name())
418         _, err = tmp.Write([]byte("xxx\n"))
419         v.c.Assert(err, check.IsNil)
420         v.c.Assert(tmp.Close(), check.IsNil)
421
422         v.S3Volume.AccessKeyFile = tmp.Name()
423         v.S3Volume.SecretKeyFile = tmp.Name()
424
425         v.c.Assert(v.S3Volume.Start(), check.IsNil)
426         return nil
427 }
428
429 // PutRaw skips the ContentMD5 test
430 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
431         err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
432         if err != nil {
433                 log.Printf("PutRaw: %+v", err)
434         }
435 }
436
437 // TouchWithDate turns back the clock while doing a Touch(). We assume
438 // there are no other operations happening on the same s3test server
439 // while we do this.
440 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
441         v.serverClock.now = &lastPut
442         err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
443         if err != nil {
444                 panic(err)
445         }
446         v.serverClock.now = nil
447 }
448
449 func (v *TestableS3Volume) Teardown() {
450         v.server.Quit()
451 }