--- /dev/null
+package main
+
+import (
+ "bytes"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "log"
+ "os"
+ "strings"
+ "time"
+
+ "github.com/curoverse/azure-sdk-for-go/storage"
+)
+
+var (
+ azureStorageAccountName string
+ azureStorageAccountKeyFile string
+ azureStorageReplication int
+)
+
+// readKeyFromFile reads an Azure storage account key from the named
+// file, trimming surrounding whitespace. It returns an error if the
+// file cannot be read or contains only whitespace.
+func readKeyFromFile(file string) (string, error) {
+ buf, err := ioutil.ReadFile(file)
+ if err != nil {
+ return "", errors.New("reading key from " + file + ": " + err.Error())
+ }
+ accountKey := strings.TrimSpace(string(buf))
+ if accountKey == "" {
+ return "", errors.New("empty account key in " + file)
+ }
+ return accountKey, nil
+}
+
+// azureVolumeAdder implements flag.Value: each occurrence of the
+// -azure-storage-container-volume flag appends one AzureBlobVolume
+// to the wrapped volumeSet.
+type azureVolumeAdder struct {
+ *volumeSet
+}
+
+// Set is called by the flag package for each
+// -azure-storage-container-volume argument. It checks that the
+// account credentials flags were already supplied, builds an
+// AzureBlobVolume for containerName, verifies the container exists,
+// and appends the volume to the set.
+func (s *azureVolumeAdder) Set(containerName string) error {
+ if containerName == "" {
+ return errors.New("no container name given")
+ }
+ if azureStorageAccountName == "" || azureStorageAccountKeyFile == "" {
+ return errors.New("-azure-storage-account-name and -azure-storage-account-key-file arguments must be given before -azure-storage-container-volume")
+ }
+ accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
+ if err != nil {
+ return err
+ }
+ azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
+ if err != nil {
+ return errors.New("creating Azure storage client: " + err.Error())
+ }
+ if flagSerializeIO {
+ log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
+ }
+ v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
+ // Fail fast at argument-parsing time if the container is unusable.
+ if err := v.Check(); err != nil {
+ return err
+ }
+ *s.volumeSet = append(*s.volumeSet, v)
+ return nil
+}
+
+// init registers the Azure-specific command line flags. Note the
+// credentials flags must appear before -azure-storage-container-volume
+// on the command line, because Set consumes them immediately.
+func init() {
+ flag.Var(&azureVolumeAdder{&volumes},
+ "azure-storage-container-volume",
+ "Use the given container as a storage volume. Can be given multiple times.")
+ flag.StringVar(
+ &azureStorageAccountName,
+ "azure-storage-account-name",
+ "",
+ "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
+ flag.StringVar(
+ &azureStorageAccountKeyFile,
+ "azure-storage-account-key-file",
+ "",
+ "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
+ flag.IntVar(
+ &azureStorageReplication,
+ "azure-storage-replication",
+ 3,
+ "Replication level to report to clients when data is stored in an Azure container.")
+}
+
+// An AzureBlobVolume stores and retrieves blocks in an Azure Blob
+// container.
+type AzureBlobVolume struct {
+ azClient storage.Client // authenticated client for the storage account
+ bsClient storage.BlobStorageClient // blob-service view of azClient
+ containerName string
+ readonly bool // if true, Put/Touch/Delete return MethodDisabledError
+ replication int // redundancy level reported to clients via Replication()
+}
+
+// NewAzureBlobVolume returns a new AzureBlobVolume that stores blocks
+// in the given container, using the given (already authenticated)
+// storage client.
+func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
+ return &AzureBlobVolume{
+ azClient: client,
+ bsClient: client.GetBlobService(),
+ containerName: containerName,
+ readonly: readonly,
+ replication: replication,
+ }
+}
+
+// Check returns nil if the volume is usable, i.e., the configured
+// container exists and can be queried with the given credentials.
+func (v *AzureBlobVolume) Check() error {
+ ok, err := v.bsClient.ContainerExists(v.containerName)
+ if err != nil {
+ return err
+ }
+ if !ok {
+ return errors.New("container does not exist")
+ }
+ return nil
+}
+
+func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
+ rdr, err := v.bsClient.GetBlob(v.containerName, loc)
+ if err != nil {
+ if strings.Contains(err.Error(), "404 Not Found") {
+ // "storage: service returned without a response body (404 Not Found)"
+ return nil, os.ErrNotExist
+ }
+ return nil, err
+ }
+ switch err := err.(type) {
+ case nil:
+ default:
+ log.Printf("ERROR IN Get(): %T %#v", err, err)
+ return nil, err
+ }
+ defer rdr.Close()
+ buf := bufs.Get(BlockSize)
+ n, err := io.ReadFull(rdr, buf)
+ switch err {
+ case io.EOF, io.ErrUnexpectedEOF:
+ return buf[:n], nil
+ default:
+ bufs.Put(buf)
+ return nil, err
+ }
+}
+
+// Compare streams the blob at loc and compares it to expect,
+// returning nil on a match. The first 32 characters of loc (the MD5
+// hash) are passed along so mismatches can be classified as
+// collision or corruption.
+func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
+ rdr, err := v.bsClient.GetBlob(v.containerName, loc)
+ if err != nil {
+ return err
+ }
+ defer rdr.Close()
+ return compareReaderWithBuf(rdr, expect, loc[:32])
+}
+
+// Put stores block under loc as a block blob, overwriting any
+// existing blob. It returns MethodDisabledError on read-only volumes.
+func (v *AzureBlobVolume) Put(loc string, block []byte) error {
+ if v.readonly {
+ return MethodDisabledError
+ }
+ return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block))
+}
+
+// Touch updates the blob's metadata, which bumps its Etag and
+// Last-Modified time, protecting the block from premature garbage
+// collection.
+func (v *AzureBlobVolume) Touch(loc string) error {
+ if v.readonly {
+ return MethodDisabledError
+ }
+ return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
+ // The value is irrelevant -- only the resulting
+ // Last-Modified bump matters. Use Unix() here: passing a
+ // time.Time to %d is a go vet printf error and would
+ // format the struct's fields instead of a timestamp.
+ "touch": fmt.Sprintf("%d", time.Now().Unix()),
+ })
+}
+
+// Mtime returns the last-modified time of the blob at loc, parsed
+// from the RFC1123 Last-Modified property reported by the service.
+func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
+ props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+ if err != nil {
+ return time.Time{}, err
+ }
+ return time.Parse(time.RFC1123, props.LastModified)
+}
+
+// IndexTo writes one line per blob whose name starts with prefix, in
+// the format "name+size mtime_unix", following the service's
+// pagination markers until the listing is exhausted.
+func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
+ params := storage.ListBlobsParameters{
+ Prefix: prefix,
+ }
+ for {
+ resp, err := v.bsClient.ListBlobs(v.containerName, params)
+ if err != nil {
+ return err
+ }
+ for _, b := range resp.Blobs {
+ t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
+ if err != nil {
+ return err
+ }
+ fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix())
+ }
+ // An empty NextMarker means this was the last page.
+ if resp.NextMarker == "" {
+ return nil
+ }
+ params.Marker = resp.NextMarker
+ }
+}
+
+// Delete removes the blob at loc, unless it was modified within
+// blobSignatureTTL (in which case it silently does nothing, to avoid
+// deleting data a client may still have a valid signature for).
+// Returns MethodDisabledError on read-only volumes.
+func (v *AzureBlobVolume) Delete(loc string) error {
+ if v.readonly {
+ return MethodDisabledError
+ }
+ // Ideally we would use If-Unmodified-Since, but that
+ // particular condition seems to be ignored by Azure. Instead,
+ // we get the Etag before checking Mtime, and use If-Match to
+ // ensure we don't delete data if Put() or Touch() happens
+ // between our calls to Mtime() and DeleteBlob().
+ // NOTE(review): this makes two GetBlobProperties round trips
+ // (here and inside Mtime) -- consider parsing LastModified
+ // from props directly.
+ props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+ if err != nil {
+ return err
+ }
+ if t, err := v.Mtime(loc); err != nil {
+ return err
+ } else if time.Since(t) < blobSignatureTTL {
+ return nil
+ }
+ return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
+ "If-Match": props.Etag,
+ })
+}
+
+// Status returns a VolumeStatus for this volume. The values are
+// placeholders: Azure containers do not expose device numbers or
+// free-space figures, so fixed nonzero values are reported.
+func (v *AzureBlobVolume) Status() *VolumeStatus {
+ return &VolumeStatus{
+ DeviceNum: 1,
+ BytesFree: BlockSize * 1000,
+ BytesUsed: 1,
+ }
+}
+
+// String returns a printable identifier for this volume, including
+// the (quoted) container name.
+func (v *AzureBlobVolume) String() string {
+ return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
+}
+
+// Writable reports whether write operations (Put/Touch/Delete) are
+// permitted on this volume.
+func (v *AzureBlobVolume) Writable() bool {
+ return !v.readonly
+}
+
+// Replication returns the replication level configured at startup
+// (-azure-storage-replication); it is reported to clients, not
+// measured.
+func (v *AzureBlobVolume) Replication() int {
+ return v.replication
+}
--- /dev/null
+package main
+
+import (
+ "encoding/base64"
+ "encoding/xml"
+ "flag"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "math/rand"
+ "net"
+ "net/http"
+ "net/http/httptest"
+ "regexp"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/curoverse/azure-sdk-for-go/storage"
+)
+
+const (
+ // The same fake credentials used by Microsoft's Azure emulator
+ emulatorAccountName = "devstoreaccount1"
+ emulatorAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
+)
+
+var azureTestContainer string
+
+// init registers the flag that selects a real Azure container for
+// testing instead of the local stub.
+func init() {
+ flag.StringVar(
+ &azureTestContainer,
+ "test.azure-storage-container-volume",
+ "",
+ "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-account-key-file arguments to supply credentials.")
+}
+
+// azBlob is the in-memory representation of one blob in the stub
+// Azure service.
+type azBlob struct {
+ Data []byte
+ Etag string
+ Metadata map[string]string
+ Mtime time.Time
+ Uncommitted map[string][]byte // staged blocks keyed by decoded block ID
+}
+
+// azStubHandler is an http.Handler that emulates the subset of the
+// Azure Blob Storage REST API used by AzureBlobVolume. The embedded
+// mutex guards the blobs map.
+type azStubHandler struct {
+ sync.Mutex
+ blobs map[string]*azBlob // keyed by "container|hash"
+}
+
+// newAzStubHandler returns an empty stub handler ready to serve
+// requests.
+func newAzStubHandler() *azStubHandler {
+ return &azStubHandler{
+ blobs: make(map[string]*azBlob),
+ }
+}
+
+// TouchWithDate sets the stub blob's Mtime to t, doing nothing if the
+// blob does not exist. The handler lock is held while mutating the
+// blob, consistent with PutRaw and ServeHTTP (the original accessed
+// the shared map unsynchronized).
+func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
+ h.Lock()
+ defer h.Unlock()
+ blob, ok := h.blobs[container+"|"+hash]
+ if !ok {
+ return
+ }
+ blob.Mtime = t
+}
+
+// PutRaw stores data directly in the stub, bypassing the HTTP API.
+// Used by tests to plant (possibly corrupt) blobs.
+func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
+ h.Lock()
+ defer h.Unlock()
+ h.blobs[container+"|"+hash] = &azBlob{
+ Data: data,
+ Mtime: time.Now(),
+ Uncommitted: make(map[string][]byte),
+ }
+}
+
+// ServeHTTP implements just enough of the Azure Blob Storage REST
+// API for AzureBlobVolume's needs: Put Blob, Put Block, Put Block
+// List, Set Metadata, Get/Head Blob, Delete Blob, and List Blobs.
+// Unrecognized requests get 501 Not Implemented.
+func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
+ h.Lock()
+ defer h.Unlock()
+ // defer log.Printf("azStubHandler: %+v", r)
+
+ path := strings.Split(r.URL.Path, "/")
+ container := path[1]
+ hash := ""
+ if len(path) > 2 {
+ hash = path[2]
+ }
+
+ if err := r.ParseForm(); err != nil {
+ log.Printf("azStubHandler(%+v): %s", r, err)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+
+ body, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ return
+ }
+
+ type blockListRequestBody struct {
+ XMLName xml.Name `xml:"BlockList"`
+ Uncommitted []string
+ }
+
+ blob, blobExists := h.blobs[container+"|"+hash]
+
+ switch {
+ case r.Method == "PUT" && r.Form.Get("comp") == "":
+ // "Put Blob" API
+ h.blobs[container+"|"+hash] = &azBlob{
+ Data: body,
+ Mtime: time.Now(),
+ Uncommitted: make(map[string][]byte),
+ Etag: makeEtag(),
+ }
+ rw.WriteHeader(http.StatusCreated)
+ case r.Method == "PUT" && r.Form.Get("comp") == "block":
+ // "Put Block" API
+ if !blobExists {
+ log.Printf("Got block for nonexistent blob: %+v", r)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
+ if err != nil || len(blockID) == 0 {
+ log.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ blob.Uncommitted[string(blockID)] = body
+ rw.WriteHeader(http.StatusCreated)
+ case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
+ // "Put Block List" API
+ bl := &blockListRequestBody{}
+ if err := xml.Unmarshal(body, bl); err != nil {
+ log.Printf("xml Unmarshal: %s", err)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ for _, encBlockID := range bl.Uncommitted {
+ blockID, err := base64.StdEncoding.DecodeString(encBlockID)
+ if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
+ log.Printf("Invalid blockid: %+q", encBlockID)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ blob.Data = blob.Uncommitted[string(blockID)]
+ blob.Etag = makeEtag()
+ blob.Mtime = time.Now()
+ delete(blob.Uncommitted, string(blockID))
+ }
+ rw.WriteHeader(http.StatusCreated)
+ case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
+ // "Set Metadata Headers" API. We don't bother
+ // stubbing "Get Metadata Headers": AzureBlobVolume
+ // sets metadata headers only as a way to bump Etag
+ // and Last-Modified.
+ if !blobExists {
+ log.Printf("Got metadata for nonexistent blob: %+v", r)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ blob.Metadata = make(map[string]string)
+ for k, v := range r.Header {
+ if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
+ blob.Metadata[k] = v[0]
+ }
+ }
+ blob.Mtime = time.Now()
+ blob.Etag = makeEtag()
+ case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
+ // "Get Blob" API
+ if !blobExists {
+ rw.WriteHeader(http.StatusNotFound)
+ return
+ }
+ rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
+ rw.Header().Set("Content-Length", strconv.Itoa(len(blob.Data)))
+ if r.Method == "GET" {
+ if _, err := rw.Write(blob.Data); err != nil {
+ log.Printf("write %+q: %s", blob.Data, err)
+ }
+ }
+ case r.Method == "DELETE" && hash != "":
+ // "Delete Blob" API
+ if !blobExists {
+ rw.WriteHeader(http.StatusNotFound)
+ return
+ }
+ delete(h.blobs, container+"|"+hash)
+ rw.WriteHeader(http.StatusAccepted)
+ case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
+ // "List Blobs" API
+ prefix := container + "|" + r.Form.Get("prefix")
+ marker := r.Form.Get("marker")
+
+ // Default to a small page size so pagination gets
+ // exercised by the tests.
+ maxResults := 2
+ if n, err := strconv.Atoi(r.Form.Get("maxresults")); err == nil && n >= 1 && n <= 5000 {
+ maxResults = n
+ }
+
+ resp := storage.BlobListResponse{
+ Marker: marker,
+ NextMarker: "",
+ MaxResults: int64(maxResults),
+ }
+ var hashes sort.StringSlice
+ for k := range h.blobs {
+ if strings.HasPrefix(k, prefix) {
+ hashes = append(hashes, k[len(container)+1:])
+ }
+ }
+ hashes.Sort()
+ for _, hash := range hashes {
+ if len(resp.Blobs) == maxResults {
+ resp.NextMarker = hash
+ break
+ }
+ if len(resp.Blobs) > 0 || marker == "" || marker == hash {
+ blob := h.blobs[container+"|"+hash]
+ resp.Blobs = append(resp.Blobs, storage.Blob{
+ Name: hash,
+ Properties: storage.BlobProperties{
+ LastModified: blob.Mtime.Format(time.RFC1123),
+ ContentLength: int64(len(blob.Data)),
+ Etag: blob.Etag,
+ },
+ })
+ }
+ }
+ buf, err := xml.Marshal(resp)
+ if err != nil {
+ log.Print(err)
+ rw.WriteHeader(http.StatusInternalServerError)
+ // Don't fall through to rw.Write after reporting
+ // a server error (the original did).
+ return
+ }
+ rw.Write(buf)
+ default:
+ log.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
+ rw.WriteHeader(http.StatusNotImplemented)
+ }
+}
+
+// azStubDialer is a net.Dialer that notices when the Azure driver
+// tries to connect to "devstoreaccount1.blob.127.0.0.1:46067", and
+// in such cases transparently dials "127.0.0.1:46067" instead.
+type azStubDialer struct {
+ net.Dialer
+}
+
+var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`)
+
+// Dial rewrites addresses that embed a local host:port (as produced
+// when the Azure SDK prepends "account.blob." to the stub server's
+// address) so the connection goes to the stub itself.
+func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
+ if hp := localHostPortRe.FindString(address); hp != "" {
+ log.Println("azStubDialer: dial", hp, "instead of", address)
+ address = hp
+ }
+ return d.Dialer.Dial(network, address)
+}
+
+// TestableAzureBlobVolume wraps an AzureBlobVolume with the stub
+// server and handler needed to implement TestableVolume.
+type TestableAzureBlobVolume struct {
+ *AzureBlobVolume
+ azHandler *azStubHandler
+ azStub *httptest.Server
+ t *testing.T
+}
+
+// NewTestableAzureBlobVolume returns a volume backed by the local
+// stub server, or -- if -test.azure-storage-container-volume was
+// given -- by a real Azure container using the credentials from the
+// -azure-storage-* flags.
+func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) TestableVolume {
+ azHandler := newAzStubHandler()
+ azStub := httptest.NewServer(azHandler)
+
+ var azClient storage.Client
+
+ container := azureTestContainer
+ if container == "" {
+ // Connect to stub instead of real Azure storage service
+ stubURLBase := strings.Split(azStub.URL, "://")[1]
+ var err error
+ if azClient, err = storage.NewClient(emulatorAccountName, emulatorAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
+ t.Fatal(err)
+ }
+ container = "fakecontainername"
+ } else {
+ // Connect to real Azure storage service
+ accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
+ if err != nil {
+ t.Fatal(err)
+ }
+ azClient, err = storage.NewBasicClient(azureStorageAccountName, accountKey)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ v := NewAzureBlobVolume(azClient, container, readonly, replication)
+
+ return &TestableAzureBlobVolume{
+ AzureBlobVolume: v,
+ azHandler: azHandler,
+ azStub: azStub,
+ t: t,
+ }
+}
+
+// TestAzureBlobVolumeWithGeneric runs the generic volume test suite
+// against a writable Azure volume, temporarily swapping in the stub
+// dialer so SDK requests reach the local stub server.
+func TestAzureBlobVolumeWithGeneric(t *testing.T) {
+ defer func(t http.RoundTripper) {
+ http.DefaultTransport = t
+ }(http.DefaultTransport)
+ http.DefaultTransport = &http.Transport{
+ Dial: (&azStubDialer{}).Dial,
+ }
+ DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+ return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
+ })
+}
+
+// TestReadonlyAzureBlobVolumeWithGeneric runs the generic volume
+// test suite against a read-only Azure volume via the stub dialer.
+func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
+ defer func(t http.RoundTripper) {
+ http.DefaultTransport = t
+ }(http.DefaultTransport)
+ http.DefaultTransport = &http.Transport{
+ Dial: (&azStubDialer{}).Dial,
+ }
+ DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+ return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
+ })
+}
+
+// TestAzureBlobVolumeReplication checks that Replication() echoes the
+// value the volume was constructed with.
+func TestAzureBlobVolumeReplication(t *testing.T) {
+ for r := 1; r <= 4; r++ {
+ v := NewTestableAzureBlobVolume(t, false, r)
+ // NOTE(review): defer inside a loop -- all four Teardowns
+ // run at test end, not per iteration. Harmless here.
+ defer v.Teardown()
+ if n := v.Replication(); n != r {
+ t.Errorf("Got replication %d, expected %d", n, r)
+ }
+ }
+}
+
+// PutRaw plants data in the stub directly, bypassing Put's checks.
+func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
+ v.azHandler.PutRaw(v.containerName, locator, data)
+}
+
+// TouchWithDate backdates a stub blob's modification time.
+func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
+ v.azHandler.TouchWithDate(v.containerName, locator, lastPut)
+}
+
+// Teardown shuts down the stub HTTP server.
+func (v *TestableAzureBlobVolume) Teardown() {
+ v.azStub.Close()
+}
+
+// makeEtag returns a pseudo-random Etag value. Uniqueness, not
+// unpredictability, is all the stub needs.
+func makeEtag() string {
+ return fmt.Sprintf("0x%x", rand.Int63())
+}
package main
import (
+ "bytes"
"crypto/md5"
"fmt"
"io"
}
return <-outcome
}
+
+// compareReaderWithBuf reads rdr and compares its content to expect,
+// returning nil if they match. On mismatch it returns the result of
+// collisionOrCorrupt, which uses hash (the expected MD5 of the
+// content) to distinguish a hash collision from corrupt stored data.
+func compareReaderWithBuf(rdr io.Reader, expect []byte, hash string) error {
+ bufLen := 1 << 20
+ if bufLen > len(expect) && len(expect) > 0 {
+ // No need for bufLen to be longer than
+ // expect, except that len(buf)==0 would
+ // prevent us from handling empty readers the
+ // same way as non-empty readers: reading 0
+ // bytes at a time never reaches EOF.
+ bufLen = len(expect)
+ }
+ buf := make([]byte, bufLen)
+ cmp := expect
+
+ // Loop invariants: all data read so far matched what
+ // we expected, and the first N bytes of cmp are
+ // expected to equal the next N bytes read from
+ // rdr.
+ for {
+ n, err := rdr.Read(buf)
+ if n > len(cmp) || !bytes.Equal(cmp[:n], buf[:n]) {
+ return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], buf[:n], rdr)
+ }
+ cmp = cmp[n:]
+ if err == io.EOF {
+ if len(cmp) != 0 {
+ return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], nil, nil)
+ }
+ return nil
+ } else if err != nil {
+ return err
+ }
+ }
+}
case <-ok:
}
}
+
+// TestPutReplicationHeader checks that a PUT response carries the
+// X-Keep-Replicas-Stored header with the replication level returned
+// by PutBlock (MockVolume reports 1).
+func TestPutReplicationHeader(t *testing.T) {
+ defer teardown()
+
+ KeepVM = MakeTestVolumeManager(2)
+ defer KeepVM.Close()
+
+ resp := IssueRequest(&RequestTester{
+ method: "PUT",
+ uri: "/" + TestHash,
+ requestBody: TestBlock,
+ })
+ if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" {
+ t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
+ }
+}
return
}
- err = PutBlock(buf, hash)
+ replication, err := PutBlock(buf, hash)
bufs.Put(buf)
if err != nil {
expiry := time.Now().Add(blobSignatureTTL)
returnHash = SignLocator(returnHash, apiToken, expiry)
}
+ resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
resp.Write([]byte(returnHash + "\n"))
}
// all writes failed). The text of the error message should
// provide as much detail as possible.
//
-func PutBlock(block []byte, hash string) error {
+func PutBlock(block []byte, hash string) (int, error) {
// Check that BLOCK's checksum matches HASH.
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
- return RequestHashError
+ return 0, RequestHashError
}
// If we already have this data, it's intact on disk, and we
// can update its timestamp, return success. If we have
// different data with the same hash, return failure.
- if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
- return err
+ if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+ return n, err
}
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
if vol := KeepVM.NextWritable(); vol != nil {
if err := vol.Put(hash, block); err == nil {
- return nil // success!
+ return vol.Replication(), nil // success!
}
}
writables := KeepVM.AllWritable()
if len(writables) == 0 {
log.Print("No writable volumes.")
- return FullError
+ return 0, FullError
}
allFull := true
for _, vol := range writables {
err := vol.Put(hash, block)
if err == nil {
- return nil // success!
+ return vol.Replication(), nil // success!
}
if err != FullError {
// The volume is not full but the
if allFull {
log.Print("All volumes are full.")
- return FullError
+ return 0, FullError
}
// Already logged the non-full errors.
- return GenericError
+ return 0, GenericError
}
-// CompareAndTouch returns nil if one of the volumes already has the
-// given content and it successfully updates the relevant block's
-// modification time in order to protect it from premature garbage
-// collection.
-func CompareAndTouch(hash string, buf []byte) error {
+// CompareAndTouch returns the current replication level if one of the
+// volumes already has the given content and it successfully updates
+// the relevant block's modification time in order to protect it from
+// premature garbage collection. Otherwise, it returns a non-nil
+// error.
+func CompareAndTouch(hash string, buf []byte) (int, error) {
var bestErr error = NotFoundError
for _, vol := range KeepVM.AllWritable() {
if err := vol.Compare(hash, buf); err == CollisionError {
// both, so there's no point writing it even
// on a different volume.)
log.Printf("%s: Compare(%s): %s", vol, hash, err)
- return err
+ return 0, err
} else if os.IsNotExist(err) {
// Block does not exist. This is the only
// "normal" error: we don't log anything.
continue
}
// Compare and Touch both worked --> done.
- return nil
+ return vol.Replication(), nil
}
- return bestErr
+ return 0, bestErr
}
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
setupHandlersWithGenericVolumeTest(t, factory)
// PutBlock
- if err := PutBlock(testBlock, testHash); err != nil {
+ if _, err := PutBlock(testBlock, testHash); err != nil {
t.Fatalf("Error during PutBlock: %s", err)
}
// Check that PutBlock succeeds again even after CompareAndTouch
- if err := PutBlock(testBlock, testHash); err != nil {
+ if _, err := PutBlock(testBlock, testHash); err != nil {
t.Fatalf("Error during PutBlock: %s", err)
}
testableVolumes[1].PutRaw(testHash, badData)
// Check that PutBlock with good data succeeds
- if err := PutBlock(testBlock, testHash); err != nil {
+ if _, err := PutBlock(testBlock, testHash); err != nil {
t.Fatalf("Error during PutBlock for %q: %s", testHash, err)
}
package main
import (
- "bufio"
"bytes"
- "errors"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"os"
"os/signal"
"strings"
- "sync"
"syscall"
"time"
)
var pullq *WorkQueue
var trashq *WorkQueue
+type volumeSet []Volume
+
var (
flagSerializeIO bool
flagReadonly bool
+ volumes volumeSet
)
-type volumeSet []Volume
-
-func (vs *volumeSet) Set(value string) error {
- if dirs := strings.Split(value, ","); len(dirs) > 1 {
- log.Print("DEPRECATED: using comma-separated volume list.")
- for _, dir := range dirs {
- if err := vs.Set(dir); err != nil {
- return err
- }
- }
- return nil
- }
- if len(value) == 0 || value[0] != '/' {
- return errors.New("Invalid volume: must begin with '/'.")
- }
- if _, err := os.Stat(value); err != nil {
- return err
- }
- var locker sync.Locker
- if flagSerializeIO {
- locker = &sync.Mutex{}
- }
- *vs = append(*vs, &UnixVolume{
- root: value,
- locker: locker,
- readonly: flagReadonly,
- })
- return nil
-}
-
func (vs *volumeSet) String() string {
- s := "["
- for i, v := range *vs {
- if i > 0 {
- s = s + " "
- }
- s = s + v.String()
- }
- return s + "]"
-}
-
-// Discover adds a volume for every directory named "keep" that is
-// located at the top level of a device- or tmpfs-backed mount point
-// other than "/". It returns the number of volumes added.
-func (vs *volumeSet) Discover() int {
- added := 0
- f, err := os.Open(ProcMounts)
- if err != nil {
- log.Fatalf("opening %s: %s", ProcMounts, err)
- }
- scanner := bufio.NewScanner(f)
- for scanner.Scan() {
- args := strings.Fields(scanner.Text())
- if err := scanner.Err(); err != nil {
- log.Fatalf("reading %s: %s", ProcMounts, err)
- }
- dev, mount := args[0], args[1]
- if mount == "/" {
- continue
- }
- if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
- continue
- }
- keepdir := mount + "/keep"
- if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
- continue
- }
- // Set the -readonly flag (but only for this volume)
- // if the filesystem is mounted readonly.
- flagReadonlyWas := flagReadonly
- for _, fsopt := range strings.Split(args[3], ",") {
- if fsopt == "ro" {
- flagReadonly = true
- break
- }
- if fsopt == "rw" {
- break
- }
- }
- vs.Set(keepdir)
- flagReadonly = flagReadonlyWas
- added++
- }
- return added
+ return fmt.Sprintf("%+v", (*vs)[:])
}
// TODO(twp): continue moving as much code as possible out of main
listen string
blobSigningKeyFile string
permissionTTLSec int
- volumes volumeSet
pidfile string
)
flag.StringVar(
"readonly",
false,
"Do not write, delete, or touch anything on the following volumes.")
- flag.Var(
- &volumes,
- "volumes",
- "Deprecated synonym for -volume.")
- flag.Var(
- &volumes,
- "volume",
- "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
flag.StringVar(
&pidfile,
"pid",
}
if len(volumes) == 0 {
- if volumes.Discover() == 0 {
+ if (&unixVolumeAdder{&volumes}).Discover() == 0 {
log.Fatal("No volumes found.")
}
}
defer KeepVM.Close()
// Check that PutBlock stores the data as expected.
- if err := PutBlock(TestBlock, TestHash); err != nil {
- t.Fatalf("PutBlock: %v", err)
+ if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ t.Fatalf("PutBlock: n %d err %v", n, err)
}
vols := KeepVM.AllReadable()
vols[0].(*MockVolume).Bad = true
// Check that PutBlock stores the data as expected.
- if err := PutBlock(TestBlock, TestHash); err != nil {
- t.Fatalf("PutBlock: %v", err)
+ if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ t.Fatalf("PutBlock: n %d err %v", n, err)
}
result, err := GetBlock(TestHash)
// Check that PutBlock returns the expected error when the hash does
// not match the block.
- if err := PutBlock(BadBlock, TestHash); err != RequestHashError {
+ if _, err := PutBlock(BadBlock, TestHash); err != RequestHashError {
t.Error("Expected RequestHashError, got %v", err)
}
// Store a corrupted block under TestHash.
vols := KeepVM.AllWritable()
vols[0].Put(TestHash, BadBlock)
- if err := PutBlock(TestBlock, TestHash); err != nil {
- t.Errorf("PutBlock: %v", err)
+ if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ t.Errorf("PutBlock: n %d err %v", n, err)
}
// The block on disk should now match TestBlock.
// Store one block, then attempt to store the other. Confirm that
// PutBlock reported a CollisionError.
- if err := PutBlock(b1, locator); err != nil {
+ if _, err := PutBlock(b1, locator); err != nil {
t.Error(err)
}
- if err := PutBlock(b2, locator); err == nil {
+ if _, err := PutBlock(b2, locator); err == nil {
t.Error("PutBlock did not report a collision")
} else if err != CollisionError {
t.Errorf("PutBlock returned %v", err)
// vols[0].Touch will fail on the next call, so the volume
// manager will store a copy on vols[1] instead.
vols[0].(*MockVolume).Touchable = false
- if err := PutBlock(TestBlock, TestHash); err != nil {
- t.Fatalf("PutBlock: %v", err)
+ if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ t.Fatalf("PutBlock: n %d err %v", n, err)
}
vols[0].(*MockVolume).Touchable = true
f.Close()
ProcMounts = f.Name()
- var resultVols volumeSet
- added := resultVols.Discover()
+ resultVols := volumeSet{}
+ added := (&unixVolumeAdder{&resultVols}).Discover()
if added != len(resultVols) {
t.Errorf("Discover returned %d, but added %d volumes",
f.Close()
ProcMounts = f.Name()
- var resultVols volumeSet
- added := resultVols.Discover()
+ resultVols := volumeSet{}
+ added := (&unixVolumeAdder{&resultVols}).Discover()
if added != 0 || len(resultVols) != 0 {
t.Fatalf("got %d, %v; expected 0, []", added, resultVols)
}
// Put block
var PutContent = func(content []byte, locator string) (err error) {
- err = PutBlock(content, locator)
+ _, err = PutBlock(content, locator)
return
}
// will fail because it is full, but Mtime or Delete can
// succeed -- then Writable should return false.
Writable() bool
+
+ // Replication returns the storage redundancy of the
+ // underlying device. It will be passed on to clients in
+ // responses to PUT requests.
+ Replication() int
}
// A VolumeManager tells callers which volumes can read, which volumes
buf, err := v.Get(TestHash)
if err != nil {
- t.Error(err)
+ t.Fatal(err)
}
bufs.Put(buf)
// Put must not return a nil error unless it has
// overwritten the existing data.
if bytes.Compare(buf, testDataB) != 0 {
- t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf, testDataB)
+ t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf, testDataB)
}
} else {
// It is permissible for Put to fail, but it must
// leave us with either the original data, the new
// data, or nothing at all.
if getErr == nil && bytes.Compare(buf, testDataA) != 0 && bytes.Compare(buf, testDataB) != 0 {
- t.Errorf("Put failed but Get returned %+v, which is neither %+v nor %+v", buf, testDataA, testDataB)
+ t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf, testDataA, testDataB)
}
}
if getErr == nil {
data, err := v.Get(TestHash)
if err != nil {
t.Error(err)
- } else if bytes.Compare(data, TestBlock) != 0 {
- t.Errorf("Block present, but content is incorrect: Expected: %v Found: %v", data, TestBlock)
+ } else {
+ if bytes.Compare(data, TestBlock) != 0 {
+ t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock)
+ }
+ bufs.Put(data)
}
- bufs.Put(data)
data, err = v.Get(TestHash2)
if err != nil {
t.Error(err)
- } else if bytes.Compare(data, TestBlock2) != 0 {
- t.Errorf("Block present, but content is incorrect: Expected: %v Found: %v", data, TestBlock2)
+ } else {
+ if bytes.Compare(data, TestBlock2) != 0 {
+ t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock2)
+ }
+ bufs.Put(data)
}
- bufs.Put(data)
data, err = v.Get(TestHash3)
if err != nil {
t.Error(err)
- } else if bytes.Compare(data, TestBlock3) != 0 {
- t.Errorf("Block present, but content is incorrect: Expected: %v Found: %v", data, TestBlock3)
+ } else {
+ if bytes.Compare(data, TestBlock3) != 0 {
+ t.Errorf("Block present, but to %+q, expected %+q", data, TestBlock3)
+ }
+ bufs.Put(data)
}
- bufs.Put(data)
}
// testPutAndTouch
func testDeleteNewBlock(t *testing.T, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
+ blobSignatureTTL = 300 * time.Second
if v.Writable() == false {
return
data, err := v.Get(TestHash)
if err != nil {
t.Error(err)
- } else if bytes.Compare(data, TestBlock) != 0 {
- t.Error("Block still present, but content is incorrect: %+v != %+v", data, TestBlock)
+ } else {
+ if bytes.Compare(data, TestBlock) != 0 {
+ t.Errorf("Got data %+q, expected %+q", data, TestBlock)
+ }
+ bufs.Put(data)
}
- bufs.Put(data)
}
// Calling Delete() for a block with a timestamp older than
func testDeleteOldBlock(t *testing.T, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
+ blobSignatureTTL = 300 * time.Second
if v.Writable() == false {
return
}
v.Put(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL*time.Second))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
if err := v.Delete(TestHash); err != nil {
t.Error(err)
}
if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
- t.Errorf("os.IsNotExist(%v) should have been true", err.Error())
+ t.Errorf("os.IsNotExist(%v) should have been true", err)
}
}
func (v *MockVolume) Writable() bool {
return !v.Readonly
}
+
+// Replication returns 1: a MockVolume stores a single in-memory copy.
+func (v *MockVolume) Replication() int {
+ return 1
+}
package main
import (
- "bytes"
+ "bufio"
+ "errors"
+ "flag"
"fmt"
"io"
"io/ioutil"
"time"
)
+// unixVolumeAdder implements flag.Value: each -volume argument
+// appends a UnixVolume to the wrapped volumeSet.
+type unixVolumeAdder struct {
+ *volumeSet
+}
+
+// Set is called by the flag package for each -volume argument. It
+// accepts an absolute directory path (or, deprecated, a
+// comma-separated list of paths) and appends a UnixVolume for each.
+// The volume inherits the current values of the global
+// flagSerializeIO and flagReadonly flags.
+func (vs *unixVolumeAdder) Set(value string) error {
+ if dirs := strings.Split(value, ","); len(dirs) > 1 {
+ log.Print("DEPRECATED: using comma-separated volume list.")
+ for _, dir := range dirs {
+ if err := vs.Set(dir); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+ if len(value) == 0 || value[0] != '/' {
+ return errors.New("Invalid volume: must begin with '/'.")
+ }
+ if _, err := os.Stat(value); err != nil {
+ return err
+ }
+ var locker sync.Locker
+ if flagSerializeIO {
+ locker = &sync.Mutex{}
+ }
+ *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
+ root: value,
+ locker: locker,
+ readonly: flagReadonly,
+ })
+ return nil
+}
+
+// init registers the -volume flag and its deprecated synonym
+// -volumes; both append to the global volume set.
+func init() {
+ flag.Var(
+ &unixVolumeAdder{&volumes},
+ "volumes",
+ "Deprecated synonym for -volume.")
+ flag.Var(
+ &unixVolumeAdder{&volumes},
+ "volume",
+ "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
+}
+
+// Discover adds a UnixVolume for every directory named "keep" that is
+// located at the top level of a device- or tmpfs-backed mount point
+// other than "/". It returns the number of volumes added.
+//
+// Discover temporarily mutates the global flagReadonly while adding
+// each volume, so it must only be called during single-threaded
+// startup.
+func (vs *unixVolumeAdder) Discover() int {
+ added := 0
+ f, err := os.Open(ProcMounts)
+ if err != nil {
+ log.Fatalf("opening %s: %s", ProcMounts, err)
+ }
+ // Close the mounts table when done (the original leaked this
+ // file handle).
+ defer f.Close()
+ scanner := bufio.NewScanner(f)
+ for scanner.Scan() {
+ args := strings.Fields(scanner.Text())
+ if err := scanner.Err(); err != nil {
+ log.Fatalf("reading %s: %s", ProcMounts, err)
+ }
+ dev, mount := args[0], args[1]
+ if mount == "/" {
+ continue
+ }
+ if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
+ continue
+ }
+ keepdir := mount + "/keep"
+ if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
+ continue
+ }
+ // Set the -readonly flag (but only for this volume)
+ // if the filesystem is mounted readonly.
+ flagReadonlyWas := flagReadonly
+ for _, fsopt := range strings.Split(args[3], ",") {
+ if fsopt == "ro" {
+ flagReadonly = true
+ break
+ }
+ if fsopt == "rw" {
+ break
+ }
+ }
+ if err := vs.Set(keepdir); err != nil {
+ log.Printf("adding %q: %s", keepdir, err)
+ } else {
+ added++
+ }
+ flagReadonly = flagReadonlyWas
+ }
+ return added
+}
+
// A UnixVolume stores and retrieves blocks in a local directory.
type UnixVolume struct {
// path to the volume's root directory
// bytes.Compare(), but uses less memory.
func (v *UnixVolume) Compare(loc string, expect []byte) error {
path := v.blockPath(loc)
- stat, err := v.stat(path)
- if err != nil {
+ if _, err := v.stat(path); err != nil {
return err
}
- bufLen := 1 << 20
- if int64(bufLen) > stat.Size() {
- bufLen = int(stat.Size())
- if bufLen < 1 {
- // len(buf)==0 would prevent us from handling
- // empty files the same way as non-empty
- // files, because reading 0 bytes at a time
- // never reaches EOF.
- bufLen = 1
- }
- }
- cmp := expect
- buf := make([]byte, bufLen)
return v.getFunc(path, func(rdr io.Reader) error {
- // Loop invariants: all data read so far matched what
- // we expected, and the first N bytes of cmp are
- // expected to equal the next N bytes read from
- // reader.
- for {
- n, err := rdr.Read(buf)
- if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
- return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], buf[:n], rdr)
- }
- cmp = cmp[n:]
- if err == io.EOF {
- if len(cmp) != 0 {
- return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], nil, nil)
- }
- return nil
- } else if err != nil {
- return err
- }
- }
+ return compareReaderWithBuf(rdr, expect, loc[:32])
})
}
return !v.readonly
}
+// Replication returns 1: a UnixVolume keeps one copy of each block
+// on local disk.
+func (v *UnixVolume) Replication() int {
+ return 1
+}
+
// lockfile and unlockfile use flock(2) to manage kernel file locks.
func lockfile(f *os.File) error {
return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)