X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/852eadc79b7103b3889eed53a851a1c26c4daeab..bc816b50fc16182fef2f5d17ffd61578432e83c3:/services/keepstore/azure_blob_volume.go diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go index 2bf80bf3c6..657c3151ca 100644 --- a/services/keepstore/azure_blob_volume.go +++ b/services/keepstore/azure_blob_volume.go @@ -19,6 +19,8 @@ var ( azureStorageAccountName string azureStorageAccountKeyFile string azureStorageReplication int + azureWriteRaceInterval time.Duration = 15 * time.Second + azureWriteRacePollTime time.Duration = time.Second ) func readKeyFromFile(file string) (string, error) { @@ -96,11 +98,11 @@ type AzureBlobVolume struct { func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume { return &AzureBlobVolume{ - azClient: client, - bsClient: client.GetBlobService(), + azClient: client, + bsClient: client.GetBlobService(), containerName: containerName, - readonly: readonly, - replication: replication, + readonly: readonly, + replication: replication, } } @@ -117,25 +119,49 @@ func (v *AzureBlobVolume) Check() error { } func (v *AzureBlobVolume) Get(loc string) ([]byte, error) { - rdr, err := v.bsClient.GetBlob(v.containerName, loc) - if err != nil { - if strings.Contains(err.Error(), "404 Not Found") { - // "storage: service returned without a response body (404 Not Found)" - return nil, os.ErrNotExist + var deadline time.Time + haveDeadline := false + buf, err := v.get(loc) + for err == nil && len(buf) == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" { + // Seeing a brand new empty block probably means we're + // in a race with CreateBlob, which under the hood + // (apparently) does "CreateEmpty" and "CommitData" + // with no additional transaction locking. + if !haveDeadline { + t, err := v.Mtime(loc) + if err != nil { + log.Print("Got empty block (possible race) but Mtime failed: ", err) + break + } + deadline = t.Add(azureWriteRaceInterval) + if time.Now().After(deadline) { + break + } + log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline) + haveDeadline = true + } else if time.Now().After(deadline) { + break } - return nil, err + bufs.Put(buf) + time.Sleep(azureWriteRacePollTime) + buf, err = v.get(loc) } - switch err := err.(type) { - case nil: - default: - log.Printf("ERROR IN Get(): %T %#v", err, err) - return nil, err + if haveDeadline { + log.Printf("Race ended with len(buf)==%d", len(buf)) + } + return buf, err +} + +func (v *AzureBlobVolume) get(loc string) ([]byte, error) { + rdr, err := v.bsClient.GetBlob(v.containerName, loc) + if err != nil { + return nil, v.translateError(err) } defer rdr.Close() buf := bufs.Get(BlockSize) n, err := io.ReadFull(rdr, buf) switch err { - case io.EOF, io.ErrUnexpectedEOF: + case nil, io.EOF, io.ErrUnexpectedEOF: return buf[:n], nil default: bufs.Put(buf) @@ -146,7 +172,7 @@ func (v *AzureBlobVolume) Get(loc string) ([]byte, error) { func (v *AzureBlobVolume) Compare(loc string, expect []byte) error { rdr, err := v.bsClient.GetBlob(v.containerName, loc) if err != nil { - return err + return v.translateError(err) } defer rdr.Close() return compareReaderWithBuf(rdr, expect, loc[:32]) @@ -190,6 +216,14 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error { if err != nil { return err } + if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) { + // A new zero-length blob is probably + // just a new non-empty blob that + // hasn't committed its data yet (see + // Get()), and in any case has no + // value. + continue + } fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix()) } if resp.NextMarker == "" { @@ -200,16 +234,26 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error { } func (v *AzureBlobVolume) Delete(loc string) error { - // TODO: Use leases to handle races with Touch and Put. if v.readonly { return MethodDisabledError } + // Ideally we would use If-Unmodified-Since, but that + // particular condition seems to be ignored by Azure. Instead, + // we get the Etag before checking Mtime, and use If-Match to + // ensure we don't delete data if Put() or Touch() happens + // between our calls to Mtime() and DeleteBlob(). + props, err := v.bsClient.GetBlobProperties(v.containerName, loc) + if err != nil { + return err + } if t, err := v.Mtime(loc); err != nil { return err } else if time.Since(t) < blobSignatureTTL { return nil } - return v.bsClient.DeleteBlob(v.containerName, loc) + return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{ + "If-Match": props.Etag, + }) } func (v *AzureBlobVolume) Status() *VolumeStatus { @@ -231,3 +275,17 @@ func (v *AzureBlobVolume) Writable() bool { func (v *AzureBlobVolume) Replication() int { return v.replication } + +// If possible, translate an Azure SDK error to a recognizable error +// like os.ErrNotExist. +func (v *AzureBlobVolume) translateError(err error) error { + switch { + case err == nil: + return err + case strings.Contains(err.Error(), "404 Not Found"): + // "storage: service returned without a response body (404 Not Found)" + return os.ErrNotExist + default: + return err + } +}