// StatusHandler (GET /status.json)
import (
- "bufio"
"bytes"
"container/list"
"crypto/md5"
rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
-
- // For IndexHandler we support:
- // /index - returns all locators
- // /index/{prefix} - returns all locators that begin with {prefix}
- // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
- // If {prefix} is the empty string, return an index of all locators
- // (so /index and /index/ behave identically)
- // A client may supply a full 32-digit locator string, in which
- // case the server will return an index with either zero or one
- // entries. This usage allows a client to check whether a block is
- // present, and its size and upload time, without retrieving the
- // entire block.
- //
+ // List all blocks stored here. Privileged client only.
rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
- rest.HandleFunc(
- `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+ // List blocks stored here whose hash has the given prefix.
+ // Privileged client only.
+ rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+
+ // List volumes: path, device number, bytes used/avail.
rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
- // The PullHandler processes "PUT /pull" commands from Data Manager.
- // It parses the JSON list of pull requests in the request body, and
- // delivers them to the pull list manager for replication.
+ // Replace the current pull queue.
rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+ // Replace the current trash queue.
+ rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
+
// Any request which does not match any of these routes gets
// 400 Bad Request.
rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
}
-// FindKeepVolumes scans all mounted volumes on the system for Keep
-// volumes, and returns a list of matching paths.
-//
-// A device is assumed to be a Keep volume if it is a normal or tmpfs
-// volume and has a "/keep" directory directly underneath the mount
-// point.
-//
-func FindKeepVolumes() []string {
- vols := make([]string, 0)
-
- if f, err := os.Open(PROC_MOUNTS); err != nil {
- log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
- } else {
- scanner := bufio.NewScanner(f)
- for scanner.Scan() {
- args := strings.Fields(scanner.Text())
- dev, mount := args[0], args[1]
- if mount != "/" &&
- (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
- keep := mount + "/keep"
- if st, err := os.Stat(keep); err == nil && st.IsDir() {
- vols = append(vols, keep)
- }
- }
- }
- if err := scanner.Err(); err != nil {
- log.Fatal(err)
- }
- }
- return vols
-}
-
func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
// presumed to be valid and ignored, to permit forward compatibility.
} else {
// Unknown format; not a valid locator.
- log.Printf("%s %s %d %s", req.Method, hash, BadRequestError.HTTPCode, "-")
http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
return
}
// request's permission signature.
if enforce_permissions {
if signature == "" || timestamp == "" {
- log.Printf("%s %s %d %s", req.Method, hash, PermissionError.HTTPCode, "-")
http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
return
} else if IsExpired(timestamp) {
- log.Printf("%s %s %d %s", req.Method, hash, ExpiredError.HTTPCode, "-")
http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
return
} else {
req_locator := req.URL.Path[1:] // strip leading slash
if !VerifySignature(req_locator, GetApiToken(req)) {
- log.Printf("%s %s %d %s", req.Method, hash, PermissionError.HTTPCode, "-")
http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
return
}
}
block, err := GetBlock(hash, false)
-
- // Garbage collect after each GET. Fixes #2865.
- // TODO(twp): review Keep memory usage and see if there's
- // a better way to do this than blindly garbage collecting
- // after every block.
- defer runtime.GC()
-
if err != nil {
// This type assertion is safe because every error GetBlock
// can return (NotFoundError, DiskHashError, or GenericError)
// is a *KeepError.
- if err == NotFoundError {
- log.Printf("%s: not found, giving up\n", hash)
- }
- log.Printf("%s %s %d %s", req.Method, hash, err.(*KeepError).HTTPCode, "-")
http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
return
}
+ defer bufs.Put(block)
- resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
-
- _, err = resp.Write(block)
- if err != nil {
- log.Printf("%s %s %d %s", req.Method, hash, err.(*KeepError).HTTPCode, len(block), "-")
- } else {
- log.Printf("%s %s %d %d", req.Method, hash, 200, len(block))
- }
-
- return
+ resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
+ resp.Header().Set("Content-Type", "application/octet-stream")
+ resp.Write(block)
}
func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
- // Read the block data to be stored.
- // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
- //
+ // Detect as many error conditions as possible before reading
+ // the body: avoid transmitting data that will not end up
+ // being written anyway.
+
+ if req.ContentLength == -1 {
+ http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
+ return
+ }
+
if req.ContentLength > BLOCKSIZE {
- log.Printf("%s %s %d %d", req.Method, hash, TooLongError.HTTPCode, req.ContentLength)
http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
return
}
- buf := make([]byte, req.ContentLength)
- nread, err := io.ReadFull(req.Body, buf)
+ if len(KeepVM.AllWritable()) == 0 {
+ http.Error(resp, FullError.Error(), FullError.HTTPCode)
+ return
+ }
+
+ buf := bufs.Get(int(req.ContentLength))
+ _, err := io.ReadFull(req.Body, buf)
if err != nil {
- log.Printf("%s %s %d %d", req.Method, hash, 500, req.ContentLength)
http.Error(resp, err.Error(), 500)
- } else if int64(nread) < req.ContentLength {
- log.Printf("%s %s %d %d", req.Method, hash, 500, req.ContentLength)
- http.Error(resp, "request truncated", 500)
- } else {
- if err := PutBlock(buf, hash); err == nil {
- // Success; add a size hint, sign the locator if
- // possible, and return it to the client.
- return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
- api_token := GetApiToken(req)
- if PermissionSecret != nil && api_token != "" {
- expiry := time.Now().Add(permission_ttl)
- return_hash = SignLocator(return_hash, api_token, expiry)
- }
- log.Printf("%s %s %d %d", req.Method, hash, 200, req.ContentLength)
- resp.Write([]byte(return_hash + "\n"))
- } else {
- ke := err.(*KeepError)
- log.Printf("%s %s %d %d", req.Method, hash, ke.HTTPCode, req.ContentLength)
- http.Error(resp, ke.Error(), ke.HTTPCode)
- }
+ bufs.Put(buf)
+ return
}
- return
+
+ err = PutBlock(buf, hash)
+ bufs.Put(buf)
+
+ if err != nil {
+ ke := err.(*KeepError)
+ http.Error(resp, ke.Error(), ke.HTTPCode)
+ return
+ }
+
+ // Success; add a size hint, sign the locator if possible, and
+ // return it to the client.
+ return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
+ api_token := GetApiToken(req)
+ if PermissionSecret != nil && api_token != "" {
+ expiry := time.Now().Add(blob_signature_ttl)
+ return_hash = SignLocator(return_hash, api_token, expiry)
+ }
+ resp.Write([]byte(return_hash + "\n"))
}
// IndexHandler
// A HandleFunc to address /index and /index/{prefix} requests.
//
func IndexHandler(resp http.ResponseWriter, req *http.Request) {
- prefix := mux.Vars(req)["prefix"]
-
- // Only the data manager may issue /index requests,
- // and only if enforce_permissions is enabled.
- // All other requests return 403 Forbidden.
- api_token := GetApiToken(req)
- if !enforce_permissions ||
- api_token == "" ||
- data_manager_token != api_token {
- http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
+ // Reject unauthorized requests.
+ if !IsDataManagerToken(GetApiToken(req)) {
+ http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
- var index string
- for _, vol := range KeepVM.Volumes() {
- index = index + vol.Index(prefix)
+
+ prefix := mux.Vars(req)["prefix"]
+
+ for _, vol := range KeepVM.AllReadable() {
+ if err := vol.IndexTo(prefix, resp); err != nil {
+ // The only errors returned by IndexTo are
+ // write errors returned by resp.Write(),
+ // which probably means the client has
+ // disconnected and this error will never be
+ // reported to the client -- but it will
+ // appear in our own error log.
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
+ }
}
- resp.Write([]byte(index))
}
// StatusHandler
func GetNodeStatus() *NodeStatus {
st := new(NodeStatus)
- st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
- for i, vol := range KeepVM.Volumes() {
+ st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
+ for i, vol := range KeepVM.AllReadable() {
st.Volumes[i] = vol.Status()
}
return st
//
func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
- log.Printf("%s %s", req.Method, hash)
// Confirm that this user is an admin and has a token with unlimited scope.
var tok = GetApiToken(req)
return
}
- // Delete copies of this block from all available volumes. Report
- // how many blocks were successfully and unsuccessfully
- // deleted.
+ // Delete copies of this block from all available volumes.
+ // Report how many blocks were successfully deleted, and how
+ // many were found on writable volumes but not deleted.
var result struct {
Deleted int `json:"copies_deleted"`
Failed int `json:"copies_failed"`
}
- for _, vol := range KeepVM.Volumes() {
+ for _, vol := range KeepVM.AllWritable() {
if err := vol.Delete(hash); err == nil {
result.Deleted++
} else if os.IsNotExist(err) {
func PullHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- api_token := GetApiToken(req)
- if !IsDataManagerToken(api_token) {
+ if !IsDataManagerToken(GetApiToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
- log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
return
}
var pr []PullRequest
r := json.NewDecoder(req.Body)
if err := r.Decode(&pr); err != nil {
- http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
- log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
+ http.Error(resp, err.Error(), BadRequestError.HTTPCode)
return
}
// We have a properly formatted pull list sent from the data
// manager. Report success and send the list to the pull list
// manager for further handling.
- log.Printf("%s %s: received %v\n", req.Method, req.URL, pr)
resp.WriteHeader(http.StatusOK)
resp.Write([]byte(
fmt.Sprintf("Received %d pull requests\n", len(pr))))
for _, p := range pr {
plist.PushBack(p)
}
+ pullq.ReplaceQueue(plist)
+}
- if pullq == nil {
- pullq = NewWorkQueue()
+type TrashRequest struct {
+ Locator string `json:"locator"`
+ BlockMtime int64 `json:"block_mtime"`
+}
+
+func TrashHandler(resp http.ResponseWriter, req *http.Request) {
+ // Reject unauthorized requests.
+ if !IsDataManagerToken(GetApiToken(req)) {
+ http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+ return
}
- pullq.ReplaceQueue(plist)
+
+ // Parse the request body.
+ var trash []TrashRequest
+ r := json.NewDecoder(req.Body)
+ if err := r.Decode(&trash); err != nil {
+ http.Error(resp, err.Error(), BadRequestError.HTTPCode)
+ return
+ }
+
+ // We have a properly formatted trash list sent from the data
+ // manager. Report success and send the list to the trash work
+ // queue for further handling.
+ resp.WriteHeader(http.StatusOK)
+ resp.Write([]byte(
+ fmt.Sprintf("Received %d trash requests\n", len(trash))))
+
+ tlist := list.New()
+ for _, t := range trash {
+ tlist.PushBack(t)
+ }
+ trashq.ReplaceQueue(tlist)
}
// ==============================
// Attempt to read the requested hash from a keep volume.
error_to_caller := NotFoundError
- for _, vol := range KeepVM.Volumes() {
- if buf, err := vol.Get(hash); err != nil {
- // IsNotExist is an expected error and may be ignored.
- // (If all volumes report IsNotExist, we return a NotFoundError)
- // All other errors should be logged but we continue trying to
- // read.
- switch {
- case os.IsNotExist(err):
- continue
- default:
+ var vols []Volume
+ if update_timestamp {
+ // Pointless to find the block on an unwritable volume
+ // because Touch() will fail -- this is as good as
+ // "not found" for purposes of callers who need to
+ // update_timestamp.
+ vols = KeepVM.AllWritable()
+ } else {
+ vols = KeepVM.AllReadable()
+ }
+
+ for _, vol := range vols {
+ buf, err := vol.Get(hash)
+ if err != nil {
+ // IsNotExist is an expected error and may be
+ // ignored. All other errors are logged. In
+ // any case we continue trying to read other
+ // volumes. If all volumes report IsNotExist,
+ // we return a NotFoundError.
+ if !os.IsNotExist(err) {
log.Printf("GetBlock: reading %s: %s\n", hash, err)
}
- } else {
- // Double check the file checksum.
- //
- filehash := fmt.Sprintf("%x", md5.Sum(buf))
- if filehash != hash {
- // TODO(twp): this condition probably represents a bad disk and
- // should raise major alarm bells for an administrator: e.g.
- // they should be sent directly to an event manager at high
- // priority or logged as urgent problems.
- //
- log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
- vol, hash, filehash)
- error_to_caller = DiskHashError
- } else {
- // Success!
- if error_to_caller != NotFoundError {
- log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
- vol, hash)
- }
- // Update the timestamp if the caller requested.
- // If we could not update the timestamp, continue looking on
- // other volumes.
- if update_timestamp {
- if vol.Touch(hash) != nil {
- continue
- }
- }
- return buf, nil
+ continue
+ }
+ // Check the file checksum.
+ //
+ filehash := fmt.Sprintf("%x", md5.Sum(buf))
+ if filehash != hash {
+ // TODO: Try harder to tell a sysadmin about
+ // this.
+ log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
+ vol, hash, filehash)
+ error_to_caller = DiskHashError
+ continue
+ }
+ if error_to_caller == DiskHashError {
+ log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
+ vol, hash)
+ }
+ if update_timestamp {
+ if err := vol.Touch(hash); err != nil {
+ error_to_caller = GenericError
+ log.Printf("%s: Touch %s failed: %s",
+ vol, hash, error_to_caller)
+ continue
}
}
- }
-
- if error_to_caller != NotFoundError {
- log.Printf("%s: checksum mismatch, no good copy found\n", hash)
+ return buf, nil
}
return nil, error_to_caller
}
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
- vol := KeepVM.Choose()
- if err := vol.Put(hash, block); err == nil {
- return nil // success!
- } else {
- allFull := true
- for _, vol := range KeepVM.Volumes() {
- err := vol.Put(hash, block)
- if err == nil {
- return nil // success!
- }
- if err != FullError {
- // The volume is not full but the write did not succeed.
- // Report the error and continue trying.
- allFull = false
- log.Printf("%s: Write(%s): %s\n", vol, hash, err)
- }
+ if vol := KeepVM.NextWritable(); vol != nil {
+ if err := vol.Put(hash, block); err == nil {
+ return nil // success!
}
+ }
- if allFull {
- log.Printf("all Keep volumes full")
- return FullError
- } else {
- log.Printf("all Keep volumes failed")
- return GenericError
+ writables := KeepVM.AllWritable()
+ if len(writables) == 0 {
+ log.Print("No writable volumes.")
+ return FullError
+ }
+
+ allFull := true
+ for _, vol := range writables {
+ err := vol.Put(hash, block)
+ if err == nil {
+ return nil // success!
+ }
+ if err != FullError {
+ // The volume is not full but the
+ // write did not succeed. Report the
+ // error and continue trying.
+ allFull = false
+ log.Printf("%s: Write(%s): %s\n", vol, hash, err)
}
}
+
+ if allFull {
+ log.Print("All volumes are full.")
+ return FullError
+ } else {
+ // Already logged the non-full errors.
+ return GenericError
+ }
}
// IsValidLocator