9701: Use a collection.OrderedDict instead of a simple dict to hold bufferblocks...
[arvados.git] / services / keepstore / s3_volume_test.go
1 package main
2
3 import (
4         "bytes"
5         "crypto/md5"
6         "fmt"
7         "log"
8         "os"
9         "time"
10
11         "github.com/AdRoll/goamz/aws"
12         "github.com/AdRoll/goamz/s3"
13         "github.com/AdRoll/goamz/s3/s3test"
14         check "gopkg.in/check.v1"
15 )
16
17 type TestableS3Volume struct {
18         *S3Volume
19         server      *s3test.Server
20         c           *check.C
21         serverClock *fakeClock
22 }
23
24 const (
25         TestBucketName = "testbucket"
26 )
27
28 type fakeClock struct {
29         now *time.Time
30 }
31
32 func (c *fakeClock) Now() time.Time {
33         if c.now == nil {
34                 return time.Now()
35         }
36         return *c.now
37 }
38
39 func init() {
40         // Deleting isn't safe from races, but if it's turned on
41         // anyway we do expect it to pass the generic volume tests.
42         s3UnsafeDelete = true
43 }
44
45 func NewTestableS3Volume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
46         clock := &fakeClock{}
47         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
48         c.Assert(err, check.IsNil)
49         auth := aws.Auth{}
50         region := aws.Region{
51                 Name:                 "test-region-1",
52                 S3Endpoint:           srv.URL(),
53                 S3LocationConstraint: true,
54         }
55         bucket := &s3.Bucket{
56                 S3:   s3.New(auth, region),
57                 Name: TestBucketName,
58         }
59         err = bucket.PutBucket(s3.ACL("private"))
60         c.Assert(err, check.IsNil)
61
62         return &TestableS3Volume{
63                 S3Volume:    NewS3Volume(auth, region, TestBucketName, raceWindow, readonly, replication),
64                 server:      srv,
65                 serverClock: clock,
66         }
67 }
68
69 var _ = check.Suite(&StubbedS3Suite{})
70
71 type StubbedS3Suite struct {
72         volumes []*TestableS3Volume
73 }
74
75 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
76         DoGenericVolumeTests(c, func(t TB) TestableVolume {
77                 // Use a negative raceWindow so s3test's 1-second
78                 // timestamp precision doesn't confuse fixRace.
79                 return NewTestableS3Volume(c, -2*time.Second, false, 2)
80         })
81 }
82
83 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
84         DoGenericVolumeTests(c, func(t TB) TestableVolume {
85                 return NewTestableS3Volume(c, -2*time.Second, true, 2)
86         })
87 }
88
89 func (s *StubbedS3Suite) TestIndex(c *check.C) {
90         v := NewTestableS3Volume(c, 0, false, 2)
91         v.indexPageSize = 3
92         for i := 0; i < 256; i++ {
93                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
94         }
95         for _, spec := range []struct {
96                 prefix      string
97                 expectMatch int
98         }{
99                 {"", 256},
100                 {"c", 16},
101                 {"bc", 1},
102                 {"abc", 0},
103         } {
104                 buf := new(bytes.Buffer)
105                 err := v.IndexTo(spec.prefix, buf)
106                 c.Check(err, check.IsNil)
107
108                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
109                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
110                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
111         }
112 }
113
114 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
115         defer func(tl, bs time.Duration) {
116                 trashLifetime = tl
117                 blobSignatureTTL = bs
118         }(trashLifetime, blobSignatureTTL)
119         trashLifetime = time.Hour
120         blobSignatureTTL = time.Hour
121
122         v := NewTestableS3Volume(c, 5*time.Minute, false, 2)
123         var none time.Time
124
125         putS3Obj := func(t time.Time, key string, data []byte) {
126                 if t == none {
127                         return
128                 }
129                 v.serverClock.now = &t
130                 v.Bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
131         }
132
133         t0 := time.Now()
134         nextKey := 0
135         for _, scenario := range []struct {
136                 label               string
137                 dataT               time.Time
138                 recentT             time.Time
139                 trashT              time.Time
140                 canGet              bool
141                 canTrash            bool
142                 canGetAfterTrash    bool
143                 canUntrash          bool
144                 haveTrashAfterEmpty bool
145                 freshAfterEmpty     bool
146         }{
147                 {
148                         "No related objects",
149                         none, none, none,
150                         false, false, false, false, false, false,
151                 },
152                 {
153                         // Stored by older version, or there was a
154                         // race between EmptyTrash and Put: Trash is a
155                         // no-op even though the data object is very
156                         // old
157                         "No recent/X",
158                         t0.Add(-48 * time.Hour), none, none,
159                         true, true, true, false, false, false,
160                 },
161                 {
162                         "Not trash, but old enough to be eligible for trash",
163                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
164                         true, true, false, false, false, false,
165                 },
166                 {
167                         "Not trash, and not old enough to be eligible for trash",
168                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
169                         true, true, true, false, false, false,
170                 },
171                 {
172                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
173                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
174                         true, true, true, true, true, false,
175                 },
176                 {
177                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
178                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
179                         true, false, true, true, true, false,
180                 },
181                 {
182                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
183                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
184                         true, false, true, true, false, false,
185                 },
186                 {
187                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
188                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
189                         true, false, true, true, true, true,
190                 },
191                 {
192                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
193                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
194                         true, true, true, true, false, false,
195                 },
196                 {
197                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
198                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
199                         true, false, true, true, false, false,
200                 },
201                 {
202                         "Trash, not yet eligible for deletion",
203                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
204                         false, false, false, true, true, false,
205                 },
206                 {
207                         "Trash, not yet eligible for deletion, prone to races",
208                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
209                         false, false, false, true, true, false,
210                 },
211                 {
212                         "Trash, eligible for deletion",
213                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
214                         false, false, false, true, false, false,
215                 },
216                 {
217                         "Erroneously trashed during a race, detected before trashLifetime",
218                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
219                         true, false, true, true, true, false,
220                 },
221                 {
222                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching trashLifetime",
223                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
224                         true, false, true, true, true, false,
225                 },
226                 {
227                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
228                         none, none, t0.Add(-time.Minute),
229                         false, false, false, true, true, true,
230                 },
231         } {
232                 c.Log("Scenario: ", scenario.label)
233
234                 // We have a few tests to run for each scenario, and
235                 // the tests are expected to change state. By calling
236                 // this setup func between tests, we (re)create the
237                 // scenario as specified, using a new unique block
238                 // locator to prevent interference from previous
239                 // tests.
240
241                 setupScenario := func() (string, []byte) {
242                         nextKey++
243                         blk := []byte(fmt.Sprintf("%d", nextKey))
244                         loc := fmt.Sprintf("%x", md5.Sum(blk))
245                         c.Log("\t", loc)
246                         putS3Obj(scenario.dataT, loc, blk)
247                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
248                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
249                         v.serverClock.now = &t0
250                         return loc, blk
251                 }
252
253                 // Check canGet
254                 loc, blk := setupScenario()
255                 buf := make([]byte, len(blk))
256                 _, err := v.Get(loc, buf)
257                 c.Check(err == nil, check.Equals, scenario.canGet)
258                 if err != nil {
259                         c.Check(os.IsNotExist(err), check.Equals, true)
260                 }
261
262                 // Call Trash, then check canTrash and canGetAfterTrash
263                 loc, blk = setupScenario()
264                 err = v.Trash(loc)
265                 c.Check(err == nil, check.Equals, scenario.canTrash)
266                 _, err = v.Get(loc, buf)
267                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
268                 if err != nil {
269                         c.Check(os.IsNotExist(err), check.Equals, true)
270                 }
271
272                 // Call Untrash, then check canUntrash
273                 loc, blk = setupScenario()
274                 err = v.Untrash(loc)
275                 c.Check(err == nil, check.Equals, scenario.canUntrash)
276                 if scenario.dataT != none || scenario.trashT != none {
277                         // In all scenarios where the data exists, we
278                         // should be able to Get after Untrash --
279                         // regardless of timestamps, errors, race
280                         // conditions, etc.
281                         _, err = v.Get(loc, buf)
282                         c.Check(err, check.IsNil)
283                 }
284
285                 // Call EmptyTrash, then check haveTrashAfterEmpty and
286                 // freshAfterEmpty
287                 loc, blk = setupScenario()
288                 v.EmptyTrash()
289                 _, err = v.Bucket.Head("trash/"+loc, nil)
290                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
291                 if scenario.freshAfterEmpty {
292                         t, err := v.Mtime(loc)
293                         c.Check(err, check.IsNil)
294                         // new mtime must be current (with an
295                         // allowance for 1s timestamp precision)
296                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
297                 }
298
299                 // Check for current Mtime after Put (applies to all
300                 // scenarios)
301                 loc, blk = setupScenario()
302                 err = v.Put(loc, blk)
303                 c.Check(err, check.IsNil)
304                 t, err := v.Mtime(loc)
305                 c.Check(err, check.IsNil)
306                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
307         }
308 }
309
310 // PutRaw skips the ContentMD5 test
311 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
312         err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
313         if err != nil {
314                 log.Printf("PutRaw: %+v", err)
315         }
316 }
317
318 // TouchWithDate turns back the clock while doing a Touch(). We assume
319 // there are no other operations happening on the same s3test server
320 // while we do this.
321 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
322         v.serverClock.now = &lastPut
323         err := v.Bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
324         if err != nil {
325                 panic(err)
326         }
327         v.serverClock.now = nil
328 }
329
330 func (v *TestableS3Volume) Teardown() {
331         v.server.Quit()
332 }