)
var (
+ // ErrS3TrashDisabled is returned by Trash if that operation is
+ // impossible with the current config.
ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
s3AccessKeyFile string
"EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
}
+// S3Volume implements Volume using an S3 bucket.
type S3Volume struct {
*s3.Bucket
raceWindow time.Duration
}
}
+// Check returns an error if the volume is inaccessible (e.g., config
+// error).
func (v *S3Volume) Check() (err error) {
	// No probe of the bucket or its configuration happens here, so the
	// volume is always reported as accessible (err stays nil).
	return err
}
return
}
+// Get a block: copy the block data into buf, and return the number of
+// bytes copied.
func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
rdr, err := v.getReader(loc)
if err != nil {
}
}
+// Compare the given data with the stored data.
func (v *S3Volume) Compare(loc string, expect []byte) error {
rdr, err := v.getReader(loc)
if err != nil {
return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
}
+// Put writes a block.
func (v *S3Volume) Put(loc string, block []byte) error {
if v.readonly {
return MethodDisabledError
return v.translateError(err)
}
+// Touch sets the timestamp for the given locator to the current time.
func (v *S3Volume) Touch(loc string) error {
if v.readonly {
return MethodDisabledError
return v.translateError(err)
}
+// Mtime returns the stored timestamp for the given locator.
func (v *S3Volume) Mtime(loc string) (time.Time, error) {
_, err := v.Bucket.Head(loc, nil)
if err != nil {
return v.lastModified(resp)
}
+// IndexTo writes a complete list of locators with the given prefix
+// for which Get() can retrieve data.
func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
// Use a merge sort to find matching sets of X and recent/X.
dataL := s3Lister{
}
recentL := s3Lister{
Bucket: v.Bucket,
- Prefix: "recent/"+prefix,
+ Prefix: "recent/" + prefix,
PageSize: v.indexPageSize,
}
for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
return nil
}
+// safeCopy calls PutCopy, and checks the response to make sure the
+// copy succeeded and updated the timestamp on the destination object
+// (PutCopy returns 200 OK if the request was received, even if the
+// copy failed).
func (v *S3Volume) safeCopy(dst, src string) error {
resp, err := v.Bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
ContentType: "application/octet-stream",
return
}
+// Untrash moves a block from trash back into the store.
func (v *S3Volume) Untrash(loc string) error {
err := v.safeCopy(loc, "trash/"+loc)
if err != nil {
return v.translateError(err)
}
+// Status returns a *VolumeStatus representing the current in-use
+// storage capacity and a fake available capacity that doesn't make
+// the volume seem full or nearly-full.
func (v *S3Volume) Status() *VolumeStatus {
return &VolumeStatus{
DeviceNum: 1,
}
}
+// String implements fmt.Stringer.
func (v *S3Volume) String() string {
	// The %+q verb quotes the bucket name and escapes it to ASCII-only
	// output, so unusual bucket names stay unambiguous in log lines.
	bucketName := v.Bucket.Name
	return fmt.Sprintf("s3-bucket:%+q", bucketName)
}
+// Writable returns false if all future Put, Mtime, and Delete calls
+// are expected to fail.
func (v *S3Volume) Writable() bool {
	// Writability is simply the inverse of the volume's readonly flag.
	canWrite := !v.readonly
	return canWrite
}
+
+// Replication returns the storage redundancy of the underlying
+// device. Configured via command line flag.
func (v *S3Volume) Replication() (n int) {
	// Report the replication level stored on the volume unchanged.
	n = v.replication
	return n
}
// EmptyTrash looks for trashed blocks that exceeded trashLifetime
// and deletes them from the volume.
func (v *S3Volume) EmptyTrash() {
+ var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
+
// Use a merge sort to find matching sets of trash/X and recent/X.
trashL := s3Lister{
Bucket: v.Bucket,
PageSize: v.indexPageSize,
}
// Define "ready to delete" as "...when EmptyTrash started".
- now := time.Now()
+ startT := time.Now()
for trash := trashL.First(); trash != nil; trash = trashL.Next() {
loc := trash.Key[6:]
if !v.isKeepBlock(loc) {
continue
}
- recent, err := v.Bucket.Head("recent/"+loc, nil)
+ bytesInTrash += trash.Size
+ blocksInTrash++
+
+ trashT, err := time.Parse(time.RFC3339, trash.LastModified)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
+ log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
continue
}
- trashT, err := time.Parse(time.RFC3339, trash.LastModified)
+ recent, err := v.Bucket.Head("recent/"+loc, nil)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
+ log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
continue
}
recentT, err := v.lastModified(recent)
continue
}
if trashT.Sub(recentT) < blobSignatureTTL {
- v.fixRace(loc)
- continue
+ if age := startT.Sub(recentT); age >= blobSignatureTTL - v.raceWindow {
+ // recent/loc is too old to protect
+ // loc from being Trashed again during
+ // the raceWindow that starts if we
+ // delete trash/X now.
+ //
+ // Note this means (trashCheckInterval
+ // < blobSignatureTTL - raceWindow) is
+ // necessary to avoid starvation.
+ log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
+ v.fixRace(loc)
+ v.Touch(loc)
+ continue
+ } else if _, err := v.Bucket.Head(loc, nil); os.IsNotExist(err) {
+ log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
+ v.fixRace(loc)
+ continue
+ } else if err != nil {
+ log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+ continue
+ }
}
- if now.Sub(trashT) < trashLifetime {
+ if startT.Sub(trashT) < trashLifetime {
continue
}
err = v.Bucket.Del(trash.Key)
log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
continue
}
+ bytesDeleted += trash.Size
+ blocksDeleted++
+
_, err = v.Bucket.Head(loc, nil)
if os.IsNotExist(err) {
- err = v.Bucket.Del("recent/"+loc)
+ err = v.Bucket.Del("recent/" + loc)
if err != nil {
log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
}
if err := trashL.Error(); err != nil {
log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
}
+ log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
}
// s3Lister walks the keys in Bucket whose names begin with Prefix.
// Callers iterate with First and then Next until an item is nil, and
// afterwards call Error to see whether listing stopped because of a
// failure. PageSize presumably caps the number of keys requested per
// listing call, and nextMarker/buf/err presumably hold the paging
// cursor, the current page, and the first error seen (TODO confirm
// against First/Next).
type s3Lister struct {
- Bucket *s3.Bucket
- Prefix string
- PageSize int
- nextMarker string
- buf []s3.Key
- err error
+ Bucket *s3.Bucket
+ Prefix string
+ PageSize int
+ nextMarker string
+ buf []s3.Key
+ err error
}
// First fetches the first page and returns the first item. It returns