Write status output in JSON. (refs #2561)
[arvados.git] / services / keep / keep.go
index 48ce9e8d0feb5d23c4a3f74ce38d67d3a9ea5f88..71c2a8b9138f97d010fce4f5e48cf9665d3cad3f 100644 (file)
@@ -2,30 +2,53 @@ package main
 
 import (
        "bufio"
+       "bytes"
        "crypto/md5"
+       "encoding/json"
        "errors"
        "fmt"
        "github.com/gorilla/mux"
+       "io/ioutil"
        "log"
        "net/http"
        "os"
+       "path/filepath"
+       "strconv"
        "strings"
+       "syscall"
+       "time"
 )
 
+// Default TCP port on which to listen for requests.
 const DEFAULT_PORT = 25107
+
+// A Keep "block" is 64MB.
 const BLOCKSIZE = 64 * 1024 * 1024
 
+// A Keep volume must have at least MIN_FREE_KILOBYTES available
+// in order to permit writes.
+const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
+
 var PROC_MOUNTS = "/proc/mounts"
 
 var KeepVolumes []string
 
 type KeepError struct {
-       HTTPCode     int
-       ErrorMessage string
+       HTTPCode int
+       Err      error
 }
 
+const (
+       ErrCollision = 400
+       ErrMD5Fail   = 401
+       ErrCorrupt   = 402
+       ErrNotFound  = 404
+       ErrOther     = 500
+       ErrFull      = 503
+)
+
 func (e *KeepError) Error() string {
-       return e.ErrorMessage
+       return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
 }
 
 func main() {
@@ -44,8 +67,11 @@ func main() {
        // appropriate handler.
        //
        rest := mux.NewRouter()
-       rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
-       rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
+       rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
+       rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
+       rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
+       rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+       rest.HandleFunc(`/status\.json`, StatusHandler).Methods("GET", "HEAD")
 
        // Tell the built-in HTTP server to direct all requests to the REST
        // router.
@@ -110,19 +136,158 @@ func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
        // TODO(twp): decide what to do when the input stream contains
        // more than BLOCKSIZE bytes.
        //
-       var nread int
-       buf := make([]string, BLOCKSIZE)
+       buf := make([]byte, BLOCKSIZE)
        if nread, err := req.Body.Read(buf); err == nil {
                if err := PutBlock(buf[:nread], hash); err == nil {
                        w.WriteHeader(http.StatusOK)
                } else {
-                       http.Error(w, err.Error(), err.HTTPCode)
+                       ke := err.(*KeepError)
+                       http.Error(w, ke.Error(), ke.HTTPCode)
                }
        } else {
+               log.Println("error reading request: ", err)
                http.Error(w, err.Error(), 500)
        }
 }
 
+func IndexHandler(w http.ResponseWriter, req *http.Request) {
+       prefix := mux.Vars(req)["prefix"]
+
+       index := IndexLocators(prefix)
+       w.Write([]byte(index))
+}
+
+// StatusHandler
+//     Responds to /status.json requests with the current node status,
+//     described in a JSON structure.
+//
+//     The data given in a status.json response includes:
+//        time - the time the status was last updated
+//        df   - the output of the most recent `df --block-size=1k`
+//        disk_devices - list of disk device files (i.e. /dev/(s|xv|h)d)
+//        dirs - an object describing Keep volumes, keyed on Keep volume dirs,
+//          each value is an object describing the status of that volume
+//            * status ("full [timestamp]" or "ok [timestamp]")
+//            * last_error
+//            * last_error_time
+//
+type VolumeStatus struct {
+       Space       string
+       LastErr     string
+       LastErrTime time.Time
+}
+
+type NodeStatus struct {
+       LastUpdate time.Time
+       DfOutput   string
+       DiskDev    []string
+       Volumes    map[string]VolumeStatus
+}
+
+func StatusHandler(w http.ResponseWriter, req *http.Request) {
+       st := new(NodeStatus)
+       st.LastUpdate = time.Now()
+
+       // Get a list of disk devices on this system.
+       st.DiskDev = make([]string, 1)
+       if devdir, err := os.Open("/dev"); err != nil {
+               log.Printf("StatusHandler: opening /dev: %s\n", err)
+       } else {
+               devs, err := devdir.Readdirnames(0)
+               if err == nil {
+                       for _, d := range devs {
+                               if strings.HasPrefix(d, "sd") ||
+                                       strings.HasPrefix(d, "hd") ||
+                                       strings.HasPrefix(d, "xvd") {
+                                       st.DiskDev = append(st.DiskDev, d)
+                               }
+                       }
+               } else {
+                       log.Printf("Readdirnames: %s", err)
+               }
+       }
+
+       for _, vol := range KeepVolumes {
+               st.Volumes[vol] = GetVolumeStatus(vol)
+       }
+
+       if jstat, err := json.Marshal(st); err == nil {
+               w.Write(jstat)
+       } else {
+               log.Printf("json.Marshal: %s\n", err)
+               log.Printf("NodeStatus = %v\n", st)
+               http.Error(w, err.Error(), 500)
+       }
+}
+
+// GetVolumeStatus
+//     Returns a VolumeStatus describing the requested volume.
+func GetVolumeStatus(volume string) VolumeStatus {
+       var isfull, lasterr string
+       var lasterrtime time.Time
+
+       if IsFull(volume) {
+               isfull = fmt.Sprintf("full %d", time.Now().Unix())
+       } else {
+               isfull = fmt.Sprintf("ok %d", time.Now().Unix())
+       }
+
+       // Not implemented yet
+       lasterr = ""
+       lasterrtime = time.Unix(0, 0)
+
+       return VolumeStatus{isfull, lasterr, lasterrtime}
+}
+
+// IndexLocators
+//     Returns a string containing a list of locator ids found on this
+//     Keep server.  If {prefix} is given, return only those locator
+//     ids that begin with the given prefix string.
+//
+//     The return string consists of a sequence of newline-separated
+//     strings in the format
+//
+//         locator+size modification-time
+//
+//     e.g.:
+//
+//         e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
+//         e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
+//         e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
+//
+func IndexLocators(prefix string) string {
+       var output string
+       for _, vol := range KeepVolumes {
+               filepath.Walk(vol,
+                       func(path string, info os.FileInfo, err error) error {
+                               // This WalkFunc inspects each path in the volume
+                               // and prints an index line for all files that begin
+                               // with prefix.
+                               if err != nil {
+                                       log.Printf("IndexHandler: %s: walking to %s: %s",
+                                               vol, path, err)
+                                       return nil
+                               }
+                               locator := filepath.Base(path)
+                               // Skip directories that do not match prefix.
+                               // We know there is nothing interesting inside.
+                               if info.IsDir() &&
+                                       !strings.HasPrefix(locator, prefix) &&
+                                       !strings.HasPrefix(prefix, locator) {
+                                       return filepath.SkipDir
+                               }
+                               // Print filenames beginning with prefix
+                               if !info.IsDir() && strings.HasPrefix(locator, prefix) {
+                                       output = output + fmt.Sprintf(
+                                               "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
+                               }
+                               return nil
+                       })
+       }
+
+       return output
+}
+
 func GetBlock(hash string) ([]byte, error) {
        var buf = make([]byte, BLOCKSIZE)
 
@@ -132,17 +297,21 @@ func GetBlock(hash string) ([]byte, error) {
                var err error
                var nread int
 
-               path := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
+               blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
 
-               f, err = os.Open(path)
+               f, err = os.Open(blockFilename)
                if err != nil {
-                       log.Printf("%s: opening %s: %s\n", vol, path, err)
+                       if !os.IsNotExist(err) {
+                               // A block is stored on only one Keep disk,
+                               // so os.IsNotExist is expected.  Report any other errors.
+                               log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
+                       }
                        continue
                }
 
                nread, err = f.Read(buf)
                if err != nil {
-                       log.Printf("%s: reading %s: %s\n", vol, path, err)
+                       log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
                        continue
                }
 
@@ -156,8 +325,8 @@ func GetBlock(hash string) ([]byte, error) {
                        // priority or logged as urgent problems.
                        //
                        log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
-                               vol, path, filehash)
-                       continue
+                               vol, blockFilename, filehash)
+                       return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")}
                }
 
                // Success!
@@ -165,7 +334,7 @@ func GetBlock(hash string) ([]byte, error) {
        }
 
        log.Printf("%s: not found on any volumes, giving up\n", hash)
-       return buf, KeepError{404, "not found: " + hash}
+       return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)}
 }
 
 /* PutBlock(block, hash)
@@ -174,68 +343,133 @@ func GetBlock(hash string) ([]byte, error) {
    The MD5 checksum of the block must be identical to the content id HASH.
    If not, an error is returned.
 
-   PutBlock stores the BLOCK in each of the available Keep volumes.
-   If any volume fails, an error is signaled on the back end.  A write
-   error is returned only if all volumes fail.
+   PutBlock stores the BLOCK on the first Keep volume with free space.
+   A failure code is returned to the user only if all volumes fail.
 
    On success, PutBlock returns nil.
    On failure, it returns a KeepError with one of the following codes:
 
    400 Collision
-         -- A different object already exists in Keep with the same hash.
-            This check provides protection against (unlikely) MD5 collisions.
+          A different block with the same hash already exists on this
+          Keep server.
    401 MD5Fail
-         -- The MD5 hash of the BLOCK does not match the argument HASH.
+          The MD5 hash of the BLOCK does not match the argument HASH.
    503 Full
-         -- There was not enough space left in any Keep volume to store
-            the object.
+          There was not enough space left in any Keep volume to store
+          the object.
    500 Fail
-         -- The object could not be stored for some other reason (e.g.
-            all writes failed). The text of the error message should
-            provide as much detail as possible.
+          The object could not be stored for some other reason (e.g.
+          all writes failed). The text of the error message should
+          provide as much detail as possible.
 */
 
