X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6287b16dd4fa87f5828387987514e21e00c33f4c..0a09da628d4d7f1ced2daaa55603bea940b9211c:/services/keep/keep.go diff --git a/services/keep/keep.go b/services/keep/keep.go index 8f639db8e9..b96afff991 100644 --- a/services/keep/keep.go +++ b/services/keep/keep.go @@ -2,6 +2,7 @@ package main import ( "bufio" + "bytes" "crypto/md5" "errors" "fmt" @@ -9,13 +10,23 @@ import ( "log" "net/http" "os" - "path" + "path/filepath" + "strconv" "strings" + "syscall" + "time" ) +// Default TCP port on which to listen for requests. const DEFAULT_PORT = 25107 + +// A Keep "block" is 64MB. const BLOCKSIZE = 64 * 1024 * 1024 +// A Keep volume must have at least MIN_FREE_KILOBYTES available +// in order to permit writes. +const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024 + var PROC_MOUNTS = "/proc/mounts" var KeepVolumes []string @@ -25,6 +36,15 @@ type KeepError struct { Err error } +const ( + ErrCollision = 400 + ErrMD5Fail = 401 + ErrCorrupt = 402 + ErrNotFound = 404 + ErrOther = 500 + ErrFull = 503 +) + func (e *KeepError) Error() string { return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error()) } @@ -45,8 +65,11 @@ func main() { // appropriate handler. // rest := mux.NewRouter() - rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET") - rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT") + rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD") + rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT") + rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD") + rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD") + rest.HandleFunc(`/status\.json`, StatusHandler).Methods("GET", "HEAD") // Tell the built-in HTTP server to direct all requests to the REST // router. @@ -116,7 +139,8 @@ func PutBlockHandler(w http.ResponseWriter, req *http.Request) { if err := PutBlock(buf[:nread], hash); err == nil { w.WriteHeader(http.StatusOK) } else { - http.Error(w, err.Error(), err.HTTPCode) + ke := err.(*KeepError) + http.Error(w, ke.Error(), ke.HTTPCode) } } else { log.Println("error reading request: ", err) @@ -124,6 +148,136 @@ func PutBlockHandler(w http.ResponseWriter, req *http.Request) { } } +func IndexHandler(w http.ResponseWriter, req *http.Request) { + prefix := mux.Vars(req)["prefix"] + + index := IndexLocators(prefix) + w.Write([]byte(index)) +} + +// StatusHandler +// Responds to /status.json requests with the current node status, +// described in a JSON structure. +// +// The data given in a status.json response includes: +// time - the time the status was last updated +// df - the output of the most recent `df --block-size=1k` +// disk_devices - list of disk device files (i.e. /dev/(s|xv|h)d) +// dirs - an object describing Keep volumes, keyed on Keep volume dirs, +// each value is an object describing the status of that volume +// * status ("full [timestamp]" or "ok [timestamp]") +// * last_error +// * last_error_time +// +type VolumeStatus struct { + Space string + LastErr string + LastErrTime time.Time +} + +type NodeStatus struct { + LastUpdate time.Time + DfOutput string + DiskDev []string + Volumes map[string]VolumeStatus +} + +func StatusHandler(w http.ResponseWriter, req *http.Request) { + st := new(NodeStatus) + st.LastUpdate = time.Now() + + // Get a list of disk devices on this system. + st.DiskDev = make([]string, 1) + if devdir, err := os.Open("/dev"); err != nil { + log.Printf("StatusHandler: opening /dev: %s\n", err) + } else { + devs, err := devdir.Readdirnames(0) + if err == nil { + for _, d := range devs { + if strings.HasPrefix(d, "sd") || + strings.HasPrefix(d, "hd") || + strings.HasPrefix(d, "xvd") { + st.DiskDev = append(st.DiskDev, d) + } + } + } else { + log.Printf("Readdirnames: %s", err) + } + } + + for _, vol := range KeepVolumes { + st.Volumes[vol] = GetVolumeStatus(vol) + } +} + +// GetVolumeStatus +// Returns a VolumeStatus describing the requested volume. +func GetVolumeStatus(volume string) VolumeStatus { + var isfull, lasterr string + var lasterrtime time.Time + + if IsFull(volume) { + isfull = fmt.Sprintf("full %d", time.Now().Unix()) + } else { + isfull = fmt.Sprintf("ok %d", time.Now().Unix()) + } + + // Not implemented yet + lasterr = "" + lasterrtime = time.Unix(0, 0) + + return VolumeStatus{isfull, lasterr, lasterrtime} +} + +// IndexLocators +// Returns a string containing a list of locator ids found on this +// Keep server. If {prefix} is given, return only those locator +// ids that begin with the given prefix string. +// +// The return string consists of a sequence of newline-separated +// strings in the format +// +// locator+size modification-time +// +// e.g.: +// +// e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303 +// e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043 +// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136 +// +func IndexLocators(prefix string) string { + var output string + for _, vol := range KeepVolumes { + filepath.Walk(vol, + func(path string, info os.FileInfo, err error) error { + // This WalkFunc inspects each path in the volume + // and prints an index line for all files that begin + // with prefix. + if err != nil { + log.Printf("IndexHandler: %s: walking to %s: %s", + vol, path, err) + return nil + } + locator := filepath.Base(path) + // Skip directories that do not match prefix. + // We know there is nothing interesting inside. + if info.IsDir() && + !strings.HasPrefix(locator, prefix) && + !strings.HasPrefix(prefix, locator) { + return filepath.SkipDir + } + // Print filenames beginning with prefix + if !info.IsDir() && strings.HasPrefix(locator, prefix) { + output = output + fmt.Sprintf( + "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix()) + } + return nil + }) + } + + return output +} + func GetBlock(hash string) ([]byte, error) { var buf = make([]byte, BLOCKSIZE) @@ -133,17 +287,21 @@ func GetBlock(hash string) ([]byte, error) { var err error var nread int - path := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash) + blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash) - f, err = os.Open(path) + f, err = os.Open(blockFilename) if err != nil { - log.Printf("%s: opening %s: %s\n", vol, path, err) + if !os.IsNotExist(err) { + // A block is stored on only one Keep disk, + // so os.IsNotExist is expected. Report any other errors. + log.Printf("%s: opening %s: %s\n", vol, blockFilename, err) + } continue } nread, err = f.Read(buf) if err != nil { - log.Printf("%s: reading %s: %s\n", vol, path, err) + log.Printf("%s: reading %s: %s\n", vol, blockFilename, err) continue } @@ -157,8 +315,8 @@ func GetBlock(hash string) ([]byte, error) { // priority or logged as urgent problems. // log.Printf("%s: checksum mismatch: %s (actual hash %s)\n", - vol, path, filehash) - continue + vol, blockFilename, filehash) + return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")} } // Success! @@ -166,7 +324,7 @@ func GetBlock(hash string) ([]byte, error) { } log.Printf("%s: not found on any volumes, giving up\n", hash) - return buf, &KeepError{404, errors.New("not found: " + hash)} + return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)} } /* PutBlock(block, hash) @@ -175,70 +333,128 @@ func GetBlock(hash string) ([]byte, error) { The MD5 checksum of the block must be identical to the content id HASH. If not, an error is returned. - PutBlock stores the BLOCK in each of the available Keep volumes. - If any volume fails, an error is signaled on the back end. A write - error is returned only if all volumes fail. + PutBlock stores the BLOCK on the first Keep volume with free space. + A failure code is returned to the user only if all volumes fail. On success, PutBlock returns nil. On failure, it returns a KeepError with one of the following codes: + 400 Collision + A different block with the same hash already exists on this + Keep server. 401 MD5Fail - -- The MD5 hash of the BLOCK does not match the argument HASH. + The MD5 hash of the BLOCK does not match the argument HASH. 503 Full - -- There was not enough space left in any Keep volume to store - the object. + There was not enough space left in any Keep volume to store + the object. 500 Fail - -- The object could not be stored for some other reason (e.g. - all writes failed). The text of the error message should - provide as much detail as possible. + The object could not be stored for some other reason (e.g. + all writes failed). The text of the error message should + provide as much detail as possible. */ -func PutBlock(block []byte, hash string) *KeepError { +func PutBlock(block []byte, hash string) error { // Check that BLOCK's checksum matches HASH. blockhash := fmt.Sprintf("%x", md5.Sum(block)) if blockhash != hash { log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash) - return &KeepError{401, errors.New("MD5Fail")} + return &KeepError{ErrMD5Fail, errors.New("MD5Fail")} } - // any_success will be set to true upon a successful block write. - any_success := false - for _, vol := range KeepVolumes { + // If we already have a block on disk under this identifier, return + // success (but check for MD5 collisions, which may signify on-disk corruption). + if oldblock, err := GetBlock(hash); err == nil { + if bytes.Compare(block, oldblock) == 0 { + return nil + } else { + return &KeepError{ErrCollision, errors.New("Collision")} + } + } else { + ke := err.(*KeepError) + if ke.HTTPCode == ErrCorrupt { + return &KeepError{ErrCollision, errors.New("Collision")} + } + } - bFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash) - if err := os.MkdirAll(path.Dir(bFilename), 0755); err != nil { + // Store the block on the first available Keep volume. + allFull := true + for _, vol := range KeepVolumes { + if IsFull(vol) { + continue + } + allFull = false + blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3]) + if err := os.MkdirAll(blockDir, 0755); err != nil { log.Printf("%s: could not create directory %s: %s", - hash, path.Dir(bFilename), err) + hash, blockDir, err) continue } - f, err := os.OpenFile(bFilename, os.O_CREATE|os.O_WRONLY, 0644) + blockFilename := fmt.Sprintf("%s/%s", blockDir, hash) + + f, err := os.OpenFile(blockFilename, os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - // if the block already exists, just skip to the next volume. - if os.IsExist(err) { - any_success = true - continue - } else { - // Open failed for some other reason. - log.Printf("%s: creatingb %s: %s\n", vol, bFilename, err) - continue - } + log.Printf("%s: creating %s: %s\n", vol, blockFilename, err) + continue } if _, err := f.Write(block); err == nil { f.Close() - any_success = true - continue + return nil } else { - log.Printf("%s: writing to %s: %s\n", vol, bFilename, err) + log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err) continue } } - if any_success { - return nil + if allFull { + log.Printf("all Keep volumes full") + return &KeepError{ErrFull, errors.New("Full")} } else { log.Printf("all Keep volumes failed") - return &KeepError{500, errors.New("Fail")} + return &KeepError{ErrOther, errors.New("Fail")} + } +} + +func IsFull(volume string) (isFull bool) { + fullSymlink := volume + "/full" + + // Check if the volume has been marked as full in the last hour. + if link, err := os.Readlink(fullSymlink); err == nil { + if ts, err := strconv.Atoi(link); err == nil { + fulltime := time.Unix(int64(ts), 0) + if time.Since(fulltime).Hours() < 1.0 { + return true + } + } } + + if avail, err := FreeDiskSpace(volume); err == nil { + isFull = avail < MIN_FREE_KILOBYTES + } else { + log.Printf("%s: FreeDiskSpace: %s\n", volume, err) + isFull = false + } + + // If the volume is full, timestamp it. + if isFull { + now := fmt.Sprintf("%d", time.Now().Unix()) + os.Symlink(now, fullSymlink) + } + return +} + +// FreeDiskSpace(volume) +// Returns the amount of available disk space on VOLUME, +// as a number of 1k blocks. +// +func FreeDiskSpace(volume string) (free uint64, err error) { + var fs syscall.Statfs_t + err = syscall.Statfs(volume, &fs) + if err == nil { + // Statfs output is not guaranteed to measure free + // space in terms of 1K blocks. + free = fs.Bavail * uint64(fs.Bsize) / 1024 + } + return }