11 "github.com/gorilla/mux"
24 // ======================
25 // Configuration settings
27 // TODO(twp): make all of these configurable via command line flags
28 // and/or configuration file settings.
30 // Default TCP address on which to listen for requests.
// main uses this value as the default for the -listen flag.
31 const DEFAULT_ADDR = ":25107"
33 // A Keep "block" is 64MB.
// BLOCKSIZE is that size in bytes. PutBlockHandler passes it to
// ReadAtMost as the request-body limit and GetBlock uses it as the
// size of its read buffer.
34 const BLOCKSIZE = 64 * 1024 * 1024
36 // A Keep volume must have at least MIN_FREE_KILOBYTES available
37 // in order to permit writes.
// The value is BLOCKSIZE expressed in 1K units, i.e. room for exactly
// one more block. IsFull compares it against FreeDiskSpace's result.
38 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
// PROC_MOUNTS is the path of the mount table that FindKeepVolumes
// opens and scans. It is a var rather than a const, presumably so
// tests can point it at a fixture file -- confirm.
40 var PROC_MOUNTS = "/proc/mounts"
// KeepVolumes is the list of directories used as Keep volumes. main
// fills it from the -volumes flag or from FindKeepVolumes; GetBlock,
// PutBlock, IndexLocators and GetNodeStatus iterate over it.
42 var KeepVolumes []string
// KeepError is the error type reported by block operations. Values
// are built positionally from an integer code and a message string
// (see the table below); PutBlockHandler reads the code through the
// HTTPCode field and uses it as the HTTP response status.
47 type KeepError struct {
// Predefined KeepError values returned by GetBlock and PutBlock.
// The first element of each literal is the HTTP status sent to the
// client, the second is the response text.
// NOTE(review): several codes carry unrelated standard HTTP meanings
// (401, 402, 504) -- confirm clients treat them as Keep-specific.
53 CollisionError = &KeepError{400, "Collision"}
54 MD5Error = &KeepError{401, "MD5 Failure"}
55 CorruptError = &KeepError{402, "Corruption"}
56 NotFoundError = &KeepError{404, "Not Found"}
57 GenericError = &KeepError{500, "Fail"}
58 FullError = &KeepError{503, "Full"}
59 TooLongError = &KeepError{504, "Too Long"}
// Error makes *KeepError satisfy the error interface. The body is
// not visible in this listing; presumably it returns the message
// string -- confirm.
62 func (e *KeepError) Error() string {
66 // This error is returned by ReadAtMost if the available
67 // data exceeds BLOCKSIZE bytes.
// More precisely, ReadAtMost compares against its maxbytes argument;
// PutBlockHandler passes BLOCKSIZE and checks for this sentinel with ==.
68 var ReadErrorTooLong = errors.New("Too long")
71 // Parse command-line flags:
73 // -listen=ipaddr:port
74 // Interface on which to listen for requests. Use :port without
75 // an ipaddr to listen on all network interfaces.
77 // -listen=127.0.0.1:4949
78 // -listen=10.0.1.24:8000
79 // -listen=:25107 (to listen to port 25107 on all interfaces)
82 // A comma-separated list of directories to use as Keep volumes.
84 // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
86 // If -volumes is empty or is not present, Keep will select volumes
87 // by looking at currently mounted filesystems for /keep top-level
90 var listen, keepvols string
91 flag.StringVar(&listen, "listen", DEFAULT_ADDR,
92 "interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
93 flag.StringVar(&keepvols, "volumes", "",
94 "Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
97 // Look for local keep volumes.
99 // TODO(twp): decide whether this is desirable default behavior.
100 // In production we may want to require the admin to specify
101 // Keep volumes explicitly.
102 KeepVolumes = FindKeepVolumes()
104 KeepVolumes = strings.Split(keepvols, ",")
107 if len(KeepVolumes) == 0 {
108 log.Fatal("could not find any keep volumes")
110 for _, v := range KeepVolumes {
111 log.Println("keep volume:", v)
114 // Set up REST handlers.
116 // Start with a router that will route each URL path to an
117 // appropriate handler.
119 rest := mux.NewRouter()
120 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
121 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
122 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
123 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
124 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
126 // Tell the built-in HTTP server to direct all requests to the REST
128 http.Handle("/", rest)
130 // Start listening for requests.
131 http.ListenAndServe(listen, nil)
135 // Returns a list of Keep volumes mounted on this system.
137 // A Keep volume is a normal or tmpfs volume with a /keep
138 // directory at the top level of the mount point.
140 func FindKeepVolumes() []string {
141 vols := make([]string, 0)
143 if f, err := os.Open(PROC_MOUNTS); err != nil {
144 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
146 scanner := bufio.NewScanner(f)
148 args := strings.Fields(scanner.Text())
149 dev, mount := args[0], args[1]
150 if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
151 keep := mount + "/keep"
152 if st, err := os.Stat(keep); err == nil && st.IsDir() {
153 vols = append(vols, keep)
157 if err := scanner.Err(); err != nil {
164 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
165 hash := mux.Vars(req)["hash"]
167 block, err := GetBlock(hash)
169 http.Error(w, err.Error(), 404)
173 _, err = w.Write(block)
175 log.Printf("GetBlockHandler: writing response: %s", err)
181 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
182 hash := mux.Vars(req)["hash"]
184 // Read the block data to be stored.
185 // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
187 // Note: because req.Body is a buffered Reader, each Read() call will
188 // collect only the data in the network buffer (typically 16384 bytes),
189 // even if it is passed a much larger slice.
191 // Instead, call ReadAtMost to read data from the socket
192 // repeatedly until either EOF or BLOCKSIZE bytes have been read.
194 if buf, err := ReadAtMost(req.Body, BLOCKSIZE); err == nil {
195 if err := PutBlock(buf, hash); err == nil {
196 w.WriteHeader(http.StatusOK)
198 ke := err.(*KeepError)
199 http.Error(w, ke.Error(), ke.HTTPCode)
202 log.Println("error reading request: ", err)
203 errmsg := err.Error()
204 if err == ReadErrorTooLong {
205 // Use a more descriptive error message that includes
206 // the maximum request size.
207 errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE)
209 http.Error(w, errmsg, 500)
214 // A HandleFunc to address /index and /index/{prefix} requests.
216 func IndexHandler(w http.ResponseWriter, req *http.Request) {
217 prefix := mux.Vars(req)["prefix"]
219 index := IndexLocators(prefix)
220 w.Write([]byte(index))
224 // Responds to /status.json requests with the current node status,
225 // described in a JSON structure.
227 // The data given in a status.json response includes:
228 // time - the time the status was last updated
229 // df - the output of the most recent `df --block-size=1k`
230 // disk_devices - list of disk device files (i.e. /dev/(s|xv|h)d)
231 // dirs - an object describing Keep volumes, keyed on Keep volume dirs,
232 // each value is an object describing the status of that volume
233 // * status ("full [timestamp]" or "ok [timestamp]")
// NOTE(review): the field list above does not match the types
// declared below. As written, the JSON document contains a single
// "volumes" array whose entries have only "mount_point", "bytes_free"
// and "bytes_used"; time, df, disk_devices and dirs are not produced.

// VolumeStatus describes one Keep volume. GetVolumeStatus fills
// MountPoint with the volume path it was given and computes the two
// byte counts from a statfs call on that path.
237 type VolumeStatus struct {
238 MountPoint string `json:"mount_point"`
239 BytesFree uint64 `json:"bytes_free"`
240 BytesUsed uint64 `json:"bytes_used"`
// NodeStatus is the top-level object serialized for /status.json.
// GetNodeStatus stores one entry per element of KeepVolumes; an entry
// is nil when GetVolumeStatus could not stat that volume.
243 type NodeStatus struct {
244 Volumes []*VolumeStatus `json:"volumes"`
247 func StatusHandler(w http.ResponseWriter, req *http.Request) {
248 st := GetNodeStatus()
249 if jstat, err := json.Marshal(st); err == nil {
252 log.Printf("json.Marshal: %s\n", err)
253 log.Printf("NodeStatus = %v\n", st)
254 http.Error(w, err.Error(), 500)
259 // Returns a NodeStatus struct describing this Keep
260 // node's current status.
262 func GetNodeStatus() *NodeStatus {
263 st := new(NodeStatus)
265 st.Volumes = make([]*VolumeStatus, len(KeepVolumes))
266 for i, vol := range KeepVolumes {
267 st.Volumes[i] = GetVolumeStatus(vol)
273 // Returns a VolumeStatus describing the requested volume.
275 func GetVolumeStatus(volume string) *VolumeStatus {
276 var fs syscall.Statfs_t
278 err := syscall.Statfs(volume, &fs)
280 log.Printf("GetVolumeStatus: statfs: %s\n", err)
283 // These calculations match the way df calculates disk usage:
284 // "free" space is measured by fs.Bavail, but "used" space
285 // uses fs.Blocks - fs.Bfree.
286 free := fs.Bavail * uint64(fs.Bsize)
287 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
288 return &VolumeStatus{volume, free, used}
292 // Returns a string containing a list of locator ids found on this
293 // Keep server. If {prefix} is given, return only those locator
294 // ids that begin with the given prefix string.
296 // The return string consists of a sequence of newline-separated
297 // strings in the format
299 // locator+size modification-time
303 // e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
304 // e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
305 // e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
307 func IndexLocators(prefix string) string {
309 for _, vol := range KeepVolumes {
311 func(path string, info os.FileInfo, err error) error {
312 // This WalkFunc inspects each path in the volume
313 // and prints an index line for all files that begin
316 log.Printf("IndexHandler: %s: walking to %s: %s",
320 locator := filepath.Base(path)
321 // Skip directories that do not match prefix.
322 // We know there is nothing interesting inside.
324 !strings.HasPrefix(locator, prefix) &&
325 !strings.HasPrefix(prefix, locator) {
326 return filepath.SkipDir
328 // Print filenames beginning with prefix
329 if !info.IsDir() && strings.HasPrefix(locator, prefix) {
330 output = output + fmt.Sprintf(
331 "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
340 func GetBlock(hash string) ([]byte, error) {
341 var buf = make([]byte, BLOCKSIZE)
343 // Attempt to read the requested hash from a keep volume.
344 for _, vol := range KeepVolumes {
349 blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
351 f, err = os.Open(blockFilename)
353 if !os.IsNotExist(err) {
354 // A block is stored on only one Keep disk,
355 // so os.IsNotExist is expected. Report any other errors.
356 log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
361 nread, err = f.Read(buf)
363 log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
367 // Double check the file checksum.
369 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
370 if filehash != hash {
371 // TODO(twp): this condition probably represents a bad disk and
372 // should raise major alarm bells for an administrator: e.g.
373 // they should be sent directly to an event manager at high
374 // priority or logged as urgent problems.
376 log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
377 vol, blockFilename, filehash)
378 return buf, CorruptError
382 return buf[:nread], nil
385 log.Printf("%s: not found on any volumes, giving up\n", hash)
386 return buf, NotFoundError
389 /* PutBlock(block, hash)
390 Stores the BLOCK (identified by the content id HASH) in Keep.
392 The MD5 checksum of the block must be identical to the content id HASH.
393 If not, an error is returned.
395 PutBlock stores the BLOCK on the first Keep volume with free space.
396 A failure code is returned to the user only if all volumes fail.
398 On success, PutBlock returns nil.
399 On failure, it returns a KeepError with one of the following codes:
402 A different block with the same hash already exists on this
405 The MD5 hash of the BLOCK does not match the argument HASH.
407 There was not enough space left in any Keep volume to store
410 The object could not be stored for some other reason (e.g.
411 all writes failed). The text of the error message should
412 provide as much detail as possible.
415 func PutBlock(block []byte, hash string) error {
416 // Check that BLOCK's checksum matches HASH.
417 blockhash := fmt.Sprintf("%x", md5.Sum(block))
418 if blockhash != hash {
419 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
423 // If we already have a block on disk under this identifier, return
424 // success (but check for MD5 collisions).
425 // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
426 // In either case, we want to write our new (good) block to disk, so there is
427 // nothing special to do if err != nil.
428 if oldblock, err := GetBlock(hash); err == nil {
429 if bytes.Compare(block, oldblock) == 0 {
432 return CollisionError
436 // Store the block on the first available Keep volume.
438 for _, vol := range KeepVolumes {
443 blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
444 if err := os.MkdirAll(blockDir, 0755); err != nil {
445 log.Printf("%s: could not create directory %s: %s",
450 tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
452 log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
455 blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
457 if _, err := tmpfile.Write(block); err != nil {
458 log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
461 if err := tmpfile.Close(); err != nil {
462 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
463 os.Remove(tmpfile.Name())
466 if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
467 log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
468 os.Remove(tmpfile.Name())
475 log.Printf("all Keep volumes full")
478 log.Printf("all Keep volumes failed")
483 func IsFull(volume string) (isFull bool) {
484 fullSymlink := volume + "/full"
486 // Check if the volume has been marked as full in the last hour.
487 if link, err := os.Readlink(fullSymlink); err == nil {
488 if ts, err := strconv.Atoi(link); err == nil {
489 fulltime := time.Unix(int64(ts), 0)
490 if time.Since(fulltime).Hours() < 1.0 {
496 if avail, err := FreeDiskSpace(volume); err == nil {
497 isFull = avail < MIN_FREE_KILOBYTES
499 log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
503 // If the volume is full, timestamp it.
505 now := fmt.Sprintf("%d", time.Now().Unix())
506 os.Symlink(now, fullSymlink)
// FreeDiskSpace(volume)
// Returns the amount of available disk space on VOLUME,
// as a number of 1k blocks. If statfs fails on the volume, the
// result is 0 together with the statfs error.
//
// TODO(twp): consider integrating this better with
// VolumeStatus (e.g. keep a NodeStatus object up-to-date
// periodically and use it as the source of info)
//
func FreeDiskSpace(volume string) (free uint64, err error) {
	var stat syscall.Statfs_t
	if err = syscall.Statfs(volume, &stat); err != nil {
		return 0, err
	}
	// Statfs output is not guaranteed to measure free
	// space in terms of 1K blocks, so convert from the
	// filesystem's own block size.
	blockBytes := uint64(stat.Bsize)
	return stat.Bavail * blockBytes / 1024, nil
}
531 // Reads bytes repeatedly from an io.Reader until either
532 // encountering EOF, or the maxbytes byte limit has been reached.
533 // Returns a byte slice of the bytes that were read.
535 // If the reader contains more than maxbytes, returns a nil slice
538 func ReadAtMost(r io.Reader, maxbytes int) ([]byte, error) {
539 // Attempt to read one more byte than maxbytes.
540 lr := io.LimitReader(r, int64(maxbytes+1))
541 buf, err := ioutil.ReadAll(lr)
542 if len(buf) > maxbytes {
543 return nil, ReadErrorTooLong