1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
30 "git.arvados.org/arvados.git/sdk/go/arvados"
31 "git.arvados.org/arvados.git/sdk/go/ctxlog"
32 "github.com/Azure/azure-sdk-for-go/storage"
33 "github.com/prometheus/client_golang/prometheus"
34 "github.com/sirupsen/logrus"
35 check "gopkg.in/check.v1"
39 // This cannot be the fake account name "devstoreaccount1"
40 // used by Microsoft's Azure emulator: the Azure SDK
41 // recognizes that magic string and changes its behavior to
42 // cater to the Azure SDK's own test suite.
43 fakeAccountName = "fakeaccountname"
44 fakeAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
48 azureTestContainer string
49 azureTestDebug = os.Getenv("ARVADOS_DEBUG") != ""
55 "test.azure-storage-container-volume",
57 "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.")
63 Metadata map[string]string
65 Uncommitted map[string][]byte
68 type azStubHandler struct {
70 logger logrus.FieldLogger
71 blobs map[string]*azBlob
72 race chan chan struct{}
76 func newAzStubHandler(c *check.C) *azStubHandler {
77 return &azStubHandler{
78 blobs: make(map[string]*azBlob),
79 logger: ctxlog.TestLogger(c),
83 func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
84 blob, ok := h.blobs[container+"|"+hash]
91 func (h *azStubHandler) BlockWriteRaw(container, hash string, data []byte) {
94 h.blobs[container+"|"+hash] = &azBlob{
97 Metadata: make(map[string]string),
98 Uncommitted: make(map[string][]byte),
102 func (h *azStubHandler) unlockAndRace() {
107 // Signal caller that race is starting by reading from
108 // h.race. If we get a channel, block until that channel is
109 // ready to receive. If we get nil (or h.race is closed) just
111 if c := <-h.race; c != nil {
117 var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`)
119 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
123 defer h.logger.Printf("azStubHandler: %+v", r)
126 path := strings.Split(r.URL.Path, "/")
133 if err := r.ParseForm(); err != nil {
134 h.logger.Printf("azStubHandler(%+v): %s", r, err)
135 rw.WriteHeader(http.StatusBadRequest)
139 if (r.Method == "PUT" || r.Method == "POST") && r.Header.Get("Content-Length") == "" {
140 rw.WriteHeader(http.StatusLengthRequired)
144 body, err := ioutil.ReadAll(r.Body)
149 type blockListRequestBody struct {
150 XMLName xml.Name `xml:"BlockList"`
154 blob, blobExists := h.blobs[container+"|"+hash]
157 case r.Method == "PUT" && r.Form.Get("comp") == "":
159 if _, ok := h.blobs[container+"|"+hash]; !ok {
160 // Like the real Azure service, we offer a
161 // race window during which other clients can
162 // list/get the new blob before any data is
164 h.blobs[container+"|"+hash] = &azBlob{
166 Uncommitted: make(map[string][]byte),
167 Metadata: make(map[string]string),
172 metadata := make(map[string]string)
173 for k, v := range r.Header {
174 if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
175 name := k[len("x-ms-meta-"):]
176 metadata[strings.ToLower(name)] = v[0]
179 h.blobs[container+"|"+hash] = &azBlob{
182 Uncommitted: make(map[string][]byte),
186 rw.WriteHeader(http.StatusCreated)
187 case r.Method == "PUT" && r.Form.Get("comp") == "block":
190 h.logger.Printf("Got block for nonexistent blob: %+v", r)
191 rw.WriteHeader(http.StatusBadRequest)
194 blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
195 if err != nil || len(blockID) == 0 {
196 h.logger.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
197 rw.WriteHeader(http.StatusBadRequest)
200 blob.Uncommitted[string(blockID)] = body
201 rw.WriteHeader(http.StatusCreated)
202 case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
203 // "Put Block List" API
204 bl := &blockListRequestBody{}
205 if err := xml.Unmarshal(body, bl); err != nil {
206 h.logger.Printf("xml Unmarshal: %s", err)
207 rw.WriteHeader(http.StatusBadRequest)
210 for _, encBlockID := range bl.Uncommitted {
211 blockID, err := base64.StdEncoding.DecodeString(encBlockID)
212 if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
213 h.logger.Printf("Invalid blockid: %+q", encBlockID)
214 rw.WriteHeader(http.StatusBadRequest)
217 blob.Data = blob.Uncommitted[string(blockID)]
218 blob.Etag = makeEtag()
219 blob.Mtime = time.Now()
220 delete(blob.Uncommitted, string(blockID))
222 rw.WriteHeader(http.StatusCreated)
223 case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
224 // "Set Metadata Headers" API. We don't bother
225 // stubbing "Get Metadata Headers": AzureBlobVolume
226 // sets metadata headers only as a way to bump Etag
227 // and Last-Modified.
229 h.logger.Printf("Got metadata for nonexistent blob: %+v", r)
230 rw.WriteHeader(http.StatusBadRequest)
233 blob.Metadata = make(map[string]string)
234 for k, v := range r.Header {
235 if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
236 name := k[len("x-ms-meta-"):]
237 blob.Metadata[strings.ToLower(name)] = v[0]
240 blob.Mtime = time.Now()
241 blob.Etag = makeEtag()
242 case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
243 // "Get Blob Metadata" API
245 rw.WriteHeader(http.StatusNotFound)
248 for k, v := range blob.Metadata {
249 rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v)
252 case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
255 rw.WriteHeader(http.StatusNotFound)
259 if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil {
260 b0, err0 := strconv.Atoi(rangeSpec[1])
261 b1, err1 := strconv.Atoi(rangeSpec[2])
262 if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 {
263 rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data)))
264 rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
267 rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data)))
268 rw.WriteHeader(http.StatusPartialContent)
269 data = data[b0 : b1+1]
271 rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
272 rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
273 if r.Method == "GET" {
274 if _, err := rw.Write(data); err != nil {
275 h.logger.Printf("write %+q: %s", data, err)
279 case r.Method == "DELETE" && hash != "":
282 rw.WriteHeader(http.StatusNotFound)
285 delete(h.blobs, container+"|"+hash)
286 rw.WriteHeader(http.StatusAccepted)
287 case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
291 rw.WriteHeader(http.StatusServiceUnavailable)
294 prefix := container + "|" + r.Form.Get("prefix")
295 marker := r.Form.Get("marker")
298 if n, err := strconv.Atoi(r.Form.Get("maxresults")); err == nil && n >= 1 && n <= 5000 {
302 resp := storage.BlobListResponse{
305 MaxResults: int64(maxResults),
307 var hashes sort.StringSlice
308 for k := range h.blobs {
309 if strings.HasPrefix(k, prefix) {
310 hashes = append(hashes, k[len(container)+1:])
314 for _, hash := range hashes {
315 if len(resp.Blobs) == maxResults {
316 resp.NextMarker = hash
319 if len(resp.Blobs) > 0 || marker == "" || marker == hash {
320 blob := h.blobs[container+"|"+hash]
321 bmeta := map[string]string(nil)
322 if r.Form.Get("include") == "metadata" {
323 bmeta = blob.Metadata
327 Properties: storage.BlobProperties{
328 LastModified: storage.TimeRFC1123(blob.Mtime),
329 ContentLength: int64(len(blob.Data)),
334 resp.Blobs = append(resp.Blobs, b)
337 buf, err := xml.Marshal(resp)
340 rw.WriteHeader(http.StatusInternalServerError)
344 h.logger.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
345 rw.WriteHeader(http.StatusNotImplemented)
349 // azStubDialer is a net.Dialer that notices when the Azure driver
350 // tries to connect to "fakeaccountname.blob.127.0.0.1:46067", and
351 // in such cases transparently dials "127.0.0.1:46067" instead.
352 type azStubDialer struct {
353 logger logrus.FieldLogger
357 var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`)
359 func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
360 if hp := localHostPortRe.FindString(address); hp != "" {
362 d.logger.Debug("azStubDialer: dial", hp, "instead of", address)
366 return d.Dialer.Dial(network, address)
369 type testableAzureBlobVolume struct {
371 azHandler *azStubHandler
372 azStub *httptest.Server
376 func (s *stubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, params newVolumeParams) *testableAzureBlobVolume {
377 azHandler := newAzStubHandler(t.(*check.C))
378 azStub := httptest.NewServer(azHandler)
380 var azClient storage.Client
383 container := azureTestContainer
385 // Connect to stub instead of real Azure storage service
386 stubURLBase := strings.Split(azStub.URL, "://")[1]
387 if azClient, err = storage.NewClient(fakeAccountName, fakeAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
390 container = "fakecontainername"
392 // Connect to real Azure storage service
393 if azClient, err = storage.NewBasicClient(os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_NAME"), os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_KEY")); err != nil {
397 azClient.Sender = &singleSender{}
399 bs := azClient.GetBlobService()
400 v := &AzureBlobVolume{
401 ContainerName: container,
402 WriteRaceInterval: arvados.Duration(time.Millisecond),
403 WriteRacePollTime: arvados.Duration(time.Nanosecond),
404 ListBlobsMaxAttempts: 2,
405 ListBlobsRetryDelay: arvados.Duration(time.Millisecond),
407 container: &azureContainer{ctr: bs.GetContainerReference(container)},
408 cluster: params.Cluster,
409 volume: params.ConfigVolume,
410 logger: ctxlog.TestLogger(t),
411 metrics: params.MetricsVecs,
412 bufferPool: params.BufferPool,
414 if err = v.check(); err != nil {
418 return &testableAzureBlobVolume{
420 azHandler: azHandler,
426 var _ = check.Suite(&stubbedAzureBlobSuite{})
428 type stubbedAzureBlobSuite struct {
429 origHTTPTransport http.RoundTripper
432 func (s *stubbedAzureBlobSuite) SetUpSuite(c *check.C) {
433 s.origHTTPTransport = http.DefaultTransport
434 http.DefaultTransport = &http.Transport{
435 Dial: (&azStubDialer{logger: ctxlog.TestLogger(c)}).Dial,
439 func (s *stubbedAzureBlobSuite) TearDownSuite(c *check.C) {
440 http.DefaultTransport = s.origHTTPTransport
443 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeWithGeneric(c *check.C) {
444 DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
445 return s.newTestableAzureBlobVolume(t, params)
449 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeConcurrentRanges(c *check.C) {
450 // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases. The two sizes must differ: 2<<22 is meant as the ==0 case (assumes BlockSize is a multiple of 8 MiB -- TODO confirm) and 2<<22 - 1 as the !=0 case.
451 for _, b := range []int{2 << 22, 2<<22 - 1} {
452 c.Logf("=== MaxGetBytes=%d", b)
453 DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
454 v := s.newTestableAzureBlobVolume(t, params)
461 func (s *stubbedAzureBlobSuite) TestReadonlyAzureBlobVolumeWithGeneric(c *check.C) {
462 DoGenericVolumeTests(c, false, func(c TB, params newVolumeParams) TestableVolume {
463 return s.newTestableAzureBlobVolume(c, params)
467 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
468 v := s.newTestableAzureBlobVolume(c, newVolumeParams{
469 Cluster: testCluster(c),
470 ConfigVolume: arvados.Volume{Replication: 3},
471 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
472 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
476 for _, size := range []int{
477 2<<22 - 1, // one <max read
478 2 << 22, // one =max read
479 2<<22 + 1, // one =max read, one <max
480 2 << 23, // two =max reads
484 data := make([]byte, size)
485 for i := range data {
486 data[i] = byte((i + 7) & 0xff)
488 hash := fmt.Sprintf("%x", md5.Sum(data))
489 err := v.BlockWrite(context.Background(), hash, data)
493 gotData := bytes.NewBuffer(nil)
494 gotLen, err := v.BlockRead(context.Background(), hash, gotData)
498 gotHash := fmt.Sprintf("%x", md5.Sum(gotData.Bytes()))
500 c.Errorf("length mismatch: got %d != %d", gotLen, size)
503 c.Errorf("hash mismatch: got %s != %s", gotHash, hash)
508 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) {
509 v := s.newTestableAzureBlobVolume(c, newVolumeParams{
510 Cluster: testCluster(c),
511 ConfigVolume: arvados.Volume{Replication: 3},
512 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
513 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
517 var wg sync.WaitGroup
519 v.azHandler.race = make(chan chan struct{})
524 err := v.BlockWrite(context.Background(), TestHash, TestBlock)
529 continueBlockWrite := make(chan struct{})
530 // Wait for the stub's BlockWrite to create the empty blob
531 v.azHandler.race <- continueBlockWrite
535 _, err := v.BlockRead(context.Background(), TestHash, io.Discard)
540 // Wait for the stub's BlockRead to get the empty blob
541 close(v.azHandler.race)
542 // Allow stub's BlockWrite to continue, so the real data is ready
543 // when the volume's BlockRead retries
545 // Wait for BlockRead() and BlockWrite() to finish
549 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *check.C) {
550 v := s.newTestableAzureBlobVolume(c, newVolumeParams{
551 Cluster: testCluster(c),
552 ConfigVolume: arvados.Volume{Replication: 3},
553 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
554 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
556 v.AzureBlobVolume.WriteRaceInterval.Set("2s")
557 v.AzureBlobVolume.WriteRacePollTime.Set("5ms")
560 v.BlockWriteRaw(TestHash, nil)
562 buf := new(bytes.Buffer)
563 v.Index(context.Background(), "", buf)
565 c.Errorf("Index %+q should be empty", buf.Bytes())
568 v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond))
570 allDone := make(chan struct{})
573 buf := bytes.NewBuffer(nil)
574 n, err := v.BlockRead(context.Background(), TestHash, buf)
580 c.Errorf("Got %+q (n=%d), expected empty buf", buf.Bytes(), n)
585 case <-time.After(time.Second):
586 c.Error("BlockRead should have stopped waiting for race when block was 2s old")
590 v.Index(context.Background(), "", buf)
591 if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
592 c.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
596 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockRead(c *check.C) {
597 s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
598 v.BlockWriteRaw(TestHash, TestBlock)
599 _, err := v.BlockRead(ctx, TestHash, io.Discard)
604 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockWrite(c *check.C) {
605 s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
606 return v.BlockWrite(ctx, TestHash, make([]byte, BlockSize))
610 func (s *stubbedAzureBlobSuite) testAzureBlobVolumeContextCancel(c *check.C, testFunc func(context.Context, *testableAzureBlobVolume) error) {
611 v := s.newTestableAzureBlobVolume(c, newVolumeParams{
612 Cluster: testCluster(c),
613 ConfigVolume: arvados.Volume{Replication: 3},
614 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
615 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
618 v.azHandler.race = make(chan chan struct{})
620 ctx, cancel := context.WithCancel(context.Background())
621 allDone := make(chan struct{})
624 err := testFunc(ctx, v)
625 if err != context.Canceled {
626 c.Errorf("got %T %q, expected %q", err, err, context.Canceled)
629 releaseHandler := make(chan struct{})
632 c.Error("testFunc finished without waiting for v.azHandler.race")
633 case <-time.After(10 * time.Second):
634 c.Error("timed out waiting to enter handler")
635 case v.azHandler.race <- releaseHandler:
641 case <-time.After(10 * time.Second):
642 c.Error("timed out waiting to cancel")
651 func (s *stubbedAzureBlobSuite) TestStats(c *check.C) {
652 volume := s.newTestableAzureBlobVolume(c, newVolumeParams{
653 Cluster: testCluster(c),
654 ConfigVolume: arvados.Volume{Replication: 3},
655 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
656 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
658 defer volume.Teardown()
660 stats := func() string {
661 buf, err := json.Marshal(volume.InternalStats())
662 c.Check(err, check.IsNil)
666 c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
667 c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
669 loc := "acbd18db4cc2f85cedef654fccc4a4d8"
670 _, err := volume.BlockRead(context.Background(), loc, io.Discard)
671 c.Check(err, check.NotNil)
672 c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
673 c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
674 c.Check(stats(), check.Matches, `.*"storage\.AzureStorageServiceError 404 \(404 Not Found\)":[^0].*`)
675 c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
677 err = volume.BlockWrite(context.Background(), loc, []byte("foo"))
678 c.Check(err, check.IsNil)
679 c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
680 c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
682 _, err = volume.BlockRead(context.Background(), loc, io.Discard)
683 c.Check(err, check.IsNil)
684 _, err = volume.BlockRead(context.Background(), loc, io.Discard)
685 c.Check(err, check.IsNil)
686 c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
689 func (v *testableAzureBlobVolume) BlockWriteRaw(locator string, data []byte) {
690 v.azHandler.BlockWriteRaw(v.ContainerName, locator, data)
693 func (v *testableAzureBlobVolume) TouchWithDate(locator string, lastBlockWrite time.Time) {
694 v.azHandler.TouchWithDate(v.ContainerName, locator, lastBlockWrite)
697 func (v *testableAzureBlobVolume) Teardown() {
701 func (v *testableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) {
702 return "get", "create"
705 func makeEtag() string {
706 return fmt.Sprintf("0x%x", rand.Int63())