Merge branch '10200-cwl-crunch-script' closes #10200
[arvados.git] / services / keepstore / s3_volume_test.go
1 package main
2
3 import (
4         "bytes"
5         "crypto/md5"
6         "fmt"
7         "io/ioutil"
8         "log"
9         "os"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/arvados"
13         "github.com/AdRoll/goamz/s3"
14         "github.com/AdRoll/goamz/s3/s3test"
15         check "gopkg.in/check.v1"
16 )
17
18 const (
19         TestBucketName = "testbucket"
20 )
21
22 type fakeClock struct {
23         now *time.Time
24 }
25
26 func (c *fakeClock) Now() time.Time {
27         if c.now == nil {
28                 return time.Now()
29         }
30         return *c.now
31 }
32
33 func init() {
34         // Deleting isn't safe from races, but if it's turned on
35         // anyway we do expect it to pass the generic volume tests.
36         s3UnsafeDelete = true
37 }
38
39 var _ = check.Suite(&StubbedS3Suite{})
40
41 type StubbedS3Suite struct {
42         volumes []*TestableS3Volume
43 }
44
45 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
46         DoGenericVolumeTests(c, func(t TB) TestableVolume {
47                 // Use a negative raceWindow so s3test's 1-second
48                 // timestamp precision doesn't confuse fixRace.
49                 return s.newTestableVolume(c, -2*time.Second, false, 2)
50         })
51 }
52
53 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
54         DoGenericVolumeTests(c, func(t TB) TestableVolume {
55                 return s.newTestableVolume(c, -2*time.Second, true, 2)
56         })
57 }
58
59 func (s *StubbedS3Suite) TestIndex(c *check.C) {
60         v := s.newTestableVolume(c, 0, false, 2)
61         v.IndexPageSize = 3
62         for i := 0; i < 256; i++ {
63                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
64         }
65         for _, spec := range []struct {
66                 prefix      string
67                 expectMatch int
68         }{
69                 {"", 256},
70                 {"c", 16},
71                 {"bc", 1},
72                 {"abc", 0},
73         } {
74                 buf := new(bytes.Buffer)
75                 err := v.IndexTo(spec.prefix, buf)
76                 c.Check(err, check.IsNil)
77
78                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
79                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
80                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
81         }
82 }
83
84 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
85         defer func(tl, bs arvados.Duration) {
86                 theConfig.TrashLifetime = tl
87                 theConfig.BlobSignatureTTL = bs
88         }(theConfig.TrashLifetime, theConfig.BlobSignatureTTL)
89         theConfig.TrashLifetime.Set("1h")
90         theConfig.BlobSignatureTTL.Set("1h")
91
92         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
93         var none time.Time
94
95         putS3Obj := func(t time.Time, key string, data []byte) {
96                 if t == none {
97                         return
98                 }
99                 v.serverClock.now = &t
100                 v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
101         }
102
103         t0 := time.Now()
104         nextKey := 0
105         for _, scenario := range []struct {
106                 label               string
107                 dataT               time.Time
108                 recentT             time.Time
109                 trashT              time.Time
110                 canGet              bool
111                 canTrash            bool
112                 canGetAfterTrash    bool
113                 canUntrash          bool
114                 haveTrashAfterEmpty bool
115                 freshAfterEmpty     bool
116         }{
117                 {
118                         "No related objects",
119                         none, none, none,
120                         false, false, false, false, false, false,
121                 },
122                 {
123                         // Stored by older version, or there was a
124                         // race between EmptyTrash and Put: Trash is a
125                         // no-op even though the data object is very
126                         // old
127                         "No recent/X",
128                         t0.Add(-48 * time.Hour), none, none,
129                         true, true, true, false, false, false,
130                 },
131                 {
132                         "Not trash, but old enough to be eligible for trash",
133                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
134                         true, true, false, false, false, false,
135                 },
136                 {
137                         "Not trash, and not old enough to be eligible for trash",
138                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
139                         true, true, true, false, false, false,
140                 },
141                 {
142                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
143                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
144                         true, true, true, true, true, false,
145                 },
146                 {
147                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
148                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
149                         true, false, true, true, true, false,
150                 },
151                 {
152                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
153                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
154                         true, false, true, true, false, false,
155                 },
156                 {
157                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
158                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
159                         true, false, true, true, true, true,
160                 },
161                 {
162                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
163                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
164                         true, true, true, true, false, false,
165                 },
166                 {
167                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
168                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
169                         true, false, true, true, false, false,
170                 },
171                 {
172                         "Trash, not yet eligible for deletion",
173                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
174                         false, false, false, true, true, false,
175                 },
176                 {
177                         "Trash, not yet eligible for deletion, prone to races",
178                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
179                         false, false, false, true, true, false,
180                 },
181                 {
182                         "Trash, eligible for deletion",
183                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
184                         false, false, false, true, false, false,
185                 },
186                 {
187                         "Erroneously trashed during a race, detected before TrashLifetime",
188                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
189                         true, false, true, true, true, false,
190                 },
191                 {
192                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching TrashLifetime",
193                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
194                         true, false, true, true, true, false,
195                 },
196                 {
197                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
198                         none, none, t0.Add(-time.Minute),
199                         false, false, false, true, true, true,
200                 },
201         } {
202                 c.Log("Scenario: ", scenario.label)
203
204                 // We have a few tests to run for each scenario, and
205                 // the tests are expected to change state. By calling
206                 // this setup func between tests, we (re)create the
207                 // scenario as specified, using a new unique block
208                 // locator to prevent interference from previous
209                 // tests.
210
211                 setupScenario := func() (string, []byte) {
212                         nextKey++
213                         blk := []byte(fmt.Sprintf("%d", nextKey))
214                         loc := fmt.Sprintf("%x", md5.Sum(blk))
215                         c.Log("\t", loc)
216                         putS3Obj(scenario.dataT, loc, blk)
217                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
218                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
219                         v.serverClock.now = &t0
220                         return loc, blk
221                 }
222
223                 // Check canGet
224                 loc, blk := setupScenario()
225                 buf := make([]byte, len(blk))
226                 _, err := v.Get(loc, buf)
227                 c.Check(err == nil, check.Equals, scenario.canGet)
228                 if err != nil {
229                         c.Check(os.IsNotExist(err), check.Equals, true)
230                 }
231
232                 // Call Trash, then check canTrash and canGetAfterTrash
233                 loc, blk = setupScenario()
234                 err = v.Trash(loc)
235                 c.Check(err == nil, check.Equals, scenario.canTrash)
236                 _, err = v.Get(loc, buf)
237                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
238                 if err != nil {
239                         c.Check(os.IsNotExist(err), check.Equals, true)
240                 }
241
242                 // Call Untrash, then check canUntrash
243                 loc, blk = setupScenario()
244                 err = v.Untrash(loc)
245                 c.Check(err == nil, check.Equals, scenario.canUntrash)
246                 if scenario.dataT != none || scenario.trashT != none {
247                         // In all scenarios where the data exists, we
248                         // should be able to Get after Untrash --
249                         // regardless of timestamps, errors, race
250                         // conditions, etc.
251                         _, err = v.Get(loc, buf)
252                         c.Check(err, check.IsNil)
253                 }
254
255                 // Call EmptyTrash, then check haveTrashAfterEmpty and
256                 // freshAfterEmpty
257                 loc, blk = setupScenario()
258                 v.EmptyTrash()
259                 _, err = v.bucket.Head("trash/"+loc, nil)
260                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
261                 if scenario.freshAfterEmpty {
262                         t, err := v.Mtime(loc)
263                         c.Check(err, check.IsNil)
264                         // new mtime must be current (with an
265                         // allowance for 1s timestamp precision)
266                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
267                 }
268
269                 // Check for current Mtime after Put (applies to all
270                 // scenarios)
271                 loc, blk = setupScenario()
272                 err = v.Put(loc, blk)
273                 c.Check(err, check.IsNil)
274                 t, err := v.Mtime(loc)
275                 c.Check(err, check.IsNil)
276                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
277         }
278 }
279
280 type TestableS3Volume struct {
281         *S3Volume
282         server      *s3test.Server
283         c           *check.C
284         serverClock *fakeClock
285 }
286
287 func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
288         clock := &fakeClock{}
289         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
290         c.Assert(err, check.IsNil)
291
292         tmp, err := ioutil.TempFile("", "keepstore")
293         c.Assert(err, check.IsNil)
294         defer os.Remove(tmp.Name())
295         _, err = tmp.Write([]byte("xxx\n"))
296         c.Assert(err, check.IsNil)
297         c.Assert(tmp.Close(), check.IsNil)
298
299         v := &TestableS3Volume{
300                 S3Volume: &S3Volume{
301                         Bucket:             TestBucketName,
302                         AccessKeyFile:      tmp.Name(),
303                         SecretKeyFile:      tmp.Name(),
304                         Endpoint:           srv.URL(),
305                         Region:             "test-region-1",
306                         LocationConstraint: true,
307                         RaceWindow:         arvados.Duration(raceWindow),
308                         S3Replication:      replication,
309                         UnsafeDelete:       s3UnsafeDelete,
310                         ReadOnly:           readonly,
311                         IndexPageSize:      1000,
312                 },
313                 server:      srv,
314                 serverClock: clock,
315         }
316         c.Assert(v.Start(), check.IsNil)
317         err = v.bucket.PutBucket(s3.ACL("private"))
318         c.Assert(err, check.IsNil)
319         return v
320 }
321
322 // PutRaw skips the ContentMD5 test
323 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
324         err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
325         if err != nil {
326                 log.Printf("PutRaw: %+v", err)
327         }
328 }
329
330 // TouchWithDate turns back the clock while doing a Touch(). We assume
331 // there are no other operations happening on the same s3test server
332 // while we do this.
333 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
334         v.serverClock.now = &lastPut
335         err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
336         if err != nil {
337                 panic(err)
338         }
339         v.serverClock.now = nil
340 }
341
342 func (v *TestableS3Volume) Teardown() {
343         v.server.Quit()
344 }