Merge branch '2449-keep-write-blocks' into 2449-keep-flags
[arvados.git] / services / keep / keep.go
index daa967bace5aaf4ba29072f7304e51a38a1ca689..85e2aea5ef2007408cadbfcc1cc555ab6867221e 100644 (file)
@@ -2,19 +2,32 @@ package main
 
 import (
        "bufio"
+       "bytes"
        "crypto/md5"
        "errors"
+       "flag"
        "fmt"
        "github.com/gorilla/mux"
+       "io/ioutil"
        "log"
        "net/http"
        "os"
+       "strconv"
        "strings"
+       "syscall"
+       "time"
 )
 
-const DEFAULT_PORT = 25107
+// Default TCP address on which to listen for requests.
+const DEFAULT_ADDR = ":25107"
+
+// A Keep "block" is 64MB.
 const BLOCKSIZE = 64 * 1024 * 1024
 
+// A Keep volume must have at least MIN_FREE_KILOBYTES available
+// in order to permit writes.
+const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
+
 var PROC_MOUNTS = "/proc/mounts"
 
 var KeepVolumes []string
@@ -24,13 +37,35 @@ type KeepError struct {
        Err      error
 }
 
+const (
+       ErrCollision = 400
+       ErrMD5Fail   = 401
+       ErrCorrupt   = 402
+       ErrNotFound  = 404
+       ErrOther     = 500
+       ErrFull      = 503
+)
+
 func (e *KeepError) Error() string {
        return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
 }
 
 func main() {
+       // Parse command-line flags.
+       var listen, keepvols string
+       flag.StringVar(&listen, "listen", DEFAULT_ADDR,
+               "interface on which to listen for requests")
+       flag.StringVar(&keepvols, "volumes", "",
+               "comma-separated list of directories to use for Keep volumes")
+       flag.Parse()
+
        // Look for local keep volumes.
-       KeepVolumes = FindKeepVolumes()
+       if keepvols == "" {
+               KeepVolumes = FindKeepVolumes()
+       } else {
+               KeepVolumes = strings.Split(keepvols, ",")
+       }
+
        if len(KeepVolumes) == 0 {
                log.Fatal("could not find any keep volumes")
        }
@@ -52,8 +87,7 @@ func main() {
        http.Handle("/", rest)
 
        // Start listening for requests.
-       port := fmt.Sprintf(":%d", DEFAULT_PORT)
-       http.ListenAndServe(port, nil)
+       http.ListenAndServe(listen, nil)
 }
 
 // FindKeepVolumes
@@ -162,7 +196,7 @@ func GetBlock(hash string) ([]byte, error) {
                        //
                        log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
                                vol, blockFilename, filehash)
-                       continue
+                       return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")}
                }
 
                // Success!
@@ -170,7 +204,7 @@ func GetBlock(hash string) ([]byte, error) {
        }
 
        log.Printf("%s: not found on any volumes, giving up\n", hash)
