import (
"bufio"
+ "bytes"
"crypto/md5"
+ "encoding/json"
"errors"
"fmt"
"github.com/gorilla/mux"
+ "io/ioutil"
"log"
"net/http"
"os"
+ "path/filepath"
+ "strconv"
"strings"
+ "syscall"
+ "time"
)
const (
	// DEFAULT_PORT is the default TCP port on which to listen for requests.
	DEFAULT_PORT = 25107

	// BLOCKSIZE is the size of a Keep "block": 64 MiB (64 * 1024 * 1024 bytes).
	BLOCKSIZE = 64 * 1024 * 1024

	// MIN_FREE_KILOBYTES is the amount of free space, in 1 KiB units, that a
	// Keep volume must have available in order to permit writes. It equals
	// one block's worth of space.
	MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
)

var (
	// PROC_MOUNTS is the path of the system mount table.
	// NOTE(review): presumably read elsewhere to discover Keep volumes — confirm.
	PROC_MOUNTS = "/proc/mounts"

	// KeepVolumes lists the Keep volume directories served by this process;
	// PutBlock, IndexLocators and StatusHandler iterate over it in order.
	KeepVolumes []string
)
Err error
}
// HTTP status codes carried in KeepError.HTTPCode by GetBlock and PutBlock,
// one per failure mode.
const (
	ErrCollision = 400 // a different block with the same hash is already stored
	ErrMD5Fail   = 401 // the block's MD5 digest does not match the requested hash
	ErrCorrupt   = 402 // a stored copy of the block failed its checksum
	ErrNotFound  = 404 // the block was not found on any Keep volume
	ErrOther     = 500 // the block could not be stored for some other reason
	ErrFull      = 503 // every Keep volume is too full to store the block
)
+
// Error implements the error interface. The message has the form
// "Error <HTTPCode>: <message of the wrapped Err>".
func (e *KeepError) Error() string {
	detail := e.Err.Error()
	return fmt.Sprintf("Error %d: %s", e.HTTPCode, detail)
}
// appropriate handler.
//
rest := mux.NewRouter()
- rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
- rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
+ rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
+ rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
+ rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
+ rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+ rest.HandleFunc(`/status\.json`, StatusHandler).Methods("GET", "HEAD")
// Tell the built-in HTTP server to direct all requests to the REST
// router.
}
}
+func IndexHandler(w http.ResponseWriter, req *http.Request) {
+ prefix := mux.Vars(req)["prefix"]
+
+ index := IndexLocators(prefix)
+ w.Write([]byte(index))
+}
+
// VolumeStatus and NodeStatus make up the JSON document served at
// /status.json. The struct tags give the documented key names:
//
//	time          - the time the status was generated
//	df            - intended to hold the output of `df --block-size=1k`
//	disk_devices  - disk device files (i.e. /dev/(s|xv|h)d*)
//	dirs          - an object keyed on Keep volume directory, each value
//	                describing that volume:
//	                  status          ("full <timestamp>" or "ok <timestamp>")
//	                  last_error
//	                  last_error_time

// VolumeStatus describes the state of a single Keep volume.
type VolumeStatus struct {
	// Space is "full <unix-time>" or "ok <unix-time>", recording the
	// result and time of the most recent fullness check.
	Space string `json:"status"`
	// LastErr and LastErrTime are meant to record the most recent error
	// seen on the volume.
	// NOTE(review): error tracking is not implemented yet — confirm before
	// relying on these fields.
	LastErr     string    `json:"last_error"`
	LastErrTime time.Time `json:"last_error_time"`
}

