X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5fd9b5a5fb88f40b999ab6c9fa9435ad01f595ff..50128b53da4003912635b03fb27b5be2c5beaca1:/services/keepstore/azure_blob_volume.go diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go index d2163f6b49..6ca31c3832 100644 --- a/services/keepstore/azure_blob_volume.go +++ b/services/keepstore/azure_blob_volume.go @@ -2,12 +2,14 @@ package main import ( "bytes" + "context" "errors" "flag" "fmt" "io" "io/ioutil" "log" + "net/http" "os" "regexp" "strconv" @@ -15,9 +17,12 @@ import ( "sync" "time" + "git.curoverse.com/arvados.git/sdk/go/arvados" "github.com/curoverse/azure-sdk-for-go/storage" ) +const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute) + var ( azureMaxGetBytes int azureStorageAccountName string @@ -95,6 +100,7 @@ type AzureBlobVolume struct { ContainerName string AzureReplication int ReadOnly bool + RequestTimeout arvados.Duration azClient storage.Client bsClient storage.BlobStorageClient @@ -108,6 +114,7 @@ func (*AzureBlobVolume) Examples() []Volume { StorageAccountKeyFile: "/etc/azure_storage_account_key.txt", ContainerName: "example-container-name", AzureReplication: 3, + RequestTimeout: azureDefaultRequestTimeout, }, } } @@ -133,6 +140,13 @@ func (v *AzureBlobVolume) Start() error { if err != nil { return fmt.Errorf("creating Azure storage client: %s", err) } + + if v.RequestTimeout == 0 { + v.RequestTimeout = azureDefaultRequestTimeout + } + v.azClient.HTTPClient = &http.Client{ + Timeout: time.Duration(v.RequestTimeout), + } v.bsClient = v.azClient.GetBlobService() ok, err := v.bsClient.ContainerExists(v.ContainerName) @@ -163,7 +177,7 @@ func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, err // If the block is younger than azureWriteRaceInterval and is // unexpectedly empty, assume a PutBlob operation is in progress, and // wait for it to finish writing. -func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) { +func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) { trashed, _, err := v.checkTrashed(loc) if err != nil { return 0, err @@ -271,7 +285,7 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) { } // Compare the given data with existing stored data. -func (v *AzureBlobVolume) Compare(loc string, expect []byte) error { +func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error { trashed, _, err := v.checkTrashed(loc) if err != nil { return err @@ -284,11 +298,11 @@ func (v *AzureBlobVolume) Compare(loc string, expect []byte) error { return v.translateError(err) } defer rdr.Close() - return compareReaderWithBuf(rdr, expect, loc[:32]) + return compareReaderWithBuf(ctx, rdr, expect, loc[:32]) } // Put stores a Keep block as a block blob in the container. -func (v *AzureBlobVolume) Put(loc string, block []byte) error { +func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error { if v.ReadOnly { return MethodDisabledError }