import (
"bufio"
+ "bytes"
"crypto/md5"
"errors"
+ "flag"
"fmt"
"github.com/gorilla/mux"
+ "io/ioutil"
"log"
"net/http"
"os"
+ "strconv"
"strings"
+ "syscall"
+ "time"
)
-const DEFAULT_PORT = 25107
+// Default TCP address on which to listen for requests.
+const DEFAULT_ADDR = ":25107"
+
+// A Keep "block" is 64MB.
const BLOCKSIZE = 64 * 1024 * 1024
+// A Keep volume must have at least MIN_FREE_KILOBYTES available
+// in order to permit writes.
+const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
+
// PROC_MOUNTS is the path of the system mount table.
// NOTE(review): presumably read by FindKeepVolumes (body not visible
// here) and declared as a var so it can be overridden — confirm.
var PROC_MOUNTS = "/proc/mounts"
// KeepVolumes lists the directories used as Keep volumes. main fills it
// from the -volumes flag or, when that flag is empty, from
// FindKeepVolumes; PutBlock iterates over it when storing a block.
var KeepVolumes []string
Err error
}
+const ( // Status codes carried by KeepError values.
+ ErrCollision = 400 // PutBlock: a different block with the same hash is already stored
+ ErrMD5Fail = 401 // PutBlock: MD5 of the supplied data does not match the requested hash
+ ErrCorrupt = 402 // GetBlock: a block read from a volume failed its checksum
+ ErrNotFound = 404 // GetBlock: the block was not found on any volume
+ ErrOther = 500 // PutBlock: the block could not be stored for some other reason
+ ErrFull = 503 // PutBlock: every Keep volume is full
+)
+
// Error implements the error interface. The message has the form
// "Error <HTTPCode>: <message of the wrapped Err>".
// NOTE(review): e.Err is dereferenced unconditionally; presumably every
// KeepError is constructed with a non-nil Err — confirm.
func (e *KeepError) Error() string {
return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
}
+// main parses the command-line flags, settles on the list of Keep
+// volumes and then serves HTTP requests.
+//
+// Flags:
+//   -listen   address to listen on (default DEFAULT_ADDR)
+//   -volumes  comma-separated list of volume directories; when empty the
+//             volumes are discovered with FindKeepVolumes
+//
+// The process exits with a logged error if no volume is available or if
+// the HTTP server cannot be started.
func main() {
+ // Parse command-line flags.
+ var listen, keepvols string
+ flag.StringVar(&listen, "listen", DEFAULT_ADDR,
+ "interface on which to listen for requests")
+ flag.StringVar(&keepvols, "volumes", "",
+ "comma-separated list of directories to use for Keep volumes")
+ flag.Parse()
+
// Look for local keep volumes.
- KeepVolumes = FindKeepVolumes()
+ if keepvols == "" {
+ KeepVolumes = FindKeepVolumes()
+ } else {
+ KeepVolumes = strings.Split(keepvols, ",")
+ }
+
if len(KeepVolumes) == 0 {
log.Fatal("could not find any keep volumes")
}
http.Handle("/", rest)
// Start listening for requests.
- port := fmt.Sprintf(":%d", DEFAULT_PORT)
- http.ListenAndServe(port, nil)
+ // ListenAndServe only returns when the server could not start or has
+ // stopped, and always with a non-nil error (for example when the
+ // address is already in use). Report it instead of exiting silently.
+ log.Fatal(http.ListenAndServe(listen, nil))
}
// FindKeepVolumes
//
log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
vol, blockFilename, filehash)
- continue
+ return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")}
}
// Success!
}
log.Printf("%s: not found on any volumes, giving up\n", hash)
- return buf, &KeepError{404, errors.New("not found: " + hash)}
+ return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)}
}
/* PutBlock(block, hash)
On success, PutBlock returns nil.
On failure, it returns a KeepError with one of the following codes:
+ 400 Collision
+ A different block with the same hash already exists on this
+ Keep server.
401 MD5Fail
- -- The MD5 hash of the BLOCK does not match the argument HASH.
+ The MD5 hash of the BLOCK does not match the argument HASH.
503 Full
- -- There was not enough space left in any Keep volume to store
- the object.
+ There was not enough space left in any Keep volume to store
+ the object.
500 Fail
- -- The object could not be stored for some other reason (e.g.
- all writes failed). The text of the error message should
- provide as much detail as possible.
+ The object could not be stored for some other reason (e.g.
+ all writes failed). The text of the error message should
+ provide as much detail as possible.
*/
func PutBlock(block []byte, hash string) error {
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
- return &KeepError{401, errors.New("MD5Fail")}
+ return &KeepError{ErrMD5Fail, errors.New("MD5Fail")}
}
- for _, vol := range KeepVolumes {
-
- // TODO(twp): check for a full volume here before trying to write.
+ // If we already have a block on disk under this identifier, return
+ // success (but check for MD5 collisions).
+ // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
+ // In either case, we want to write our new (good) block to disk, so there is
+ // nothing special to do if err != nil.
+ if oldblock, err := GetBlock(hash); err == nil {
+ if bytes.Equal(block, oldblock) {
+ return nil
+ }
+ return &KeepError{ErrCollision, errors.New("Collision")}
+ }
+ // Store the block on the first available Keep volume.
+ // allFull stays true only if every volume was skipped as full, which
+ // lets the caller tell "no space" (ErrFull) from "writes failed" (ErrOther).
+ allFull := true
+ for _, vol := range KeepVolumes {
+ if IsFull(vol) {
+ continue
+ }
+ allFull = false
blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
if err := os.MkdirAll(blockDir, 0755); err != nil {
log.Printf("%s: could not create directory %s: %s",
continue
}
- blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
- f, err := os.OpenFile(blockFilename, os.O_CREATE|os.O_WRONLY, 0644)
- if err != nil {
- // if the block already exists, just return success.
- // TODO(twp): should we check here whether the file on disk
- // matches the file we were asked to store?
- if os.IsExist(err) {
- return nil
- } else {
- // Open failed for some other reason.
- log.Printf("%s: creating %s: %s\n", vol, blockFilename, err)
- continue
- }
+ // Write to a temporary file in the destination directory and rename
+ // it into place afterwards, so that a partially written block is
+ // never visible under its final name.
+ tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
+ if tmperr != nil {
+ log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
+ continue
}
+ blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
- if _, err := f.Write(block); err == nil {
- f.Close()
- return nil
- } else {
+ if _, err := tmpfile.Write(block); err != nil {
log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
+ // Release the descriptor and discard the partial temporary file,
+ // as the Close and Rename failure paths below already do.
+ tmpfile.Close()
+ os.Remove(tmpfile.Name())
continue
}
+ if err := tmpfile.Close(); err != nil {
+ log.Printf("closing %s: %s\n", tmpfile.Name(), err)
+ os.Remove(tmpfile.Name())
+ continue
+ }
+ if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
+ log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
+ os.Remove(tmpfile.Name())
+ continue
+ }
+ return nil
}
- // All volumes failed; report the failure and return an error.
- //
- log.Printf("all Keep volumes failed")
- return &KeepError{500, errors.New("Fail")}
+ if allFull {
+ log.Printf("all Keep volumes full")
+ return &KeepError{ErrFull, errors.New("Full")}
+ }
+ log.Printf("all Keep volumes failed")
+ return &KeepError{ErrOther, errors.New("Fail")}
+}
+
+// IsFull reports whether volume should be treated as too full to accept
+// new blocks.
+//
+// A volume found to be full is marked with a symlink named "full" in its
+// top directory whose target is the Unix timestamp of that check. While
+// the marker is less than an hour old, IsFull returns true without
+// querying the filesystem again. Otherwise the volume is full when
+// FreeDiskSpace reports fewer than MIN_FREE_KILOBYTES available.
+//
+// If the free space cannot be determined the error is logged and the
+// volume is reported as not full, so that a write is still attempted.
+func IsFull(volume string) (isFull bool) {
+	fullSymlink := volume + "/full"
+
+	// Check if the volume has been marked as full in the last hour.
+	haveMarker := false
+	if link, err := os.Readlink(fullSymlink); err == nil {
+		haveMarker = true
+		// A 64-bit parse keeps the timestamp valid where int is 32 bits.
+		if ts, err := strconv.ParseInt(link, 10, 64); err == nil {
+			if time.Since(time.Unix(ts, 0)) < time.Hour {
+				return true
+			}
+		}
+	}
+
+	avail, err := FreeDiskSpace(volume)
+	if err != nil {
+		log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
+		return false
+	}
+	if avail >= MIN_FREE_KILOBYTES {
+		return false
+	}
+
+	// The volume is full: timestamp it. os.Symlink will not overwrite an
+	// existing link, so an expired or unparsable marker has to be removed
+	// first; otherwise the timestamp would never be refreshed. Only a
+	// path that Readlink accepted (i.e. a symlink) is ever removed.
+	if haveMarker {
+		if err := os.Remove(fullSymlink); err != nil && !os.IsNotExist(err) {
+			log.Printf("%s: removing expired marker: %s\n", fullSymlink, err)
+		}
+	}
+	now := strconv.FormatInt(time.Now().Unix(), 10)
+	if err := os.Symlink(now, fullSymlink); err != nil {
+		// The marker is only a cache; the volume is still reported full.
+		log.Printf("%s: marking volume full: %s\n", fullSymlink, err)
+	}
+	return true
+}
+
+// FreeDiskSpace reports the space available on the filesystem that holds
+// volume, as a number of 1K blocks.
+//
+// It returns the error from statfs, with a zero count, when the
+// filesystem cannot be queried.
+func FreeDiskSpace(volume string) (free uint64, err error) {
+	var stat syscall.Statfs_t
+	if err = syscall.Statfs(volume, &stat); err != nil {
+		return 0, err
+	}
+
+	// statfs counts available space in blocks of Bsize bytes, which
+	// need not be 1K, so rescale the count to 1K units.
+	bytesPerBlock := uint64(stat.Bsize)
+	return stat.Bavail * bytesPerBlock / 1024, nil
}