12715: Rename flag !reading => alwaysReadEOF.
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "flag"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "net/http"
17         "os"
18         "regexp"
19         "strings"
20         "sync"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "github.com/AdRoll/goamz/aws"
25         "github.com/AdRoll/goamz/s3"
26         log "github.com/Sirupsen/logrus"
27 )
28
29 const (
30         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
31         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
32 )
33
34 var (
35         // ErrS3TrashDisabled is returned by Trash if that operation
36         // is impossible with the current config.
37         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
38
39         s3AccessKeyFile string
40         s3SecretKeyFile string
41         s3RegionName    string
42         s3Endpoint      string
43         s3Replication   int
44         s3UnsafeDelete  bool
45         s3RaceWindow    time.Duration
46
47         s3ACL = s3.Private
48
49         zeroTime time.Time
50 )
51
52 const (
53         maxClockSkew  = 600 * time.Second
54         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
55 )
56
57 type s3VolumeAdder struct {
58         *Config
59 }
60
61 // String implements flag.Value
62 func (s *s3VolumeAdder) String() string {
63         return "-"
64 }
65
66 func (s *s3VolumeAdder) Set(bucketName string) error {
67         if bucketName == "" {
68                 return fmt.Errorf("no container name given")
69         }
70         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
71                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
72         }
73         if deprecated.flagSerializeIO {
74                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
75         }
76         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
77                 Bucket:        bucketName,
78                 AccessKeyFile: s3AccessKeyFile,
79                 SecretKeyFile: s3SecretKeyFile,
80                 Endpoint:      s3Endpoint,
81                 Region:        s3RegionName,
82                 RaceWindow:    arvados.Duration(s3RaceWindow),
83                 S3Replication: s3Replication,
84                 UnsafeDelete:  s3UnsafeDelete,
85                 ReadOnly:      deprecated.flagReadonly,
86                 IndexPageSize: 1000,
87         })
88         return nil
89 }
90
91 func s3regions() (okList []string) {
92         for r := range aws.Regions {
93                 okList = append(okList, r)
94         }
95         return
96 }
97
98 func init() {
99         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
100
101         flag.Var(&s3VolumeAdder{theConfig},
102                 "s3-bucket-volume",
103                 "Use the given bucket as a storage volume. Can be given multiple times.")
104         flag.StringVar(
105                 &s3RegionName,
106                 "s3-region",
107                 "",
108                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
109         flag.StringVar(
110                 &s3Endpoint,
111                 "s3-endpoint",
112                 "",
113                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
114         flag.StringVar(
115                 &s3AccessKeyFile,
116                 "s3-access-key-file",
117                 "",
118                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
119         flag.StringVar(
120                 &s3SecretKeyFile,
121                 "s3-secret-key-file",
122                 "",
123                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
124         flag.DurationVar(
125                 &s3RaceWindow,
126                 "s3-race-window",
127                 24*time.Hour,
128                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
129         flag.IntVar(
130                 &s3Replication,
131                 "s3-replication",
132                 2,
133                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
134         flag.BoolVar(
135                 &s3UnsafeDelete,
136                 "s3-unsafe-delete",
137                 false,
138                 "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
139 }
140
141 // S3Volume implements Volume using an S3 bucket.
142 type S3Volume struct {
143         AccessKeyFile      string
144         SecretKeyFile      string
145         Endpoint           string
146         Region             string
147         Bucket             string
148         LocationConstraint bool
149         IndexPageSize      int
150         S3Replication      int
151         ConnectTimeout     arvados.Duration
152         ReadTimeout        arvados.Duration
153         RaceWindow         arvados.Duration
154         ReadOnly           bool
155         UnsafeDelete       bool
156
157         bucket *s3bucket
158
159         startOnce sync.Once
160 }
161
162 // Examples implements VolumeWithExamples.
163 func (*S3Volume) Examples() []Volume {
164         return []Volume{
165                 &S3Volume{
166                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
167                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
168                         Endpoint:       "",
169                         Region:         "us-east-1",
170                         Bucket:         "example-bucket-name",
171                         IndexPageSize:  1000,
172                         S3Replication:  2,
173                         RaceWindow:     arvados.Duration(24 * time.Hour),
174                         ConnectTimeout: arvados.Duration(time.Minute),
175                         ReadTimeout:    arvados.Duration(5 * time.Minute),
176                 },
177                 &S3Volume{
178                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
179                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
180                         Endpoint:       "https://storage.googleapis.com",
181                         Region:         "",
182                         Bucket:         "example-bucket-name",
183                         IndexPageSize:  1000,
184                         S3Replication:  2,
185                         RaceWindow:     arvados.Duration(24 * time.Hour),
186                         ConnectTimeout: arvados.Duration(time.Minute),
187                         ReadTimeout:    arvados.Duration(5 * time.Minute),
188                 },
189         }
190 }
191
192 // Type implements Volume.
193 func (*S3Volume) Type() string {
194         return "S3"
195 }
196
197 // Start populates private fields and verifies the configuration is
198 // valid.
199 func (v *S3Volume) Start() error {
200         region, ok := aws.Regions[v.Region]
201         if v.Endpoint == "" {
202                 if !ok {
203                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
204                 }
205         } else if ok {
206                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
207                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
208         } else {
209                 region = aws.Region{
210                         Name:                 v.Region,
211                         S3Endpoint:           v.Endpoint,
212                         S3LocationConstraint: v.LocationConstraint,
213                 }
214         }
215
216         var err error
217         var auth aws.Auth
218         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
219         if err != nil {
220                 return err
221         }
222         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
223         if err != nil {
224                 return err
225         }
226
227         // Zero timeouts mean "wait forever", which is a bad
228         // default. Default to long timeouts instead.
229         if v.ConnectTimeout == 0 {
230                 v.ConnectTimeout = s3DefaultConnectTimeout
231         }
232         if v.ReadTimeout == 0 {
233                 v.ReadTimeout = s3DefaultReadTimeout
234         }
235
236         client := s3.New(auth, region)
237         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
238         client.ReadTimeout = time.Duration(v.ReadTimeout)
239         v.bucket = &s3bucket{
240                 Bucket: &s3.Bucket{
241                         S3:   client,
242                         Name: v.Bucket,
243                 },
244         }
245         return nil
246 }
247
248 // DeviceID returns a globally unique ID for the storage bucket.
249 func (v *S3Volume) DeviceID() string {
250         return "s3://" + v.Endpoint + "/" + v.Bucket
251 }
252
253 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
254         ready := make(chan bool)
255         go func() {
256                 rdr, err = v.getReader(loc)
257                 close(ready)
258         }()
259         select {
260         case <-ready:
261                 return
262         case <-ctx.Done():
263                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
264                 go func() {
265                         <-ready
266                         if err == nil {
267                                 rdr.Close()
268                         }
269                 }()
270                 return nil, ctx.Err()
271         }
272 }
273
274 // getReader wraps (Bucket)GetReader.
275 //
276 // In situations where (Bucket)GetReader would fail because the block
277 // disappeared in a Trash race, getReader calls fixRace to recover the
278 // data, and tries again.
279 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
280         rdr, err = v.bucket.GetReader(loc)
281         err = v.translateError(err)
282         if err == nil || !os.IsNotExist(err) {
283                 return
284         }
285
286         _, err = v.bucket.Head("recent/"+loc, nil)
287         err = v.translateError(err)
288         if err != nil {
289                 // If we can't read recent/X, there's no point in
290                 // trying fixRace. Give up.
291                 return
292         }
293         if !v.fixRace(loc) {
294                 err = os.ErrNotExist
295                 return
296         }
297
298         rdr, err = v.bucket.GetReader(loc)
299         if err != nil {
300                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
301                 err = v.translateError(err)
302         }
303         return
304 }
305
306 // Get a block: copy the block data into buf, and return the number of
307 // bytes copied.
308 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
309         rdr, err := v.getReaderWithContext(ctx, loc)
310         if err != nil {
311                 return 0, err
312         }
313
314         var n int
315         ready := make(chan bool)
316         go func() {
317                 defer close(ready)
318
319                 defer rdr.Close()
320                 n, err = io.ReadFull(rdr, buf)
321
322                 switch err {
323                 case nil, io.EOF, io.ErrUnexpectedEOF:
324                         err = nil
325                 default:
326                         err = v.translateError(err)
327                 }
328         }()
329         select {
330         case <-ctx.Done():
331                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
332                 rdr.Close()
333                 // Must wait for ReadFull to return, to ensure it
334                 // doesn't write to buf after we return.
335                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
336                 <-ready
337                 return 0, ctx.Err()
338         case <-ready:
339                 return n, err
340         }
341 }
342
343 // Compare the given data with the stored data.
344 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
345         errChan := make(chan error, 1)
346         go func() {
347                 _, err := v.bucket.Head("recent/"+loc, nil)
348                 errChan <- err
349         }()
350         var err error
351         select {
352         case <-ctx.Done():
353                 return ctx.Err()
354         case err = <-errChan:
355         }
356         if err != nil {
357                 // Checking for "loc" itself here would interfere with
358                 // future GET requests.
359                 //
360                 // On AWS, if X doesn't exist, a HEAD or GET request
361                 // for X causes X's non-existence to be cached. Thus,
362                 // if we test for X, then create X and return a
363                 // signature to our client, the client might still get
364                 // 404 from all keepstores when trying to read it.
365                 //
366                 // To avoid this, we avoid doing HEAD X or GET X until
367                 // we know X has been written.
368                 //
369                 // Note that X might exist even though recent/X
370                 // doesn't: for example, the response to HEAD recent/X
371                 // might itself come from a stale cache. In such
372                 // cases, we will return a false negative and
373                 // PutHandler might needlessly create another replica
374                 // on a different volume. That's not ideal, but it's
375                 // better than passing the eventually-consistent
376                 // problem on to our clients.
377                 return v.translateError(err)
378         }
379         rdr, err := v.getReaderWithContext(ctx, loc)
380         if err != nil {
381                 return err
382         }
383         defer rdr.Close()
384         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
385 }
386
387 // Put writes a block.
388 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
389         if v.ReadOnly {
390                 return MethodDisabledError
391         }
392         var opts s3.Options
393         size := len(block)
394         if size > 0 {
395                 md5, err := hex.DecodeString(loc)
396                 if err != nil {
397                         return err
398                 }
399                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
400         }
401
402         // Send the block data through a pipe, so that (if we need to)
403         // we can close the pipe early and abandon our PutReader()
404         // goroutine, without worrying about PutReader() accessing our
405         // block buffer after we release it.
406         bufr, bufw := io.Pipe()
407         go func() {
408                 io.Copy(bufw, bytes.NewReader(block))
409                 bufw.Close()
410         }()
411
412         var err error
413         ready := make(chan bool)
414         go func() {
415                 defer func() {
416                         if ctx.Err() != nil {
417                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
418                         }
419                 }()
420                 defer close(ready)
421                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
422                 if err != nil {
423                         return
424                 }
425                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
426         }()
427         select {
428         case <-ctx.Done():
429                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
430                 // Our pipe might be stuck in Write(), waiting for
431                 // io.Copy() to read. If so, un-stick it. This means
432                 // PutReader will get corrupt data, but that's OK: the
433                 // size and MD5 won't match, so the write will fail.
434                 go io.Copy(ioutil.Discard, bufr)
435                 // CloseWithError() will return once pending I/O is done.
436                 bufw.CloseWithError(ctx.Err())
437                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
438                 return ctx.Err()
439         case <-ready:
440                 return v.translateError(err)
441         }
442 }
443
444 // Touch sets the timestamp for the given locator to the current time.
445 func (v *S3Volume) Touch(loc string) error {
446         if v.ReadOnly {
447                 return MethodDisabledError
448         }
449         _, err := v.bucket.Head(loc, nil)
450         err = v.translateError(err)
451         if os.IsNotExist(err) && v.fixRace(loc) {
452                 // The data object got trashed in a race, but fixRace
453                 // rescued it.
454         } else if err != nil {
455                 return err
456         }
457         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
458         return v.translateError(err)
459 }
460
461 // Mtime returns the stored timestamp for the given locator.
462 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
463         _, err := v.bucket.Head(loc, nil)
464         if err != nil {
465                 return zeroTime, v.translateError(err)
466         }
467         resp, err := v.bucket.Head("recent/"+loc, nil)
468         err = v.translateError(err)
469         if os.IsNotExist(err) {
470                 // The data object X exists, but recent/X is missing.
471                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
472                 if err != nil {
473                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
474                         return zeroTime, v.translateError(err)
475                 }
476                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
477                 resp, err = v.bucket.Head("recent/"+loc, nil)
478                 if err != nil {
479                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
480                         return zeroTime, v.translateError(err)
481                 }
482         } else if err != nil {
483                 // HEAD recent/X failed for some other reason.
484                 return zeroTime, err
485         }
486         return v.lastModified(resp)
487 }
488
489 // IndexTo writes a complete list of locators with the given prefix
490 // for which Get() can retrieve data.
491 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
492         // Use a merge sort to find matching sets of X and recent/X.
493         dataL := s3Lister{
494                 Bucket:   v.bucket.Bucket,
495                 Prefix:   prefix,
496                 PageSize: v.IndexPageSize,
497         }
498         recentL := s3Lister{
499                 Bucket:   v.bucket.Bucket,
500                 Prefix:   "recent/" + prefix,
501                 PageSize: v.IndexPageSize,
502         }
503         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
504         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
505         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
506                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
507                 if data.Key >= "g" {
508                         // Conveniently, "recent/*" and "trash/*" are
509                         // lexically greater than all hex-encoded data
510                         // hashes, so stopping here avoids iterating
511                         // over all of them needlessly with dataL.
512                         break
513                 }
514                 if !v.isKeepBlock(data.Key) {
515                         continue
516                 }
517
518                 // stamp is the list entry we should use to report the
519                 // last-modified time for this data block: it will be
520                 // the recent/X entry if one exists, otherwise the
521                 // entry for the data block itself.
522                 stamp := data
523
524                 // Advance to the corresponding recent/X marker, if any
525                 for recent != nil {
526                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
527                                 recent = recentL.Next()
528                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
529                                 continue
530                         } else if cmp == 0 {
531                                 stamp = recent
532                                 recent = recentL.Next()
533                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
534                                 break
535                         } else {
536                                 // recent/X marker is missing: we'll
537                                 // use the timestamp on the data
538                                 // object.
539                                 break
540                         }
541                 }
542                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
543                 if err != nil {
544                         return err
545                 }
546                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
547         }
548         return nil
549 }
550
551 // Trash a Keep block.
552 func (v *S3Volume) Trash(loc string) error {
553         if v.ReadOnly {
554                 return MethodDisabledError
555         }
556         if t, err := v.Mtime(loc); err != nil {
557                 return err
558         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
559                 return nil
560         }
561         if theConfig.TrashLifetime == 0 {
562                 if !s3UnsafeDelete {
563                         return ErrS3TrashDisabled
564                 }
565                 return v.translateError(v.bucket.Del(loc))
566         }
567         err := v.checkRaceWindow(loc)
568         if err != nil {
569                 return err
570         }
571         err = v.safeCopy("trash/"+loc, loc)
572         if err != nil {
573                 return err
574         }
575         return v.translateError(v.bucket.Del(loc))
576 }
577
578 // checkRaceWindow returns a non-nil error if trash/loc is, or might
579 // be, in the race window (i.e., it's not safe to trash loc).
580 func (v *S3Volume) checkRaceWindow(loc string) error {
581         resp, err := v.bucket.Head("trash/"+loc, nil)
582         err = v.translateError(err)
583         if os.IsNotExist(err) {
584                 // OK, trash/X doesn't exist so we're not in the race
585                 // window
586                 return nil
587         } else if err != nil {
588                 // Error looking up trash/X. We don't know whether
589                 // we're in the race window
590                 return err
591         }
592         t, err := v.lastModified(resp)
593         if err != nil {
594                 // Can't parse timestamp
595                 return err
596         }
597         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
598         if safeWindow <= 0 {
599                 // We can't count on "touch trash/X" to prolong
600                 // trash/X's lifetime. The new timestamp might not
601                 // become visible until now+raceWindow, and EmptyTrash
602                 // is allowed to delete trash/X before then.
603                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
604         }
605         // trash/X exists, but it won't be eligible for deletion until
606         // after now+raceWindow, so it's safe to overwrite it.
607         return nil
608 }
609
610 // safeCopy calls PutCopy, and checks the response to make sure the
611 // copy succeeded and updated the timestamp on the destination object
612 // (PutCopy returns 200 OK if the request was received, even if the
613 // copy failed).
614 func (v *S3Volume) safeCopy(dst, src string) error {
615         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
616                 ContentType:       "application/octet-stream",
617                 MetadataDirective: "REPLACE",
618         }, v.bucket.Name+"/"+src)
619         err = v.translateError(err)
620         if err != nil {
621                 return err
622         }
623         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
624                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
625         } else if time.Now().Sub(t) > maxClockSkew {
626                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
627         }
628         return nil
629 }
630
631 // Get the LastModified header from resp, and parse it as RFC1123 or
632 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
633 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
634         s := resp.Header.Get("Last-Modified")
635         t, err = time.Parse(time.RFC1123, s)
636         if err != nil && s != "" {
637                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
638                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
639                 // as required by HTTP spec. If it's not a valid HTTP
640                 // header value, it's probably AWS (or s3test) giving
641                 // us a nearly-RFC1123 timestamp.
642                 t, err = time.Parse(nearlyRFC1123, s)
643         }
644         return
645 }
646
647 // Untrash moves block from trash back into store
648 func (v *S3Volume) Untrash(loc string) error {
649         err := v.safeCopy(loc, "trash/"+loc)
650         if err != nil {
651                 return err
652         }
653         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
654         return v.translateError(err)
655 }
656
657 // Status returns a *VolumeStatus representing the current in-use
658 // storage capacity and a fake available capacity that doesn't make
659 // the volume seem full or nearly-full.
660 func (v *S3Volume) Status() *VolumeStatus {
661         return &VolumeStatus{
662                 DeviceNum: 1,
663                 BytesFree: BlockSize * 1000,
664                 BytesUsed: 1,
665         }
666 }
667
668 // InternalStats returns bucket I/O and API call counters.
669 func (v *S3Volume) InternalStats() interface{} {
670         return &v.bucket.stats
671 }
672
673 // String implements fmt.Stringer.
674 func (v *S3Volume) String() string {
675         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
676 }
677
678 // Writable returns false if all future Put, Mtime, and Delete calls
679 // are expected to fail.
680 func (v *S3Volume) Writable() bool {
681         return !v.ReadOnly
682 }
683
684 // Replication returns the storage redundancy of the underlying
685 // device. Configured via command line flag.
686 func (v *S3Volume) Replication() int {
687         return v.S3Replication
688 }
689
690 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
691
692 func (v *S3Volume) isKeepBlock(s string) bool {
693         return s3KeepBlockRegexp.MatchString(s)
694 }
695
696 // fixRace(X) is called when "recent/X" exists but "X" doesn't
697 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
698 // there was a race between Put and Trash, fixRace recovers from the
699 // race by Untrashing the block.
700 func (v *S3Volume) fixRace(loc string) bool {
701         trash, err := v.bucket.Head("trash/"+loc, nil)
702         if err != nil {
703                 if !os.IsNotExist(v.translateError(err)) {
704                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
705                 }
706                 return false
707         }
708         trashTime, err := v.lastModified(trash)
709         if err != nil {
710                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
711                 return false
712         }
713
714         recent, err := v.bucket.Head("recent/"+loc, nil)
715         if err != nil {
716                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
717                 return false
718         }
719         recentTime, err := v.lastModified(recent)
720         if err != nil {
721                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
722                 return false
723         }
724
725         ageWhenTrashed := trashTime.Sub(recentTime)
726         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
727                 // No evidence of a race: block hasn't been written
728                 // since it became eligible for Trash. No fix needed.
729                 return false
730         }
731
732         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
733         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
734         err = v.safeCopy(loc, "trash/"+loc)
735         if err != nil {
736                 log.Printf("error: fixRace: %s", err)
737                 return false
738         }
739         return true
740 }
741
742 func (v *S3Volume) translateError(err error) error {
743         switch err := err.(type) {
744         case *s3.Error:
745                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
746                         strings.Contains(err.Error(), "Not Found") {
747                         return os.ErrNotExist
748                 }
749                 // Other 404 errors like NoSuchVersion and
750                 // NoSuchBucket are different problems which should
751                 // get called out downstream, so we don't convert them
752                 // to os.ErrNotExist.
753         }
754         return err
755 }
756
757 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
758 // and deletes them from the volume.
759 func (v *S3Volume) EmptyTrash() {
760         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
761
762         // Use a merge sort to find matching sets of trash/X and recent/X.
763         trashL := s3Lister{
764                 Bucket:   v.bucket.Bucket,
765                 Prefix:   "trash/",
766                 PageSize: v.IndexPageSize,
767         }
768         // Define "ready to delete" as "...when EmptyTrash started".
769         startT := time.Now()
770         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
771                 loc := trash.Key[6:]
772                 if !v.isKeepBlock(loc) {
773                         continue
774                 }
775                 bytesInTrash += trash.Size
776                 blocksInTrash++
777
778                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
779                 if err != nil {
780                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
781                         continue
782                 }
783                 recent, err := v.bucket.Head("recent/"+loc, nil)
784                 if err != nil && os.IsNotExist(v.translateError(err)) {
785                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
786                         err = v.Untrash(loc)
787                         if err != nil {
788                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
789                         }
790                         continue
791                 } else if err != nil {
792                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
793                         continue
794                 }
795                 recentT, err := v.lastModified(recent)
796                 if err != nil {
797                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
798                         continue
799                 }
800                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
801                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
802                                 // recent/loc is too old to protect
803                                 // loc from being Trashed again during
804                                 // the raceWindow that starts if we
805                                 // delete trash/X now.
806                                 //
807                                 // Note this means (TrashCheckInterval
808                                 // < BlobSignatureTTL - raceWindow) is
809                                 // necessary to avoid starvation.
810                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
811                                 v.fixRace(loc)
812                                 v.Touch(loc)
813                                 continue
814                         }
815                         _, err := v.bucket.Head(loc, nil)
816                         if os.IsNotExist(err) {
817                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
818                                 v.fixRace(loc)
819                                 continue
820                         } else if err != nil {
821                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
822                                 continue
823                         }
824                 }
825                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
826                         continue
827                 }
828                 err = v.bucket.Del(trash.Key)
829                 if err != nil {
830                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
831                         continue
832                 }
833                 bytesDeleted += trash.Size
834                 blocksDeleted++
835
836                 _, err = v.bucket.Head(loc, nil)
837                 if os.IsNotExist(err) {
838                         err = v.bucket.Del("recent/" + loc)
839                         if err != nil {
840                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
841                         }
842                 } else if err != nil {
843                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
844                 }
845         }
846         if err := trashL.Error(); err != nil {
847                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
848         }
849         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
850 }
851
852 type s3Lister struct {
853         Bucket     *s3.Bucket
854         Prefix     string
855         PageSize   int
856         nextMarker string
857         buf        []s3.Key
858         err        error
859 }
860
861 // First fetches the first page and returns the first item. It returns
862 // nil if the response is the empty set or an error occurs.
863 func (lister *s3Lister) First() *s3.Key {
864         lister.getPage()
865         return lister.pop()
866 }
867
868 // Next returns the next item, fetching the next page if necessary. It
869 // returns nil if the last available item has already been fetched, or
870 // an error occurs.
871 func (lister *s3Lister) Next() *s3.Key {
872         if len(lister.buf) == 0 && lister.nextMarker != "" {
873                 lister.getPage()
874         }
875         return lister.pop()
876 }
877
878 // Return the most recent error encountered by First or Next.
879 func (lister *s3Lister) Error() error {
880         return lister.err
881 }
882
883 func (lister *s3Lister) getPage() {
884         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
885         lister.nextMarker = ""
886         if err != nil {
887                 lister.err = err
888                 return
889         }
890         if resp.IsTruncated {
891                 lister.nextMarker = resp.NextMarker
892         }
893         lister.buf = make([]s3.Key, 0, len(resp.Contents))
894         for _, key := range resp.Contents {
895                 if !strings.HasPrefix(key.Key, lister.Prefix) {
896                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
897                         continue
898                 }
899                 lister.buf = append(lister.buf, key)
900         }
901 }
902
903 func (lister *s3Lister) pop() (k *s3.Key) {
904         if len(lister.buf) > 0 {
905                 k = &lister.buf[0]
906                 lister.buf = lister.buf[1:]
907         }
908         return
909 }
910
911 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
912 type s3bucket struct {
913         *s3.Bucket
914         stats s3bucketStats
915 }
916
917 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
918         rdr, err := b.Bucket.GetReader(path)
919         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
920         b.stats.TickErr(err)
921         return NewCountingReader(rdr, b.stats.TickInBytes), err
922 }
923
924 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
925         resp, err := b.Bucket.Head(path, headers)
926         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
927         b.stats.TickErr(err)
928         return resp, err
929 }
930
931 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
932         if length == 0 {
933                 // goamz will only send Content-Length: 0 when reader
934                 // is nil due to net.http.Request.ContentLength
935                 // behavior.  Otherwise, Content-Length header is
936                 // omitted which will cause some S3 services
937                 // (including AWS and Ceph RadosGW) to fail to create
938                 // empty objects.
939                 r = nil
940         } else {
941                 r = NewCountingReader(r, b.stats.TickOutBytes)
942         }
943         err := b.Bucket.PutReader(path, r, length, contType, perm, options)
944         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
945         b.stats.TickErr(err)
946         return err
947 }
948
949 func (b *s3bucket) Del(path string) error {
950         err := b.Bucket.Del(path)
951         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
952         b.stats.TickErr(err)
953         return err
954 }
955
956 type s3bucketStats struct {
957         statsTicker
958         Ops     uint64
959         GetOps  uint64
960         PutOps  uint64
961         HeadOps uint64
962         DelOps  uint64
963         ListOps uint64
964 }
965
966 func (s *s3bucketStats) TickErr(err error) {
967         if err == nil {
968                 return
969         }
970         errType := fmt.Sprintf("%T", err)
971         if err, ok := err.(*s3.Error); ok {
972                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
973         }
974         s.statsTicker.TickErr(err, errType)
975 }