10223: Added output_name column to container_request. When being finalized, if output...
[arvados.git] / services / keepstore / azure_blob_volume_test.go
index e3c0e27083245f2d7cbead33f915f9365774cc02..232382c4216a60fd3c9f658a549957441805633a 100644 (file)
@@ -2,13 +2,13 @@ package main
 
 import (
        "bytes"
+       "context"
        "crypto/md5"
        "encoding/base64"
        "encoding/xml"
        "flag"
        "fmt"
        "io/ioutil"
-       "log"
        "math/rand"
        "net"
        "net/http"
@@ -21,6 +21,7 @@ import (
        "testing"
        "time"
 
+       log "github.com/Sirupsen/logrus"
        "github.com/curoverse/azure-sdk-for-go/storage"
 )
 
@@ -74,6 +75,7 @@ func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
        h.blobs[container+"|"+hash] = &azBlob{
                Data:        data,
                Mtime:       time.Now(),
+               Metadata:    make(map[string]string),
                Uncommitted: make(map[string][]byte),
        }
 }
@@ -136,14 +138,23 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                        h.blobs[container+"|"+hash] = &azBlob{
                                Mtime:       time.Now(),
                                Uncommitted: make(map[string][]byte),
+                               Metadata:    make(map[string]string),
                                Etag:        makeEtag(),
                        }
                        h.unlockAndRace()
                }
+               metadata := make(map[string]string)
+               for k, v := range r.Header {
+                       if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
+                               name := k[len("x-ms-meta-"):]
+                               metadata[strings.ToLower(name)] = v[0]
+                       }
+               }
                h.blobs[container+"|"+hash] = &azBlob{
                        Data:        body,
                        Mtime:       time.Now(),
                        Uncommitted: make(map[string][]byte),
+                       Metadata:    metadata,
                        Etag:        makeEtag(),
                }
                rw.WriteHeader(http.StatusCreated)
@@ -196,11 +207,22 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                blob.Metadata = make(map[string]string)
                for k, v := range r.Header {
                        if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
-                               blob.Metadata[k] = v[0]
+                               name := k[len("x-ms-meta-"):]
+                               blob.Metadata[strings.ToLower(name)] = v[0]
                        }
                }
                blob.Mtime = time.Now()
                blob.Etag = makeEtag()
+       case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
+               // "Get Blob Metadata" API
+               if !blobExists {
+                       rw.WriteHeader(http.StatusNotFound)
+                       return
+               }
+               for k, v := range blob.Metadata {
+                       rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v)
+               }
+               return
        case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
                // "Get Blob" API
                if !blobExists {
@@ -265,14 +287,20 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                        }
                        if len(resp.Blobs) > 0 || marker == "" || marker == hash {
                                blob := h.blobs[container+"|"+hash]
-                               resp.Blobs = append(resp.Blobs, storage.Blob{
+                               bmeta := map[string]string(nil)
+                               if r.Form.Get("include") == "metadata" {
+                                       bmeta = blob.Metadata
+                               }
+                               b := storage.Blob{
                                        Name: hash,
                                        Properties: storage.BlobProperties{
                                                LastModified:  blob.Mtime.Format(time.RFC1123),
                                                ContentLength: int64(len(blob.Data)),
                                                Etag:          blob.Etag,
                                        },
-                               })
+                                       Metadata: bmeta,
+                               }
+                               resp.Blobs = append(resp.Blobs, b)
                        }
                }
                buf, err := xml.Marshal(resp)
@@ -338,7 +366,13 @@ func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableA
                }
        }
 
-       v := NewAzureBlobVolume(azClient, container, readonly, replication)
+       v := &AzureBlobVolume{
+               ContainerName:    container,
+               ReadOnly:         readonly,
+               AzureReplication: replication,
+               azClient:         azClient,
+               bsClient:         azClient.GetBlobService(),
+       }
 
        return &TestableAzureBlobVolume{
                AzureBlobVolume: v,
@@ -421,12 +455,12 @@ func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
                        data[i] = byte((i + 7) & 0xff)
                }
                hash := fmt.Sprintf("%x", md5.Sum(data))
-               err := v.Put(hash, data)
+               err := v.Put(context.Background(), hash, data)
                if err != nil {
                        t.Error(err)
                }
                gotData := make([]byte, len(data))
-               gotLen, err := v.Get(hash, gotData)
+               gotLen, err := v.Get(context.Background(), hash, gotData)
                if err != nil {
                        t.Error(err)
                }
