"bufio"
"bytes"
"crypto/md5"
+ "encoding/json"
"errors"
+ "flag"
"fmt"
"github.com/gorilla/mux"
"io/ioutil"
"log"
"net/http"
"os"
+ "path/filepath"
"strconv"
"strings"
"syscall"
"time"
)
-// Default TCP port on which to listen for requests.
-const DEFAULT_PORT = 25107
+// Default TCP address on which to listen for requests.
+const DEFAULT_ADDR = ":25107"
// A Keep "block" is 64MB.
const BLOCKSIZE = 64 * 1024 * 1024
}
func main() {
+ // Parse command-line flags:
+ //
+ // -listen=ipaddr:port
+ // Interface on which to listen for requests. Use :port without
+ // an ipaddr to listen on all network interfaces.
+ // Examples:
+ // -listen=127.0.0.1:4949
+ // -listen=10.0.1.24:8000
+ // -listen=:25107 (to listen to port 25107 on all interfaces)
+ //
+ // -volumes
+ // A comma-separated list of directories to use as Keep volumes.
+ // Example:
+ // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
+ //
+ // If -volumes is empty or is not present, Keep will select volumes
+ // by looking at currently mounted filesystems for /keep top-level
+ // directories.
+
+ var listen, keepvols string
+ flag.StringVar(&listen, "listen", DEFAULT_ADDR,
+ "interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
+ flag.StringVar(&keepvols, "volumes", "",
+ "Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
+ flag.Parse()
+
// Look for local keep volumes.
- KeepVolumes = FindKeepVolumes()
+ if keepvols == "" {
+ // TODO(twp): decide whether this is desirable default behavior.
+ // In production we may want to require the admin to specify
+ // Keep volumes explicitly.
+ KeepVolumes = FindKeepVolumes()
+ } else {
+ KeepVolumes = strings.Split(keepvols, ",")
+ }
+
if len(KeepVolumes) == 0 {
log.Fatal("could not find any keep volumes")
}
// appropriate handler.
//
rest := mux.NewRouter()
- rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
- rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
+ rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
+ rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
+ rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
+ rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+ rest.HandleFunc(`/status\.json`, StatusHandler).Methods("GET", "HEAD")
// Tell the built-in HTTP server to direct all requests to the REST
// router.
http.Handle("/", rest)
// Start listening for requests.
- port := fmt.Sprintf(":%d", DEFAULT_PORT)
- http.ListenAndServe(port, nil)
+ http.ListenAndServe(listen, nil)
}
// FindKeepVolumes
}
}
+// IndexHandler
+// Responds to /index and /index/{prefix} requests with the text
+// produced by IndexLocators for the requested prefix. When the matched
+// route carries no {prefix} variable, the prefix is the empty string.
+func IndexHandler(w http.ResponseWriter, req *http.Request) {
+	routeVars := mux.Vars(req)
+	listing := []byte(IndexLocators(routeVars["prefix"]))
+	w.Write(listing)
+}
+
+// StatusHandler
+// Responds to /status.json requests with the current node status,
+// described in a JSON structure.
+//
+// The data given in a status.json response includes:
+// time - the time the status was last updated
+// df - the output of the most recent `df --block-size=1k`
+// disk_devices - list of disk device files (i.e. /dev/(s|xv|h)d)
+// dirs - an object describing Keep volumes, keyed on Keep volume dirs,
+// each value is an object describing the status of that volume
+// * status ("full [timestamp]" or "ok [timestamp]")
+// * last_error
+// * last_error_time
+//
+// NOTE(review): NodeStatus and VolumeStatus carry no json struct tags,
+// so encoding/json emits the Go field names (LastUpdate, DfOutput,
+// DiskDev, Volumes, Space, LastErr, LastErrTime) rather than the key
+// names listed above. Confirm which naming is intended.
+//
+// VolumeStatus describes the state of a single Keep volume; values are
+// built by GetVolumeStatus.
+//
+type VolumeStatus struct {
+ // Space is "full <unix time>" or "ok <unix time>", as formatted by
+ // GetVolumeStatus.
+ Space string
+ // LastErr is meant to hold the last error seen on the volume.
+ // GetVolumeStatus currently always sets it to "".
+ LastErr string
+ // LastErrTime is meant to hold the time of the last error.
+ // GetVolumeStatus currently always sets it to time.Unix(0, 0).
+ LastErrTime time.Time
+}
+
+// NodeStatus is the structure that StatusHandler fills in and
+// serializes with json.Marshal for a /status.json response.
+type NodeStatus struct {
+ // LastUpdate is the time the status was generated; StatusHandler
+ // sets it to time.Now().
+ LastUpdate time.Time
+ // DfOutput is presumably intended to hold `df` output.
+ // NOTE(review): StatusHandler never assigns it, so it is always
+ // the empty string -- confirm whether this is still to be done.
+ DfOutput string
+ // DiskDev holds the names of /dev entries that begin with "sd",
+ // "hd" or "xvd".
+ DiskDev []string
+ // Volumes maps each directory in KeepVolumes to the VolumeStatus
+ // returned for it by GetVolumeStatus.
+ Volumes map[string]VolumeStatus
+}
+
+// StatusHandler
+// Responds to /status.json requests with a JSON-encoded NodeStatus:
+// the current time, the disk device names found in /dev, and the
+// status of every volume listed in KeepVolumes.
+//
+// If /dev cannot be opened or read, the error is logged and the device
+// list is left empty. If the status cannot be encoded, the failure is
+// logged and the client receives a 500 response.
+func StatusHandler(w http.ResponseWriter, req *http.Request) {
+	st := new(NodeStatus)
+	st.LastUpdate = time.Now()
+
+	// Start from an empty slice: a length-1 slice would put a spurious
+	// "" entry ahead of the real device names, and a nil slice would
+	// encode as null instead of [].
+	st.DiskDev = make([]string, 0)
+
+	// The map has to exist before the loop below stores into it;
+	// assigning to a nil map panics.
+	st.Volumes = make(map[string]VolumeStatus, len(KeepVolumes))
+
+	// Get a list of disk devices on this system.
+	if devdir, err := os.Open("/dev"); err != nil {
+		log.Printf("StatusHandler: opening /dev: %s\n", err)
+	} else {
+		devs, err := devdir.Readdirnames(0)
+		// Release the directory handle as soon as the names are read,
+		// so that repeated requests do not leak file descriptors.
+		devdir.Close()
+		if err != nil {
+			log.Printf("Readdirnames: %s", err)
+		} else {
+			for _, d := range devs {
+				if strings.HasPrefix(d, "sd") ||
+					strings.HasPrefix(d, "hd") ||
+					strings.HasPrefix(d, "xvd") {
+					st.DiskDev = append(st.DiskDev, d)
+				}
+			}
+		}
+	}
+
+	for _, vol := range KeepVolumes {
+		st.Volumes[vol] = GetVolumeStatus(vol)
+	}
+
+	jstat, err := json.Marshal(st)
+	if err != nil {
+		log.Printf("json.Marshal: %s\n", err)
+		log.Printf("NodeStatus = %v\n", st)
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	// Label the body explicitly; otherwise net/http sniffs the content
+	// and reports it as text/plain.
+	w.Header().Set("Content-Type", "application/json")
+	if _, err := w.Write(jstat); err != nil {
+		log.Printf("StatusHandler: writing response: %s", err)
+	}
+}
+
+// GetVolumeStatus
+// Builds the VolumeStatus for the given volume directory. Space is
+// "full <unix time>" when IsFull reports true for the volume and
+// "ok <unix time>" otherwise. Error tracking is not implemented yet,
+// so LastErr is always empty and LastErrTime is always time.Unix(0, 0).
+func GetVolumeStatus(volume string) VolumeStatus {
+	var vs VolumeStatus
+
+	switch {
+	case IsFull(volume):
+		vs.Space = fmt.Sprintf("full %d", time.Now().Unix())
+	default:
+		vs.Space = fmt.Sprintf("ok %d", time.Now().Unix())
+	}
+
+	// Placeholders until error tracking is implemented.
+	vs.LastErr = ""
+	vs.LastErrTime = time.Unix(0, 0)
+
+	return vs
+}
+
+// IndexLocators
+// Returns a string containing a list of locator ids found on this
+// Keep server. If {prefix} is given, return only those locator
+// ids that begin with the given prefix string.
+//
+// The return string consists of a sequence of newline-separated
+// strings in the format
+//
+// locator+size modification-time
+//
+// e.g.:
+//
+// e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
+// e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
+// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
+//
+// Every directory in KeepVolumes is walked. Subdirectories whose name
+// cannot lead to a match are pruned, but a volume's root directory is
+// always descended into, whatever it is called. Errors met while
+// walking are logged and the walk carries on, so the result may be
+// incomplete when part of a volume is unreadable.
+//
+func IndexLocators(prefix string) string {
+	// Accumulate in a buffer; repeated string concatenation copies the
+	// whole index again for every block found.
+	var output bytes.Buffer
+
+	for _, vol := range KeepVolumes {
+		root := vol
+
+		// visit inspects each path in the volume and appends an index
+		// line for every file whose name begins with prefix.
+		visit := func(path string, info os.FileInfo, err error) error {
+			if err != nil {
+				log.Printf("IndexHandler: %s: walking to %s: %s",
+					root, path, err)
+				return nil
+			}
+			locator := filepath.Base(path)
+
+			if info.IsDir() {
+				// Skip directories that do not match prefix: there is
+				// nothing interesting inside. The volume root is
+				// exempt, because its name (e.g. "keep01") is
+				// unrelated to any locator and pruning it would
+				// empty the index for every non-empty prefix.
+				if path != root &&
+					!strings.HasPrefix(locator, prefix) &&
+					!strings.HasPrefix(prefix, locator) {
+					return filepath.SkipDir
+				}
+				return nil
+			}
+
+			// Emit filenames beginning with prefix.
+			if strings.HasPrefix(locator, prefix) {
+				fmt.Fprintf(&output, "%s+%d %d\n",
+					locator, info.Size(), info.ModTime().Unix())
+			}
+			return nil
+		}
+
+		// visit swallows walk errors itself, so Walk is expected to
+		// return nil; log anything else rather than dropping it.
+		if err := filepath.Walk(root, visit); err != nil {
+			log.Printf("IndexHandler: %s: %s", root, err)
+		}
+	}
+
+	return output.String()
+}
+
func GetBlock(hash string) ([]byte, error) {
var buf = make([]byte, BLOCKSIZE)
// space in terms of 1K blocks.
free = fs.Bavail * uint64(fs.Bsize) / 1024
}
-
return
}