Added status.json handler. (refs #2561)
[arvados.git] / services / keep / keep.go
index 8f639db8e983bcbf65a43ddb7664762e85b3fc78..b96afff991864b567a5df91be4448c1629cf22b4 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "bufio"
 
 import (
        "bufio"
+       "bytes"
        "crypto/md5"
        "errors"
        "fmt"
        "crypto/md5"
        "errors"
        "fmt"
@@ -9,13 +10,23 @@ import (
        "log"
        "net/http"
        "os"
        "log"
        "net/http"
        "os"
-       "path"
+       "path/filepath"
+       "strconv"
        "strings"
        "strings"
+       "syscall"
+       "time"
 )
 
 )
 
+// Default TCP port on which to listen for requests.
 const DEFAULT_PORT = 25107
 const DEFAULT_PORT = 25107
+
+// A Keep "block" is 64MB.
 const BLOCKSIZE = 64 * 1024 * 1024
 
 const BLOCKSIZE = 64 * 1024 * 1024
 
+// A Keep volume must have at least MIN_FREE_KILOBYTES available
+// in order to permit writes.
+const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
+
 var PROC_MOUNTS = "/proc/mounts"
 
 var KeepVolumes []string
 var PROC_MOUNTS = "/proc/mounts"
 
 var KeepVolumes []string
@@ -25,6 +36,15 @@ type KeepError struct {
        Err      error
 }
 
        Err      error
 }
 
+const (
+       ErrCollision = 400
+       ErrMD5Fail   = 401
+       ErrCorrupt   = 402
+       ErrNotFound  = 404
+       ErrOther     = 500
+       ErrFull      = 503
+)
+
 func (e *KeepError) Error() string {
        return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
 }
 func (e *KeepError) Error() string {
        return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
 }
@@ -45,8 +65,11 @@ func main() {
        // appropriate handler.
        //
        rest := mux.NewRouter()
        // appropriate handler.
        //
        rest := mux.NewRouter()
-       rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
-       rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
+       rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
+       rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
+       rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
+       rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+       rest.HandleFunc(`/status\.json`, StatusHandler).Methods("GET", "HEAD")
 
        // Tell the built-in HTTP server to direct all requests to the REST
        // router.
 
        // Tell the built-in HTTP server to direct all requests to the REST
        // router.
@@ -116,7 +139,8 @@ func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
                if err := PutBlock(buf[:nread], hash); err == nil {
                        w.WriteHeader(http.StatusOK)
                } else {
                if err := PutBlock(buf[:nread], hash); err == nil {
                        w.WriteHeader(http.StatusOK)
                } else {
-                       http.Error(w, err.Error(), err.HTTPCode)
+                       ke := err.(*KeepError)
+                       http.Error(w, ke.Error(), ke.HTTPCode)
                }
        } else {
                log.Println("error reading request: ", err)
                }
        } else {
                log.Println("error reading request: ", err)
@@ -124,6 +148,136 @@ func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
        }
 }
 
        }
 }
 
