	"github.com/gorilla/mux"
	"io"
const (
	// DEFAULT_PORT is the default TCP port on which to listen for requests.
	DEFAULT_PORT = 25107

	// BLOCKSIZE is the size of a Keep "block": 64 MiB.
	BLOCKSIZE = 64 << 20

	// MIN_FREE_KILOBYTES is the minimum free space, in kilobytes, that a
	// Keep volume must have in order to permit writes (one block's worth).
	MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
)

var (
	// PROC_MOUNTS is the mount table read by FindKeepVolumes. It is a
	// variable, presumably so it can be redirected (e.g. in tests).
	PROC_MOUNTS = "/proc/mounts"

	// KeepVolumes holds the Keep volume directories found at startup.
	KeepVolumes []string
)
// KeepError is an error that carries the HTTP status code to report to
// the client together with the underlying error.
type KeepError struct {
	HTTPCode int
	Err      error
}

// Error formats the error as "Error <code>: <message>". A nil Err is
// tolerated (formatted as just "Error <code>") so that reporting a
// KeepError can never panic.
func (e *KeepError) Error() string {
	if e.Err == nil {
		return fmt.Sprintf("Error %d", e.HTTPCode)
	}
	return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
}
52 // Look for local keep volumes.
53 KeepVolumes = FindKeepVolumes()
54 if len(KeepVolumes) == 0 {
55 log.Fatal("could not find any keep volumes")
57 for _, v := range KeepVolumes {
58 log.Println("keep volume:", v)
61 // Set up REST handlers.
63 // Start with a router that will route each URL path to an
64 // appropriate handler.
66 rest := mux.NewRouter()
67 rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
68 rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
70 // Tell the built-in HTTP server to direct all requests to the REST
72 http.Handle("/", rest)
74 // Start listening for requests.
75 port := fmt.Sprintf(":%d", DEFAULT_PORT)
76 http.ListenAndServe(port, nil)
80 // Returns a list of Keep volumes mounted on this system.
82 // A Keep volume is a normal or tmpfs volume with a /keep
83 // directory at the top level of the mount point.
85 func FindKeepVolumes() []string {
86 vols := make([]string, 0)
88 if f, err := os.Open(PROC_MOUNTS); err != nil {
89 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
91 scanner := bufio.NewScanner(f)
93 args := strings.Fields(scanner.Text())
94 dev, mount := args[0], args[1]
95 if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
96 keep := mount + "/keep"
97 if st, err := os.Stat(keep); err == nil && st.IsDir() {
98 vols = append(vols, keep)
102 if err := scanner.Err(); err != nil {
109 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
110 hash := mux.Vars(req)["hash"]
112 block, err := GetBlock(hash)
114 http.Error(w, err.Error(), 404)
118 _, err = w.Write(block)
120 log.Printf("GetBlockHandler: writing response: %s", err)
126 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
127 hash := mux.Vars(req)["hash"]
129 // Read the block data to be stored.
130 // TODO(twp): decide what to do when the input stream contains
131 // more than BLOCKSIZE bytes.
133 buf := make([]byte, BLOCKSIZE)
134 if nread, err := req.Body.Read(buf); err == nil {
135 if err := PutBlock(buf[:nread], hash); err == nil {
136 w.WriteHeader(http.StatusOK)
138 ke := err.(*KeepError)
139 http.Error(w, ke.Error(), ke.HTTPCode)
142 log.Println("error reading request: ", err)
143 http.Error(w, err.Error(), 500)
147 func GetBlock(hash string) ([]byte, error) {
148 var buf = make([]byte, BLOCKSIZE)
150 // Attempt to read the requested hash from a keep volume.
151 for _, vol := range KeepVolumes {
156 blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
158 f, err = os.Open(blockFilename)
160 if !os.IsNotExist(err) {
161 // A block is stored on only one Keep disk,
162 // so os.IsNotExist is expected. Report any other errors.
163 log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
168 nread, err = f.Read(buf)
170 log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
174 // Double check the file checksum.
176 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
177 if filehash != hash {
178 // TODO(twp): this condition probably represents a bad disk and
179 // should raise major alarm bells for an administrator: e.g.
180 // they should be sent directly to an event manager at high
181 // priority or logged as urgent problems.
183 log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
184 vol, blockFilename, filehash)
185 return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")}
189 return buf[:nread], nil
192 log.Printf("%s: not found on any volumes, giving up\n", hash)
193 return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)}
196 /* PutBlock(block, hash)
197 Stores the BLOCK (identified by the content id HASH) in Keep.
199 The MD5 checksum of the block must be identical to the content id HASH.
200 If not, an error is returned.
202 PutBlock stores the BLOCK on the first Keep volume with free space.
203 A failure code is returned to the user only if all volumes fail.
205 On success, PutBlock returns nil.
206 On failure, it returns a KeepError with one of the following codes:
209 A different block with the same hash already exists on this
212 The MD5 hash of the BLOCK does not match the argument HASH.
214 There was not enough space left in any Keep volume to store
217 The object could not be stored for some other reason (e.g.
218 all writes failed). The text of the error message should
219 provide as much detail as possible.
222 func PutBlock(block []byte, hash string) error {
223 // Check that BLOCK's checksum matches HASH.
224 blockhash := fmt.Sprintf("%x", md5.Sum(block))
225 if blockhash != hash {
226 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
227 return &KeepError{ErrMD5Fail, errors.New("MD5Fail")}
230 // If we already have a block on disk under this identifier, return
231 // success (but check for MD5 collisions, which may signify on-disk corruption).
232 if oldblock, err := GetBlock(hash); err == nil {
233 if bytes.Compare(block, oldblock) == 0 {
236 return &KeepError{ErrCollision, errors.New("Collision")}
239 ke := err.(*KeepError)
240 if ke.HTTPCode == ErrCorrupt {
241 return &KeepError{ErrCollision, errors.New("Collision")}
245 // Store the block on the first available Keep volume.
247 for _, vol := range KeepVolumes {
252 blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
253 if err := os.MkdirAll(blockDir, 0755); err != nil {
254 log.Printf("%s: could not create directory %s: %s",
259 blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
261 f, err := os.OpenFile(blockFilename, os.O_CREATE|os.O_WRONLY, 0644)
263 log.Printf("%s: creating %s: %s\n", vol, blockFilename, err)
267 if _, err := f.Write(block); err == nil {
271 log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
277 log.Printf("all Keep volumes full")
278 return &KeepError{ErrFull, errors.New("Full")}
280 log.Printf("all Keep volumes failed")
281 return &KeepError{ErrOther, errors.New("Fail")}
285 func IsFull(volume string) (isFull bool) {
286 fullSymlink := volume + "/full"
288 // Check if the volume has been marked as full in the last hour.
289 if link, err := os.Readlink(fullSymlink); err == nil {
290 if ts, err := strconv.Atoi(link); err == nil {
291 fulltime := time.Unix(int64(ts), 0)
292 if time.Since(fulltime).Hours() < 1.0 {
298 if avail, err := FreeDiskSpace(volume); err == nil {
299 isFull = avail < MIN_FREE_KILOBYTES
301 log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
305 // If the volume is full, timestamp it.
307 now := fmt.Sprintf("%d", time.Now().Unix())
308 os.Symlink(now, fullSymlink)
313 // FreeDiskSpace(volume)
314 // Returns the amount of available disk space on VOLUME,
315 // as a number of 1k blocks.
317 func FreeDiskSpace(volume string) (free int, err error) {
318 // Run df to find out how much disk space is left.
319 cmd := exec.Command("df", "--block-size=1k", volume)
320 stdout, perr := cmd.StdoutPipe()
324 scanner := bufio.NewScanner(stdout)
325 if perr := cmd.Start(); err != nil {
329 scanner.Scan() // skip header line of df output
332 f := strings.Fields(scanner.Text())
333 if avail, err := strconv.Atoi(f[3]); err == nil {
336 err = errors.New("bad df format: " + scanner.Text())
339 // Flush the df output and shut it down cleanly.