X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cafa08eae78c6e29898164f8b5b2fc0127d69f48..38de76ceaabe6f1b522a538a27dfeb2f58c5fb69:/services/keepstore/azure_blob_volume.go diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go index 75344890ab..3090b95cb2 100644 --- a/services/keepstore/azure_blob_volume.go +++ b/services/keepstore/azure_blob_volume.go @@ -97,13 +97,14 @@ func init() { type AzureBlobVolume struct { StorageAccountName string StorageAccountKeyFile string + StorageBaseURL string // "" means default, "core.windows.net" ContainerName string AzureReplication int ReadOnly bool RequestTimeout arvados.Duration azClient storage.Client - bsClient storage.BlobStorageClient + bsClient *azureBlobClient } // Examples implements VolumeWithExamples. @@ -116,6 +117,14 @@ func (*AzureBlobVolume) Examples() []Volume { AzureReplication: 3, RequestTimeout: azureDefaultRequestTimeout, }, + &AzureBlobVolume{ + StorageAccountName: "cn-account-name", + StorageAccountKeyFile: "/etc/azure_cn_storage_account_key.txt", + StorageBaseURL: "core.chinacloudapi.cn", + ContainerName: "cn-container-name", + AzureReplication: 3, + RequestTimeout: azureDefaultRequestTimeout, + }, } } @@ -136,7 +145,10 @@ func (v *AzureBlobVolume) Start() error { if err != nil { return err } - v.azClient, err = storage.NewBasicClient(v.StorageAccountName, accountKey) + if v.StorageBaseURL == "" { + v.StorageBaseURL = storage.DefaultBaseURL + } + v.azClient, err = storage.NewClient(v.StorageAccountName, accountKey, v.StorageBaseURL, storage.DefaultAPIVersion, true) if err != nil { return fmt.Errorf("creating Azure storage client: %s", err) } @@ -147,7 +159,10 @@ func (v *AzureBlobVolume) Start() error { v.azClient.HTTPClient = &http.Client{ Timeout: time.Duration(v.RequestTimeout), } - v.bsClient = v.azClient.GetBlobService() + bs := v.azClient.GetBlobService() + v.bsClient = &azureBlobClient{ + client: &bs, + } ok, err := v.bsClient.ContainerExists(v.ContainerName) if err != nil { @@ -159,6 +174,11 @@ func (v *AzureBlobVolume) Start() error { return nil } +// DeviceID returns a globally unique ID for the storage container. +func (v *AzureBlobVolume) DeviceID() string { + return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName +} + // Return true if expires_at metadata attribute is found on the block func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) { metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc) @@ -623,3 +643,104 @@ func (v *AzureBlobVolume) EmptyTrash() { log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted) } + +// InternalStats returns bucket I/O and API call counters. +func (v *AzureBlobVolume) InternalStats() interface{} { + return &v.bsClient.stats +} + +type azureBlobStats struct { + statsTicker + Ops uint64 + GetOps uint64 + GetRangeOps uint64 + GetMetadataOps uint64 + GetPropertiesOps uint64 + CreateOps uint64 + SetMetadataOps uint64 + DelOps uint64 + ListOps uint64 +} + +func (s *azureBlobStats) TickErr(err error) { + if err == nil { + return + } + errType := fmt.Sprintf("%T", err) + if err, ok := err.(storage.AzureStorageServiceError); ok { + errType = errType + fmt.Sprintf(" %d (%s)", err.StatusCode, err.Code) + } + log.Printf("errType %T, err %s", err, err) + s.statsTicker.TickErr(err, errType) +} + +// azureBlobClient wraps storage.BlobStorageClient in order to count +// I/O and API usage stats. +type azureBlobClient struct { + client *storage.BlobStorageClient + stats azureBlobStats +} + +func (c *azureBlobClient) ContainerExists(cname string) (bool, error) { + c.stats.Tick(&c.stats.Ops) + ok, err := c.client.ContainerExists(cname) + c.stats.TickErr(err) + return ok, err +} + +func (c *azureBlobClient) GetBlobMetadata(cname, bname string) (map[string]string, error) { + c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps) + m, err := c.client.GetBlobMetadata(cname, bname) + c.stats.TickErr(err) + return m, err +} + +func (c *azureBlobClient) GetBlobProperties(cname, bname string) (*storage.BlobProperties, error) { + c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps) + p, err := c.client.GetBlobProperties(cname, bname) + c.stats.TickErr(err) + return p, err +} + +func (c *azureBlobClient) GetBlob(cname, bname string) (io.ReadCloser, error) { + c.stats.Tick(&c.stats.Ops, &c.stats.GetOps) + rdr, err := c.client.GetBlob(cname, bname) + c.stats.TickErr(err) + return NewCountingReader(rdr, c.stats.TickInBytes), err +} + +func (c *azureBlobClient) GetBlobRange(cname, bname, byterange string, hdrs map[string]string) (io.ReadCloser, error) { + c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps) + rdr, err := c.client.GetBlobRange(cname, bname, byterange, hdrs) + c.stats.TickErr(err) + return NewCountingReader(rdr, c.stats.TickInBytes), err +} + +func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error { + c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps) + rdr = NewCountingReader(rdr, c.stats.TickOutBytes) + err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs) + c.stats.TickErr(err) + return err +} + +func (c *azureBlobClient) SetBlobMetadata(cname, bname string, m, hdrs map[string]string) error { + c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps) + err := c.client.SetBlobMetadata(cname, bname, m, hdrs) + c.stats.TickErr(err) + return err +} + +func (c *azureBlobClient) ListBlobs(cname string, params storage.ListBlobsParameters) (storage.BlobListResponse, error) { + c.stats.Tick(&c.stats.Ops, &c.stats.ListOps) + resp, err := c.client.ListBlobs(cname, params) + c.stats.TickErr(err) + return resp, err +} + +func (c *azureBlobClient) DeleteBlob(cname, bname string, hdrs map[string]string) error { + c.stats.Tick(&c.stats.Ops, &c.stats.DelOps) + err := c.client.DeleteBlob(cname, bname, hdrs) + c.stats.TickErr(err) + return err +}