10 "github.com/gorilla/mux"
// ======================
// Configuration settings
//
// TODO(twp): make all of these configurable via command line flags
// and/or configuration file settings.

const (
	// DEFAULT_ADDR is the TCP address used when no -listen flag is given:
	// port 25107 on every network interface.
	DEFAULT_ADDR = ":25107"

	// BLOCKSIZE is the size of a Keep "block" (64 MiB). It is also the
	// largest request body PutBlockHandler will accept.
	BLOCKSIZE = 64 << 20

	// MIN_FREE_KILOBYTES is the free space, in KiB, that a Keep volume
	// must have in order to permit writes: exactly one full block.
	MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
)

var (
	// PROC_MOUNTS is the mount table scanned by FindKeepVolumes.
	// It is declared as a variable, so it can be reassigned.
	PROC_MOUNTS = "/proc/mounts"

	// KeepVolumes lists the directories used as Keep volumes. main fills
	// it either from the -volumes flag or from FindKeepVolumes.
	KeepVolumes []string
)
// KeepError is the error type used by the block operations in this file.
// It pairs an HTTP status code (the HTTPCode field, which PutBlockHandler
// passes to http.Error) with a short message string.
45 type KeepError struct {
// Predefined KeepErrors. For example, GetBlock returns CorruptError and
// NotFoundError, and PutBlock returns CollisionError.
// NOTE(review): several codes reuse standard HTTP statuses whose usual
// meaning is unrelated (401 Unauthorized for an MD5 mismatch, 402 Payment
// Required for corruption, 504 Gateway Timeout for "Too Long"); confirm
// clients depend on these exact values before changing them.
51 CollisionError = &KeepError{400, "Collision"}
52 MD5Error = &KeepError{401, "MD5 Failure"}
53 CorruptError = &KeepError{402, "Corruption"}
54 NotFoundError = &KeepError{404, "Not Found"}
55 GenericError = &KeepError{500, "Fail"}
56 FullError = &KeepError{503, "Full"}
57 TooLongError = &KeepError{504, "Too Long"}
// Error implements the error interface for *KeepError.
// NOTE(review): the method body is not visible in this excerpt; presumably
// it returns the message given in the composite literals above — confirm.
60 func (e *KeepError) Error() string {
var (
	// ReadErrorTooLong is the sentinel error ReadAtMost returns when the
	// reader holds more bytes than the caller's limit. PutBlockHandler
	// passes BLOCKSIZE as that limit and compares the result against this
	// value to produce a descriptive "max request size" message.
	ReadErrorTooLong = errors.New("Too long")
)
// Server startup (presumably the body of main; its header is not visible
// in this excerpt): read flags, pick the Keep volumes, register the REST
// routes, then serve HTTP on the -listen address.
69 // Parse command-line flags:
71 // -listen=ipaddr:port
72 // Interface on which to listen for requests. Use :port without
73 // an ipaddr to listen on all network interfaces.
75 // -listen=127.0.0.1:4949
76 // -listen=10.0.1.24:8000
77 // -listen=:25107 (to listen to port 25107 on all interfaces)
80 // A comma-separated list of directories to use as Keep volumes.
82 // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
84 // If -volumes is empty or is not present, Keep will select volumes
85 // by looking at currently mounted filesystems for /keep top-level
88 var listen, keepvols string
89 flag.StringVar(&listen, "listen", DEFAULT_ADDR,
90 "interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
91 flag.StringVar(&keepvols, "volumes", "",
92 "Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
// NOTE(review): flag.Parse() is not visible in this excerpt; confirm it
// runs before listen and keepvols are read below.
95 // Look for local keep volumes.
97 // TODO(twp): decide whether this is desirable default behavior.
98 // In production we may want to require the admin to specify
99 // Keep volumes explicitly.
100 KeepVolumes = FindKeepVolumes()
// An explicit -volumes list is used verbatim. strings.Split never returns
// an empty slice, so the emptiness check below can only fire on the
// FindKeepVolumes path. Empty entries (e.g. from a trailing comma) are
// kept as volume names.
102 KeepVolumes = strings.Split(keepvols, ",")
105 if len(KeepVolumes) == 0 {
106 log.Fatal("could not find any keep volumes")
108 for _, v := range KeepVolumes {
109 log.Println("keep volume:", v)
112 // Set up REST handlers.
114 // Start with a router that will route each URL path to an
115 // appropriate handler.
117 rest := mux.NewRouter()
// Both routes accept only a 32-character lowercase hex hash, the same form
// PutBlock produces with fmt.Sprintf("%x", md5.Sum(...)).
118 rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
119 rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
121 // Tell the built-in HTTP server to direct all requests to the REST
123 http.Handle("/", rest)
125 // Start listening for requests.
// NOTE(review): the error returned by ListenAndServe is discarded, so a
// failure to bind the address (e.g. port already in use) presumably makes
// main return silently; consider log.Fatal(http.ListenAndServe(listen, nil)).
126 http.ListenAndServe(listen, nil)
130 // Returns a list of Keep volumes mounted on this system.
132 // A Keep volume is a normal or tmpfs volume with a /keep
133 // directory at the top level of the mount point.
135 func FindKeepVolumes() []string {
136 vols := make([]string, 0)
138 if f, err := os.Open(PROC_MOUNTS); err != nil {
139 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
141 scanner := bufio.NewScanner(f)
143 args := strings.Fields(scanner.Text())
144 dev, mount := args[0], args[1]
145 if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
146 keep := mount + "/keep"
147 if st, err := os.Stat(keep); err == nil && st.IsDir() {
148 vols = append(vols, keep)
152 if err := scanner.Err(); err != nil {
159 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
160 hash := mux.Vars(req)["hash"]
162 block, err := GetBlock(hash)
164 http.Error(w, err.Error(), 404)
168 _, err = w.Write(block)
170 log.Printf("GetBlockHandler: writing response: %s", err)
176 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
177 hash := mux.Vars(req)["hash"]
179 // Read the block data to be stored.
180 // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
182 // Note: because req.Body is a buffered Reader, each Read() call will
183 // collect only the data in the network buffer (typically 16384 bytes),
184 // even if it is passed a much larger slice.
186 // Instead, call ReadAtMost to read data from the socket
187 // repeatedly until either EOF or BLOCKSIZE bytes have been read.
189 if buf, err := ReadAtMost(req.Body, BLOCKSIZE); err == nil {
190 if err := PutBlock(buf, hash); err == nil {
191 w.WriteHeader(http.StatusOK)
193 ke := err.(*KeepError)
194 http.Error(w, ke.Error(), ke.HTTPCode)
197 log.Println("error reading request: ", err)
198 errmsg := err.Error()
199 if err == ReadErrorTooLong {
200 // Use a more descriptive error message that includes
201 // the maximum request size.
202 errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE)
204 http.Error(w, errmsg, 500)
208 func GetBlock(hash string) ([]byte, error) {
209 var buf = make([]byte, BLOCKSIZE)
211 // Attempt to read the requested hash from a keep volume.
212 for _, vol := range KeepVolumes {
217 blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
219 f, err = os.Open(blockFilename)
221 if !os.IsNotExist(err) {
222 // A block is stored on only one Keep disk,
223 // so os.IsNotExist is expected. Report any other errors.
224 log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
229 nread, err = f.Read(buf)
231 log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
235 // Double check the file checksum.
237 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
238 if filehash != hash {
239 // TODO(twp): this condition probably represents a bad disk and
240 // should raise major alarm bells for an administrator: e.g.
241 // they should be sent directly to an event manager at high
242 // priority or logged as urgent problems.
244 log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
245 vol, blockFilename, filehash)
246 return buf, CorruptError
250 return buf[:nread], nil
253 log.Printf("%s: not found on any volumes, giving up\n", hash)
254 return buf, NotFoundError
257 /* PutBlock(block, hash)
258 Stores the BLOCK (identified by the content id HASH) in Keep.
260 The MD5 checksum of the block must be identical to the content id HASH.
261 If not, an error is returned.
263 PutBlock stores the BLOCK on the first Keep volume with free space.
264 A failure code is returned to the user only if all volumes fail.
266 On success, PutBlock returns nil.
267 On failure, it returns a KeepError with one of the following codes:
270 A different block with the same hash already exists on this
273 The MD5 hash of the BLOCK does not match the argument HASH.
275 There was not enough space left in any Keep volume to store
278 The object could not be stored for some other reason (e.g.
279 all writes failed). The text of the error message should
280 provide as much detail as possible.
283 func PutBlock(block []byte, hash string) error {
284 // Check that BLOCK's checksum matches HASH.
285 blockhash := fmt.Sprintf("%x", md5.Sum(block))
286 if blockhash != hash {
287 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
291 // If we already have a block on disk under this identifier, return
292 // success (but check for MD5 collisions).
293 // The only errors that GetBlock can return are CorruptError and NotFoundError.
294 // In either case, we want to write our new (good) block to disk, so there is
295 // nothing special to do if err != nil.
296 if oldblock, err := GetBlock(hash); err == nil {
297 if bytes.Compare(block, oldblock) == 0 {
// Same hash but different contents: a genuine MD5 collision.
300 return CollisionError
304 // Store the block on the first available Keep volume.
306 for _, vol := range KeepVolumes {
// hash[0:3] assumes hash has at least 3 characters. This presumably holds
// because the MD5 mismatch branch above returns (not visible here).
311 blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
312 if err := os.MkdirAll(blockDir, 0755); err != nil {
313 log.Printf("%s: could not create directory %s: %s",
// Write into a temporary file in the destination directory first, then
// rename it to blockFilename, so a partially written block is never
// visible under its final name.
318 tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
320 log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
323 blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
// NOTE(review): as far as this excerpt shows, a failed Write is only
// logged. tmpfile is neither closed nor removed on this path, which would
// leak a file descriptor and leave a stray tmp file behind. Confirm, and
// add tmpfile.Close() plus os.Remove(tmpfile.Name()) here.
325 if _, err := tmpfile.Write(block); err != nil {
326 log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
329 if err := tmpfile.Close(); err != nil {
330 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
331 os.Remove(tmpfile.Name())
334 if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
335 log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
336 os.Remove(tmpfile.Name())
// Reached only when no volume accepted the block. The conditions that
// choose between the two messages below are not visible in this excerpt;
// per the header comment, "full" corresponds to FullError and "failed" to
// a generic failure.
343 log.Printf("all Keep volumes full")
346 log.Printf("all Keep volumes failed")
// IsFull reports whether volume has too little free space to accept new
// blocks, meaning FreeDiskSpace reports fewer than MIN_FREE_KILOBYTES.
//
// A full volume is marked with a "full" symlink inside it. The symlink's
// target is the Unix time, in seconds, at which the volume was found full.
// Presumably a mark less than an hour old short-circuits the disk check
// (the body of that branch is not visible in this excerpt).
351 func IsFull(volume string) (isFull bool) {
352 fullSymlink := volume + "/full"
354 // Check if the volume has been marked as full in the last hour.
355 if link, err := os.Readlink(fullSymlink); err == nil {
356 if ts, err := strconv.Atoi(link); err == nil {
357 fulltime := time.Unix(int64(ts), 0)
358 if time.Since(fulltime).Hours() < 1.0 {
// FreeDiskSpace reports 1 KiB units, matching MIN_FREE_KILOBYTES.
364 if avail, err := FreeDiskSpace(volume); err == nil {
365 isFull = avail < MIN_FREE_KILOBYTES
// NOTE(review): what IsFull reports when FreeDiskSpace fails is decided by
// a line not visible in this excerpt; confirm the intended fallback.
367 log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
371 // If the volume is full, timestamp it.
373 now := fmt.Sprintf("%d", time.Now().Unix())
// NOTE(review): the error from os.Symlink is discarded. Symlink fails when
// fullSymlink already exists. Once a mark is older than an hour it is
// therefore never refreshed, and every later call re-runs FreeDiskSpace.
// Removing the stale link first and logging errors would fix both.
374 os.Symlink(now, fullSymlink)
// FreeDiskSpace returns the available space on the filesystem holding
// volume, measured in 1 KiB blocks.
//
// Statfs output is not guaranteed to count space in 1 KiB blocks. The
// available block count (Bavail) is therefore scaled by the reported
// block size (Bsize) first, and then converted to KiB.
// On error it returns 0 together with the Statfs error.
func FreeDiskSpace(volume string) (free uint64, err error) {
	var stat syscall.Statfs_t
	if err = syscall.Statfs(volume, &stat); err != nil {
		return 0, err
	}
	availBytes := stat.Bavail * uint64(stat.Bsize)
	return availBytes / 1024, nil
}
396 // Reads bytes repeatedly from an io.Reader until either
397 // encountering EOF, or the maxbytes byte limit has been reached.
398 // Returns a byte slice of the bytes that were read.
400 // If the reader contains more than maxbytes, returns a nil slice
403 func ReadAtMost(r io.Reader, maxbytes int) ([]byte, error) {
404 // Attempt to read one more byte than maxbytes.
405 lr := io.LimitReader(r, int64(maxbytes+1))
406 buf, err := ioutil.ReadAll(lr)
407 if len(buf) > maxbytes {
408 return nil, ReadErrorTooLong