"log"
"os"
"regexp"
+ "strconv"
"strings"
"sync"
"time"
return nil
}
+// Return true if expires_at metadata attribute is found on the block
+func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
+ metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+ if err != nil {
+ return false, metadata, v.translateError(err)
+ }
+ if metadata["expires_at"] != "" {
+ return true, metadata, nil
+ }
+ return false, metadata, nil
+}
+
// Get reads a Keep block that has been stored as a block blob in the
// container.
//
// If the block is younger than azureWriteRaceInterval and is
// unexpectedly empty, assume a PutBlob operation is in progress, and
// wait for it to finish writing.
-func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
+func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) {
+ trashed, _, err := v.checkTrashed(loc)
+ if err != nil {
+ return 0, err
+ }
+ if trashed {
+ return 0, os.ErrNotExist
+ }
var deadline time.Time
haveDeadline := false
- buf, err := v.get(loc)
- for err == nil && len(buf) == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
+ size, err := v.get(loc, buf)
+ for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
// Seeing a brand new empty block probably means we're
// in a race with CreateBlob, which under the hood
// (apparently) does "CreateEmpty" and "CommitData"
} else if time.Now().After(deadline) {
break
}
- bufs.Put(buf)
time.Sleep(azureWriteRacePollTime)
- buf, err = v.get(loc)
+ size, err = v.get(loc, buf)
}
if haveDeadline {
- log.Printf("Race ended with len(buf)==%d", len(buf))
+ log.Printf("Race ended with size==%d", size)
}
- return buf, err
+ return size, err
}
-func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
- expectSize := BlockSize
+func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
+ expectSize := len(buf)
if azureMaxGetBytes < BlockSize {
// Unfortunately the handler doesn't tell us how long the blob
// is expected to be, so we have to ask Azure.
props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
if err != nil {
- return nil, v.translateError(err)
+ return 0, v.translateError(err)
}
if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
- return nil, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
+ return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
}
expectSize = int(props.ContentLength)
}
- buf := bufs.Get(expectSize)
if expectSize == 0 {
- return buf, nil
+ return 0, nil
}
// We'll update this actualSize if/when we get the last piece.
if startPos == 0 && endPos == expectSize {
rdr, err = v.bsClient.GetBlob(v.containerName, loc)
} else {
- rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1))
+ rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
}
if err != nil {
errors[p] = err
wg.Wait()
for _, err := range errors {
if err != nil {
- bufs.Put(buf)
- return nil, v.translateError(err)
+ return 0, v.translateError(err)
}
}
- return buf[:actualSize], nil
+ return actualSize, nil
}
// Compare the given data with existing stored data.
func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
+ trashed, _, err := v.checkTrashed(loc)
+ if err != nil {
+ return err
+ }
+ if trashed {
+ return os.ErrNotExist
+ }
rdr, err := v.bsClient.GetBlob(v.containerName, loc)
if err != nil {
return v.translateError(err)
if v.readonly {
return MethodDisabledError
}
- return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block))
+ return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block), nil)
}
// Touch updates the last-modified property of a block blob.
if v.readonly {
return MethodDisabledError
}
- return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
- "touch": fmt.Sprintf("%d", time.Now()),
- })
+ trashed, metadata, err := v.checkTrashed(loc)
+ if err != nil {
+ return err
+ }
+ if trashed {
+ return os.ErrNotExist
+ }
+
+ metadata["touch"] = fmt.Sprintf("%d", time.Now())
+ return v.bsClient.SetBlobMetadata(v.containerName, loc, metadata, nil)
}
// Mtime returns the last-modified property of a block blob.
func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
+ trashed, _, err := v.checkTrashed(loc)
+ if err != nil {
+ return time.Time{}, err
+ }
+ if trashed {
+ return time.Time{}, os.ErrNotExist
+ }
+
props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
if err != nil {
return time.Time{}, err
// container.
func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
params := storage.ListBlobsParameters{
- Prefix: prefix,
+ Prefix: prefix,
+ Include: "metadata",
}
for {
resp, err := v.bsClient.ListBlobs(v.containerName, params)
// value.
continue
}
+ if b.Metadata["expires_at"] != "" {
+ // Trashed blob; exclude it from response
+ continue
+ }
fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix())
}
if resp.NextMarker == "" {
return MethodDisabledError
}
- if trashLifetime != 0 {
- return ErrNotImplemented
- }
-
// Ideally we would use If-Unmodified-Since, but that
// particular condition seems to be ignored by Azure. Instead,
// we get the Etag before checking Mtime, and use If-Match to
} else if time.Since(t) < blobSignatureTTL {
return nil
}
- return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
+
+ // If trashLifetime == 0, just delete it
+ if trashLifetime == 0 {
+ return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
+ "If-Match": props.Etag,
+ })
+ }
+
+ // Otherwise, mark as trash
+ return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
+ "expires_at": fmt.Sprintf("%d", time.Now().Add(trashLifetime).Unix()),
+ }, map[string]string{
"If-Match": props.Etag,
})
}
// Untrash a Keep block.
-// TBD
+// Delete the expires_at metadata attribute
func (v *AzureBlobVolume) Untrash(loc string) error {
- return ErrNotImplemented
+ // if expires_at does not exist, return NotFoundError
+ metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+ if err != nil {
+ return v.translateError(err)
+ }
+ if metadata["expires_at"] == "" {
+ return os.ErrNotExist
+ }
+
+ // reset expires_at metadata attribute
+ metadata["expires_at"] = ""
+ err = v.bsClient.SetBlobMetadata(v.containerName, loc, metadata, nil)
+ return v.translateError(err)
}
// Status returns a VolumeStatus struct with placeholder data.
switch {
case err == nil:
return err
- case strings.Contains(err.Error(), "404 Not Found"):
+ case strings.Contains(err.Error(), "Not Found"):
// "storage: service returned without a response body (404 Not Found)"
return os.ErrNotExist
default:
// EmptyTrash looks for trashed blocks that exceeded trashLifetime
// and deletes them from the volume.
-// TBD
func (v *AzureBlobVolume) EmptyTrash() {
+ var bytesDeleted, bytesInTrash int64
+ var blocksDeleted, blocksInTrash int
+ params := storage.ListBlobsParameters{Include: "metadata"}
+
+ for {
+ resp, err := v.bsClient.ListBlobs(v.containerName, params)
+ if err != nil {
+ log.Printf("EmptyTrash: ListBlobs: %v", err)
+ break
+ }
+ for _, b := range resp.Blobs {
+ // Check if the block is expired
+ if b.Metadata["expires_at"] == "" {
+ continue
+ }
+
+ blocksInTrash++
+ bytesInTrash += b.Properties.ContentLength
+
+ expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
+ if err != nil {
+ log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
+ continue
+ }
+
+ if expiresAt > time.Now().Unix() {
+ continue
+ }
+
+ err = v.bsClient.DeleteBlob(v.containerName, b.Name, map[string]string{
+ "If-Match": b.Properties.Etag,
+ })
+ if err != nil {
+ log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
+ continue
+ }
+ blocksDeleted++
+ bytesDeleted += b.Properties.ContentLength
+ }
+ if resp.NextMarker == "" {
+ break
+ }
+ params.Marker = resp.NextMarker
+ }
+
+ log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
}