10383: Merge branch 'master' into 10383-arv-put-incremental-upload
[arvados.git] / services / keepstore / s3_volume_test.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "crypto/md5"
7         "encoding/json"
8         "fmt"
9         "io/ioutil"
10         "net/http"
11         "net/http/httptest"
12         "os"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16         "github.com/AdRoll/goamz/s3"
17         "github.com/AdRoll/goamz/s3/s3test"
18         log "github.com/Sirupsen/logrus"
19         check "gopkg.in/check.v1"
20 )
21
22 const (
23         TestBucketName = "testbucket"
24 )
25
26 type fakeClock struct {
27         now *time.Time
28 }
29
30 func (c *fakeClock) Now() time.Time {
31         if c.now == nil {
32                 return time.Now()
33         }
34         return *c.now
35 }
36
37 func init() {
38         // Deleting isn't safe from races, but if it's turned on
39         // anyway we do expect it to pass the generic volume tests.
40         s3UnsafeDelete = true
41 }
42
43 var _ = check.Suite(&StubbedS3Suite{})
44
45 type StubbedS3Suite struct {
46         volumes []*TestableS3Volume
47 }
48
49 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
50         DoGenericVolumeTests(c, func(t TB) TestableVolume {
51                 // Use a negative raceWindow so s3test's 1-second
52                 // timestamp precision doesn't confuse fixRace.
53                 return s.newTestableVolume(c, -2*time.Second, false, 2)
54         })
55 }
56
57 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
58         DoGenericVolumeTests(c, func(t TB) TestableVolume {
59                 return s.newTestableVolume(c, -2*time.Second, true, 2)
60         })
61 }
62
63 func (s *StubbedS3Suite) TestIndex(c *check.C) {
64         v := s.newTestableVolume(c, 0, false, 2)
65         v.IndexPageSize = 3
66         for i := 0; i < 256; i++ {
67                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
68         }
69         for _, spec := range []struct {
70                 prefix      string
71                 expectMatch int
72         }{
73                 {"", 256},
74                 {"c", 16},
75                 {"bc", 1},
76                 {"abc", 0},
77         } {
78                 buf := new(bytes.Buffer)
79                 err := v.IndexTo(spec.prefix, buf)
80                 c.Check(err, check.IsNil)
81
82                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
83                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
84                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
85         }
86 }
87
88 func (s *StubbedS3Suite) TestStats(c *check.C) {
89         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
90         stats := func() string {
91                 buf, err := json.Marshal(v.InternalStats())
92                 c.Check(err, check.IsNil)
93                 return string(buf)
94         }
95
96         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
97
98         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
99         _, err := v.Get(context.Background(), loc, make([]byte, 3))
100         c.Check(err, check.NotNil)
101         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
102         c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
103         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
104
105         err = v.Put(context.Background(), loc, []byte("foo"))
106         c.Check(err, check.IsNil)
107         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
108         c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
109
110         _, err = v.Get(context.Background(), loc, make([]byte, 3))
111         c.Check(err, check.IsNil)
112         _, err = v.Get(context.Background(), loc, make([]byte, 3))
113         c.Check(err, check.IsNil)
114         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
115 }
116
117 type blockingHandler struct {
118         requested chan *http.Request
119         unblock   chan struct{}
120 }
121
122 func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
123         if h.requested != nil {
124                 h.requested <- r
125         }
126         if h.unblock != nil {
127                 <-h.unblock
128         }
129         http.Error(w, "nothing here", http.StatusNotFound)
130 }
131
132 func (s *StubbedS3Suite) TestGetContextCancel(c *check.C) {
133         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
134         buf := make([]byte, 3)
135
136         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
137                 _, err := v.Get(ctx, loc, buf)
138                 return err
139         })
140 }
141
142 func (s *StubbedS3Suite) TestCompareContextCancel(c *check.C) {
143         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
144         buf := []byte("bar")
145
146         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
147                 return v.Compare(ctx, loc, buf)
148         })
149 }
150
151 func (s *StubbedS3Suite) TestPutContextCancel(c *check.C) {
152         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
153         buf := []byte("foo")
154
155         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
156                 return v.Put(ctx, loc, buf)
157         })
158 }
159
160 func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3Volume) error) {
161         handler := &blockingHandler{}
162         srv := httptest.NewServer(handler)
163         defer srv.Close()
164
165         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
166         vol := *v.S3Volume
167         vol.Endpoint = srv.URL
168         v = &TestableS3Volume{S3Volume: &vol}
169         v.Start()
170
171         ctx, cancel := context.WithCancel(context.Background())
172
173         handler.requested = make(chan *http.Request)
174         handler.unblock = make(chan struct{})
175         defer close(handler.unblock)
176
177         doneFunc := make(chan struct{})
178         go func() {
179                 err := testFunc(ctx, v)
180                 c.Check(err, check.Equals, context.Canceled)
181                 close(doneFunc)
182         }()
183
184         timeout := time.After(10 * time.Second)
185
186         // Wait for the stub server to receive a request, meaning
187         // Get() is waiting for an s3 operation.
188         select {
189         case <-timeout:
190                 c.Fatal("timed out waiting for test func to call our handler")
191         case <-doneFunc:
192                 c.Fatal("test func finished without even calling our handler!")
193         case <-handler.requested:
194         }
195
196         cancel()
197
198         select {
199         case <-timeout:
200                 c.Fatal("timed out")
201         case <-doneFunc:
202         }
203 }
204
205 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
206         defer func(tl, bs arvados.Duration) {
207                 theConfig.TrashLifetime = tl
208                 theConfig.BlobSignatureTTL = bs
209         }(theConfig.TrashLifetime, theConfig.BlobSignatureTTL)
210         theConfig.TrashLifetime.Set("1h")
211         theConfig.BlobSignatureTTL.Set("1h")
212
213         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
214         var none time.Time
215
216         putS3Obj := func(t time.Time, key string, data []byte) {
217                 if t == none {
218                         return
219                 }
220                 v.serverClock.now = &t
221                 v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
222         }
223
224         t0 := time.Now()
225         nextKey := 0
226         for _, scenario := range []struct {
227                 label               string
228                 dataT               time.Time
229                 recentT             time.Time
230                 trashT              time.Time
231                 canGet              bool
232                 canTrash            bool
233                 canGetAfterTrash    bool
234                 canUntrash          bool
235                 haveTrashAfterEmpty bool
236                 freshAfterEmpty     bool
237         }{
238                 {
239                         "No related objects",
240                         none, none, none,
241                         false, false, false, false, false, false,
242                 },
243                 {
244                         // Stored by older version, or there was a
245                         // race between EmptyTrash and Put: Trash is a
246                         // no-op even though the data object is very
247                         // old
248                         "No recent/X",
249                         t0.Add(-48 * time.Hour), none, none,
250                         true, true, true, false, false, false,
251                 },
252                 {
253                         "Not trash, but old enough to be eligible for trash",
254                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
255                         true, true, false, false, false, false,
256                 },
257                 {
258                         "Not trash, and not old enough to be eligible for trash",
259                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
260                         true, true, true, false, false, false,
261                 },
262                 {
263                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
264                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
265                         true, true, true, true, true, false,
266                 },
267                 {
268                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
269                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
270                         true, false, true, true, true, false,
271                 },
272                 {
273                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
274                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
275                         true, false, true, true, false, false,
276                 },
277                 {
278                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
279                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
280                         true, false, true, true, true, true,
281                 },
282                 {
283                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
284                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
285                         true, true, true, true, false, false,
286                 },
287                 {
288                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
289                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
290                         true, false, true, true, false, false,
291                 },
292                 {
293                         "Trash, not yet eligible for deletion",
294                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
295                         false, false, false, true, true, false,
296                 },
297                 {
298                         "Trash, not yet eligible for deletion, prone to races",
299                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
300                         false, false, false, true, true, false,
301                 },
302                 {
303                         "Trash, eligible for deletion",
304                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
305                         false, false, false, true, false, false,
306                 },
307                 {
308                         "Erroneously trashed during a race, detected before TrashLifetime",
309                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
310                         true, false, true, true, true, false,
311                 },
312                 {
313                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching TrashLifetime",
314                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
315                         true, false, true, true, true, false,
316                 },
317                 {
318                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
319                         none, none, t0.Add(-time.Minute),
320                         false, false, false, true, true, true,
321                 },
322         } {
323                 c.Log("Scenario: ", scenario.label)
324
325                 // We have a few tests to run for each scenario, and
326                 // the tests are expected to change state. By calling
327                 // this setup func between tests, we (re)create the
328                 // scenario as specified, using a new unique block
329                 // locator to prevent interference from previous
330                 // tests.
331
332                 setupScenario := func() (string, []byte) {
333                         nextKey++
334                         blk := []byte(fmt.Sprintf("%d", nextKey))
335                         loc := fmt.Sprintf("%x", md5.Sum(blk))
336                         c.Log("\t", loc)
337                         putS3Obj(scenario.dataT, loc, blk)
338                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
339                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
340                         v.serverClock.now = &t0
341                         return loc, blk
342                 }
343
344                 // Check canGet
345                 loc, blk := setupScenario()
346                 buf := make([]byte, len(blk))
347                 _, err := v.Get(context.Background(), loc, buf)
348                 c.Check(err == nil, check.Equals, scenario.canGet)
349                 if err != nil {
350                         c.Check(os.IsNotExist(err), check.Equals, true)
351                 }
352
353                 // Call Trash, then check canTrash and canGetAfterTrash
354                 loc, blk = setupScenario()
355                 err = v.Trash(loc)
356                 c.Check(err == nil, check.Equals, scenario.canTrash)
357                 _, err = v.Get(context.Background(), loc, buf)
358                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
359                 if err != nil {
360                         c.Check(os.IsNotExist(err), check.Equals, true)
361                 }
362
363                 // Call Untrash, then check canUntrash
364                 loc, blk = setupScenario()
365                 err = v.Untrash(loc)
366                 c.Check(err == nil, check.Equals, scenario.canUntrash)
367                 if scenario.dataT != none || scenario.trashT != none {
368                         // In all scenarios where the data exists, we
369                         // should be able to Get after Untrash --
370                         // regardless of timestamps, errors, race
371                         // conditions, etc.
372                         _, err = v.Get(context.Background(), loc, buf)
373                         c.Check(err, check.IsNil)
374                 }
375
376                 // Call EmptyTrash, then check haveTrashAfterEmpty and
377                 // freshAfterEmpty
378                 loc, blk = setupScenario()
379                 v.EmptyTrash()
380                 _, err = v.bucket.Head("trash/"+loc, nil)
381                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
382                 if scenario.freshAfterEmpty {
383                         t, err := v.Mtime(loc)
384                         c.Check(err, check.IsNil)
385                         // new mtime must be current (with an
386                         // allowance for 1s timestamp precision)
387                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
388                 }
389
390                 // Check for current Mtime after Put (applies to all
391                 // scenarios)
392                 loc, blk = setupScenario()
393                 err = v.Put(context.Background(), loc, blk)
394                 c.Check(err, check.IsNil)
395                 t, err := v.Mtime(loc)
396                 c.Check(err, check.IsNil)
397                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
398         }
399 }
400
401 type TestableS3Volume struct {
402         *S3Volume
403         server      *s3test.Server
404         c           *check.C
405         serverClock *fakeClock
406 }
407
408 func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
409         clock := &fakeClock{}
410         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
411         c.Assert(err, check.IsNil)
412
413         v := &TestableS3Volume{
414                 S3Volume: &S3Volume{
415                         Bucket:             TestBucketName,
416                         Endpoint:           srv.URL(),
417                         Region:             "test-region-1",
418                         LocationConstraint: true,
419                         RaceWindow:         arvados.Duration(raceWindow),
420                         S3Replication:      replication,
421                         UnsafeDelete:       s3UnsafeDelete,
422                         ReadOnly:           readonly,
423                         IndexPageSize:      1000,
424                 },
425                 c:           c,
426                 server:      srv,
427                 serverClock: clock,
428         }
429         v.Start()
430         err = v.bucket.PutBucket(s3.ACL("private"))
431         c.Assert(err, check.IsNil)
432         return v
433 }
434
435 func (v *TestableS3Volume) Start() error {
436         tmp, err := ioutil.TempFile("", "keepstore")
437         v.c.Assert(err, check.IsNil)
438         defer os.Remove(tmp.Name())
439         _, err = tmp.Write([]byte("xxx\n"))
440         v.c.Assert(err, check.IsNil)
441         v.c.Assert(tmp.Close(), check.IsNil)
442
443         v.S3Volume.AccessKeyFile = tmp.Name()
444         v.S3Volume.SecretKeyFile = tmp.Name()
445
446         v.c.Assert(v.S3Volume.Start(), check.IsNil)
447         return nil
448 }
449
450 // PutRaw skips the ContentMD5 test
451 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
452         err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
453         if err != nil {
454                 log.Printf("PutRaw: %+v", err)
455         }
456 }
457
458 // TouchWithDate turns back the clock while doing a Touch(). We assume
459 // there are no other operations happening on the same s3test server
460 // while we do this.
461 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
462         v.serverClock.now = &lastPut
463         err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
464         if err != nil {
465                 panic(err)
466         }
467         v.serverClock.now = nil
468 }
469
470 func (v *TestableS3Volume) Teardown() {
471         v.server.Quit()
472 }