1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
29 "git.arvados.org/arvados.git/sdk/go/arvados"
30 "git.arvados.org/arvados.git/sdk/go/ctxlog"
31 "github.com/Azure/azure-sdk-for-go/storage"
32 "github.com/prometheus/client_golang/prometheus"
33 "github.com/sirupsen/logrus"
34 check "gopkg.in/check.v1"
38 // This cannot be the fake account name "devstoreaccount1"
39 // used by Microsoft's Azure emulator: the Azure SDK
40 // recognizes that magic string and changes its behavior to
41 // cater to the Azure SDK's own test suite.
42 fakeAccountName = "fakeaccountname"
43 fakeAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
47 azureTestContainer string
48 azureTestDebug = os.Getenv("ARVADOS_DEBUG") != ""
54 "test.azure-storage-container-volume",
56 "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.")
62 Metadata map[string]string
64 Uncommitted map[string][]byte
// azStubHandler is an in-memory stand-in for the Azure blob storage
// HTTP API. It is served by an httptest server in these tests.
67 type azStubHandler struct {
69 logger logrus.FieldLogger
// blobs is keyed by container+"|"+hash.
70 blobs map[string]*azBlob
// race lets a test rendezvous with a request that is being handled;
// see unlockAndRace.
71 race chan chan struct{}
75 func newAzStubHandler(c *check.C) *azStubHandler {
76 return &azStubHandler{
77 blobs: make(map[string]*azBlob),
78 logger: ctxlog.TestLogger(c),
// TouchWithDate looks up the stub blob stored under container+"|"+hash.
// NOTE(review): presumably it then sets that blob's modification time
// to t and does nothing if the blob is absent — the rest of the body
// is not visible here; confirm.
82 func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
83 blob, ok := h.blobs[container+"|"+hash]
// BlockWriteRaw installs a blob directly in the stub's store under
// container+"|"+hash, bypassing the HTTP API. The new blob starts with
// empty Metadata and Uncommitted maps.
// NOTE(review): presumably data becomes the blob's content — confirm.
90 func (h *azStubHandler) BlockWriteRaw(container, hash string, data []byte) {
93 h.blobs[container+"|"+hash] = &azBlob{
96 Metadata: make(map[string]string),
97 Uncommitted: make(map[string][]byte),
// unlockAndRace gives a test the chance to interleave its own actions
// with a request that is being handled, using h.race as the
// rendezvous point.
101 func (h *azStubHandler) unlockAndRace() {
106 // Signal caller that race is starting by reading from
107 // h.race. If we get a channel, block until that channel is
108 // ready to receive. If we get nil (or h.race is closed) just
// proceed.
110 if c := <-h.race; c != nil {
116 var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`)
// ServeHTTP emulates the subset of the Azure blob storage REST API
// that these tests exercise. It dispatches on the request method and
// the "comp"/"restype" form parameters and operates on h.blobs, whose
// keys are container+"|"+hash. Requests that match no case are logged
// and answered with 501 Not Implemented.
118 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
122 defer h.logger.Printf("azStubHandler: %+v", r)
125 path := strings.Split(r.URL.Path, "/")
132 if err := r.ParseForm(); err != nil {
133 h.logger.Printf("azStubHandler(%+v): %s", r, err)
134 rw.WriteHeader(http.StatusBadRequest)
// Uploads must declare their length.
138 if (r.Method == "PUT" || r.Method == "POST") && r.Header.Get("Content-Length") == "" {
139 rw.WriteHeader(http.StatusLengthRequired)
143 body, err := ioutil.ReadAll(r.Body)
// blockListRequestBody is the XML document sent with a
// comp=blocklist request.
148 type blockListRequestBody struct {
149 XMLName xml.Name `xml:"BlockList"`
153 blob, blobExists := h.blobs[container+"|"+hash]
// PUT without "comp": create or replace a whole blob, recording any
// x-ms-meta-* request headers as its metadata.
156 case r.Method == "PUT" && r.Form.Get("comp") == "":
158 if _, ok := h.blobs[container+"|"+hash]; !ok {
159 // Like the real Azure service, we offer a
160 // race window during which other clients can
161 // list/get the new blob before any data is
163 h.blobs[container+"|"+hash] = &azBlob{
165 Uncommitted: make(map[string][]byte),
166 Metadata: make(map[string]string),
171 metadata := make(map[string]string)
172 for k, v := range r.Header {
173 if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
174 name := k[len("x-ms-meta-"):]
175 metadata[strings.ToLower(name)] = v[0]
178 h.blobs[container+"|"+hash] = &azBlob{
181 Uncommitted: make(map[string][]byte),
185 rw.WriteHeader(http.StatusCreated)
// comp=block: stage the request body as an uncommitted block, keyed
// by the base64-decoded "blockid" parameter.
186 case r.Method == "PUT" && r.Form.Get("comp") == "block":
189 h.logger.Printf("Got block for nonexistent blob: %+v", r)
190 rw.WriteHeader(http.StatusBadRequest)
193 blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
194 if err != nil || len(blockID) == 0 {
195 h.logger.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
196 rw.WriteHeader(http.StatusBadRequest)
199 blob.Uncommitted[string(blockID)] = body
200 rw.WriteHeader(http.StatusCreated)
201 case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
202 // "Put Block List" API
203 bl := &blockListRequestBody{}
204 if err := xml.Unmarshal(body, bl); err != nil {
205 h.logger.Printf("xml Unmarshal: %s", err)
206 rw.WriteHeader(http.StatusBadRequest)
// Each listed block replaces the blob's data outright (blocks are not
// concatenated), gets a fresh Etag and Mtime, and is removed from
// the uncommitted set.
209 for _, encBlockID := range bl.Uncommitted {
210 blockID, err := base64.StdEncoding.DecodeString(encBlockID)
211 if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
212 h.logger.Printf("Invalid blockid: %+q", encBlockID)
213 rw.WriteHeader(http.StatusBadRequest)
216 blob.Data = blob.Uncommitted[string(blockID)]
217 blob.Etag = makeEtag()
218 blob.Mtime = time.Now()
219 delete(blob.Uncommitted, string(blockID))
221 rw.WriteHeader(http.StatusCreated)
222 case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
223 // "Set Metadata Headers" API. We don't bother
224 // stubbing "Get Metadata Headers": azureBlobVolume
225 // sets metadata headers only as a way to bump Etag
226 // and Last-Modified.
228 h.logger.Printf("Got metadata for nonexistent blob: %+v", r)
229 rw.WriteHeader(http.StatusBadRequest)
// Existing metadata is discarded and rebuilt from the x-ms-meta-*
// request headers.
232 blob.Metadata = make(map[string]string)
233 for k, v := range r.Header {
234 if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
235 name := k[len("x-ms-meta-"):]
236 blob.Metadata[strings.ToLower(name)] = v[0]
239 blob.Mtime = time.Now()
240 blob.Etag = makeEtag()
241 case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
242 // "Get Blob Metadata" API
244 rw.WriteHeader(http.StatusNotFound)
247 for k, v := range blob.Metadata {
248 rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v)
// Plain GET/HEAD of a blob, with optional "bytes=N-M" Range support.
251 case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
254 rw.WriteHeader(http.StatusNotFound)
258 if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil {
259 b0, err0 := strconv.Atoi(rangeSpec[1])
260 b1, err1 := strconv.Atoi(rangeSpec[2])
// Both ends of the range are inclusive and must lie inside the data.
261 if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 {
262 rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data)))
263 rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
266 rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data)))
267 rw.WriteHeader(http.StatusPartialContent)
268 data = data[b0 : b1+1]
// NOTE(review): when a Range was served, WriteHeader has already been
// called above, so net/http will not send the two headers set below —
// confirm whether that is intended.
270 rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
271 rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
// HEAD responses carry headers only.
272 if r.Method == "GET" {
273 if _, err := rw.Write(data); err != nil {
274 h.logger.Printf("write %+q: %s", data, err)
278 case r.Method == "DELETE" && hash != "":
281 rw.WriteHeader(http.StatusNotFound)
284 delete(h.blobs, container+"|"+hash)
285 rw.WriteHeader(http.StatusAccepted)
// Container listing: returns blobs whose key starts with the
// container and requested prefix, in sorted order, honoring "marker"
// and "maxresults" for pagination.
286 case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
290 rw.WriteHeader(http.StatusServiceUnavailable)
293 prefix := container + "|" + r.Form.Get("prefix")
294 marker := r.Form.Get("marker")
// Only a maxresults value in 1..5000 is accepted from the request.
297 if n, err := strconv.Atoi(r.Form.Get("maxresults")); err == nil && n >= 1 && n <= 5000 {
301 resp := storage.BlobListResponse{
304 MaxResults: int64(maxResults),
306 var hashes sort.StringSlice
307 for k := range h.blobs {
308 if strings.HasPrefix(k, prefix) {
309 hashes = append(hashes, k[len(container)+1:])
313 for _, hash := range hashes {
// Page is full: the next hash becomes the continuation marker.
314 if len(resp.Blobs) == maxResults {
315 resp.NextMarker = hash
// Start emitting at the marker (or at the beginning if there is none)
// and keep emitting once started.
318 if len(resp.Blobs) > 0 || marker == "" || marker == hash {
319 blob := h.blobs[container+"|"+hash]
320 bmeta := map[string]string(nil)
321 if r.Form.Get("include") == "metadata" {
322 bmeta = blob.Metadata
326 Properties: storage.BlobProperties{
327 LastModified: storage.TimeRFC1123(blob.Mtime),
328 ContentLength: int64(len(blob.Data)),
333 resp.Blobs = append(resp.Blobs, b)
336 buf, err := xml.Marshal(resp)
339 rw.WriteHeader(http.StatusInternalServerError)
343 h.logger.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
344 rw.WriteHeader(http.StatusNotImplemented)
348 // azStubDialer is a net.Dialer that notices when the Azure driver
349 // tries to connect to "fakeaccountname.blob.127.0.0.1:46067", and
350 // in such cases transparently dials "127.0.0.1:46067" instead.
// (The account name is fakeAccountName; the port is whatever the
// local stub server happens to listen on.)
351 type azStubDialer struct {
352 logger logrus.FieldLogger
356 var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`)
// Dial connects to address on the given network via the embedded
// Dialer. When address contains a local host:port (as matched by
// localHostPortRe) the substitution is logged at debug level.
// NOTE(review): presumably the matched host:port then replaces
// address before dialing, as the azStubDialer comment describes — the
// assignment is not visible here; confirm.
358 func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
359 if hp := localHostPortRe.FindString(address); hp != "" {
361 d.logger.Debug("azStubDialer: dial", hp, "instead of", address)
365 return d.Dialer.Dial(network, address)
// testableAzureBlobVolume bundles a volume under test with the stub
// handler and the httptest server that serve its requests.
368 type testableAzureBlobVolume struct {
370 azHandler *azStubHandler
371 azStub *httptest.Server
// newTestableAzureBlobVolume starts an httptest server backed by a new
// azStubHandler and builds an azureBlobVolume for testing from params.
// One branch points the Azure client at the stub, using the fake
// account credentials and the container name "fakecontainername"; the
// other talks to the real Azure service with credentials taken from
// ARVADOS_TEST_AZURE_ACCOUNT_NAME and ARVADOS_TEST_AZURE_ACCOUNT_KEY.
// The volume is validated with check() before being returned.
//
// NOTE(review): t.(*check.C) panics if t is any other TB
// implementation.
375 func (s *stubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, params newVolumeParams) *testableAzureBlobVolume {
376 azHandler := newAzStubHandler(t.(*check.C))
377 azStub := httptest.NewServer(azHandler)
379 var azClient storage.Client
382 container := azureTestContainer
384 // Connect to stub instead of real Azure storage service
// Strip the URL scheme: NewClient takes a bare host:port base.
385 stubURLBase := strings.Split(azStub.URL, "://")[1]
386 if azClient, err = storage.NewClient(fakeAccountName, fakeAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
389 container = "fakecontainername"
391 // Connect to real Azure storage service
392 if azClient, err = storage.NewBasicClient(os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_NAME"), os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_KEY")); err != nil {
396 azClient.Sender = &singleSender{}
398 bs := azClient.GetBlobService()
// Very short race/retry intervals keep the tests fast.
399 v := &azureBlobVolume{
400 ContainerName: container,
401 WriteRaceInterval: arvados.Duration(time.Millisecond),
402 WriteRacePollTime: arvados.Duration(time.Nanosecond),
403 ListBlobsMaxAttempts: 2,
404 ListBlobsRetryDelay: arvados.Duration(time.Millisecond),
406 container: &azureContainer{ctr: bs.GetContainerReference(container)},
407 cluster: params.Cluster,
408 volume: params.ConfigVolume,
409 logger: ctxlog.TestLogger(t),
410 metrics: params.MetricsVecs,
411 bufferPool: params.BufferPool,
413 if err = v.check(); err != nil {
417 return &testableAzureBlobVolume{
419 azHandler: azHandler,
425 var _ = check.Suite(&stubbedAzureBlobSuite{})
// stubbedAzureBlobSuite is the gocheck suite for the tests that run
// against the local Azure stub.
427 type stubbedAzureBlobSuite struct {
// origHTTPTransport holds http.DefaultTransport as it was before
// SetUpSuite replaced it, so TearDownSuite can restore it.
428 origHTTPTransport http.RoundTripper
431 func (s *stubbedAzureBlobSuite) SetUpSuite(c *check.C) {
432 s.origHTTPTransport = http.DefaultTransport
433 http.DefaultTransport = &http.Transport{
434 Dial: (&azStubDialer{logger: ctxlog.TestLogger(c)}).Dial,
// TearDownSuite restores the http.DefaultTransport saved by
// SetUpSuite.
438 func (s *stubbedAzureBlobSuite) TearDownSuite(c *check.C) {
439 http.DefaultTransport = s.origHTTPTransport
// TestAzureBlobVolumeWithGeneric runs the generic volume test battery
// against a stub-backed Azure volume.
442 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeWithGeneric(c *check.C) {
443 DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
444 return s.newTestableAzureBlobVolume(t, params)
448 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeConcurrentRanges(c *check.C) {
449 // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
450 for _, b := range []int{2<<22 - 1, 2<<22 - 1} {
451 c.Logf("=== MaxGetBytes=%d", b)
452 DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
453 v := s.newTestableAzureBlobVolume(t, params)
// TestReadonlyAzureBlobVolumeWithGeneric runs the generic volume test
// battery against a stub-backed Azure volume.
// NOTE(review): despite the name, this passes the same `false` flag and
// the same factory as TestAzureBlobVolumeWithGeneric — confirm whether
// the second argument should be true here.
460 func (s *stubbedAzureBlobSuite) TestReadonlyAzureBlobVolumeWithGeneric(c *check.C) {
461 DoGenericVolumeTests(c, false, func(c TB, params newVolumeParams) TestableVolume {
462 return s.newTestableAzureBlobVolume(c, params)
// TestAzureBlobVolumeRangeFenceposts writes and reads back blocks whose
// sizes sit just below, at, and just above multiples of the maximum
// read size, checking that the data read has the expected length and
// MD5 hash.
466 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
467 v := s.newTestableAzureBlobVolume(c, newVolumeParams{
468 Cluster: testCluster(c),
469 ConfigVolume: arvados.Volume{Replication: 3},
470 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
471 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
475 for _, size := range []int{
476 2<<22 - 1, // one <max read
477 2 << 22, // one =max read
478 2<<22 + 1, // one =max read, one <max
479 2 << 23, // two =max reads
// Fill the block with a non-constant byte pattern so misplaced ranges
// change the hash.
483 data := make([]byte, size)
484 for i := range data {
485 data[i] = byte((i + 7) & 0xff)
487 hash := fmt.Sprintf("%x", md5.Sum(data))
488 err := v.BlockWrite(context.Background(), hash, data)
492 gotData := &brbuffer{}
493 err = v.BlockRead(context.Background(), hash, gotData)
497 gotHash := fmt.Sprintf("%x", md5.Sum(gotData.Bytes()))
498 c.Check(gotData.Len(), check.Equals, size)
500 c.Errorf("hash mismatch: got %s != %s", gotHash, hash)
// TestAzureBlobVolumeCreateBlobRace uses the stub handler's race
// channel to interleave a BlockRead with a BlockWrite of the same
// block while the stub has created the blob but not yet stored its
// data.
505 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) {
506 v := s.newTestableAzureBlobVolume(c, newVolumeParams{
507 Cluster: testCluster(c),
508 ConfigVolume: arvados.Volume{Replication: 3},
509 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
510 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
514 var wg sync.WaitGroup
516 v.azHandler.race = make(chan chan struct{})
521 err := v.BlockWrite(context.Background(), TestHash, TestBlock)
526 continueBlockWrite := make(chan struct{})
527 // Wait for the stub's BlockWrite to create the empty blob
528 v.azHandler.race <- continueBlockWrite
532 err := v.BlockRead(context.Background(), TestHash, brdiscard)
537 // Wait for the stub's BlockRead to get the empty blob
538 close(v.azHandler.race)
539 // Allow stub's BlockWrite to continue, so the real data is ready
540 // when the volume's BlockRead retries
542 // Wait for BlockRead() and BlockWrite() to finish
// TestAzureBlobVolumeCreateBlobRaceDeadline checks how an empty blob is
// treated around the WriteRaceInterval deadline. The blob is backdated
// by 1982ms against a 2s interval, so BlockRead is expected to stop
// waiting for data well inside the 1s timeout and return empty
// content, after which Index should list the block with size 0.
546 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *check.C) {
547 v := s.newTestableAzureBlobVolume(c, newVolumeParams{
548 Cluster: testCluster(c),
549 ConfigVolume: arvados.Volume{Replication: 3},
550 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
551 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
553 v.azureBlobVolume.WriteRaceInterval.Set("2s")
554 v.azureBlobVolume.WriteRacePollTime.Set("5ms")
// Create an empty blob directly in the stub.
557 v.BlockWriteRaw(TestHash, nil)
559 buf := new(bytes.Buffer)
560 v.Index(context.Background(), "", buf)
562 c.Errorf("Index %+q should be empty", buf.Bytes())
565 v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond))
567 allDone := make(chan struct{})
571 err := v.BlockRead(context.Background(), TestHash, buf)
576 c.Check(buf.String(), check.Equals, "")
580 case <-time.After(time.Second):
581 c.Error("BlockRead should have stopped waiting for race when block was 2s old")
585 v.Index(context.Background(), "", buf)
586 if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
587 c.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
// TestAzureBlobVolumeContextCancelBlockRead runs the shared
// context-cancel scenario with a BlockRead of a block that was
// pre-loaded into the stub.
591 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockRead(c *check.C) {
592 s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
593 v.BlockWriteRaw(TestHash, TestBlock)
594 return v.BlockRead(ctx, TestHash, brdiscard)
// TestAzureBlobVolumeContextCancelBlockWrite runs the shared
// context-cancel scenario with a BlockWrite of a BlockSize-byte
// all-zero buffer.
598 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockWrite(c *check.C) {
599 s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
600 return v.BlockWrite(ctx, TestHash, make([]byte, BlockSize))
// testAzureBlobVolumeContextCancel is the shared body of the
// context-cancel tests. It calls testFunc with a cancellable context
// and expects it to return context.Canceled. The stub handler's race
// channel is used to detect that a request has entered the handler,
// and 10-second timeouts guard each waiting step.
604 func (s *stubbedAzureBlobSuite) testAzureBlobVolumeContextCancel(c *check.C, testFunc func(context.Context, *testableAzureBlobVolume) error) {
605 v := s.newTestableAzureBlobVolume(c, newVolumeParams{
606 Cluster: testCluster(c),
607 ConfigVolume: arvados.Volume{Replication: 3},
608 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
609 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
612 v.azHandler.race = make(chan chan struct{})
614 ctx, cancel := context.WithCancel(context.Background())
615 allDone := make(chan struct{})
618 err := testFunc(ctx, v)
619 if err != context.Canceled {
620 c.Errorf("got %T %q, expected %q", err, err, context.Canceled)
623 releaseHandler := make(chan struct{})
626 c.Error("testFunc finished without waiting for v.azHandler.race")
627 case <-time.After(10 * time.Second):
628 c.Error("timed out waiting to enter handler")
// The handler accepted releaseHandler, so a request is now in flight.
629 case v.azHandler.race <- releaseHandler:
635 case <-time.After(10 * time.Second):
636 c.Error("timed out waiting to cancel")
// TestStats checks the counters in the volume's InternalStats(),
// rendered as JSON, after a failed read, a 3-byte write, and two
// successful reads of that block.
645 func (s *stubbedAzureBlobSuite) TestStats(c *check.C) {
646 volume := s.newTestableAzureBlobVolume(c, newVolumeParams{
647 Cluster: testCluster(c),
648 ConfigVolume: arvados.Volume{Replication: 3},
649 MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()),
650 BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
652 defer volume.Teardown()
654 stats := func() string {
655 buf, err := json.Marshal(volume.InternalStats())
656 c.Check(err, check.IsNil)
// Nothing has happened yet.
660 c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
661 c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
// Reading a block that was never written must fail and be counted as
// an error.
663 loc := "acbd18db4cc2f85cedef654fccc4a4d8"
664 err := volume.BlockRead(context.Background(), loc, brdiscard)
665 c.Check(err, check.NotNil)
666 c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
667 c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
668 c.Check(stats(), check.Matches, `.*"storage\.AzureStorageServiceError 404 \(404 Not Found\)":[^0].*`)
669 c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
671 err = volume.BlockWrite(context.Background(), loc, []byte("foo"))
672 c.Check(err, check.IsNil)
673 c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
674 c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
// Two reads of the 3-byte block should account for 6 bytes in.
676 err = volume.BlockRead(context.Background(), loc, brdiscard)
677 c.Check(err, check.IsNil)
678 err = volume.BlockRead(context.Background(), loc, brdiscard)
679 c.Check(err, check.IsNil)
680 c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
683 func (v *testableAzureBlobVolume) BlockWriteRaw(locator string, data []byte) {
684 v.azHandler.BlockWriteRaw(v.ContainerName, locator, data)
687 func (v *testableAzureBlobVolume) TouchWithDate(locator string, lastBlockWrite time.Time) {
688 v.azHandler.TouchWithDate(v.ContainerName, locator, lastBlockWrite)
691 func (v *testableAzureBlobVolume) Teardown() {
695 func (v *testableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) {
696 return "get", "create"
// makeEtag returns a random Etag value: "0x" followed by the lowercase
// hexadecimal form of a non-negative pseudo-random 63-bit integer.
func makeEtag() string {
	n := rand.Int63()
	return "0x" + strconv.FormatInt(n, 16)
}