+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
"net"
"net/http"
"net/http/httptest"
+ "os"
"regexp"
"sort"
"strconv"
"testing"
"time"
- log "github.com/Sirupsen/logrus"
- "github.com/curoverse/azure-sdk-for-go/storage"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/Azure/azure-sdk-for-go/storage"
+ "github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
// used by Microsoft's Azure emulator: the Azure SDK
// recognizes that magic string and changes its behavior to
// cater to the Azure SDK's own test suite.
- fakeAccountName = "fakeAccountName"
+ fakeAccountName = "fakeaccountname"
fakeAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
)
-var azureTestContainer string
+var (
+ azureTestContainer string
+ // azureTestDebug is true when the ARVADOS_DEBUG environment variable
+ // is set to a non-empty value. The stub handler and the stub dialer
+ // in this file only log their activity when it is true.
+ azureTestDebug = os.Getenv("ARVADOS_DEBUG") != ""
+)
func init() {
flag.StringVar(
+// azStubHandler is the HTTP handler for the stub blob service used by
+// these tests (it answers, among others, the "List Blobs" API). Its
+// embedded mutex is held for the whole of each ServeHTTP call.
type azStubHandler struct {
sync.Mutex
- blobs map[string]*azBlob
- race chan chan struct{}
+ blobs map[string]*azBlob
+ race chan chan struct{}
+ // didlist503 becomes true once the stub has answered a "List Blobs"
+ // request with 503 Service Unavailable. Only the first such request
+ // gets the error, so a client has to retry to obtain a listing.
+ didlist503 bool
}
func newAzStubHandler() *azStubHandler {
func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
h.Lock()
defer h.Unlock()
- // defer log.Printf("azStubHandler: %+v", r)
+ if azureTestDebug {
+ defer log.Printf("azStubHandler: %+v", r)
+ }
path := strings.Split(r.URL.Path, "/")
container := path[1]
return
}
+ if (r.Method == "PUT" || r.Method == "POST") && r.Header.Get("Content-Length") == "" {
+ rw.WriteHeader(http.StatusLengthRequired)
+ return
+ }
+
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return
rw.WriteHeader(http.StatusAccepted)
case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
// "List Blobs" API
+ if !h.didlist503 {
+ h.didlist503 = true
+ rw.WriteHeader(http.StatusServiceUnavailable)
+ return
+ }
prefix := container + "|" + r.Form.Get("prefix")
marker := r.Form.Get("marker")
b := storage.Blob{
Name: hash,
Properties: storage.BlobProperties{
- LastModified: blob.Mtime.Format(time.RFC1123),
+ LastModified: storage.TimeRFC1123(blob.Mtime),
ContentLength: int64(len(blob.Data)),
Etag: blob.Etag,
},
func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
if hp := localHostPortRe.FindString(address); hp != "" {
- log.Println("azStubDialer: dial", hp, "instead of", address)
+ if azureTestDebug {
+ log.Println("azStubDialer: dial", hp, "instead of", address)
+ }
address = hp
}
return d.Dialer.Dial(network, address)
t.Fatal(err)
}
}
+ azClient.Sender = &singleSender{}
bs := azClient.GetBlobService()
v := &AzureBlobVolume{
- ContainerName: container,
- ReadOnly: readonly,
- AzureReplication: replication,
- azClient: azClient,
- bsClient: &azureBlobClient{client: &bs},
+ ContainerName: container,
+ ReadOnly: readonly,
+ AzureReplication: replication,
+ ListBlobsMaxAttempts: 2,
+ ListBlobsRetryDelay: arvados.Duration(time.Millisecond),
+ azClient: azClient,
+ container: &azureContainer{ctr: bs.GetContainerReference(container)},
}
return &TestableAzureBlobVolume{
azureWriteRaceInterval = time.Second
azureWriteRacePollTime = time.Millisecond
- allDone := make(chan struct{})
+ var wg sync.WaitGroup
+
v.azHandler.race = make(chan chan struct{})
+
+ wg.Add(1)
go func() {
+ defer wg.Done()
err := v.Put(context.Background(), TestHash, TestBlock)
if err != nil {
t.Error(err)
continuePut := make(chan struct{})
// Wait for the stub's Put to create the empty blob
v.azHandler.race <- continuePut
+ wg.Add(1)
go func() {
+ defer wg.Done()
buf := make([]byte, len(TestBlock))
_, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Error(err)
}
- close(allDone)
}()
// Wait for the stub's Get to get the empty blob
close(v.azHandler.race)
// Allow stub's Put to continue, so the real data is ready
// when the volume's Get retries
<-continuePut
- // Wait for volume's Get to return the real data
- <-allDone
+ // Wait for Get() and Put() to finish
+ wg.Wait()
}
func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}
+// TestConfig checks that the StorageClasses list given for an Azure volume
+// in a YAML config is what that volume's GetStorageClasses() reports.
+func (s *StubbedAzureBlobSuite) TestConfig(c *check.C) {
+	var cfg Config
+	err := yaml.Unmarshal([]byte(`
+Volumes:
+ - Type: Azure
+ StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+	// Use Assert (not Check) for the preconditions: if parsing failed or
+	// produced no volumes, indexing cfg.Volumes[0] below would panic
+	// instead of reporting an ordinary test failure.
+	c.Assert(err, check.IsNil)
+	c.Assert(cfg.Volumes, check.HasLen, 1)
+	c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
+
// PutRaw stores data under locator in this volume's container by handing
// it directly to the stub handler's PutRaw.
func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
	stub, container := v.azHandler, v.ContainerName
	stub.PutRaw(container, locator, data)
}
v.azStub.Close()
}
+// ReadWriteOperationLabelValues returns the operation label values used
+// for reads ("get") and writes ("create"), in that order.
+func (v *TestableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) {
+	r = "get"
+	w = "create"
+	return r, w
+}
+
+// DeviceID returns a fixed placeholder device ID; it is a dummy value
+// meant only for use in tests.
+func (v *TestableAzureBlobVolume) DeviceID() string {
+	const testDeviceID = "azure://azure_blob_volume_test"
+	return testDeviceID
+}
+
+// Start overrides the original Start() so that the container's stats
+// counters are taken from vm using this type's dummy DeviceID as the
+// "device_id" label. It always returns nil.
+func (v *TestableAzureBlobVolume) Start(vm *volumeMetricsVecs) error {
+	labels := prometheus.Labels{"device_id": v.DeviceID()}
+	ops, errs, ioBytes := vm.getCounterVecsFor(labels)
+	v.container.stats.opsCounters = ops
+	v.container.stats.errCounters = errs
+	v.container.stats.ioBytes = ioBytes
+	return nil
+}
+
// makeEtag returns a pseudo-random ETag string: "0x" followed by the
// lowercase hexadecimal form of a non-negative random 63-bit integer.
func makeEtag() string {
	n := rand.Int63()
	return fmt.Sprintf("0x%x", n)
}