+func IndexHandler(w http.ResponseWriter, req *http.Request) {
+       prefix := mux.Vars(req)["prefix"]
+
+       index := IndexLocators(prefix)
+       w.Write([]byte(index))
+}
+
+// StatusHandler
+//     Responds to /status.json requests with the current node status,
+//     described in a JSON structure.
+//
+//     The data given in a status.json response includes:
+//        time - the time the status was last updated
+//        df   - the output of the most recent `df --block-size=1k`
+//        disk_devices - list of disk device files (i.e. /dev/(s|xv|h)d)
+//        dirs - an object describing Keep volumes, keyed on Keep volume dirs,
+//          each value is an object describing the status of that volume
+//            * status ("full [timestamp]" or "ok [timestamp]")
+//            * last_error
+//            * last_error_time
+//
+type VolumeStatus struct {
+       Space       string
+       LastErr     string
+       LastErrTime time.Time
+}
+
+type NodeStatus struct {
+       LastUpdate time.Time
+       DfOutput   string
+       DiskDev    []string
+       Volumes    map[string]VolumeStatus
+}
+
+func StatusHandler(w http.ResponseWriter, req *http.Request) {
+       st := new(NodeStatus)
+       st.LastUpdate = time.Now()
+
+       // Get a list of disk devices on this system.
+       st.DiskDev = make([]string, 1)
+       if devdir, err := os.Open("/dev"); err != nil {
+               log.Printf("StatusHandler: opening /dev: %s\n", err)
+       } else {
+               devs, err := devdir.Readdirnames(0)
+               if err == nil {
+                       for _, d := range devs {
+                               if strings.HasPrefix(d, "sd") ||
+                                       strings.HasPrefix(d, "hd") ||
+                                       strings.HasPrefix(d, "xvd") {
+                                       st.DiskDev = append(st.DiskDev, d)
+                               }
+                       }
+               } else {
+                       log.Printf("Readdirnames: %s", err)
+               }
+       }
+
+       for _, vol := range KeepVolumes {
+               st.Volumes[vol] = GetVolumeStatus(vol)
+       }
+}
+
+// GetVolumeStatus
+//     Returns a VolumeStatus describing the requested volume.
+func GetVolumeStatus(volume string) VolumeStatus {
+       var isfull, lasterr string
+       var lasterrtime time.Time
+
+       if IsFull(volume) {
+               isfull = fmt.Sprintf("full %d", time.Now().Unix())
+       } else {
+               isfull = fmt.Sprintf("ok %d", time.Now().Unix())
+       }
+
+       // Not implemented yet
+       lasterr = ""
+       lasterrtime = time.Unix(0, 0)
+
+       return VolumeStatus{isfull, lasterr, lasterrtime}
+}
+
+// IndexLocators
+//     Returns a string containing a list of locator ids found on this
+//     Keep server.  If {prefix} is given, return only those locator
+//     ids that begin with the given prefix string.
+//
+//     The return string consists of a sequence of newline-separated
+//     strings in the format
+//
+//         locator+size modification-time
+//
+//     e.g.:
+//
+//         e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
+//         e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
+//         e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
+//
+func IndexLocators(prefix string) string {
+       var output string
+       for _, vol := range KeepVolumes {
+               filepath.Walk(vol,
+                       func(path string, info os.FileInfo, err error) error {
+                               // This WalkFunc inspects each path in the volume
+                               // and prints an index line for all files that begin
+                               // with prefix.
+                               if err != nil {
+                                       log.Printf("IndexHandler: %s: walking to %s: %s",
+                                               vol, path, err)
+                                       return nil
+                               }
+                               locator := filepath.Base(path)
+                               // Skip directories that do not match prefix.
+                               // We know there is nothing interesting inside.
+                               if info.IsDir() &&
+                                       !strings.HasPrefix(locator, prefix) &&
+                                       !strings.HasPrefix(prefix, locator) {
+                                       return filepath.SkipDir
+                               }
+                               // Print filenames beginning with prefix
+                               if !info.IsDir() && strings.HasPrefix(locator, prefix) {
+                                       output = output + fmt.Sprintf(
+                                               "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
+                               }
+                               return nil
+                       })
+       }
+
+       return output
+}
+
 func GetBlock(hash string) ([]byte, error) {
        var buf = make([]byte, BLOCKSIZE)
 
 func GetBlock(hash string) ([]byte, error) {
        var buf = make([]byte, BLOCKSIZE)
 
@@ -133,17 +287,21 @@ func GetBlock(hash string) ([]byte, error) {
                var err error
                var nread int
 
                var err error
                var nread int
 
-               path := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
+               blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
 
 
-               f, err = os.Open(path)
+               f, err = os.Open(blockFilename)
                if err != nil {
                if err != nil {
-                       log.Printf("%s: opening %s: %s\n", vol, path, err)
+                       if !os.IsNotExist(err) {
+                               // A block is stored on only one Keep disk,
+                               // so os.IsNotExist is expected.  Report any other errors.
+                               log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
+                       }
                        continue
                }
 
                nread, err = f.Read(buf)
                if err != nil {
                        continue
                }
 
                nread, err = f.Read(buf)
                if err != nil {
-                       log.Printf("%s: reading %s: %s\n", vol, path, err)
+                       log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
                        continue
                }
 
                        continue
                }
 
@@ -157,8 +315,8 @@ func GetBlock(hash string) ([]byte, error) {
                        // priority or logged as urgent problems.
                        //
                        log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
                        // priority or logged as urgent problems.
                        //
                        log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
-                               vol, path, filehash)
-                       continue
+                               vol, blockFilename, filehash)
+                       return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")}
                }
 
                // Success!
                }
 
                // Success!
@@ -166,7 +324,7 @@ func GetBlock(hash string) ([]byte, error) {
        }
 
        log.Printf("%s: not found on any volumes, giving up\n", hash)
        }
 
        log.Printf("%s: not found on any volumes, giving up\n", hash)
