+func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeConcurrentRanges(c *check.C) {
+	// Test the (BlockSize mod MaxGetBytes)==0 and !=0 cases:
+	// 2<<22 divides BlockSize evenly; 2<<22 - 1 does not.
+	for _, b := range []int{2 << 22, 2<<22 - 1} {
+		DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+			v := s.newTestableAzureBlobVolume(t, cluster, volume, metrics)
+			v.MaxGetBytes = b
+			return v
+		})
+	}
+}
+
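+// TestReadonlyAzureBlobVolumeWithGeneric runs the generic volume tests
+// against a volume configured as read-only.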
+func (s *StubbedAzureBlobSuite) TestReadonlyAzureBlobVolumeWithGeneric(c *check.C) {
+	DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+		return s.newTestableAzureBlobVolume(t, cluster, volume, metrics)
+	})
+}
+
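+// TestAzureBlobVolumeRangeFenceposts writes blocks whose sizes sit just
+// below, at, and just above the maximum GET range size (and BlockSize),
+// and checks that data reassembled from multiple range reads is intact.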
+func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
+	v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
+	defer v.Teardown()
+
+	for _, size := range []int{
+		2<<22 - 1, // one <max read
+		2 << 22,   // one =max read
+		2<<22 + 1, // one =max read, one <max
+		2 << 23,   // two =max reads
+		BlockSize - 1,
+		BlockSize,
+	} {
+		// Fill the block with a shifted byte pattern so a
+		// dropped, duplicated, or reordered range shows up as
+		// a hash mismatch.
+		data := make([]byte, size)
+		for i := range data {
+			data[i] = byte((i + 7) & 0xff)
+		}
+		hash := fmt.Sprintf("%x", md5.Sum(data))
+		err := v.Put(context.Background(), hash, data)
+		if err != nil {
+			c.Error(err)
+		}
+		gotData := make([]byte, len(data))
+		gotLen, err := v.Get(context.Background(), hash, gotData)
+		if err != nil {
+			c.Error(err)
+		}
+		gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
+		if gotLen != size {
+			c.Errorf("length mismatch: got %d != %d", gotLen, size)
+		}
+		if gotHash != hash {
+			c.Errorf("hash mismatch: got %s != %s", gotHash, hash)
+		}
+	}
+}
+
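+// TestAzureBlobVolumeCreateBlobRace exercises the Azure create-blob
+// race: a "Put Blob" operation creates an empty blob before the data is
+// committed, so a concurrent Get can observe a zero-length blob and
+// must retry until the writer finishes.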
+func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) {
+	v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
+	defer v.Teardown()
+
+	var wg sync.WaitGroup
+
+	// The race channel lets the test pause the stub's Put handler
+	// after it has created the empty blob, and resume it later.
+	v.azHandler.race = make(chan chan struct{})
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := v.Put(context.Background(), TestHash, TestBlock)
+		if err != nil {
+			c.Error(err)
+		}
+	}()
+	continuePut := make(chan struct{})
+	// Wait for the stub's Put to create the empty blob
+	v.azHandler.race <- continuePut
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		buf := make([]byte, len(TestBlock))
+		_, err := v.Get(context.Background(), TestHash, buf)
+		if err != nil {
+			c.Error(err)
+		}
+	}()
+	// Wait for the stub's Get to get the empty blob
+	close(v.azHandler.race)
+	// Allow stub's Put to continue, so the real data is ready
+	// when the volume's Get retries
+	<-continuePut
+	// Wait for Get() and Put() to finish
+	wg.Wait()
+}
+
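+// TestAzureBlobVolumeCreateBlobRaceDeadline checks that Get stops
+// waiting out the create-blob race once the empty blob is older than
+// WriteRaceInterval, rather than polling indefinitely.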
+func (s *StubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *check.C) {
+	v := s.newTestableAzureBlobVolume(c, testCluster(c), arvados.Volume{Replication: 3}, newVolumeMetricsVecs(prometheus.NewRegistry()))
+	v.AzureBlobVolume.WriteRaceInterval.Set("2s")
+	v.AzureBlobVolume.WriteRacePollTime.Set("5ms")
+	defer v.Teardown()
+
+	// Simulate a stalled writer: the blob exists, but no data has
+	// been committed yet.
+	v.PutRaw(TestHash, nil)
+
+	buf := new(bytes.Buffer)
+	v.IndexTo("", buf)
+	if buf.Len() != 0 {
+		c.Errorf("Index %+q should be empty", buf.Bytes())