From eca0d3d0a6938e08dde6067c4565754c45f5ebb7 Mon Sep 17 00:00:00 2001 From: Tim Pierce Date: Tue, 22 Apr 2014 17:19:43 -0400 Subject: [PATCH] Made KeepVolumes a slice of Volume objects, not strings. Moved more Unix-specific code into volume.go as part of the process. --- services/keep/src/keep/keep.go | 161 +++-------------------- services/keep/src/keep/keep_test.go | 34 +++-- services/keep/src/keep/volume.go | 195 +++++++++++++++++++++++++++- 3 files changed, 228 insertions(+), 162 deletions(-) diff --git a/services/keep/src/keep/keep.go b/services/keep/src/keep/keep.go index e4a2675720..8b8ae17e4c 100644 --- a/services/keep/src/keep/keep.go +++ b/services/keep/src/keep/keep.go @@ -14,12 +14,9 @@ import ( "log" "net/http" "os" - "path/filepath" "regexp" - "strconv" "strings" "syscall" - "time" ) // ====================== @@ -40,7 +37,8 @@ const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024 var PROC_MOUNTS = "/proc/mounts" -var KeepVolumes []string +// KeepVolumes is a slice of volumes on which blocks can be stored. +var KeepVolumes []Volume // ========== // Error types. @@ -107,11 +105,11 @@ func main() { } // Check that the specified volumes actually exist. 
- KeepVolumes = []string(nil) + KeepVolumes = []Volume(nil) for _, v := range keepvols { if _, err := os.Stat(v); err == nil { log.Println("adding Keep volume:", v) - KeepVolumes = append(KeepVolumes, v) + KeepVolumes = append(KeepVolumes, &UnixVolume{v}) } else { log.Printf("bad Keep volume: %s\n", err) } @@ -226,7 +224,10 @@ func PutBlockHandler(w http.ResponseWriter, req *http.Request) { func IndexHandler(w http.ResponseWriter, req *http.Request) { prefix := mux.Vars(req)["prefix"] - index := IndexLocators(prefix) + var index string + for _, vol := range KeepVolumes { + index = index + vol.Index(prefix) + } w.Write([]byte(index)) } @@ -273,7 +274,7 @@ func GetNodeStatus() *NodeStatus { st.Volumes = make([]*VolumeStatus, len(KeepVolumes)) for i, vol := range KeepVolumes { - st.Volumes[i] = GetVolumeStatus(vol) + st.Volumes[i] = vol.Status() } return st } @@ -305,66 +306,10 @@ func GetVolumeStatus(volume string) *VolumeStatus { return &VolumeStatus{volume, devnum, free, used} } -// IndexLocators -// Returns a string containing a list of locator ids found on this -// Keep server. If {prefix} is given, return only those locator -// ids that begin with the given prefix string. -// -// The return string consists of a sequence of newline-separated -// strings in the format -// -// locator+size modification-time -// -// e.g.: -// -// e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303 -// e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043 -// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136 -// -func IndexLocators(prefix string) string { - var output string - for _, vol := range KeepVolumes { - filepath.Walk(vol, - func(path string, info os.FileInfo, err error) error { - // This WalkFunc inspects each path in the volume - // and prints an index line for all files that begin - // with prefix. 
- if err != nil { - log.Printf("IndexHandler: %s: walking to %s: %s", - vol, path, err) - return nil - } - locator := filepath.Base(path) - // Skip directories that do not match prefix. - // We know there is nothing interesting inside. - if info.IsDir() && - !strings.HasPrefix(locator, prefix) && - !strings.HasPrefix(prefix, locator) { - return filepath.SkipDir - } - // Skip any file that is not apparently a locator, e.g. .meta files - if is_valid, err := IsValidLocator(locator); err != nil { - return err - } else if !is_valid { - return nil - } - // Print filenames beginning with prefix - if !info.IsDir() && strings.HasPrefix(locator, prefix) { - output = output + fmt.Sprintf( - "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix()) - } - return nil - }) - } - - return output -} - func GetBlock(hash string) ([]byte, error) { // Attempt to read the requested hash from a keep volume. for _, vol := range KeepVolumes { - uv := UnixVolume{vol} - if buf, err := uv.Read(hash); err != nil { + if buf, err := vol.Read(hash); err != nil { // IsNotExist is an expected error and may be ignored. // (If all volumes report IsNotExist, we return a NotFoundError) // A CorruptError should be returned immediately. @@ -438,39 +383,16 @@ func PutBlock(block []byte, hash string) error { // Store the block on the first available Keep volume. 
allFull := true for _, vol := range KeepVolumes { - if IsFull(vol) { - continue - } - allFull = false - blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3]) - if err := os.MkdirAll(blockDir, 0755); err != nil { - log.Printf("%s: could not create directory %s: %s", - hash, blockDir, err) - continue - } - - tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash) - if tmperr != nil { - log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr) - continue - } - blockFilename := fmt.Sprintf("%s/%s", blockDir, hash) - - if _, err := tmpfile.Write(block); err != nil { - log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err) - continue + err := vol.Write(hash, block) + if err == nil { + return nil // success! } - if err := tmpfile.Close(); err != nil { - log.Printf("closing %s: %s\n", tmpfile.Name(), err) - os.Remove(tmpfile.Name()) - continue + if err != FullError { + // The volume is not full but the write did not succeed. + // Report the error and continue trying. + allFull = false + log.Printf("%s: Write(%s): %s\n", vol, hash, err) } - if err := os.Rename(tmpfile.Name(), blockFilename); err != nil { - log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err) - os.Remove(tmpfile.Name()) - continue - } - return nil } if allFull { @@ -482,53 +404,6 @@ func PutBlock(block []byte, hash string) error { } } -func IsFull(volume string) (isFull bool) { - fullSymlink := volume + "/full" - - // Check if the volume has been marked as full in the last hour. - if link, err := os.Readlink(fullSymlink); err == nil { - if ts, err := strconv.Atoi(link); err == nil { - fulltime := time.Unix(int64(ts), 0) - if time.Since(fulltime).Hours() < 1.0 { - return true - } - } - } - - if avail, err := FreeDiskSpace(volume); err == nil { - isFull = avail < MIN_FREE_KILOBYTES - } else { - log.Printf("%s: FreeDiskSpace: %s\n", volume, err) - isFull = false - } - - // If the volume is full, timestamp it. 
- if isFull { - now := fmt.Sprintf("%d", time.Now().Unix()) - os.Symlink(now, fullSymlink) - } - return -} - -// FreeDiskSpace(volume) -// Returns the amount of available disk space on VOLUME, -// as a number of 1k blocks. -// -// TODO(twp): consider integrating this better with -// VolumeStatus (e.g. keep a NodeStatus object up-to-date -// periodically and use it as the source of info) -// -func FreeDiskSpace(volume string) (free uint64, err error) { - var fs syscall.Statfs_t - err = syscall.Statfs(volume, &fs) - if err == nil { - // Statfs output is not guaranteed to measure free - // space in terms of 1K blocks. - free = fs.Bavail * uint64(fs.Bsize) / 1024 - } - return -} - // ReadAtMost // Reads bytes repeatedly from an io.Reader until either // encountering EOF, or the maxbytes byte limit has been reached. diff --git a/services/keep/src/keep/keep_test.go b/services/keep/src/keep/keep_test.go index 97fa1c7891..ce5a41c915 100644 --- a/services/keep/src/keep/keep_test.go +++ b/services/keep/src/keep/keep_test.go @@ -37,6 +37,9 @@ var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.") // - use an interface to mock ioutil.TempFile with a File // object that always returns an error on write // +// TODO(twp): Make these tests less dependent on being able to access +// the UnixVolume root field. +// // ======================================== // GetBlock tests. // ======================================== @@ -136,7 +139,7 @@ func TestPutBlockOneVol(t *testing.T) { // Create two test Keep volumes, but cripple one of them. KeepVolumes = setup(t, 2) - os.Chmod(KeepVolumes[0], 000) + os.Chmod(KeepVolumes[0].(*UnixVolume).root, 000) // Check that PutBlock stores the data as expected. if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil { @@ -237,12 +240,12 @@ func TestFindKeepVolumes(t *testing.T) { defer teardown() // Initialize two keep volumes. - var tempVols []string = setup(t, 2) + var tempVols []Volume = setup(t, 2) // Set up a bogus PROC_MOUNTS file. 
if f, err := ioutil.TempFile("", "keeptest"); err == nil { for _, vol := range tempVols { - fmt.Fprintf(f, "tmpfs %s tmpfs opts\n", path.Dir(vol)) + fmt.Fprintf(f, "tmpfs %s tmpfs opts\n", path.Dir(vol.(*UnixVolume).root)) } f.Close() PROC_MOUNTS = f.Name() @@ -254,9 +257,10 @@ func TestFindKeepVolumes(t *testing.T) { len(tempVols), len(resultVols)) } for i := range tempVols { - if tempVols[i] != resultVols[i] { + tempVolRoot := tempVols[i].(*UnixVolume).root + if tempVolRoot != resultVols[i] { t.Errorf("FindKeepVolumes returned %s, expected %s\n", - resultVols[i], tempVols[i]) + resultVols[i], tempVolRoot) } } @@ -305,7 +309,7 @@ func TestIndex(t *testing.T) { store(t, KeepVolumes[0], TEST_HASH+".meta", []byte("metadata")) store(t, KeepVolumes[1], TEST_HASH_2+".meta", []byte("metadata")) - index := IndexLocators("") + index := KeepVolumes[0].Index("") + KeepVolumes[1].Index("") expected := `^` + TEST_HASH + `\+\d+ \d+\n` + TEST_HASH_3 + `\+\d+ \d+\n` + TEST_HASH_2 + `\+\d+ \d+\n$` @@ -337,7 +341,7 @@ func TestNodeStatus(t *testing.T) { for i, vol := range KeepVolumes { volinfo := st.Volumes[i] mtp := volinfo.MountPoint - if mtp != vol { + if mtp != vol.(*UnixVolume).root { t.Errorf("GetNodeStatus mount_point %s != KeepVolume %s", mtp, vol) } if volinfo.DeviceNum == 0 { @@ -360,12 +364,13 @@ func TestNodeStatus(t *testing.T) { // Create KeepVolumes for testing. // Returns a slice of pathnames to temporary Keep volumes. 
// -func setup(t *testing.T, num_volumes int) []string { - vols := make([]string, num_volumes) +func setup(t *testing.T, num_volumes int) []Volume { + vols := make([]Volume, num_volumes) for i := range vols { if dir, err := ioutil.TempDir(os.TempDir(), "keeptest"); err == nil { - vols[i] = dir + "/keep" - os.Mkdir(vols[i], 0755) + root := dir + "/keep" + vols[i] = &UnixVolume{root} + os.Mkdir(root, 0755) } else { t.Fatal(err) } @@ -378,16 +383,17 @@ func setup(t *testing.T, num_volumes int) []string { // func teardown() { for _, vol := range KeepVolumes { - os.RemoveAll(path.Dir(vol)) + os.RemoveAll(path.Dir(vol.(*UnixVolume).root)) } KeepVolumes = nil } // store // Low-level code to write Keep blocks directly to disk for testing. +// Note: works only on UnixVolumes. // -func store(t *testing.T, keepdir string, filename string, block []byte) { - blockdir := fmt.Sprintf("%s/%s", keepdir, filename[:3]) +func store(t *testing.T, vol Volume, filename string, block []byte) { + blockdir := fmt.Sprintf("%s/%s", vol.(*UnixVolume).root, filename[:3]) if err := os.MkdirAll(blockdir, 0755); err != nil { t.Fatal(err) } diff --git a/services/keep/src/keep/volume.go b/services/keep/src/keep/volume.go index 20113ffb33..745f984949 100644 --- a/services/keep/src/keep/volume.go +++ b/services/keep/src/keep/volume.go @@ -3,15 +3,23 @@ package main import ( "crypto/md5" "fmt" + "io/ioutil" "log" "os" + "path/filepath" + "strconv" + "strings" + "syscall" + "time" ) // A Volume is an interface that represents a Keep back-end volume. type Volume interface { - Read(locator string) ([]byte, error) - Write(locator string, block []byte) error + Read(loc string) ([]byte, error) + Write(loc string, block []byte) error + Index(prefix string) string + Status() *VolumeStatus } // A UnixVolume is a Volume that writes to a locally mounted disk. 
@@ -19,12 +27,23 @@ type UnixVolume struct { root string // path to this volume } -func (v *UnixVolume) Read(locator string) ([]byte, error) { +// Read retrieves a block identified by the locator string "loc", and +// returns its contents as a byte slice. +// +// If the block could not be opened or read, Read returns a nil slice +// and the os.Error that was generated. +// +// If the block is present but its content hash does not match loc, +// Read returns the block and a CorruptError. It is the caller's +// responsibility to decide what (if anything) to do with the +// corrupted data block. +// +func (v *UnixVolume) Read(loc string) ([]byte, error) { var f *os.File var err error var nread int - blockFilename := fmt.Sprintf("%s/%s/%s", v.root, locator[0:3], locator) + blockFilename := fmt.Sprintf("%s/%s/%s", v.root, loc[0:3], loc) f, err = os.Open(blockFilename) if err != nil { @@ -41,7 +60,7 @@ func (v *UnixVolume) Read(locator string) ([]byte, error) { // Double check the file checksum. // filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread])) - if filehash != locator { + if filehash != loc { // TODO(twp): this condition probably represents a bad disk and // should raise major alarm bells for an administrator: e.g. // they should be sent directly to an event manager at high @@ -55,3 +74,169 @@ func (v *UnixVolume) Read(locator string) ([]byte, error) { // Success! return buf[:nread], nil } + +// Write stores a block of data identified by the locator string +// "loc". It returns nil on success. If the volume is full, it +// returns a FullError. If the write fails due to some other error, +// that error is returned. 
+// +func (v *UnixVolume) Write(loc string, block []byte) error { + if v.IsFull() { + return FullError + } + blockDir := fmt.Sprintf("%s/%s", v.root, loc[0:3]) + if err := os.MkdirAll(blockDir, 0755); err != nil { + log.Printf("%s: could not create directory %s: %s", + loc, blockDir, err) + return err + } + + tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+loc) + if tmperr != nil { + log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, loc, tmperr) + return tmperr + } + blockFilename := fmt.Sprintf("%s/%s", blockDir, loc) + + if _, err := tmpfile.Write(block); err != nil { + log.Printf("%s: writing to %s: %s\n", v.root, blockFilename, err) + return err + } + if err := tmpfile.Close(); err != nil { + log.Printf("closing %s: %s\n", tmpfile.Name(), err) + os.Remove(tmpfile.Name()) + return err + } + if err := os.Rename(tmpfile.Name(), blockFilename); err != nil { + log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err) + os.Remove(tmpfile.Name()) + return err + } + return nil +} + +// Status returns a VolumeStatus struct describing the volume's +// current state. +// +func (v *UnixVolume) Status() *VolumeStatus { + var fs syscall.Statfs_t + var devnum uint64 + + if fi, err := os.Stat(v.root); err == nil { + devnum = fi.Sys().(*syscall.Stat_t).Dev + } else { + log.Printf("%s: os.Stat: %s\n", v.root, err) + return nil + } + + err := syscall.Statfs(v.root, &fs) + if err != nil { + log.Printf("%s: statfs: %s\n", v.root, err) + return nil + } + // These calculations match the way df calculates disk usage: + // "free" space is measured by fs.Bavail, but "used" space + // uses fs.Blocks - fs.Bfree. + free := fs.Bavail * uint64(fs.Bsize) + used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize) + return &VolumeStatus{v.root, devnum, free, used} +} + +// Index returns a list of blocks found on this volume which begin with +// the specified prefix. +// +// The return value is a multiline string (separated by +// newlines). 
Each line is in the format +// +// locator+size modification-time +// +// e.g.: +// +// e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303 +// e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043 +// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136 +// +func (v *UnixVolume) Index(prefix string) (output string) { + filepath.Walk(v.root, + func(path string, info os.FileInfo, err error) error { + // This WalkFunc inspects each path in the volume + // and prints an index line for all files that begin + // with prefix. + if err != nil { + log.Printf("IndexHandler: %s: walking to %s: %s", + v.root, path, err) + return nil + } + locator := filepath.Base(path) + // Skip directories that do not match prefix. + // We know there is nothing interesting inside. + if info.IsDir() && + !strings.HasPrefix(locator, prefix) && + !strings.HasPrefix(prefix, locator) { + return filepath.SkipDir + } + // Skip any file that is not apparently a locator, e.g. .meta files + if is_valid, err := IsValidLocator(locator); err != nil { + return err + } else if !is_valid { + return nil + } + // Print filenames beginning with prefix + if !info.IsDir() && strings.HasPrefix(locator, prefix) { + output = output + fmt.Sprintf( + "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix()) + } + return nil + }) + + return +} + +// IsFull returns true if the free space on the volume is less than +// MIN_FREE_KILOBYTES. +// +func (v *UnixVolume) IsFull() (isFull bool) { + fullSymlink := v.root + "/full" + + // Check if the volume has been marked as full in the last hour. + if link, err := os.Readlink(fullSymlink); err == nil { + if ts, err := strconv.Atoi(link); err == nil { + fulltime := time.Unix(int64(ts), 0) + if time.Since(fulltime).Hours() < 1.0 { + return true + } + } + } + + if avail, err := v.FreeDiskSpace(); err == nil { + isFull = avail < MIN_FREE_KILOBYTES + } else { + log.Printf("%s: FreeDiskSpace: %s\n", v.root, err) + isFull = false + } + + // If the volume is full, timestamp it. 
+ if isFull { + now := fmt.Sprintf("%d", time.Now().Unix()) + os.Symlink(now, fullSymlink) + } + return +} + +// FreeDiskSpace returns the number of unused 1k blocks available on +// the volume. +// +// TODO(twp): consider integrating this better with VolumeStatus +// (e.g. keep a NodeStatus object up-to-date periodically and use +// it as the source of info) +// +func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) { + var fs syscall.Statfs_t + err = syscall.Statfs(v.root, &fs) + if err == nil { + // Statfs output is not guaranteed to measure free + // space in terms of 1K blocks. + free = fs.Bavail * uint64(fs.Bsize) / 1024 + } + return +} -- 2.30.2