-       return buf, &KeepError{404, errors.New("not found: " + hash)}
+       return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)}
 }
 
 /* PutBlock(block, hash)
 }
 
 /* PutBlock(block, hash)
@@ -175,70 +333,128 @@ func GetBlock(hash string) ([]byte, error) {
    The MD5 checksum of the block must be identical to the content id HASH.
    If not, an error is returned.
 
    The MD5 checksum of the block must be identical to the content id HASH.
    If not, an error is returned.
 
-   PutBlock stores the BLOCK in each of the available Keep volumes.
-   If any volume fails, an error is signaled on the back end.  A write
-   error is returned only if all volumes fail.
+   PutBlock stores the BLOCK on the first Keep volume with free space.
+   A failure code is returned to the user only if all volumes fail.
 
    On success, PutBlock returns nil.
    On failure, it returns a KeepError with one of the following codes:
 
 
    On success, PutBlock returns nil.
    On failure, it returns a KeepError with one of the following codes:
 
+   400 Collision
+          A different block with the same hash already exists on this
+          Keep server.
    401 MD5Fail
    401 MD5Fail
-         -- The MD5 hash of the BLOCK does not match the argument HASH.
+          The MD5 hash of the BLOCK does not match the argument HASH.
    503 Full
    503 Full
-         -- There was not enough space left in any Keep volume to store
-            the object.
+          There was not enough space left in any Keep volume to store
+          the object.
    500 Fail
    500 Fail
-         -- The object could not be stored for some other reason (e.g.
-            all writes failed). The text of the error message should
-            provide as much detail as possible.
+          The object could not be stored for some other reason (e.g.
+          all writes failed). The text of the error message should
+          provide as much detail as possible.
 */
 
 */
 
