9 "github.com/gorilla/mux"
// DEFAULT_PORT is the TCP port on which the server listens for requests.
const DEFAULT_PORT = 25107

const (
	// BLOCKSIZE is the size of a Keep "block": 64 MiB.
	BLOCKSIZE = 64 << 20

	// MIN_FREE_KILOBYTES is the amount of free space (in KiB) a Keep
	// volume must have available in order to permit writes; it equals
	// one block.
	MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
)

var (
	// PROC_MOUNTS is the mount table that is scanned for Keep volumes.
	PROC_MOUNTS = "/proc/mounts"

	// KeepVolumes holds the Keep volume directories found at startup.
	KeepVolumes []string
)
35 type KeepError struct {
49 func (e *KeepError) Error() string {
50 return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
54 // Look for local keep volumes.
55 KeepVolumes = FindKeepVolumes()
56 if len(KeepVolumes) == 0 {
57 log.Fatal("could not find any keep volumes")
59 for _, v := range KeepVolumes {
60 log.Println("keep volume:", v)
63 // Set up REST handlers.
65 // Start with a router that will route each URL path to an
66 // appropriate handler.
68 rest := mux.NewRouter()
69 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
70 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
71 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
72 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
73 rest.HandleFunc(`/status\.json`, StatusHandler).Methods("GET", "HEAD")
75 // Tell the built-in HTTP server to direct all requests to the REST
77 http.Handle("/", rest)
79 // Start listening for requests.
80 port := fmt.Sprintf(":%d", DEFAULT_PORT)
81 http.ListenAndServe(port, nil)
85 // Returns a list of Keep volumes mounted on this system.
87 // A Keep volume is a normal or tmpfs volume with a /keep
88 // directory at the top level of the mount point.
90 func FindKeepVolumes() []string {
91 vols := make([]string, 0)
93 if f, err := os.Open(PROC_MOUNTS); err != nil {
94 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
96 scanner := bufio.NewScanner(f)
98 args := strings.Fields(scanner.Text())
99 dev, mount := args[0], args[1]
100 if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
101 keep := mount + "/keep"
102 if st, err := os.Stat(keep); err == nil && st.IsDir() {
103 vols = append(vols, keep)
107 if err := scanner.Err(); err != nil {
114 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
115 hash := mux.Vars(req)["hash"]
117 block, err := GetBlock(hash)
119 http.Error(w, err.Error(), 404)
123 _, err = w.Write(block)
125 log.Printf("GetBlockHandler: writing response: %s", err)
131 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
132 hash := mux.Vars(req)["hash"]
134 // Read the block data to be stored.
135 // TODO(twp): decide what to do when the input stream contains
136 // more than BLOCKSIZE bytes.
138 buf := make([]byte, BLOCKSIZE)
139 if nread, err := req.Body.Read(buf); err == nil {
140 if err := PutBlock(buf[:nread], hash); err == nil {
141 w.WriteHeader(http.StatusOK)
143 ke := err.(*KeepError)
144 http.Error(w, ke.Error(), ke.HTTPCode)
147 log.Println("error reading request: ", err)
148 http.Error(w, err.Error(), 500)
152 func IndexHandler(w http.ResponseWriter, req *http.Request) {
153 prefix := mux.Vars(req)["prefix"]
155 index := IndexLocators(prefix)
156 w.Write([]byte(index))
160 // Responds to /status.json requests with the current node status,
161 // described in a JSON structure.
163 // The data given in a status.json response includes:
164 // time - the time the status was last updated
165 // df - the output of the most recent `df --block-size=1k`
166 // disk_devices - list of disk device files (i.e. /dev/(s|xv|h)d)
167 // dirs - an object describing Keep volumes, keyed on Keep volume dirs,
168 // each value is an object describing the status of that volume
169 // * status ("full [timestamp]" or "ok [timestamp]")
173 type VolumeStatus struct {
176 LastErrTime time.Time
179 type NodeStatus struct {
183 Volumes map[string]VolumeStatus
// StatusHandler handles GET/HEAD /status.json requests. It builds a
// NodeStatus holding the current time, the names of entries in /dev that
// start with "sd", "hd" or "xvd", and a VolumeStatus for every entry in
// KeepVolumes.
//
// NOTE(review): none of the visible lines write st to w; confirm whether
// the response is produced in lines not shown here.
186 func StatusHandler(w http.ResponseWriter, req *http.Request) {
187 st := new(NodeStatus)
188 st.LastUpdate = time.Now()
190 // Get a list of disk devices on this system.
// NOTE(review): make([]string, 1) starts DiskDev with one empty-string
// element ahead of the appended device names; make([]string, 0) was
// probably intended.
191 st.DiskDev = make([]string, 1)
// NOTE(review): devdir is not closed in the visible lines, which would
// leak one descriptor per request; consider `defer devdir.Close()` after
// a successful Open.
192 if devdir, err := os.Open("/dev"); err != nil {
193 log.Printf("StatusHandler: opening /dev: %s\n", err)
195 devs, err := devdir.Readdirnames(0)
197 for _, d := range devs {
198 if strings.HasPrefix(d, "sd") ||
199 strings.HasPrefix(d, "hd") ||
200 strings.HasPrefix(d, "xvd") {
201 st.DiskDev = append(st.DiskDev, d)
205 log.Printf("Readdirnames: %s", err)
// NOTE(review): new(NodeStatus) leaves st.Volumes nil and assigning into
// a nil map panics; unless it is initialised in a line not shown here,
// add st.Volumes = make(map[string]VolumeStatus) before this loop.
209 for _, vol := range KeepVolumes {
210 st.Volumes[vol] = GetVolumeStatus(vol)
// GetVolumeStatus reports the status of one Keep volume for /status.json.
// The first field is set to "full <unix-time>" or "ok <unix-time>",
// stamped with the current time.
// NOTE(review): the condition choosing between the two strings is not
// visible here (presumably IsFull(volume) — confirm).
215 // Returns a VolumeStatus describing the requested volume.
216 func GetVolumeStatus(volume string) VolumeStatus {
217 var isfull, lasterr string
218 var lasterrtime time.Time
221 isfull = fmt.Sprintf("full %d", time.Now().Unix())
223 isfull = fmt.Sprintf("ok %d", time.Now().Unix())
// Error tracking: lasterr is not assigned in the visible lines, and the
// last-error time is fixed at the Unix epoch.
226 // Not implemented yet
228 lasterrtime = time.Unix(0, 0)
// Positional composite literal: depends on VolumeStatus's field order.
230 return VolumeStatus{isfull, lasterr, lasterrtime}
234 // Returns a string containing a list of locator ids found on this
235 // Keep server. If {prefix} is given, return only those locator
236 // ids that begin with the given prefix string.
238 // The return string consists of a sequence of newline-separated
239 // strings in the format
241 // locator+size modification-time
245 // e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
246 // e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
247 // e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
249 func IndexLocators(prefix string) string {
251 for _, vol := range KeepVolumes {
253 func(path string, info os.FileInfo, err error) error {
254 // This WalkFunc inspects each path in the volume
255 // and prints an index line for all files that begin
258 log.Printf("IndexHandler: %s: walking to %s: %s",
262 locator := filepath.Base(path)
263 // Skip directories that do not match prefix.
264 // We know there is nothing interesting inside.
266 !strings.HasPrefix(locator, prefix) &&
267 !strings.HasPrefix(prefix, locator) {
268 return filepath.SkipDir
270 // Print filenames beginning with prefix
271 if !info.IsDir() && strings.HasPrefix(locator, prefix) {
272 output = output + fmt.Sprintf(
273 "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
282 func GetBlock(hash string) ([]byte, error) {
283 var buf = make([]byte, BLOCKSIZE)
285 // Attempt to read the requested hash from a keep volume.
286 for _, vol := range KeepVolumes {
291 blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
293 f, err = os.Open(blockFilename)
295 if !os.IsNotExist(err) {
296 // A block is stored on only one Keep disk,
297 // so os.IsNotExist is expected. Report any other errors.
298 log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
303 nread, err = f.Read(buf)
305 log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
309 // Double check the file checksum.
311 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
312 if filehash != hash {
313 // TODO(twp): this condition probably represents a bad disk and
314 // should raise major alarm bells for an administrator: e.g.
315 // they should be sent directly to an event manager at high
316 // priority or logged as urgent problems.
318 log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
319 vol, blockFilename, filehash)
320 return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")}
324 return buf[:nread], nil
327 log.Printf("%s: not found on any volumes, giving up\n", hash)
328 return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)}
331 /* PutBlock(block, hash)
332 Stores the BLOCK (identified by the content id HASH) in Keep.
334 The MD5 checksum of the block must be identical to the content id HASH.
335 If not, an error is returned.
337 PutBlock stores the BLOCK on the first Keep volume with free space.
338 A failure code is returned to the user only if all volumes fail.
340 On success, PutBlock returns nil.
341 On failure, it returns a KeepError with one of the following codes:
344 A different block with the same hash already exists on this
347 The MD5 hash of the BLOCK does not match the argument HASH.
349 There was not enough space left in any Keep volume to store
352 The object could not be stored for some other reason (e.g.
353 all writes failed). The text of the error message should
354 provide as much detail as possible.
357 func PutBlock(block []byte, hash string) error {
358 // Check that BLOCK's checksum matches HASH.
359 blockhash := fmt.Sprintf("%x", md5.Sum(block))
360 if blockhash != hash {
361 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
362 return &KeepError{ErrMD5Fail, errors.New("MD5Fail")}
365 // If we already have a block on disk under this identifier, return
366 // success (but check for MD5 collisions).
367 // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
368 // In either case, we want to write our new (good) block to disk, so there is
369 // nothing special to do if err != nil.
370 if oldblock, err := GetBlock(hash); err == nil {
371 if bytes.Compare(block, oldblock) == 0 {
374 return &KeepError{ErrCollision, errors.New("Collision")}
378 // Store the block on the first available Keep volume.
380 for _, vol := range KeepVolumes {
385 blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
386 if err := os.MkdirAll(blockDir, 0755); err != nil {
387 log.Printf("%s: could not create directory %s: %s",
392 tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
394 log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
397 blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
399 if _, err := tmpfile.Write(block); err != nil {
400 log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
403 if err := tmpfile.Close(); err != nil {
404 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
405 os.Remove(tmpfile.Name())
408 if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
409 log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
410 os.Remove(tmpfile.Name())
417 log.Printf("all Keep volumes full")
418 return &KeepError{ErrFull, errors.New("Full")}
420 log.Printf("all Keep volumes failed")
421 return &KeepError{ErrOther, errors.New("Fail")}
425 func IsFull(volume string) (isFull bool) {
426 fullSymlink := volume + "/full"
428 // Check if the volume has been marked as full in the last hour.
429 if link, err := os.Readlink(fullSymlink); err == nil {
430 if ts, err := strconv.Atoi(link); err == nil {
431 fulltime := time.Unix(int64(ts), 0)
432 if time.Since(fulltime).Hours() < 1.0 {
438 if avail, err := FreeDiskSpace(volume); err == nil {
439 isFull = avail < MIN_FREE_KILOBYTES
441 log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
445 // If the volume is full, timestamp it.
447 now := fmt.Sprintf("%d", time.Now().Unix())
448 os.Symlink(now, fullSymlink)
// FreeDiskSpace returns the amount of available disk space (statfs
// Bavail) on the filesystem holding volume, as a number of 1k blocks.
//
// statfs counts space in units of the filesystem's own block size, which
// is not necessarily 1 KiB. The count is therefore converted to bytes
// first and then divided by 1024. On error, free is 0.
func FreeDiskSpace(volume string) (free uint64, err error) {
	var stat syscall.Statfs_t
	if err = syscall.Statfs(volume, &stat); err != nil {
		return 0, err
	}
	availBytes := stat.Bavail * uint64(stat.Bsize)
	return availBytes / 1024, nil
}