// StatusHandler (GET /status.json)
import (
- "bufio"
"bytes"
"container/list"
"crypto/md5"
http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
}
-// FindKeepVolumes scans all mounted volumes on the system for Keep
-// volumes, and returns a list of matching paths.
-//
-// A device is assumed to be a Keep volume if it is a normal or tmpfs
-// volume and has a "/keep" directory directly underneath the mount
-// point.
-//
-func FindKeepVolumes() []string {
- vols := make([]string, 0)
-
- if f, err := os.Open(PROC_MOUNTS); err != nil {
- log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
- } else {
- scanner := bufio.NewScanner(f)
- for scanner.Scan() {
- args := strings.Fields(scanner.Text())
- dev, mount := args[0], args[1]
- if mount != "/" &&
- (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
- keep := mount + "/keep"
- if st, err := os.Stat(keep); err == nil && st.IsDir() {
- vols = append(vols, keep)
- }
- }
- }
- if err := scanner.Err(); err != nil {
- log.Fatal(err)
- }
- }
- return vols
-}
-
func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
// presumed to be valid and ignored, to permit forward compatibility.
} else {
// Unknown format; not a valid locator.
- log.Printf("%s %s %d %s", req.Method, hash, BadRequestError.HTTPCode, "-")
http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
return
}
// request's permission signature.
if enforce_permissions {
if signature == "" || timestamp == "" {
- log.Printf("%s %s %d %s", req.Method, hash, PermissionError.HTTPCode, "-")
http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
return
} else if IsExpired(timestamp) {
- log.Printf("%s %s %d %s", req.Method, hash, ExpiredError.HTTPCode, "-")
http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
return
} else {
req_locator := req.URL.Path[1:] // strip leading slash
if !VerifySignature(req_locator, GetApiToken(req)) {
- log.Printf("%s %s %d %s", req.Method, hash, PermissionError.HTTPCode, "-")
http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
return
}
if err != nil {
// This type assertion is safe because the only errors
// GetBlock can return are DiskHashError or NotFoundError.
- if err == NotFoundError {
- log.Printf("%s: not found, giving up\n", hash)
- }
- log.Printf("%s %s %d %s", req.Method, hash, err.(*KeepError).HTTPCode, "-")
http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
return
}
- resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
+ resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
_, err = resp.Write(block)
- if err != nil {
- log.Printf("%s %s %d %s", req.Method, hash, err.(*KeepError).HTTPCode, len(block), "-")
- } else {
- log.Printf("%s %s %d %d", req.Method, hash, 200, len(block))
- }
return
}
// If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
//
if req.ContentLength > BLOCKSIZE {
- log.Printf("%s %s %d %d", req.Method, hash, TooLongError.HTTPCode, req.ContentLength)
http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
return
}
buf := make([]byte, req.ContentLength)
nread, err := io.ReadFull(req.Body, buf)
if err != nil {
- log.Printf("%s %s %d %d", req.Method, hash, 500, req.ContentLength)
http.Error(resp, err.Error(), 500)
} else if int64(nread) < req.ContentLength {
- log.Printf("%s %s %d %d", req.Method, hash, 500, req.ContentLength)
http.Error(resp, "request truncated", 500)
} else {
if err := PutBlock(buf, hash); err == nil {
expiry := time.Now().Add(permission_ttl)
return_hash = SignLocator(return_hash, api_token, expiry)
}
- log.Printf("%s %s %d %d", req.Method, hash, 200, req.ContentLength)
resp.Write([]byte(return_hash + "\n"))
} else {
ke := err.(*KeepError)
- log.Printf("%s %s %d %d", req.Method, hash, ke.HTTPCode, req.ContentLength)
http.Error(resp, ke.Error(), ke.HTTPCode)
}
}
// Reject unauthorized requests.
if !IsDataManagerToken(GetApiToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
- log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
return
}
prefix := mux.Vars(req)["prefix"]
var index string
- for _, vol := range KeepVM.Volumes() {
+ for _, vol := range KeepVM.AllReadable() {
index = index + vol.Index(prefix)
}
resp.Write([]byte(index))
func GetNodeStatus() *NodeStatus {
st := new(NodeStatus)
- st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
- for i, vol := range KeepVM.Volumes() {
+ st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
+ for i, vol := range KeepVM.AllReadable() {
st.Volumes[i] = vol.Status()
}
return st
//
func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
- log.Printf("%s %s", req.Method, hash)
// Confirm that this user is an admin and has a token with unlimited scope.
var tok = GetApiToken(req)
return
}
- // Delete copies of this block from all available volumes. Report
- // how many blocks were successfully and unsuccessfully
- // deleted.
+ // Delete copies of this block from all available volumes.
+ // Report how many blocks were successfully deleted, and how
+ // many were found on writable volumes but not deleted.
var result struct {
Deleted int `json:"copies_deleted"`
Failed int `json:"copies_failed"`
}
- for _, vol := range KeepVM.Volumes() {
+ for _, vol := range KeepVM.AllWritable() {
if err := vol.Delete(hash); err == nil {
result.Deleted++
} else if os.IsNotExist(err) {
// Reject unauthorized requests.
if !IsDataManagerToken(GetApiToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
- log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
return
}
r := json.NewDecoder(req.Body)
if err := r.Decode(&pr); err != nil {
http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
- log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
return
}
// We have a properly formatted pull list sent from the data
// manager. Report success and send the list to the pull list
// manager for further handling.
- log.Printf("%s %s: received %v\n", req.Method, req.URL, pr)
resp.WriteHeader(http.StatusOK)
resp.Write([]byte(
fmt.Sprintf("Received %d pull requests\n", len(pr))))
for _, p := range pr {
plist.PushBack(p)
}
-
- if pullq == nil {
- pullq = NewWorkQueue()
- }
pullq.ReplaceQueue(plist)
}
// Reject unauthorized requests.
if !IsDataManagerToken(GetApiToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
- log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
return
}
r := json.NewDecoder(req.Body)
if err := r.Decode(&trash); err != nil {
http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
- log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
return
}
// We have a properly formatted trash list sent from the data
// manager. Report success and send the list to the trash work
// queue for further handling.
- log.Printf("%s %s: received %v\n", req.Method, req.URL, trash)
resp.WriteHeader(http.StatusOK)
resp.Write([]byte(
fmt.Sprintf("Received %d trash requests\n", len(trash))))
for _, t := range trash {
tlist.PushBack(t)
}
-
- if trashq == nil {
- trashq = NewWorkQueue()
- }
trashq.ReplaceQueue(tlist)
}
// Attempt to read the requested hash from a keep volume.
error_to_caller := NotFoundError
- for _, vol := range KeepVM.Volumes() {
- if buf, err := vol.Get(hash); err != nil {
- // IsNotExist is an expected error and may be ignored.
- // (If all volumes report IsNotExist, we return a NotFoundError)
- // All other errors should be logged but we continue trying to
- // read.
- switch {
- case os.IsNotExist(err):
- continue
- default:
+ var vols []Volume
+ if update_timestamp {
+ // Pointless to find the block on an unwritable volume
+ // because Touch() will fail -- this is as good as
+ // "not found" for purposes of callers who need to
+ // update_timestamp.
+ vols = KeepVM.AllWritable()
+ } else {
+ vols = KeepVM.AllReadable()
+ }
+
+ for _, vol := range vols {
+ buf, err := vol.Get(hash)
+ if err != nil {
+ // IsNotExist is an expected error and may be
+ // ignored. All other errors are logged. In
+ // any case we continue trying to read other
+ // volumes. If all volumes report IsNotExist,
+ // we return a NotFoundError.
+ if !os.IsNotExist(err) {
log.Printf("GetBlock: reading %s: %s\n", hash, err)
}
- } else {
- // Double check the file checksum.
- //
- filehash := fmt.Sprintf("%x", md5.Sum(buf))
- if filehash != hash {
- // TODO(twp): this condition probably represents a bad disk and
- // should raise major alarm bells for an administrator: e.g.
- // they should be sent directly to an event manager at high
- // priority or logged as urgent problems.
- //
- log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
- vol, hash, filehash)
- error_to_caller = DiskHashError
- } else {
- // Success!
- if error_to_caller != NotFoundError {
- log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
- vol, hash)
- }
- // Update the timestamp if the caller requested.
- // If we could not update the timestamp, continue looking on
- // other volumes.
- if update_timestamp {
- if vol.Touch(hash) != nil {
- continue
- }
- }
- return buf, nil
+ continue
+ }
+ // Check the file checksum.
+ //
+ filehash := fmt.Sprintf("%x", md5.Sum(buf))
+ if filehash != hash {
+ // TODO: Try harder to tell a sysadmin about
+ // this.
+ log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
+ vol, hash, filehash)
+ error_to_caller = DiskHashError
+ continue
+ }
+ if error_to_caller == DiskHashError {
+ log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
+ vol, hash)
+ }
+ if update_timestamp {
+ if err := vol.Touch(hash); err != nil {
+ error_to_caller = GenericError
+ log.Printf("%s: Touch %s failed: %s",
+ vol, hash, error_to_caller)
+ continue
}
}
- }
-
- if error_to_caller != NotFoundError {
- log.Printf("%s: checksum mismatch, no good copy found\n", hash)
+ return buf, nil
}
return nil, error_to_caller
}
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
- vol := KeepVM.Choose()
- if err := vol.Put(hash, block); err == nil {
- return nil // success!
- } else {
- allFull := true
- for _, vol := range KeepVM.Volumes() {
- err := vol.Put(hash, block)
- if err == nil {
- return nil // success!
- }
- if err != FullError {
- // The volume is not full but the write did not succeed.
- // Report the error and continue trying.
- allFull = false
- log.Printf("%s: Write(%s): %s\n", vol, hash, err)
- }
+ if vol := KeepVM.NextWritable(); vol != nil {
+ if err := vol.Put(hash, block); err == nil {
+ return nil // success!
}
+ }
- if allFull {
- log.Printf("all Keep volumes full")
- return FullError
- } else {
- log.Printf("all Keep volumes failed")
- return GenericError
+ writables := KeepVM.AllWritable()
+ if len(writables) == 0 {
+ log.Print("No writable volumes.")
+ return FullError
+ }
+
+ allFull := true
+ for _, vol := range writables {
+ err := vol.Put(hash, block)
+ if err == nil {
+ return nil // success!
+ }
+ if err != FullError {
+ // The volume is not full but the
+ // write did not succeed. Report the
+ // error and continue trying.
+ allFull = false
+ log.Printf("%s: Write(%s): %s\n", vol, hash, err)
}
}
+
+ if allFull {
+ log.Print("All volumes are full.")
+ return FullError
+ } else {
+ // Already logged the non-full errors.
+ return GenericError
+ }
}
// IsValidLocator