14397: Pass SHA256 to s3 library.
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/sha256"
11         "encoding/base64"
12         "encoding/hex"
13         "flag"
14         "fmt"
15         "io"
16         "io/ioutil"
17         "net/http"
18         "os"
19         "regexp"
20         "strings"
21         "sync"
22         "sync/atomic"
23         "time"
24
25         "git.curoverse.com/arvados.git/sdk/go/arvados"
26         "github.com/AdRoll/goamz/aws"
27         "github.com/AdRoll/goamz/s3"
28 )
29
30 const (
31         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
32         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
33 )
34
35 var (
36         // ErrS3TrashDisabled is returned by Trash if that operation
37         // is impossible with the current config.
38         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
39
40         s3AccessKeyFile string
41         s3SecretKeyFile string
42         s3RegionName    string
43         s3Endpoint      string
44         s3Replication   int
45         s3UnsafeDelete  bool
46         s3RaceWindow    time.Duration
47
48         s3ACL = s3.Private
49
50         zeroTime time.Time
51 )
52
53 const (
54         maxClockSkew  = 600 * time.Second
55         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
56 )
57
58 type s3VolumeAdder struct {
59         *Config
60 }
61
62 // String implements flag.Value
63 func (s *s3VolumeAdder) String() string {
64         return "-"
65 }
66
67 func (s *s3VolumeAdder) Set(bucketName string) error {
68         if bucketName == "" {
69                 return fmt.Errorf("no container name given")
70         }
71         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
72                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
73         }
74         if deprecated.flagSerializeIO {
75                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
76         }
77         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
78                 Bucket:        bucketName,
79                 AccessKeyFile: s3AccessKeyFile,
80                 SecretKeyFile: s3SecretKeyFile,
81                 Endpoint:      s3Endpoint,
82                 Region:        s3RegionName,
83                 RaceWindow:    arvados.Duration(s3RaceWindow),
84                 S3Replication: s3Replication,
85                 UnsafeDelete:  s3UnsafeDelete,
86                 ReadOnly:      deprecated.flagReadonly,
87                 IndexPageSize: 1000,
88         })
89         return nil
90 }
91
92 func s3regions() (okList []string) {
93         for r := range aws.Regions {
94                 okList = append(okList, r)
95         }
96         return
97 }
98
99 func init() {
100         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
101
102         flag.Var(&s3VolumeAdder{theConfig},
103                 "s3-bucket-volume",
104                 "Use the given bucket as a storage volume. Can be given multiple times.")
105         flag.StringVar(
106                 &s3RegionName,
107                 "s3-region",
108                 "",
109                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
110         flag.StringVar(
111                 &s3Endpoint,
112                 "s3-endpoint",
113                 "",
114                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
115         flag.StringVar(
116                 &s3AccessKeyFile,
117                 "s3-access-key-file",
118                 "",
119                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
120         flag.StringVar(
121                 &s3SecretKeyFile,
122                 "s3-secret-key-file",
123                 "",
124                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
125         flag.DurationVar(
126                 &s3RaceWindow,
127                 "s3-race-window",
128                 24*time.Hour,
129                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
130         flag.IntVar(
131                 &s3Replication,
132                 "s3-replication",
133                 2,
134                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
135         flag.BoolVar(
136                 &s3UnsafeDelete,
137                 "s3-unsafe-delete",
138                 false,
139                 "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
140 }
141
142 // S3Volume implements Volume using an S3 bucket.
143 type S3Volume struct {
144         AccessKeyFile      string
145         SecretKeyFile      string
146         Endpoint           string
147         Region             string
148         Bucket             string
149         LocationConstraint bool
150         IndexPageSize      int
151         S3Replication      int
152         ConnectTimeout     arvados.Duration
153         ReadTimeout        arvados.Duration
154         RaceWindow         arvados.Duration
155         ReadOnly           bool
156         UnsafeDelete       bool
157         StorageClasses     []string
158
159         bucket *s3bucket
160
161         startOnce sync.Once
162 }
163
164 // Examples implements VolumeWithExamples.
165 func (*S3Volume) Examples() []Volume {
166         return []Volume{
167                 &S3Volume{
168                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
169                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
170                         Endpoint:       "",
171                         Region:         "us-east-1",
172                         Bucket:         "example-bucket-name",
173                         IndexPageSize:  1000,
174                         S3Replication:  2,
175                         RaceWindow:     arvados.Duration(24 * time.Hour),
176                         ConnectTimeout: arvados.Duration(time.Minute),
177                         ReadTimeout:    arvados.Duration(5 * time.Minute),
178                 },
179                 &S3Volume{
180                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
181                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
182                         Endpoint:       "https://storage.googleapis.com",
183                         Region:         "",
184                         Bucket:         "example-bucket-name",
185                         IndexPageSize:  1000,
186                         S3Replication:  2,
187                         RaceWindow:     arvados.Duration(24 * time.Hour),
188                         ConnectTimeout: arvados.Duration(time.Minute),
189                         ReadTimeout:    arvados.Duration(5 * time.Minute),
190                 },
191         }
192 }
193
194 // Type implements Volume.
195 func (*S3Volume) Type() string {
196         return "S3"
197 }
198
199 // Start populates private fields and verifies the configuration is
200 // valid.
201 func (v *S3Volume) Start() error {
202         region, ok := aws.Regions[v.Region]
203         if v.Endpoint == "" {
204                 if !ok {
205                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
206                 }
207         } else if ok {
208                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
209                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
210         } else {
211                 region = aws.Region{
212                         Name:                 v.Region,
213                         S3Endpoint:           v.Endpoint,
214                         S3LocationConstraint: v.LocationConstraint,
215                 }
216         }
217
218         var err error
219         var auth aws.Auth
220         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
221         if err != nil {
222                 return err
223         }
224         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
225         if err != nil {
226                 return err
227         }
228
229         // Zero timeouts mean "wait forever", which is a bad
230         // default. Default to long timeouts instead.
231         if v.ConnectTimeout == 0 {
232                 v.ConnectTimeout = s3DefaultConnectTimeout
233         }
234         if v.ReadTimeout == 0 {
235                 v.ReadTimeout = s3DefaultReadTimeout
236         }
237
238         client := s3.New(auth, region)
239         if region.EC2Endpoint.Signer == aws.V4Signature {
240                 // Currently affects only eu-central-1
241                 client.Signature = aws.V4Signature
242         }
243         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
244         client.ReadTimeout = time.Duration(v.ReadTimeout)
245         v.bucket = &s3bucket{
246                 Bucket: &s3.Bucket{
247                         S3:   client,
248                         Name: v.Bucket,
249                 },
250         }
251         return nil
252 }
253
254 // DeviceID returns a globally unique ID for the storage bucket.
255 func (v *S3Volume) DeviceID() string {
256         return "s3://" + v.Endpoint + "/" + v.Bucket
257 }
258
259 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
260         ready := make(chan bool)
261         go func() {
262                 rdr, err = v.getReader(loc)
263                 close(ready)
264         }()
265         select {
266         case <-ready:
267                 return
268         case <-ctx.Done():
269                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
270                 go func() {
271                         <-ready
272                         if err == nil {
273                                 rdr.Close()
274                         }
275                 }()
276                 return nil, ctx.Err()
277         }
278 }
279
280 // getReader wraps (Bucket)GetReader.
281 //
282 // In situations where (Bucket)GetReader would fail because the block
283 // disappeared in a Trash race, getReader calls fixRace to recover the
284 // data, and tries again.
285 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
286         rdr, err = v.bucket.GetReader(loc)
287         err = v.translateError(err)
288         if err == nil || !os.IsNotExist(err) {
289                 return
290         }
291
292         _, err = v.bucket.Head("recent/"+loc, nil)
293         err = v.translateError(err)
294         if err != nil {
295                 // If we can't read recent/X, there's no point in
296                 // trying fixRace. Give up.
297                 return
298         }
299         if !v.fixRace(loc) {
300                 err = os.ErrNotExist
301                 return
302         }
303
304         rdr, err = v.bucket.GetReader(loc)
305         if err != nil {
306                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
307                 err = v.translateError(err)
308         }
309         return
310 }
311
312 // Get a block: copy the block data into buf, and return the number of
313 // bytes copied.
314 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
315         rdr, err := v.getReaderWithContext(ctx, loc)
316         if err != nil {
317                 return 0, err
318         }
319
320         var n int
321         ready := make(chan bool)
322         go func() {
323                 defer close(ready)
324
325                 defer rdr.Close()
326                 n, err = io.ReadFull(rdr, buf)
327
328                 switch err {
329                 case nil, io.EOF, io.ErrUnexpectedEOF:
330                         err = nil
331                 default:
332                         err = v.translateError(err)
333                 }
334         }()
335         select {
336         case <-ctx.Done():
337                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
338                 rdr.Close()
339                 // Must wait for ReadFull to return, to ensure it
340                 // doesn't write to buf after we return.
341                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
342                 <-ready
343                 return 0, ctx.Err()
344         case <-ready:
345                 return n, err
346         }
347 }
348
349 // Compare the given data with the stored data.
350 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
351         errChan := make(chan error, 1)
352         go func() {
353                 _, err := v.bucket.Head("recent/"+loc, nil)
354                 errChan <- err
355         }()
356         var err error
357         select {
358         case <-ctx.Done():
359                 return ctx.Err()
360         case err = <-errChan:
361         }
362         if err != nil {
363                 // Checking for "loc" itself here would interfere with
364                 // future GET requests.
365                 //
366                 // On AWS, if X doesn't exist, a HEAD or GET request
367                 // for X causes X's non-existence to be cached. Thus,
368                 // if we test for X, then create X and return a
369                 // signature to our client, the client might still get
370                 // 404 from all keepstores when trying to read it.
371                 //
372                 // To avoid this, we avoid doing HEAD X or GET X until
373                 // we know X has been written.
374                 //
375                 // Note that X might exist even though recent/X
376                 // doesn't: for example, the response to HEAD recent/X
377                 // might itself come from a stale cache. In such
378                 // cases, we will return a false negative and
379                 // PutHandler might needlessly create another replica
380                 // on a different volume. That's not ideal, but it's
381                 // better than passing the eventually-consistent
382                 // problem on to our clients.
383                 return v.translateError(err)
384         }
385         rdr, err := v.getReaderWithContext(ctx, loc)
386         if err != nil {
387                 return err
388         }
389         defer rdr.Close()
390         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
391 }
392
393 // Put writes a block.
394 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
395         if v.ReadOnly {
396                 return MethodDisabledError
397         }
398         var opts s3.Options
399         size := len(block)
400         if size > 0 {
401                 md5, err := hex.DecodeString(loc)
402                 if err != nil {
403                         return err
404                 }
405                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
406                 opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
407         }
408
409         // Send the block data through a pipe, so that (if we need to)
410         // we can close the pipe early and abandon our PutReader()
411         // goroutine, without worrying about PutReader() accessing our
412         // block buffer after we release it.
413         bufr, bufw := io.Pipe()
414         go func() {
415                 io.Copy(bufw, bytes.NewReader(block))
416                 bufw.Close()
417         }()
418
419         var err error
420         ready := make(chan bool)
421         go func() {
422                 defer func() {
423                         if ctx.Err() != nil {
424                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
425                         }
426                 }()
427                 defer close(ready)
428                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
429                 if err != nil {
430                         return
431                 }
432                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
433         }()
434         select {
435         case <-ctx.Done():
436                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
437                 // Our pipe might be stuck in Write(), waiting for
438                 // PutReader() to read. If so, un-stick it. This means
439                 // PutReader will get corrupt data, but that's OK: the
440                 // size and MD5 won't match, so the write will fail.
441                 go io.Copy(ioutil.Discard, bufr)
442                 // CloseWithError() will return once pending I/O is done.
443                 bufw.CloseWithError(ctx.Err())
444                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
445                 return ctx.Err()
446         case <-ready:
447                 // Unblock pipe in case PutReader did not consume it.
448                 io.Copy(ioutil.Discard, bufr)
449                 return v.translateError(err)
450         }
451 }
452
453 // Touch sets the timestamp for the given locator to the current time.
454 func (v *S3Volume) Touch(loc string) error {
455         if v.ReadOnly {
456                 return MethodDisabledError
457         }
458         _, err := v.bucket.Head(loc, nil)
459         err = v.translateError(err)
460         if os.IsNotExist(err) && v.fixRace(loc) {
461                 // The data object got trashed in a race, but fixRace
462                 // rescued it.
463         } else if err != nil {
464                 return err
465         }
466         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
467         return v.translateError(err)
468 }
469
470 // Mtime returns the stored timestamp for the given locator.
471 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
472         _, err := v.bucket.Head(loc, nil)
473         if err != nil {
474                 return zeroTime, v.translateError(err)
475         }
476         resp, err := v.bucket.Head("recent/"+loc, nil)
477         err = v.translateError(err)
478         if os.IsNotExist(err) {
479                 // The data object X exists, but recent/X is missing.
480                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
481                 if err != nil {
482                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
483                         return zeroTime, v.translateError(err)
484                 }
485                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
486                 resp, err = v.bucket.Head("recent/"+loc, nil)
487                 if err != nil {
488                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
489                         return zeroTime, v.translateError(err)
490                 }
491         } else if err != nil {
492                 // HEAD recent/X failed for some other reason.
493                 return zeroTime, err
494         }
495         return v.lastModified(resp)
496 }
497
498 // IndexTo writes a complete list of locators with the given prefix
499 // for which Get() can retrieve data.
500 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
501         // Use a merge sort to find matching sets of X and recent/X.
502         dataL := s3Lister{
503                 Bucket:   v.bucket.Bucket,
504                 Prefix:   prefix,
505                 PageSize: v.IndexPageSize,
506                 Stats:    &v.bucket.stats,
507         }
508         recentL := s3Lister{
509                 Bucket:   v.bucket.Bucket,
510                 Prefix:   "recent/" + prefix,
511                 PageSize: v.IndexPageSize,
512                 Stats:    &v.bucket.stats,
513         }
514         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
515                 if data.Key >= "g" {
516                         // Conveniently, "recent/*" and "trash/*" are
517                         // lexically greater than all hex-encoded data
518                         // hashes, so stopping here avoids iterating
519                         // over all of them needlessly with dataL.
520                         break
521                 }
522                 if !v.isKeepBlock(data.Key) {
523                         continue
524                 }
525
526                 // stamp is the list entry we should use to report the
527                 // last-modified time for this data block: it will be
528                 // the recent/X entry if one exists, otherwise the
529                 // entry for the data block itself.
530                 stamp := data
531
532                 // Advance to the corresponding recent/X marker, if any
533                 for recent != nil && recentL.Error() == nil {
534                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
535                                 recent = recentL.Next()
536                                 continue
537                         } else if cmp == 0 {
538                                 stamp = recent
539                                 recent = recentL.Next()
540                                 break
541                         } else {
542                                 // recent/X marker is missing: we'll
543                                 // use the timestamp on the data
544                                 // object.
545                                 break
546                         }
547                 }
548                 if err := recentL.Error(); err != nil {
549                         return err
550                 }
551                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
552                 if err != nil {
553                         return err
554                 }
555                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
556         }
557         return dataL.Error()
558 }
559
560 // Trash a Keep block.
561 func (v *S3Volume) Trash(loc string) error {
562         if v.ReadOnly {
563                 return MethodDisabledError
564         }
565         if t, err := v.Mtime(loc); err != nil {
566                 return err
567         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
568                 return nil
569         }
570         if theConfig.TrashLifetime == 0 {
571                 if !s3UnsafeDelete {
572                         return ErrS3TrashDisabled
573                 }
574                 return v.translateError(v.bucket.Del(loc))
575         }
576         err := v.checkRaceWindow(loc)
577         if err != nil {
578                 return err
579         }
580         err = v.safeCopy("trash/"+loc, loc)
581         if err != nil {
582                 return err
583         }
584         return v.translateError(v.bucket.Del(loc))
585 }
586
587 // checkRaceWindow returns a non-nil error if trash/loc is, or might
588 // be, in the race window (i.e., it's not safe to trash loc).
589 func (v *S3Volume) checkRaceWindow(loc string) error {
590         resp, err := v.bucket.Head("trash/"+loc, nil)
591         err = v.translateError(err)
592         if os.IsNotExist(err) {
593                 // OK, trash/X doesn't exist so we're not in the race
594                 // window
595                 return nil
596         } else if err != nil {
597                 // Error looking up trash/X. We don't know whether
598                 // we're in the race window
599                 return err
600         }
601         t, err := v.lastModified(resp)
602         if err != nil {
603                 // Can't parse timestamp
604                 return err
605         }
606         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
607         if safeWindow <= 0 {
608                 // We can't count on "touch trash/X" to prolong
609                 // trash/X's lifetime. The new timestamp might not
610                 // become visible until now+raceWindow, and EmptyTrash
611                 // is allowed to delete trash/X before then.
612                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
613         }
614         // trash/X exists, but it won't be eligible for deletion until
615         // after now+raceWindow, so it's safe to overwrite it.
616         return nil
617 }
618
619 // safeCopy calls PutCopy, and checks the response to make sure the
620 // copy succeeded and updated the timestamp on the destination object
621 // (PutCopy returns 200 OK if the request was received, even if the
622 // copy failed).
623 func (v *S3Volume) safeCopy(dst, src string) error {
624         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
625                 ContentType:       "application/octet-stream",
626                 MetadataDirective: "REPLACE",
627         }, v.bucket.Name+"/"+src)
628         err = v.translateError(err)
629         if os.IsNotExist(err) {
630                 return err
631         } else if err != nil {
632                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Name+"/"+src, err)
633         }
634         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
635                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
636         } else if time.Now().Sub(t) > maxClockSkew {
637                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
638         }
639         return nil
640 }
641
642 // Get the LastModified header from resp, and parse it as RFC1123 or
643 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
644 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
645         s := resp.Header.Get("Last-Modified")
646         t, err = time.Parse(time.RFC1123, s)
647         if err != nil && s != "" {
648                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
649                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
650                 // as required by HTTP spec. If it's not a valid HTTP
651                 // header value, it's probably AWS (or s3test) giving
652                 // us a nearly-RFC1123 timestamp.
653                 t, err = time.Parse(nearlyRFC1123, s)
654         }
655         return
656 }
657
658 // Untrash moves block from trash back into store
659 func (v *S3Volume) Untrash(loc string) error {
660         err := v.safeCopy(loc, "trash/"+loc)
661         if err != nil {
662                 return err
663         }
664         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
665         return v.translateError(err)
666 }
667
668 // Status returns a *VolumeStatus representing the current in-use
669 // storage capacity and a fake available capacity that doesn't make
670 // the volume seem full or nearly-full.
671 func (v *S3Volume) Status() *VolumeStatus {
672         return &VolumeStatus{
673                 DeviceNum: 1,
674                 BytesFree: BlockSize * 1000,
675                 BytesUsed: 1,
676         }
677 }
678
679 // InternalStats returns bucket I/O and API call counters.
680 func (v *S3Volume) InternalStats() interface{} {
681         return &v.bucket.stats
682 }
683
684 // String implements fmt.Stringer.
685 func (v *S3Volume) String() string {
686         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
687 }
688
689 // Writable returns false if all future Put, Mtime, and Delete calls
690 // are expected to fail.
691 func (v *S3Volume) Writable() bool {
692         return !v.ReadOnly
693 }
694
695 // Replication returns the storage redundancy of the underlying
696 // device. Configured via command line flag.
697 func (v *S3Volume) Replication() int {
698         return v.S3Replication
699 }
700
701 // GetStorageClasses implements Volume
702 func (v *S3Volume) GetStorageClasses() []string {
703         return v.StorageClasses
704 }
705
706 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
707
708 func (v *S3Volume) isKeepBlock(s string) bool {
709         return s3KeepBlockRegexp.MatchString(s)
710 }
711
712 // fixRace(X) is called when "recent/X" exists but "X" doesn't
713 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
714 // there was a race between Put and Trash, fixRace recovers from the
715 // race by Untrashing the block.
716 func (v *S3Volume) fixRace(loc string) bool {
717         trash, err := v.bucket.Head("trash/"+loc, nil)
718         if err != nil {
719                 if !os.IsNotExist(v.translateError(err)) {
720                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
721                 }
722                 return false
723         }
724         trashTime, err := v.lastModified(trash)
725         if err != nil {
726                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
727                 return false
728         }
729
730         recent, err := v.bucket.Head("recent/"+loc, nil)
731         if err != nil {
732                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
733                 return false
734         }
735         recentTime, err := v.lastModified(recent)
736         if err != nil {
737                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
738                 return false
739         }
740
741         ageWhenTrashed := trashTime.Sub(recentTime)
742         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
743                 // No evidence of a race: block hasn't been written
744                 // since it became eligible for Trash. No fix needed.
745                 return false
746         }
747
748         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
749         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
750         err = v.safeCopy(loc, "trash/"+loc)
751         if err != nil {
752                 log.Printf("error: fixRace: %s", err)
753                 return false
754         }
755         return true
756 }
757
758 func (v *S3Volume) translateError(err error) error {
759         switch err := err.(type) {
760         case *s3.Error:
761                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
762                         strings.Contains(err.Error(), "Not Found") {
763                         return os.ErrNotExist
764                 }
765                 // Other 404 errors like NoSuchVersion and
766                 // NoSuchBucket are different problems which should
767                 // get called out downstream, so we don't convert them
768                 // to os.ErrNotExist.
769         }
770         return err
771 }
772
773 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
774 // and deletes them from the volume.
775 func (v *S3Volume) EmptyTrash() {
776         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
777
778         // Define "ready to delete" as "...when EmptyTrash started".
779         startT := time.Now()
780
781         emptyOneKey := func(trash *s3.Key) {
782                 loc := trash.Key[6:]
783                 if !v.isKeepBlock(loc) {
784                         return
785                 }
786                 atomic.AddInt64(&bytesInTrash, trash.Size)
787                 atomic.AddInt64(&blocksInTrash, 1)
788
789                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
790                 if err != nil {
791                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
792                         return
793                 }
794                 recent, err := v.bucket.Head("recent/"+loc, nil)
795                 if err != nil && os.IsNotExist(v.translateError(err)) {
796                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
797                         err = v.Untrash(loc)
798                         if err != nil {
799                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
800                         }
801                         return
802                 } else if err != nil {
803                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
804                         return
805                 }
806                 recentT, err := v.lastModified(recent)
807                 if err != nil {
808                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
809                         return
810                 }
811                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
812                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
813                                 // recent/loc is too old to protect
814                                 // loc from being Trashed again during
815                                 // the raceWindow that starts if we
816                                 // delete trash/X now.
817                                 //
818                                 // Note this means (TrashCheckInterval
819                                 // < BlobSignatureTTL - raceWindow) is
820                                 // necessary to avoid starvation.
821                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
822                                 v.fixRace(loc)
823                                 v.Touch(loc)
824                                 return
825                         }
826                         _, err := v.bucket.Head(loc, nil)
827                         if os.IsNotExist(err) {
828                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
829                                 v.fixRace(loc)
830                                 return
831                         } else if err != nil {
832                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
833                                 return
834                         }
835                 }
836                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
837                         return
838                 }
839                 err = v.bucket.Del(trash.Key)
840                 if err != nil {
841                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
842                         return
843                 }
844                 atomic.AddInt64(&bytesDeleted, trash.Size)
845                 atomic.AddInt64(&blocksDeleted, 1)
846
847                 _, err = v.bucket.Head(loc, nil)
848                 if err == nil {
849                         log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
850                         return
851                 }
852                 if !os.IsNotExist(v.translateError(err)) {
853                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
854                         return
855                 }
856                 err = v.bucket.Del("recent/" + loc)
857                 if err != nil {
858                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
859                 }
860         }
861
862         var wg sync.WaitGroup
863         todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
864         for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
865                 wg.Add(1)
866                 go func() {
867                         defer wg.Done()
868                         for key := range todo {
869                                 emptyOneKey(key)
870                         }
871                 }()
872         }
873
874         trashL := s3Lister{
875                 Bucket:   v.bucket.Bucket,
876                 Prefix:   "trash/",
877                 PageSize: v.IndexPageSize,
878                 Stats:    &v.bucket.stats,
879         }
880         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
881                 todo <- trash
882         }
883         close(todo)
884         wg.Wait()
885
886         if err := trashL.Error(); err != nil {
887                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
888         }
889         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
890 }
891
892 type s3Lister struct {
893         Bucket     *s3.Bucket
894         Prefix     string
895         PageSize   int
896         Stats      *s3bucketStats
897         nextMarker string
898         buf        []s3.Key
899         err        error
900 }
901
902 // First fetches the first page and returns the first item. It returns
903 // nil if the response is the empty set or an error occurs.
904 func (lister *s3Lister) First() *s3.Key {
905         lister.getPage()
906         return lister.pop()
907 }
908
909 // Next returns the next item, fetching the next page if necessary. It
910 // returns nil if the last available item has already been fetched, or
911 // an error occurs.
912 func (lister *s3Lister) Next() *s3.Key {
913         if len(lister.buf) == 0 && lister.nextMarker != "" {
914                 lister.getPage()
915         }
916         return lister.pop()
917 }
918
919 // Return the most recent error encountered by First or Next.
920 func (lister *s3Lister) Error() error {
921         return lister.err
922 }
923
924 func (lister *s3Lister) getPage() {
925         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
926         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
927         lister.nextMarker = ""
928         if err != nil {
929                 lister.err = err
930                 return
931         }
932         if resp.IsTruncated {
933                 lister.nextMarker = resp.NextMarker
934         }
935         lister.buf = make([]s3.Key, 0, len(resp.Contents))
936         for _, key := range resp.Contents {
937                 if !strings.HasPrefix(key.Key, lister.Prefix) {
938                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
939                         continue
940                 }
941                 lister.buf = append(lister.buf, key)
942         }
943 }
944
945 func (lister *s3Lister) pop() (k *s3.Key) {
946         if len(lister.buf) > 0 {
947                 k = &lister.buf[0]
948                 lister.buf = lister.buf[1:]
949         }
950         return
951 }
952
953 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
954 type s3bucket struct {
955         *s3.Bucket
956         stats s3bucketStats
957 }
958
959 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
960         rdr, err := b.Bucket.GetReader(path)
961         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
962         b.stats.TickErr(err)
963         return NewCountingReader(rdr, b.stats.TickInBytes), err
964 }
965
966 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
967         resp, err := b.Bucket.Head(path, headers)
968         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
969         b.stats.TickErr(err)
970         return resp, err
971 }
972
973 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
974         if length == 0 {
975                 // goamz will only send Content-Length: 0 when reader
976                 // is nil due to net.http.Request.ContentLength
977                 // behavior.  Otherwise, Content-Length header is
978                 // omitted which will cause some S3 services
979                 // (including AWS and Ceph RadosGW) to fail to create
980                 // empty objects.
981                 r = nil
982         } else {
983                 r = NewCountingReader(r, b.stats.TickOutBytes)
984         }
985         err := b.Bucket.PutReader(path, r, length, contType, perm, options)
986         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
987         b.stats.TickErr(err)
988         return err
989 }
990
991 func (b *s3bucket) Del(path string) error {
992         err := b.Bucket.Del(path)
993         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
994         b.stats.TickErr(err)
995         return err
996 }
997
998 type s3bucketStats struct {
999         statsTicker
1000         Ops     uint64
1001         GetOps  uint64
1002         PutOps  uint64
1003         HeadOps uint64
1004         DelOps  uint64
1005         ListOps uint64
1006 }
1007
1008 func (s *s3bucketStats) TickErr(err error) {
1009         if err == nil {
1010                 return
1011         }
1012         errType := fmt.Sprintf("%T", err)
1013         if err, ok := err.(*s3.Error); ok {
1014                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
1015         }
1016         s.statsTicker.TickErr(err, errType)
1017 }