-func PutBlock(block []string, hash string) error {
+func PutBlock(block []byte, hash string) error {
        // Check that BLOCK's checksum matches HASH.
-       if md5.Sum(block) != hash {
-               return KeepError{401, "MD5Fail"}
+       blockhash := fmt.Sprintf("%x", md5.Sum(block))
+       if blockhash != hash {
+               log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
+               return &KeepError{ErrMD5Fail, errors.New("MD5Fail")}
        }
 
-       // any_success will be set to true upon a successful block write.
-       any_success := false
-       for _, vol := range KeepVolumes {
-               var f *os.File
-               var err error
-               var nread int
+       // If we already have a block on disk under this identifier, return
+       // success (but check for MD5 collisions).
+       // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
+       // In either case, we want to write our new (good) block to disk, so there is
+       // nothing special to do if err != nil.
+       if oldblock, err := GetBlock(hash); err == nil {
+               if bytes.Compare(block, oldblock) == 0 {
+                       return nil
+               } else {
+                       return &KeepError{ErrCollision, errors.New("Collision")}
+               }
+       }
 
-               path := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
+       // Store the block on the first available Keep volume.
+       allFull := true
+       for _, vol := range KeepVolumes {
+               if IsFull(vol) {
+                       continue
+               }
+               allFull = false
+               blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
+               if err := os.MkdirAll(blockDir, 0755); err != nil {
+                       log.Printf("%s: could not create directory %s: %s",
+                               hash, blockDir, err)
+                       continue
+               }
 
-               f, err = os.OpenFile(path, os.O_CREATE, 0644)
-               if err != nil {
-                       // if the block already exists, just skip to the next volume.
-                       if os.IsExist(err) {
-                               any_success = true
-                               continue
-                       } else {
-                               // Open failed for some other reason.
-                               log.Fprintf("%s: writing to %s: %s\n", vol, path, err)
-                               continue
-                       }
+               tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
+               if tmperr != nil {
+                       log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
+                       continue
                }
+               blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
 
-               if nwrite, err := f.Write(block); err == nil {
-                       f.Close()
-                       any_success = true
+               if _, err := tmpfile.Write(block); err != nil {
+                       log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
                        continue
-               } else {
-                       log.Fprintf("%s: writing to %s: %s\n", vol, path, err)
+               }
+               if err := tmpfile.Close(); err != nil {
+                       log.Printf("closing %s: %s\n", tmpfile.Name(), err)
+                       os.Remove(tmpfile.Name())
+                       continue
+               }
+               if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
+                       log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
+                       os.Remove(tmpfile.Name())
                        continue
                }
+               return nil
        }
 
-       if any_success {
-               return nil
+       if allFull {
+               log.Printf("all Keep volumes full")
+               return &KeepError{ErrFull, errors.New("Full")}
+       } else {
+               log.Printf("all Keep volumes failed")
+               return &KeepError{ErrOther, errors.New("Fail")}
+       }
+}
+
+func IsFull(volume string) (isFull bool) {
+       fullSymlink := volume + "/full"
+
+       // Check if the volume has been marked as full in the last hour.
+       if link, err := os.Readlink(fullSymlink); err == nil {
+               if ts, err := strconv.Atoi(link); err == nil {
+                       fulltime := time.Unix(int64(ts), 0)
+                       if time.Since(fulltime).Hours() < 1.0 {
+                               return true
+                       }
+               }
+       }
+
+       if avail, err := FreeDiskSpace(volume); err == nil {
+               isFull = avail < MIN_FREE_KILOBYTES
        } else {
-               return KeepError{500, "Fail"}
+               log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
+               isFull = false
        }
+
+       // If the volume is full, timestamp it.
+       if isFull {
+               now := fmt.Sprintf("%d", time.Now().Unix())
+               os.Symlink(now, fullSymlink)
+       }
+       return
+}
+
+// FreeDiskSpace(volume)
+//     Returns the amount of available disk space on VOLUME,
+//     as a number of 1k blocks.
+//
+func FreeDiskSpace(volume string) (free uint64, err error) {
+       var fs syscall.Statfs_t
+       err = syscall.Statfs(volume, &fs)
+       if err == nil {
+               // Statfs output is not guaranteed to measure free
+               // space in terms of 1K blocks.
+               free = fs.Bavail * uint64(fs.Bsize) / 1024
+       }
+       return
 }