Merge branch '10467-client-disconnect' refs #10467
[arvados.git] / services / keepstore / s3_volume_test.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "crypto/md5"
7         "fmt"
8         "io/ioutil"
9         "log"
10         "os"
11         "time"
12
13         "git.curoverse.com/arvados.git/sdk/go/arvados"
14         "github.com/AdRoll/goamz/s3"
15         "github.com/AdRoll/goamz/s3/s3test"
16         check "gopkg.in/check.v1"
17 )
18
19 const (
20         TestBucketName = "testbucket"
21 )
22
23 type fakeClock struct {
24         now *time.Time
25 }
26
27 func (c *fakeClock) Now() time.Time {
28         if c.now == nil {
29                 return time.Now()
30         }
31         return *c.now
32 }
33
34 func init() {
35         // Deleting isn't safe from races, but if it's turned on
36         // anyway we do expect it to pass the generic volume tests.
37         s3UnsafeDelete = true
38 }
39
40 var _ = check.Suite(&StubbedS3Suite{})
41
42 type StubbedS3Suite struct {
43         volumes []*TestableS3Volume
44 }
45
46 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
47         DoGenericVolumeTests(c, func(t TB) TestableVolume {
48                 // Use a negative raceWindow so s3test's 1-second
49                 // timestamp precision doesn't confuse fixRace.
50                 return s.newTestableVolume(c, -2*time.Second, false, 2)
51         })
52 }
53
54 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
55         DoGenericVolumeTests(c, func(t TB) TestableVolume {
56                 return s.newTestableVolume(c, -2*time.Second, true, 2)
57         })
58 }
59
60 func (s *StubbedS3Suite) TestIndex(c *check.C) {
61         v := s.newTestableVolume(c, 0, false, 2)
62         v.IndexPageSize = 3
63         for i := 0; i < 256; i++ {
64                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
65         }
66         for _, spec := range []struct {
67                 prefix      string
68                 expectMatch int
69         }{
70                 {"", 256},
71                 {"c", 16},
72                 {"bc", 1},
73                 {"abc", 0},
74         } {
75                 buf := new(bytes.Buffer)
76                 err := v.IndexTo(spec.prefix, buf)
77                 c.Check(err, check.IsNil)
78
79                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
80                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
81                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
82         }
83 }
84
85 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
86         defer func(tl, bs arvados.Duration) {
87                 theConfig.TrashLifetime = tl
88                 theConfig.BlobSignatureTTL = bs
89         }(theConfig.TrashLifetime, theConfig.BlobSignatureTTL)
90         theConfig.TrashLifetime.Set("1h")
91         theConfig.BlobSignatureTTL.Set("1h")
92
93         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
94         var none time.Time
95
96         putS3Obj := func(t time.Time, key string, data []byte) {
97                 if t == none {
98                         return
99                 }
100                 v.serverClock.now = &t
101                 v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
102         }
103
104         t0 := time.Now()
105         nextKey := 0
106         for _, scenario := range []struct {
107                 label               string
108                 dataT               time.Time
109                 recentT             time.Time
110                 trashT              time.Time
111                 canGet              bool
112                 canTrash            bool
113                 canGetAfterTrash    bool
114                 canUntrash          bool
115                 haveTrashAfterEmpty bool
116                 freshAfterEmpty     bool
117         }{
118                 {
119                         "No related objects",
120                         none, none, none,
121                         false, false, false, false, false, false,
122                 },
123                 {
124                         // Stored by older version, or there was a
125                         // race between EmptyTrash and Put: Trash is a
126                         // no-op even though the data object is very
127                         // old
128                         "No recent/X",
129                         t0.Add(-48 * time.Hour), none, none,
130                         true, true, true, false, false, false,
131                 },
132                 {
133                         "Not trash, but old enough to be eligible for trash",
134                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
135                         true, true, false, false, false, false,
136                 },
137                 {
138                         "Not trash, and not old enough to be eligible for trash",
139                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
140                         true, true, true, false, false, false,
141                 },
142                 {
143                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
144                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
145                         true, true, true, true, true, false,
146                 },
147                 {
148                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
149                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
150                         true, false, true, true, true, false,
151                 },
152                 {
153                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
154                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
155                         true, false, true, true, false, false,
156                 },
157                 {
158                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
159                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
160                         true, false, true, true, true, true,
161                 },
162                 {
163                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
164                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
165                         true, true, true, true, false, false,
166                 },
167                 {
168                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
169                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
170                         true, false, true, true, false, false,
171                 },
172                 {
173                         "Trash, not yet eligible for deletion",
174                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
175                         false, false, false, true, true, false,
176                 },
177                 {
178                         "Trash, not yet eligible for deletion, prone to races",
179                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
180                         false, false, false, true, true, false,
181                 },
182                 {
183                         "Trash, eligible for deletion",
184                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
185                         false, false, false, true, false, false,
186                 },
187                 {
188                         "Erroneously trashed during a race, detected before TrashLifetime",
189                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
190                         true, false, true, true, true, false,
191                 },
192                 {
193                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching TrashLifetime",
194                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
195                         true, false, true, true, true, false,
196                 },
197                 {
198                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
199                         none, none, t0.Add(-time.Minute),
200                         false, false, false, true, true, true,
201                 },
202         } {
203                 c.Log("Scenario: ", scenario.label)
204
205                 // We have a few tests to run for each scenario, and
206                 // the tests are expected to change state. By calling
207                 // this setup func between tests, we (re)create the
208                 // scenario as specified, using a new unique block
209                 // locator to prevent interference from previous
210                 // tests.
211
212                 setupScenario := func() (string, []byte) {
213                         nextKey++
214                         blk := []byte(fmt.Sprintf("%d", nextKey))
215                         loc := fmt.Sprintf("%x", md5.Sum(blk))
216                         c.Log("\t", loc)
217                         putS3Obj(scenario.dataT, loc, blk)
218                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
219                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
220                         v.serverClock.now = &t0
221                         return loc, blk
222                 }
223
224                 // Check canGet
225                 loc, blk := setupScenario()
226                 buf := make([]byte, len(blk))
227                 _, err := v.Get(context.Background(), loc, buf)
228                 c.Check(err == nil, check.Equals, scenario.canGet)
229                 if err != nil {
230                         c.Check(os.IsNotExist(err), check.Equals, true)
231                 }
232
233                 // Call Trash, then check canTrash and canGetAfterTrash
234                 loc, blk = setupScenario()
235                 err = v.Trash(loc)
236                 c.Check(err == nil, check.Equals, scenario.canTrash)
237                 _, err = v.Get(context.Background(), loc, buf)
238                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
239                 if err != nil {
240                         c.Check(os.IsNotExist(err), check.Equals, true)
241                 }
242
243                 // Call Untrash, then check canUntrash
244                 loc, blk = setupScenario()
245                 err = v.Untrash(loc)
246                 c.Check(err == nil, check.Equals, scenario.canUntrash)
247                 if scenario.dataT != none || scenario.trashT != none {
248                         // In all scenarios where the data exists, we
249                         // should be able to Get after Untrash --
250                         // regardless of timestamps, errors, race
251                         // conditions, etc.
252                         _, err = v.Get(context.Background(), loc, buf)
253                         c.Check(err, check.IsNil)
254                 }
255
256                 // Call EmptyTrash, then check haveTrashAfterEmpty and
257                 // freshAfterEmpty
258                 loc, blk = setupScenario()
259                 v.EmptyTrash()
260                 _, err = v.bucket.Head("trash/"+loc, nil)
261                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
262                 if scenario.freshAfterEmpty {
263                         t, err := v.Mtime(loc)
264                         c.Check(err, check.IsNil)
265                         // new mtime must be current (with an
266                         // allowance for 1s timestamp precision)
267                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
268                 }
269
270                 // Check for current Mtime after Put (applies to all
271                 // scenarios)
272                 loc, blk = setupScenario()
273                 err = v.Put(context.Background(), loc, blk)
274                 c.Check(err, check.IsNil)
275                 t, err := v.Mtime(loc)
276                 c.Check(err, check.IsNil)
277                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
278         }
279 }
280
281 type TestableS3Volume struct {
282         *S3Volume
283         server      *s3test.Server
284         c           *check.C
285         serverClock *fakeClock
286 }
287
288 func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
289         clock := &fakeClock{}
290         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
291         c.Assert(err, check.IsNil)
292
293         tmp, err := ioutil.TempFile("", "keepstore")
294         c.Assert(err, check.IsNil)
295         defer os.Remove(tmp.Name())
296         _, err = tmp.Write([]byte("xxx\n"))
297         c.Assert(err, check.IsNil)
298         c.Assert(tmp.Close(), check.IsNil)
299
300         v := &TestableS3Volume{
301                 S3Volume: &S3Volume{
302                         Bucket:             TestBucketName,
303                         AccessKeyFile:      tmp.Name(),
304                         SecretKeyFile:      tmp.Name(),
305                         Endpoint:           srv.URL(),
306                         Region:             "test-region-1",
307                         LocationConstraint: true,
308                         RaceWindow:         arvados.Duration(raceWindow),
309                         S3Replication:      replication,
310                         UnsafeDelete:       s3UnsafeDelete,
311                         ReadOnly:           readonly,
312                         IndexPageSize:      1000,
313                 },
314                 server:      srv,
315                 serverClock: clock,
316         }
317         c.Assert(v.Start(), check.IsNil)
318         err = v.bucket.PutBucket(s3.ACL("private"))
319         c.Assert(err, check.IsNil)
320         return v
321 }
322
323 // PutRaw skips the ContentMD5 test
324 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
325         err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
326         if err != nil {
327                 log.Printf("PutRaw: %+v", err)
328         }
329 }
330
331 // TouchWithDate turns back the clock while doing a Touch(). We assume
332 // there are no other operations happening on the same s3test server
333 // while we do this.
334 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
335         v.serverClock.now = &lastPut
336         err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
337         if err != nil {
338                 panic(err)
339         }
340         v.serverClock.now = nil
341 }
342
343 func (v *TestableS3Volume) Teardown() {
344         v.server.Quit()
345 }