// StatusHandler (GET /status.json)
import (
- "bufio"
"bytes"
"container/list"
"crypto/md5"
`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
- // The PullHandler processes "PUT /pull" commands from Data Manager.
- // It parses the JSON list of pull requests in the request body, and
- // delivers them to the pull list manager for replication.
+ // The PullHandler and TrashHandler process "PUT /pull" and "PUT
+ // /trash" requests from Data Manager. These requests instruct
+ // Keep to replicate or delete blocks; see
+ // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
+ // for more details.
+ //
+ // Each handler parses the JSON list of block management requests
+ // in the message body, and replaces any existing pull queue or
+ // trash queue with their contents.
+ //
rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+ rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
// Any request which does not match any of these routes gets
// 400 Bad Request.
http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
}
-// FindKeepVolumes scans all mounted volumes on the system for Keep
-// volumes, and returns a list of matching paths.
-//
-// A device is assumed to be a Keep volume if it is a normal or tmpfs
-// volume and has a "/keep" directory directly underneath the mount
-// point.
-//
-func FindKeepVolumes() []string {
- vols := make([]string, 0)
-
- if f, err := os.Open(PROC_MOUNTS); err != nil {
- log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
- } else {
- scanner := bufio.NewScanner(f)
- for scanner.Scan() {
- args := strings.Fields(scanner.Text())
- dev, mount := args[0], args[1]
- if mount != "/" &&
- (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
- keep := mount + "/keep"
- if st, err := os.Stat(keep); err == nil && st.IsDir() {
- vols = append(vols, keep)
- }
- }
- }
- if err := scanner.Err(); err != nil {
- log.Fatal(err)
- }
- }
- return vols
-}
-
func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
// presumed to be valid and ignored, to permit forward compatibility.
} else {
// Unknown format; not a valid locator.
- log.Printf("%s %s %d %s", req.Method, hash, BadRequestError.HTTPCode, "-")
http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
return
}
// request's permission signature.
if enforce_permissions {
if signature == "" || timestamp == "" {
- log.Printf("%s %s %d %s", req.Method, hash, PermissionError.HTTPCode, "-")
http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
return
} else if IsExpired(timestamp) {
- log.Printf("%s %s %d %s", req.Method, hash, ExpiredError.HTTPCode, "-")
http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
return
} else {
req_locator := req.URL.Path[1:] // strip leading slash
if !VerifySignature(req_locator, GetApiToken(req)) {
- log.Printf("%s %s %d %s", req.Method, hash, PermissionError.HTTPCode, "-")
http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
return
}
if err != nil {
// This type assertion is safe because the only errors GetBlock
// can return are NotFoundError, DiskHashError, or GenericError.
- if err == NotFoundError {
- log.Printf("%s: not found, giving up\n", hash)
- }
- log.Printf("%s %s %d %s", req.Method, hash, err.(*KeepError).HTTPCode, "-")
http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
return
}
- resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
+ resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
_, err = resp.Write(block)
- if err != nil {
- log.Printf("%s %s %d %s", req.Method, hash, err.(*KeepError).HTTPCode, len(block), "-")
- } else {
- log.Printf("%s %s %d %d", req.Method, hash, 200, len(block))
- }
return
}
// If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
//
if req.ContentLength > BLOCKSIZE {
- log.Printf("%s %s %d %d", req.Method, hash, TooLongError.HTTPCode, req.ContentLength)
http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
return
}
buf := make([]byte, req.ContentLength)
nread, err := io.ReadFull(req.Body, buf)
if err != nil {
- log.Printf("%s %s %d %d", req.Method, hash, 500, req.ContentLength)
http.Error(resp, err.Error(), 500)
} else if int64(nread) < req.ContentLength {
- log.Printf("%s %s %d %d", req.Method, hash, 500, req.ContentLength)
http.Error(resp, "request truncated", 500)
} else {
if err := PutBlock(buf, hash); err == nil {
expiry := time.Now().Add(permission_ttl)
return_hash = SignLocator(return_hash, api_token, expiry)
}
- log.Printf("%s %s %d %d", req.Method, hash, 200, req.ContentLength)
resp.Write([]byte(return_hash + "\n"))
} else {
ke := err.(*KeepError)
- log.Printf("%s %s %d %d", req.Method, hash, ke.HTTPCode, req.ContentLength)
http.Error(resp, ke.Error(), ke.HTTPCode)
}
}
// A HandleFunc to address /index and /index/{prefix} requests.
//
func IndexHandler(resp http.ResponseWriter, req *http.Request) {
- prefix := mux.Vars(req)["prefix"]
-
- // Only the data manager may issue /index requests,
- // and only if enforce_permissions is enabled.
- // All other requests return 403 Forbidden.
- api_token := GetApiToken(req)
- if !enforce_permissions ||
- api_token == "" ||
- data_manager_token != api_token {
- http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
+ // Reject unauthorized requests.
+ if !IsDataManagerToken(GetApiToken(req)) {
+ http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
+
+ prefix := mux.Vars(req)["prefix"]
+
var index string
- for _, vol := range KeepVM.Volumes() {
+ for _, vol := range KeepVM.AllReadable() {
index = index + vol.Index(prefix)
}
resp.Write([]byte(index))
func GetNodeStatus() *NodeStatus {
st := new(NodeStatus)
- st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
- for i, vol := range KeepVM.Volumes() {
+ st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
+ for i, vol := range KeepVM.AllReadable() {
st.Volumes[i] = vol.Status()
}
return st
//
func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
- log.Printf("%s %s", req.Method, hash)
// Confirm that this user is an admin and has a token with unlimited scope.
var tok = GetApiToken(req)
return
}
- // Delete copies of this block from all available volumes. Report
- // how many blocks were successfully and unsuccessfully
- // deleted.
+ // Delete copies of this block from all available volumes.
+ // Report how many blocks were successfully deleted, and how
+ // many were found on writable volumes but not deleted.
var result struct {
Deleted int `json:"copies_deleted"`
Failed int `json:"copies_failed"`
}
- for _, vol := range KeepVM.Volumes() {
+ for _, vol := range KeepVM.AllWritable() {
if err := vol.Delete(hash); err == nil {
result.Deleted++
} else if os.IsNotExist(err) {
func PullHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- api_token := GetApiToken(req)
- if !IsDataManagerToken(api_token) {
+ if !IsDataManagerToken(GetApiToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
- log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
return
}
r := json.NewDecoder(req.Body)
if err := r.Decode(&pr); err != nil {
http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
- log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
return
}
// We have a properly formatted pull list sent from the data
// manager. Report success and send the list to the pull list
// manager for further handling.
- log.Printf("%s %s: received %v\n", req.Method, req.URL, pr)
resp.WriteHeader(http.StatusOK)
resp.Write([]byte(
fmt.Sprintf("Received %d pull requests\n", len(pr))))
for _, p := range pr {
plist.PushBack(p)
}
+ pullq.ReplaceQueue(plist)
+}
+
+type TrashRequest struct {
+ Locator string `json:"locator"`
+ BlockMtime int64 `json:"block_mtime"`
+}
- if pullq == nil {
- pullq = NewWorkQueue()
+func TrashHandler(resp http.ResponseWriter, req *http.Request) {
+ // Reject unauthorized requests.
+ if !IsDataManagerToken(GetApiToken(req)) {
+ http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+ return
}
- pullq.ReplaceQueue(plist)
+
+ // Parse the request body.
+ var trash []TrashRequest
+ r := json.NewDecoder(req.Body)
+ if err := r.Decode(&trash); err != nil {
+ http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+ return
+ }
+
+ // We have a properly formatted trash list sent from the data
+ // manager. Report success and send the list to the trash work
+ // queue for further handling.
+ resp.WriteHeader(http.StatusOK)
+ resp.Write([]byte(
+ fmt.Sprintf("Received %d trash requests\n", len(trash))))
+
+ tlist := list.New()
+ for _, t := range trash {
+ tlist.PushBack(t)
+ }
+ trashq.ReplaceQueue(tlist)
}
// ==============================
// Attempt to read the requested hash from a keep volume.
error_to_caller := NotFoundError
- for _, vol := range KeepVM.Volumes() {
- if buf, err := vol.Get(hash); err != nil {
- // IsNotExist is an expected error and may be ignored.
- // (If all volumes report IsNotExist, we return a NotFoundError)
- // All other errors should be logged but we continue trying to
- // read.
- switch {
- case os.IsNotExist(err):
- continue
- default:
+ var vols []Volume
+ if update_timestamp {
+ // Pointless to find the block on an unwritable volume
+ // because Touch() will fail -- this is as good as
+ // "not found" for purposes of callers who need to
+ // update_timestamp.
+ vols = KeepVM.AllWritable()
+ } else {
+ vols = KeepVM.AllReadable()
+ }
+
+ for _, vol := range vols {
+ buf, err := vol.Get(hash)
+ if err != nil {
+ // IsNotExist is an expected error and may be
+ // ignored. All other errors are logged. In
+ // any case we continue trying to read other
+ // volumes. If all volumes report IsNotExist,
+ // we return a NotFoundError.
+ if !os.IsNotExist(err) {
log.Printf("GetBlock: reading %s: %s\n", hash, err)
}
- } else {
- // Double check the file checksum.
- //
- filehash := fmt.Sprintf("%x", md5.Sum(buf))
- if filehash != hash {
- // TODO(twp): this condition probably represents a bad disk and
- // should raise major alarm bells for an administrator: e.g.
- // they should be sent directly to an event manager at high
- // priority or logged as urgent problems.
- //
- log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
- vol, hash, filehash)
- error_to_caller = DiskHashError
- } else {
- // Success!
- if error_to_caller != NotFoundError {
- log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
- vol, hash)
- }
- // Update the timestamp if the caller requested.
- // If we could not update the timestamp, continue looking on
- // other volumes.
- if update_timestamp {
- if vol.Touch(hash) != nil {
- continue
- }
- }
- return buf, nil
+ continue
+ }
+ // Check the file checksum.
+ //
+ filehash := fmt.Sprintf("%x", md5.Sum(buf))
+ if filehash != hash {
+ // TODO: Try harder to tell a sysadmin about
+ // this.
+ log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
+ vol, hash, filehash)
+ error_to_caller = DiskHashError
+ continue
+ }
+ if error_to_caller == DiskHashError {
+ log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
+ vol, hash)
+ }
+ if update_timestamp {
+ if err := vol.Touch(hash); err != nil {
+ error_to_caller = GenericError
+ log.Printf("%s: Touch %s failed: %s",
+ vol, hash, error_to_caller)
+ continue
}
}
- }
-
- if error_to_caller != NotFoundError {
- log.Printf("%s: checksum mismatch, no good copy found\n", hash)
+ return buf, nil
}
return nil, error_to_caller
}
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
- vol := KeepVM.Choose()
- if err := vol.Put(hash, block); err == nil {
- return nil // success!
- } else {
- allFull := true
- for _, vol := range KeepVM.Volumes() {
- err := vol.Put(hash, block)
- if err == nil {
- return nil // success!
- }
- if err != FullError {
- // The volume is not full but the write did not succeed.
- // Report the error and continue trying.
- allFull = false
- log.Printf("%s: Write(%s): %s\n", vol, hash, err)
- }
+ if vol := KeepVM.NextWritable(); vol != nil {
+ if err := vol.Put(hash, block); err == nil {
+ return nil // success!
}
+ }
- if allFull {
- log.Printf("all Keep volumes full")
- return FullError
- } else {
- log.Printf("all Keep volumes failed")
- return GenericError
+ writables := KeepVM.AllWritable()
+ if len(writables) == 0 {
+ log.Print("No writable volumes.")
+ return FullError
+ }
+
+ allFull := true
+ for _, vol := range writables {
+ err := vol.Put(hash, block)
+ if err == nil {
+ return nil // success!
}
+ if err != FullError {
+ // The volume is not full but the
+ // write did not succeed. Report the
+ // error and continue trying.
+ allFull = false
+ log.Printf("%s: Write(%s): %s\n", vol, hash, err)
+ }
+ }
+
+ if allFull {
+ log.Print("All volumes are full.")
+ return FullError
+ } else {
+ // Already logged the non-full errors.
+ return GenericError
}
}