Merge branch 'master' into 8556-trash-untrash-azure-volume
authorradhika <radhika@curoverse.com>
Wed, 27 Apr 2016 21:38:02 +0000 (17:38 -0400)
committerradhika <radhika@curoverse.com>
Wed, 27 Apr 2016 21:38:02 +0000 (17:38 -0400)
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_unix.go

index f08cebff63c65dc5cbbd941407602c262779708a..13cdafc80f2f031cc491c61f4fb14cee998e4db9 100644 (file)
@@ -10,6 +10,7 @@ import (
        "log"
        "os"
        "regexp"
+       "strconv"
        "strings"
        "sync"
        "time"
@@ -133,6 +134,18 @@ func (v *AzureBlobVolume) Check() error {
        return nil
 }
 
+// Return NotFoundError if trash marker is found on the block
+func (v *AzureBlobVolume) checkTrashed(loc string) error {
+       metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+       if err != nil {
+               return err
+       }
+       if metadata["expires_at"] != "" {
+               return v.translateError(NotFoundError)
+       }
+       return nil
+}
+
 // Get reads a Keep block that has been stored as a block blob in the
 // container.
 //
@@ -140,6 +153,9 @@ func (v *AzureBlobVolume) Check() error {
 // unexpectedly empty, assume a PutBlob operation is in progress, and
 // wait for it to finish writing.
 func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
+       if err := v.checkTrashed(loc); err != nil {
+               return nil, err
+       }
        var deadline time.Time
        haveDeadline := false
        buf, err := v.get(loc)
@@ -244,6 +260,9 @@ func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
 
 // Compare the given data with existing stored data.
 func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
+       if err := v.checkTrashed(loc); err != nil {
+               return err
+       }
        rdr, err := v.bsClient.GetBlob(v.containerName, loc)
        if err != nil {
                return v.translateError(err)
@@ -260,18 +279,40 @@ func (v *AzureBlobVolume) Put(loc string, block []byte) error {
        return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block), nil)
 }
 
+func (v *AzureBlobVolume) addToMetadata(loc, name, value string) error {
+       metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+       if err != nil {
+               return err
+       }
+       metadata[name] = value
+       return v.bsClient.SetBlobMetadata(v.containerName, loc, metadata)
+}
+
+func (v *AzureBlobVolume) removeFromMetadata(loc, name string) error {
+       metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+       if err != nil {
+               return err
+       }
+       delete(metadata, name)
+       return v.bsClient.SetBlobMetadata(v.containerName, loc, metadata)
+}
+
 // Touch updates the last-modified property of a block blob.
 func (v *AzureBlobVolume) Touch(loc string) error {
        if v.readonly {
                return MethodDisabledError
        }
-       return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
-               "touch": fmt.Sprintf("%d", time.Now()),
-       })
+       if err := v.checkTrashed(loc); err != nil {
+               return err
+       }
+       return v.addToMetadata(loc, "touch", fmt.Sprintf("%d", time.Now()))
 }
 
 // Mtime returns the last-modified property of a block blob.
 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
+       if err := v.checkTrashed(loc); err != nil {
+               return time.Time{}, err
+       }
        props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
        if err != nil {
                return time.Time{}, err
@@ -321,10 +362,6 @@ func (v *AzureBlobVolume) Trash(loc string) error {
                return MethodDisabledError
        }
 
-       if trashLifetime != 0 {
-               return ErrNotImplemented
-       }
-
        // Ideally we would use If-Unmodified-Since, but that
        // particular condition seems to be ignored by Azure. Instead,
        // we get the Etag before checking Mtime, and use If-Match to
@@ -339,15 +376,40 @@ func (v *AzureBlobVolume) Trash(loc string) error {
        } else if time.Since(t) < blobSignatureTTL {
                return nil
        }
-       return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
-               "If-Match": props.Etag,
-       })
+       if trashLifetime == 0 {
+               return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
+                       "If-Match": props.Etag,
+               })
+       }
+       // Mark as trash
+       err = v.addToMetadata(loc, "expires_at", fmt.Sprintf("%d", time.Now().Add(trashLifetime).Unix()))
+       if err != nil {
+               return err
+       }
+       return v.bsClient.CreateBlockBlobFromReader(v.containerName,
+               fmt.Sprintf("trash.%d.%v", time.Now().Add(trashLifetime).Unix(), loc), 0, nil)
 }
 
 // Untrash a Keep block.
