X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ef42f0396f5b51fb8f87b2f7e605d50f32f256b4..c2cba51503a0e41ddd68083993e32fe085e49a7f:/services/keepstore/azure_blob_volume_test.go diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go index a240c23e16..3f395e40ca 100644 --- a/services/keepstore/azure_blob_volume_test.go +++ b/services/keepstore/azure_blob_volume_test.go @@ -1,35 +1,52 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( "bytes" + "context" + "crypto/md5" "encoding/base64" + "encoding/json" "encoding/xml" "flag" "fmt" "io/ioutil" - "log" "math/rand" "net" "net/http" "net/http/httptest" + "os" "regexp" "sort" "strconv" "strings" "sync" - "testing" "time" - "github.com/curoverse/azure-sdk-for-go/storage" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "github.com/Azure/azure-sdk-for-go/storage" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + check "gopkg.in/check.v1" ) const ( - // The same fake credentials used by Microsoft's Azure emulator - emulatorAccountName = "devstoreaccount1" - emulatorAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" + // This cannot be the fake account name "devstoreaccount1" + // used by Microsoft's Azure emulator: the Azure SDK + // recognizes that magic string and changes its behavior to + // cater to the Azure SDK's own test suite. + fakeAccountName = "fakeaccountname" + fakeAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" ) -var azureTestContainer string +var ( + azureTestContainer string + azureTestDebug = os.Getenv("ARVADOS_DEBUG") != "" +) func init() { flag.StringVar( @@ -49,13 +66,16 @@ type azBlob struct { type azStubHandler struct { sync.Mutex - blobs map[string]*azBlob - race chan chan struct{} + logger logrus.FieldLogger + blobs map[string]*azBlob + race chan chan struct{} + didlist503 bool } -func newAzStubHandler() *azStubHandler { +func newAzStubHandler(c *check.C) *azStubHandler { return &azStubHandler{ - blobs: make(map[string]*azBlob), + blobs: make(map[string]*azBlob), + logger: ctxlog.TestLogger(c), } } @@ -73,6 +93,7 @@ func (h *azStubHandler) PutRaw(container, hash string, data []byte) { h.blobs[container+"|"+hash] = &azBlob{ Data: data, Mtime: time.Now(), + Metadata: make(map[string]string), Uncommitted: make(map[string][]byte), } } @@ -92,10 +113,14 @@ func (h *azStubHandler) unlockAndRace() { h.Lock() } +var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`) + func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { h.Lock() defer h.Unlock() - // defer log.Printf("azStubHandler: %+v", r) + if azureTestDebug { + defer h.logger.Printf("azStubHandler: %+v", r) + } path := strings.Split(r.URL.Path, "/") container := path[1] @@ -105,11 +130,16 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } if err := r.ParseForm(); err != nil { - log.Printf("azStubHandler(%+v): %s", r, err) + h.logger.Printf("azStubHandler(%+v): %s", r, err) rw.WriteHeader(http.StatusBadRequest) return } + if (r.Method == "PUT" || r.Method == "POST") && r.Header.Get("Content-Length") == "" { + rw.WriteHeader(http.StatusLengthRequired) + return + } + body, err := ioutil.ReadAll(r.Body) if err != nil { return @@ -133,27 +163,36 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { h.blobs[container+"|"+hash] = &azBlob{ Mtime: time.Now(), Uncommitted: make(map[string][]byte), + Metadata: make(map[string]string), Etag: makeEtag(), } h.unlockAndRace() } + metadata := make(map[string]string) + for k, v := range r.Header { + if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") { + name := k[len("x-ms-meta-"):] + metadata[strings.ToLower(name)] = v[0] + } + } h.blobs[container+"|"+hash] = &azBlob{ Data: body, Mtime: time.Now(), Uncommitted: make(map[string][]byte), + Metadata: metadata, Etag: makeEtag(), } rw.WriteHeader(http.StatusCreated) case r.Method == "PUT" && r.Form.Get("comp") == "block": // "Put Block" API if !blobExists { - log.Printf("Got block for nonexistent blob: %+v", r) + h.logger.Printf("Got block for nonexistent blob: %+v", r) rw.WriteHeader(http.StatusBadRequest) return } blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid")) if err != nil || len(blockID) == 0 { - log.Printf("Invalid blockid: %+q", r.Form.Get("blockid")) + h.logger.Printf("Invalid blockid: %+q", r.Form.Get("blockid")) rw.WriteHeader(http.StatusBadRequest) return } @@ -163,14 +202,14 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { // "Put Block List" API bl := &blockListRequestBody{} if err := xml.Unmarshal(body, bl); err != nil { - log.Printf("xml Unmarshal: %s", err) + h.logger.Printf("xml Unmarshal: %s", err) rw.WriteHeader(http.StatusBadRequest) return } for _, encBlockID := range bl.Uncommitted { blockID, err := base64.StdEncoding.DecodeString(encBlockID) if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil { - log.Printf("Invalid blockid: %+q", encBlockID) + h.logger.Printf("Invalid blockid: %+q", encBlockID) rw.WriteHeader(http.StatusBadRequest) return } @@ -186,29 +225,53 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { // sets metadata headers only as a way to bump Etag // and Last-Modified. if !blobExists { - log.Printf("Got metadata for nonexistent blob: %+v", r) + h.logger.Printf("Got metadata for nonexistent blob: %+v", r) rw.WriteHeader(http.StatusBadRequest) return } blob.Metadata = make(map[string]string) for k, v := range r.Header { if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") { - blob.Metadata[k] = v[0] + name := k[len("x-ms-meta-"):] + blob.Metadata[strings.ToLower(name)] = v[0] } } blob.Mtime = time.Now() blob.Etag = makeEtag() + case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "": + // "Get Blob Metadata" API + if !blobExists { + rw.WriteHeader(http.StatusNotFound) + return + } + for k, v := range blob.Metadata { + rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v) + } + return case (r.Method == "GET" || r.Method == "HEAD") && hash != "": // "Get Blob" API if !blobExists { rw.WriteHeader(http.StatusNotFound) return } + data := blob.Data + if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil { + b0, err0 := strconv.Atoi(rangeSpec[1]) + b1, err1 := strconv.Atoi(rangeSpec[2]) + if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 { + rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data))) + rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + return + } + rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data))) + rw.WriteHeader(http.StatusPartialContent) + data = data[b0 : b1+1] + } rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123)) - rw.Header().Set("Content-Length", strconv.Itoa(len(blob.Data))) + rw.Header().Set("Content-Length", strconv.Itoa(len(data))) if r.Method == "GET" { - if _, err := rw.Write(blob.Data); err != nil { - log.Printf("write %+q: %s", blob.Data, err) + if _, err := rw.Write(data); err != nil { + h.logger.Printf("write %+q: %s", data, err) } } h.unlockAndRace() @@ -222,6 +285,11 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { rw.WriteHeader(http.StatusAccepted) case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container": // "List Blobs" API + if !h.didlist503 { + h.didlist503 = true + rw.WriteHeader(http.StatusServiceUnavailable) + return + } prefix := container + "|" + r.Form.Get("prefix") marker := r.Form.Get("marker") @@ -249,24 +317,30 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } if len(resp.Blobs) > 0 || marker == "" || marker == hash { blob := h.blobs[container+"|"+hash] - resp.Blobs = append(resp.Blobs, storage.Blob{ + bmeta := map[string]string(nil) + if r.Form.Get("include") == "metadata" { + bmeta = blob.Metadata + } + b := storage.Blob{ Name: hash, Properties: storage.BlobProperties{ - LastModified: blob.Mtime.Format(time.RFC1123), + LastModified: storage.TimeRFC1123(blob.Mtime), ContentLength: int64(len(blob.Data)), Etag: blob.Etag, }, - }) + Metadata: bmeta, + } + resp.Blobs = append(resp.Blobs, b) } } buf, err := xml.Marshal(resp) if err != nil { - log.Print(err) + h.logger.Error(err) rw.WriteHeader(http.StatusInternalServerError) } rw.Write(buf) default: - log.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body) + h.logger.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body) rw.WriteHeader(http.StatusNotImplemented) } } @@ -275,6 +349,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { // tries to connect to "devstoreaccount1.blob.127.0.0.1:46067", and // in such cases transparently dials "127.0.0.1:46067" instead. type azStubDialer struct { + logger logrus.FieldLogger net.Dialer } @@ -282,7 +357,9 @@ var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`) func (d *azStubDialer) Dial(network, address string) (net.Conn, error) { if hp := localHostPortRe.FindString(address); hp != "" { - log.Println("azStubDialer: dial", hp, "instead of", address) + if azureTestDebug { + d.logger.Debug("azStubDialer: dial", hp, "instead of", address) + } address = hp } return d.Dialer.Dial(network, address) @@ -292,37 +369,49 @@ type TestableAzureBlobVolume struct { *AzureBlobVolume azHandler *azStubHandler azStub *httptest.Server - t *testing.T + t TB } -func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) *TestableAzureBlobVolume { - azHandler := newAzStubHandler() +func (s *StubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs) *TestableAzureBlobVolume { + azHandler := newAzStubHandler(t.(*check.C)) azStub := httptest.NewServer(azHandler) var azClient storage.Client + var err error container := azureTestContainer if container == "" { // Connect to stub instead of real Azure storage service stubURLBase := strings.Split(azStub.URL, "://")[1] - var err error - if azClient, err = storage.NewClient(emulatorAccountName, emulatorAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil { + if azClient, err = storage.NewClient(fakeAccountName, fakeAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil { t.Fatal(err) } container = "fakecontainername" } else { // Connect to real Azure storage service - accountKey, err := readKeyFromFile(azureStorageAccountKeyFile) - if err != nil { - t.Fatal(err) - } - azClient, err = storage.NewBasicClient(azureStorageAccountName, accountKey) - if err != nil { + if azClient, err = storage.NewBasicClient(os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_NAME"), os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_KEY")); err != nil { t.Fatal(err) } } - - v := NewAzureBlobVolume(azClient, container, readonly, replication) + azClient.Sender = &singleSender{} + + bs := azClient.GetBlobService() + v := &AzureBlobVolume{ + ContainerName: container, + WriteRaceInterval: arvados.Duration(time.Millisecond), + WriteRacePollTime: arvados.Duration(time.Nanosecond), + ListBlobsMaxAttempts: 2, + ListBlobsRetryDelay: arvados.Duration(time.Millisecond), + azClient: azClient, + container: &azureContainer{ctr: bs.GetContainerReference(container)}, + cluster: cluster, + volume: volume, + logger: ctxlog.TestLogger(t), + metrics: metrics, + } + if err = v.check(); err != nil { + t.Fatal(err) + } return &TestableAzureBlobVolume{ AzureBlobVolume: v, @@ -332,149 +421,267 @@ func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) *T } } -func TestAzureBlobVolumeWithGeneric(t *testing.T) { - defer func(t http.RoundTripper) { - http.DefaultTransport = t - }(http.DefaultTransport) +var _ = check.Suite(&StubbedAzureBlobSuite{}) + +type StubbedAzureBlobSuite struct { + origHTTPTransport http.RoundTripper +} + +func (s *StubbedAzureBlobSuite) SetUpTest(c *check.C) { + s.origHTTPTransport = http.DefaultTransport http.DefaultTransport = &http.Transport{ - Dial: (&azStubDialer{}).Dial, + Dial: (&azStubDialer{logger: ctxlog.TestLogger(c)}).Dial, } - azureWriteRaceInterval = time.Millisecond - azureWriteRacePollTime = time.Nanosecond - DoGenericVolumeTests(t, func(t *testing.T) TestableVolume { - return NewTestableAzureBlobVolume(t, false, azureStorageReplication) +} + +func (s *StubbedAzureBlobSuite) TearDownTest(c *check.C) { + http.DefaultTransport = s.origHTTPTransport +} + +func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeWithGeneric(c *check.C) { + DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume { + return s.newTestableAzureBlobVolume(t, cluster, volume, metrics) }) } -func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) { - defer func(t http.RoundTripper) { - http.DefaultTransport = t - }(http.DefaultTransport) - http.DefaultTransport = &http.Transport{ - Dial: (&azStubDialer{}).Dial, +func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeConcurrentRanges(c *check.C) { + // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases + for _, b := range []int{2 << 22, 2<<22 - 1} { + DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume { + v := s.newTestableAzureBlobVolume(t, cluster, volume, metrics) + v.MaxGetBytes = b + return v + }) } - azureWriteRaceInterval = time.Millisecond - azureWriteRacePollTime = time.Nanosecond - DoGenericVolumeTests(t, func(t *testing.T) TestableVolume { - return NewTestableAzureBlobVolume(t, true, azureStorageReplication) +} + +func (s *StubbedAzureBlobSuite) TestReadonlyAzureBlobVolumeWithGeneric(c *check.C) { + DoGenericVolumeTests(c, false, func(c TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume { + return s.newTestableAzureBlobVolume(c, cluster, volume, metrics) }) } -func TestAzureBlobVolumeReplication(t *testing.T) { - for r := 1; r <= 4; r++ { - v := NewTestableAzureBlobVolume(t, false, r) - defer v.Teardown() - if n := v.Replication(); n != r { - t.Errorf("Got replication %d, expected %d", n, r) +func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) { + v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry())) + defer v.Teardown() + + for _, size := range []int{ + 2<<22 - 1, // one