-       return buf, &KeepError{404, errors.New("not found: " + hash)}
+       return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)}
 }
 
 /* PutBlock(block, hash)
@@ -185,15 +219,18 @@ func GetBlock(hash string) ([]byte, error) {
    On success, PutBlock returns nil.
    On failure, it returns a KeepError with one of the following codes:
 
+   400 Collision
+          A different block with the same hash already exists on this
+          Keep server.
    401 MD5Fail
-         -- The MD5 hash of the BLOCK does not match the argument HASH.
+          The MD5 hash of the BLOCK does not match the argument HASH.
    503 Full
-         -- There was not enough space left in any Keep volume to store
-            the object.
+          There was not enough space left in any Keep volume to store
+          the object.
    500 Fail
-         -- The object could not be stored for some other reason (e.g.
-            all writes failed). The text of the error message should
-            provide as much detail as possible.
+          The object could not be stored for some other reason (e.g.
+          all writes failed). The text of the error message should
+          provide as much detail as possible.
 */
 
 func PutBlock(block []byte, hash string) error {
@@ -201,13 +238,29 @@ func PutBlock(block []byte, hash string) error {
        blockhash := fmt.Sprintf("%x", md5.Sum(block))
        if blockhash != hash {
                log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
-               return &KeepError{401, errors.New("MD5Fail")}
+               return &KeepError{ErrMD5Fail, errors.New("MD5Fail")}
        }
 
-       for _, vol := range KeepVolumes {
-
-               // TODO(twp): check for a full volume here before trying to write.
+       // If we already have a block on disk under this identifier, return
+       // success (but check for MD5 collisions).
+       // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
+       // In either case, we want to write our new (good) block to disk, so there is
+       // nothing special to do if err != nil.
+       if oldblock, err := GetBlock(hash); err == nil {
+               if bytes.Compare(block, oldblock) == 0 {
+                       return nil
+               } else {
+                       return &KeepError{ErrCollision, errors.New("Collision")}
+               }
+       }
 
+       // Store the block on the first available Keep volume.
+       allFull := true
+       for _, vol := range KeepVolumes {
+               if IsFull(vol) {
+                       continue
+               }
+               allFull = false
                blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
                if err := os.MkdirAll(blockDir, 0755); err != nil {
                        log.Printf("%s: could not create directory %s: %s",
@@ -215,32 +268,79 @@ func PutBlock(block []byte, hash string) error {
                        continue
                }
 
-               blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
-               f, err := os.OpenFile(blockFilename, os.O_CREATE|os.O_WRONLY, 0644)
-               if err != nil {
-                       // if the block already exists, just return success.
-                       // TODO(twp): should we check here whether the file on disk
-                       // matches the file we were asked to store?
-                       if os.IsExist(err) {
-                               return nil
-                       } else {
-                               // Open failed for some other reason.
-                               log.Printf("%s: creating %s: %s\n", vol, blockFilename, err)
-                               continue
-                       }
+               tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
+               if tmperr != nil {
+                       log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
+                       continue
                }
+               blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
 
-               if _, err := f.Write(block); err == nil {
-                       f.Close()
-                       return nil
-               } else {
+               if _, err := tmpfile.Write(block); err != nil {
                        log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
                        continue
                }
+               if err := tmpfile.Close(); err != nil {
+                       log.Printf("closing %s: %s\n", tmpfile.Name(), err)
+                       os.Remove(tmpfile.Name())
+                       continue
+               }
+               if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
+                       log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
+                       os.Remove(tmpfile.Name())
+                       continue
+               }
+               return nil
        }
 
-       // All volumes failed; report the failure and return an error.
-       //
-       log.Printf("all Keep volumes failed")
-       return &KeepError{500, errors.New("Fail")}
+       if allFull {
+               log.Printf("all Keep volumes full")
+               return &KeepError{ErrFull, errors.New("Full")}
+       } else {
+               log.Printf("all Keep volumes failed")
+               return &KeepError{ErrOther, errors.New("Fail")}
+       }
+}
+
+func IsFull(volume string) (isFull bool) {
+       fullSymlink := volume + "/full"
+
+       // Check if the volume has been marked as full in the last hour.
+       if link, err := os.Readlink(fullSymlink); err == nil {
+               if ts, err := strconv.Atoi(link); err == nil {
+                       fulltime := time.Unix(int64(ts), 0)
+                       if time.Since(fulltime).Hours() < 1.0 {
+                               return true
+                       }
+               }
+       }
+
+       if avail, err := FreeDiskSpace(volume); err == nil {
+               isFull = avail < MIN_FREE_KILOBYTES
+       } else {
+               log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
+               isFull = false
+       }
+
+       // If the volume is full, timestamp it.
+       if isFull {
+               now := fmt.Sprintf("%d", time.Now().Unix())
+               os.Symlink(now, fullSymlink)
+       }
+       return
+}
+
+// FreeDiskSpace(volume)
+//     Returns the amount of available disk space on VOLUME,
+//     as a number of 1k blocks.
+//
+func FreeDiskSpace(volume string) (free uint64, err error) {
+       var fs syscall.Statfs_t
+       err = syscall.Statfs(volume, &fs)
+       if err == nil {
+               // Statfs output is not guaranteed to measure free
+               // space in terms of 1K blocks.
+               free = fs.Bavail * uint64(fs.Bsize) / 1024
+       }
+
+       return
 }