Incorporating comments from code review (refs #2438, refs #2291)
[arvados.git] / services / keep / keep.go
1 package main
2
3 import (
4         "bufio"
5         "crypto/md5"
6         "errors"
7         "fmt"
8         "github.com/gorilla/mux"
9         "log"
10         "net/http"
11         "os"
12         "strings"
13 )
14
15 const DEFAULT_PORT = 25107
16 const BLOCKSIZE = 64 * 1024 * 1024
17
18 var KeepVolumes []string
19
20 func main() {
21         // Look for local keep volumes.
22         KeepVolumes = FindKeepVolumes()
23         if len(KeepVolumes) == 0 {
24                 log.Fatal("could not find any keep volumes")
25         }
26         for _, v := range KeepVolumes {
27                 log.Println("keep volume:", v)
28         }
29
30         // Set up REST handlers.
31         //
32         // Start with a router that will route each URL path to an
33         // appropriate handler.
34         //
35         rest := mux.NewRouter()
36         rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
37
38         // Tell the built-in HTTP server to direct all requests to the REST
39         // router.
40         http.Handle("/", rest)
41
42         // Start listening for requests.
43         port := fmt.Sprintf(":%d", DEFAULT_PORT)
44         http.ListenAndServe(port, nil)
45 }
46
47 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
48         hash := mux.Vars(req)["hash"]
49
50         block, err := GetBlock(hash)
51         if err != nil {
52                 http.Error(w, err.Error(), 404)
53                 return
54         }
55
56         _, err = w.Write(block)
57         if err != nil {
58                 log.Printf("GetBlockHandler: writing response: %s", err)
59         }
60
61         return
62 }
63
64 func GetBlock(hash string) ([]byte, error) {
65         var buf = make([]byte, BLOCKSIZE)
66
67         // Attempt to read the requested hash from a keep volume.
68         for _, vol := range KeepVolumes {
69                 var f *os.File
70                 var err error
71                 var nread int
72
73                 path := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
74
75                 f, err = os.Open(path)
76                 if err != nil {
77                         log.Printf("%s: opening %s: %s\n", vol, path, err)
78                         continue
79                 }
80
81                 nread, err = f.Read(buf)
82                 if err != nil {
83                         log.Printf("%s: reading %s: %s\n", vol, path, err)
84                         continue
85                 }
86
87                 // Double check the file checksum.
88                 //
89                 // TODO(twp): this condition probably represents a bad disk and
90                 // should raise major alarm bells for an administrator: e.g.
91                 // they should be sent directly to an event manager at high
92                 // priority or logged as urgent problems.
93                 //
94                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
95                 if filehash != hash {
96                         log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
97                                 vol, path, filehash)
98                         continue
99                 }
100
101                 // Success!
102                 return buf[:nread], nil
103         }
104
105         log.Printf("%s: all keep volumes failed, giving up\n", hash)
106         return buf, errors.New("not found: " + hash)
107 }
108
109 // FindKeepVolumes
110 //     Returns a list of Keep volumes mounted on this system.
111 //
112 //     A Keep volume is a normal or tmpfs volume with a /keep
113 //     directory at the top level of the mount point.
114 //
115 func FindKeepVolumes() []string {
116         vols := make([]string, 0)
117
118         if f, err := os.Open("/proc/mounts"); err != nil {
119                 log.Fatal("could not read /proc/mounts: ", err)
120         } else {
121                 scanner := bufio.NewScanner(f)
122                 for scanner.Scan() {
123                         args := strings.Fields(scanner.Text())
124                         dev, mount := args[0], args[1]
125                         if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
126                                 keep := mount + "/keep"
127                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
128                                         vols = append(vols, keep)
129                                 }
130                         }
131                 }
132                 if err := scanner.Err(); err != nil {
133                         log.Fatal(err)
134                 }
135         }
136         return vols
137 }