10467: Merge branch 'master' into 10467-client-disconnect
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "encoding/base64"
7         "encoding/hex"
8         "flag"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "log"
13         "net/http"
14         "os"
15         "regexp"
16         "strings"
17         "sync"
18         "time"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "github.com/AdRoll/goamz/aws"
22         "github.com/AdRoll/goamz/s3"
23 )
24
25 const (
26         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
27         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
28 )
29
30 var (
31         // ErrS3TrashDisabled is returned by Trash if that operation
32         // is impossible with the current config.
33         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
34
35         s3AccessKeyFile string
36         s3SecretKeyFile string
37         s3RegionName    string
38         s3Endpoint      string
39         s3Replication   int
40         s3UnsafeDelete  bool
41         s3RaceWindow    time.Duration
42
43         s3ACL = s3.Private
44 )
45
46 const (
47         maxClockSkew  = 600 * time.Second
48         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
49 )
50
51 type s3VolumeAdder struct {
52         *Config
53 }
54
55 // String implements flag.Value
56 func (s *s3VolumeAdder) String() string {
57         return "-"
58 }
59
60 func (s *s3VolumeAdder) Set(bucketName string) error {
61         if bucketName == "" {
62                 return fmt.Errorf("no container name given")
63         }
64         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
65                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
66         }
67         if deprecated.flagSerializeIO {
68                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
69         }
70         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
71                 Bucket:        bucketName,
72                 AccessKeyFile: s3AccessKeyFile,
73                 SecretKeyFile: s3SecretKeyFile,
74                 Endpoint:      s3Endpoint,
75                 Region:        s3RegionName,
76                 RaceWindow:    arvados.Duration(s3RaceWindow),
77                 S3Replication: s3Replication,
78                 UnsafeDelete:  s3UnsafeDelete,
79                 ReadOnly:      deprecated.flagReadonly,
80                 IndexPageSize: 1000,
81         })
82         return nil
83 }
84
85 func s3regions() (okList []string) {
86         for r := range aws.Regions {
87                 okList = append(okList, r)
88         }
89         return
90 }
91
92 func init() {
93         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
94
95         flag.Var(&s3VolumeAdder{theConfig},
96                 "s3-bucket-volume",
97                 "Use the given bucket as a storage volume. Can be given multiple times.")
98         flag.StringVar(
99                 &s3RegionName,
100                 "s3-region",
101                 "",
102                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
103         flag.StringVar(
104                 &s3Endpoint,
105                 "s3-endpoint",
106                 "",
107                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
108         flag.StringVar(
109                 &s3AccessKeyFile,
110                 "s3-access-key-file",
111                 "",
112                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
113         flag.StringVar(
114                 &s3SecretKeyFile,
115                 "s3-secret-key-file",
116                 "",
117                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
118         flag.DurationVar(
119                 &s3RaceWindow,
120                 "s3-race-window",
121                 24*time.Hour,
122                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
123         flag.IntVar(
124                 &s3Replication,
125                 "s3-replication",
126                 2,
127                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
128         flag.BoolVar(
129                 &s3UnsafeDelete,
130                 "s3-unsafe-delete",
131                 false,
132                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
133 }
134
135 // S3Volume implements Volume using an S3 bucket.
136 type S3Volume struct {
137         AccessKeyFile      string
138         SecretKeyFile      string
139         Endpoint           string
140         Region             string
141         Bucket             string
142         LocationConstraint bool
143         IndexPageSize      int
144         S3Replication      int
145         ConnectTimeout     arvados.Duration
146         ReadTimeout        arvados.Duration
147         RaceWindow         arvados.Duration
148         ReadOnly           bool
149         UnsafeDelete       bool
150
151         bucket *s3.Bucket
152
153         startOnce sync.Once
154 }
155
156 // Examples implements VolumeWithExamples.
157 func (*S3Volume) Examples() []Volume {
158         return []Volume{
159                 &S3Volume{
160                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
161                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
162                         Endpoint:       "",
163                         Region:         "us-east-1",
164                         Bucket:         "example-bucket-name",
165                         IndexPageSize:  1000,
166                         S3Replication:  2,
167                         RaceWindow:     arvados.Duration(24 * time.Hour),
168                         ConnectTimeout: arvados.Duration(time.Minute),
169                         ReadTimeout:    arvados.Duration(5 * time.Minute),
170                 },
171                 &S3Volume{
172                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
173                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
174                         Endpoint:       "https://storage.googleapis.com",
175                         Region:         "",
176                         Bucket:         "example-bucket-name",
177                         IndexPageSize:  1000,
178                         S3Replication:  2,
179                         RaceWindow:     arvados.Duration(24 * time.Hour),
180                         ConnectTimeout: arvados.Duration(time.Minute),
181                         ReadTimeout:    arvados.Duration(5 * time.Minute),
182                 },
183         }
184 }
185
186 // Type implements Volume.
187 func (*S3Volume) Type() string {
188         return "S3"
189 }
190
191 // Start populates private fields and verifies the configuration is
192 // valid.
193 func (v *S3Volume) Start() error {
194         region, ok := aws.Regions[v.Region]
195         if v.Endpoint == "" {
196                 if !ok {
197                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
198                 }
199         } else if ok {
200                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
201                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
202         } else {
203                 region = aws.Region{
204                         Name:                 v.Region,
205                         S3Endpoint:           v.Endpoint,
206                         S3LocationConstraint: v.LocationConstraint,
207                 }
208         }
209
210         var err error
211         var auth aws.Auth
212         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
213         if err != nil {
214                 return err
215         }
216         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
217         if err != nil {
218                 return err
219         }
220
221         // Zero timeouts mean "wait forever", which is a bad
222         // default. Default to long timeouts instead.
223         if v.ConnectTimeout == 0 {
224                 v.ConnectTimeout = s3DefaultConnectTimeout
225         }
226         if v.ReadTimeout == 0 {
227                 v.ReadTimeout = s3DefaultReadTimeout
228         }
229
230         client := s3.New(auth, region)
231         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
232         client.ReadTimeout = time.Duration(v.ReadTimeout)
233         v.bucket = &s3.Bucket{
234                 S3:   client,
235                 Name: v.Bucket,
236         }
237         return nil
238 }
239
240 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
241         ready := make(chan bool)
242         go func() {
243                 rdr, err = v.getReader(loc)
244                 close(ready)
245         }()
246         select {
247         case <-ready:
248                 return
249         case <-ctx.Done():
250                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
251                 go func() {
252                         <-ready
253                         if err == nil {
254                                 rdr.Close()
255                         }
256                 }()
257                 return nil, ctx.Err()
258         }
259 }
260
261 // getReader wraps (Bucket)GetReader.
262 //
263 // In situations where (Bucket)GetReader would fail because the block
264 // disappeared in a Trash race, getReader calls fixRace to recover the
265 // data, and tries again.
266 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
267         rdr, err = v.bucket.GetReader(loc)
268         err = v.translateError(err)
269         if err == nil || !os.IsNotExist(err) {
270                 return
271         }
272         _, err = v.bucket.Head("recent/"+loc, nil)
273         err = v.translateError(err)
274         if err != nil {
275                 // If we can't read recent/X, there's no point in
276                 // trying fixRace. Give up.
277                 return
278         }
279         if !v.fixRace(loc) {
280                 err = os.ErrNotExist
281                 return
282         }
283         rdr, err = v.bucket.GetReader(loc)
284         if err != nil {
285                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
286                 err = v.translateError(err)
287         }
288         return
289 }
290
291 // Get a block: copy the block data into buf, and return the number of
292 // bytes copied.
293 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
294         rdr, err := v.getReaderWithContext(ctx, loc)
295         if err != nil {
296                 return 0, err
297         }
298
299         var n int
300         ready := make(chan bool)
301         go func() {
302                 defer close(ready)
303
304                 defer rdr.Close()
305                 n, err = io.ReadFull(rdr, buf)
306
307                 switch err {
308                 case nil, io.EOF, io.ErrUnexpectedEOF:
309                         err = nil
310                 default:
311                         err = v.translateError(err)
312                 }
313         }()
314         select {
315         case <-ctx.Done():
316                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
317                 rdr.Close()
318                 // Must wait for ReadFull to return, to ensure it
319                 // doesn't write to buf after we return.
320                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
321                 <-ready
322                 return 0, ctx.Err()
323         case <-ready:
324                 return n, err
325         }
326 }
327
328 // Compare the given data with the stored data.
329 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
330         rdr, err := v.getReaderWithContext(ctx, loc)
331         if err != nil {
332                 return err
333         }
334         defer rdr.Close()
335         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
336 }
337
338 // Put writes a block.
339 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
340         if v.ReadOnly {
341                 return MethodDisabledError
342         }
343         var opts s3.Options
344         size := len(block)
345         if size > 0 {
346                 md5, err := hex.DecodeString(loc)
347                 if err != nil {
348                         return err
349                 }
350                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
351         }
352
353         // Send the block data through a pipe, so that (if we need to)
354         // we can close the pipe early and abandon our PutReader()
355         // goroutine, without worrying about PutReader() accessing our
356         // block buffer after we release it.
357         bufr, bufw := io.Pipe()
358         go func() {
359                 io.Copy(bufw, bytes.NewReader(block))
360                 bufw.Close()
361         }()
362
363         var err error
364         ready := make(chan bool)
365         go func() {
366                 defer func() {
367                         if ctx.Err() != nil {
368                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
369                         }
370                 }()
371                 defer close(ready)
372                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
373                 if err != nil {
374                         return
375                 }
376                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
377         }()
378         select {
379         case <-ctx.Done():
380                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
381                 // Our pipe might be stuck in Write(), waiting for
382                 // io.Copy() to read. If so, un-stick it. This means
383                 // PutReader will get corrupt data, but that's OK: the
384                 // size and MD5 won't match, so the write will fail.
385                 go io.Copy(ioutil.Discard, bufr)
386                 // CloseWithError() will return once pending I/O is done.
387                 bufw.CloseWithError(ctx.Err())
388                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
389                 return ctx.Err()
390         case <-ready:
391                 return v.translateError(err)
392         }
393 }
394
395 // Touch sets the timestamp for the given locator to the current time.
396 func (v *S3Volume) Touch(loc string) error {
397         if v.ReadOnly {
398                 return MethodDisabledError
399         }
400         _, err := v.bucket.Head(loc, nil)
401         err = v.translateError(err)
402         if os.IsNotExist(err) && v.fixRace(loc) {
403                 // The data object got trashed in a race, but fixRace
404                 // rescued it.
405         } else if err != nil {
406                 return err
407         }
408         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
409         return v.translateError(err)
410 }
411
412 // Mtime returns the stored timestamp for the given locator.
413 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
414         _, err := v.bucket.Head(loc, nil)
415         if err != nil {
416                 return zeroTime, v.translateError(err)
417         }
418         resp, err := v.bucket.Head("recent/"+loc, nil)
419         err = v.translateError(err)
420         if os.IsNotExist(err) {
421                 // The data object X exists, but recent/X is missing.
422                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
423                 if err != nil {
424                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
425                         return zeroTime, v.translateError(err)
426                 }
427                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
428                 resp, err = v.bucket.Head("recent/"+loc, nil)
429                 if err != nil {
430                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
431                         return zeroTime, v.translateError(err)
432                 }
433         } else if err != nil {
434                 // HEAD recent/X failed for some other reason.
435                 return zeroTime, err
436         }
437         return v.lastModified(resp)
438 }
439
440 // IndexTo writes a complete list of locators with the given prefix
441 // for which Get() can retrieve data.
442 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
443         // Use a merge sort to find matching sets of X and recent/X.
444         dataL := s3Lister{
445                 Bucket:   v.bucket,
446                 Prefix:   prefix,
447                 PageSize: v.IndexPageSize,
448         }
449         recentL := s3Lister{
450                 Bucket:   v.bucket,
451                 Prefix:   "recent/" + prefix,
452                 PageSize: v.IndexPageSize,
453         }
454         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
455                 if data.Key >= "g" {
456                         // Conveniently, "recent/*" and "trash/*" are
457                         // lexically greater than all hex-encoded data
458                         // hashes, so stopping here avoids iterating
459                         // over all of them needlessly with dataL.
460                         break
461                 }
462                 if !v.isKeepBlock(data.Key) {
463                         continue
464                 }
465
466                 // stamp is the list entry we should use to report the
467                 // last-modified time for this data block: it will be
468                 // the recent/X entry if one exists, otherwise the
469                 // entry for the data block itself.
470                 stamp := data
471
472                 // Advance to the corresponding recent/X marker, if any
473                 for recent != nil {
474                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
475                                 recent = recentL.Next()
476                                 continue
477                         } else if cmp == 0 {
478                                 stamp = recent
479                                 recent = recentL.Next()
480                                 break
481                         } else {
482                                 // recent/X marker is missing: we'll
483                                 // use the timestamp on the data
484                                 // object.
485                                 break
486                         }
487                 }
488                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
489                 if err != nil {
490                         return err
491                 }
492                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
493         }
494         return nil
495 }
496
497 // Trash a Keep block.
498 func (v *S3Volume) Trash(loc string) error {
499         if v.ReadOnly {
500                 return MethodDisabledError
501         }
502         if t, err := v.Mtime(loc); err != nil {
503                 return err
504         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
505                 return nil
506         }
507         if theConfig.TrashLifetime == 0 {
508                 if !s3UnsafeDelete {
509                         return ErrS3TrashDisabled
510                 }
511                 return v.bucket.Del(loc)
512         }
513         err := v.checkRaceWindow(loc)
514         if err != nil {
515                 return err
516         }
517         err = v.safeCopy("trash/"+loc, loc)
518         if err != nil {
519                 return err
520         }
521         return v.translateError(v.bucket.Del(loc))
522 }
523
524 // checkRaceWindow returns a non-nil error if trash/loc is, or might
525 // be, in the race window (i.e., it's not safe to trash loc).
526 func (v *S3Volume) checkRaceWindow(loc string) error {
527         resp, err := v.bucket.Head("trash/"+loc, nil)
528         err = v.translateError(err)
529         if os.IsNotExist(err) {
530                 // OK, trash/X doesn't exist so we're not in the race
531                 // window
532                 return nil
533         } else if err != nil {
534                 // Error looking up trash/X. We don't know whether
535                 // we're in the race window
536                 return err
537         }
538         t, err := v.lastModified(resp)
539         if err != nil {
540                 // Can't parse timestamp
541                 return err
542         }
543         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
544         if safeWindow <= 0 {
545                 // We can't count on "touch trash/X" to prolong
546                 // trash/X's lifetime. The new timestamp might not
547                 // become visible until now+raceWindow, and EmptyTrash
548                 // is allowed to delete trash/X before then.
549                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
550         }
551         // trash/X exists, but it won't be eligible for deletion until
552         // after now+raceWindow, so it's safe to overwrite it.
553         return nil
554 }
555
556 // safeCopy calls PutCopy, and checks the response to make sure the
557 // copy succeeded and updated the timestamp on the destination object
558 // (PutCopy returns 200 OK if the request was received, even if the
559 // copy failed).
560 func (v *S3Volume) safeCopy(dst, src string) error {
561         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
562                 ContentType:       "application/octet-stream",
563                 MetadataDirective: "REPLACE",
564         }, v.bucket.Name+"/"+src)
565         err = v.translateError(err)
566         if err != nil {
567                 return err
568         }
569         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
570                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
571         } else if time.Now().Sub(t) > maxClockSkew {
572                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
573         }
574         return nil
575 }
576
577 // Get the LastModified header from resp, and parse it as RFC1123 or
578 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
579 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
580         s := resp.Header.Get("Last-Modified")
581         t, err = time.Parse(time.RFC1123, s)
582         if err != nil && s != "" {
583                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
584                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
585                 // as required by HTTP spec. If it's not a valid HTTP
586                 // header value, it's probably AWS (or s3test) giving
587                 // us a nearly-RFC1123 timestamp.
588                 t, err = time.Parse(nearlyRFC1123, s)
589         }
590         return
591 }
592
593 // Untrash moves block from trash back into store
594 func (v *S3Volume) Untrash(loc string) error {
595         err := v.safeCopy(loc, "trash/"+loc)
596         if err != nil {
597                 return err
598         }
599         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
600         return v.translateError(err)
601 }
602
603 // Status returns a *VolumeStatus representing the current in-use
604 // storage capacity and a fake available capacity that doesn't make
605 // the volume seem full or nearly-full.
606 func (v *S3Volume) Status() *VolumeStatus {
607         return &VolumeStatus{
608                 DeviceNum: 1,
609                 BytesFree: BlockSize * 1000,
610                 BytesUsed: 1,
611         }
612 }
613
614 // String implements fmt.Stringer.
615 func (v *S3Volume) String() string {
616         return fmt.Sprintf("s3-bucket:%+q", v.bucket.Name)
617 }
618
619 // Writable returns false if all future Put, Mtime, and Delete calls
620 // are expected to fail.
621 func (v *S3Volume) Writable() bool {
622         return !v.ReadOnly
623 }
624
625 // Replication returns the storage redundancy of the underlying
626 // device. Configured via command line flag.
627 func (v *S3Volume) Replication() int {
628         return v.S3Replication
629 }
630
631 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
632
633 func (v *S3Volume) isKeepBlock(s string) bool {
634         return s3KeepBlockRegexp.MatchString(s)
635 }
636
637 // fixRace(X) is called when "recent/X" exists but "X" doesn't
638 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
639 // there was a race between Put and Trash, fixRace recovers from the
640 // race by Untrashing the block.
641 func (v *S3Volume) fixRace(loc string) bool {
642         trash, err := v.bucket.Head("trash/"+loc, nil)
643         if err != nil {
644                 if !os.IsNotExist(v.translateError(err)) {
645                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
646                 }
647                 return false
648         }
649         trashTime, err := v.lastModified(trash)
650         if err != nil {
651                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
652                 return false
653         }
654
655         recent, err := v.bucket.Head("recent/"+loc, nil)
656         if err != nil {
657                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
658                 return false
659         }
660         recentTime, err := v.lastModified(recent)
661         if err != nil {
662                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
663                 return false
664         }
665
666         ageWhenTrashed := trashTime.Sub(recentTime)
667         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
668                 // No evidence of a race: block hasn't been written
669                 // since it became eligible for Trash. No fix needed.
670                 return false
671         }
672
673         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
674         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
675         err = v.safeCopy(loc, "trash/"+loc)
676         if err != nil {
677                 log.Printf("error: fixRace: %s", err)
678                 return false
679         }
680         return true
681 }
682
683 func (v *S3Volume) translateError(err error) error {
684         switch err := err.(type) {
685         case *s3.Error:
686                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
687                         strings.Contains(err.Error(), "Not Found") {
688                         return os.ErrNotExist
689                 }
690                 // Other 404 errors like NoSuchVersion and
691                 // NoSuchBucket are different problems which should
692                 // get called out downstream, so we don't convert them
693                 // to os.ErrNotExist.
694         }
695         return err
696 }
697
698 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
699 // and deletes them from the volume.
700 func (v *S3Volume) EmptyTrash() {
701         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
702
703         // Use a merge sort to find matching sets of trash/X and recent/X.
704         trashL := s3Lister{
705                 Bucket:   v.bucket,
706                 Prefix:   "trash/",
707                 PageSize: v.IndexPageSize,
708         }
709         // Define "ready to delete" as "...when EmptyTrash started".
710         startT := time.Now()
711         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
712                 loc := trash.Key[6:]
713                 if !v.isKeepBlock(loc) {
714                         continue
715                 }
716                 bytesInTrash += trash.Size
717                 blocksInTrash++
718
719                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
720                 if err != nil {
721                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
722                         continue
723                 }
724                 recent, err := v.bucket.Head("recent/"+loc, nil)
725                 if err != nil && os.IsNotExist(v.translateError(err)) {
726                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
727                         err = v.Untrash(loc)
728                         if err != nil {
729                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
730                         }
731                         continue
732                 } else if err != nil {
733                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
734                         continue
735                 }
736                 recentT, err := v.lastModified(recent)
737                 if err != nil {
738                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
739                         continue
740                 }
741                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
742                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
743                                 // recent/loc is too old to protect
744                                 // loc from being Trashed again during
745                                 // the raceWindow that starts if we
746                                 // delete trash/X now.
747                                 //
748                                 // Note this means (TrashCheckInterval
749                                 // < BlobSignatureTTL - raceWindow) is
750                                 // necessary to avoid starvation.
751                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
752                                 v.fixRace(loc)
753                                 v.Touch(loc)
754                                 continue
755                         } else if _, err := v.bucket.Head(loc, nil); os.IsNotExist(err) {
756                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
757                                 v.fixRace(loc)
758                                 continue
759                         } else if err != nil {
760                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
761                                 continue
762                         }
763                 }
764                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
765                         continue
766                 }
767                 err = v.bucket.Del(trash.Key)
768                 if err != nil {
769                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
770                         continue
771                 }
772                 bytesDeleted += trash.Size
773                 blocksDeleted++
774
775                 _, err = v.bucket.Head(loc, nil)
776                 if os.IsNotExist(err) {
777                         err = v.bucket.Del("recent/" + loc)
778                         if err != nil {
779                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
780                         }
781                 } else if err != nil {
782                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
783                 }
784         }
785         if err := trashL.Error(); err != nil {
786                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
787         }
788         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
789 }
790
791 type s3Lister struct {
792         Bucket     *s3.Bucket
793         Prefix     string
794         PageSize   int
795         nextMarker string
796         buf        []s3.Key
797         err        error
798 }
799
800 // First fetches the first page and returns the first item. It returns
801 // nil if the response is the empty set or an error occurs.
802 func (lister *s3Lister) First() *s3.Key {
803         lister.getPage()
804         return lister.pop()
805 }
806
807 // Next returns the next item, fetching the next page if necessary. It
808 // returns nil if the last available item has already been fetched, or
809 // an error occurs.
810 func (lister *s3Lister) Next() *s3.Key {
811         if len(lister.buf) == 0 && lister.nextMarker != "" {
812                 lister.getPage()
813         }
814         return lister.pop()
815 }
816
817 // Return the most recent error encountered by First or Next.
818 func (lister *s3Lister) Error() error {
819         return lister.err
820 }
821
822 func (lister *s3Lister) getPage() {
823         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
824         lister.nextMarker = ""
825         if err != nil {
826                 lister.err = err
827                 return
828         }
829         if resp.IsTruncated {
830                 lister.nextMarker = resp.NextMarker
831         }
832         lister.buf = make([]s3.Key, 0, len(resp.Contents))
833         for _, key := range resp.Contents {
834                 if !strings.HasPrefix(key.Key, lister.Prefix) {
835                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
836                         continue
837                 }
838                 lister.buf = append(lister.buf, key)
839         }
840 }
841
842 func (lister *s3Lister) pop() (k *s3.Key) {
843         if len(lister.buf) > 0 {
844                 k = &lister.buf[0]
845                 lister.buf = lister.buf[1:]
846         }
847         return
848 }