-// TBD
+// Delete the expires_at metadata attribute and trash marker
 func (v *AzureBlobVolume) Untrash(loc string) error {
-       return ErrNotImplemented
+       // if expires_at does not exist, return NotFoundError
+       metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+       if err != nil {
+               return err
+       }
+       if metadata["expires_at"] == "" {
+               return v.translateError(NotFoundError)
+       }
+       // reset expires_at metadata attribute
+       err = v.removeFromMetadata(loc, "expires_at")
+       if err != nil {
+               return err
+       }
+
+       // delete trash marker if exists
+       _, err = v.bsClient.DeleteBlobIfExists(v.containerName, fmt.Sprintf("trash.%v.%v", metadata["expires_at"], loc), map[string]string{})
+       return err
 }
 
 // Status returns a VolumeStatus struct with placeholder data.
@@ -396,8 +458,65 @@ func (v *AzureBlobVolume) isKeepBlock(s string) bool {
        return keepBlockRegexp.MatchString(s)
 }
 
+var azTrashLocRegexp = regexp.MustCompile(`trash\.(\d+)\.([0-9a-f]{32})$`)
+
 // EmptyTrash looks for trashed blocks that exceeded trashLifetime
 // and deletes them from the volume.
-// TBD
 func (v *AzureBlobVolume) EmptyTrash() {
+       var bytesDeleted, bytesInTrash int64
+       var blocksDeleted, blocksInTrash int
+       var noMoreOldMarkers bool
+       params := storage.ListBlobsParameters{
+               Prefix: "trash.",
+       }
+       for {
+               resp, err := v.bsClient.ListBlobs(v.containerName, params)
+               if err != nil {
+                       log.Printf("EmptyTrash: ListBlobs: %v", err)
+                       break
+               }
+               for _, b := range resp.Blobs {
+                       matches := azTrashLocRegexp.FindStringSubmatch(b.Name)
+                       if len(matches) != 3 {
+                               log.Printf("EmptyTrash: regexp mismatch for: %v", b.Name)
+                               continue
+                       }
+                       blocksInTrash++
+                       deadline, err := strconv.ParseInt(matches[1], 10, 64)
+                       if err != nil {
+                               log.Printf("EmptyTrash: %v: ParseInt(%v): %v", matches[1], err)
+                               continue
+                       }
+                       if deadline > time.Now().Unix() {
+                               noMoreOldMarkers = true
+                               break
+                       }
+
+                       metadata, err := v.bsClient.GetBlobMetadata(v.containerName, matches[2])
+                       if err != nil {
+                               log.Printf("EmptyTrash: %v: GetBlobMetadata(%v): %v", matches[2], err)
+                               continue
+                       }
+
+                       // Make sure the marker is for the current block, not an older one
+                       if metadata["expires_at"] == matches[1] {
+                               err = v.bsClient.DeleteBlob(v.containerName, matches[2], map[string]string{})
+                               if err != nil {
+                                       log.Printf("EmptyTrash: %v: DeleteBlob(%v): %v", matches[2], err)
+                               }
+                               blocksDeleted++
+                       }
+
+                       err = v.bsClient.DeleteBlob(v.containerName, b.Name, map[string]string{})
+                       if err != nil {
+                               log.Printf("EmptyTrash: %v: DeleteBlob(%v): %v", b.Name, err)
+                       }
+               }
+               if resp.NextMarker == "" || noMoreOldMarkers == true {
+                       break
+               }
+               params.Marker = resp.NextMarker
+       }
+
+       log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
index 439b40221465ada53c805c7b7afb47ba974652a9..08166a8fb7fbee2dd5af28117235216beb2ccce5 100644 (file)
@@ -71,9 +71,12 @@ func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
 func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
        h.Lock()
        defer h.Unlock()
+       metadata := make(map[string]string)
+       metadata["last_write_at"] = fmt.Sprintf("%d", time.Now().Unix())
        h.blobs[container+"|"+hash] = &azBlob{
                Data:        data,
                Mtime:       time.Now(),
+               Metadata:    metadata,
                Uncommitted: make(map[string][]byte),
        }
 }
@@ -136,6 +139,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                        h.blobs[container+"|"+hash] = &azBlob{
                                Mtime:       time.Now(),
                                Uncommitted: make(map[string][]byte),
+                               Metadata:    make(map[string]string),
                                Etag:        makeEtag(),
                        }
                        h.unlockAndRace()
@@ -144,6 +148,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                        Data:        body,
                        Mtime:       time.Now(),
                        Uncommitted: make(map[string][]byte),
+                       Metadata:    make(map[string]string),
                        Etag:        makeEtag(),
                }
                rw.WriteHeader(http.StatusCreated)
@@ -201,6 +206,16 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                }
                blob.Mtime = time.Now()
                blob.Etag = makeEtag()
