Made KeepVolumes a slice of Volume objects, not strings.
authorTim Pierce <twp@curoverse.com>
Tue, 22 Apr 2014 21:19:43 +0000 (17:19 -0400)
committerTim Pierce <twp@curoverse.com>
Tue, 22 Apr 2014 21:19:43 +0000 (17:19 -0400)
Moved more Unix-specific code into volume.go as part of the process.

services/keep/src/keep/keep.go
services/keep/src/keep/keep_test.go
services/keep/src/keep/volume.go

index e4a26757208eee527a723215321b7c28d8970d08..8b8ae17e4c89d6c77da742d37c1eff53b45d5676 100644 (file)
@@ -14,12 +14,9 @@ import (
        "log"
        "net/http"
        "os"
-       "path/filepath"
        "regexp"
-       "strconv"
        "strings"
        "syscall"
-       "time"
 )
 
 // ======================
@@ -40,7 +37,8 @@ const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
 
 var PROC_MOUNTS = "/proc/mounts"
 
-var KeepVolumes []string
+// KeepVolumes is a slice of volumes on which blocks can be stored.
+var KeepVolumes []Volume
 
 // ==========
 // Error types.
@@ -107,11 +105,11 @@ func main() {
        }
 
        // Check that the specified volumes actually exist.
-       KeepVolumes = []string(nil)
+       KeepVolumes = []Volume(nil)
        for _, v := range keepvols {
                if _, err := os.Stat(v); err == nil {
                        log.Println("adding Keep volume:", v)
-                       KeepVolumes = append(KeepVolumes, v)
+                       KeepVolumes = append(KeepVolumes, &UnixVolume{v})
                } else {
                        log.Printf("bad Keep volume: %s\n", err)
                }
@@ -226,7 +224,10 @@ func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
 func IndexHandler(w http.ResponseWriter, req *http.Request) {
        prefix := mux.Vars(req)["prefix"]
 
-       index := IndexLocators(prefix)
+       var index string
+       for _, vol := range KeepVolumes {
+               index = index + vol.Index(prefix)
+       }
        w.Write([]byte(index))
 }
 
@@ -273,7 +274,7 @@ func GetNodeStatus() *NodeStatus {
 
        st.Volumes = make([]*VolumeStatus, len(KeepVolumes))
        for i, vol := range KeepVolumes {
-               st.Volumes[i] = GetVolumeStatus(vol)
+               st.Volumes[i] = vol.Status()
        }
        return st
 }
@@ -305,66 +306,10 @@ func GetVolumeStatus(volume string) *VolumeStatus {
        return &VolumeStatus{volume, devnum, free, used}
 }
 
-// IndexLocators
-//     Returns a string containing a list of locator ids found on this
-//     Keep server.  If {prefix} is given, return only those locator
-//     ids that begin with the given prefix string.
-//
-//     The return string consists of a sequence of newline-separated
-//     strings in the format
-//
-//         locator+size modification-time
-//
-//     e.g.:
-//
-//         e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
-//         e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
-//         e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
-//
-func IndexLocators(prefix string) string {
-       var output string
-       for _, vol := range KeepVolumes {
-               filepath.Walk(vol,
-                       func(path string, info os.FileInfo, err error) error {
-                               // This WalkFunc inspects each path in the volume
-                               // and prints an index line for all files that begin
-                               // with prefix.
-                               if err != nil {
-                                       log.Printf("IndexHandler: %s: walking to %s: %s",
-                                               vol, path, err)
-                                       return nil
-                               }
-                               locator := filepath.Base(path)
-                               // Skip directories that do not match prefix.
-                               // We know there is nothing interesting inside.
-                               if info.IsDir() &&
-                                       !strings.HasPrefix(locator, prefix) &&
-                                       !strings.HasPrefix(prefix, locator) {
-                                       return filepath.SkipDir
-                               }
-                               // Skip any file that is not apparently a locator, e.g. .meta files
-                               if is_valid, err := IsValidLocator(locator); err != nil {
-                                       return err
-                               } else if !is_valid {
-                                       return nil
-                               }
-                               // Print filenames beginning with prefix
-                               if !info.IsDir() && strings.HasPrefix(locator, prefix) {
-                                       output = output + fmt.Sprintf(
-                                               "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
-                               }
-                               return nil
-                       })
-       }
-
-       return output
-}
-
 func GetBlock(hash string) ([]byte, error) {
        // Attempt to read the requested hash from a keep volume.
        for _, vol := range KeepVolumes {
-               uv := UnixVolume{vol}
-               if buf, err := uv.Read(hash); err != nil {
+               if buf, err := vol.Read(hash); err != nil {
                        // IsNotExist is an expected error and may be ignored.
                        // (If all volumes report IsNotExist, we return a NotFoundError)
                        // A CorruptError should be returned immediately.
@@ -438,39 +383,16 @@ func PutBlock(block []byte, hash string) error {
        // Store the block on the first available Keep volume.
        allFull := true
        for _, vol := range KeepVolumes {
-               if IsFull(vol) {
-                       continue
-               }
-               allFull = false
-               blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
-               if err := os.MkdirAll(blockDir, 0755); err != nil {
-                       log.Printf("%s: could not create directory %s: %s",
-                               hash, blockDir, err)
-                       continue
-               }
-
-               tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
-               if tmperr != nil {
-                       log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
-                       continue
-               }
-               blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
-
-               if _, err := tmpfile.Write(block); err != nil {
-                       log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
-                       continue
+               err := vol.Write(hash, block)
+               if err == nil {
+                       return nil // success!
                }
-               if err := tmpfile.Close(); err != nil {
-                       log.Printf("closing %s: %s\n", tmpfile.Name(), err)
-                       os.Remove(tmpfile.Name())
-                       continue
+               if err != FullError {
+                       // The volume is not full but the write did not succeed.
+                       // Report the error and continue trying.
+                       allFull = false
+                       log.Printf("%s: Write(%s): %s\n", vol, hash, err)
                }
-               if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
-                       log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
-                       os.Remove(tmpfile.Name())
-                       continue
-               }
-               return nil
        }
 
        if allFull {
@@ -482,53 +404,6 @@ func PutBlock(block []byte, hash string) error {
        }
 }
 
-func IsFull(volume string) (isFull bool) {
-       fullSymlink := volume + "/full"
-
-       // Check if the volume has been marked as full in the last hour.
-       if link, err := os.Readlink(fullSymlink); err == nil {
-               if ts, err := strconv.Atoi(link); err == nil {
-                       fulltime := time.Unix(int64(ts), 0)
-                       if time.Since(fulltime).Hours() < 1.0 {
-                               return true
-                       }
-               }
-       }
-
-       if avail, err := FreeDiskSpace(volume); err == nil {
-               isFull = avail < MIN_FREE_KILOBYTES
-       } else {
-               log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
-               isFull = false
-       }
-
-       // If the volume is full, timestamp it.
-       if isFull {
-               now := fmt.Sprintf("%d", time.Now().Unix())
-               os.Symlink(now, fullSymlink)
-       }
-       return
-}
-
-// FreeDiskSpace(volume)
-//     Returns the amount of available disk space on VOLUME,
-//     as a number of 1k blocks.
-//
-//     TODO(twp): consider integrating this better with
-//     VolumeStatus (e.g. keep a NodeStatus object up-to-date
-//     periodically and use it as the source of info)
-//
-func FreeDiskSpace(volume string) (free uint64, err error) {
-       var fs syscall.Statfs_t
-       err = syscall.Statfs(volume, &fs)
-       if err == nil {
-               // Statfs output is not guaranteed to measure free
-               // space in terms of 1K blocks.
-               free = fs.Bavail * uint64(fs.Bsize) / 1024
-       }
-       return
-}
-
 // ReadAtMost
 //     Reads bytes repeatedly from an io.Reader until either
 //     encountering EOF, or the maxbytes byte limit has been reached.
index 97fa1c78919e3070bf02690dbb876a741837dda7..ce5a41c9157478eb6c756ba0536e395b8508d75f 100644 (file)
@@ -37,6 +37,9 @@ var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.")
 //           - use an interface to mock ioutil.TempFile with a File
 //             object that always returns an error on write
 //
+// TODO(twp): Make these tests less dependent on being able to access
+// the UnixVolume root field.
+//
 // ========================================
 // GetBlock tests.
 // ========================================
@@ -136,7 +139,7 @@ func TestPutBlockOneVol(t *testing.T) {
 
        // Create two test Keep volumes, but cripple one of them.
        KeepVolumes = setup(t, 2)
-       os.Chmod(KeepVolumes[0], 000)
+       os.Chmod(KeepVolumes[0].(*UnixVolume).root, 000)
 
        // Check that PutBlock stores the data as expected.
        if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
@@ -237,12 +240,12 @@ func TestFindKeepVolumes(t *testing.T) {
        defer teardown()
 
        // Initialize two keep volumes.
-       var tempVols []string = setup(t, 2)
+       var tempVols []Volume = setup(t, 2)
 
        // Set up a bogus PROC_MOUNTS file.
        if f, err := ioutil.TempFile("", "keeptest"); err == nil {
                for _, vol := range tempVols {
-                       fmt.Fprintf(f, "tmpfs %s tmpfs opts\n", path.Dir(vol))
+                       fmt.Fprintf(f, "tmpfs %s tmpfs opts\n", path.Dir(vol.(*UnixVolume).root))
                }
                f.Close()
                PROC_MOUNTS = f.Name()
@@ -254,9 +257,10 @@ func TestFindKeepVolumes(t *testing.T) {
                                len(tempVols), len(resultVols))
                }
                for i := range tempVols {
-                       if tempVols[i] != resultVols[i] {
+                       tempVolRoot := tempVols[i].(*UnixVolume).root
+                       if tempVolRoot != resultVols[i] {
                                t.Errorf("FindKeepVolumes returned %s, expected %s\n",
-                                       resultVols[i], tempVols[i])
+                                       tempVolRoot, tempVols[i])
                        }
                }
 
@@ -305,7 +309,7 @@ func TestIndex(t *testing.T) {
        store(t, KeepVolumes[0], TEST_HASH+".meta", []byte("metadata"))
        store(t, KeepVolumes[1], TEST_HASH_2+".meta", []byte("metadata"))
 
-       index := IndexLocators("")
+       index := KeepVolumes[0].Index("") + KeepVolumes[1].Index("")
        expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
                TEST_HASH_3 + `\+\d+ \d+\n` +
                TEST_HASH_2 + `\+\d+ \d+\n$`
@@ -337,7 +341,7 @@ func TestNodeStatus(t *testing.T) {
        for i, vol := range KeepVolumes {
                volinfo := st.Volumes[i]
                mtp := volinfo.MountPoint
-               if mtp != vol {
+               if mtp != vol.(*UnixVolume).root {
                        t.Errorf("GetNodeStatus mount_point %s != KeepVolume %s", mtp, vol)
                }
                if volinfo.DeviceNum == 0 {
@@ -360,12 +364,13 @@ func TestNodeStatus(t *testing.T) {
 //     Create KeepVolumes for testing.
 //     Returns a slice of pathnames to temporary Keep volumes.
 //
-func setup(t *testing.T, num_volumes int) []string {
-       vols := make([]string, num_volumes)
+func setup(t *testing.T, num_volumes int) []Volume {
+       vols := make([]Volume, num_volumes)
        for i := range vols {
                if dir, err := ioutil.TempDir(os.TempDir(), "keeptest"); err == nil {
-                       vols[i] = dir + "/keep"
-                       os.Mkdir(vols[i], 0755)
+                       root := dir + "/keep"
+                       vols[i] = &UnixVolume{root}
+                       os.Mkdir(root, 0755)
                } else {
                        t.Fatal(err)
                }
@@ -378,16 +383,17 @@ func setup(t *testing.T, num_volumes int) []string {
 //
 func teardown() {
        for _, vol := range KeepVolumes {
-               os.RemoveAll(path.Dir(vol))
+               os.RemoveAll(path.Dir(vol.(*UnixVolume).root))
        }
        KeepVolumes = nil
 }
 
 // store
 //     Low-level code to write Keep blocks directly to disk for testing.
+//     Note: works only on UnixVolumes.
 //
-func store(t *testing.T, keepdir string, filename string, block []byte) {
-       blockdir := fmt.Sprintf("%s/%s", keepdir, filename[:3])
+func store(t *testing.T, vol Volume, filename string, block []byte) {
+       blockdir := fmt.Sprintf("%s/%s", vol.(*UnixVolume).root, filename[:3])
        if err := os.MkdirAll(blockdir, 0755); err != nil {
                t.Fatal(err)
        }
index 20113ffb33fbff4fe03edb92cf3422666b45c3f8..745f984949edc21c31f2638535f4011fd4cd8c29 100644 (file)
@@ -3,15 +3,23 @@ package main
 import (
        "crypto/md5"
        "fmt"
+       "io/ioutil"
        "log"
        "os"
+       "path/filepath"
+       "strconv"
+       "strings"
+       "syscall"
+       "time"
 )
 
 // A Volume is an interface that represents a Keep back-end volume.
 
 type Volume interface {
-       Read(locator string) ([]byte, error)
-       Write(locator string, block []byte) error
+       Read(loc string) ([]byte, error)
+       Write(loc string, block []byte) error
+       Index(prefix string) string
+       Status() *VolumeStatus
 }
 
 // A UnixVolume is a Volume that writes to a locally mounted disk.
@@ -19,12 +27,23 @@ type UnixVolume struct {
        root string // path to this volume
 }
 
-func (v *UnixVolume) Read(locator string) ([]byte, error) {
+// Read retrieves a block identified by the locator string "loc", and
+// returns its contents as a byte slice.
+//
+// If the block could not be opened or read, Read returns a nil slice
+// and the os.Error that was generated.
+//
+// If the block is present but its content hash does not match loc,
+// Read returns the block and a CorruptError.  It is the caller's
+// responsibility to decide what (if anything) to do with the
+// corrupted data block.
+//
+func (v *UnixVolume) Read(loc string) ([]byte, error) {
        var f *os.File
        var err error
        var nread int
 
-       blockFilename := fmt.Sprintf("%s/%s/%s", v.root, locator[0:3], locator)
+       blockFilename := fmt.Sprintf("%s/%s/%s", v.root, loc[0:3], loc)
 
        f, err = os.Open(blockFilename)
        if err != nil {
@@ -41,7 +60,7 @@ func (v *UnixVolume) Read(locator string) ([]byte, error) {
        // Double check the file checksum.
        //
        filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
-       if filehash != locator {
+       if filehash != loc {
                // TODO(twp): this condition probably represents a bad disk and
                // should raise major alarm bells for an administrator: e.g.
                // they should be sent directly to an event manager at high
@@ -55,3 +74,169 @@ func (v *UnixVolume) Read(locator string) ([]byte, error) {
        // Success!
        return buf[:nread], nil
 }
+
+// Write stores a block of data identified by the locator string
+// "loc".  It returns nil on success.  If the volume is full, it
+// returns a FullError.  If the write fails due to some other error,
+// that error is returned.
+//
+func (v *UnixVolume) Write(loc string, block []byte) error {
+       if v.IsFull() {
+               return FullError
+       }
+       blockDir := fmt.Sprintf("%s/%s", v.root, loc[0:3])
+       if err := os.MkdirAll(blockDir, 0755); err != nil {
+               log.Printf("%s: could not create directory %s: %s",
+                       loc, blockDir, err)
+               return err
+       }
+
+       tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+loc)
+       if tmperr != nil {
+               log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, loc, tmperr)
+               return tmperr
+       }
+       blockFilename := fmt.Sprintf("%s/%s", blockDir, loc)
+
+       if _, err := tmpfile.Write(block); err != nil {
+               log.Printf("%s: writing to %s: %s\n", v.root, blockFilename, err)
+               return err
+       }
+       if err := tmpfile.Close(); err != nil {
+               log.Printf("closing %s: %s\n", tmpfile.Name(), err)
+               os.Remove(tmpfile.Name())
+               return err
+       }
+       if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
+               log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
+               os.Remove(tmpfile.Name())
+               return err
+       }
+       return nil
+}
+
+// Status returns a VolumeStatus struct describing the volume's
+// current state.
+//
+func (v *UnixVolume) Status() *VolumeStatus {
+       var fs syscall.Statfs_t
+       var devnum uint64
+
+       if fi, err := os.Stat(v.root); err == nil {
+               devnum = fi.Sys().(*syscall.Stat_t).Dev
+       } else {
+               log.Printf("%s: os.Stat: %s\n", v.root, err)
+               return nil
+       }
+
+       err := syscall.Statfs(v.root, &fs)
+       if err != nil {
+               log.Printf("%s: statfs: %s\n", v.root, err)
+               return nil
+       }
+       // These calculations match the way df calculates disk usage:
+       // "free" space is measured by fs.Bavail, but "used" space
+       // uses fs.Blocks - fs.Bfree.
+       free := fs.Bavail * uint64(fs.Bsize)
+       used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
+       return &VolumeStatus{v.root, devnum, free, used}
+}
+
+// Index returns a list of blocks found on this volume which begin with
+// the specified prefix.
+//
+// The return value is a multiline string (separated by
+// newlines). Each line is in the format
+//
+//     locator+size modification-time
+//
+// e.g.:
+//
+//     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
+//     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
+//     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
+//
+func (v *UnixVolume) Index(prefix string) (output string) {
+       filepath.Walk(v.root,
+               func(path string, info os.FileInfo, err error) error {
+                       // This WalkFunc inspects each path in the volume
+                       // and prints an index line for all files that begin
+                       // with prefix.
+                       if err != nil {
+                               log.Printf("IndexHandler: %s: walking to %s: %s",
+                                       v.root, path, err)
+                               return nil
+                       }
+                       locator := filepath.Base(path)
+                       // Skip directories that do not match prefix.
+                       // We know there is nothing interesting inside.
+                       if info.IsDir() &&
+                               !strings.HasPrefix(locator, prefix) &&
+                               !strings.HasPrefix(prefix, locator) {
+                               return filepath.SkipDir
+                       }
+                       // Skip any file that is not apparently a locator, e.g. .meta files
+                       if is_valid, err := IsValidLocator(locator); err != nil {
+                               return err
+                       } else if !is_valid {
+                               return nil
+                       }
+                       // Print filenames beginning with prefix
+                       if !info.IsDir() && strings.HasPrefix(locator, prefix) {
+                               output = output + fmt.Sprintf(
+                                       "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
+                       }
+                       return nil
+               })
+
+       return
+}
+
+// IsFull returns true if the free space on the volume is less than
+// MIN_FREE_KILOBYTES.
+//
+func (v *UnixVolume) IsFull() (isFull bool) {
+       fullSymlink := v.root + "/full"
+
+       // Check if the volume has been marked as full in the last hour.
+       if link, err := os.Readlink(fullSymlink); err == nil {
+               if ts, err := strconv.Atoi(link); err == nil {
+                       fulltime := time.Unix(int64(ts), 0)
+                       if time.Since(fulltime).Hours() < 1.0 {
+                               return true
+                       }
+               }
+       }
+
+       if avail, err := v.FreeDiskSpace(); err == nil {
+               isFull = avail < MIN_FREE_KILOBYTES
+       } else {
+               log.Printf("%s: FreeDiskSpace: %s\n", v.root, err)
+               isFull = false
+       }
+
+       // If the volume is full, timestamp it.
+       if isFull {
+               now := fmt.Sprintf("%d", time.Now().Unix())
+               os.Symlink(now, fullSymlink)
+       }
+       return
+}
+
+// FreeDiskSpace returns the number of unused 1k blocks available on
+// the volume.
+//
+//     TODO(twp): consider integrating this better with VolumeStatus
+//     (e.g. keep a NodeStatus object up-to-date periodically and use
+//     it as the source of info)
+//
+func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
+       var fs syscall.Statfs_t
+       err = syscall.Statfs(v.root, &fs)
+       if err == nil {
+               // Statfs output is not guaranteed to measure free
+               // space in terms of 1K blocks.
+               free = fs.Bavail * uint64(fs.Bsize) / 1024
+       }
+       return
+}