10 "github.com/gorilla/mux"
21 // Default TCP address on which to listen for requests.
// Used as the default value of the -listen flag.
22 const DEFAULT_ADDR = ":25107"
24 // A Keep "block" is 64MB.
// GetBlock allocates a read buffer of this size.
25 const BLOCKSIZE = 64 * 1024 * 1024
27 // A Keep volume must have at least MIN_FREE_KILOBYTES available
28 // in order to permit writes.
// The value is BLOCKSIZE expressed in kilobytes, i.e. room for exactly
// one maximum-size block.
29 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
// PROC_MOUNTS is the path of the mount table scanned by
// FindKeepVolumes. It is a var rather than a const, so it can be
// reassigned — presumably by tests; confirm against callers.
31 var PROC_MOUNTS = "/proc/mounts"
// KeepVolumes lists the directories used as Keep volumes. It is
// assigned at startup (from FindKeepVolumes or the -volumes flag) and
// iterated by GetBlock and PutBlock.
33 var KeepVolumes []string
// KeepError is the error type constructed by GetBlock and PutBlock.
// Its field declarations are elided from this excerpt; the code in this
// file reads an HTTPCode (formatted with %d and passed to http.Error as
// a status) and a wrapped Err.
35 type KeepError struct {
// Error implements the error interface, rendering the value as
// "Error <HTTPCode>: <message of Err>".
// NOTE(review): e.Err.Error() is called unconditionally; if Err is an
// interface value and is nil this panics — confirm every construction
// site sets Err.
49 func (e *KeepError) Error() string {
50 return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
// Program entry sequence (the enclosing func header is elided from this
// excerpt): register flags, determine the Keep volumes, install the
// REST routes, and serve HTTP.
54 // Parse command-line flags.
// -listen selects the address to serve on (default DEFAULT_ADDR);
// -volumes supplies an explicit comma-separated volume list.
55 var listen, keepvols string
56 flag.StringVar(&listen, "listen", DEFAULT_ADDR,
57 "interface on which to listen for requests")
58 flag.StringVar(&keepvols, "volumes", "",
59 "comma-separated list of directories to use for Keep volumes")
62 // Look for local keep volumes.
// Two alternative assignments follow; the condition that chooses
// between auto-discovery and the -volumes list is not visible here.
64 KeepVolumes = FindKeepVolumes()
// NOTE(review): strings.Split with a non-empty separator never returns
// an empty slice (splitting "" yields one empty string), so the length
// check below cannot reject an empty list on this path — confirm the
// elided condition guards against that.
66 KeepVolumes = strings.Split(keepvols, ",")
// Refuse to start without at least one volume.
69 if len(KeepVolumes) == 0 {
70 log.Fatal("could not find any keep volumes")
// Log each volume in use.
72 for _, v := range KeepVolumes {
73 log.Println("keep volume:", v)
76 // Set up REST handlers.
78 // Start with a router that will route each URL path to an
79 // appropriate handler.
// Both routes match a path consisting of exactly 32 lowercase hex
// digits (the length of an MD5 digest in hex) and differ only in the
// HTTP method.
81 rest := mux.NewRouter()
82 rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
83 rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
85 // Tell the built-in HTTP server to direct all requests to the REST
87 http.Handle("/", rest)
89 // Start listening for requests.
// Passing nil makes the server use the default mux, on which the router
// was registered above.
// NOTE(review): the error returned by ListenAndServe is discarded on
// this line, so a failure to bind the address would not be reported —
// confirm, and consider log.Fatal.
90 http.ListenAndServe(listen, nil)
94 // Returns a list of Keep volumes mounted on this system.
96 // A Keep volume is a normal or tmpfs volume with a /keep
97 // directory at the top level of the mount point.
// The mount table is read from the file named by PROC_MOUNTS. Each
// returned entry is the path of the "keep" directory itself, not the
// mount point.
99 func FindKeepVolumes() []string {
100 vols := make([]string, 0)
// Failing to open the mount table is fatal for the process.
102 if f, err := os.Open(PROC_MOUNTS); err != nil {
103 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
// NOTE(review): no Close of f is visible in this excerpt — confirm the
// file is closed on the elided lines.
105 scanner := bufio.NewScanner(f)
// Split the scanned text into whitespace-separated fields; the first
// is taken as the device and the second as the mount point.
// NOTE(review): args[0] and args[1] are indexed without a length
// check, so text with fewer than two fields (e.g. a blank line) would
// panic with an index-out-of-range error.
107 args := strings.Fields(scanner.Text())
108 dev, mount := args[0], args[1]
// Consider only tmpfs mounts and devices under /dev/, and never the
// root mount.
109 if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
110 keep := mount + "/keep"
// Keep the candidate only if it exists and is a directory; Stat errors
// are treated as "not a Keep volume".
111 if st, err := os.Stat(keep); err == nil && st.IsDir() {
112 vols = append(vols, keep)
// Check for a read error from the scanner; how it is handled is on
// lines elided from this excerpt.
116 if err := scanner.Err(); err != nil {
// GetBlockHandler serves a block over HTTP: it takes the "hash" route
// variable from the request, fetches the block with GetBlock, and
// writes the block bytes as the response body.
123 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
124 hash := mux.Vars(req)["hash"]
126 block, err := GetBlock(hash)
// NOTE(review): the status is hard-coded to 404, but GetBlock can also
// fail with ErrCorrupt (checksum mismatch). If this line handles every
// GetBlock error, a corrupt block is reported to the client as
// "not found" — confirm against the elided condition.
128 http.Error(w, err.Error(), 404)
132 _, err = w.Write(block)
// The write error is logged; no further response is attempted in the
// visible lines.
134 log.Printf("GetBlockHandler: writing response: %s", err)
140 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
141 hash := mux.Vars(req)["hash"]
143 // Read the block data to be stored.
144 // TODO(twp): decide what to do when the input stream contains
145 // more than BLOCKSIZE bytes.
147 buf := make([]byte, BLOCKSIZE)
148 if nread, err := req.Body.Read(buf); err == nil {
149 if err := PutBlock(buf[:nread], hash); err == nil {
150 w.WriteHeader(http.StatusOK)
152 ke := err.(*KeepError)
153 http.Error(w, ke.Error(), ke.HTTPCode)
156 log.Println("error reading request: ", err)
157 http.Error(w, err.Error(), 500)
// GetBlock looks for the block identified by hash on each volume in
// KeepVolumes, in order, and returns its contents once a copy is read
// whose MD5 checksum matches hash.
//
// On failure it returns a *KeepError: ErrCorrupt when a copy is read
// whose checksum does not match, ErrNotFound when the loop over all
// volumes finishes without returning. In both failure cases the
// returned slice is the full scratch buffer and is not meaningful.
161 func GetBlock(hash string) ([]byte, error) {
// A full BLOCKSIZE buffer is allocated on every call.
162 var buf = make([]byte, BLOCKSIZE)
164 // Attempt to read the requested hash from a keep volume.
165 for _, vol := range KeepVolumes {
// Blocks are stored as <volume>/<first three characters of hash>/<hash>.
// NOTE(review): hash[0:3] assumes hash has at least three characters;
// the HTTP route guarantees 32, but other callers should be checked.
170 blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
// NOTE(review): no Close of f is visible in this excerpt — confirm the
// file is closed on the elided lines, otherwise each call leaks a file
// descriptor.
172 f, err = os.Open(blockFilename)
174 if !os.IsNotExist(err) {
175 // A block is stored on only one Keep disk,
176 // so os.IsNotExist is expected. Report any other errors.
177 log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
// NOTE(review): a single Read is not guaranteed to fill buf up to the
// end of the file; a short read would be reported below as a checksum
// mismatch. Reading until EOF would be more robust.
182 nread, err = f.Read(buf)
184 log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
188 // Double check the file checksum.
// The digest is rendered as lowercase hex for comparison with hash.
190 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
191 if filehash != hash {
192 // TODO(twp): this condition probably represents a bad disk and
193 // should raise major alarm bells for an administrator: e.g.
194 // they should be sent directly to an event manager at high
195 // priority or logged as urgent problems.
197 log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
198 vol, blockFilename, filehash)
// A mismatch on one volume ends the search immediately; the remaining
// volumes are not tried.
199 return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")}
// Success: return only the bytes actually read.
203 return buf[:nread], nil
206 log.Printf("%s: not found on any volumes, giving up\n", hash)
207 return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)}
// NOTE(review): in this excerpt the block comment below is missing
// several lines, including the names of the error codes it describes
// and its closing delimiter; it is reproduced unchanged.
210 /* PutBlock(block, hash)
211 Stores the BLOCK (identified by the content id HASH) in Keep.
213 The MD5 checksum of the block must be identical to the content id HASH.
214 If not, an error is returned.
216 PutBlock stores the BLOCK on the first Keep volume with free space.
217 A failure code is returned to the user only if all volumes fail.
219 On success, PutBlock returns nil.
220 On failure, it returns a KeepError with one of the following codes:
223 A different block with the same hash already exists on this
226 The MD5 hash of the BLOCK does not match the argument HASH.
228 There was not enough space left in any Keep volume to store
231 The object could not be stored for some other reason (e.g.
232 all writes failed). The text of the error message should
233 provide as much detail as possible.
236 func PutBlock(block []byte, hash string) error {
237 // Check that BLOCK's checksum matches HASH.
// The digest is rendered as lowercase hex for the comparison.
238 blockhash := fmt.Sprintf("%x", md5.Sum(block))
239 if blockhash != hash {
240 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
241 return &KeepError{ErrMD5Fail, errors.New("MD5Fail")}
244 // If we already have a block on disk under this identifier, return
245 // success (but check for MD5 collisions).
246 // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
247 // In either case, we want to write our new (good) block to disk, so there is
248 // nothing special to do if err != nil.
249 if oldblock, err := GetBlock(hash); err == nil {
// Byte-for-byte comparison of the stored block with the incoming one.
// Different contents under the same hash are reported as a collision.
250 if bytes.Compare(block, oldblock) == 0 {
253 return &KeepError{ErrCollision, errors.New("Collision")}
257 // Store the block on the first available Keep volume.
259 for _, vol := range KeepVolumes {
// Blocks live in a per-prefix directory named after the first three
// characters of the hash; create it if needed.
264 blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
265 if err := os.MkdirAll(blockDir, 0755); err != nil {
266 log.Printf("%s: could not create directory %s: %s",
// The data is written to a temporary file in the destination directory
// and renamed to its final name afterwards — presumably so that a
// partially written block never appears under its final name.
271 tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
273 log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
276 blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
// NOTE(review): unlike the Close and Rename failure paths below, no
// cleanup of the temporary file is visible for a failed Write — confirm
// the elided lines close and remove it.
278 if _, err := tmpfile.Write(block); err != nil {
279 log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
282 if err := tmpfile.Close(); err != nil {
283 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
284 os.Remove(tmpfile.Name())
287 if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
288 log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
289 os.Remove(tmpfile.Name())
// Two terminal failures follow: ErrFull and ErrOther. The condition
// that selects between them is elided from this excerpt.
296 log.Printf("all Keep volumes full")
297 return &KeepError{ErrFull, errors.New("Full")}
299 log.Printf("all Keep volumes failed")
300 return &KeepError{ErrOther, errors.New("Fail")}
// IsFull reports whether volume should be treated as full.
//
// It consults a marker symlink named "full" inside the volume, whose
// target is a Unix timestamp, and the free space reported by
// FreeDiskSpace compared against MIN_FREE_KILOBYTES.
304 func IsFull(volume string) (isFull bool) {
305 fullSymlink := volume + "/full"
307 // Check if the volume has been marked as full in the last hour.
// A missing link or a target that does not parse as an integer falls
// through to the disk-space check below.
308 if link, err := os.Readlink(fullSymlink); err == nil {
309 if ts, err := strconv.Atoi(link); err == nil {
310 fulltime := time.Unix(int64(ts), 0)
311 if time.Since(fulltime).Hours() < 1.0 {
// Full means fewer than MIN_FREE_KILOBYTES kilobytes are available.
317 if avail, err := FreeDiskSpace(volume); err == nil {
318 isFull = avail < MIN_FREE_KILOBYTES
320 log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
324 // If the volume is full, timestamp it.
326 now := fmt.Sprintf("%d", time.Now().Unix())
// NOTE(review): the error from os.Symlink is ignored. Symlink fails
// when the link name already exists, so a marker older than one hour
// would not be refreshed here unless it is removed on lines elided
// from this excerpt — confirm.
327 os.Symlink(now, fullSymlink)
332 // FreeDiskSpace(volume)
333 // Returns the amount of available disk space on VOLUME,
334 // as a number of 1k blocks.
// The figure comes from statfs on the volume path. Any error from
// syscall.Statfs is stored in the named result err.
336 func FreeDiskSpace(volume string) (free uint64, err error) {
337 var fs syscall.Statfs_t
338 err = syscall.Statfs(volume, &fs)
340 // Statfs output is not guaranteed to measure free
341 // space in terms of 1K blocks.
// Convert the available block count to bytes using the filesystem
// block size, then to kilobytes.
342 free = fs.Bavail * uint64(fs.Bsize) / 1024