@@ -467,7 +501,7 @@ func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
        allDone := make(chan struct{})
        v.azHandler.race = make(chan chan struct{})
        go func() {
-               err := v.Put(TestHash, TestBlock)
+               err := v.Put(context.Background(), TestHash, TestBlock)
                if err != nil {
                        t.Error(err)
                }
@@ -477,7 +511,7 @@ func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
        v.azHandler.race <- continuePut
        go func() {
                buf := make([]byte, len(TestBlock))
-               _, err := v.Get(TestHash, buf)
+               _, err := v.Get(context.Background(), TestHash, buf)
                if err != nil {
                        t.Error(err)
                }
@@ -520,7 +554,7 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
        go func() {
                defer close(allDone)
                buf := make([]byte, BlockSize)
-               n, err := v.Get(TestHash, buf)
+               n, err := v.Get(context.Background(), TestHash, buf)
                if err != nil {
                        t.Error(err)
                        return
@@ -542,12 +576,76 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
        }
 }
 
+func TestAzureBlobVolumeContextCancelGet(t *testing.T) { // Get case of the shared context-cancel check (expects context.Canceled)
+       testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
+               v.PutRaw(TestHash, TestBlock) // store the block via the stub handler so Get has something to fetch
+               _, err := v.Get(ctx, TestHash, make([]byte, BlockSize)) // byte count is discarded; only the error is checked
+               return err
+       })
+}
+
+func TestAzureBlobVolumeContextCancelPut(t *testing.T) { // Put case of the shared context-cancel check (expects context.Canceled)
+       testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
+               return v.Put(ctx, TestHash, make([]byte, BlockSize)) // payload is a zero-filled BlockSize buffer, not TestBlock
+       })
+}
+
+func TestAzureBlobVolumeContextCancelCompare(t *testing.T) { // Compare case of the shared context-cancel check (expects context.Canceled)
+       testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
+               v.PutRaw(TestHash, TestBlock) // stored data is TestBlock...
+               return v.Compare(ctx, TestHash, TestBlock2) // ...compared against TestBlock2; the helper still expects context.Canceled
+       })
+}
+
+func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Context, *TestableAzureBlobVolume) error) { // runs testFunc in a goroutine, cancels its ctx once the race send is accepted, and expects context.Canceled
+       defer func(t http.RoundTripper) { // parameter t shadows the *testing.T inside this closure only
+               http.DefaultTransport = t // restore the transport that was in place on entry
+       }(http.DefaultTransport)
+       http.DefaultTransport = &http.Transport{
+               Dial: (&azStubDialer{}).Dial, // connections made via the default transport use azStubDialer
+       }
+
+       v := NewTestableAzureBlobVolume(t, false, 3) // readonly=false, replication=3
+       defer v.Teardown()
+       v.azHandler.race = make(chan chan struct{}) // unbuffered; presumably received by the stub handler in unlockAndRace (not shown here) — confirm
+
+       ctx, cancel := context.WithCancel(context.Background())
+       allDone := make(chan struct{}) // closed when the testFunc goroutine returns
+       go func() {
+               defer close(allDone)
+               err := testFunc(ctx, v)
+               if err != context.Canceled { // direct comparison: a wrapped error would not match
+                       t.Errorf("got %T %q, expected %q", err, err, context.Canceled)
+               }
+       }()
+       releaseHandler := make(chan struct{}) // handed over via race; this function never sends on it or closes it
+       select { // failures here use t.Error, so execution continues to cancel() either way
+       case <-allDone:
+               t.Error("testFunc finished without waiting for v.azHandler.race")
+       case <-time.After(10 * time.Second):
+               t.Error("timed out waiting to enter handler")
+       case v.azHandler.race <- releaseHandler: // send accepted: something is now receiving on race
+       }
+
+       cancel() // cancel ctx while testFunc is (expected to be) still in flight
+
+       select { // testFunc must return within 10s of the cancel
+       case <-time.After(10 * time.Second):
+               t.Error("timed out waiting to cancel")
+       case <-allDone:
+       }
+
+       go func() { // not waited on; stays blocked until releaseHandler is sent on or closed elsewhere
+               <-releaseHandler
+       }()
+}
+
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
-       v.azHandler.PutRaw(v.containerName, locator, data)
+       v.azHandler.PutRaw(v.ContainerName, locator, data) // delegate to the stub handler, using this volume's ContainerName
 }
 
 func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
-       v.azHandler.TouchWithDate(v.containerName, locator, lastPut)
+       v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut) // delegate to the stub handler, using this volume's ContainerName
 }
 
 func (v *TestableAzureBlobVolume) Teardown() {