// NodeStatus describes the status of this Keep node as a whole.
type NodeStatus struct {
	LastUpdate time.Time               `json:"time"`         // when this status was generated
	DfOutput   string                  `json:"df"`           // `df --block-size=1k` output (may be empty)
	DiskDev    []string                `json:"disk_devices"` // disk device file names found under /dev
	Volumes    map[string]VolumeStatus `json:"dirs"`         // per-volume status keyed by volume directory
}
+
+func StatusHandler(w http.ResponseWriter, req *http.Request) {
+ st := new(NodeStatus)
+ st.LastUpdate = time.Now()
+
+ // Get a list of disk devices on this system.
+ st.DiskDev = make([]string, 1)
+ if devdir, err := os.Open("/dev"); err != nil {
+ log.Printf("StatusHandler: opening /dev: %s\n", err)
+ } else {
+ devs, err := devdir.Readdirnames(0)
+ if err == nil {
+ for _, d := range devs {
+ if strings.HasPrefix(d, "sd") ||
+ strings.HasPrefix(d, "hd") ||
+ strings.HasPrefix(d, "xvd") {
+ st.DiskDev = append(st.DiskDev, d)
+ }
+ }
+ } else {
+ log.Printf("Readdirnames: %s", err)
+ }
+ }
+
+ for _, vol := range KeepVolumes {
+ st.Volumes[vol] = GetVolumeStatus(vol)
+ }
+
+ if jstat, err := json.Marshal(st); err == nil {
+ w.Write(jstat)
+ } else {
+ log.Printf("json.Marshal: %s\n", err)
+ log.Printf("NodeStatus = %v\n", st)
+ http.Error(w, err.Error(), 500)
+ }
+}
+
+// GetVolumeStatus
+// Returns a VolumeStatus describing the requested volume.
+func GetVolumeStatus(volume string) VolumeStatus {
+ var isfull, lasterr string
+ var lasterrtime time.Time
+
+ if IsFull(volume) {
+ isfull = fmt.Sprintf("full %d", time.Now().Unix())
+ } else {
+ isfull = fmt.Sprintf("ok %d", time.Now().Unix())
+ }
+
+ // Not implemented yet
+ lasterr = ""
+ lasterrtime = time.Unix(0, 0)
+
+ return VolumeStatus{isfull, lasterr, lasterrtime}
+}
+
+// IndexLocators
+// Returns a string containing a list of locator ids found on this
+// Keep server. If {prefix} is given, return only those locator
+// ids that begin with the given prefix string.
+//
+// The return string consists of a sequence of newline-separated
+// strings in the format
+//
+// locator+size modification-time
+//
+// e.g.:
+//
+// e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
+// e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
+// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
+//
+func IndexLocators(prefix string) string {
+ var output string
+ for _, vol := range KeepVolumes {
+ filepath.Walk(vol,
+ func(path string, info os.FileInfo, err error) error {
+ // This WalkFunc inspects each path in the volume
+ // and prints an index line for all files that begin
+ // with prefix.
+ if err != nil {
+ log.Printf("IndexHandler: %s: walking to %s: %s",
+ vol, path, err)
+ return nil
+ }
+ locator := filepath.Base(path)
+ // Skip directories that do not match prefix.
+ // We know there is nothing interesting inside.
+ if info.IsDir() &&
+ !strings.HasPrefix(locator, prefix) &&
+ !strings.HasPrefix(prefix, locator) {
+ return filepath.SkipDir
+ }
+ // Print filenames beginning with prefix
+ if !info.IsDir() && strings.HasPrefix(locator, prefix) {
+ output = output + fmt.Sprintf(
+ "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
+ }
+ return nil
+ })
+ }
+
+ return output
+}
+
func GetBlock(hash string) ([]byte, error) {
var buf = make([]byte, BLOCKSIZE)
//
log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
vol, blockFilename, filehash)
- continue
+ return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")}
}
// Success!
}
log.Printf("%s: not found on any volumes, giving up\n", hash)
- return buf, &KeepError{404, errors.New("not found: " + hash)}
+ return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)}
}
/* PutBlock(block, hash)
On success, PutBlock returns nil.
On failure, it returns a KeepError with one of the following codes:
+ 400 Collision
+ A different block with the same hash already exists on this
+ Keep server.
401 MD5Fail
- -- The MD5 hash of the BLOCK does not match the argument HASH.
+ The MD5 hash of the BLOCK does not match the argument HASH.
503 Full
- -- There was not enough space left in any Keep volume to store
- the object.
+ There was not enough space left in any Keep volume to store
+ the object.
500 Fail
- -- The object could not be stored for some other reason (e.g.
- all writes failed). The text of the error message should
- provide as much detail as possible.
+ The object could not be stored for some other reason (e.g.
+ all writes failed). The text of the error message should
+ provide as much detail as possible.
*/
func PutBlock(block []byte, hash string) error {
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
- return &KeepError{401, errors.New("MD5Fail")}
+ return &KeepError{ErrMD5Fail, errors.New("MD5Fail")}
}
- for _, vol := range KeepVolumes {
-
- // TODO(twp): check for a full volume here before trying to write.
+ // If we already have a block on disk under this identifier, return
+ // success (but check for MD5 collisions).
+ // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
+ // In either case, we want to write our new (good) block to disk, so there is
+ // nothing special to do if err != nil.
+ if oldblock, err := GetBlock(hash); err == nil {
+ if bytes.Compare(block, oldblock) == 0 {
+ return nil
+ } else {
+ return &KeepError{ErrCollision, errors.New("Collision")}
+ }
+ }
+ // Store the block on the first available Keep volume.
+ allFull := true
+ for _, vol := range KeepVolumes {
+ if IsFull(vol) {
+ continue
+ }
+ allFull = false
blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
if err := os.MkdirAll(blockDir, 0755); err != nil {
log.Printf("%s: could not create directory %s: %s",
continue
}
- blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
- f, err := os.OpenFile(blockFilename, os.O_CREATE|os.O_WRONLY, 0644)
- if err != nil {
- // if the block already exists, just return success.
- // TODO(twp): should we check here whether the file on disk
- // matches the file we were asked to store?
- if os.IsExist(err) {
- return nil
- } else {
- // Open failed for some other reason.
- log.Printf("%s: creating %s: %s\n", vol, blockFilename, err)
- continue
- }
+ tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
+ if tmperr != nil {
+ log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
+ continue
}
+ blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
- if _, err := f.Write(block); err == nil {
- f.Close()
- return nil
- } else {
+ if _, err := tmpfile.Write(block); err != nil {
log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
continue
}
+ if err := tmpfile.Close(); err != nil {
+ log.Printf("closing %s: %s\n", tmpfile.Name(), err)
+ os.Remove(tmpfile.Name())
+ continue
+ }
+ if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
+ log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
+ os.Remove(tmpfile.Name())
+ continue
+ }
+ return nil
}
- // All volumes failed; report the failure and return an error.
- //
- log.Printf("all Keep volumes failed")
- return &KeepError{500, errors.New("Fail")}
+ if allFull {
+ log.Printf("all Keep volumes full")
+ return &KeepError{ErrFull, errors.New("Full")}
+ } else {
+ log.Printf("all Keep volumes failed")
+ return &KeepError{ErrOther, errors.New("Fail")}
+ }
+}
+
+func IsFull(volume string) (isFull bool) {
+ fullSymlink := volume + "/full"
+
+ // Check if the volume has been marked as full in the last hour.
+ if link, err := os.Readlink(fullSymlink); err == nil {
+ if ts, err := strconv.Atoi(link); err == nil {
+ fulltime := time.Unix(int64(ts), 0)
+ if time.Since(fulltime).Hours() < 1.0 {
+ return true
+ }
+ }
+ }
+
+ if avail, err := FreeDiskSpace(volume); err == nil {
+ isFull = avail < MIN_FREE_KILOBYTES
+ } else {
+ log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
+ isFull = false
+ }
+
+ // If the volume is full, timestamp it.
+ if isFull {
+ now := fmt.Sprintf("%d", time.Now().Unix())
+ os.Symlink(now, fullSymlink)
+ }
+ return
+}
+
// FreeDiskSpace returns the available space (the Bavail count from
// statfs) on the filesystem holding volume, expressed in 1 KiB blocks.
// If statfs fails, it returns 0 and the error.
func FreeDiskSpace(volume string) (free uint64, err error) {
	var stat syscall.Statfs_t
	if err = syscall.Statfs(volume, &stat); err != nil {
		return 0, err
	}
	// Statfs counts in units of Bsize, which is not necessarily 1 KiB,
	// so convert explicitly.
	return stat.Bavail * uint64(stat.Bsize) / 1024, nil
}