13937: Adds counters to azure driver.
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/sha256"
11         "encoding/base64"
12         "encoding/hex"
13         "flag"
14         "fmt"
15         "io"
16         "io/ioutil"
17         "net/http"
18         "os"
19         "regexp"
20         "strings"
21         "sync"
22         "sync/atomic"
23         "time"
24
25         "git.curoverse.com/arvados.git/sdk/go/arvados"
26         "github.com/AdRoll/goamz/aws"
27         "github.com/AdRoll/goamz/s3"
28         "github.com/prometheus/client_golang/prometheus"
29 )
30
31 const (
32         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
33         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
34 )
35
36 var (
37         // ErrS3TrashDisabled is returned by Trash if that operation
38         // is impossible with the current config.
39         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
40
41         s3AccessKeyFile string
42         s3SecretKeyFile string
43         s3RegionName    string
44         s3Endpoint      string
45         s3Replication   int
46         s3UnsafeDelete  bool
47         s3RaceWindow    time.Duration
48
49         s3ACL = s3.Private
50
51         zeroTime time.Time
52 )
53
54 const (
55         maxClockSkew  = 600 * time.Second
56         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
57 )
58
59 type s3VolumeAdder struct {
60         *Config
61 }
62
63 // String implements flag.Value
64 func (s *s3VolumeAdder) String() string {
65         return "-"
66 }
67
68 func (s *s3VolumeAdder) Set(bucketName string) error {
69         if bucketName == "" {
70                 return fmt.Errorf("no container name given")
71         }
72         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
73                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
74         }
75         if deprecated.flagSerializeIO {
76                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
77         }
78         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
79                 Bucket:        bucketName,
80                 AccessKeyFile: s3AccessKeyFile,
81                 SecretKeyFile: s3SecretKeyFile,
82                 Endpoint:      s3Endpoint,
83                 Region:        s3RegionName,
84                 RaceWindow:    arvados.Duration(s3RaceWindow),
85                 S3Replication: s3Replication,
86                 UnsafeDelete:  s3UnsafeDelete,
87                 ReadOnly:      deprecated.flagReadonly,
88                 IndexPageSize: 1000,
89         })
90         return nil
91 }
92
93 func s3regions() (okList []string) {
94         for r := range aws.Regions {
95                 okList = append(okList, r)
96         }
97         return
98 }
99
100 func init() {
101         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
102
103         flag.Var(&s3VolumeAdder{theConfig},
104                 "s3-bucket-volume",
105                 "Use the given bucket as a storage volume. Can be given multiple times.")
106         flag.StringVar(
107                 &s3RegionName,
108                 "s3-region",
109                 "",
110                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
111         flag.StringVar(
112                 &s3Endpoint,
113                 "s3-endpoint",
114                 "",
115                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
116         flag.StringVar(
117                 &s3AccessKeyFile,
118                 "s3-access-key-file",
119                 "",
120                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
121         flag.StringVar(
122                 &s3SecretKeyFile,
123                 "s3-secret-key-file",
124                 "",
125                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
126         flag.DurationVar(
127                 &s3RaceWindow,
128                 "s3-race-window",
129                 24*time.Hour,
130                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
131         flag.IntVar(
132                 &s3Replication,
133                 "s3-replication",
134                 2,
135                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
136         flag.BoolVar(
137                 &s3UnsafeDelete,
138                 "s3-unsafe-delete",
139                 false,
140                 "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
141 }
142
143 // S3Volume implements Volume using an S3 bucket.
144 type S3Volume struct {
145         AccessKeyFile      string
146         SecretKeyFile      string
147         Endpoint           string
148         Region             string
149         Bucket             string
150         LocationConstraint bool
151         IndexPageSize      int
152         S3Replication      int
153         ConnectTimeout     arvados.Duration
154         ReadTimeout        arvados.Duration
155         RaceWindow         arvados.Duration
156         ReadOnly           bool
157         UnsafeDelete       bool
158         StorageClasses     []string
159
160         bucket *s3bucket
161
162         startOnce sync.Once
163 }
164
165 // Examples implements VolumeWithExamples.
166 func (*S3Volume) Examples() []Volume {
167         return []Volume{
168                 &S3Volume{
169                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
170                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
171                         Endpoint:       "",
172                         Region:         "us-east-1",
173                         Bucket:         "example-bucket-name",
174                         IndexPageSize:  1000,
175                         S3Replication:  2,
176                         RaceWindow:     arvados.Duration(24 * time.Hour),
177                         ConnectTimeout: arvados.Duration(time.Minute),
178                         ReadTimeout:    arvados.Duration(5 * time.Minute),
179                 },
180                 &S3Volume{
181                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
182                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
183                         Endpoint:       "https://storage.googleapis.com",
184                         Region:         "",
185                         Bucket:         "example-bucket-name",
186                         IndexPageSize:  1000,
187                         S3Replication:  2,
188                         RaceWindow:     arvados.Duration(24 * time.Hour),
189                         ConnectTimeout: arvados.Duration(time.Minute),
190                         ReadTimeout:    arvados.Duration(5 * time.Minute),
191                 },
192         }
193 }
194
195 // Type implements Volume.
196 func (*S3Volume) Type() string {
197         return "S3"
198 }
199
200 // Start populates private fields and verifies the configuration is
201 // valid.
202 func (v *S3Volume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
203         region, ok := aws.Regions[v.Region]
204         if v.Endpoint == "" {
205                 if !ok {
206                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
207                 }
208         } else if ok {
209                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
210                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
211         } else {
212                 region = aws.Region{
213                         Name:                 v.Region,
214                         S3Endpoint:           v.Endpoint,
215                         S3LocationConstraint: v.LocationConstraint,
216                 }
217         }
218
219         var err error
220         var auth aws.Auth
221         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
222         if err != nil {
223                 return err
224         }
225         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
226         if err != nil {
227                 return err
228         }
229
230         // Zero timeouts mean "wait forever", which is a bad
231         // default. Default to long timeouts instead.
232         if v.ConnectTimeout == 0 {
233                 v.ConnectTimeout = s3DefaultConnectTimeout
234         }
235         if v.ReadTimeout == 0 {
236                 v.ReadTimeout = s3DefaultReadTimeout
237         }
238
239         client := s3.New(auth, region)
240         if region.EC2Endpoint.Signer == aws.V4Signature {
241                 // Currently affects only eu-central-1
242                 client.Signature = aws.V4Signature
243         }
244         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
245         client.ReadTimeout = time.Duration(v.ReadTimeout)
246         v.bucket = &s3bucket{
247                 Bucket: &s3.Bucket{
248                         S3:   client,
249                         Name: v.Bucket,
250                 },
251         }
252         // Set up prometheus metrics
253         lbls := prometheus.Labels{"device_id": v.DeviceID()}
254         v.bucket.stats.opsCounters = opsCounters.MustCurryWith(lbls)
255         v.bucket.stats.errCounters = errCounters.MustCurryWith(lbls)
256         v.bucket.stats.ioBytes = ioBytes.MustCurryWith(lbls)
257
258         return nil
259 }
260
261 // DeviceID returns a globally unique ID for the storage bucket.
262 func (v *S3Volume) DeviceID() string {
263         return "s3://" + v.Endpoint + "/" + v.Bucket
264 }
265
266 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
267         ready := make(chan bool)
268         go func() {
269                 rdr, err = v.getReader(loc)
270                 close(ready)
271         }()
272         select {
273         case <-ready:
274                 return
275         case <-ctx.Done():
276                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
277                 go func() {
278                         <-ready
279                         if err == nil {
280                                 rdr.Close()
281                         }
282                 }()
283                 return nil, ctx.Err()
284         }
285 }
286
287 // getReader wraps (Bucket)GetReader.
288 //
289 // In situations where (Bucket)GetReader would fail because the block
290 // disappeared in a Trash race, getReader calls fixRace to recover the
291 // data, and tries again.
292 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
293         rdr, err = v.bucket.GetReader(loc)
294         err = v.translateError(err)
295         if err == nil || !os.IsNotExist(err) {
296                 return
297         }
298
299         _, err = v.bucket.Head("recent/"+loc, nil)
300         err = v.translateError(err)
301         if err != nil {
302                 // If we can't read recent/X, there's no point in
303                 // trying fixRace. Give up.
304                 return
305         }
306         if !v.fixRace(loc) {
307                 err = os.ErrNotExist
308                 return
309         }
310
311         rdr, err = v.bucket.GetReader(loc)
312         if err != nil {
313                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
314                 err = v.translateError(err)
315         }
316         return
317 }
318
319 // Get a block: copy the block data into buf, and return the number of
320 // bytes copied.
321 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
322         rdr, err := v.getReaderWithContext(ctx, loc)
323         if err != nil {
324                 return 0, err
325         }
326
327         var n int
328         ready := make(chan bool)
329         go func() {
330                 defer close(ready)
331
332                 defer rdr.Close()
333                 n, err = io.ReadFull(rdr, buf)
334
335                 switch err {
336                 case nil, io.EOF, io.ErrUnexpectedEOF:
337                         err = nil
338                 default:
339                         err = v.translateError(err)
340                 }
341         }()
342         select {
343         case <-ctx.Done():
344                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
345                 rdr.Close()
346                 // Must wait for ReadFull to return, to ensure it
347                 // doesn't write to buf after we return.
348                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
349                 <-ready
350                 return 0, ctx.Err()
351         case <-ready:
352                 return n, err
353         }
354 }
355
356 // Compare the given data with the stored data.
357 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
358         errChan := make(chan error, 1)
359         go func() {
360                 _, err := v.bucket.Head("recent/"+loc, nil)
361                 errChan <- err
362         }()
363         var err error
364         select {
365         case <-ctx.Done():
366                 return ctx.Err()
367         case err = <-errChan:
368         }
369         if err != nil {
370                 // Checking for "loc" itself here would interfere with
371                 // future GET requests.
372                 //
373                 // On AWS, if X doesn't exist, a HEAD or GET request
374                 // for X causes X's non-existence to be cached. Thus,
375                 // if we test for X, then create X and return a
376                 // signature to our client, the client might still get
377                 // 404 from all keepstores when trying to read it.
378                 //
379                 // To avoid this, we avoid doing HEAD X or GET X until
380                 // we know X has been written.
381                 //
382                 // Note that X might exist even though recent/X
383                 // doesn't: for example, the response to HEAD recent/X
384                 // might itself come from a stale cache. In such
385                 // cases, we will return a false negative and
386                 // PutHandler might needlessly create another replica
387                 // on a different volume. That's not ideal, but it's
388                 // better than passing the eventually-consistent
389                 // problem on to our clients.
390                 return v.translateError(err)
391         }
392         rdr, err := v.getReaderWithContext(ctx, loc)
393         if err != nil {
394                 return err
395         }
396         defer rdr.Close()
397         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
398 }
399
400 // Put writes a block.
401 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
402         if v.ReadOnly {
403                 return MethodDisabledError
404         }
405         var opts s3.Options
406         size := len(block)
407         if size > 0 {
408                 md5, err := hex.DecodeString(loc)
409                 if err != nil {
410                         return err
411                 }
412                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
413                 // In AWS regions that use V4 signatures, we need to
414                 // provide ContentSHA256 up front. Otherwise, the S3
415                 // library reads the request body (from our buffer)
416                 // into another new buffer in order to compute the
417                 // SHA256 before sending the request -- which would
418                 // mean consuming 128 MiB of memory for the duration
419                 // of a 64 MiB write.
420                 opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
421         }
422
423         // Send the block data through a pipe, so that (if we need to)
424         // we can close the pipe early and abandon our PutReader()
425         // goroutine, without worrying about PutReader() accessing our
426         // block buffer after we release it.
427         bufr, bufw := io.Pipe()
428         go func() {
429                 io.Copy(bufw, bytes.NewReader(block))
430                 bufw.Close()
431         }()
432
433         var err error
434         ready := make(chan bool)
435         go func() {
436                 defer func() {
437                         if ctx.Err() != nil {
438                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
439                         }
440                 }()
441                 defer close(ready)
442                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
443                 if err != nil {
444                         return
445                 }
446                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
447         }()
448         select {
449         case <-ctx.Done():
450                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
451                 // Our pipe might be stuck in Write(), waiting for
452                 // PutReader() to read. If so, un-stick it. This means
453                 // PutReader will get corrupt data, but that's OK: the
454                 // size and MD5 won't match, so the write will fail.
455                 go io.Copy(ioutil.Discard, bufr)
456                 // CloseWithError() will return once pending I/O is done.
457                 bufw.CloseWithError(ctx.Err())
458                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
459                 return ctx.Err()
460         case <-ready:
461                 // Unblock pipe in case PutReader did not consume it.
462                 io.Copy(ioutil.Discard, bufr)
463                 return v.translateError(err)
464         }
465 }
466
467 // Touch sets the timestamp for the given locator to the current time.
468 func (v *S3Volume) Touch(loc string) error {
469         if v.ReadOnly {
470                 return MethodDisabledError
471         }
472         _, err := v.bucket.Head(loc, nil)
473         err = v.translateError(err)
474         if os.IsNotExist(err) && v.fixRace(loc) {
475                 // The data object got trashed in a race, but fixRace
476                 // rescued it.
477         } else if err != nil {
478                 return err
479         }
480         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
481         return v.translateError(err)
482 }
483
484 // Mtime returns the stored timestamp for the given locator.
485 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
486         _, err := v.bucket.Head(loc, nil)
487         if err != nil {
488                 return zeroTime, v.translateError(err)
489         }
490         resp, err := v.bucket.Head("recent/"+loc, nil)
491         err = v.translateError(err)
492         if os.IsNotExist(err) {
493                 // The data object X exists, but recent/X is missing.
494                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
495                 if err != nil {
496                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
497                         return zeroTime, v.translateError(err)
498                 }
499                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
500                 resp, err = v.bucket.Head("recent/"+loc, nil)
501                 if err != nil {
502                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
503                         return zeroTime, v.translateError(err)
504                 }
505         } else if err != nil {
506                 // HEAD recent/X failed for some other reason.
507                 return zeroTime, err
508         }
509         return v.lastModified(resp)
510 }
511
512 // IndexTo writes a complete list of locators with the given prefix
513 // for which Get() can retrieve data.
514 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
515         // Use a merge sort to find matching sets of X and recent/X.
516         dataL := s3Lister{
517                 Bucket:   v.bucket.Bucket,
518                 Prefix:   prefix,
519                 PageSize: v.IndexPageSize,
520                 Stats:    &v.bucket.stats,
521         }
522         recentL := s3Lister{
523                 Bucket:   v.bucket.Bucket,
524                 Prefix:   "recent/" + prefix,
525                 PageSize: v.IndexPageSize,
526                 Stats:    &v.bucket.stats,
527         }
528         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
529                 if data.Key >= "g" {
530                         // Conveniently, "recent/*" and "trash/*" are
531                         // lexically greater than all hex-encoded data
532                         // hashes, so stopping here avoids iterating
533                         // over all of them needlessly with dataL.
534                         break
535                 }
536                 if !v.isKeepBlock(data.Key) {
537                         continue
538                 }
539
540                 // stamp is the list entry we should use to report the
541                 // last-modified time for this data block: it will be
542                 // the recent/X entry if one exists, otherwise the
543                 // entry for the data block itself.
544                 stamp := data
545
546                 // Advance to the corresponding recent/X marker, if any
547                 for recent != nil && recentL.Error() == nil {
548                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
549                                 recent = recentL.Next()
550                                 continue
551                         } else if cmp == 0 {
552                                 stamp = recent
553                                 recent = recentL.Next()
554                                 break
555                         } else {
556                                 // recent/X marker is missing: we'll
557                                 // use the timestamp on the data
558                                 // object.
559                                 break
560                         }
561                 }
562                 if err := recentL.Error(); err != nil {
563                         return err
564                 }
565                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
566                 if err != nil {
567                         return err
568                 }
569                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
570         }
571         return dataL.Error()
572 }
573
574 // Trash a Keep block.
575 func (v *S3Volume) Trash(loc string) error {
576         if v.ReadOnly {
577                 return MethodDisabledError
578         }
579         if t, err := v.Mtime(loc); err != nil {
580                 return err
581         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
582                 return nil
583         }
584         if theConfig.TrashLifetime == 0 {
585                 if !s3UnsafeDelete {
586                         return ErrS3TrashDisabled
587                 }
588                 return v.translateError(v.bucket.Del(loc))
589         }
590         err := v.checkRaceWindow(loc)
591         if err != nil {
592                 return err
593         }
594         err = v.safeCopy("trash/"+loc, loc)
595         if err != nil {
596                 return err
597         }
598         return v.translateError(v.bucket.Del(loc))
599 }
600
601 // checkRaceWindow returns a non-nil error if trash/loc is, or might
602 // be, in the race window (i.e., it's not safe to trash loc).
603 func (v *S3Volume) checkRaceWindow(loc string) error {
604         resp, err := v.bucket.Head("trash/"+loc, nil)
605         err = v.translateError(err)
606         if os.IsNotExist(err) {
607                 // OK, trash/X doesn't exist so we're not in the race
608                 // window
609                 return nil
610         } else if err != nil {
611                 // Error looking up trash/X. We don't know whether
612                 // we're in the race window
613                 return err
614         }
615         t, err := v.lastModified(resp)
616         if err != nil {
617                 // Can't parse timestamp
618                 return err
619         }
620         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
621         if safeWindow <= 0 {
622                 // We can't count on "touch trash/X" to prolong
623                 // trash/X's lifetime. The new timestamp might not
624                 // become visible until now+raceWindow, and EmptyTrash
625                 // is allowed to delete trash/X before then.
626                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
627         }
628         // trash/X exists, but it won't be eligible for deletion until
629         // after now+raceWindow, so it's safe to overwrite it.
630         return nil
631 }
632
633 // safeCopy calls PutCopy, and checks the response to make sure the
634 // copy succeeded and updated the timestamp on the destination object
635 // (PutCopy returns 200 OK if the request was received, even if the
636 // copy failed).
637 func (v *S3Volume) safeCopy(dst, src string) error {
638         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
639                 ContentType:       "application/octet-stream",
640                 MetadataDirective: "REPLACE",
641         }, v.bucket.Name+"/"+src)
642         err = v.translateError(err)
643         if os.IsNotExist(err) {
644                 return err
645         } else if err != nil {
646                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Name+"/"+src, err)
647         }
648         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
649                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
650         } else if time.Now().Sub(t) > maxClockSkew {
651                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
652         }
653         return nil
654 }
655
656 // Get the LastModified header from resp, and parse it as RFC1123 or
657 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
658 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
659         s := resp.Header.Get("Last-Modified")
660         t, err = time.Parse(time.RFC1123, s)
661         if err != nil && s != "" {
662                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
663                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
664                 // as required by HTTP spec. If it's not a valid HTTP
665                 // header value, it's probably AWS (or s3test) giving
666                 // us a nearly-RFC1123 timestamp.
667                 t, err = time.Parse(nearlyRFC1123, s)
668         }
669         return
670 }
671
672 // Untrash moves block from trash back into store
673 func (v *S3Volume) Untrash(loc string) error {
674         err := v.safeCopy(loc, "trash/"+loc)
675         if err != nil {
676                 return err
677         }
678         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
679         return v.translateError(err)
680 }
681
682 // Status returns a *VolumeStatus representing the current in-use
683 // storage capacity and a fake available capacity that doesn't make
684 // the volume seem full or nearly-full.
685 func (v *S3Volume) Status() *VolumeStatus {
686         return &VolumeStatus{
687                 DeviceNum: 1,
688                 BytesFree: BlockSize * 1000,
689                 BytesUsed: 1,
690         }
691 }
692
693 // InternalStats returns bucket I/O and API call counters.
694 func (v *S3Volume) InternalStats() interface{} {
695         return &v.bucket.stats
696 }
697
698 // String implements fmt.Stringer.
699 func (v *S3Volume) String() string {
700         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
701 }
702
703 // Writable returns false if all future Put, Mtime, and Delete calls
704 // are expected to fail.
705 func (v *S3Volume) Writable() bool {
706         return !v.ReadOnly
707 }
708
709 // Replication returns the storage redundancy of the underlying
710 // device. Configured via command line flag.
711 func (v *S3Volume) Replication() int {
712         return v.S3Replication
713 }
714
715 // GetStorageClasses implements Volume
716 func (v *S3Volume) GetStorageClasses() []string {
717         return v.StorageClasses
718 }
719
720 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
721
722 func (v *S3Volume) isKeepBlock(s string) bool {
723         return s3KeepBlockRegexp.MatchString(s)
724 }
725
726 // fixRace(X) is called when "recent/X" exists but "X" doesn't
727 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
728 // there was a race between Put and Trash, fixRace recovers from the
729 // race by Untrashing the block.
730 func (v *S3Volume) fixRace(loc string) bool {
731         trash, err := v.bucket.Head("trash/"+loc, nil)
732         if err != nil {
733                 if !os.IsNotExist(v.translateError(err)) {
734                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
735                 }
736                 return false
737         }
738         trashTime, err := v.lastModified(trash)
739         if err != nil {
740                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
741                 return false
742         }
743
744         recent, err := v.bucket.Head("recent/"+loc, nil)
745         if err != nil {
746                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
747                 return false
748         }
749         recentTime, err := v.lastModified(recent)
750         if err != nil {
751                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
752                 return false
753         }
754
755         ageWhenTrashed := trashTime.Sub(recentTime)
756         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
757                 // No evidence of a race: block hasn't been written
758                 // since it became eligible for Trash. No fix needed.
759                 return false
760         }
761
762         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
763         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
764         err = v.safeCopy(loc, "trash/"+loc)
765         if err != nil {
766                 log.Printf("error: fixRace: %s", err)
767                 return false
768         }
769         return true
770 }
771
772 func (v *S3Volume) translateError(err error) error {
773         switch err := err.(type) {
774         case *s3.Error:
775                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
776                         strings.Contains(err.Error(), "Not Found") {
777                         return os.ErrNotExist
778                 }
779                 // Other 404 errors like NoSuchVersion and
780                 // NoSuchBucket are different problems which should
781                 // get called out downstream, so we don't convert them
782                 // to os.ErrNotExist.
783         }
784         return err
785 }
786
787 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
788 // and deletes them from the volume.
789 func (v *S3Volume) EmptyTrash() {
790         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
791
792         // Define "ready to delete" as "...when EmptyTrash started".
793         startT := time.Now()
794
795         emptyOneKey := func(trash *s3.Key) {
796                 loc := trash.Key[6:]
797                 if !v.isKeepBlock(loc) {
798                         return
799                 }
800                 atomic.AddInt64(&bytesInTrash, trash.Size)
801                 atomic.AddInt64(&blocksInTrash, 1)
802
803                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
804                 if err != nil {
805                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
806                         return
807                 }
808                 recent, err := v.bucket.Head("recent/"+loc, nil)
809                 if err != nil && os.IsNotExist(v.translateError(err)) {
810                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
811                         err = v.Untrash(loc)
812                         if err != nil {
813                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
814                         }
815                         return
816                 } else if err != nil {
817                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
818                         return
819                 }
820                 recentT, err := v.lastModified(recent)
821                 if err != nil {
822                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
823                         return
824                 }
825                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
826                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
827                                 // recent/loc is too old to protect
828                                 // loc from being Trashed again during
829                                 // the raceWindow that starts if we
830                                 // delete trash/X now.
831                                 //
832                                 // Note this means (TrashCheckInterval
833                                 // < BlobSignatureTTL - raceWindow) is
834                                 // necessary to avoid starvation.
835                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
836                                 v.fixRace(loc)
837                                 v.Touch(loc)
838                                 return
839                         }
840                         _, err := v.bucket.Head(loc, nil)
841                         if os.IsNotExist(err) {
842                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
843                                 v.fixRace(loc)
844                                 return
845                         } else if err != nil {
846                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
847                                 return
848                         }
849                 }
850                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
851                         return
852                 }
853                 err = v.bucket.Del(trash.Key)
854                 if err != nil {
855                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
856                         return
857                 }
858                 atomic.AddInt64(&bytesDeleted, trash.Size)
859                 atomic.AddInt64(&blocksDeleted, 1)
860
861                 _, err = v.bucket.Head(loc, nil)
862                 if err == nil {
863                         log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
864                         return
865                 }
866                 if !os.IsNotExist(v.translateError(err)) {
867                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
868                         return
869                 }
870                 err = v.bucket.Del("recent/" + loc)
871                 if err != nil {
872                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
873                 }
874         }
875
876         var wg sync.WaitGroup
877         todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
878         for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
879                 wg.Add(1)
880                 go func() {
881                         defer wg.Done()
882                         for key := range todo {
883                                 emptyOneKey(key)
884                         }
885                 }()
886         }
887
888         trashL := s3Lister{
889                 Bucket:   v.bucket.Bucket,
890                 Prefix:   "trash/",
891                 PageSize: v.IndexPageSize,
892                 Stats:    &v.bucket.stats,
893         }
894         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
895                 todo <- trash
896         }
897         close(todo)
898         wg.Wait()
899
900         if err := trashL.Error(); err != nil {
901                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
902         }
903         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
904 }
905
906 type s3Lister struct {
907         Bucket     *s3.Bucket
908         Prefix     string
909         PageSize   int
910         Stats      *s3bucketStats
911         nextMarker string
912         buf        []s3.Key
913         err        error
914 }
915
916 // First fetches the first page and returns the first item. It returns
917 // nil if the response is the empty set or an error occurs.
918 func (lister *s3Lister) First() *s3.Key {
919         lister.getPage()
920         return lister.pop()
921 }
922
923 // Next returns the next item, fetching the next page if necessary. It
924 // returns nil if the last available item has already been fetched, or
925 // an error occurs.
926 func (lister *s3Lister) Next() *s3.Key {
927         if len(lister.buf) == 0 && lister.nextMarker != "" {
928                 lister.getPage()
929         }
930         return lister.pop()
931 }
932
933 // Return the most recent error encountered by First or Next.
934 func (lister *s3Lister) Error() error {
935         return lister.err
936 }
937
938 func (lister *s3Lister) getPage() {
939         lister.Stats.TickOps("list")
940         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
941         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
942         lister.nextMarker = ""
943         if err != nil {
944                 lister.err = err
945                 return
946         }
947         if resp.IsTruncated {
948                 lister.nextMarker = resp.NextMarker
949         }
950         lister.buf = make([]s3.Key, 0, len(resp.Contents))
951         for _, key := range resp.Contents {
952                 if !strings.HasPrefix(key.Key, lister.Prefix) {
953                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
954                         continue
955                 }
956                 lister.buf = append(lister.buf, key)
957         }
958 }
959
960 func (lister *s3Lister) pop() (k *s3.Key) {
961         if len(lister.buf) > 0 {
962                 k = &lister.buf[0]
963                 lister.buf = lister.buf[1:]
964         }
965         return
966 }
967
968 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
969 type s3bucket struct {
970         *s3.Bucket
971         stats s3bucketStats
972 }
973
974 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
975         rdr, err := b.Bucket.GetReader(path)
976         b.stats.TickOps("get")
977         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
978         b.stats.TickErr(err)
979         return NewCountingReader(rdr, b.stats.TickInBytes), err
980 }
981
982 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
983         resp, err := b.Bucket.Head(path, headers)
984         b.stats.TickOps("head")
985         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
986         b.stats.TickErr(err)
987         return resp, err
988 }
989
990 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
991         if length == 0 {
992                 // goamz will only send Content-Length: 0 when reader
993                 // is nil due to net.http.Request.ContentLength
994                 // behavior.  Otherwise, Content-Length header is
995                 // omitted which will cause some S3 services
996                 // (including AWS and Ceph RadosGW) to fail to create
997                 // empty objects.
998                 r = nil
999         } else {
1000                 r = NewCountingReader(r, b.stats.TickOutBytes)
1001         }
1002         err := b.Bucket.PutReader(path, r, length, contType, perm, options)
1003         b.stats.TickOps("put")
1004         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
1005         b.stats.TickErr(err)
1006         return err
1007 }
1008
1009 func (b *s3bucket) Del(path string) error {
1010         err := b.Bucket.Del(path)
1011         b.stats.TickOps("delete")
1012         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
1013         b.stats.TickErr(err)
1014         return err
1015 }
1016
1017 type s3bucketStats struct {
1018         statsTicker
1019         Ops     uint64
1020         GetOps  uint64
1021         PutOps  uint64
1022         HeadOps uint64
1023         DelOps  uint64
1024         ListOps uint64
1025 }
1026
1027 func (s *s3bucketStats) TickErr(err error) {
1028         if err == nil {
1029                 return
1030         }
1031         errType := fmt.Sprintf("%T", err)
1032         if err, ok := err.(*s3.Error); ok {
1033                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
1034         }
1035         s.statsTicker.TickErr(err, errType)
1036 }