X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e23961a316dc64068a6cc9b799d618f021f85ddb..ad6e6f2457f354b666cdc322915d591743a4cd2f:/services/keep/keep.go diff --git a/services/keep/keep.go b/services/keep/keep.go index 0473ac8356..85e2aea5ef 100644 --- a/services/keep/keep.go +++ b/services/keep/keep.go @@ -2,24 +2,70 @@ package main import ( "bufio" + "bytes" "crypto/md5" "errors" + "flag" "fmt" "github.com/gorilla/mux" + "io/ioutil" "log" "net/http" "os" + "strconv" "strings" + "syscall" + "time" ) -const DEFAULT_PORT = 25107 +// Default TCP address on which to listen for requests. +const DEFAULT_ADDR = ":25107" + +// A Keep "block" is 64MB. const BLOCKSIZE = 64 * 1024 * 1024 +// A Keep volume must have at least MIN_FREE_KILOBYTES available +// in order to permit writes. +const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024 + +var PROC_MOUNTS = "/proc/mounts" + var KeepVolumes []string +type KeepError struct { + HTTPCode int + Err error +} + +const ( + ErrCollision = 400 + ErrMD5Fail = 401 + ErrCorrupt = 402 + ErrNotFound = 404 + ErrOther = 500 + ErrFull = 503 +) + +func (e *KeepError) Error() string { + return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error()) +} + func main() { + // Parse command-line flags. + var listen, keepvols string + flag.StringVar(&listen, "listen", DEFAULT_ADDR, + "interface on which to listen for requests") + flag.StringVar(&keepvols, "volumes", "", + "comma-separated list of directories to use for Keep volumes") + flag.Parse() + // Look for local keep volumes. - KeepVolumes = FindKeepVolumes() + if keepvols == "" { + KeepVolumes = FindKeepVolumes() + } else { + KeepVolumes = strings.Split(keepvols, ",") + } + if len(KeepVolumes) == 0 { log.Fatal("could not find any keep volumes") } @@ -34,14 +80,44 @@ func main() { // rest := mux.NewRouter() rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET") + rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT") // Tell the built-in HTTP server to direct all requests to the REST // router. http.Handle("/", rest) // Start listening for requests. - port := fmt.Sprintf(":%d", DEFAULT_PORT) - http.ListenAndServe(port, nil) + http.ListenAndServe(listen, nil) +} + +// FindKeepVolumes +// Returns a list of Keep volumes mounted on this system. +// +// A Keep volume is a normal or tmpfs volume with a /keep +// directory at the top level of the mount point. +// +func FindKeepVolumes() []string { + vols := make([]string, 0) + + if f, err := os.Open(PROC_MOUNTS); err != nil { + log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err) + } else { + scanner := bufio.NewScanner(f) + for scanner.Scan() { + args := strings.Fields(scanner.Text()) + dev, mount := args[0], args[1] + if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" { + keep := mount + "/keep" + if st, err := os.Stat(keep); err == nil && st.IsDir() { + vols = append(vols, keep) + } + } + } + if err := scanner.Err(); err != nil { + log.Fatal(err) + } + } + return vols } func GetBlockHandler(w http.ResponseWriter, req *http.Request) { @@ -61,6 +137,27 @@ func GetBlockHandler(w http.ResponseWriter, req *http.Request) { return } +func PutBlockHandler(w http.ResponseWriter, req *http.Request) { + hash := mux.Vars(req)["hash"] + + // Read the block data to be stored. + // TODO(twp): decide what to do when the input stream contains + // more than BLOCKSIZE bytes. + // + buf := make([]byte, BLOCKSIZE) + if nread, err := req.Body.Read(buf); err == nil { + if err := PutBlock(buf[:nread], hash); err == nil { + w.WriteHeader(http.StatusOK) + } else { + ke := err.(*KeepError) + http.Error(w, ke.Error(), ke.HTTPCode) + } + } else { + log.Println("error reading request: ", err) + http.Error(w, err.Error(), 500) + } +} + func GetBlock(hash string) ([]byte, error) { var buf = make([]byte, BLOCKSIZE) @@ -70,68 +167,180 @@ func GetBlock(hash string) ([]byte, error) { var err error var nread int - path := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash) + blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash) - f, err = os.Open(path) + f, err = os.Open(blockFilename) if err != nil { - log.Printf("%s: opening %s: %s\n", vol, path, err) + if !os.IsNotExist(err) { + // A block is stored on only one Keep disk, + // so os.IsNotExist is expected. Report any other errors. + log.Printf("%s: opening %s: %s\n", vol, blockFilename, err) + } continue } nread, err = f.Read(buf) if err != nil { - log.Printf("%s: reading %s: %s\n", vol, path, err) + log.Printf("%s: reading %s: %s\n", vol, blockFilename, err) continue } // Double check the file checksum. // - // TODO(twp): this condition probably represents a bad disk and - // should raise major alarm bells for an administrator: e.g. - // they should be sent directly to an event manager at high - // priority or logged as urgent problems. - // filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread])) if filehash != hash { + // TODO(twp): this condition probably represents a bad disk and + // should raise major alarm bells for an administrator: e.g. + // they should be sent directly to an event manager at high + // priority or logged as urgent problems. + // log.Printf("%s: checksum mismatch: %s (actual hash %s)\n", - vol, path, filehash) - continue + vol, blockFilename, filehash) + return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")} } // Success! return buf[:nread], nil } - log.Printf("%s: all keep volumes failed, giving up\n", hash) - return buf, errors.New("not found: " + hash) + log.Printf("%s: not found on any volumes, giving up\n", hash) + return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)} } -// FindKeepVolumes -// Returns a list of Keep volumes mounted on this system. -// -// A Keep volume is a normal or tmpfs volume with a /keep -// directory at the top level of the mount point. -// -func FindKeepVolumes() []string { - vols := make([]string, 0) +/* PutBlock(block, hash) + Stores the BLOCK (identified by the content id HASH) in Keep. + + The MD5 checksum of the block must be identical to the content id HASH. + If not, an error is returned. + + PutBlock stores the BLOCK on the first Keep volume with free space. + A failure code is returned to the user only if all volumes fail. + + On success, PutBlock returns nil. + On failure, it returns a KeepError with one of the following codes: + + 400 Collision + A different block with the same hash already exists on this + Keep server. + 401 MD5Fail + The MD5 hash of the BLOCK does not match the argument HASH. + 503 Full + There was not enough space left in any Keep volume to store + the object. + 500 Fail + The object could not be stored for some other reason (e.g. + all writes failed). The text of the error message should + provide as much detail as possible. +*/ + +func PutBlock(block []byte, hash string) error { + // Check that BLOCK's checksum matches HASH. + blockhash := fmt.Sprintf("%x", md5.Sum(block)) + if blockhash != hash { + log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash) + return &KeepError{ErrMD5Fail, errors.New("MD5Fail")} + } - if f, err := os.Open("/proc/mounts"); err != nil { - log.Fatal("could not read /proc/mounts: ", err) + // If we already have a block on disk under this identifier, return + // success (but check for MD5 collisions). + // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound. + // In either case, we want to write our new (good) block to disk, so there is + // nothing special to do if err != nil. + if oldblock, err := GetBlock(hash); err == nil { + if bytes.Compare(block, oldblock) == 0 { + return nil + } else { + return &KeepError{ErrCollision, errors.New("Collision")} + } + } + + // Store the block on the first available Keep volume. + allFull := true + for _, vol := range KeepVolumes { + if IsFull(vol) { + continue + } + allFull = false + blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3]) + if err := os.MkdirAll(blockDir, 0755); err != nil { + log.Printf("%s: could not create directory %s: %s", + hash, blockDir, err) + continue + } + + tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash) + if tmperr != nil { + log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr) + continue + } + blockFilename := fmt.Sprintf("%s/%s", blockDir, hash) + + if _, err := tmpfile.Write(block); err != nil { + log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err) + continue + } + if err := tmpfile.Close(); err != nil { + log.Printf("closing %s: %s\n", tmpfile.Name(), err) + os.Remove(tmpfile.Name()) + continue + } + if err := os.Rename(tmpfile.Name(), blockFilename); err != nil { + log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err) + os.Remove(tmpfile.Name()) + continue + } + return nil + } + + if allFull { + log.Printf("all Keep volumes full") + return &KeepError{ErrFull, errors.New("Full")} } else { - scanner := bufio.NewScanner(f) - for scanner.Scan() { - args := strings.Fields(scanner.Text()) - dev, mount := args[0], args[1] - if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" { - keep := mount + "/keep" - if st, err := os.Stat(keep); err == nil && st.IsDir() { - vols = append(vols, keep) - } + log.Printf("all Keep volumes failed") + return &KeepError{ErrOther, errors.New("Fail")} + } +} + +func IsFull(volume string) (isFull bool) { + fullSymlink := volume + "/full" + + // Check if the volume has been marked as full in the last hour. + if link, err := os.Readlink(fullSymlink); err == nil { + if ts, err := strconv.Atoi(link); err == nil { + fulltime := time.Unix(int64(ts), 0) + if time.Since(fulltime).Hours() < 1.0 { + return true } } - if err := scanner.Err(); err != nil { - log.Fatal(err) - } } - return vols + + if avail, err := FreeDiskSpace(volume); err == nil { + isFull = avail < MIN_FREE_KILOBYTES + } else { + log.Printf("%s: FreeDiskSpace: %s\n", volume, err) + isFull = false + } + + // If the volume is full, timestamp it. + if isFull { + now := fmt.Sprintf("%d", time.Now().Unix()) + os.Symlink(now, fullSymlink) + } + return +} + +// FreeDiskSpace(volume) +// Returns the amount of available disk space on VOLUME, +// as a number of 1k blocks. +// +func FreeDiskSpace(volume string) (free uint64, err error) { + var fs syscall.Statfs_t + err = syscall.Statfs(volume, &fs) + if err == nil { + // Statfs output is not guaranteed to measure free + // space in terms of 1K blocks. + free = fs.Bavail * uint64(fs.Bsize) / 1024 + } + + return }