Added IsFull() to check for free space before writing. (refs #2292)
[arvados.git] / services / keep / keep.go
1 package main
2
3 import (
4         "bufio"
5         "crypto/md5"
6         "errors"
7         "fmt"
8         "github.com/gorilla/mux"
9         "log"
10         "net/http"
11         "os"
12         "os/exec"
13         "strconv"
14         "strings"
15         "time"
16 )
17
18 // Default TCP port on which to listen for requests.
19 const DEFAULT_PORT = 25107
20
21 // A Keep "block" is 64MB.
22 const BLOCKSIZE = 64 * 1024 * 1024
23
24 // A Keep volume must have at least MIN_FREE_KILOBYTES available
25 // in order to permit writes.
26 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
27
28 var PROC_MOUNTS = "/proc/mounts"
29
30 var KeepVolumes []string
31
32 type KeepError struct {
33         HTTPCode int
34         Err      error
35 }
36
37 func (e *KeepError) Error() string {
38         return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
39 }
40
41 func main() {
42         // Look for local keep volumes.
43         KeepVolumes = FindKeepVolumes()
44         if len(KeepVolumes) == 0 {
45                 log.Fatal("could not find any keep volumes")
46         }
47         for _, v := range KeepVolumes {
48                 log.Println("keep volume:", v)
49         }
50
51         // Set up REST handlers.
52         //
53         // Start with a router that will route each URL path to an
54         // appropriate handler.
55         //
56         rest := mux.NewRouter()
57         rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
58         rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
59
60         // Tell the built-in HTTP server to direct all requests to the REST
61         // router.
62         http.Handle("/", rest)
63
64         // Start listening for requests.
65         port := fmt.Sprintf(":%d", DEFAULT_PORT)
66         http.ListenAndServe(port, nil)
67 }
68
69 // FindKeepVolumes
70 //     Returns a list of Keep volumes mounted on this system.
71 //
72 //     A Keep volume is a normal or tmpfs volume with a /keep
73 //     directory at the top level of the mount point.
74 //
75 func FindKeepVolumes() []string {
76         vols := make([]string, 0)
77
78         if f, err := os.Open(PROC_MOUNTS); err != nil {
79                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
80         } else {
81                 scanner := bufio.NewScanner(f)
82                 for scanner.Scan() {
83                         args := strings.Fields(scanner.Text())
84                         dev, mount := args[0], args[1]
85                         if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
86                                 keep := mount + "/keep"
87                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
88                                         vols = append(vols, keep)
89                                 }
90                         }
91                 }
92                 if err := scanner.Err(); err != nil {
93                         log.Fatal(err)
94                 }
95         }
96         return vols
97 }
98
99 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
100         hash := mux.Vars(req)["hash"]
101
102         block, err := GetBlock(hash)
103         if err != nil {
104                 http.Error(w, err.Error(), 404)
105                 return
106         }
107
108         _, err = w.Write(block)
109         if err != nil {
110                 log.Printf("GetBlockHandler: writing response: %s", err)
111         }
112
113         return
114 }
115
116 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
117         hash := mux.Vars(req)["hash"]
118
119         // Read the block data to be stored.
120         // TODO(twp): decide what to do when the input stream contains
121         // more than BLOCKSIZE bytes.
122         //
123         buf := make([]byte, BLOCKSIZE)
124         if nread, err := req.Body.Read(buf); err == nil {
125                 if err := PutBlock(buf[:nread], hash); err == nil {
126                         w.WriteHeader(http.StatusOK)
127                 } else {
128                         ke := err.(*KeepError)
129                         http.Error(w, ke.Error(), ke.HTTPCode)
130                 }
131         } else {
132                 log.Println("error reading request: ", err)
133                 http.Error(w, err.Error(), 500)
134         }
135 }
136
137 func GetBlock(hash string) ([]byte, error) {
138         var buf = make([]byte, BLOCKSIZE)
139
140         // Attempt to read the requested hash from a keep volume.
141         for _, vol := range KeepVolumes {
142                 var f *os.File
143                 var err error
144                 var nread int
145
146                 blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
147
148                 f, err = os.Open(blockFilename)
149                 if err != nil {
150                         if !os.IsNotExist(err) {
151                                 // A block is stored on only one Keep disk,
152                                 // so os.IsNotExist is expected.  Report any other errors.
153                                 log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
154                         }
155                         continue
156                 }
157
158                 nread, err = f.Read(buf)
159                 if err != nil {
160                         log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
161                         continue
162                 }
163
164                 // Double check the file checksum.
165                 //
166                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
167                 if filehash != hash {
168                         // TODO(twp): this condition probably represents a bad disk and
169                         // should raise major alarm bells for an administrator: e.g.
170                         // they should be sent directly to an event manager at high
171                         // priority or logged as urgent problems.
172                         //
173                         log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
174                                 vol, blockFilename, filehash)
175                         continue
176                 }
177
178                 // Success!
179                 return buf[:nread], nil
180         }
181
182         log.Printf("%s: not found on any volumes, giving up\n", hash)
183         return buf, &KeepError{404, errors.New("not found: " + hash)}
184 }
185
186 /* PutBlock(block, hash)
187    Stores the BLOCK (identified by the content id HASH) in Keep.
188
189    The MD5 checksum of the block must be identical to the content id HASH.
190    If not, an error is returned.
191
192    PutBlock stores the BLOCK on the first Keep volume with free space.
193    A failure code is returned to the user only if all volumes fail.
194
195    On success, PutBlock returns nil.
196    On failure, it returns a KeepError with one of the following codes:
197
198    401 MD5Fail
199          -- The MD5 hash of the BLOCK does not match the argument HASH.
200    503 Full
201          -- There was not enough space left in any Keep volume to store
202             the object.
203    500 Fail
204          -- The object could not be stored for some other reason (e.g.
205             all writes failed). The text of the error message should
206             provide as much detail as possible.
207 */
208
209 func PutBlock(block []byte, hash string) error {
210         // Check that BLOCK's checksum matches HASH.
211         blockhash := fmt.Sprintf("%x", md5.Sum(block))
212         if blockhash != hash {
213                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
214                 return &KeepError{401, errors.New("MD5Fail")}
215         }
216
217         allFull := true
218         for _, vol := range KeepVolumes {
219                 if IsFull(vol) {
220                         continue
221                 }
222                 allFull = false
223                 blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
224                 if err := os.MkdirAll(blockDir, 0755); err != nil {
225                         log.Printf("%s: could not create directory %s: %s",
226                                 hash, blockDir, err)
227                         continue
228                 }
229
230                 blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
231                 f, err := os.OpenFile(blockFilename, os.O_CREATE|os.O_WRONLY, 0644)
232                 if err != nil {
233                         // if the block already exists, just return success.
234                         // TODO(twp): should we check here whether the file on disk
235                         // matches the file we were asked to store?
236                         if os.IsExist(err) {
237                                 return nil
238                         } else {
239                                 // Open failed for some other reason.
240                                 log.Printf("%s: creating %s: %s\n", vol, blockFilename, err)
241                                 continue
242                         }
243                 }
244
245                 if _, err := f.Write(block); err == nil {
246                         f.Close()
247                         return nil
248                 } else {
249                         log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
250                         continue
251                 }
252         }
253
254         if allFull {
255                 log.Printf("all Keep volumes full")
256                 return &KeepError{503, errors.New("Full")}
257         } else {
258                 log.Printf("all Keep volumes failed")
259                 return &KeepError{500, errors.New("Fail")}
260         }
261 }
262
263 func IsFull(volume string) (isFull bool) {
264         fullSymlink := volume + "/full"
265
266         // Check if the volume has been marked as full in the last hour.
267         if link, err := os.Readlink(fullSymlink); err == nil {
268                 if ts, err := strconv.Atoi(link); err == nil {
269                         fulltime := time.Unix(int64(ts), 0)
270                         if time.Since(fulltime).Hours() < 1.0 {
271                                 return true
272                         }
273                 }
274         }
275
276         if avail, err := FreeDiskSpace(volume); err == nil {
277                 isFull = avail < MIN_FREE_KILOBYTES
278         } else {
279                 log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
280                 isFull = false
281         }
282
283         // If the volume is full, timestamp it.
284         if isFull {
285                 now := fmt.Sprintf("%d", time.Now().Unix())
286                 os.Symlink(now, fullSymlink)
287         }
288         return
289 }
290
291 // FreeDiskSpace(volume)
292 //     Returns the amount of available disk space on VOLUME,
293 //     as a number of 1k blocks.
294 //
295 func FreeDiskSpace(volume string) (free int, err error) {
296         // Run df to find out how much disk space is left.
297         cmd := exec.Command("df", "--block-size=1k", volume)
298         stdout, perr := cmd.StdoutPipe()
299         if perr != nil {
300                 return 0, perr
301         }
302         scanner := bufio.NewScanner(stdout)
303         if perr := cmd.Start(); err != nil {
304                 return 0, perr
305         }
306
307         scanner.Scan() // skip header line of df output
308         scanner.Scan()
309
310         f := strings.Fields(scanner.Text())
311         if avail, err := strconv.Atoi(f[3]); err == nil {
312                 free = avail
313         } else {
314                 err = errors.New("bad df format: " + scanner.Text())
315         }
316
317         // Flush the df output and shut it down cleanly.
318         for scanner.Scan() {
319         }
320         cmd.Wait()
321
322         return
323 }