X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e3c48fef662408636cb49fe1bb0a3c1040269e7c..75df7deca434d9b3b161dc2d7d13ae21b1fe4b08:/services/keep/keep.go?ds=sidebyside diff --git a/services/keep/keep.go b/services/keep/keep.go index 7cca5344d8..8683abf100 100644 --- a/services/keep/keep.go +++ b/services/keep/keep.go @@ -4,21 +4,24 @@ import ( "bufio" "bytes" "crypto/md5" + "encoding/json" "errors" + "flag" "fmt" "github.com/gorilla/mux" "io/ioutil" "log" "net/http" "os" + "path/filepath" "strconv" "strings" "syscall" "time" ) -// Default TCP port on which to listen for requests. -const DEFAULT_PORT = 25107 +// Default TCP address on which to listen for requests. +const DEFAULT_ADDR = ":25107" // A Keep "block" is 64MB. const BLOCKSIZE = 64 * 1024 * 1024 @@ -50,8 +53,42 @@ func (e *KeepError) Error() string { } func main() { + // Parse command-line flags: + // + // -listen=ipaddr:port + // Interface on which to listen for requests. Use :port without + // an ipaddr to listen on all network interfaces. + // Examples: + // -listen=127.0.0.1:4949 + // -listen=10.0.1.24:8000 + // -listen=:25107 (to listen to port 25107 on all interfaces) + // + // -volumes + // A comma-separated list of directories to use as Keep volumes. + // Example: + // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir + // + // If -volumes is empty or is not present, Keep will select volumes + // by looking at currently mounted filesystems for /keep top-level + // directories. + + var listen, keepvols string + flag.StringVar(&listen, "listen", DEFAULT_ADDR, + "interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.") + flag.StringVar(&keepvols, "volumes", "", + "Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.") + flag.Parse() + // Look for local keep volumes. - KeepVolumes = FindKeepVolumes() + if keepvols == "" { + // TODO(twp): decide whether this is desirable default behavior. + // In production we may want to require the admin to specify + // Keep volumes explicitly. + KeepVolumes = FindKeepVolumes() + } else { + KeepVolumes = strings.Split(keepvols, ",") + } + if len(KeepVolumes) == 0 { log.Fatal("could not find any keep volumes") } @@ -65,16 +102,18 @@ func main() { // appropriate handler. // rest := mux.NewRouter() - rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET") - rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT") + rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD") + rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT") + rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD") + rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD") + rest.HandleFunc(`/status\.json`, StatusHandler).Methods("GET", "HEAD") // Tell the built-in HTTP server to direct all requests to the REST // router. http.Handle("/", rest) // Start listening for requests. - port := fmt.Sprintf(":%d", DEFAULT_PORT) - http.ListenAndServe(port, nil) + http.ListenAndServe(listen, nil) } // FindKeepVolumes @@ -145,6 +184,144 @@ func PutBlockHandler(w http.ResponseWriter, req *http.Request) { } } +func IndexHandler(w http.ResponseWriter, req *http.Request) { + prefix := mux.Vars(req)["prefix"] + + index := IndexLocators(prefix) + w.Write([]byte(index)) +} + +// StatusHandler +// Responds to /status.json requests with the current node status, +// described in a JSON structure. +// +// The data given in a status.json response includes: +// time - the time the status was last updated +// df - the output of the most recent `df --block-size=1k` +// disk_devices - list of disk device files (i.e. /dev/(s|xv|h)d) +// dirs - an object describing Keep volumes, keyed on Keep volume dirs, +// each value is an object describing the status of that volume +// * status ("full [timestamp]" or "ok [timestamp]") +// * last_error +// * last_error_time +// +type VolumeStatus struct { + Space string + LastErr string + LastErrTime time.Time +} + +type NodeStatus struct { + LastUpdate time.Time + DfOutput string + DiskDev []string + Volumes map[string]VolumeStatus +} + +func StatusHandler(w http.ResponseWriter, req *http.Request) { + st := new(NodeStatus) + st.LastUpdate = time.Now() + + // Get a list of disk devices on this system. + st.DiskDev = make([]string, 1) + if devdir, err := os.Open("/dev"); err != nil { + log.Printf("StatusHandler: opening /dev: %s\n", err) + } else { + devs, err := devdir.Readdirnames(0) + if err == nil { + for _, d := range devs { + if strings.HasPrefix(d, "sd") || + strings.HasPrefix(d, "hd") || + strings.HasPrefix(d, "xvd") { + st.DiskDev = append(st.DiskDev, d) + } + } + } else { + log.Printf("Readdirnames: %s", err) + } + } + + for _, vol := range KeepVolumes { + st.Volumes[vol] = GetVolumeStatus(vol) + } + + if jstat, err := json.Marshal(st); err == nil { + w.Write(jstat) + } else { + log.Printf("json.Marshal: %s\n", err) + log.Printf("NodeStatus = %v\n", st) + http.Error(w, err.Error(), 500) + } +} + +// GetVolumeStatus +// Returns a VolumeStatus describing the requested volume. +func GetVolumeStatus(volume string) VolumeStatus { + var isfull, lasterr string + var lasterrtime time.Time + + if IsFull(volume) { + isfull = fmt.Sprintf("full %d", time.Now().Unix()) + } else { + isfull = fmt.Sprintf("ok %d", time.Now().Unix()) + } + + // Not implemented yet + lasterr = "" + lasterrtime = time.Unix(0, 0) + + return VolumeStatus{isfull, lasterr, lasterrtime} +} + +// IndexLocators +// Returns a string containing a list of locator ids found on this +// Keep server. If {prefix} is given, return only those locator +// ids that begin with the given prefix string. +// +// The return string consists of a sequence of newline-separated +// strings in the format +// +// locator+size modification-time +// +// e.g.: +// +// e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303 +// e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043 +// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136 +// +func IndexLocators(prefix string) string { + var output string + for _, vol := range KeepVolumes { + filepath.Walk(vol, + func(path string, info os.FileInfo, err error) error { + // This WalkFunc inspects each path in the volume + // and prints an index line for all files that begin + // with prefix. + if err != nil { + log.Printf("IndexHandler: %s: walking to %s: %s", + vol, path, err) + return nil + } + locator := filepath.Base(path) + // Skip directories that do not match prefix. + // We know there is nothing interesting inside. + if info.IsDir() && + !strings.HasPrefix(locator, prefix) && + !strings.HasPrefix(prefix, locator) { + return filepath.SkipDir + } + // Print filenames beginning with prefix + if !info.IsDir() && strings.HasPrefix(locator, prefix) { + output = output + fmt.Sprintf( + "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix()) + } + return nil + }) + } + + return output +} + func GetBlock(hash string) ([]byte, error) { var buf = make([]byte, BLOCKSIZE) @@ -328,6 +505,5 @@ func FreeDiskSpace(volume string) (free uint64, err error) { // space in terms of 1K blocks. free = fs.Bavail * uint64(fs.Bsize) / 1024 } - return }