-func PutBlock(block []byte, hash string) *KeepError {
+func PutBlock(block []byte, hash string) error {
        // Check that BLOCK's checksum matches HASH.
        blockhash := fmt.Sprintf("%x", md5.Sum(block))
        if blockhash != hash {
                log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
        // Check that BLOCK's checksum matches HASH.
        blockhash := fmt.Sprintf("%x", md5.Sum(block))
        if blockhash != hash {
                log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
-               return &KeepError{401, errors.New("MD5Fail")}
+               return &KeepError{ErrMD5Fail, errors.New("MD5Fail")}
        }
 
        }
 
-       // any_success will be set to true upon a successful block write.
-       any_success := false
-       for _, vol := range KeepVolumes {
+       // If we already have a block on disk under this identifier, return
+       // success (but check for MD5 collisions, which may signify on-disk corruption).
+       if oldblock, err := GetBlock(hash); err == nil {
+               if bytes.Compare(block, oldblock) == 0 {
+                       return nil
+               } else {
+                       return &KeepError{ErrCollision, errors.New("Collision")}
+               }
+       } else {
+               ke := err.(*KeepError)
+               if ke.HTTPCode == ErrCorrupt {
+                       return &KeepError{ErrCollision, errors.New("Collision")}
+               }
+       }
 
 
-               bFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
-               if err := os.MkdirAll(path.Dir(bFilename), 0755); err != nil {
+       // Store the block on the first available Keep volume.
+       allFull := true
+       for _, vol := range KeepVolumes {
+               if IsFull(vol) {
+                       continue
+               }
+               allFull = false
+               blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
+               if err := os.MkdirAll(blockDir, 0755); err != nil {
                        log.Printf("%s: could not create directory %s: %s",
                        log.Printf("%s: could not create directory %s: %s",
-                               hash, path.Dir(bFilename), err)
+                               hash, blockDir, err)
                        continue
                }
 
                        continue
                }
 
-               f, err := os.OpenFile(bFilename, os.O_CREATE|os.O_WRONLY, 0644)
+               blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
+
+               f, err := os.OpenFile(blockFilename, os.O_CREATE|os.O_WRONLY, 0644)
                if err != nil {
                if err != nil {
-                       // if the block already exists, just skip to the next volume.
-                       if os.IsExist(err) {
-                               any_success = true
-                               continue
-                       } else {
-                               // Open failed for some other reason.
-                               log.Printf("%s: creatingb %s: %s\n", vol, bFilename, err)
-                               continue
-                       }
+                       log.Printf("%s: creating %s: %s\n", vol, blockFilename, err)
+                       continue
                }
 
                if _, err := f.Write(block); err == nil {
                        f.Close()
                }
 
                if _, err := f.Write(block); err == nil {
                        f.Close()
-                       any_success = true
-                       continue
+                       return nil
                } else {
                } else {
-                       log.Printf("%s: writing to %s: %s\n", vol, bFilename, err)
+                       log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
                        continue
                }
        }
 
                        continue
                }
        }
 
-       if any_success {
-               return nil
+       if allFull {
+               log.Printf("all Keep volumes full")
+               return &KeepError{ErrFull, errors.New("Full")}
        } else {
                log.Printf("all Keep volumes failed")
        } else {
                log.Printf("all Keep volumes failed")
-               return &KeepError{500, errors.New("Fail")}
+               return &KeepError{ErrOther, errors.New("Fail")}
+       }
+}
+
+func IsFull(volume string) (isFull bool) {
+       fullSymlink := volume + "/full"
+
+       // Check if the volume has been marked as full in the last hour.
+       if link, err := os.Readlink(fullSymlink); err == nil {
+               if ts, err := strconv.Atoi(link); err == nil {
+                       fulltime := time.Unix(int64(ts), 0)
+                       if time.Since(fulltime).Hours() < 1.0 {
+                               return true
+                       }
+               }
        }
        }
+
+       if avail, err := FreeDiskSpace(volume); err == nil {
+               isFull = avail < MIN_FREE_KILOBYTES
+       } else {
+               log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
+               isFull = false
+       }
+
+       // If the volume is full, timestamp it.
+       if isFull {
+               now := fmt.Sprintf("%d", time.Now().Unix())
+               os.Symlink(now, fullSymlink)
+       }
+       return
+}
+
+// FreeDiskSpace(volume)
+//     Returns the amount of available disk space on VOLUME,
+//     as a number of 1k blocks.
+//
+func FreeDiskSpace(volume string) (free uint64, err error) {
+       var fs syscall.Statfs_t
+       err = syscall.Statfs(volume, &fs)
+       if err == nil {
+               // Statfs output is not guaranteed to measure free
+               // space in terms of 1K blocks.
+               free = fs.Bavail * uint64(fs.Bsize) / 1024
+       }
+       return
 }
 }