1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
29 "git.curoverse.com/arvados.git/sdk/go/arvados"
30 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
31 "github.com/Azure/azure-sdk-for-go/storage"
32 "github.com/prometheus/client_golang/prometheus"
33 "github.com/sirupsen/logrus"
34 check "gopkg.in/check.v1"
38 // This cannot be the fake account name "devstoreaccount1"
39 // used by Microsoft's Azure emulator: the Azure SDK
40 // recognizes that magic string and changes its behavior to
41 // cater to the Azure SDK's own test suite.
42 fakeAccountName = "fakeaccountname"
43 fakeAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
47 azureTestContainer string
48 azureTestDebug = os.Getenv("ARVADOS_DEBUG") != ""
54 "test.azure-storage-container-volume",
56 "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.")
62 Metadata map[string]string
64 Uncommitted map[string][]byte
67 type azStubHandler struct {
69 logger logrus.FieldLogger
70 blobs map[string]*azBlob
71 race chan chan struct{}
75 func newAzStubHandler(c *check.C) *azStubHandler {
76 return &azStubHandler{
77 blobs: make(map[string]*azBlob),
78 logger: ctxlog.TestLogger(c),
82 func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
83 blob, ok := h.blobs[container+"|"+hash]
90 func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
93 h.blobs[container+"|"+hash] = &azBlob{
96 Metadata: make(map[string]string),
97 Uncommitted: make(map[string][]byte),
101 func (h *azStubHandler) unlockAndRace() {
106 // Signal caller that race is starting by reading from
107 // h.race. If we get a channel, block until that channel is
108 // ready to receive. If we get nil (or h.race is closed) just
110 if c := <-h.race; c != nil {
116 var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`)
118 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
122 defer h.logger.Printf("azStubHandler: %+v", r)
125 path := strings.Split(r.URL.Path, "/")
132 if err := r.ParseForm(); err != nil {
133 h.logger.Printf("azStubHandler(%+v): %s", r, err)
134 rw.WriteHeader(http.StatusBadRequest)
138 if (r.Method == "PUT" || r.Method == "POST") && r.Header.Get("Content-Length") == "" {
139 rw.WriteHeader(http.StatusLengthRequired)
143 body, err := ioutil.ReadAll(r.Body)
148 type blockListRequestBody struct {
149 XMLName xml.Name `xml:"BlockList"`
153 blob, blobExists := h.blobs[container+"|"+hash]
156 case r.Method == "PUT" && r.Form.Get("comp") == "":
158 if _, ok := h.blobs[container+"|"+hash]; !ok {
159 // Like the real Azure service, we offer a
160 // race window during which other clients can
161 // list/get the new blob before any data is
163 h.blobs[container+"|"+hash] = &azBlob{
165 Uncommitted: make(map[string][]byte),
166 Metadata: make(map[string]string),
171 metadata := make(map[string]string)
172 for k, v := range r.Header {
173 if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
174 name := k[len("x-ms-meta-"):]
175 metadata[strings.ToLower(name)] = v[0]
178 h.blobs[container+"|"+hash] = &azBlob{
181 Uncommitted: make(map[string][]byte),
185 rw.WriteHeader(http.StatusCreated)
186 case r.Method == "PUT" && r.Form.Get("comp") == "block":
189 h.logger.Printf("Got block for nonexistent blob: %+v", r)
190 rw.WriteHeader(http.StatusBadRequest)
193 blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
194 if err != nil || len(blockID) == 0 {
195 h.logger.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
196 rw.WriteHeader(http.StatusBadRequest)
199 blob.Uncommitted[string(blockID)] = body
200 rw.WriteHeader(http.StatusCreated)
201 case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
202 // "Put Block List" API
203 bl := &blockListRequestBody{}
204 if err := xml.Unmarshal(body, bl); err != nil {
205 h.logger.Printf("xml Unmarshal: %s", err)
206 rw.WriteHeader(http.StatusBadRequest)
209 for _, encBlockID := range bl.Uncommitted {
210 blockID, err := base64.StdEncoding.DecodeString(encBlockID)
211 if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
212 h.logger.Printf("Invalid blockid: %+q", encBlockID)
213 rw.WriteHeader(http.StatusBadRequest)
216 blob.Data = blob.Uncommitted[string(blockID)]
217 blob.Etag = makeEtag()
218 blob.Mtime = time.Now()
219 delete(blob.Uncommitted, string(blockID))
221 rw.WriteHeader(http.StatusCreated)
222 case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
223 // "Set Metadata Headers" API: replace the blob's
224 // metadata with the request's x-ms-meta-* headers and
225 // bump Etag and Last-Modified. ("Get Blob Metadata"
226 // is stubbed separately, in a later case below.)
228 h.logger.Printf("Got metadata for nonexistent blob: %+v", r)
229 rw.WriteHeader(http.StatusBadRequest)
232 blob.Metadata = make(map[string]string)
233 for k, v := range r.Header {
234 if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
235 name := k[len("x-ms-meta-"):]
236 blob.Metadata[strings.ToLower(name)] = v[0]
239 blob.Mtime = time.Now()
240 blob.Etag = makeEtag()
241 case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
242 // "Get Blob Metadata" API
244 rw.WriteHeader(http.StatusNotFound)
247 for k, v := range blob.Metadata {
248 rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v)
251 case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
254 rw.WriteHeader(http.StatusNotFound)
258 if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil {
259 b0, err0 := strconv.Atoi(rangeSpec[1])
260 b1, err1 := strconv.Atoi(rangeSpec[2])
261 if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 {
262 rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data)))
263 rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
266 rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data)))
267 rw.WriteHeader(http.StatusPartialContent)
268 data = data[b0 : b1+1]
270 rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
271 rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
272 if r.Method == "GET" {
273 if _, err := rw.Write(data); err != nil {
274 h.logger.Printf("write %+q: %s", data, err)
278 case r.Method == "DELETE" && hash != "":
281 rw.WriteHeader(http.StatusNotFound)
284 delete(h.blobs, container+"|"+hash)
285 rw.WriteHeader(http.StatusAccepted)
286 case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
290 rw.WriteHeader(http.StatusServiceUnavailable)
293 prefix := container + "|" + r.Form.Get("prefix")
294 marker := r.Form.Get("marker")
297 if n, err := strconv.Atoi(r.Form.Get("maxresults")); err == nil && n >= 1 && n <= 5000 {
301 resp := storage.BlobListResponse{
304 MaxResults: int64(maxResults),
306 var hashes sort.StringSlice
307 for k := range h.blobs {
308 if strings.HasPrefix(k, prefix) {
309 hashes = append(hashes, k[len(container)+1:])
313 for _, hash := range hashes {
314 if len(resp.Blobs) == maxResults {
315 resp.NextMarker = hash
318 if len(resp.Blobs) > 0 || marker == "" || marker == hash {
319 blob := h.blobs[container+"|"+hash]
320 bmeta := map[string]string(nil)
321 if r.Form.Get("include") == "metadata" {
322 bmeta = blob.Metadata
326 Properties: storage.BlobProperties{
327 LastModified: storage.TimeRFC1123(blob.Mtime),
328 ContentLength: int64(len(blob.Data)),
333 resp.Blobs = append(resp.Blobs, b)
336 buf, err := xml.Marshal(resp)
339 rw.WriteHeader(http.StatusInternalServerError)
343 h.logger.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
344 rw.WriteHeader(http.StatusNotImplemented)
348 // azStubDialer is a net.Dialer that notices when the Azure driver tries to
349 // connect to a stub host such as "fakeaccountname.blob.127.0.0.1:46067" (or a
350 // localhost / [::1] equivalent), and in such cases transparently dials just the host:port part ("127.0.0.1:46067") instead.
351 type azStubDialer struct {
352 logger logrus.FieldLogger
356 var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`)
358 func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
359 if hp := localHostPortRe.FindString(address); hp != "" {
361 d.logger.Debug("azStubDialer: dial", hp, "instead of", address)
365 return d.Dialer.Dial(network, address)
368 type TestableAzureBlobVolume struct {
370 azHandler *azStubHandler
371 azStub *httptest.Server
375 func (s *StubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs) *TestableAzureBlobVolume {
376 azHandler := newAzStubHandler(t.(*check.C))
377 azStub := httptest.NewServer(azHandler)
379 var azClient storage.Client
382 container := azureTestContainer
384 // Connect to stub instead of real Azure storage service
385 stubURLBase := strings.Split(azStub.URL, "://")[1]
386 if azClient, err = storage.NewClient(fakeAccountName, fakeAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
389 container = "fakecontainername"
391 // Connect to real Azure storage service
392 if azClient, err = storage.NewBasicClient(os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_NAME"), os.Getenv("ARVADOS_TEST_AZURE_ACCOUNT_KEY")); err != nil {
396 azClient.Sender = &singleSender{}
398 bs := azClient.GetBlobService()
399 v := &AzureBlobVolume{
400 ContainerName: container,
401 WriteRaceInterval: arvados.Duration(time.Millisecond),
402 WriteRacePollTime: arvados.Duration(time.Nanosecond),
403 ListBlobsMaxAttempts: 2,
404 ListBlobsRetryDelay: arvados.Duration(time.Millisecond),
406 container: &azureContainer{ctr: bs.GetContainerReference(container)},
409 logger: ctxlog.TestLogger(t),
412 if err = v.check(); err != nil {
416 return &TestableAzureBlobVolume{
418 azHandler: azHandler,
424 var _ = check.Suite(&StubbedAzureBlobSuite{})
426 type StubbedAzureBlobSuite struct {
427 origHTTPTransport http.RoundTripper
430 func (s *StubbedAzureBlobSuite) SetUpTest(c *check.C) {
431 s.origHTTPTransport = http.DefaultTransport
432 http.DefaultTransport = &http.Transport{
433 Dial: (&azStubDialer{logger: ctxlog.TestLogger(c)}).Dial,
437 func (s *StubbedAzureBlobSuite) TearDownTest(c *check.C) {
438 http.DefaultTransport = s.origHTTPTransport
441 func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeWithGeneric(c *check.C) {
442 DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
443 return s.newTestableAzureBlobVolume(t, cluster, volume, metrics)
447 func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeConcurrentRanges(c *check.C) {
448 // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
449 for _, b := range []int{2 << 22, 2<<22 - 1} {
450 DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
451 v := s.newTestableAzureBlobVolume(t, cluster, volume, metrics)
458 func (s *StubbedAzureBlobSuite) TestReadonlyAzureBlobVolumeWithGeneric(c *check.C) {
459 DoGenericVolumeTests(c, false, func(c TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
460 return s.newTestableAzureBlobVolume(c, cluster, volume, metrics)
464 func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
465 v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
468 for _, size := range []int{
469 2<<22 - 1, // one <max read
470 2 << 22, // one =max read
471 2<<22 + 1, // one =max read, one <max
472 2 << 23, // two =max reads
476 data := make([]byte, size)
477 for i := range data {
478 data[i] = byte((i + 7) & 0xff)
480 hash := fmt.Sprintf("%x", md5.Sum(data))
481 err := v.Put(context.Background(), hash, data)
485 gotData := make([]byte, len(data))
486 gotLen, err := v.Get(context.Background(), hash, gotData)
490 gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
492 c.Errorf("length mismatch: got %d != %d", gotLen, size)
495 c.Errorf("hash mismatch: got %s != %s", gotHash, hash)
500 func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) {
501 v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
504 var wg sync.WaitGroup
506 v.azHandler.race = make(chan chan struct{})
511 err := v.Put(context.Background(), TestHash, TestBlock)
516 continuePut := make(chan struct{})
517 // Wait for the stub's Put to create the empty blob
518 v.azHandler.race <- continuePut
522 buf := make([]byte, len(TestBlock))
523 _, err := v.Get(context.Background(), TestHash, buf)
528 // Wait for the stub's Get to get the empty blob
529 close(v.azHandler.race)
530 // Allow stub's Put to continue, so the real data is ready
531 // when the volume's Get retries
533 // Wait for Get() and Put() to finish
537 func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *check.C) {
538 v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
539 v.AzureBlobVolume.WriteRaceInterval.Set("2s")
540 v.AzureBlobVolume.WriteRacePollTime.Set("5ms")
543 v.PutRaw(TestHash, nil)
545 buf := new(bytes.Buffer)
548 c.Errorf("Index %+q should be empty", buf.Bytes())
551 v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond))
553 allDone := make(chan struct{})
556 buf := make([]byte, BlockSize)
557 n, err := v.Get(context.Background(), TestHash, buf)
563 c.Errorf("Got %+q, expected empty buf", buf[:n])
568 case <-time.After(time.Second):
569 c.Error("Get should have stopped waiting for race when block was 2s old")
574 if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
575 c.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
579 func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelGet(c *check.C) {
580 s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error {
581 v.PutRaw(TestHash, TestBlock)
582 _, err := v.Get(ctx, TestHash, make([]byte, BlockSize))
587 func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelPut(c *check.C) {
588 s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error {
589 return v.Put(ctx, TestHash, make([]byte, BlockSize))
593 func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelCompare(c *check.C) {
594 s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *TestableAzureBlobVolume) error {
595 v.PutRaw(TestHash, TestBlock)
596 return v.Compare(ctx, TestHash, TestBlock2)
600 func (s *StubbedAzureBlobSuite) testAzureBlobVolumeContextCancel(c *check.C, testFunc func(context.Context, *TestableAzureBlobVolume) error) {
601 v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
603 v.azHandler.race = make(chan chan struct{})
605 ctx, cancel := context.WithCancel(context.Background())
606 allDone := make(chan struct{})
609 err := testFunc(ctx, v)
610 if err != context.Canceled {
611 c.Errorf("got %T %q, expected %q", err, err, context.Canceled)
614 releaseHandler := make(chan struct{})
617 c.Error("testFunc finished without waiting for v.azHandler.race")
618 case <-time.After(10 * time.Second):
619 c.Error("timed out waiting to enter handler")
620 case v.azHandler.race <- releaseHandler:
626 case <-time.After(10 * time.Second):
627 c.Error("timed out waiting to cancel")
636 func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
637 volume := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
638 defer volume.Teardown()
640 stats := func() string {
641 buf, err := json.Marshal(volume.InternalStats())
642 c.Check(err, check.IsNil)
646 c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
647 c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
649 loc := "acbd18db4cc2f85cedef654fccc4a4d8"
650 _, err := volume.Get(context.Background(), loc, make([]byte, 3))
651 c.Check(err, check.NotNil)
652 c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
653 c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
654 c.Check(stats(), check.Matches, `.*"storage\.AzureStorageServiceError 404 \(404 Not Found\)":[^0].*`)
655 c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
657 err = volume.Put(context.Background(), loc, []byte("foo"))
658 c.Check(err, check.IsNil)
659 c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
660 c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
662 _, err = volume.Get(context.Background(), loc, make([]byte, 3))
663 c.Check(err, check.IsNil)
664 _, err = volume.Get(context.Background(), loc, make([]byte, 3))
665 c.Check(err, check.IsNil)
666 c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
669 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
670 v.azHandler.PutRaw(v.ContainerName, locator, data)
673 func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
674 v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut)
677 func (v *TestableAzureBlobVolume) Teardown() {
681 func (v *TestableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) {
682 return "get", "create"
685 func makeEtag() string {
686 return fmt.Sprintf("0x%x", rand.Int63())