3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
16 "github.com/gorilla/mux"
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
32 func MakeRESTRouter() *mux.Router {
33 rest := mux.NewRouter()
36 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38 `/{hash:[0-9a-f]{32}}+{hints}`,
39 GetBlockHandler).Methods("GET", "HEAD")
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
43 // For IndexHandler we support:
44 // /index - returns all locators
45 // /index/{prefix} - returns all locators that begin with {prefix}
46 // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
47 // If {prefix} is the empty string, return an index of all locators
48 // (so /index and /index/ behave identically)
49 // A client may supply a full 32-digit locator string, in which
50 // case the server will return an index with either zero or one
51 // entries. This usage allows a client to check whether a block is
52 // present, and its size and upload time, without retrieving the
55 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
57 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
58 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
60 // Any request which does not match any of these routes gets
62 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
67 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
68 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
71 // FindKeepVolumes scans all mounted volumes on the system for Keep
72 // volumes, and returns a list of matching paths.
74 // A device is assumed to be a Keep volume if it is a normal or tmpfs
75 // volume and has a "/keep" directory directly underneath the mount
//
// NOTE(review): several lines of this function are absent from this copy
// of the file, including the first operand of the mount filter and the
// closing/else braces; the comments below describe only the lines that
// are present.
78 func FindKeepVolumes() []string {
79 vols := make([]string, 0)
// Failing to open the mount table is fatal: log.Fatalf exits the process.
81 if f, err := os.Open(PROC_MOUNTS); err != nil {
82 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
// NOTE(review): no f.Close() appears in this copy; confirm the file is
// closed (e.g. with a defer right after a successful open).
84 scanner := bufio.NewScanner(f)
// Each mount-table line is split on whitespace: field 0 is treated as the
// device and field 1 as the mount point.
// NOTE(review): args[0]/args[1] panics on a line with fewer than two
// fields; a len(args) < 2 guard would make this robust.
86 args := strings.Fields(scanner.Text())
87 dev, mount := args[0], args[1]
// Only tmpfs or /dev/* devices qualify. This line continues a condition
// whose first operand is not present in this copy.
89 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
90 keep := mount + "/keep"
// Accept the candidate only if <mount>/keep exists and is a directory;
// a Stat error simply skips it.
91 if st, err := os.Stat(keep); err == nil && st.IsDir() {
92 vols = append(vols, keep)
// Scanner read errors surface here, after scanning; the handling branch
// is not present in this copy.
96 if err := scanner.Err(); err != nil {
// GetBlockHandler serves GET and HEAD requests for a single block,
// addressed as /{hash} or /{hash}+{hints} (see MakeRESTRouter).
//
// NOTE(review): several lines of this function are absent from this copy
// of the file (closing braces, presumably early returns after each
// http.Error, the signature/timestamp assignments and the garbage-
// collection call mentioned below); the comments below describe only the
// lines that are present.
103 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
104 hash := mux.Vars(req)["hash"]
106 log.Printf("%s %s", req.Method, hash)
// hints is the "+"-separated suffix after the hash; it is "" for the
// plain /{hash} route, which defines no hints variable.
108 hints := mux.Vars(req)["hints"]
110 // Parse the locator string and hints from the request.
111 // TODO(twp): implement a Locator type.
112 var signature, timestamp string
// Permission hints have the form A<hex signature>@<8 hex digit timestamp>.
// NOTE(review): this pattern and the two MatchString patterns below are
// compiled on every request and their errors discarded; package-level
// regexp.MustCompile variables would avoid both.
114 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
// NOTE(review): strings.Split("", "+") returns [""], and "" matches none of
// the accepted hint forms below, so an empty hints string reaches the
// bad-request branch unless a guard not present in this copy skips it.
115 for _, hint := range strings.Split(hints, "+") {
116 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
117 // Server ignores size hints
// Signature hint: presumably signature/timestamp are taken from m[1]/m[2]
// here (those lines are absent from this copy).
118 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
121 } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
122 // Any unknown hint that starts with an uppercase letter is
123 // presumed to be valid and ignored, to permit forward compatibility.
125 // Unknown format; not a valid locator.
126 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
132 // If permission checking is in effect, verify this
133 // request's permission signature.
134 if enforce_permissions {
// With permissions enforced, a request lacking a signature hint is refused.
135 if signature == "" || timestamp == "" {
136 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
138 } else if IsExpired(timestamp) {
139 http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
// The signature is verified over the whole request path (hash plus hints)
// together with the caller's API token from the Authorization header.
142 req_locator := req.URL.Path[1:] // strip leading slash
143 if !VerifySignature(req_locator, GetApiToken(req)) {
144 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
150 block, err := GetBlock(hash)
152 // Garbage collect after each GET. Fixes #2865.
153 // TODO(twp): review Keep memory usage and see if there's
154 // a better way to do this than blindly garbage collecting
155 // after every block.
159 // This type assertion is safe because the only errors
160 // GetBlock can return are DiskHashError or NotFoundError.
// NOTE(review): err.(*KeepError) panics if GetBlock ever returns another
// error type; the comma-ok form with a 500 fallback would not depend on
// that invariant.
161 if err == NotFoundError {
162 log.Printf("%s: not found, giving up\n", hash)
164 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
// X-Block-Size reports the length in bytes of the block being returned.
168 resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
// A failed write can only be logged: the first Write has already sent the
// response headers.
170 _, err = resp.Write(block)
172 log.Printf("GetBlockHandler: writing response: %s", err)
// PutBlockHandler serves PUT /{hash}: it reads the request body, stores it
// with PutBlock, and replies with the locator (hash plus a size hint,
// signed when possible) followed by a newline.
//
// NOTE(review): several lines of this function are absent from this copy
// of the file (the garbage-collection call mentioned below, closing braces
// and, presumably, early returns); the comments below describe only the
// lines that are present.
178 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
179 // Garbage collect after each PUT. Fixes #2865.
180 // See also GetBlockHandler.
183 hash := mux.Vars(req)["hash"]
185 log.Printf("%s %s", req.Method, hash)
187 // Read the block data to be stored.
188 // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
// NOTE(review): the comment above says 500, but the status actually sent is
// TooLongError.HTTPCode, whose value is defined elsewhere; confirm which
// one is stale.
190 if req.ContentLength > BLOCKSIZE {
191 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
// NOTE(review): net/http sets req.ContentLength to -1 when the length is
// unknown (e.g. a chunked upload); make([]byte, -1) panics, so a
// ContentLength < 0 check belongs before this allocation.
195 buf := make([]byte, req.ContentLength)
196 nread, err := io.ReadFull(req.Body, buf)
198 http.Error(resp, err.Error(), 500)
// NOTE(review): io.ReadFull returns a non-nil error whenever it reads fewer
// than len(buf) bytes, so this branch is only reachable if the preceding
// condition (not present in this copy) does not test err.
199 } else if int64(nread) < req.ContentLength {
200 http.Error(resp, "request truncated", 500)
202 if err := PutBlock(buf, hash); err == nil {
203 // Success; add a size hint, sign the locator if
204 // possible, and return it to the client.
205 return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
206 api_token := GetApiToken(req)
// Signing needs both a configured PermissionSecret and a caller token; the
// signature expires permission_ttl from now.
207 if PermissionSecret != nil && api_token != "" {
208 expiry := time.Now().Add(permission_ttl)
209 return_hash = SignLocator(return_hash, api_token, expiry)
211 resp.Write([]byte(return_hash + "\n"))
// PutBlock failures are reported with the KeepError's own status code.
// NOTE(review): this type assertion panics if PutBlock returns an error
// that is not a *KeepError.
213 ke := err.(*KeepError)
214 http.Error(resp, ke.Error(), ke.HTTPCode)
221 // A HandleFunc to address /index and /index/{prefix} requests.
// NOTE(review): some lines of this function are absent from this copy of
// the file (part of the permission condition, the declaration of index,
// and closing braces); the comments below describe only the lines present.
223 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
// prefix is "" for the bare /index route, which defines no prefix variable.
224 prefix := mux.Vars(req)["prefix"]
226 // Only the data manager may issue /index requests,
227 // and only if enforce_permissions is enabled.
228 // All other requests return 403 Forbidden.
229 api_token := GetApiToken(req)
// NOTE(review): the status actually sent is PermissionError.HTTPCode
// (defined elsewhere); the "403" above assumes its value. The token
// comparison below is not constant-time; crypto/subtle.ConstantTimeCompare
// would avoid leaking timing information about data_manager_token.
230 if !enforce_permissions ||
232 data_manager_token != api_token {
233 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// The response body is the concatenation of each volume's index for prefix.
// NOTE(review): repeated string concatenation re-copies the accumulated
// index per volume; a strings.Builder would avoid that.
237 for _, vol := range KeepVM.Volumes() {
238 index = index + vol.Index(prefix)
240 resp.Write([]byte(index))
// The /status.json endpoint (StatusHandler) responds with the current
// node status, described in a JSON structure.
//
// The data given in a status.json response includes:
//
//	volumes - a list of Keep volumes currently in use by this server;
//	          each volume is an object with the following fields:
//	  * mount_point
//	  * device_num (an integer identifying the underlying filesystem)
//	  * bytes_free
//	  * bytes_used

// VolumeStatus describes one Keep volume in a status.json response.
type VolumeStatus struct {
	MountPoint string `json:"mount_point"` // path of the volume
	DeviceNum  uint64 `json:"device_num"`  // device number of the underlying filesystem
	BytesFree  uint64 `json:"bytes_free"`  // available bytes (statfs Bavail * Bsize)
	BytesUsed  uint64 `json:"bytes_used"`  // used bytes ((Blocks - Bfree) * Bsize)
}

// NodeStatus is the top-level object of a status.json response.
type NodeStatus struct {
	Volumes []*VolumeStatus `json:"volumes"`
}
// StatusHandler serves /status.json: it collects the node status with
// GetNodeStatus and JSON-encodes it with json.Marshal.
//
// NOTE(review): the success-path lines (presumably writing jstat to resp)
// and the else/closing braces are absent from this copy of the file.
266 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
267 st := GetNodeStatus()
268 if jstat, err := json.Marshal(st); err == nil {
// On a marshalling failure, log both the error and the status value that
// could not be encoded, then reply with HTTP 500.
271 log.Printf("json.Marshal: %s\n", err)
272 log.Printf("NodeStatus = %v\n", st)
273 http.Error(resp, err.Error(), 500)
278 // Returns a NodeStatus struct describing this Keep
279 // node's current status.
281 func GetNodeStatus() *NodeStatus {
282 st := new(NodeStatus)
284 st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
285 for i, vol := range KeepVM.Volumes() {
286 st.Volumes[i] = vol.Status()
292 // Returns a VolumeStatus describing the requested volume.
//
// volume is a filesystem path: it is passed to os.Stat (for the device
// number) and to syscall.Statfs (for free/used space), and is copied into
// the result's MountPoint.
//
// NOTE(review): some lines are absent from this copy of the file (the
// devnum declaration, the else/error branches and closing braces); what
// is returned after the logged errors below cannot be determined here.
294 func GetVolumeStatus(volume string) *VolumeStatus {
295 var fs syscall.Statfs_t
// NOTE(review): fi.Sys().(*syscall.Stat_t) is Unix-specific, and the type
// of Stat_t.Dev varies by GOOS/GOARCH; confirm it matches devnum's type
// on every target platform.
298 if fi, err := os.Stat(volume); err == nil {
299 devnum = fi.Sys().(*syscall.Stat_t).Dev
301 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
305 err := syscall.Statfs(volume, &fs)
307 log.Printf("GetVolumeStatus: statfs: %s\n", err)
310 // These calculations match the way df calculates disk usage:
311 // "free" space is measured by fs.Bavail, but "used" space
312 // uses fs.Blocks - fs.Bfree.
313 free := fs.Bavail * uint64(fs.Bsize)
314 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
// Positional literal: relies on the VolumeStatus field order
// (MountPoint, DeviceNum, BytesFree, BytesUsed).
315 return &VolumeStatus{volume, devnum, free, used}
// GetBlock returns the content of the block whose MD5 hash is hash. It
// tries each volume in KeepVM.Volumes() order and checks the data's MD5
// checksum before accepting it.
//
// The only error values assigned in the visible lines are NotFoundError
// (the default) and DiskHashError (some volume returned data whose
// checksum did not match).
//
// NOTE(review): some lines are absent from this copy of the file (e.g. the
// switch statement the case below belongs to, the success return and
// closing braces); the comments below describe only the lines present.
318 func GetBlock(hash string) ([]byte, error) {
319 // Attempt to read the requested hash from a keep volume.
320 error_to_caller := NotFoundError
322 for _, vol := range KeepVM.Volumes() {
323 if buf, err := vol.Get(hash); err != nil {
324 // IsNotExist is an expected error and may be ignored.
325 // (If all volumes report IsNotExist, we return a NotFoundError)
326 // All other errors should be logged but we continue trying to
329 case os.IsNotExist(err):
// NOTE(review): in the visible lines a non-IsNotExist read error is only
// logged and leaves error_to_caller unchanged, so a block that exists but
// cannot be read is reported as NotFoundError if no other volume has it.
332 log.Printf("GetBlock: reading %s: %s\n", hash, err)
335 // Double check the file checksum.
337 filehash := fmt.Sprintf("%x", md5.Sum(buf))
338 if filehash != hash {
339 // TODO(twp): this condition probably represents a bad disk and
340 // should raise major alarm bells for an administrator: e.g.
341 // they should be sent directly to an event manager at high
342 // priority or logged as urgent problems.
344 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
// Remember that a corrupt copy was seen, so the caller gets DiskHashError
// rather than NotFoundError if no good copy turns up.
346 error_to_caller = DiskHashError
// Presumably reached when the checksum matches: if an earlier volume held
// a corrupt copy, log that a good one was found and returned.
349 if error_to_caller != NotFoundError {
350 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
358 if error_to_caller != NotFoundError {
359 log.Printf("%s: checksum mismatch, no good copy found\n", hash)
361 return nil, error_to_caller
364 /* PutBlock(block, hash)
365 Stores the BLOCK (identified by the content id HASH) in Keep.
367 The MD5 checksum of the block must be identical to the content id HASH.
368 If not, an error is returned.
370 PutBlock stores the BLOCK on the first Keep volume with free space.
371 A failure code is returned to the user only if all volumes fail.
373 On success, PutBlock returns nil.
374 On failure, it returns a KeepError with one of the following codes:
377 A different block with the same hash already exists on this
380 The MD5 hash of the BLOCK does not match the argument HASH.
382 There was not enough space left in any Keep volume to store
385 The object could not be stored for some other reason (e.g.
386 all writes failed). The text of the error message should
387 provide as much detail as possible.
// NOTE(review): in this copy of the file the description above is missing
// the name of the error returned in each case. From the visible code,
// RequestHashError is returned for a checksum mismatch and CollisionError
// when a different block is already stored under hash.
390 func PutBlock(block []byte, hash string) error {
391 // Check that BLOCK's checksum matches HASH.
392 blockhash := fmt.Sprintf("%x", md5.Sum(block))
393 if blockhash != hash {
394 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
395 return RequestHashError
398 // If we already have a block on disk under this identifier, return
399 // success (but check for MD5 collisions).
400 // The only errors that GetBlock can return are DiskHashError and NotFoundError.
401 // In either case, we want to write our new (good) block to disk,
402 // so there is nothing special to do if err != nil.
// NOTE(review): bytes.Equal(block, oldblock) is the idiomatic equality
// test; bytes.Compare(...) == 0 is equivalent.
403 if oldblock, err := GetBlock(hash); err == nil {
404 if bytes.Compare(block, oldblock) == 0 {
407 return CollisionError
411 // Choose a Keep volume to write to.
412 // If this volume fails, try all of the volumes in order.
413 vol := KeepVM.Choose()
414 if err := vol.Put(hash, block); err == nil {
415 return nil // success!
// Fallback: try every volume in KeepVM.Volumes() order.
// NOTE(review): if Choose() returned one of these volumes, that volume is
// attempted a second time here; confirm this retry is intended.
418 for _, vol := range KeepVM.Volumes() {
419 err := vol.Put(hash, block)
421 return nil // success!
// FullError is an expected failure; any other failure is logged.
423 if err != FullError {
424 // The volume is not full but the write did not succeed.
425 // Report the error and continue trying.
427 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
// Every volume failed. The log message distinguishes "all full" from other
// failures; the corresponding return statements are absent from this copy.
432 log.Printf("all Keep volumes full")
435 log.Printf("all Keep volumes failed")
// validLocatorRegexp matches a bare MD5 Keep locator: exactly 32 lowercase
// hexadecimal digits and nothing else. It is compiled once at package
// initialization rather than on every call; the pattern is a constant,
// so compilation cannot fail at run time.
var validLocatorRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if loc is a valid Keep locator: a 32-digit
// lowercase hexadecimal MD5 hash with no size or permission hints.
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return validLocatorRegexp.MatchString(loc)
}
// apiTokenRegexp extracts the token from an "OAuth2 <token>" Authorization
// header value. It is compiled once rather than on every request; the
// pattern is a constant, so compilation cannot fail at run time.
var apiTokenRegexp = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetApiToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// token is found.
//
// Only the first Authorization header value is examined. A header key
// present with no values yields "" rather than an index-out-of-range panic.
func GetApiToken(req *http.Request) string {
	auth := req.Header.Get("Authorization")
	if m := apiTokenRegexp.FindStringSubmatch(auth); m != nil {
		return m[1]
	}
	return ""
}
// IsExpired returns true if the given Unix timestamp (expressed as a
// hexadecimal string) is in the past, or if timestamp_hex cannot be
// parsed as a hexadecimal string.
//
// The timestamp is parsed as a 64-bit value, so the result does not
// depend on the platform's int size. With bitSize 0, timestamps beyond
// the 32-bit range would fail to parse on 32-bit platforms and be
// treated as expired.
func IsExpired(timestamp_hex string) bool {
	ts, err := strconv.ParseInt(timestamp_hex, 16, 64)
	if err != nil {
		log.Printf("IsExpired: %s\n", err)
		return true
	}
	return time.Unix(ts, 0).Before(time.Now())
}