20336: Allow filtering on set of integers.
[arvados.git] / services / keepstore / s3_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "strings"
18         "time"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22         "github.com/AdRoll/goamz/s3"
23         "github.com/AdRoll/goamz/s3/s3test"
24         "github.com/prometheus/client_golang/prometheus"
25         "github.com/sirupsen/logrus"
26         check "gopkg.in/check.v1"
27 )
28
29 const (
30         TestBucketName = "testbucket"
31 )
32
33 type fakeClock struct {
34         now *time.Time
35 }
36
37 func (c *fakeClock) Now() time.Time {
38         if c.now == nil {
39                 return time.Now()
40         }
41         return *c.now
42 }
43
44 var _ = check.Suite(&StubbedS3Suite{})
45
46 type StubbedS3Suite struct {
47         s3server *httptest.Server
48         metadata *httptest.Server
49         cluster  *arvados.Cluster
50         handler  *handler
51         volumes  []*TestableS3Volume
52 }
53
54 func (s *StubbedS3Suite) SetUpTest(c *check.C) {
55         s.s3server = nil
56         s.metadata = nil
57         s.cluster = testCluster(c)
58         s.cluster.Volumes = map[string]arvados.Volume{
59                 "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
60                 "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
61         }
62         s.handler = &handler{}
63 }
64
65 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
66         DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
67                 // Use a negative raceWindow so s3test's 1-second
68                 // timestamp precision doesn't confuse fixRace.
69                 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
70         })
71 }
72
73 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
74         DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
75                 return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
76         })
77 }
78
79 func (s *StubbedS3Suite) TestGenericWithPrefix(c *check.C) {
80         DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
81                 v := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
82                 v.PrefixLength = 3
83                 return v
84         })
85 }
86
87 func (s *StubbedS3Suite) TestIndex(c *check.C) {
88         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
89         v.IndexPageSize = 3
90         for i := 0; i < 256; i++ {
91                 v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
92         }
93         for _, spec := range []struct {
94                 prefix      string
95                 expectMatch int
96         }{
97                 {"", 256},
98                 {"c", 16},
99                 {"bc", 1},
100                 {"abc", 0},
101         } {
102                 buf := new(bytes.Buffer)
103                 err := v.IndexTo(spec.prefix, buf)
104                 c.Check(err, check.IsNil)
105
106                 idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
107                 c.Check(len(idx), check.Equals, spec.expectMatch+1)
108                 c.Check(len(idx[len(idx)-1]), check.Equals, 0)
109         }
110 }
111
112 func (s *StubbedS3Suite) TestSignatureVersion(c *check.C) {
113         var header http.Header
114         stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
115                 header = r.Header
116         }))
117         defer stub.Close()
118
119         // Default V4 signature
120         vol := S3Volume{
121                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
122                         AccessKeyID:     "xxx",
123                         SecretAccessKey: "xxx",
124                         Endpoint:        stub.URL,
125                         Region:          "test-region-1",
126                         Bucket:          "test-bucket-name",
127                 },
128                 cluster: s.cluster,
129                 logger:  ctxlog.TestLogger(c),
130                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
131         }
132         err := vol.check()
133         c.Check(err, check.IsNil)
134         err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
135         c.Check(err, check.IsNil)
136         c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
137
138         // Force V2 signature
139         vol = S3Volume{
140                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
141                         AccessKeyID:     "xxx",
142                         SecretAccessKey: "xxx",
143                         Endpoint:        stub.URL,
144                         Region:          "test-region-1",
145                         Bucket:          "test-bucket-name",
146                         V2Signature:     true,
147                 },
148                 cluster: s.cluster,
149                 logger:  ctxlog.TestLogger(c),
150                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
151         }
152         err = vol.check()
153         c.Check(err, check.IsNil)
154         err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
155         c.Check(err, check.IsNil)
156         c.Check(header.Get("Authorization"), check.Matches, `AWS xxx:.*`)
157 }
158
159 func (s *StubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
160         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
161                 upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
162                 exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
163                 // Literal example from
164                 // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
165                 // but with updated timestamps
166                 io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
167         }))
168         defer s.metadata.Close()
169
170         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
171         c.Check(v.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
172         c.Check(v.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
173         c.Check(v.bucket.bucket.S3.Auth.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
174         c.Check(v.bucket.bucket.S3.Auth.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
175
176         s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
177                 w.WriteHeader(http.StatusNotFound)
178         }))
179         deadv := &S3Volume{
180                 S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
181                         IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
182                         Endpoint: "http://localhost:12345",
183                         Region:   "test-region-1",
184                         Bucket:   "test-bucket-name",
185                 },
186                 cluster: s.cluster,
187                 logger:  ctxlog.TestLogger(c),
188                 metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
189         }
190         err := deadv.check()
191         c.Check(err, check.ErrorMatches, `.*/fake-metadata/test-role.*`)
192         c.Check(err, check.ErrorMatches, `.*404.*`)
193 }
194
195 func (s *StubbedS3Suite) TestStats(c *check.C) {
196         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
197         stats := func() string {
198                 buf, err := json.Marshal(v.InternalStats())
199                 c.Check(err, check.IsNil)
200                 return string(buf)
201         }
202
203         c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
204
205         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
206         _, err := v.Get(context.Background(), loc, make([]byte, 3))
207         c.Check(err, check.NotNil)
208         c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
209         c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
210         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
211
212         err = v.Put(context.Background(), loc, []byte("foo"))
213         c.Check(err, check.IsNil)
214         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
215         c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
216
217         _, err = v.Get(context.Background(), loc, make([]byte, 3))
218         c.Check(err, check.IsNil)
219         _, err = v.Get(context.Background(), loc, make([]byte, 3))
220         c.Check(err, check.IsNil)
221         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
222 }
223
224 type blockingHandler struct {
225         requested chan *http.Request
226         unblock   chan struct{}
227 }
228
229 func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
230         if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
231                 // Accept PutBucket ("PUT /bucketname/"), called by
232                 // newTestableVolume
233                 return
234         }
235         if h.requested != nil {
236                 h.requested <- r
237         }
238         if h.unblock != nil {
239                 <-h.unblock
240         }
241         http.Error(w, "nothing here", http.StatusNotFound)
242 }
243
244 func (s *StubbedS3Suite) TestGetContextCancel(c *check.C) {
245         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
246         buf := make([]byte, 3)
247
248         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
249                 _, err := v.Get(ctx, loc, buf)
250                 return err
251         })
252 }
253
254 func (s *StubbedS3Suite) TestCompareContextCancel(c *check.C) {
255         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
256         buf := []byte("bar")
257
258         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
259                 return v.Compare(ctx, loc, buf)
260         })
261 }
262
263 func (s *StubbedS3Suite) TestPutContextCancel(c *check.C) {
264         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
265         buf := []byte("foo")
266
267         s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
268                 return v.Put(ctx, loc, buf)
269         })
270 }
271
272 func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3Volume) error) {
273         handler := &blockingHandler{}
274         s.s3server = httptest.NewServer(handler)
275         defer s.s3server.Close()
276
277         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
278
279         ctx, cancel := context.WithCancel(context.Background())
280
281         handler.requested = make(chan *http.Request)
282         handler.unblock = make(chan struct{})
283         defer close(handler.unblock)
284
285         doneFunc := make(chan struct{})
286         go func() {
287                 err := testFunc(ctx, v)
288                 c.Check(err, check.Equals, context.Canceled)
289                 close(doneFunc)
290         }()
291
292         timeout := time.After(10 * time.Second)
293
294         // Wait for the stub server to receive a request, meaning
295         // Get() is waiting for an s3 operation.
296         select {
297         case <-timeout:
298                 c.Fatal("timed out waiting for test func to call our handler")
299         case <-doneFunc:
300                 c.Fatal("test func finished without even calling our handler!")
301         case <-handler.requested:
302         }
303
304         cancel()
305
306         select {
307         case <-timeout:
308                 c.Fatal("timed out")
309         case <-doneFunc:
310         }
311 }
312
313 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
314         s.cluster.Collections.BlobTrashLifetime.Set("1h")
315         s.cluster.Collections.BlobSigningTTL.Set("1h")
316
317         v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
318         var none time.Time
319
320         putS3Obj := func(t time.Time, key string, data []byte) {
321                 if t == none {
322                         return
323                 }
324                 v.serverClock.now = &t
325                 v.bucket.Bucket().Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
326         }
327
328         t0 := time.Now()
329         nextKey := 0
330         for _, scenario := range []struct {
331                 label               string
332                 dataT               time.Time
333                 recentT             time.Time
334                 trashT              time.Time
335                 canGet              bool
336                 canTrash            bool
337                 canGetAfterTrash    bool
338                 canUntrash          bool
339                 haveTrashAfterEmpty bool
340                 freshAfterEmpty     bool
341         }{
342                 {
343                         "No related objects",
344                         none, none, none,
345                         false, false, false, false, false, false,
346                 },
347                 {
348                         // Stored by older version, or there was a
349                         // race between EmptyTrash and Put: Trash is a
350                         // no-op even though the data object is very
351                         // old
352                         "No recent/X",
353                         t0.Add(-48 * time.Hour), none, none,
354                         true, true, true, false, false, false,
355                 },
356                 {
357                         "Not trash, but old enough to be eligible for trash",
358                         t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
359                         true, true, false, false, false, false,
360                 },
361                 {
362                         "Not trash, and not old enough to be eligible for trash",
363                         t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
364                         true, true, true, false, false, false,
365                 },
366                 {
367                         "Trashed + untrashed copies exist, due to recent race between Trash and Put",
368                         t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
369                         true, true, true, true, true, false,
370                 },
371                 {
372                         "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
373                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
374                         true, false, true, true, true, false,
375                 },
376                 {
377                         "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
378                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
379                         true, false, true, true, false, false,
380                 },
381                 {
382                         "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
383                         t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
384                         true, false, true, true, true, true,
385                 },
386                 {
387                         "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
388                         t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
389                         true, true, true, true, false, false,
390                 },
391                 {
392                         "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
393                         t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
394                         true, false, true, true, false, false,
395                 },
396                 {
397                         "Trash, not yet eligible for deletion",
398                         none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
399                         false, false, false, true, true, false,
400                 },
401                 {
402                         "Trash, not yet eligible for deletion, prone to races",
403                         none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
404                         false, false, false, true, true, false,
405                 },
406                 {
407                         "Trash, eligible for deletion",
408                         none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
409                         false, false, false, true, false, false,
410                 },
411                 {
412                         "Erroneously trashed during a race, detected before BlobTrashLifetime",
413                         none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
414                         true, false, true, true, true, false,
415                 },
416                 {
417                         "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
418                         none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
419                         true, false, true, true, true, false,
420                 },
421                 {
422                         "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
423                         none, none, t0.Add(-time.Minute),
424                         false, false, false, true, true, true,
425                 },
426         } {
427                 for _, prefixLength := range []int{0, 3} {
428                         v.PrefixLength = prefixLength
429                         c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
430
431                         // We have a few tests to run for each scenario, and
432                         // the tests are expected to change state. By calling
433                         // this setup func between tests, we (re)create the
434                         // scenario as specified, using a new unique block
435                         // locator to prevent interference from previous
436                         // tests.
437
438                         setupScenario := func() (string, []byte) {
439                                 nextKey++
440                                 blk := []byte(fmt.Sprintf("%d", nextKey))
441                                 loc := fmt.Sprintf("%x", md5.Sum(blk))
442                                 key := loc
443                                 if prefixLength > 0 {
444                                         key = loc[:prefixLength] + "/" + loc
445                                 }
446                                 c.Log("\t", loc)
447                                 putS3Obj(scenario.dataT, key, blk)
448                                 putS3Obj(scenario.recentT, "recent/"+key, nil)
449                                 putS3Obj(scenario.trashT, "trash/"+key, blk)
450                                 v.serverClock.now = &t0
451                                 return loc, blk
452                         }
453
454                         // Check canGet
455                         loc, blk := setupScenario()
456                         buf := make([]byte, len(blk))
457                         _, err := v.Get(context.Background(), loc, buf)
458                         c.Check(err == nil, check.Equals, scenario.canGet)
459                         if err != nil {
460                                 c.Check(os.IsNotExist(err), check.Equals, true)
461                         }
462
463                         // Call Trash, then check canTrash and canGetAfterTrash
464                         loc, _ = setupScenario()
465                         err = v.Trash(loc)
466                         c.Check(err == nil, check.Equals, scenario.canTrash)
467                         _, err = v.Get(context.Background(), loc, buf)
468                         c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
469                         if err != nil {
470                                 c.Check(os.IsNotExist(err), check.Equals, true)
471                         }
472
473                         // Call Untrash, then check canUntrash
474                         loc, _ = setupScenario()
475                         err = v.Untrash(loc)
476                         c.Check(err == nil, check.Equals, scenario.canUntrash)
477                         if scenario.dataT != none || scenario.trashT != none {
478                                 // In all scenarios where the data exists, we
479                                 // should be able to Get after Untrash --
480                                 // regardless of timestamps, errors, race
481                                 // conditions, etc.
482                                 _, err = v.Get(context.Background(), loc, buf)
483                                 c.Check(err, check.IsNil)
484                         }
485
486                         // Call EmptyTrash, then check haveTrashAfterEmpty and
487                         // freshAfterEmpty
488                         loc, _ = setupScenario()
489                         v.EmptyTrash()
490                         _, err = v.bucket.Head("trash/"+v.key(loc), nil)
491                         c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
492                         if scenario.freshAfterEmpty {
493                                 t, err := v.Mtime(loc)
494                                 c.Check(err, check.IsNil)
495                                 // new mtime must be current (with an
496                                 // allowance for 1s timestamp precision)
497                                 c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
498                         }
499
500                         // Check for current Mtime after Put (applies to all
501                         // scenarios)
502                         loc, blk = setupScenario()
503                         err = v.Put(context.Background(), loc, blk)
504                         c.Check(err, check.IsNil)
505                         t, err := v.Mtime(loc)
506                         c.Check(err, check.IsNil)
507                         c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
508                 }
509         }
510 }
511
512 type TestableS3Volume struct {
513         *S3Volume
514         server      *s3test.Server
515         c           *check.C
516         serverClock *fakeClock
517 }
518
519 func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3Volume {
520         clock := &fakeClock{}
521         srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
522         c.Assert(err, check.IsNil)
523         endpoint := srv.URL()
524         if s.s3server != nil {
525                 endpoint = s.s3server.URL
526         }
527
528         iamRole, accessKey, secretKey := "", "xxx", "xxx"
529         if s.metadata != nil {
530                 iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
531         }
532
533         v := &TestableS3Volume{
534                 S3Volume: &S3Volume{
535                         S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
536                                 IAMRole:            iamRole,
537                                 AccessKeyID:        accessKey,
538                                 SecretAccessKey:    secretKey,
539                                 Bucket:             TestBucketName,
540                                 Endpoint:           endpoint,
541                                 Region:             "test-region-1",
542                                 LocationConstraint: true,
543                                 UnsafeDelete:       true,
544                                 IndexPageSize:      1000,
545                         },
546                         cluster: cluster,
547                         volume:  volume,
548                         logger:  ctxlog.TestLogger(c),
549                         metrics: metrics,
550                 },
551                 c:           c,
552                 server:      srv,
553                 serverClock: clock,
554         }
555         c.Assert(v.S3Volume.check(), check.IsNil)
556         c.Assert(v.bucket.Bucket().PutBucket(s3.ACL("private")), check.IsNil)
557         // We couldn't set RaceWindow until now because check()
558         // rejects negative values.
559         v.S3Volume.RaceWindow = arvados.Duration(raceWindow)
560         return v
561 }
562
563 // PutRaw skips the ContentMD5 test
564 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
565         key := v.key(loc)
566         err := v.bucket.Bucket().Put(key, block, "application/octet-stream", s3ACL, s3.Options{})
567         if err != nil {
568                 v.logger.Printf("PutRaw: %s: %+v", loc, err)
569         }
570         err = v.bucket.Bucket().Put("recent/"+key, nil, "application/octet-stream", s3ACL, s3.Options{})
571         if err != nil {
572                 v.logger.Printf("PutRaw: recent/%s: %+v", key, err)
573         }
574 }
575
576 // TouchWithDate turns back the clock while doing a Touch(). We assume
577 // there are no other operations happening on the same s3test server
578 // while we do this.
579 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
580         v.serverClock.now = &lastPut
581         err := v.bucket.Bucket().Put("recent/"+v.key(locator), nil, "application/octet-stream", s3ACL, s3.Options{})
582         if err != nil {
583                 panic(err)
584         }
585         v.serverClock.now = nil
586 }
587
588 func (v *TestableS3Volume) Teardown() {
589         v.server.Quit()
590 }
591
592 func (v *TestableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
593         return "get", "put"
594 }