Merge branch '2291-new-keepd-read-blocks' (fixes #2291)
[arvados.git] / services / keep / keep.go
1 package main
2
3 import (
4         "bufio"
5         "crypto/md5"
6         "errors"
7         "fmt"
8         "github.com/gorilla/mux"
9         "log"
10         "net/http"
11         "os"
12         "strings"
13 )
14
15 const DEFAULT_PORT = 25107
16 const BLOCKSIZE = 64 * 1024 * 1024
17
18 var KeepVolumes []string
19
20 func main() {
21         // Look for local keep volumes.
22         KeepVolumes = FindKeepVolumes()
23         if len(KeepVolumes) == 0 {
24                 log.Fatal("could not find any keep volumes")
25         }
26         for _, v := range KeepVolumes {
27                 log.Println("keep volume:", v)
28         }
29
30         // Set up REST handlers.
31         //
32         // Start with a router that will route each URL path to an
33         // appropriate handler.
34         //
35         rest := mux.NewRouter()
36         rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
37
38         // Tell the built-in HTTP server to direct all requests to the REST
39         // router.
40         http.Handle("/", rest)
41
42         // Start listening for requests.
43         port := fmt.Sprintf(":%d", DEFAULT_PORT)
44         http.ListenAndServe(port, nil)
45 }
46
47 // FindKeepVolumes
48 //     Returns a list of Keep volumes mounted on this system.
49 //
50 //     A Keep volume is a normal or tmpfs volume with a /keep
51 //     directory at the top level of the mount point.
52 //
53 func FindKeepVolumes() []string {
54         vols := make([]string, 0)
55
56         if f, err := os.Open("/proc/mounts"); err != nil {
57                 log.Fatal("could not read /proc/mounts: ", err)
58         } else {
59                 scanner := bufio.NewScanner(f)
60                 for scanner.Scan() {
61                         args := strings.Fields(scanner.Text())
62                         dev, mount := args[0], args[1]
63                         if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
64                                 keep := mount + "/keep"
65                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
66                                         vols = append(vols, keep)
67                                 }
68                         }
69                 }
70                 if err := scanner.Err(); err != nil {
71                         log.Fatal(err)
72                 }
73         }
74         return vols
75 }
76
77 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
78         hash := mux.Vars(req)["hash"]
79
80         block, err := GetBlock(hash)
81         if err != nil {
82                 http.Error(w, err.Error(), 404)
83                 return
84         }
85
86         _, err = w.Write(block)
87         if err != nil {
88                 log.Printf("GetBlockHandler: writing response: %s", err)
89         }
90
91         return
92 }
93
94 func GetBlock(hash string) ([]byte, error) {
95         var buf = make([]byte, BLOCKSIZE)
96
97         // Attempt to read the requested hash from a keep volume.
98         for _, vol := range KeepVolumes {
99                 var f *os.File
100                 var err error
101                 var nread int
102
103                 path := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
104
105                 f, err = os.Open(path)
106                 if err != nil {
107                         log.Printf("%s: opening %s: %s\n", vol, path, err)
108                         continue
109                 }
110
111                 nread, err = f.Read(buf)
112                 if err != nil {
113                         log.Printf("%s: reading %s: %s\n", vol, path, err)
114                         continue
115                 }
116
117                 // Double check the file checksum.
118                 //
119                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
120                 if filehash != hash {
121                         // TODO(twp): this condition probably represents a bad disk and
122                         // should raise major alarm bells for an administrator: e.g.
123                         // they should be sent directly to an event manager at high
124                         // priority or logged as urgent problems.
125                         //
126                         log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
127                                 vol, path, filehash)
128                         continue
129                 }
130
131                 // Success!
132                 return buf[:nread], nil
133         }
134
135         log.Printf("%s: not found on any volumes, giving up\n", hash)
136         return buf, errors.New("not found: " + hash)
137 }