X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b2bcd45082d2df2b5a17645eb60473cc17c76e88..20c5b73d598463a7dfb4ce711993480d56e23838:/services/keepstore/azure_blob_volume.go diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go index 35b1dc79bf..79123a9747 100644 --- a/services/keepstore/azure_blob_volume.go +++ b/services/keepstore/azure_blob_volume.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "errors" "flag" "fmt" @@ -11,12 +12,13 @@ import ( "strings" "time" - "github.com/Azure/azure-sdk-for-go/storage" + "github.com/curoverse/azure-sdk-for-go/storage" ) var ( azureStorageAccountName string azureStorageAccountKeyFile string + azureStorageReplication int ) func readKeyFromFile(file string) (string, error) { @@ -39,6 +41,9 @@ func (s *azureVolumeAdder) Set(containerName string) error { if containerName == "" { return errors.New("no container name given") } + if azureStorageAccountName == "" || azureStorageAccountKeyFile == "" { + return errors.New("-azure-storage-account-name and -azure-storage-account-key-file arguments must given before -azure-storage-container-volume") + } accountKey, err := readKeyFromFile(azureStorageAccountKeyFile) if err != nil { return err @@ -50,7 +55,7 @@ func (s *azureVolumeAdder) Set(containerName string) error { if flagSerializeIO { log.Print("Notice: -serialize is not supported by azure-blob-container volumes.") } - v := NewAzureBlobVolume(azClient, containerName, flagReadonly) + v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication) if err := v.Check(); err != nil { return err } @@ -72,6 +77,11 @@ func init() { "azure-storage-account-key-file", "", "File containing the account key used for subsequent --azure-storage-container-volume arguments.") + flag.IntVar( + &azureStorageReplication, + "azure-storage-replication", + 3, + "Replication level to report to clients when data is stored in an Azure container.") } // An AzureBlobVolume stores and retrieves blocks in an Azure Blob @@ -81,14 +91,16 @@ type AzureBlobVolume struct { bsClient storage.BlobStorageClient containerName string readonly bool + replication int } -func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool) *AzureBlobVolume { +func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume { return &AzureBlobVolume{ - azClient: client, - bsClient: client.GetBlobService(), + azClient: client, + bsClient: client.GetBlobService(), containerName: containerName, - readonly: readonly, + readonly: readonly, + replication: replication, } } @@ -144,26 +156,16 @@ func (v *AzureBlobVolume) Put(loc string, block []byte) error { if v.readonly { return MethodDisabledError } - if err := v.bsClient.CreateBlockBlob(v.containerName, loc); err != nil { - return err - } - // We use the same block ID, base64("0")=="MA==", for everything. - if err := v.bsClient.PutBlock(v.containerName, loc, "MA==", block); err != nil { - return err - } - return v.bsClient.PutBlockList(v.containerName, loc, []storage.Block{{"MA==", storage.BlockStatusUncommitted}}) + return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block)) } func (v *AzureBlobVolume) Touch(loc string) error { if v.readonly { return MethodDisabledError } - if exists, err := v.bsClient.BlobExists(v.containerName, loc); err != nil { - return err - } else if !exists { - return os.ErrNotExist - } - return v.bsClient.PutBlockList(v.containerName, loc, []storage.Block{{"MA==", storage.BlockStatusCommitted}}) + return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{ + "touch": fmt.Sprintf("%d", time.Now()), + }) } func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) { @@ -198,16 +200,26 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error { } func (v *AzureBlobVolume) Delete(loc string) error { - // TODO: Use leases to handle races with Touch and Put. if v.readonly { return MethodDisabledError } + // Ideally we would use If-Unmodified-Since, but that + // particular condition seems to be ignored by Azure. Instead, + // we get the Etag before checking Mtime, and use If-Match to + // ensure we don't delete data if Put() or Touch() happens + // between our calls to Mtime() and DeleteBlob(). + props, err := v.bsClient.GetBlobProperties(v.containerName, loc) + if err != nil { + return err + } if t, err := v.Mtime(loc); err != nil { return err } else if time.Since(t) < blobSignatureTTL { return nil } - return v.bsClient.DeleteBlob(v.containerName, loc) + return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{ + "If-Match": props.Etag, + }) } func (v *AzureBlobVolume) Status() *VolumeStatus { @@ -225,3 +237,7 @@ func (v *AzureBlobVolume) String() string { func (v *AzureBlobVolume) Writable() bool { return !v.readonly } + +func (v *AzureBlobVolume) Replication() int { + return v.replication +}