8555: Log statistics in EmptyTrash.
[arvados.git] / services / keepstore / s3_volume_test.go
1 package main
2
3 import (
4         "bytes"
5         "crypto/md5"
6         "fmt"
7         "log"
8         "os"
9         "time"
10
11         "github.com/AdRoll/goamz/aws"
12         "github.com/AdRoll/goamz/s3"
13         "github.com/AdRoll/goamz/s3/s3test"
14         check "gopkg.in/check.v1"
15 )
16
17 type TestableS3Volume struct {
18         *S3Volume
19         server      *s3test.Server
20         c           *check.C
21         serverClock *fakeClock
22 }
23
24 const (
25         TestBucketName = "testbucket"
26 )
27
28 type fakeClock struct {
29         now *time.Time
30 }
31
32 func (c *fakeClock) Now() time.Time {
33         if c.now == nil {
34                 return time.Now()
35         }
36         return *c.now
37 }
38
39 func init() {
40         // Deleting isn't safe from races, but if it's turned on
41         // anyway we do expect it to pass the generic volume tests.
42         s3UnsafeDelete = true
43 }
44
45 func NewTestableS3Volume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
46         clock := &fakeClock{}
47         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
48         c.Assert(err, check.IsNil)
49         auth := aws.Auth{}
50         region := aws.Region{
51                 Name:                 "test-region-1",
52                 S3Endpoint:           srv.URL(),
53                 S3LocationConstraint: true,
54         }
55         bucket := &s3.Bucket{
56                 S3:   s3.New(auth, region),
57                 Name: TestBucketName,
58         }
59         err = bucket.PutBucket(s3.ACL("private"))
60         c.Assert(err, check.IsNil)
61
62         return &TestableS3Volume{
63                 S3Volume:    NewS3Volume(auth, region, TestBucketName, raceWindow, readonly, replication),
64                 server:      srv,
65                 serverClock: clock,
66         }
67 }
68
69 var _ = check.Suite(&StubbedS3Suite{})
70
71 type StubbedS3Suite struct {
72         volumes []*TestableS3Volume
73 }
74
75 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
76         DoGenericVolumeTests(c, func(t TB) TestableVolume {
77                 // Use a negative raceWindow so s3test's 1-second
78                 // timestamp precision doesn't confuse fixRace.
79                 return NewTestableS3Volume(c, -2*time.Second, false, 2)
80         })
81 }
82
83 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
84         DoGenericVolumeTests(c, func(t TB) TestableVolume {
85                 return NewTestableS3Volume(c, -2*time.Second, true, 2)
86         })
87 }
88
89 func (s *StubbedS3Suite) TestIndex(c *check.C) {
90         v := NewTestableS3Volume(c, 0, false, 2)
91         v.indexPageSize = 3
92         for i := 0; i < 256; i++ {
93                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
94         }
95         for _, spec := range []struct {
96                 prefix      string
97                 expectMatch int
98         }{
99                 {"", 256},
100                 {"c", 16},
101                 {"bc", 1},
102                 {"abc", 0},
103         } {
104                 buf := new(bytes.Buffer)
105                 err := v.IndexTo(spec.prefix, buf)
106                 c.Check(err, check.IsNil)
107
108                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
109                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
110                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
111         }
112 }
113
114 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
115         defer func(tl, bs time.Duration) {
116                 trashLifetime = tl
117                 blobSignatureTTL = bs
118         }(trashLifetime, blobSignatureTTL)
119         trashLifetime = time.Hour
120         blobSignatureTTL = time.Hour
121
122         v := NewTestableS3Volume(c, 5*time.Minute, false, 2)
123         var none time.Time
124
125         putS3Obj := func(t time.Time, key string, data []byte) {
126                 if t == none {
127                         return
128                 }
129                 v.serverClock.now = &t
130                 v.Bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
131         }
132
133         t0 := time.Now()
134         nextKey := 0
135         for _, scenario := range []struct {
136                 label               string
137                 dataT               time.Time
138                 recentT             time.Time
139                 trashT              time.Time
140                 canGet              bool
141                 canTrash            bool
142                 canGetAfterTrash    bool
143                 canUntrash          bool
144                 haveTrashAfterEmpty bool
145                 freshAfterEmpty     bool
146         }{
147                 {
148                         "No related objects",
149                         none, none, none,
150                         false, false, false, false, false, false,
151                 },
152                 {
153                         // Stored by older version, or there was a
154                         // race between EmptyTrash and Put: Trash is a
155                         // no-op even though the data object is very
156                         // old
157                         "No recent/X",
158                         t0.Add(-48 * time.Hour), none, none,
159                         true, true, true, false, false, false,
160                 },
161                 {
162                         "Not trash, but old enough to be eligible for trash",
163                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
164                         true, true, false, false, false, false,
165                 },
166                 {
167                         "Not trash, and not old enough to be eligible for trash",
168                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
169                         true, true, true, false, false, false,
170                 },
171                 {
172                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
173                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
174                         true, true, true, true, true, false,
175                 },
176                 {
177                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
178                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
179                         true, false, true, true, true, false,
180                 },
181                 {
182                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
183                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
184                         true, false, true, true, false, false,
185                 },
186                 {
187                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
188                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
189                         true, false, true, true, true, true,
190                 },
191                 {
192                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
193                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
194                         true, true, true, true, false, false,
195                 },
196                 {
197                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
198                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
199                         true, false, true, true, false, false,
200                 },
201                 {
202                         "Trash, not yet eligible for deletion",
203                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
204                         false, false, false, true, true, false,
205                 },
206                 {
207                         "Trash, not yet eligible for deletion, prone to races",
208                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
209                         false, false, false, true, true, false,
210                 },
211                 {
212                         "Trash, eligible for deletion",
213                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
214                         false, false, false, true, false, false,
215                 },
216                 {
217                         "Erroneously trashed during a race, detected before trashLifetime",
218                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
219                         true, false, true, true, true, false,
220                 },
221                 {
222                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching trashLifetime",
223                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
224                         true, false, true, true, true, false,
225                 },
226         } {
227                 c.Log("Scenario: ", scenario.label)
228
229                 // We have a few tests to run for each scenario, and
230                 // the tests are expected to change state. By calling
231                 // this setup func between tests, we (re)create the
232                 // scenario as specified, using a new unique block
233                 // locator to prevent interference from previous
234                 // tests.
235
236                 setupScenario := func() (string, []byte) {
237                         nextKey++
238                         blk := []byte(fmt.Sprintf("%d", nextKey))
239                         loc := fmt.Sprintf("%x", md5.Sum(blk))
240                         c.Log("\t", loc)
241                         putS3Obj(scenario.dataT, loc, blk)
242                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
243                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
244                         v.serverClock.now = &t0
245                         return loc, blk
246                 }
247
248                 loc, blk := setupScenario()
249                 buf := make([]byte, len(blk))
250                 _, err := v.Get(loc, buf)
251                 c.Check(err == nil, check.Equals, scenario.canGet)
252                 if err != nil {
253                         c.Check(os.IsNotExist(err), check.Equals, true)
254                 }
255
256                 loc, blk = setupScenario()
257                 err = v.Trash(loc)
258                 c.Check(err == nil, check.Equals, scenario.canTrash)
259                 _, err = v.Get(loc, buf)
260                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
261                 if err != nil {
262                         c.Check(os.IsNotExist(err), check.Equals, true)
263                 }
264
265                 loc, blk = setupScenario()
266                 err = v.Untrash(loc)
267                 c.Check(err == nil, check.Equals, scenario.canUntrash)
268
269                 loc, blk = setupScenario()
270                 v.EmptyTrash()
271                 _, err = v.Bucket.Head("trash/"+loc, nil)
272                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
273                 if scenario.freshAfterEmpty {
274                         t, err := v.Mtime(loc)
275                         c.Check(err, check.IsNil)
276                         // new mtime must be current (with an
277                         // allowance for 1s timestamp precision)
278                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
279                 }
280         }
281 }
282
283 // PutRaw skips the ContentMD5 test
284 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
285         err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
286         if err != nil {
287                 log.Printf("PutRaw: %+v", err)
288         }
289 }
290
291 // TouchWithDate turns back the clock while doing a Touch(). We assume
292 // there are no other operations happening on the same s3test server
293 // while we do this.
294 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
295         v.serverClock.now = &lastPut
296         err := v.Bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
297         if err != nil {
298                 panic(err)
299         }
300         v.serverClock.now = nil
301 }
302
303 func (v *TestableS3Volume) Teardown() {
304         v.server.Quit()
305 }