projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
10383: Merge branch 'master' into 10383-arv-put-incremental-upload
[arvados.git]
/
services
/
keepstore
/
azure_blob_volume.go
diff --git
a/services/keepstore/azure_blob_volume.go
b/services/keepstore/azure_blob_volume.go
index d2163f6b490376768383b260444d6be90a9ca1ed..6ca31c38329ec7347631a03c802b4216bd05f167 100644
(file)
--- a/
services/keepstore/azure_blob_volume.go
+++ b/
services/keepstore/azure_blob_volume.go
@@
-2,12
+2,14
@@
package main
import (
"bytes"
import (
"bytes"
+ "context"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
+ "net/http"
"os"
"regexp"
"strconv"
"os"
"regexp"
"strconv"
@@
-15,9
+17,12
@@
import (
"sync"
"time"
"sync"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/curoverse/azure-sdk-for-go/storage"
)
"github.com/curoverse/azure-sdk-for-go/storage"
)
+const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
+
var (
azureMaxGetBytes int
azureStorageAccountName string
var (
azureMaxGetBytes int
azureStorageAccountName string
@@
-95,6
+100,7
@@
type AzureBlobVolume struct {
ContainerName string
AzureReplication int
ReadOnly bool
ContainerName string
AzureReplication int
ReadOnly bool
+ RequestTimeout arvados.Duration
azClient storage.Client
bsClient storage.BlobStorageClient
azClient storage.Client
bsClient storage.BlobStorageClient
@@
-108,6
+114,7
@@
func (*AzureBlobVolume) Examples() []Volume {
StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
ContainerName: "example-container-name",
AzureReplication: 3,
StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
ContainerName: "example-container-name",
AzureReplication: 3,
+ RequestTimeout: azureDefaultRequestTimeout,
},
}
}
},
}
}
@@
-133,6
+140,13
@@
func (v *AzureBlobVolume) Start() error {
if err != nil {
return fmt.Errorf("creating Azure storage client: %s", err)
}
if err != nil {
return fmt.Errorf("creating Azure storage client: %s", err)
}
+
+ if v.RequestTimeout == 0 {
+ v.RequestTimeout = azureDefaultRequestTimeout
+ }
+ v.azClient.HTTPClient = &http.Client{
+ Timeout: time.Duration(v.RequestTimeout),
+ }
v.bsClient = v.azClient.GetBlobService()
ok, err := v.bsClient.ContainerExists(v.ContainerName)
v.bsClient = v.azClient.GetBlobService()
ok, err := v.bsClient.ContainerExists(v.ContainerName)
@@
-163,7
+177,7
@@
func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, err
// If the block is younger than azureWriteRaceInterval and is
// unexpectedly empty, assume a PutBlob operation is in progress, and
// wait for it to finish writing.
// If the block is younger than azureWriteRaceInterval and is
// unexpectedly empty, assume a PutBlob operation is in progress, and
// wait for it to finish writing.
-func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) {
+func (v *AzureBlobVolume) Get(
ctx context.Context,
loc string, buf []byte) (int, error) {
trashed, _, err := v.checkTrashed(loc)
if err != nil {
return 0, err
trashed, _, err := v.checkTrashed(loc)
if err != nil {
return 0, err
@@
-271,7
+285,7
@@
func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
}
// Compare the given data with existing stored data.
}
// Compare the given data with existing stored data.
-func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
+func (v *AzureBlobVolume) Compare(
ctx context.Context,
loc string, expect []byte) error {
trashed, _, err := v.checkTrashed(loc)
if err != nil {
return err
trashed, _, err := v.checkTrashed(loc)
if err != nil {
return err
@@
-284,11
+298,11
@@
func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
return v.translateError(err)
}
defer rdr.Close()
return v.translateError(err)
}
defer rdr.Close()
- return compareReaderWithBuf(rdr, expect, loc[:32])
+ return compareReaderWithBuf(
ctx,
rdr, expect, loc[:32])
}
// Put stores a Keep block as a block blob in the container.
}
// Put stores a Keep block as a block blob in the container.
-func (v *AzureBlobVolume) Put(loc string, block []byte) error {
+func (v *AzureBlobVolume) Put(
ctx context.Context,
loc string, block []byte) error {
if v.ReadOnly {
return MethodDisabledError
}
if v.ReadOnly {
return MethodDisabledError
}