Merge branch 'master' into 2449-keep-index-status-handlers
[arvados.git] / services / keep / keep.go
index 7cca5344d8a2abc074cfb0f0f4dbd66d4fd2f0af..8683abf100557c35bf96fe08279737d8ddb65058 100644 (file)
@@ -4,21 +4,24 @@ import (
        "bufio"
        "bytes"
        "crypto/md5"
+       "encoding/json"
        "errors"
+       "flag"
        "fmt"
        "github.com/gorilla/mux"
        "io/ioutil"
        "log"
        "net/http"
        "os"
+       "path/filepath"
        "strconv"
        "strings"
        "syscall"
        "time"
 )
 
-// Default TCP port on which to listen for requests.
-const DEFAULT_PORT = 25107
+// Default TCP address on which to listen for requests.
+const DEFAULT_ADDR = ":25107"
 
 // A Keep "block" is 64MB.
 const BLOCKSIZE = 64 * 1024 * 1024
@@ -50,8 +53,42 @@ func (e *KeepError) Error() string {
 }
 
 func main() {
+       // Parse command-line flags:
+       //
+       // -listen=ipaddr:port
+       //    Interface on which to listen for requests. Use :port without
+       //    an ipaddr to listen on all network interfaces.
+       //    Examples:
+       //      -listen=127.0.0.1:4949
+       //      -listen=10.0.1.24:8000
+       //      -listen=:25107 (to listen to port 25107 on all interfaces)
+       //
+       // -volumes
+       //    A comma-separated list of directories to use as Keep volumes.
+       //    Example:
+       //      -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
+       //
+       //    If -volumes is empty or is not present, Keep will select volumes
+       //    by looking at currently mounted filesystems for /keep top-level
+       //    directories.
+
+       var listen, keepvols string
+       flag.StringVar(&listen, "listen", DEFAULT_ADDR,
+               "interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
+       flag.StringVar(&keepvols, "volumes", "",
+               "Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
+       flag.Parse()
+
        // Look for local keep volumes.
-       KeepVolumes = FindKeepVolumes()
+       if keepvols == "" {
+               // TODO(twp): decide whether this is desirable default behavior.
+               // In production we may want to require the admin to specify
+               // Keep volumes explicitly.
+               KeepVolumes = FindKeepVolumes()
+       } else {
+               KeepVolumes = strings.Split(keepvols, ",")
+       }
+
        if len(KeepVolumes) == 0 {
                log.Fatal("could not find any keep volumes")
        }
@@ -65,16 +102,18 @@ func main() {
        // appropriate handler.
        //
        rest := mux.NewRouter()
-       rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
-       rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
+       rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
+       rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
+       rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
+       rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+       rest.HandleFunc(`/status\.json`, StatusHandler).Methods("GET", "HEAD")
 
        // Tell the built-in HTTP server to direct all requests to the REST
        // router.
        http.Handle("/", rest)
 
        // Start listening for requests.
-       port := fmt.Sprintf(":%d", DEFAULT_PORT)
-       http.ListenAndServe(port, nil)
+       http.ListenAndServe(listen, nil)
 }
 
 // FindKeepVolumes
@@ -145,6 +184,144 @@ func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
        }
 }
 
+func IndexHandler(w http.ResponseWriter, req *http.Request) {
+       prefix := mux.Vars(req)["prefix"]
+
+       index := IndexLocators(prefix)
+       w.Write([]byte(index))
+}
+
+// StatusHandler
+//     Responds to /status.json requests with the current node status,
+//     described in a JSON structure.
+//
+//     The data given in a status.json response includes:
+//        time - the time the status was last updated
+//        df   - the output of the most recent `df --block-size=1k`
+//        disk_devices - list of disk device files (i.e. /dev/(s|xv|h)d)
+//        dirs - an object describing Keep volumes, keyed on Keep volume dirs,
+//          each value is an object describing the status of that volume
+//            * status ("full [timestamp]" or "ok [timestamp]")
+//            * last_error
+//            * last_error_time
+//
+type VolumeStatus struct {
+       Space       string
+       LastErr     string
+       LastErrTime time.Time
+}
+
+type NodeStatus struct {
+       LastUpdate time.Time
+       DfOutput   string
+       DiskDev    []string
+       Volumes    map[string]VolumeStatus
+}
+
+func StatusHandler(w http.ResponseWriter, req *http.Request) {
+       st := new(NodeStatus)
+       st.LastUpdate = time.Now()
+
+       // Get a list of disk devices on this system.
+       st.DiskDev = make([]string, 1)
+       if devdir, err := os.Open("/dev"); err != nil {
+               log.Printf("StatusHandler: opening /dev: %s\n", err)
+       } else {
+               devs, err := devdir.Readdirnames(0)
+               if err == nil {
+                       for _, d := range devs {
+                               if strings.HasPrefix(d, "sd") ||
+                                       strings.HasPrefix(d, "hd") ||
+                                       strings.HasPrefix(d, "xvd") {
+                                       st.DiskDev = append(st.DiskDev, d)
+                               }
+                       }
+               } else {
+                       log.Printf("Readdirnames: %s", err)
+               }
+       }
+
+       for _, vol := range KeepVolumes {
+               st.Volumes[vol] = GetVolumeStatus(vol)
+       }
+
+       if jstat, err := json.Marshal(st); err == nil {
+               w.Write(jstat)
+       } else {
+               log.Printf("json.Marshal: %s\n", err)
+               log.Printf("NodeStatus = %v\n", st)
+               http.Error(w, err.Error(), 500)
+       }
+}
+
+// GetVolumeStatus
+//     Returns a VolumeStatus describing the requested volume.
+func GetVolumeStatus(volume string) VolumeStatus {
+       var isfull, lasterr string
+       var lasterrtime time.Time
+
+       if IsFull(volume) {
+               isfull = fmt.Sprintf("full %d", time.Now().Unix())
+       } else {
+               isfull = fmt.Sprintf("ok %d", time.Now().Unix())
+       }
+
+       // Not implemented yet
+       lasterr = ""
+       lasterrtime = time.Unix(0, 0)
+
+       return VolumeStatus{isfull, lasterr, lasterrtime}
+}
+
+// IndexLocators
+//     Returns a string containing a list of locator ids found on this
+//     Keep server.  If {prefix} is given, return only those locator
+//     ids that begin with the given prefix string.
+//
+//     The return string consists of a sequence of newline-separated
+//     strings in the format
+//
+//         locator+size modification-time
+//
+//     e.g.:
+//
+//         e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
+//         e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
+//         e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
+//
+func IndexLocators(prefix string) string {
+       var output string
+       for _, vol := range KeepVolumes {
+               filepath.Walk(vol,
+                       func(path string, info os.FileInfo, err error) error {
+                               // This WalkFunc inspects each path in the volume
+                               // and prints an index line for all files that begin
+                               // with prefix.
+                               if err != nil {
+                                       log.Printf("IndexHandler: %s: walking to %s: %s",
+                                               vol, path, err)
+                                       return nil
+                               }
+                               locator := filepath.Base(path)
+                               // Skip directories that do not match prefix.
+                               // We know there is nothing interesting inside.
+                               if info.IsDir() &&
+                                       !strings.HasPrefix(locator, prefix) &&
+                                       !strings.HasPrefix(prefix, locator) {
+                                       return filepath.SkipDir
+                               }
+                               // Print filenames beginning with prefix
+                               if !info.IsDir() && strings.HasPrefix(locator, prefix) {
+                                       output = output + fmt.Sprintf(
+                                               "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
+                               }
+                               return nil
+                       })
+       }
+
+       return output
+}
+
 func GetBlock(hash string) ([]byte, error) {
        var buf = make([]byte, BLOCKSIZE)
 
@@ -328,6 +505,5 @@ func FreeDiskSpace(volume string) (free uint64, err error) {
                // space in terms of 1K blocks.
                free = fs.Bavail * uint64(fs.Bsize) / 1024
        }
-
        return
 }