+       case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
+               // "Get Blob Metadata" API
+               if !blobExists {
+                       rw.WriteHeader(http.StatusNotFound)
+                       return
+               }
+               for k, v := range blob.Metadata {
+                       rw.Header().Set(k, v)
+               }
+               return
        case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
                // "Get Blob" API
                if !blobExists {
index 95166c252f004bef5a1ba1f583f4e13f54117f47..2c1b2668ed59a77fddccda0b2dd5b316b84f1214 100644 (file)
@@ -120,7 +120,7 @@ func testCompareNonexistent(t TB, factory TestableVolumeFactory) {
        defer v.Teardown()
 
        err := v.Compare(TestHash, TestBlock)
-       if err != os.ErrNotExist {
+       if err != os.ErrNotExist && !strings.Contains(err.Error(), "Not Found") {
                t.Errorf("Got err %T %q, expected os.ErrNotExist", err, err)
        }
 }
@@ -455,7 +455,8 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
        if err := v.Trash(TestHash); err != nil {
                t.Error(err)
        }
-       if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
+       _, err := v.Get(TestHash)
+       if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
                t.Errorf("os.IsNotExist(%v) should have been true", err)
        }
 }
@@ -730,16 +731,16 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
        err = v.Trash(TestHash)
        if v.Writable() == false {
                if err != MethodDisabledError {
-                       t.Error(err)
+                       t.Fatal(err)
                }
        } else if err != nil {
                if err != ErrNotImplemented {
-                       t.Error(err)
+                       t.Fatal(err)
                }
        } else {
                _, err = v.Get(TestHash)
-               if err == nil || !os.IsNotExist(err) {
-                       t.Errorf("os.IsNotExist(%v) should have been true", err)
+               if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
+                       t.Fatalf("os.IsNotExist(%v) should have been true", err)
                }
 
                // Untrash
@@ -755,7 +756,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
                t.Fatal(err)
        }
        if bytes.Compare(buf, TestBlock) != 0 {
-               t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+               t.Fatalf("Got data %+q, expected %+q", buf, TestBlock)
        }
        bufs.Put(buf)
 }
@@ -799,7 +800,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        }
 
        err = checkGet()
-       if err == nil || !os.IsNotExist(err) {
+       if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
                t.Fatalf("os.IsNotExist(%v) should have been true", err)
        }
 
@@ -819,7 +820,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        // Untrash should fail if the only block in the trash has
        // already been untrashed.
        err = v.Untrash(TestHash)
-       if err == nil || !os.IsNotExist(err) {
+       if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
                t.Fatalf("os.IsNotExist(%v) should have been true", err)
        }
 
@@ -839,7 +840,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
                t.Fatal(err)
        }
        err = checkGet()
-       if err == nil || !os.IsNotExist(err) {
+       if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
                t.Fatalf("os.IsNotExist(%v) should have been true", err)
        }
 
@@ -858,20 +859,20 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        // goes away.
        err = v.Trash(TestHash)
        err = checkGet()
-       if err == nil || !os.IsNotExist(err) {
-               t.Errorf("os.IsNotExist(%v) should have been true", err)
+       if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
+               t.Fatalf("os.IsNotExist(%v) should have been true", err)
        }
        v.EmptyTrash()
 
        // Untrash won't find it
        err = v.Untrash(TestHash)
-       if err == nil || !os.IsNotExist(err) {
+       if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
                t.Fatalf("os.IsNotExist(%v) should have been true", err)
        }
 
        // Get block won't find it
        err = checkGet()
-       if err == nil || !os.IsNotExist(err) {
+       if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
                t.Fatalf("os.IsNotExist(%v) should have been true", err)
        }
 
@@ -888,7 +889,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
                t.Fatal(err)
        }
        err = checkGet()
-       if err == nil || !os.IsNotExist(err) {
+       if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
                t.Fatalf("os.IsNotExist(%v) should have been true", err)
        }
 
index 996068cf3d2438f71364b0b2c9ddafcdbd712c54..e02035dae6355e19a43db8bf68640bae77aa16cd 100644 (file)
@@ -540,7 +540,7 @@ func (v *UnixVolume) translateError(err error) error {
        }
 }
 
-var trashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
+var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
 
 // EmptyTrash walks hierarchy looking for {hash}.trash.*
 // and deletes those with deadline < now.
@@ -556,7 +556,7 @@ func (v *UnixVolume) EmptyTrash() {
                if info.Mode().IsDir() {
                        return nil
                }
-               matches := trashLocRegexp.FindStringSubmatch(path)
+               matches := unixTrashLocRegexp.FindStringSubmatch(path)
                if len(matches) != 3 {
                        return nil
                }