Merge branch '13023-wb-log-newlines'
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "flag"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "net/http"
17         "os"
18         "regexp"
19         "strings"
20         "sync"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "github.com/AdRoll/goamz/aws"
25         "github.com/AdRoll/goamz/s3"
26 )
27
28 const (
29         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
30         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
31 )
32
33 var (
34         // ErrS3TrashDisabled is returned by Trash if that operation
35         // is impossible with the current config.
36         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
37
38         s3AccessKeyFile string
39         s3SecretKeyFile string
40         s3RegionName    string
41         s3Endpoint      string
42         s3Replication   int
43         s3UnsafeDelete  bool
44         s3RaceWindow    time.Duration
45
46         s3ACL = s3.Private
47
48         zeroTime time.Time
49 )
50
51 const (
52         maxClockSkew  = 600 * time.Second
53         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
54 )
55
56 type s3VolumeAdder struct {
57         *Config
58 }
59
60 // String implements flag.Value
61 func (s *s3VolumeAdder) String() string {
62         return "-"
63 }
64
65 func (s *s3VolumeAdder) Set(bucketName string) error {
66         if bucketName == "" {
67                 return fmt.Errorf("no container name given")
68         }
69         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
70                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
71         }
72         if deprecated.flagSerializeIO {
73                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
74         }
75         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
76                 Bucket:        bucketName,
77                 AccessKeyFile: s3AccessKeyFile,
78                 SecretKeyFile: s3SecretKeyFile,
79                 Endpoint:      s3Endpoint,
80                 Region:        s3RegionName,
81                 RaceWindow:    arvados.Duration(s3RaceWindow),
82                 S3Replication: s3Replication,
83                 UnsafeDelete:  s3UnsafeDelete,
84                 ReadOnly:      deprecated.flagReadonly,
85                 IndexPageSize: 1000,
86         })
87         return nil
88 }
89
90 func s3regions() (okList []string) {
91         for r := range aws.Regions {
92                 okList = append(okList, r)
93         }
94         return
95 }
96
97 func init() {
98         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
99
100         flag.Var(&s3VolumeAdder{theConfig},
101                 "s3-bucket-volume",
102                 "Use the given bucket as a storage volume. Can be given multiple times.")
103         flag.StringVar(
104                 &s3RegionName,
105                 "s3-region",
106                 "",
107                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
108         flag.StringVar(
109                 &s3Endpoint,
110                 "s3-endpoint",
111                 "",
112                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
113         flag.StringVar(
114                 &s3AccessKeyFile,
115                 "s3-access-key-file",
116                 "",
117                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
118         flag.StringVar(
119                 &s3SecretKeyFile,
120                 "s3-secret-key-file",
121                 "",
122                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
123         flag.DurationVar(
124                 &s3RaceWindow,
125                 "s3-race-window",
126                 24*time.Hour,
127                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
128         flag.IntVar(
129                 &s3Replication,
130                 "s3-replication",
131                 2,
132                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
133         flag.BoolVar(
134                 &s3UnsafeDelete,
135                 "s3-unsafe-delete",
136                 false,
137                 "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
138 }
139
140 // S3Volume implements Volume using an S3 bucket.
141 type S3Volume struct {
142         AccessKeyFile      string
143         SecretKeyFile      string
144         Endpoint           string
145         Region             string
146         Bucket             string
147         LocationConstraint bool
148         IndexPageSize      int
149         S3Replication      int
150         ConnectTimeout     arvados.Duration
151         ReadTimeout        arvados.Duration
152         RaceWindow         arvados.Duration
153         ReadOnly           bool
154         UnsafeDelete       bool
155
156         bucket *s3bucket
157
158         startOnce sync.Once
159 }
160
161 // Examples implements VolumeWithExamples.
162 func (*S3Volume) Examples() []Volume {
163         return []Volume{
164                 &S3Volume{
165                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
166                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
167                         Endpoint:       "",
168                         Region:         "us-east-1",
169                         Bucket:         "example-bucket-name",
170                         IndexPageSize:  1000,
171                         S3Replication:  2,
172                         RaceWindow:     arvados.Duration(24 * time.Hour),
173                         ConnectTimeout: arvados.Duration(time.Minute),
174                         ReadTimeout:    arvados.Duration(5 * time.Minute),
175                 },
176                 &S3Volume{
177                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
178                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
179                         Endpoint:       "https://storage.googleapis.com",
180                         Region:         "",
181                         Bucket:         "example-bucket-name",
182                         IndexPageSize:  1000,
183                         S3Replication:  2,
184                         RaceWindow:     arvados.Duration(24 * time.Hour),
185                         ConnectTimeout: arvados.Duration(time.Minute),
186                         ReadTimeout:    arvados.Duration(5 * time.Minute),
187                 },
188         }
189 }
190
191 // Type implements Volume.
192 func (*S3Volume) Type() string {
193         return "S3"
194 }
195
196 // Start populates private fields and verifies the configuration is
197 // valid.
198 func (v *S3Volume) Start() error {
199         region, ok := aws.Regions[v.Region]
200         if v.Endpoint == "" {
201                 if !ok {
202                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
203                 }
204         } else if ok {
205                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
206                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
207         } else {
208                 region = aws.Region{
209                         Name:                 v.Region,
210                         S3Endpoint:           v.Endpoint,
211                         S3LocationConstraint: v.LocationConstraint,
212                 }
213         }
214
215         var err error
216         var auth aws.Auth
217         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
218         if err != nil {
219                 return err
220         }
221         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
222         if err != nil {
223                 return err
224         }
225
226         // Zero timeouts mean "wait forever", which is a bad
227         // default. Default to long timeouts instead.
228         if v.ConnectTimeout == 0 {
229                 v.ConnectTimeout = s3DefaultConnectTimeout
230         }
231         if v.ReadTimeout == 0 {
232                 v.ReadTimeout = s3DefaultReadTimeout
233         }
234
235         client := s3.New(auth, region)
236         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
237         client.ReadTimeout = time.Duration(v.ReadTimeout)
238         v.bucket = &s3bucket{
239                 Bucket: &s3.Bucket{
240                         S3:   client,
241                         Name: v.Bucket,
242                 },
243         }
244         return nil
245 }
246
247 // DeviceID returns a globally unique ID for the storage bucket.
248 func (v *S3Volume) DeviceID() string {
249         return "s3://" + v.Endpoint + "/" + v.Bucket
250 }
251
252 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
253         ready := make(chan bool)
254         go func() {
255                 rdr, err = v.getReader(loc)
256                 close(ready)
257         }()
258         select {
259         case <-ready:
260                 return
261         case <-ctx.Done():
262                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
263                 go func() {
264                         <-ready
265                         if err == nil {
266                                 rdr.Close()
267                         }
268                 }()
269                 return nil, ctx.Err()
270         }
271 }
272
273 // getReader wraps (Bucket)GetReader.
274 //
275 // In situations where (Bucket)GetReader would fail because the block
276 // disappeared in a Trash race, getReader calls fixRace to recover the
277 // data, and tries again.
278 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
279         rdr, err = v.bucket.GetReader(loc)
280         err = v.translateError(err)
281         if err == nil || !os.IsNotExist(err) {
282                 return
283         }
284
285         _, err = v.bucket.Head("recent/"+loc, nil)
286         err = v.translateError(err)
287         if err != nil {
288                 // If we can't read recent/X, there's no point in
289                 // trying fixRace. Give up.
290                 return
291         }
292         if !v.fixRace(loc) {
293                 err = os.ErrNotExist
294                 return
295         }
296
297         rdr, err = v.bucket.GetReader(loc)
298         if err != nil {
299                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
300                 err = v.translateError(err)
301         }
302         return
303 }
304
305 // Get a block: copy the block data into buf, and return the number of
306 // bytes copied.
307 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
308         rdr, err := v.getReaderWithContext(ctx, loc)
309         if err != nil {
310                 return 0, err
311         }
312
313         var n int
314         ready := make(chan bool)
315         go func() {
316                 defer close(ready)
317
318                 defer rdr.Close()
319                 n, err = io.ReadFull(rdr, buf)
320
321                 switch err {
322                 case nil, io.EOF, io.ErrUnexpectedEOF:
323                         err = nil
324                 default:
325                         err = v.translateError(err)
326                 }
327         }()
328         select {
329         case <-ctx.Done():
330                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
331                 rdr.Close()
332                 // Must wait for ReadFull to return, to ensure it
333                 // doesn't write to buf after we return.
334                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
335                 <-ready
336                 return 0, ctx.Err()
337         case <-ready:
338                 return n, err
339         }
340 }
341
342 // Compare the given data with the stored data.
343 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
344         errChan := make(chan error, 1)
345         go func() {
346                 _, err := v.bucket.Head("recent/"+loc, nil)
347                 errChan <- err
348         }()
349         var err error
350         select {
351         case <-ctx.Done():
352                 return ctx.Err()
353         case err = <-errChan:
354         }
355         if err != nil {
356                 // Checking for "loc" itself here would interfere with
357                 // future GET requests.
358                 //
359                 // On AWS, if X doesn't exist, a HEAD or GET request
360                 // for X causes X's non-existence to be cached. Thus,
361                 // if we test for X, then create X and return a
362                 // signature to our client, the client might still get
363                 // 404 from all keepstores when trying to read it.
364                 //
365                 // To avoid this, we avoid doing HEAD X or GET X until
366                 // we know X has been written.
367                 //
368                 // Note that X might exist even though recent/X
369                 // doesn't: for example, the response to HEAD recent/X
370                 // might itself come from a stale cache. In such
371                 // cases, we will return a false negative and
372                 // PutHandler might needlessly create another replica
373                 // on a different volume. That's not ideal, but it's
374                 // better than passing the eventually-consistent
375                 // problem on to our clients.
376                 return v.translateError(err)
377         }
378         rdr, err := v.getReaderWithContext(ctx, loc)
379         if err != nil {
380                 return err
381         }
382         defer rdr.Close()
383         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
384 }
385
386 // Put writes a block.
387 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
388         if v.ReadOnly {
389                 return MethodDisabledError
390         }
391         var opts s3.Options
392         size := len(block)
393         if size > 0 {
394                 md5, err := hex.DecodeString(loc)
395                 if err != nil {
396                         return err
397                 }
398                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
399         }
400
401         // Send the block data through a pipe, so that (if we need to)
402         // we can close the pipe early and abandon our PutReader()
403         // goroutine, without worrying about PutReader() accessing our
404         // block buffer after we release it.
405         bufr, bufw := io.Pipe()
406         go func() {
407                 io.Copy(bufw, bytes.NewReader(block))
408                 bufw.Close()
409         }()
410
411         var err error
412         ready := make(chan bool)
413         go func() {
414                 defer func() {
415                         if ctx.Err() != nil {
416                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
417                         }
418                 }()
419                 defer close(ready)
420                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
421                 if err != nil {
422                         return
423                 }
424                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
425         }()
426         select {
427         case <-ctx.Done():
428                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
429                 // Our pipe might be stuck in Write(), waiting for
430                 // io.Copy() to read. If so, un-stick it. This means
431                 // PutReader will get corrupt data, but that's OK: the
432                 // size and MD5 won't match, so the write will fail.
433                 go io.Copy(ioutil.Discard, bufr)
434                 // CloseWithError() will return once pending I/O is done.
435                 bufw.CloseWithError(ctx.Err())
436                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
437                 return ctx.Err()
438         case <-ready:
439                 return v.translateError(err)
440         }
441 }
442
443 // Touch sets the timestamp for the given locator to the current time.
444 func (v *S3Volume) Touch(loc string) error {
445         if v.ReadOnly {
446                 return MethodDisabledError
447         }
448         _, err := v.bucket.Head(loc, nil)
449         err = v.translateError(err)
450         if os.IsNotExist(err) && v.fixRace(loc) {
451                 // The data object got trashed in a race, but fixRace
452                 // rescued it.
453         } else if err != nil {
454                 return err
455         }
456         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
457         return v.translateError(err)
458 }
459
460 // Mtime returns the stored timestamp for the given locator.
461 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
462         _, err := v.bucket.Head(loc, nil)
463         if err != nil {
464                 return zeroTime, v.translateError(err)
465         }
466         resp, err := v.bucket.Head("recent/"+loc, nil)
467         err = v.translateError(err)
468         if os.IsNotExist(err) {
469                 // The data object X exists, but recent/X is missing.
470                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
471                 if err != nil {
472                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
473                         return zeroTime, v.translateError(err)
474                 }
475                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
476                 resp, err = v.bucket.Head("recent/"+loc, nil)
477                 if err != nil {
478                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
479                         return zeroTime, v.translateError(err)
480                 }
481         } else if err != nil {
482                 // HEAD recent/X failed for some other reason.
483                 return zeroTime, err
484         }
485         return v.lastModified(resp)
486 }
487
488 // IndexTo writes a complete list of locators with the given prefix
489 // for which Get() can retrieve data.
490 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
491         // Use a merge sort to find matching sets of X and recent/X.
492         dataL := s3Lister{
493                 Bucket:   v.bucket.Bucket,
494                 Prefix:   prefix,
495                 PageSize: v.IndexPageSize,
496         }
497         recentL := s3Lister{
498                 Bucket:   v.bucket.Bucket,
499                 Prefix:   "recent/" + prefix,
500                 PageSize: v.IndexPageSize,
501         }
502         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
503         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
504         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
505                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
506                 if data.Key >= "g" {
507                         // Conveniently, "recent/*" and "trash/*" are
508                         // lexically greater than all hex-encoded data
509                         // hashes, so stopping here avoids iterating
510                         // over all of them needlessly with dataL.
511                         break
512                 }
513                 if !v.isKeepBlock(data.Key) {
514                         continue
515                 }
516
517                 // stamp is the list entry we should use to report the
518                 // last-modified time for this data block: it will be
519                 // the recent/X entry if one exists, otherwise the
520                 // entry for the data block itself.
521                 stamp := data
522
523                 // Advance to the corresponding recent/X marker, if any
524                 for recent != nil {
525                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
526                                 recent = recentL.Next()
527                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
528                                 continue
529                         } else if cmp == 0 {
530                                 stamp = recent
531                                 recent = recentL.Next()
532                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
533                                 break
534                         } else {
535                                 // recent/X marker is missing: we'll
536                                 // use the timestamp on the data
537                                 // object.
538                                 break
539                         }
540                 }
541                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
542                 if err != nil {
543                         return err
544                 }
545                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
546         }
547         return nil
548 }
549
550 // Trash a Keep block.
551 func (v *S3Volume) Trash(loc string) error {
552         if v.ReadOnly {
553                 return MethodDisabledError
554         }
555         if t, err := v.Mtime(loc); err != nil {
556                 return err
557         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
558                 return nil
559         }
560         if theConfig.TrashLifetime == 0 {
561                 if !s3UnsafeDelete {
562                         return ErrS3TrashDisabled
563                 }
564                 return v.translateError(v.bucket.Del(loc))
565         }
566         err := v.checkRaceWindow(loc)
567         if err != nil {
568                 return err
569         }
570         err = v.safeCopy("trash/"+loc, loc)
571         if err != nil {
572                 return err
573         }
574         return v.translateError(v.bucket.Del(loc))
575 }
576
577 // checkRaceWindow returns a non-nil error if trash/loc is, or might
578 // be, in the race window (i.e., it's not safe to trash loc).
579 func (v *S3Volume) checkRaceWindow(loc string) error {
580         resp, err := v.bucket.Head("trash/"+loc, nil)
581         err = v.translateError(err)
582         if os.IsNotExist(err) {
583                 // OK, trash/X doesn't exist so we're not in the race
584                 // window
585                 return nil
586         } else if err != nil {
587                 // Error looking up trash/X. We don't know whether
588                 // we're in the race window
589                 return err
590         }
591         t, err := v.lastModified(resp)
592         if err != nil {
593                 // Can't parse timestamp
594                 return err
595         }
596         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
597         if safeWindow <= 0 {
598                 // We can't count on "touch trash/X" to prolong
599                 // trash/X's lifetime. The new timestamp might not
600                 // become visible until now+raceWindow, and EmptyTrash
601                 // is allowed to delete trash/X before then.
602                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
603         }
604         // trash/X exists, but it won't be eligible for deletion until
605         // after now+raceWindow, so it's safe to overwrite it.
606         return nil
607 }
608
609 // safeCopy calls PutCopy, and checks the response to make sure the
610 // copy succeeded and updated the timestamp on the destination object
611 // (PutCopy returns 200 OK if the request was received, even if the
612 // copy failed).
613 func (v *S3Volume) safeCopy(dst, src string) error {
614         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
615                 ContentType:       "application/octet-stream",
616                 MetadataDirective: "REPLACE",
617         }, v.bucket.Name+"/"+src)
618         err = v.translateError(err)
619         if err != nil {
620                 return err
621         }
622         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
623                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
624         } else if time.Now().Sub(t) > maxClockSkew {
625                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
626         }
627         return nil
628 }
629
630 // Get the LastModified header from resp, and parse it as RFC1123 or
631 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
632 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
633         s := resp.Header.Get("Last-Modified")
634         t, err = time.Parse(time.RFC1123, s)
635         if err != nil && s != "" {
636                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
637                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
638                 // as required by HTTP spec. If it's not a valid HTTP
639                 // header value, it's probably AWS (or s3test) giving
640                 // us a nearly-RFC1123 timestamp.
641                 t, err = time.Parse(nearlyRFC1123, s)
642         }
643         return
644 }
645
646 // Untrash moves block from trash back into store
647 func (v *S3Volume) Untrash(loc string) error {
648         err := v.safeCopy(loc, "trash/"+loc)
649         if err != nil {
650                 return err
651         }
652         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
653         return v.translateError(err)
654 }
655
656 // Status returns a *VolumeStatus representing the current in-use
657 // storage capacity and a fake available capacity that doesn't make
658 // the volume seem full or nearly-full.
659 func (v *S3Volume) Status() *VolumeStatus {
660         return &VolumeStatus{
661                 DeviceNum: 1,
662                 BytesFree: BlockSize * 1000,
663                 BytesUsed: 1,
664         }
665 }
666
667 // InternalStats returns bucket I/O and API call counters.
668 func (v *S3Volume) InternalStats() interface{} {
669         return &v.bucket.stats
670 }
671
672 // String implements fmt.Stringer.
673 func (v *S3Volume) String() string {
674         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
675 }
676
677 // Writable returns false if all future Put, Mtime, and Delete calls
678 // are expected to fail.
679 func (v *S3Volume) Writable() bool {
680         return !v.ReadOnly
681 }
682
683 // Replication returns the storage redundancy of the underlying
684 // device. Configured via command line flag.
685 func (v *S3Volume) Replication() int {
686         return v.S3Replication
687 }
688
689 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
690
691 func (v *S3Volume) isKeepBlock(s string) bool {
692         return s3KeepBlockRegexp.MatchString(s)
693 }
694
695 // fixRace(X) is called when "recent/X" exists but "X" doesn't
696 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
697 // there was a race between Put and Trash, fixRace recovers from the
698 // race by Untrashing the block.
699 func (v *S3Volume) fixRace(loc string) bool {
700         trash, err := v.bucket.Head("trash/"+loc, nil)
701         if err != nil {
702                 if !os.IsNotExist(v.translateError(err)) {
703                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
704                 }
705                 return false
706         }
707         trashTime, err := v.lastModified(trash)
708         if err != nil {
709                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
710                 return false
711         }
712
713         recent, err := v.bucket.Head("recent/"+loc, nil)
714         if err != nil {
715                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
716                 return false
717         }
718         recentTime, err := v.lastModified(recent)
719         if err != nil {
720                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
721                 return false
722         }
723
724         ageWhenTrashed := trashTime.Sub(recentTime)
725         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
726                 // No evidence of a race: block hasn't been written
727                 // since it became eligible for Trash. No fix needed.
728                 return false
729         }
730
731         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
732         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
733         err = v.safeCopy(loc, "trash/"+loc)
734         if err != nil {
735                 log.Printf("error: fixRace: %s", err)
736                 return false
737         }
738         return true
739 }
740
741 func (v *S3Volume) translateError(err error) error {
742         switch err := err.(type) {
743         case *s3.Error:
744                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
745                         strings.Contains(err.Error(), "Not Found") {
746                         return os.ErrNotExist
747                 }
748                 // Other 404 errors like NoSuchVersion and
749                 // NoSuchBucket are different problems which should
750                 // get called out downstream, so we don't convert them
751                 // to os.ErrNotExist.
752         }
753         return err
754 }
755
756 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
757 // and deletes them from the volume.
758 func (v *S3Volume) EmptyTrash() {
759         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
760
761         // Use a merge sort to find matching sets of trash/X and recent/X.
762         trashL := s3Lister{
763                 Bucket:   v.bucket.Bucket,
764                 Prefix:   "trash/",
765                 PageSize: v.IndexPageSize,
766         }
767         // Define "ready to delete" as "...when EmptyTrash started".
768         startT := time.Now()
769         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
770                 loc := trash.Key[6:]
771                 if !v.isKeepBlock(loc) {
772                         continue
773                 }
774                 bytesInTrash += trash.Size
775                 blocksInTrash++
776
777                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
778                 if err != nil {
779                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
780                         continue
781                 }
782                 recent, err := v.bucket.Head("recent/"+loc, nil)
783                 if err != nil && os.IsNotExist(v.translateError(err)) {
784                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
785                         err = v.Untrash(loc)
786                         if err != nil {
787                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
788                         }
789                         continue
790                 } else if err != nil {
791                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
792                         continue
793                 }
794                 recentT, err := v.lastModified(recent)
795                 if err != nil {
796                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
797                         continue
798                 }
799                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
800                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
801                                 // recent/loc is too old to protect
802                                 // loc from being Trashed again during
803                                 // the raceWindow that starts if we
804                                 // delete trash/X now.
805                                 //
806                                 // Note this means (TrashCheckInterval
807                                 // < BlobSignatureTTL - raceWindow) is
808                                 // necessary to avoid starvation.
809                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
810                                 v.fixRace(loc)
811                                 v.Touch(loc)
812                                 continue
813                         }
814                         _, err := v.bucket.Head(loc, nil)
815                         if os.IsNotExist(err) {
816                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
817                                 v.fixRace(loc)
818                                 continue
819                         } else if err != nil {
820                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
821                                 continue
822                         }
823                 }
824                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
825                         continue
826                 }
827                 err = v.bucket.Del(trash.Key)
828                 if err != nil {
829                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
830                         continue
831                 }
832                 bytesDeleted += trash.Size
833                 blocksDeleted++
834
835                 _, err = v.bucket.Head(loc, nil)
836                 if os.IsNotExist(err) {
837                         err = v.bucket.Del("recent/" + loc)
838                         if err != nil {
839                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
840                         }
841                 } else if err != nil {
842                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
843                 }
844         }
845         if err := trashL.Error(); err != nil {
846                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
847         }
848         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
849 }
850
851 type s3Lister struct {
852         Bucket     *s3.Bucket
853         Prefix     string
854         PageSize   int
855         nextMarker string
856         buf        []s3.Key
857         err        error
858 }
859
860 // First fetches the first page and returns the first item. It returns
861 // nil if the response is the empty set or an error occurs.
862 func (lister *s3Lister) First() *s3.Key {
863         lister.getPage()
864         return lister.pop()
865 }
866
867 // Next returns the next item, fetching the next page if necessary. It
868 // returns nil if the last available item has already been fetched, or
869 // an error occurs.
870 func (lister *s3Lister) Next() *s3.Key {
871         if len(lister.buf) == 0 && lister.nextMarker != "" {
872                 lister.getPage()
873         }
874         return lister.pop()
875 }
876
877 // Return the most recent error encountered by First or Next.
878 func (lister *s3Lister) Error() error {
879         return lister.err
880 }
881
882 func (lister *s3Lister) getPage() {
883         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
884         lister.nextMarker = ""
885         if err != nil {
886                 lister.err = err
887                 return
888         }
889         if resp.IsTruncated {
890                 lister.nextMarker = resp.NextMarker
891         }
892         lister.buf = make([]s3.Key, 0, len(resp.Contents))
893         for _, key := range resp.Contents {
894                 if !strings.HasPrefix(key.Key, lister.Prefix) {
895                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
896                         continue
897                 }
898                 lister.buf = append(lister.buf, key)
899         }
900 }
901
902 func (lister *s3Lister) pop() (k *s3.Key) {
903         if len(lister.buf) > 0 {
904                 k = &lister.buf[0]
905                 lister.buf = lister.buf[1:]
906         }
907         return
908 }
909
910 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
911 type s3bucket struct {
912         *s3.Bucket
913         stats s3bucketStats
914 }
915
916 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
917         rdr, err := b.Bucket.GetReader(path)
918         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
919         b.stats.TickErr(err)
920         return NewCountingReader(rdr, b.stats.TickInBytes), err
921 }
922
923 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
924         resp, err := b.Bucket.Head(path, headers)
925         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
926         b.stats.TickErr(err)
927         return resp, err
928 }
929
930 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
931         if length == 0 {
932                 // goamz will only send Content-Length: 0 when reader
933                 // is nil due to net.http.Request.ContentLength
934                 // behavior.  Otherwise, Content-Length header is
935                 // omitted which will cause some S3 services
936                 // (including AWS and Ceph RadosGW) to fail to create
937                 // empty objects.
938                 r = nil
939         } else {
940                 r = NewCountingReader(r, b.stats.TickOutBytes)
941         }
942         err := b.Bucket.PutReader(path, r, length, contType, perm, options)
943         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
944         b.stats.TickErr(err)
945         return err
946 }
947
948 func (b *s3bucket) Del(path string) error {
949         err := b.Bucket.Del(path)
950         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
951         b.stats.TickErr(err)
952         return err
953 }
954
955 type s3bucketStats struct {
956         statsTicker
957         Ops     uint64
958         GetOps  uint64
959         PutOps  uint64
960         HeadOps uint64
961         DelOps  uint64
962         ListOps uint64
963 }
964
965 func (s *s3bucketStats) TickErr(err error) {
966         if err == nil {
967                 return
968         }
969         errType := fmt.Sprintf("%T", err)
970         if err, ok := err.(*s3.Error); ok {
971                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
972         }
973         s.statsTicker.TickErr(err, errType)
974 }