Merge branch 'master' into 10293-cwl-cr-output
[arvados.git] / services / keepstore / s3_volume_test.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "crypto/md5"
7         "encoding/json"
8         "fmt"
9         "io/ioutil"
10         "os"
11         "time"
12
13         "git.curoverse.com/arvados.git/sdk/go/arvados"
14         "github.com/AdRoll/goamz/s3"
15         "github.com/AdRoll/goamz/s3/s3test"
16         log "github.com/Sirupsen/logrus"
17         check "gopkg.in/check.v1"
18 )
19
20 const (
21         TestBucketName = "testbucket"
22 )
23
24 type fakeClock struct {
25         now *time.Time
26 }
27
28 func (c *fakeClock) Now() time.Time {
29         if c.now == nil {
30                 return time.Now()
31         }
32         return *c.now
33 }
34
35 func init() {
36         // Deleting isn't safe from races, but if it's turned on
37         // anyway we do expect it to pass the generic volume tests.
38         s3UnsafeDelete = true
39 }
40
41 var _ = check.Suite(&StubbedS3Suite{})
42
43 type StubbedS3Suite struct {
44         volumes []*TestableS3Volume
45 }
46
47 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
48         DoGenericVolumeTests(c, func(t TB) TestableVolume {
49                 // Use a negative raceWindow so s3test's 1-second
50                 // timestamp precision doesn't confuse fixRace.
51                 return s.newTestableVolume(c, -2*time.Second, false, 2)
52         })
53 }
54
55 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
56         DoGenericVolumeTests(c, func(t TB) TestableVolume {
57                 return s.newTestableVolume(c, -2*time.Second, true, 2)
58         })
59 }
60
61 func (s *StubbedS3Suite) TestIndex(c *check.C) {
62         v := s.newTestableVolume(c, 0, false, 2)
63         v.IndexPageSize = 3
64         for i := 0; i < 256; i++ {
65                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
66         }
67         for _, spec := range []struct {
68                 prefix      string
69                 expectMatch int
70         }{
71                 {"", 256},
72                 {"c", 16},
73                 {"bc", 1},
74                 {"abc", 0},
75         } {
76                 buf := new(bytes.Buffer)
77                 err := v.IndexTo(spec.prefix, buf)
78                 c.Check(err, check.IsNil)
79
80                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
81                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
82                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
83         }
84 }
85
86 func (s *StubbedS3Suite) TestStats(c *check.C) {
87         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
88         stats := func() string {
89                 buf, err := json.Marshal(v.InternalStats())
90                 c.Check(err, check.IsNil)
91                 return string(buf)
92         }
93
94         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
95
96         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
97         _, err := v.Get(context.Background(), loc, make([]byte, 3))
98         c.Check(err, check.NotNil)
99         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
100         c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
101         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
102
103         err = v.Put(context.Background(), loc, []byte("foo"))
104         c.Check(err, check.IsNil)
105         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
106         c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
107
108         _, err = v.Get(context.Background(), loc, make([]byte, 3))
109         c.Check(err, check.IsNil)
110         _, err = v.Get(context.Background(), loc, make([]byte, 3))
111         c.Check(err, check.IsNil)
112         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
113 }
114
115 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
116         defer func(tl, bs arvados.Duration) {
117                 theConfig.TrashLifetime = tl
118                 theConfig.BlobSignatureTTL = bs
119         }(theConfig.TrashLifetime, theConfig.BlobSignatureTTL)
120         theConfig.TrashLifetime.Set("1h")
121         theConfig.BlobSignatureTTL.Set("1h")
122
123         v := s.newTestableVolume(c, 5*time.Minute, false, 2)
124         var none time.Time
125
126         putS3Obj := func(t time.Time, key string, data []byte) {
127                 if t == none {
128                         return
129                 }
130                 v.serverClock.now = &t
131                 v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
132         }
133
134         t0 := time.Now()
135         nextKey := 0
136         for _, scenario := range []struct {
137                 label               string
138                 dataT               time.Time
139                 recentT             time.Time
140                 trashT              time.Time
141                 canGet              bool
142                 canTrash            bool
143                 canGetAfterTrash    bool
144                 canUntrash          bool
145                 haveTrashAfterEmpty bool
146                 freshAfterEmpty     bool
147         }{
148                 {
149                         "No related objects",
150                         none, none, none,
151                         false, false, false, false, false, false,
152                 },
153                 {
154                         // Stored by older version, or there was a
155                         // race between EmptyTrash and Put: Trash is a
156                         // no-op even though the data object is very
157                         // old
158                         "No recent/X",
159                         t0.Add(-48 * time.Hour), none, none,
160                         true, true, true, false, false, false,
161                 },
162                 {
163                         "Not trash, but old enough to be eligible for trash",
164                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
165                         true, true, false, false, false, false,
166                 },
167                 {
168                         "Not trash, and not old enough to be eligible for trash",
169                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
170                         true, true, true, false, false, false,
171                 },
172                 {
173                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
174                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
175                         true, true, true, true, true, false,
176                 },
177                 {
178                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
179                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
180                         true, false, true, true, true, false,
181                 },
182                 {
183                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
184                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
185                         true, false, true, true, false, false,
186                 },
187                 {
188                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
189                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
190                         true, false, true, true, true, true,
191                 },
192                 {
193                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
194                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
195                         true, true, true, true, false, false,
196                 },
197                 {
198                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
199                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
200                         true, false, true, true, false, false,
201                 },
202                 {
203                         "Trash, not yet eligible for deletion",
204                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
205                         false, false, false, true, true, false,
206                 },
207                 {
208                         "Trash, not yet eligible for deletion, prone to races",
209                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
210                         false, false, false, true, true, false,
211                 },
212                 {
213                         "Trash, eligible for deletion",
214                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
215                         false, false, false, true, false, false,
216                 },
217                 {
218                         "Erroneously trashed during a race, detected before TrashLifetime",
219                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
220                         true, false, true, true, true, false,
221                 },
222                 {
223                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching TrashLifetime",
224                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
225                         true, false, true, true, true, false,
226                 },
227                 {
228                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
229                         none, none, t0.Add(-time.Minute),
230                         false, false, false, true, true, true,
231                 },
232         } {
233                 c.Log("Scenario: ", scenario.label)
234
235                 // We have a few tests to run for each scenario, and
236                 // the tests are expected to change state. By calling
237                 // this setup func between tests, we (re)create the
238                 // scenario as specified, using a new unique block
239                 // locator to prevent interference from previous
240                 // tests.
241
242                 setupScenario := func() (string, []byte) {
243                         nextKey++
244                         blk := []byte(fmt.Sprintf("%d", nextKey))
245                         loc := fmt.Sprintf("%x", md5.Sum(blk))
246                         c.Log("\t", loc)
247                         putS3Obj(scenario.dataT, loc, blk)
248                         putS3Obj(scenario.recentT, "recent/"+loc, nil)
249                         putS3Obj(scenario.trashT, "trash/"+loc, blk)
250                         v.serverClock.now = &t0
251                         return loc, blk
252                 }
253
254                 // Check canGet
255                 loc, blk := setupScenario()
256                 buf := make([]byte, len(blk))
257                 _, err := v.Get(context.Background(), loc, buf)
258                 c.Check(err == nil, check.Equals, scenario.canGet)
259                 if err != nil {
260                         c.Check(os.IsNotExist(err), check.Equals, true)
261                 }
262
263                 // Call Trash, then check canTrash and canGetAfterTrash
264                 loc, blk = setupScenario()
265                 err = v.Trash(loc)
266                 c.Check(err == nil, check.Equals, scenario.canTrash)
267                 _, err = v.Get(context.Background(), loc, buf)
268                 c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
269                 if err != nil {
270                         c.Check(os.IsNotExist(err), check.Equals, true)
271                 }
272
273                 // Call Untrash, then check canUntrash
274                 loc, blk = setupScenario()
275                 err = v.Untrash(loc)
276                 c.Check(err == nil, check.Equals, scenario.canUntrash)
277                 if scenario.dataT != none || scenario.trashT != none {
278                         // In all scenarios where the data exists, we
279                         // should be able to Get after Untrash --
280                         // regardless of timestamps, errors, race
281                         // conditions, etc.
282                         _, err = v.Get(context.Background(), loc, buf)
283                         c.Check(err, check.IsNil)
284                 }
285
286                 // Call EmptyTrash, then check haveTrashAfterEmpty and
287                 // freshAfterEmpty
288                 loc, blk = setupScenario()
289                 v.EmptyTrash()
290                 _, err = v.bucket.Head("trash/"+loc, nil)
291                 c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
292                 if scenario.freshAfterEmpty {
293                         t, err := v.Mtime(loc)
294                         c.Check(err, check.IsNil)
295                         // new mtime must be current (with an
296                         // allowance for 1s timestamp precision)
297                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
298                 }
299
300                 // Check for current Mtime after Put (applies to all
301                 // scenarios)
302                 loc, blk = setupScenario()
303                 err = v.Put(context.Background(), loc, blk)
304                 c.Check(err, check.IsNil)
305                 t, err := v.Mtime(loc)
306                 c.Check(err, check.IsNil)
307                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
308         }
309 }
310
311 type TestableS3Volume struct {
312         *S3Volume
313         server      *s3test.Server
314         c           *check.C
315         serverClock *fakeClock
316 }
317
318 func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
319         clock := &fakeClock{}
320         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
321         c.Assert(err, check.IsNil)
322
323         tmp, err := ioutil.TempFile("", "keepstore")
324         c.Assert(err, check.IsNil)
325         defer os.Remove(tmp.Name())
326         _, err = tmp.Write([]byte("xxx\n"))
327         c.Assert(err, check.IsNil)
328         c.Assert(tmp.Close(), check.IsNil)
329
330         v := &TestableS3Volume{
331                 S3Volume: &S3Volume{
332                         Bucket:             TestBucketName,
333                         AccessKeyFile:      tmp.Name(),
334                         SecretKeyFile:      tmp.Name(),
335                         Endpoint:           srv.URL(),
336                         Region:             "test-region-1",
337                         LocationConstraint: true,
338                         RaceWindow:         arvados.Duration(raceWindow),
339                         S3Replication:      replication,
340                         UnsafeDelete:       s3UnsafeDelete,
341                         ReadOnly:           readonly,
342                         IndexPageSize:      1000,
343                 },
344                 server:      srv,
345                 serverClock: clock,
346         }
347         c.Assert(v.Start(), check.IsNil)
348         err = v.bucket.PutBucket(s3.ACL("private"))
349         c.Assert(err, check.IsNil)
350         return v
351 }
352
353 // PutRaw skips the ContentMD5 test
354 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
355         err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
356         if err != nil {
357                 log.Printf("PutRaw: %+v", err)
358         }
359 }
360
361 // TouchWithDate turns back the clock while doing a Touch(). We assume
362 // there are no other operations happening on the same s3test server
363 // while we do this.
364 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
365         v.serverClock.now = &lastPut
366         err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
367         if err != nil {
368                 panic(err)
369         }
370         v.serverClock.now = nil
371 }
372
373 func (v *TestableS3Volume) Teardown() {
374         v.server.Quit()
375 }