Reversed histogram format, so timestamps come first.
[arvados.git] / services / keep / keep.go
1 package main
2
3 import (
4         "bufio"
5         "crypto/md5"
6         "errors"
7         "fmt"
8         "github.com/gorilla/mux"
9         "log"
10         "net/http"
11         "os"
12         "strings"
13 )
14
15 const DEFAULT_PORT = 25107
16 const BLOCKSIZE = 64 * 1024 * 1024
17
18 var PROC_MOUNTS = "/proc/mounts"
19
20 var KeepVolumes []string
21
22 func main() {
23         // Look for local keep volumes.
24         KeepVolumes = FindKeepVolumes()
25         if len(KeepVolumes) == 0 {
26                 log.Fatal("could not find any keep volumes")
27         }
28         for _, v := range KeepVolumes {
29                 log.Println("keep volume:", v)
30         }
31
32         // Set up REST handlers.
33         //
34         // Start with a router that will route each URL path to an
35         // appropriate handler.
36         //
37         rest := mux.NewRouter()
38         rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
39
40         // Tell the built-in HTTP server to direct all requests to the REST
41         // router.
42         http.Handle("/", rest)
43
44         // Start listening for requests.
45         port := fmt.Sprintf(":%d", DEFAULT_PORT)
46         http.ListenAndServe(port, nil)
47 }
48
49 // FindKeepVolumes
50 //     Returns a list of Keep volumes mounted on this system.
51 //
52 //     A Keep volume is a normal or tmpfs volume with a /keep
53 //     directory at the top level of the mount point.
54 //
55 func FindKeepVolumes() []string {
56         vols := make([]string, 0)
57
58         if f, err := os.Open(PROC_MOUNTS); err != nil {
59                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
60         } else {
61                 scanner := bufio.NewScanner(f)
62                 for scanner.Scan() {
63                         args := strings.Fields(scanner.Text())
64                         dev, mount := args[0], args[1]
65                         if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
66                                 keep := mount + "/keep"
67                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
68                                         vols = append(vols, keep)
69                                 }
70                         }
71                 }
72                 if err := scanner.Err(); err != nil {
73                         log.Fatal(err)
74                 }
75         }
76         return vols
77 }
78
79 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
80         hash := mux.Vars(req)["hash"]
81
82         block, err := GetBlock(hash)
83         if err != nil {
84                 http.Error(w, err.Error(), 404)
85                 return
86         }
87
88         _, err = w.Write(block)
89         if err != nil {
90                 log.Printf("GetBlockHandler: writing response: %s", err)
91         }
92
93         return
94 }
95
96 func GetBlock(hash string) ([]byte, error) {
97         var buf = make([]byte, BLOCKSIZE)
98
99         // Attempt to read the requested hash from a keep volume.
100         for _, vol := range KeepVolumes {
101                 var f *os.File
102                 var err error
103                 var nread int
104
105                 path := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
106
107                 f, err = os.Open(path)
108                 if err != nil {
109                         log.Printf("%s: opening %s: %s\n", vol, path, err)
110                         continue
111                 }
112
113                 nread, err = f.Read(buf)
114                 if err != nil {
115                         log.Printf("%s: reading %s: %s\n", vol, path, err)
116                         continue
117                 }
118
119                 // Double check the file checksum.
120                 //
121                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
122                 if filehash != hash {
123                         // TODO(twp): this condition probably represents a bad disk and
124                         // should raise major alarm bells for an administrator: e.g.
125                         // they should be sent directly to an event manager at high
126                         // priority or logged as urgent problems.
127                         //
128                         log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
129                                 vol, path, filehash)
130                         continue
131                 }
132
133                 // Success!
134                 return buf[:nread], nil
135         }
136
137         log.Printf("%s: not found on any volumes, giving up\n", hash)
138         return buf, errors.New("not found: " + hash)
139 }