+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
"bytes"
"context"
+ "crypto/sha256"
"encoding/base64"
"encoding/hex"
- "flag"
+ "encoding/json"
+ "errors"
"fmt"
"io"
"io/ioutil"
+ "log"
"net/http"
"os"
"regexp"
"strings"
"sync"
+ "sync/atomic"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/AdRoll/goamz/aws"
"github.com/AdRoll/goamz/s3"
- log "github.com/Sirupsen/logrus"
-)
-
-const (
- s3DefaultReadTimeout = arvados.Duration(10 * time.Minute)
- s3DefaultConnectTimeout = arvados.Duration(time.Minute)
-)
-
-var (
- // ErrS3TrashDisabled is returned by Trash if that operation
- // is impossible with the current config.
- ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
-
- s3AccessKeyFile string
- s3SecretKeyFile string
- s3RegionName string
- s3Endpoint string
- s3Replication int
- s3UnsafeDelete bool
- s3RaceWindow time.Duration
-
- s3ACL = s3.Private
-)
-
-const (
- maxClockSkew = 600 * time.Second
- nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
-type s3VolumeAdder struct {
- *Config
-}
-
-// String implements flag.Value
-func (s *s3VolumeAdder) String() string {
- return "-"
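+// init registers this driver so that volumes configured with
+// Driver: "S3" in the cluster config are handled by newS3Volume.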
+func init() {
+ driver["S3"] = newS3Volume
}
-func (s *s3VolumeAdder) Set(bucketName string) error {
- if bucketName == "" {
- return fmt.Errorf("no container name given")
- }
- if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
- return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
- }
- if deprecated.flagSerializeIO {
- log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
+func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
+ v := &S3Volume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
+ err := json.Unmarshal(volume.DriverParameters, &v)
+ if err != nil {
+ return nil, err
}
- s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
- Bucket: bucketName,
- AccessKeyFile: s3AccessKeyFile,
- SecretKeyFile: s3SecretKeyFile,
- Endpoint: s3Endpoint,
- Region: s3RegionName,
- RaceWindow: arvados.Duration(s3RaceWindow),
- S3Replication: s3Replication,
- UnsafeDelete: s3UnsafeDelete,
- ReadOnly: deprecated.flagReadonly,
- IndexPageSize: 1000,
- })
- return nil
+ return v, v.check()
}
-func s3regions() (okList []string) {
- for r := range aws.Regions {
- okList = append(okList, r)
+func (v *S3Volume) check() error {
+ if v.Bucket == "" || v.AccessKey == "" || v.SecretKey == "" {
+ return errors.New("DriverParameters: Bucket, AccessKey, and SecretKey must be provided")
}
- return
-}
-
-func init() {
- VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
-
- flag.Var(&s3VolumeAdder{theConfig},
- "s3-bucket-volume",
- "Use the given bucket as a storage volume. Can be given multiple times.")
- flag.StringVar(
- &s3RegionName,
- "s3-region",
- "",
- fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
- flag.StringVar(
- &s3Endpoint,
- "s3-endpoint",
- "",
- "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
- flag.StringVar(
- &s3AccessKeyFile,
- "s3-access-key-file",
- "",
- "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
- flag.StringVar(
- &s3SecretKeyFile,
- "s3-secret-key-file",
- "",
- "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
- flag.DurationVar(
- &s3RaceWindow,
- "s3-race-window",
- 24*time.Hour,
- "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
- flag.IntVar(
- &s3Replication,
- "s3-replication",
- 2,
- "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
- flag.BoolVar(
- &s3UnsafeDelete,
- "s3-unsafe-delete",
- false,
- "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
-}
-
-// S3Volume implements Volume using an S3 bucket.
-type S3Volume struct {
- AccessKeyFile string
- SecretKeyFile string
- Endpoint string
- Region string
- Bucket string
- LocationConstraint bool
- IndexPageSize int
- S3Replication int
- ConnectTimeout arvados.Duration
- ReadTimeout arvados.Duration
- RaceWindow arvados.Duration
- ReadOnly bool
- UnsafeDelete bool
-
- bucket *s3bucket
-
- startOnce sync.Once
-}
-
-// Examples implements VolumeWithExamples.
-func (*S3Volume) Examples() []Volume {
- return []Volume{
- &S3Volume{
- AccessKeyFile: "/etc/aws_s3_access_key.txt",
- SecretKeyFile: "/etc/aws_s3_secret_key.txt",
- Endpoint: "",
- Region: "us-east-1",
- Bucket: "example-bucket-name",
- IndexPageSize: 1000,
- S3Replication: 2,
- RaceWindow: arvados.Duration(24 * time.Hour),
- ConnectTimeout: arvados.Duration(time.Minute),
- ReadTimeout: arvados.Duration(5 * time.Minute),
- },
- &S3Volume{
- AccessKeyFile: "/etc/gce_s3_access_key.txt",
- SecretKeyFile: "/etc/gce_s3_secret_key.txt",
- Endpoint: "https://storage.googleapis.com",
- Region: "",
- Bucket: "example-bucket-name",
- IndexPageSize: 1000,
- S3Replication: 2,
- RaceWindow: arvados.Duration(24 * time.Hour),
- ConnectTimeout: arvados.Duration(time.Minute),
- ReadTimeout: arvados.Duration(5 * time.Minute),
- },
+ if v.IndexPageSize == 0 {
+ v.IndexPageSize = 1000
+ }
+ if v.RaceWindow < 0 {
+ return errors.New("DriverParameters: RaceWindow must not be negative")
}
-}
-
-// Type implements Volume.
-func (*S3Volume) Type() string {
- return "S3"
-}
-// Start populates private fields and verifies the configuration is
-// valid.
-func (v *S3Volume) Start() error {
region, ok := aws.Regions[v.Region]
if v.Endpoint == "" {
if !ok {
- return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
+			return fmt.Errorf("unrecognized region %+q; try specifying Endpoint instead", v.Region)
}
} else if ok {
return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
- "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
+			"leave Endpoint empty or use a different region name", v.Region, v.Endpoint)
} else {
region = aws.Region{
Name: v.Region,
}
}
- var err error
- var auth aws.Auth
- auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
- if err != nil {
- return err
- }
- auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
- if err != nil {
- return err
+ auth := aws.Auth{
+ AccessKey: v.AccessKey,
+ SecretKey: v.SecretKey,
}
// Zero timeouts mean "wait forever", which is a bad
}
client := s3.New(auth, region)
+ if region.EC2Endpoint.Signer == aws.V4Signature {
+ // Currently affects only eu-central-1
+ client.Signature = aws.V4Signature
+ }
client.ConnectTimeout = time.Duration(v.ConnectTimeout)
client.ReadTimeout = time.Duration(v.ReadTimeout)
v.bucket = &s3bucket{
Name: v.Bucket,
},
}
+ // Set up prometheus metrics
+ lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
+ v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
+
return nil
}
+const (
+ s3DefaultReadTimeout = arvados.Duration(10 * time.Minute)
+ s3DefaultConnectTimeout = arvados.Duration(time.Minute)
+)
+
+var (
+ // ErrS3TrashDisabled is returned by Trash if that operation
+ // is impossible with the current config.
+	ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
+
+ s3ACL = s3.Private
+
+ zeroTime time.Time
+)
+
+const (
+ maxClockSkew = 600 * time.Second
+ nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
+)
+
+func s3regions() (okList []string) {
+ for r := range aws.Regions {
+ okList = append(okList, r)
+ }
+ return
+}
+
+// S3Volume implements Volume using an S3 bucket.
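+// The exported fields below are loaded from the volume's
+// DriverParameters section of the cluster config. For illustration
+// only (placeholder credentials; the duration fields accept strings
+// such as "10m"), a DriverParameters entry might look like:
+//
+//	{
+//		"Bucket": "example-bucket-name",
+//		"AccessKey": "...",
+//		"SecretKey": "...",
+//		"Endpoint": "",
+//		"Region": "us-east-1",
+//		"IndexPageSize": 1000,
+//		"ConnectTimeout": "1m",
+//		"ReadTimeout": "10m",
+//		"RaceWindow": "24h",
+//		"UnsafeDelete": false
+//	}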
+type S3Volume struct {
+ AccessKey string
+ SecretKey string
+ Endpoint string
+ Region string
+ Bucket string
+ LocationConstraint bool
+ IndexPageSize int
+ ConnectTimeout arvados.Duration
+ ReadTimeout arvados.Duration
+ RaceWindow arvados.Duration
+ UnsafeDelete bool
+
+ cluster *arvados.Cluster
+ volume arvados.Volume
+ logger logrus.FieldLogger
+ metrics *volumeMetricsVecs
+ bucket *s3bucket
+ startOnce sync.Once
+}
+
+// GetDeviceID returns a globally unique ID for the storage bucket.
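+// It is used, for example, as the "device_id" label on this volume's
+// prometheus counters (see check above).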
+func (v *S3Volume) GetDeviceID() string {
+ return "s3://" + v.Endpoint + "/" + v.Bucket
+}
+
func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
ready := make(chan bool)
go func() {
case <-ready:
return
case <-ctx.Done():
- theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
+ v.logger.Debugf("s3: abandoning getReader(): %s", ctx.Err())
go func() {
<-ready
if err == nil {
}()
select {
case <-ctx.Done():
- theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
+ v.logger.Debugf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
rdr.Close()
// Must wait for ReadFull to return, to ensure it
// doesn't write to buf after we return.
- theConfig.debugLogf("s3: waiting for ReadFull() to fail")
+ v.logger.Debug("s3: waiting for ReadFull() to fail")
<-ready
return 0, ctx.Err()
case <-ready:
// Compare the given data with the stored data.
func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
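+	// Start by HEADing the recent/X marker. goamz calls don't
+	// take a context, so run the request in a goroutine and
+	// select on ctx so the caller can give up early.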
+ errChan := make(chan error, 1)
+ go func() {
+ _, err := v.bucket.Head("recent/"+loc, nil)
+ errChan <- err
+ }()
+ var err error
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case err = <-errChan:
+ }
+ if err != nil {
+ // Checking for "loc" itself here would interfere with
+ // future GET requests.
+ //
+ // On AWS, if X doesn't exist, a HEAD or GET request
+ // for X causes X's non-existence to be cached. Thus,
+ // if we test for X, then create X and return a
+ // signature to our client, the client might still get
+ // 404 from all keepstores when trying to read it.
+ //
+ // To avoid this, we avoid doing HEAD X or GET X until
+ // we know X has been written.
+ //
+ // Note that X might exist even though recent/X
+ // doesn't: for example, the response to HEAD recent/X
+ // might itself come from a stale cache. In such
+ // cases, we will return a false negative and
+ // PutHandler might needlessly create another replica
+ // on a different volume. That's not ideal, but it's
+ // better than passing the eventually-consistent
+ // problem on to our clients.
+ return v.translateError(err)
+ }
rdr, err := v.getReaderWithContext(ctx, loc)
if err != nil {
return err
// Put writes a block.
func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
- if v.ReadOnly {
+ if v.volume.ReadOnly {
return MethodDisabledError
}
var opts s3.Options
return err
}
opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
+ // In AWS regions that use V4 signatures, we need to
+ // provide ContentSHA256 up front. Otherwise, the S3
+ // library reads the request body (from our buffer)
+ // into another new buffer in order to compute the
+ // SHA256 before sending the request -- which would
+ // mean consuming 128 MiB of memory for the duration
+ // of a 64 MiB write.
+ opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
}
// Send the block data through a pipe, so that (if we need to)
go func() {
defer func() {
if ctx.Err() != nil {
- theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
+ v.logger.Debugf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
}
}()
defer close(ready)
if err != nil {
return
}
- err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
}()
select {
case <-ctx.Done():
- theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
+ v.logger.Debugf("%s: taking PutReader's input away: %s", v, ctx.Err())
// Our pipe might be stuck in Write(), waiting for
- // io.Copy() to read. If so, un-stick it. This means
+ // PutReader() to read. If so, un-stick it. This means
// PutReader will get corrupt data, but that's OK: the
// size and MD5 won't match, so the write will fail.
go io.Copy(ioutil.Discard, bufr)
// CloseWithError() will return once pending I/O is done.
bufw.CloseWithError(ctx.Err())
- theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
+ v.logger.Debugf("%s: abandoning PutReader goroutine", v)
return ctx.Err()
case <-ready:
+ // Unblock pipe in case PutReader did not consume it.
+ io.Copy(ioutil.Discard, bufr)
return v.translateError(err)
}
}
// Touch sets the timestamp for the given locator to the current time.
func (v *S3Volume) Touch(loc string) error {
- if v.ReadOnly {
+ if v.volume.ReadOnly {
return MethodDisabledError
}
_, err := v.bucket.Head(loc, nil)
} else if err != nil {
return err
}
- err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
return v.translateError(err)
}
err = v.translateError(err)
if os.IsNotExist(err) {
// The data object X exists, but recent/X is missing.
- err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
log.Printf("error: creating %q: %s", "recent/"+loc, err)
return zeroTime, v.translateError(err)
Bucket: v.bucket.Bucket,
Prefix: prefix,
PageSize: v.IndexPageSize,
+ Stats: &v.bucket.stats,
}
recentL := s3Lister{
Bucket: v.bucket.Bucket,
Prefix: "recent/" + prefix,
PageSize: v.IndexPageSize,
+ Stats: &v.bucket.stats,
}
- v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
- v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
- for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
- v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
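+	// Merge the two listings: each data key's reported timestamp
+	// comes from its recent/X marker when one exists, since
+	// recent/X is rewritten by Put and Touch.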
+ for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
if data.Key >= "g" {
// Conveniently, "recent/*" and "trash/*" are
// lexically greater than all hex-encoded data
stamp := data
// Advance to the corresponding recent/X marker, if any
- for recent != nil {
+ for recent != nil && recentL.Error() == nil {
if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
recent = recentL.Next()
- v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
continue
} else if cmp == 0 {
stamp = recent
recent = recentL.Next()
- v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
break
} else {
// recent/X marker is missing: we'll
break
}
}
+ if err := recentL.Error(); err != nil {
+ return err
+ }
t, err := time.Parse(time.RFC3339, stamp.LastModified)
if err != nil {
return err
}
fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
}
- return nil
+ return dataL.Error()
}
// Trash a Keep block.
func (v *S3Volume) Trash(loc string) error {
- if v.ReadOnly {
+ if v.volume.ReadOnly {
return MethodDisabledError
}
if t, err := v.Mtime(loc); err != nil {
return err
- } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
+ } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
return nil
}
- if theConfig.TrashLifetime == 0 {
- if !s3UnsafeDelete {
+ if v.cluster.Collections.BlobTrashLifetime == 0 {
+ if !v.UnsafeDelete {
return ErrS3TrashDisabled
}
return v.translateError(v.bucket.Del(loc))
// Can't parse timestamp
return err
}
- safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
+ safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
if safeWindow <= 0 {
// We can't count on "touch trash/X" to prolong
// trash/X's lifetime. The new timestamp might not
MetadataDirective: "REPLACE",
}, v.bucket.Name+"/"+src)
err = v.translateError(err)
- if err != nil {
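+	// Pass a not-found error through untouched so callers can
+	// still detect it with os.IsNotExist().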
+ if os.IsNotExist(err) {
return err
+ } else if err != nil {
+ return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Name+"/"+src, err)
}
if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
if err != nil {
return err
}
- err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
return v.translateError(err)
}
return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
}
-// Writable returns false if all future Put, Mtime, and Delete calls
-// are expected to fail.
-func (v *S3Volume) Writable() bool {
- return !v.ReadOnly
-}
-
-// Replication returns the storage redundancy of the underlying
-// device. Configured via command line flag.
-func (v *S3Volume) Replication() int {
- return v.S3Replication
-}
-
var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
func (v *S3Volume) isKeepBlock(s string) bool {
}
ageWhenTrashed := trashTime.Sub(recentTime)
- if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
+ if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
// No evidence of a race: block hasn't been written
// since it became eligible for Trash. No fix needed.
return false
}
- log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
+ log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
err = v.safeCopy(loc, "trash/"+loc)
if err != nil {
// EmptyTrash looks for trashed blocks that exceeded TrashLifetime
// and deletes them from the volume.
func (v *S3Volume) EmptyTrash() {
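+	// A concurrency setting below 1 effectively disables trash
+	// emptying for this volume.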
+ if v.cluster.Collections.BlobDeleteConcurrency < 1 {
+ return
+ }
+
var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
- // Use a merge sort to find matching sets of trash/X and recent/X.
- trashL := s3Lister{
- Bucket: v.bucket.Bucket,
- Prefix: "trash/",
- PageSize: v.IndexPageSize,
- }
// Define "ready to delete" as "...when EmptyTrash started".
startT := time.Now()
- for trash := trashL.First(); trash != nil; trash = trashL.Next() {
+
+ emptyOneKey := func(trash *s3.Key) {
loc := trash.Key[6:]
if !v.isKeepBlock(loc) {
- continue
+ return
}
- bytesInTrash += trash.Size
- blocksInTrash++
+ atomic.AddInt64(&bytesInTrash, trash.Size)
+ atomic.AddInt64(&blocksInTrash, 1)
trashT, err := time.Parse(time.RFC3339, trash.LastModified)
if err != nil {
log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
- continue
+ return
}
recent, err := v.bucket.Head("recent/"+loc, nil)
if err != nil && os.IsNotExist(v.translateError(err)) {
if err != nil {
log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
}
- continue
+ return
} else if err != nil {
log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
- continue
+ return
}
recentT, err := v.lastModified(recent)
if err != nil {
log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
- continue
+ return
}
- if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
- if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
+ if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
+ if age := startT.Sub(recentT); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
// recent/loc is too old to protect
// loc from being Trashed again during
// the raceWindow that starts if we
log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
v.fixRace(loc)
v.Touch(loc)
- continue
+ return
}
_, err := v.bucket.Head(loc, nil)
if os.IsNotExist(err) {
log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
v.fixRace(loc)
- continue
+ return
} else if err != nil {
log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
- continue
+ return
}
}
- if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
- continue
+ if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
+ return
}
err = v.bucket.Del(trash.Key)
if err != nil {
log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
- continue
+ return
}
- bytesDeleted += trash.Size
- blocksDeleted++
+ atomic.AddInt64(&bytesDeleted, trash.Size)
+ atomic.AddInt64(&blocksDeleted, 1)
_, err = v.bucket.Head(loc, nil)
- if os.IsNotExist(err) {
- err = v.bucket.Del("recent/" + loc)
- if err != nil {
- log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
- }
- } else if err != nil {
- log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
+ if err == nil {
+ log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
+ return
+ }
+ if !os.IsNotExist(v.translateError(err)) {
+ log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+ return
}
+ err = v.bucket.Del("recent/" + loc)
+ if err != nil {
+ log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
+ }
+ }
+
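+	// Fan the trash/ keys out to BlobDeleteConcurrency worker
+	// goroutines. emptyOneKey updates the shared totals only via
+	// atomic adds, so no additional locking is needed here.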
+ var wg sync.WaitGroup
+ todo := make(chan *s3.Key, v.cluster.Collections.BlobDeleteConcurrency)
+ for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for key := range todo {
+ emptyOneKey(key)
+ }
+ }()
}
+
+ trashL := s3Lister{
+ Bucket: v.bucket.Bucket,
+ Prefix: "trash/",
+ PageSize: v.IndexPageSize,
+ Stats: &v.bucket.stats,
+ }
+ for trash := trashL.First(); trash != nil; trash = trashL.Next() {
+ todo <- trash
+ }
+ close(todo)
+ wg.Wait()
+
if err := trashL.Error(); err != nil {
log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
}
Bucket *s3.Bucket
Prefix string
PageSize int
+ Stats *s3bucketStats
nextMarker string
buf []s3.Key
err error
}
func (lister *s3Lister) getPage() {
+ lister.Stats.TickOps("list")
+ lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
lister.nextMarker = ""
if err != nil {
func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
rdr, err := b.Bucket.GetReader(path)
+ b.stats.TickOps("get")
b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
b.stats.TickErr(err)
return NewCountingReader(rdr, b.stats.TickInBytes), err
func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
resp, err := b.Bucket.Head(path, headers)
+ b.stats.TickOps("head")
b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
b.stats.TickErr(err)
return resp, err
}
func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
- err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.TickOutBytes), length, contType, perm, options)
- b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
- b.stats.TickErr(err)
- return err
-}
-
-func (b *s3bucket) Put(path string, data []byte, contType string, perm s3.ACL, options s3.Options) error {
- err := b.Bucket.PutReader(path, NewCountingReader(bytes.NewBuffer(data), b.stats.TickOutBytes), int64(len(data)), contType, perm, options)
+ if length == 0 {
+		// goamz only sends a Content-Length: 0 header when the
+		// reader is nil, due to net/http.Request.ContentLength
+		// behavior. Otherwise the Content-Length header is
+		// omitted, which causes some S3 services (including
+		// AWS and Ceph RadosGW) to fail to create empty
+		// objects.
+ r = nil
+ } else {
+ r = NewCountingReader(r, b.stats.TickOutBytes)
+ }
+ err := b.Bucket.PutReader(path, r, length, contType, perm, options)
+ b.stats.TickOps("put")
b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
b.stats.TickErr(err)
return err
func (b *s3bucket) Del(path string) error {
err := b.Bucket.Del(path)
+ b.stats.TickOps("delete")
b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
b.stats.TickErr(err)
return err