Merge branch '14285-keep-balance-metrics'
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "flag"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "net/http"
17         "os"
18         "regexp"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.curoverse.com/arvados.git/sdk/go/arvados"
25         "github.com/AdRoll/goamz/aws"
26         "github.com/AdRoll/goamz/s3"
27 )
28
29 const (
30         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
31         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
32 )
33
34 var (
35         // ErrS3TrashDisabled is returned by Trash if that operation
36         // is impossible with the current config.
37         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
38
39         s3AccessKeyFile string
40         s3SecretKeyFile string
41         s3RegionName    string
42         s3Endpoint      string
43         s3Replication   int
44         s3UnsafeDelete  bool
45         s3RaceWindow    time.Duration
46
47         s3ACL = s3.Private
48
49         zeroTime time.Time
50 )
51
52 const (
53         maxClockSkew  = 600 * time.Second
54         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
55 )
56
57 type s3VolumeAdder struct {
58         *Config
59 }
60
61 // String implements flag.Value
62 func (s *s3VolumeAdder) String() string {
63         return "-"
64 }
65
66 func (s *s3VolumeAdder) Set(bucketName string) error {
67         if bucketName == "" {
68                 return fmt.Errorf("no container name given")
69         }
70         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
71                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
72         }
73         if deprecated.flagSerializeIO {
74                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
75         }
76         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
77                 Bucket:        bucketName,
78                 AccessKeyFile: s3AccessKeyFile,
79                 SecretKeyFile: s3SecretKeyFile,
80                 Endpoint:      s3Endpoint,
81                 Region:        s3RegionName,
82                 RaceWindow:    arvados.Duration(s3RaceWindow),
83                 S3Replication: s3Replication,
84                 UnsafeDelete:  s3UnsafeDelete,
85                 ReadOnly:      deprecated.flagReadonly,
86                 IndexPageSize: 1000,
87         })
88         return nil
89 }
90
91 func s3regions() (okList []string) {
92         for r := range aws.Regions {
93                 okList = append(okList, r)
94         }
95         return
96 }
97
98 func init() {
99         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
100
101         flag.Var(&s3VolumeAdder{theConfig},
102                 "s3-bucket-volume",
103                 "Use the given bucket as a storage volume. Can be given multiple times.")
104         flag.StringVar(
105                 &s3RegionName,
106                 "s3-region",
107                 "",
108                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
109         flag.StringVar(
110                 &s3Endpoint,
111                 "s3-endpoint",
112                 "",
113                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
114         flag.StringVar(
115                 &s3AccessKeyFile,
116                 "s3-access-key-file",
117                 "",
118                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
119         flag.StringVar(
120                 &s3SecretKeyFile,
121                 "s3-secret-key-file",
122                 "",
123                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
124         flag.DurationVar(
125                 &s3RaceWindow,
126                 "s3-race-window",
127                 24*time.Hour,
128                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
129         flag.IntVar(
130                 &s3Replication,
131                 "s3-replication",
132                 2,
133                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
134         flag.BoolVar(
135                 &s3UnsafeDelete,
136                 "s3-unsafe-delete",
137                 false,
138                 "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
139 }
140
141 // S3Volume implements Volume using an S3 bucket.
142 type S3Volume struct {
143         AccessKeyFile      string
144         SecretKeyFile      string
145         Endpoint           string
146         Region             string
147         Bucket             string
148         LocationConstraint bool
149         IndexPageSize      int
150         S3Replication      int
151         ConnectTimeout     arvados.Duration
152         ReadTimeout        arvados.Duration
153         RaceWindow         arvados.Duration
154         ReadOnly           bool
155         UnsafeDelete       bool
156         StorageClasses     []string
157
158         bucket *s3bucket
159
160         startOnce sync.Once
161 }
162
163 // Examples implements VolumeWithExamples.
164 func (*S3Volume) Examples() []Volume {
165         return []Volume{
166                 &S3Volume{
167                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
168                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
169                         Endpoint:       "",
170                         Region:         "us-east-1",
171                         Bucket:         "example-bucket-name",
172                         IndexPageSize:  1000,
173                         S3Replication:  2,
174                         RaceWindow:     arvados.Duration(24 * time.Hour),
175                         ConnectTimeout: arvados.Duration(time.Minute),
176                         ReadTimeout:    arvados.Duration(5 * time.Minute),
177                 },
178                 &S3Volume{
179                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
180                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
181                         Endpoint:       "https://storage.googleapis.com",
182                         Region:         "",
183                         Bucket:         "example-bucket-name",
184                         IndexPageSize:  1000,
185                         S3Replication:  2,
186                         RaceWindow:     arvados.Duration(24 * time.Hour),
187                         ConnectTimeout: arvados.Duration(time.Minute),
188                         ReadTimeout:    arvados.Duration(5 * time.Minute),
189                 },
190         }
191 }
192
193 // Type implements Volume.
194 func (*S3Volume) Type() string {
195         return "S3"
196 }
197
198 // Start populates private fields and verifies the configuration is
199 // valid.
200 func (v *S3Volume) Start() error {
201         region, ok := aws.Regions[v.Region]
202         if v.Endpoint == "" {
203                 if !ok {
204                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
205                 }
206         } else if ok {
207                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
208                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
209         } else {
210                 region = aws.Region{
211                         Name:                 v.Region,
212                         S3Endpoint:           v.Endpoint,
213                         S3LocationConstraint: v.LocationConstraint,
214                 }
215         }
216
217         var err error
218         var auth aws.Auth
219         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
220         if err != nil {
221                 return err
222         }
223         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
224         if err != nil {
225                 return err
226         }
227
228         // Zero timeouts mean "wait forever", which is a bad
229         // default. Default to long timeouts instead.
230         if v.ConnectTimeout == 0 {
231                 v.ConnectTimeout = s3DefaultConnectTimeout
232         }
233         if v.ReadTimeout == 0 {
234                 v.ReadTimeout = s3DefaultReadTimeout
235         }
236
237         client := s3.New(auth, region)
238         if region.EC2Endpoint.Signer == aws.V4Signature {
239                 // Currently affects only eu-central-1
240                 client.Signature = aws.V4Signature
241         }
242         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
243         client.ReadTimeout = time.Duration(v.ReadTimeout)
244         v.bucket = &s3bucket{
245                 Bucket: &s3.Bucket{
246                         S3:   client,
247                         Name: v.Bucket,
248                 },
249         }
250         return nil
251 }
252
253 // DeviceID returns a globally unique ID for the storage bucket.
254 func (v *S3Volume) DeviceID() string {
255         return "s3://" + v.Endpoint + "/" + v.Bucket
256 }
257
258 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
259         ready := make(chan bool)
260         go func() {
261                 rdr, err = v.getReader(loc)
262                 close(ready)
263         }()
264         select {
265         case <-ready:
266                 return
267         case <-ctx.Done():
268                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
269                 go func() {
270                         <-ready
271                         if err == nil {
272                                 rdr.Close()
273                         }
274                 }()
275                 return nil, ctx.Err()
276         }
277 }
278
279 // getReader wraps (Bucket)GetReader.
280 //
281 // In situations where (Bucket)GetReader would fail because the block
282 // disappeared in a Trash race, getReader calls fixRace to recover the
283 // data, and tries again.
284 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
285         rdr, err = v.bucket.GetReader(loc)
286         err = v.translateError(err)
287         if err == nil || !os.IsNotExist(err) {
288                 return
289         }
290
291         _, err = v.bucket.Head("recent/"+loc, nil)
292         err = v.translateError(err)
293         if err != nil {
294                 // If we can't read recent/X, there's no point in
295                 // trying fixRace. Give up.
296                 return
297         }
298         if !v.fixRace(loc) {
299                 err = os.ErrNotExist
300                 return
301         }
302
303         rdr, err = v.bucket.GetReader(loc)
304         if err != nil {
305                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
306                 err = v.translateError(err)
307         }
308         return
309 }
310
311 // Get a block: copy the block data into buf, and return the number of
312 // bytes copied.
313 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
314         rdr, err := v.getReaderWithContext(ctx, loc)
315         if err != nil {
316                 return 0, err
317         }
318
319         var n int
320         ready := make(chan bool)
321         go func() {
322                 defer close(ready)
323
324                 defer rdr.Close()
325                 n, err = io.ReadFull(rdr, buf)
326
327                 switch err {
328                 case nil, io.EOF, io.ErrUnexpectedEOF:
329                         err = nil
330                 default:
331                         err = v.translateError(err)
332                 }
333         }()
334         select {
335         case <-ctx.Done():
336                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
337                 rdr.Close()
338                 // Must wait for ReadFull to return, to ensure it
339                 // doesn't write to buf after we return.
340                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
341                 <-ready
342                 return 0, ctx.Err()
343         case <-ready:
344                 return n, err
345         }
346 }
347
348 // Compare the given data with the stored data.
349 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
350         errChan := make(chan error, 1)
351         go func() {
352                 _, err := v.bucket.Head("recent/"+loc, nil)
353                 errChan <- err
354         }()
355         var err error
356         select {
357         case <-ctx.Done():
358                 return ctx.Err()
359         case err = <-errChan:
360         }
361         if err != nil {
362                 // Checking for "loc" itself here would interfere with
363                 // future GET requests.
364                 //
365                 // On AWS, if X doesn't exist, a HEAD or GET request
366                 // for X causes X's non-existence to be cached. Thus,
367                 // if we test for X, then create X and return a
368                 // signature to our client, the client might still get
369                 // 404 from all keepstores when trying to read it.
370                 //
371                 // To avoid this, we avoid doing HEAD X or GET X until
372                 // we know X has been written.
373                 //
374                 // Note that X might exist even though recent/X
375                 // doesn't: for example, the response to HEAD recent/X
376                 // might itself come from a stale cache. In such
377                 // cases, we will return a false negative and
378                 // PutHandler might needlessly create another replica
379                 // on a different volume. That's not ideal, but it's
380                 // better than passing the eventually-consistent
381                 // problem on to our clients.
382                 return v.translateError(err)
383         }
384         rdr, err := v.getReaderWithContext(ctx, loc)
385         if err != nil {
386                 return err
387         }
388         defer rdr.Close()
389         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
390 }
391
392 // Put writes a block.
393 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
394         if v.ReadOnly {
395                 return MethodDisabledError
396         }
397         var opts s3.Options
398         size := len(block)
399         if size > 0 {
400                 md5, err := hex.DecodeString(loc)
401                 if err != nil {
402                         return err
403                 }
404                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
405         }
406
407         // Send the block data through a pipe, so that (if we need to)
408         // we can close the pipe early and abandon our PutReader()
409         // goroutine, without worrying about PutReader() accessing our
410         // block buffer after we release it.
411         bufr, bufw := io.Pipe()
412         go func() {
413                 io.Copy(bufw, bytes.NewReader(block))
414                 bufw.Close()
415         }()
416
417         var err error
418         ready := make(chan bool)
419         go func() {
420                 defer func() {
421                         if ctx.Err() != nil {
422                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
423                         }
424                 }()
425                 defer close(ready)
426                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
427                 if err != nil {
428                         return
429                 }
430                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
431         }()
432         select {
433         case <-ctx.Done():
434                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
435                 // Our pipe might be stuck in Write(), waiting for
436                 // PutReader() to read. If so, un-stick it. This means
437                 // PutReader will get corrupt data, but that's OK: the
438                 // size and MD5 won't match, so the write will fail.
439                 go io.Copy(ioutil.Discard, bufr)
440                 // CloseWithError() will return once pending I/O is done.
441                 bufw.CloseWithError(ctx.Err())
442                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
443                 return ctx.Err()
444         case <-ready:
445                 // Unblock pipe in case PutReader did not consume it.
446                 io.Copy(ioutil.Discard, bufr)
447                 return v.translateError(err)
448         }
449 }
450
451 // Touch sets the timestamp for the given locator to the current time.
452 func (v *S3Volume) Touch(loc string) error {
453         if v.ReadOnly {
454                 return MethodDisabledError
455         }
456         _, err := v.bucket.Head(loc, nil)
457         err = v.translateError(err)
458         if os.IsNotExist(err) && v.fixRace(loc) {
459                 // The data object got trashed in a race, but fixRace
460                 // rescued it.
461         } else if err != nil {
462                 return err
463         }
464         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
465         return v.translateError(err)
466 }
467
468 // Mtime returns the stored timestamp for the given locator.
469 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
470         _, err := v.bucket.Head(loc, nil)
471         if err != nil {
472                 return zeroTime, v.translateError(err)
473         }
474         resp, err := v.bucket.Head("recent/"+loc, nil)
475         err = v.translateError(err)
476         if os.IsNotExist(err) {
477                 // The data object X exists, but recent/X is missing.
478                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
479                 if err != nil {
480                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
481                         return zeroTime, v.translateError(err)
482                 }
483                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
484                 resp, err = v.bucket.Head("recent/"+loc, nil)
485                 if err != nil {
486                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
487                         return zeroTime, v.translateError(err)
488                 }
489         } else if err != nil {
490                 // HEAD recent/X failed for some other reason.
491                 return zeroTime, err
492         }
493         return v.lastModified(resp)
494 }
495
496 // IndexTo writes a complete list of locators with the given prefix
497 // for which Get() can retrieve data.
498 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
499         // Use a merge sort to find matching sets of X and recent/X.
500         dataL := s3Lister{
501                 Bucket:   v.bucket.Bucket,
502                 Prefix:   prefix,
503                 PageSize: v.IndexPageSize,
504         }
505         recentL := s3Lister{
506                 Bucket:   v.bucket.Bucket,
507                 Prefix:   "recent/" + prefix,
508                 PageSize: v.IndexPageSize,
509         }
510         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
511         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
512         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
513                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
514                 if data.Key >= "g" {
515                         // Conveniently, "recent/*" and "trash/*" are
516                         // lexically greater than all hex-encoded data
517                         // hashes, so stopping here avoids iterating
518                         // over all of them needlessly with dataL.
519                         break
520                 }
521                 if !v.isKeepBlock(data.Key) {
522                         continue
523                 }
524
525                 // stamp is the list entry we should use to report the
526                 // last-modified time for this data block: it will be
527                 // the recent/X entry if one exists, otherwise the
528                 // entry for the data block itself.
529                 stamp := data
530
531                 // Advance to the corresponding recent/X marker, if any
532                 for recent != nil {
533                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
534                                 recent = recentL.Next()
535                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
536                                 continue
537                         } else if cmp == 0 {
538                                 stamp = recent
539                                 recent = recentL.Next()
540                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
541                                 break
542                         } else {
543                                 // recent/X marker is missing: we'll
544                                 // use the timestamp on the data
545                                 // object.
546                                 break
547                         }
548                 }
549                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
550                 if err != nil {
551                         return err
552                 }
553                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
554         }
555         return nil
556 }
557
558 // Trash a Keep block.
559 func (v *S3Volume) Trash(loc string) error {
560         if v.ReadOnly {
561                 return MethodDisabledError
562         }
563         if t, err := v.Mtime(loc); err != nil {
564                 return err
565         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
566                 return nil
567         }
568         if theConfig.TrashLifetime == 0 {
569                 if !s3UnsafeDelete {
570                         return ErrS3TrashDisabled
571                 }
572                 return v.translateError(v.bucket.Del(loc))
573         }
574         err := v.checkRaceWindow(loc)
575         if err != nil {
576                 return err
577         }
578         err = v.safeCopy("trash/"+loc, loc)
579         if err != nil {
580                 return err
581         }
582         return v.translateError(v.bucket.Del(loc))
583 }
584
585 // checkRaceWindow returns a non-nil error if trash/loc is, or might
586 // be, in the race window (i.e., it's not safe to trash loc).
587 func (v *S3Volume) checkRaceWindow(loc string) error {
588         resp, err := v.bucket.Head("trash/"+loc, nil)
589         err = v.translateError(err)
590         if os.IsNotExist(err) {
591                 // OK, trash/X doesn't exist so we're not in the race
592                 // window
593                 return nil
594         } else if err != nil {
595                 // Error looking up trash/X. We don't know whether
596                 // we're in the race window
597                 return err
598         }
599         t, err := v.lastModified(resp)
600         if err != nil {
601                 // Can't parse timestamp
602                 return err
603         }
604         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
605         if safeWindow <= 0 {
606                 // We can't count on "touch trash/X" to prolong
607                 // trash/X's lifetime. The new timestamp might not
608                 // become visible until now+raceWindow, and EmptyTrash
609                 // is allowed to delete trash/X before then.
610                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
611         }
612         // trash/X exists, but it won't be eligible for deletion until
613         // after now+raceWindow, so it's safe to overwrite it.
614         return nil
615 }
616
617 // safeCopy calls PutCopy, and checks the response to make sure the
618 // copy succeeded and updated the timestamp on the destination object
619 // (PutCopy returns 200 OK if the request was received, even if the
620 // copy failed).
621 func (v *S3Volume) safeCopy(dst, src string) error {
622         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
623                 ContentType:       "application/octet-stream",
624                 MetadataDirective: "REPLACE",
625         }, v.bucket.Name+"/"+src)
626         err = v.translateError(err)
627         if err != nil {
628                 return err
629         }
630         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
631                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
632         } else if time.Now().Sub(t) > maxClockSkew {
633                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
634         }
635         return nil
636 }
637
638 // Get the LastModified header from resp, and parse it as RFC1123 or
639 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
640 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
641         s := resp.Header.Get("Last-Modified")
642         t, err = time.Parse(time.RFC1123, s)
643         if err != nil && s != "" {
644                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
645                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
646                 // as required by HTTP spec. If it's not a valid HTTP
647                 // header value, it's probably AWS (or s3test) giving
648                 // us a nearly-RFC1123 timestamp.
649                 t, err = time.Parse(nearlyRFC1123, s)
650         }
651         return
652 }
653
654 // Untrash moves block from trash back into store
655 func (v *S3Volume) Untrash(loc string) error {
656         err := v.safeCopy(loc, "trash/"+loc)
657         if err != nil {
658                 return err
659         }
660         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
661         return v.translateError(err)
662 }
663
664 // Status returns a *VolumeStatus representing the current in-use
665 // storage capacity and a fake available capacity that doesn't make
666 // the volume seem full or nearly-full.
667 func (v *S3Volume) Status() *VolumeStatus {
668         return &VolumeStatus{
669                 DeviceNum: 1,
670                 BytesFree: BlockSize * 1000,
671                 BytesUsed: 1,
672         }
673 }
674
675 // InternalStats returns bucket I/O and API call counters.
676 func (v *S3Volume) InternalStats() interface{} {
677         return &v.bucket.stats
678 }
679
680 // String implements fmt.Stringer.
681 func (v *S3Volume) String() string {
682         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
683 }
684
685 // Writable returns false if all future Put, Mtime, and Delete calls
686 // are expected to fail.
687 func (v *S3Volume) Writable() bool {
688         return !v.ReadOnly
689 }
690
691 // Replication returns the storage redundancy of the underlying
692 // device. Configured via command line flag.
693 func (v *S3Volume) Replication() int {
694         return v.S3Replication
695 }
696
697 // GetStorageClasses implements Volume
698 func (v *S3Volume) GetStorageClasses() []string {
699         return v.StorageClasses
700 }
701
702 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
703
704 func (v *S3Volume) isKeepBlock(s string) bool {
705         return s3KeepBlockRegexp.MatchString(s)
706 }
707
708 // fixRace(X) is called when "recent/X" exists but "X" doesn't
709 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
710 // there was a race between Put and Trash, fixRace recovers from the
711 // race by Untrashing the block.
712 func (v *S3Volume) fixRace(loc string) bool {
713         trash, err := v.bucket.Head("trash/"+loc, nil)
714         if err != nil {
715                 if !os.IsNotExist(v.translateError(err)) {
716                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
717                 }
718                 return false
719         }
720         trashTime, err := v.lastModified(trash)
721         if err != nil {
722                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
723                 return false
724         }
725
726         recent, err := v.bucket.Head("recent/"+loc, nil)
727         if err != nil {
728                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
729                 return false
730         }
731         recentTime, err := v.lastModified(recent)
732         if err != nil {
733                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
734                 return false
735         }
736
737         ageWhenTrashed := trashTime.Sub(recentTime)
738         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
739                 // No evidence of a race: block hasn't been written
740                 // since it became eligible for Trash. No fix needed.
741                 return false
742         }
743
744         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
745         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
746         err = v.safeCopy(loc, "trash/"+loc)
747         if err != nil {
748                 log.Printf("error: fixRace: %s", err)
749                 return false
750         }
751         return true
752 }
753
754 func (v *S3Volume) translateError(err error) error {
755         switch err := err.(type) {
756         case *s3.Error:
757                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
758                         strings.Contains(err.Error(), "Not Found") {
759                         return os.ErrNotExist
760                 }
761                 // Other 404 errors like NoSuchVersion and
762                 // NoSuchBucket are different problems which should
763                 // get called out downstream, so we don't convert them
764                 // to os.ErrNotExist.
765         }
766         return err
767 }
768
769 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
770 // and deletes them from the volume.
771 func (v *S3Volume) EmptyTrash() {
772         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
773
774         // Define "ready to delete" as "...when EmptyTrash started".
775         startT := time.Now()
776
777         emptyOneKey := func(trash *s3.Key) {
778                 loc := trash.Key[6:]
779                 if !v.isKeepBlock(loc) {
780                         return
781                 }
782                 atomic.AddInt64(&bytesInTrash, trash.Size)
783                 atomic.AddInt64(&blocksInTrash, 1)
784
785                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
786                 if err != nil {
787                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
788                         return
789                 }
790                 recent, err := v.bucket.Head("recent/"+loc, nil)
791                 if err != nil && os.IsNotExist(v.translateError(err)) {
792                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
793                         err = v.Untrash(loc)
794                         if err != nil {
795                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
796                         }
797                         return
798                 } else if err != nil {
799                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
800                         return
801                 }
802                 recentT, err := v.lastModified(recent)
803                 if err != nil {
804                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
805                         return
806                 }
807                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
808                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
809                                 // recent/loc is too old to protect
810                                 // loc from being Trashed again during
811                                 // the raceWindow that starts if we
812                                 // delete trash/X now.
813                                 //
814                                 // Note this means (TrashCheckInterval
815                                 // < BlobSignatureTTL - raceWindow) is
816                                 // necessary to avoid starvation.
817                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
818                                 v.fixRace(loc)
819                                 v.Touch(loc)
820                                 return
821                         }
822                         _, err := v.bucket.Head(loc, nil)
823                         if os.IsNotExist(err) {
824                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
825                                 v.fixRace(loc)
826                                 return
827                         } else if err != nil {
828                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
829                                 return
830                         }
831                 }
832                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
833                         return
834                 }
835                 err = v.bucket.Del(trash.Key)
836                 if err != nil {
837                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
838                         return
839                 }
840                 atomic.AddInt64(&bytesDeleted, trash.Size)
841                 atomic.AddInt64(&blocksDeleted, 1)
842
843                 _, err = v.bucket.Head(loc, nil)
844                 if err == nil {
845                         log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
846                         return
847                 }
848                 if !os.IsNotExist(v.translateError(err)) {
849                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
850                         return
851                 }
852                 err = v.bucket.Del("recent/" + loc)
853                 if err != nil {
854                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
855                 }
856         }
857
858         var wg sync.WaitGroup
859         todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
860         for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
861                 wg.Add(1)
862                 go func() {
863                         defer wg.Done()
864                         for key := range todo {
865                                 emptyOneKey(key)
866                         }
867                 }()
868         }
869
870         trashL := s3Lister{
871                 Bucket:   v.bucket.Bucket,
872                 Prefix:   "trash/",
873                 PageSize: v.IndexPageSize,
874         }
875         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
876                 todo <- trash
877         }
878         close(todo)
879         wg.Wait()
880
881         if err := trashL.Error(); err != nil {
882                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
883         }
884         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
885 }
886
887 type s3Lister struct {
888         Bucket     *s3.Bucket
889         Prefix     string
890         PageSize   int
891         nextMarker string
892         buf        []s3.Key
893         err        error
894 }
895
896 // First fetches the first page and returns the first item. It returns
897 // nil if the response is the empty set or an error occurs.
898 func (lister *s3Lister) First() *s3.Key {
899         lister.getPage()
900         return lister.pop()
901 }
902
903 // Next returns the next item, fetching the next page if necessary. It
904 // returns nil if the last available item has already been fetched, or
905 // an error occurs.
906 func (lister *s3Lister) Next() *s3.Key {
907         if len(lister.buf) == 0 && lister.nextMarker != "" {
908                 lister.getPage()
909         }
910         return lister.pop()
911 }
912
913 // Return the most recent error encountered by First or Next.
914 func (lister *s3Lister) Error() error {
915         return lister.err
916 }
917
918 func (lister *s3Lister) getPage() {
919         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
920         lister.nextMarker = ""
921         if err != nil {
922                 lister.err = err
923                 return
924         }
925         if resp.IsTruncated {
926                 lister.nextMarker = resp.NextMarker
927         }
928         lister.buf = make([]s3.Key, 0, len(resp.Contents))
929         for _, key := range resp.Contents {
930                 if !strings.HasPrefix(key.Key, lister.Prefix) {
931                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
932                         continue
933                 }
934                 lister.buf = append(lister.buf, key)
935         }
936 }
937
938 func (lister *s3Lister) pop() (k *s3.Key) {
939         if len(lister.buf) > 0 {
940                 k = &lister.buf[0]
941                 lister.buf = lister.buf[1:]
942         }
943         return
944 }
945
946 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
947 type s3bucket struct {
948         *s3.Bucket
949         stats s3bucketStats
950 }
951
952 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
953         rdr, err := b.Bucket.GetReader(path)
954         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
955         b.stats.TickErr(err)
956         return NewCountingReader(rdr, b.stats.TickInBytes), err
957 }
958
959 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
960         resp, err := b.Bucket.Head(path, headers)
961         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
962         b.stats.TickErr(err)
963         return resp, err
964 }
965
966 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
967         if length == 0 {
968                 // goamz will only send Content-Length: 0 when reader
969                 // is nil due to net.http.Request.ContentLength
970                 // behavior.  Otherwise, Content-Length header is
971                 // omitted which will cause some S3 services
972                 // (including AWS and Ceph RadosGW) to fail to create
973                 // empty objects.
974                 r = nil
975         } else {
976                 r = NewCountingReader(r, b.stats.TickOutBytes)
977         }
978         err := b.Bucket.PutReader(path, r, length, contType, perm, options)
979         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
980         b.stats.TickErr(err)
981         return err
982 }
983
984 func (b *s3bucket) Del(path string) error {
985         err := b.Bucket.Del(path)
986         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
987         b.stats.TickErr(err)
988         return err
989 }
990
991 type s3bucketStats struct {
992         statsTicker
993         Ops     uint64
994         GetOps  uint64
995         PutOps  uint64
996         HeadOps uint64
997         DelOps  uint64
998         ListOps uint64
999 }
1000
1001 func (s *s3bucketStats) TickErr(err error) {
1002         if err == nil {
1003                 return
1004         }
1005         errType := fmt.Sprintf("%T", err)
1006         if err, ok := err.(*s3.Error); ok {
1007                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
1008         }
1009         s.statsTicker.TickErr(err, errType)
1010 }