8555: Fix EmptyTrash so it can clean up old races.
[arvados.git] / services / keepstore / s3_volume_test.go
1 package main
2
3 import (
4         "bytes"
5         "crypto/md5"
6         "fmt"
7         "log"
8         "os"
9         "time"
10
11         "github.com/AdRoll/goamz/aws"
12         "github.com/AdRoll/goamz/s3"
13         "github.com/AdRoll/goamz/s3/s3test"
14         check "gopkg.in/check.v1"
15 )
16
17 type TestableS3Volume struct {
18         *S3Volume
19         server      *s3test.Server
20         c           *check.C
21         serverClock *fakeClock
22 }
23
24 const (
25         TestBucketName = "testbucket"
26 )
27
28 type fakeClock struct {
29         now *time.Time
30 }
31
32 func (c *fakeClock) Now() time.Time {
33         if c.now == nil {
34                 return time.Now()
35         }
36         return *c.now
37 }
38
39 func init() {
40         // Deleting isn't safe from races, but if it's turned on
41         // anyway we do expect it to pass the generic volume tests.
42         s3UnsafeDelete = true
43 }
44
45 func NewTestableS3Volume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
46         clock := &fakeClock{}
47         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
48         c.Assert(err, check.IsNil)
49         auth := aws.Auth{}
50         region := aws.Region{
51                 Name:                 "test-region-1",
52                 S3Endpoint:           srv.URL(),
53                 S3LocationConstraint: true,
54         }
55         bucket := &s3.Bucket{
56                 S3:   s3.New(auth, region),
57                 Name: TestBucketName,
58         }
59         err = bucket.PutBucket(s3.ACL("private"))
60         c.Assert(err, check.IsNil)
61
62         return &TestableS3Volume{
63                 S3Volume:    NewS3Volume(auth, region, TestBucketName, raceWindow, readonly, replication),
64                 server:      srv,
65                 serverClock: clock,
66         }
67 }
68
69 var _ = check.Suite(&StubbedS3Suite{})
70
71 type StubbedS3Suite struct {
72         volumes []*TestableS3Volume
73 }
74
75 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
76         DoGenericVolumeTests(c, func(t TB) TestableVolume {
77                 // Use a negative raceWindow so s3test's 1-second
78                 // timestamp precision doesn't confuse fixRace.
79                 return NewTestableS3Volume(c, -2*time.Second, false, 2)
80         })
81 }
82
83 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
84         DoGenericVolumeTests(c, func(t TB) TestableVolume {
85                 return NewTestableS3Volume(c, -2*time.Second, true, 2)
86         })
87 }
88
89 func (s *StubbedS3Suite) TestIndex(c *check.C) {
90         v := NewTestableS3Volume(c, 0, false, 2)
91         v.indexPageSize = 3
92         for i := 0; i < 256; i++ {
93                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
94         }
95         for _, spec := range []struct {
96                 prefix      string
97                 expectMatch int
98         }{
99                 {"", 256},
100                 {"c", 16},
101                 {"bc", 1},
102                 {"abc", 0},
103         } {
104                 buf := new(bytes.Buffer)
105                 err := v.IndexTo(spec.prefix, buf)
106                 c.Check(err, check.IsNil)
107
108                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
109                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
110                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
111         }
112 }
113
114 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
115         defer func(tl, bs time.Duration) {
116                 trashLifetime = tl
117                 blobSignatureTTL = bs
118         }(trashLifetime, blobSignatureTTL)
119         trashLifetime = time.Hour
120         blobSignatureTTL = time.Hour
121
122         v := NewTestableS3Volume(c, 5*time.Minute, false, 2)
123         var none time.Time
124
125         stubKey := func(t time.Time, key string, data []byte) {
126                 if t == none {
127                         return
128                 }
129                 v.serverClock.now = &t
130                 v.Bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
131         }
132
133         t0 := time.Now()
134         nextKey := 0
135         for _, test := range []struct {
136                 label               string
137                 data                time.Time
138                 recent              time.Time
139                 trash               time.Time
140                 canGet              bool
141                 canTrash            bool
142                 canGetAfterTrash    bool
143                 canUntrash          bool
144                 haveTrashAfterEmpty bool
145                 freshAfterEmpty     bool
146         }{
147                 {
148                         "No related objects",
149                         none, none, none,
150                         false, false, false, false, false, false},
151                 {
152                         // Stored by older version, or there was a
153                         // race between EmptyTrash and Put: Trash is a
154                         // no-op even though the data object is very
155                         // old
156                         "No recent/X",
157                         t0.Add(-48 * time.Hour), none, none,
158                         true, true, true, false, false, false},
159                 {
160                         "Not trash; old enough to trash",
161                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
162                         true, true, false, false, false, false},
163                 {
164                         "Not trash; not old enough to trash",
165                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
166                         true, true, true, false, false, false},
167                 {
168                         "Trash + not-trash: recent race between Trash and Put",
169                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
170                         true, true, true, true, true, false},
171                 {
172                         "Trash + not-trash, nearly eligible for deletion, prone to Trash race",
173                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
174                         true, false, true, true, true, false},
175                 {
176                         "Trash + not-trash, eligible for deletion, prone to Trash race",
177                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
178                         true, false, true, true, false, false},
179                 {
180                         "Trash + not-trash, unsafe to empty; old race between Put and unfinished Trash",
181                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
182                         true, false, true, true, true, true},
183                 {
184                         "Trash + not-trash, was unsafe to empty, but since made safe by fixRace+Touch",
185                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
186                         true, true, true, true, false, false},
187                 {
188                         "Trash operation was interrupted",
189                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
190                         true, false, true, true, false, false},
191                 {
192                         "Trash, not yet eligible for deletion",
193                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
194                         false, false, false, true, true, false},
195                 {
196                         "Trash, not yet eligible for deletion, prone to races",
197                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
198                         false, false, false, true, true, false},
199                 {
200                         "Trash, eligible for deletion",
201                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
202                         false, false, false, true, false, false},
203                 {
204                         "Erroneously trashed during a race, detected before trashLifetime",
205                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
206                         true, false, true, true, true, false},
207                 {
208                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching trashLifetime",
209                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
210                         true, false, true, true, true, false},
211         } {
212                 c.Log("Scenario: ", test.label)
213                 var loc string
214                 var blk []byte
215
216                 setup := func() {
217                         nextKey++
218                         blk = []byte(fmt.Sprintf("%d", nextKey))
219                         loc = fmt.Sprintf("%x", md5.Sum(blk))
220                         c.Log("\t", loc)
221                         stubKey(test.data, loc, blk)
222                         stubKey(test.recent, "recent/"+loc, nil)
223                         stubKey(test.trash, "trash/"+loc, blk)
224                         v.serverClock.now = &t0
225                 }
226
227                 setup()
228                 buf := make([]byte, len(blk))
229                 _, err := v.Get(loc, buf)
230                 c.Check(err == nil, check.Equals, test.canGet)
231                 if err != nil {
232                         c.Check(os.IsNotExist(err), check.Equals, true)
233                 }
234
235                 setup()
236                 err = v.Trash(loc)
237                 c.Check(err == nil, check.Equals, test.canTrash)
238                 _, err = v.Get(loc, buf)
239                 c.Check(err == nil, check.Equals, test.canGetAfterTrash)
240                 if err != nil {
241                         c.Check(os.IsNotExist(err), check.Equals, true)
242                 }
243
244                 setup()
245                 err = v.Untrash(loc)
246                 c.Check(err == nil, check.Equals, test.canUntrash)
247
248                 setup()
249                 v.EmptyTrash()
250                 _, err = v.Bucket.Head("trash/"+loc, nil)
251                 c.Check(err == nil, check.Equals, test.haveTrashAfterEmpty)
252                 if test.freshAfterEmpty {
253                         t, err := v.Mtime(loc)
254                         c.Check(err, check.IsNil)
255                         // new mtime must be current (with an
256                         // allowance for 1s timestamp precision)
257                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
258                 }
259         }
260 }
261
262 // PutRaw skips the ContentMD5 test
263 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
264         err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
265         if err != nil {
266                 log.Printf("PutRaw: %+v", err)
267         }
268 }
269
270 // TouchWithDate turns back the clock while doing a Touch(). We assume
271 // there are no other operations happening on the same s3test server
272 // while we do this.
273 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
274         v.serverClock.now = &lastPut
275         err := v.Bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
276         if err != nil {
277                 panic(err)
278         }
279         v.serverClock.now = nil
280 }
281
282 func (v *TestableS3Volume) Teardown() {
283         v.server.Quit()
284 }