From: Tim Pierce
Date: Wed, 26 Mar 2014 21:34:50 +0000 (-0400)
Subject: Merge branch '2291-new-keepd-read-blocks' (fixes #2291)
X-Git-Tag: 1.1.0~2608^2~11^2~31
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/1013c7cee5478538901fd9ba5fc0fb2ce30f6422?hp=8e34082da3e9952c18e8bd9c0afac2516dea1287

Merge branch '2291-new-keepd-read-blocks' (fixes #2291)
---

diff --git a/services/keep/keep.go b/services/keep/keep.go
new file mode 100644
index 0000000000..06a81b6fa8
--- /dev/null
+++ b/services/keep/keep.go
@@ -0,0 +1,180 @@
+package main
+
+import (
+	"bufio"
+	"crypto/md5"
+	"errors"
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"github.com/gorilla/mux"
+)
+
+// DEFAULT_PORT is the TCP port the Keep server listens on.
+const DEFAULT_PORT = 25107
+
+// BLOCKSIZE is the largest block, in bytes, that GetBlock will read from
+// a volume (64 MiB).
+const BLOCKSIZE = 64 * 1024 * 1024
+
+// KeepVolumes lists the Keep directories that GetBlock searches, in
+// order. main fills it in from FindKeepVolumes.
+var KeepVolumes []string
+
+func main() {
+	// Look for local keep volumes.
+	KeepVolumes = FindKeepVolumes()
+	if len(KeepVolumes) == 0 {
+		log.Fatal("could not find any keep volumes")
+	}
+	for _, v := range KeepVolumes {
+		log.Println("keep volume:", v)
+	}
+
+	// Set up REST handlers.
+	//
+	// Start with a router that will route each URL path to an
+	// appropriate handler.
+	//
+	rest := mux.NewRouter()
+	rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
+
+	// Tell the built-in HTTP server to direct all requests to the REST
+	// router.
+	http.Handle("/", rest)
+
+	// Start listening for requests. ListenAndServe returns only when it
+	// fails (for example, the port is already in use), so report that
+	// error instead of exiting silently.
+	port := fmt.Sprintf(":%d", DEFAULT_PORT)
+	log.Fatal(http.ListenAndServe(port, nil))
+}
+
+// FindKeepVolumes returns a list of Keep volumes mounted on this system.
+//
+// A Keep volume is a normal or tmpfs volume with a /keep
+// directory at the top level of the mount point.
+//
+// The program exits via log.Fatal if /proc/mounts cannot be read.
+func FindKeepVolumes() []string {
+	f, err := os.Open("/proc/mounts")
+	if err != nil {
+		log.Fatal("could not read /proc/mounts: ", err)
+	}
+	defer f.Close()
+
+	var vols []string
+	scanner := bufio.NewScanner(f)
+	for scanner.Scan() {
+		args := strings.Fields(scanner.Text())
+		if len(args) < 2 {
+			// Blank or malformed line: there is no mount point to inspect.
+			continue
+		}
+		dev, mount := args[0], args[1]
+		if mount == "/" || !(dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
+			continue
+		}
+		keep := filepath.Join(mount, "keep")
+		if st, err := os.Stat(keep); err == nil && st.IsDir() {
+			vols = append(vols, keep)
+		}
+	}
+	if err := scanner.Err(); err != nil {
+		log.Fatal(err)
+	}
+	return vols
+}
+
+// GetBlockHandler handles GET /{hash}. It writes the block returned by
+// GetBlock as the response body, or replies 404 with the error text if
+// GetBlock fails.
+func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
+	hash := mux.Vars(req)["hash"]
+
+	block, err := GetBlock(hash)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusNotFound)
+		return
+	}
+
+	if _, err := w.Write(block); err != nil {
+		log.Printf("GetBlockHandler: writing response: %s", err)
+	}
+}
+
+// GetBlock returns the contents of the block named by hash, which is the
+// lowercase hex MD5 digest of the block's data.
+//
+// Each volume in KeepVolumes is tried in order. A copy that cannot be
+// read, or whose contents do not hash to the requested value, is logged
+// and skipped. If no volume yields a valid copy, GetBlock returns a nil
+// slice and a "not found" error.
+func GetBlock(hash string) ([]byte, error) {
+	// Blocks are stored as VOLUME/HASH[0:3]/HASH, so a name shorter than
+	// three characters cannot match any stored block.
+	if len(hash) < 3 {
+		return nil, errors.New("not found: " + hash)
+	}
+
+	// Attempt to read the requested hash from a keep volume.
+	for _, vol := range KeepVolumes {
+		path := filepath.Join(vol, hash[0:3], hash)
+
+		block, err := readBlockFile(path)
+		if err != nil {
+			log.Printf("%s: reading %s: %s\n", vol, path, err)
+			continue
+		}
+
+		// Double check the file checksum.
+		//
+		filehash := fmt.Sprintf("%x", md5.Sum(block))
+		if filehash != hash {
+			// TODO(twp): this condition probably represents a bad disk and
+			// should raise major alarm bells for an administrator: e.g.
+			// they should be sent directly to an event manager at high
+			// priority or logged as urgent problems.
+			//
+			log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
+				vol, path, filehash)
+			continue
+		}
+
+		// Success!
+		return block, nil
+	}
+
+	log.Printf("%s: not found on any volumes, giving up\n", hash)
+	return nil, errors.New("not found: " + hash)
+}
+
+// readBlockFile returns the full contents of the file at path. The
+// buffer is sized to the file, the file is closed before returning, and
+// files larger than BLOCKSIZE are rejected without being read.
+func readBlockFile(path string) ([]byte, error) {
+	f, err := os.Open(path)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	st, err := f.Stat()
+	if err != nil {
+		return nil, err
+	}
+	if st.Size() > BLOCKSIZE {
+		return nil, fmt.Errorf("%d bytes exceeds BLOCKSIZE", st.Size())
+	}
+
+	buf := make([]byte, st.Size())
+	if _, err := io.ReadFull(f, buf); err != nil {
+		return nil, err
+	}
+	return buf, nil
+}
diff --git a/services/keep/keep_test.go b/services/keep/keep_test.go
new file mode 100644
index 0000000000..5e3b763d0f
--- /dev/null
+++ b/services/keep/keep_test.go
@@ -0,0 +1,131 @@
+package main
+
+import (
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path"
+	"testing"
+)
+
+var TEST_BLOCK = []byte("The quick brown fox jumps over the lazy dog.")
+var TEST_HASH = "e4d909c290d0fb1ca068ffaddf22cbd0"
+var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.")
+
+// Test simple block reads.
+func TestGetBlockOK(t *testing.T) {
+	defer teardown()
+
+	// Create two test Keep volumes and store a block in each of them.
+	setup(t, 2)
+	for _, vol := range KeepVolumes {
+		store(t, vol, TEST_HASH, TEST_BLOCK)
+	}
+
+	// Check that GetBlock returns success.
+	result, err := GetBlock(TEST_HASH)
+	if err != nil {
+		t.Fatalf("GetBlock error: %s", err)
+	}
+	if !bytes.Equal(result, TEST_BLOCK) {
+		t.Errorf("expected %s, got %s", TEST_BLOCK, result)
+	}
+}
+
+// Test block reads when one Keep volume is missing.
+func TestGetBlockOneKeepOK(t *testing.T) {
+	defer teardown()
+
+	// Two test Keep volumes, only the second has a block.
+	setup(t, 2)
+	store(t, KeepVolumes[1], TEST_HASH, TEST_BLOCK)
+
+	// Check that GetBlock returns success.
+	result, err := GetBlock(TEST_HASH)
+	if err != nil {
+		t.Fatalf("GetBlock error: %s", err)
+	}
+	if !bytes.Equal(result, TEST_BLOCK) {
+		t.Errorf("expected %s, got %s", TEST_BLOCK, result)
+	}
+}
+
+// Test block read failure.
+func TestGetBlockFail(t *testing.T) {
+	defer teardown()
+
+	// Create two empty test Keep volumes.
+	setup(t, 2)
+
+	// Check that GetBlock returns failure.
+	result, err := GetBlock(TEST_HASH)
+	if err == nil {
+		t.Errorf("GetBlock incorrectly returned success: %s", result)
+	}
+}
+
+// Test reading a corrupt block.
+func TestGetBlockCorrupt(t *testing.T) {
+	defer teardown()
+
+	// Create two test Keep volumes and store a block in each of them,
+	// but the hash of the block does not match the filename.
+	setup(t, 2)
+	for _, vol := range KeepVolumes {
+		store(t, vol, TEST_HASH, BAD_BLOCK)
+	}
+
+	// Check that GetBlock returns failure.
+	result, err := GetBlock(TEST_HASH)
+	if err == nil {
+		t.Errorf("GetBlock incorrectly returned success: %s", result)
+	}
+}
+
+// setup points KeepVolumes at num_volumes fresh test volumes. Each
+// volume is a "keep" path inside its own new temporary directory; the
+// keep directory itself is not created here (store creates it on demand).
+func setup(t *testing.T, num_volumes int) {
+	KeepVolumes = make([]string, 0, num_volumes)
+	for i := 0; i < num_volumes; i++ {
+		dir, err := ioutil.TempDir(os.TempDir(), "keeptest")
+		if err != nil {
+			t.Fatal(err)
+		}
+		// Append only after success so KeepVolumes never holds an empty
+		// entry for teardown to trip over.
+		KeepVolumes = append(KeepVolumes, dir+"/keep")
+	}
+}
+
+// teardown removes the temporary directory behind every test volume and
+// clears KeepVolumes. It is the cleanup to perform after each test.
+func teardown() {
+	for _, vol := range KeepVolumes {
+		if vol == "" {
+			// path.Dir("") is ".", and the working directory must never
+			// be removed.
+			continue
+		}
+		os.RemoveAll(path.Dir(vol))
+	}
+	KeepVolumes = nil
+}
+
+// store writes block to keepdir/filename[:3]/filename, creating
+// directories as needed. Any failure aborts the test via t.Fatal, so
+// the returned error is always nil.
+func store(t *testing.T, keepdir string, filename string, block []byte) error {
+	blockdir := fmt.Sprintf("%s/%s", keepdir, filename[:3])
+	if err := os.MkdirAll(blockdir, 0755); err != nil {
+		t.Fatal(err)
+	}
+
+	blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
+	if err := ioutil.WriteFile(blockpath, block, 0644); err != nil {
+		t.Fatal(err)
+	}
+
+	return nil
+}