8555: Test various backend states. Update recent/X timestamp during Untrash.
[arvados.git] / services / keepstore / s3_volume_test.go
1 package main
2
3 import (
4         "bytes"
5         "crypto/md5"
6         "fmt"
7         "log"
8         "os"
9         "time"
10
11         "github.com/AdRoll/goamz/aws"
12         "github.com/AdRoll/goamz/s3"
13         "github.com/AdRoll/goamz/s3/s3test"
14         check "gopkg.in/check.v1"
15 )
16
17 type TestableS3Volume struct {
18         *S3Volume
19         server      *s3test.Server
20         c           *check.C
21         serverClock *fakeClock
22 }
23
// TestBucketName is the bucket created on the stub S3 server and used
// by every testable volume.
const TestBucketName = "testbucket"
27
// fakeClock is a clock whose reading can be pinned to a fixed instant.
// While now is nil it behaves like the real wall clock.
type fakeClock struct {
	now *time.Time
}

// Now returns the pinned instant if one has been set, and the current
// wall-clock time otherwise.
func (c *fakeClock) Now() time.Time {
	if pinned := c.now; pinned != nil {
		return *pinned
	}
	return time.Now()
}
38
39 func init() {
40         // Deleting isn't safe from races, but if it's turned on
41         // anyway we do expect it to pass the generic volume tests.
42         s3UnsafeDelete = true
43 }
44
45 func NewTestableS3Volume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
46         clock := &fakeClock{}
47         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
48         c.Assert(err, check.IsNil)
49         auth := aws.Auth{}
50         region := aws.Region{
51                 Name:                 "test-region-1",
52                 S3Endpoint:           srv.URL(),
53                 S3LocationConstraint: true,
54         }
55         bucket := &s3.Bucket{
56                 S3:   s3.New(auth, region),
57                 Name: TestBucketName,
58         }
59         err = bucket.PutBucket(s3.ACL("private"))
60         c.Assert(err, check.IsNil)
61
62         return &TestableS3Volume{
63                 S3Volume:    NewS3Volume(auth, region, TestBucketName, raceWindow, readonly, replication),
64                 server:      srv,
65                 serverClock: clock,
66         }
67 }
68
69 var _ = check.Suite(&StubbedS3Suite{})
70
71 type StubbedS3Suite struct {
72         volumes []*TestableS3Volume
73 }
74
75 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
76         DoGenericVolumeTests(c, func(t TB) TestableVolume {
77                 // Use a negative raceWindow so s3test's 1-second
78                 // timestamp precision doesn't confuse fixRace.
79                 return NewTestableS3Volume(c, -2*time.Second, false, 2)
80         })
81 }
82
83 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
84         DoGenericVolumeTests(c, func(t TB) TestableVolume {
85                 return NewTestableS3Volume(c, -2*time.Second, true, 2)
86         })
87 }
88
89 func (s *StubbedS3Suite) TestIndex(c *check.C) {
90         v := NewTestableS3Volume(c, 0, false, 2)
91         v.indexPageSize = 3
92         for i := 0; i < 256; i++ {
93                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
94         }
95         for _, spec := range []struct {
96                 prefix      string
97                 expectMatch int
98         }{
99                 {"", 256},
100                 {"c", 16},
101                 {"bc", 1},
102                 {"abc", 0},
103         } {
104                 buf := new(bytes.Buffer)
105                 err := v.IndexTo(spec.prefix, buf)
106                 c.Check(err, check.IsNil)
107
108                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
109                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
110                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
111         }
112 }
113
114 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
115         defer func(tl, bs time.Duration) {
116                 trashLifetime = tl
117                 blobSignatureTTL = bs
118         }(trashLifetime, blobSignatureTTL)
119         trashLifetime = time.Hour
120         blobSignatureTTL = time.Hour
121
122         v := NewTestableS3Volume(c, 5*time.Minute, false, 2)
123         var none time.Time
124
125         stubKey := func(t time.Time, key string, data []byte) {
126                 if t == none {
127                         return
128                 }
129                 v.serverClock.now = &t
130                 v.Bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
131         }
132
133         t0 := time.Now()
134         nextKey := 0
135         for _, test := range []struct {
136                 label               string
137                 data                time.Time
138                 recent              time.Time
139                 trash               time.Time
140                 canGet              bool
141                 canTrash            bool
142                 canGetAfterTrash    bool
143                 canUntrash          bool
144                 haveTrashAfterEmpty bool
145         }{
146                 {
147                         "No related objects",
148                         none, none, none,
149                         false, false, false, false, false},
150                 {
151                         // Stored by older version, or there was a
152                         // race between EmptyTrash and Put: Trash is a
153                         // no-op even though the data object is very
154                         // old
155                         "No recent/X",
156                         t0.Add(-48 * time.Hour), none, none,
157                         true, true, true, false, false},
158                 {
159                         "Not trash; old enough to trash",
160                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
161                         true, true, false, false, false},
162                 {
163                         "Not trash; not old enough to trash",
164                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
165                         true, true, true, false, false},
166                 {
167                         "Trash + not-trash: recent race between Trash and Put",
168                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
169                         true, true, true, true, true},
170                 {
171                         "Trash + not-trash, nearly eligible for deletion, prone to Trash race",
172                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
173                         true, false, true, true, true},
174                 {
175                         "Trash + not-trash, eligible for deletion, prone to Trash race",
176                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
177                         true, false, true, true, false},
178                 // FIXME: old trash never gets deleted!
179                 // {
180                 //      "Not trash; old race between Trash and Put, or incomplete Trash",
181                 //      t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
182                 //      true, false, true, true, false},
183                 {
184                         "Trash operation was interrupted",
185                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
186                         true, false, true, true, false},
187                 {
188                         "Trash, not yet eligible for deletion",
189                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
190                         false, false, false, true, true},
191                 {
192                         "Trash, not yet eligible for deletion, prone to races",
193                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
194                         false, false, false, true, true},
195                 {
196                         "Trash, eligible for deletion",
197                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
198                         false, false, false, true, false},
199                 {
200                         "Erroneously trashed during a race, detected before trashLifetime",
201                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
202                         true, false, true, true, true},
203                 {
204                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching trashLifetime",
205                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
206                         true, false, true, true, true},
207         } {
208                 c.Log("Scenario: ", test.label)
209                 var loc string
210                 var blk []byte
211
212                 setup := func() {
213                         nextKey++
214                         blk = []byte(fmt.Sprintf("%d", nextKey))
215                         loc = fmt.Sprintf("%x", md5.Sum(blk))
216                         c.Log("\t", loc)
217                         stubKey(test.data, loc, blk)
218                         stubKey(test.recent, "recent/"+loc, nil)
219                         stubKey(test.trash, "trash/"+loc, blk)
220                         v.serverClock.now = &t0
221                 }
222
223                 setup()
224                 buf := make([]byte, len(blk))
225                 _, err := v.Get(loc, buf)
226                 c.Check(err == nil, check.Equals, test.canGet)
227                 if err != nil {
228                         c.Check(os.IsNotExist(err), check.Equals, true)
229                 }
230
231                 setup()
232                 err = v.Trash(loc)
233                 c.Check(err == nil, check.Equals, test.canTrash)
234                 _, err = v.Get(loc, buf)
235                 c.Check(err == nil, check.Equals, test.canGetAfterTrash)
236                 if err != nil {
237                         c.Check(os.IsNotExist(err), check.Equals, true)
238                 }
239
240                 setup()
241                 err = v.Untrash(loc)
242                 c.Check(err == nil, check.Equals, test.canUntrash)
243
244                 setup()
245                 v.EmptyTrash()
246                 _, err = v.Bucket.Head("trash/"+loc, nil)
247                 c.Check(err == nil, check.Equals, test.haveTrashAfterEmpty)
248         }
249 }
250
251 // PutRaw skips the ContentMD5 test
252 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
253         err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
254         if err != nil {
255                 log.Printf("PutRaw: %+v", err)
256         }
257 }
258
259 // TouchWithDate turns back the clock while doing a Touch(). We assume
260 // there are no other operations happening on the same s3test server
261 // while we do this.
262 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
263         v.serverClock.now = &lastPut
264         err := v.Bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
265         if err != nil {
266                 panic(err)
267         }
268         v.serverClock.now = nil
269 }
270
271 func (v *TestableS3Volume) Teardown() {
272         v.server.Quit()
273 }