8555: Test Get() after successful Untrash. Test Put+Mtime in all scenarios.
[arvados.git] / services / keepstore / s3_volume_test.go
1 package main
2
3 import (
4         "bytes"
5         "crypto/md5"
6         "fmt"
7         "log"
8         "os"
9         "time"
10
11         "github.com/AdRoll/goamz/aws"
12         "github.com/AdRoll/goamz/s3"
13         "github.com/AdRoll/goamz/s3/s3test"
14         check "gopkg.in/check.v1"
15 )
16
17 type TestableS3Volume struct {
18         *S3Volume
19         server      *s3test.Server
20         c           *check.C
21         serverClock *fakeClock
22 }
23
24 const (
25         TestBucketName = "testbucket"
26 )
27
28 type fakeClock struct {
29         now *time.Time
30 }
31
32 func (c *fakeClock) Now() time.Time {
33         if c.now == nil {
34                 return time.Now()
35         }
36         return *c.now
37 }
38
39 func init() {
40         // Deleting isn't safe from races, but if it's turned on
41         // anyway we do expect it to pass the generic volume tests.
42         s3UnsafeDelete = true
43 }
44
45 func NewTestableS3Volume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
46         clock := &fakeClock{}
47         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
48         c.Assert(err, check.IsNil)
49         auth := aws.Auth{}
50         region := aws.Region{
51                 Name:                 "test-region-1",
52                 S3Endpoint:           srv.URL(),
53                 S3LocationConstraint: true,
54         }
55         bucket := &s3.Bucket{
56                 S3:   s3.New(auth, region),
57                 Name: TestBucketName,
58         }
59         err = bucket.PutBucket(s3.ACL("private"))
60         c.Assert(err, check.IsNil)
61
62         return &TestableS3Volume{
63                 S3Volume:    NewS3Volume(auth, region, TestBucketName, raceWindow, readonly, replication),
64                 server:      srv,
65                 serverClock: clock,
66         }
67 }
68
69 var _ = check.Suite(&StubbedS3Suite{})
70
71 type StubbedS3Suite struct {
72         volumes []*TestableS3Volume
73 }
74
75 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
76         DoGenericVolumeTests(c, func(t TB) TestableVolume {
77                 // Use a negative raceWindow so s3test's 1-second
78                 // timestamp precision doesn't confuse fixRace.
79                 return NewTestableS3Volume(c, -2*time.Second, false, 2)
80         })
81 }
82
83 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
84         DoGenericVolumeTests(c, func(t TB) TestableVolume {
85                 return NewTestableS3Volume(c, -2*time.Second, true, 2)
86         })
87 }
88
89 func (s *StubbedS3Suite) TestIndex(c *check.C) {
90         v := NewTestableS3Volume(c, 0, false, 2)
91         v.indexPageSize = 3
92         for i := 0; i < 256; i++ {
93                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
94         }
95         for _, spec := range []struct {
96                 prefix      string
97                 expectMatch int
98         }{
99                 {"", 256},
100                 {"c", 16},
101                 {"bc", 1},
102                 {"abc", 0},
103         } {
104                 buf := new(bytes.Buffer)
105                 err := v.IndexTo(spec.prefix, buf)
106                 c.Check(err, check.IsNil)
107
108                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
109                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
110                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
111         }
112 }
113
114 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
115         defer func(tl, bs time.Duration) {
116                 trashLifetime = tl
117                 blobSignatureTTL = bs
118         }(trashLifetime, blobSignatureTTL)
119         trashLifetime = time.Hour
120         blobSignatureTTL = time.Hour
121
122         v := NewTestableS3Volume(c, 5*time.Minute, false, 2)
123         var none time.Time
124
125         putS3Obj := func(t time.Time, key string, data []byte) {
126                 if t == none {
127                         return
128                 }
129                 v.serverClock.now = &t
130                 v.Bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
131         }
132
133         t0 := time.Now()
134         nextKey := 0
135         for _, scenario := range []struct {
136                 label               string
137                 dataT               time.Time
138                 recentT             time.Time
139                 trashT              time.Time
140                 canGet              bool
141                 canTrash            bool
142                 canGetAfterTrash    bool
143                 canUntrash          bool
144                 haveTrashAfterEmpty bool
145                 freshAfterEmpty     bool
146         }{
147                 {
148                         "No related objects",
149                         none, none, none,
150                         false, false, false, false, false, false,
151                 },
152                 {
153                         // Stored by older version, or there was a
154                         // race between EmptyTrash and Put: Trash is a
155                         // no-op even though the data object is very
156                         // old
157                         "No recent/X",
158                         t0.Add(-48 * time.Hour), none, none,
159                         true, true, true, false, false, false,
160                 },
161                 {
162                         "Not trash, but old enough to be eligible for trash",
163                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
164                         true, true, false, false, false, false,
165                 },
166                 {
167                         "Not trash, and not old enough to be eligible for trash",
168                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
169                         true, true, true, false, false, false,
170                 },
171                 {
172                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
173                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
174                         true, true, true, true, true, false,
175                 },
176                 {
177                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
178                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
179                         true, false, true, true, true, false,
180                 },
181                 {
182                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
183                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
184                         true, false, true, true, false, false,
185                 },
186                 {
187                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
188                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
189                         true, false, true, true, true, true,
190                 },
191                 {
192                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
193                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
194                         true, true, true, true, false, false,
195                 },
196                 {
197                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
198                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
199                         true, false, true, true, false, false,
200                 },
201                 {
202                         "Trash, not yet eligible for deletion",
203                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
204                         false, false, false, true, true, false,
205                 },
206                 {
207                         "Trash, not yet eligible for deletion, prone to races",
208                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
209                         false, false, false, true, true, false,
210                 },
211                 {
212                         "Trash, eligible for deletion",
213                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
214                         false, false, false, true, false, false,
215                 },
216                 {
217                         "Erroneously trashed during a race, detected before trashLifetime",
218                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
219                         true, false, true, true, true, false,
220                 },
221                 {
222                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching trashLifetime",
223                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
224                         true, false, true, true, true, false,
225                 },
226                 {
227                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
228                         none, none, t0.Add(-time.Minute),
229                         false, false, false, true, true, true,
230                 },
231         } {
232                 c.Log("Scenario: ", scenario.label)
233
234                 // We have a few tests to run for each scenario, and
235                 // the tests are expected to change state. By calling
236                 // this setup func between tests, we (re)create the
237                 // scenario as specified, using a new unique block
238                 // locator to prevent interference from previous
239                 // tests.
240
241                 setupScenario := func() (string, []byte) {
242                         nextKey++
243                         blk := []byte(fmt.Sprintf("%d", nextKey))
244                         loc := fmt.Sprintf("%x", md5.Sum(blk))
245                         c.Log("\t", loc)
246                         putS3Obj(scenario.dataT, loc, blk)
247                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
248                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
249                         v.serverClock.now = &t0
250                         return loc, blk
251                 }
252
253                 loc, blk := setupScenario()
254                 buf := make([]byte, len(blk))
255                 _, err := v.Get(loc, buf)
256                 c.Check(err == nil, check.Equals, scenario.canGet)
257                 if err != nil {
258                         c.Check(os.IsNotExist(err), check.Equals, true)
259                 }
260
261                 loc, blk = setupScenario()
262                 err = v.Trash(loc)
263                 c.Check(err == nil, check.Equals, scenario.canTrash)
264                 _, err = v.Get(loc, buf)
265                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
266                 if err != nil {
267                         c.Check(os.IsNotExist(err), check.Equals, true)
268                 }
269
270                 loc, blk = setupScenario()
271                 err = v.Untrash(loc)
272                 c.Check(err == nil, check.Equals, scenario.canUntrash)
273                 if scenario.dataT != none || scenario.trashT != none {
274                         // In all scenarios where the data exists, we
275                         // should be able to Get after Untrash --
276                         // regardless of timestamps, errors, race
277                         // conditions, etc.
278                         _, err = v.Get(loc, buf)
279                         c.Check(err, check.IsNil)
280                 }
281
282                 loc, blk = setupScenario()
283                 v.EmptyTrash()
284                 _, err = v.Bucket.Head("trash/"+loc, nil)
285                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
286                 if scenario.freshAfterEmpty {
287                         t, err := v.Mtime(loc)
288                         c.Check(err, check.IsNil)
289                         // new mtime must be current (with an
290                         // allowance for 1s timestamp precision)
291                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
292                 }
293
294                 loc, blk = setupScenario()
295                 err = v.Put(loc, blk)
296                 c.Check(err, check.IsNil)
297                 t, err := v.Mtime(loc)
298                 c.Check(err, check.IsNil)
299                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
300         }
301 }
302
303 // PutRaw skips the ContentMD5 test
304 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
305         err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
306         if err != nil {
307                 log.Printf("PutRaw: %+v", err)
308         }
309 }
310
311 // TouchWithDate turns back the clock while doing a Touch(). We assume
312 // there are no other operations happening on the same s3test server
313 // while we do this.
314 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
315         v.serverClock.now = &lastPut
316         err := v.Bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
317         if err != nil {
318                 panic(err)
319         }
320         v.serverClock.now = nil
321 }
322
323 func (v *TestableS3Volume) Teardown() {
324         v.server.Quit()
325 }