Merge branch 'master' into 2449-keep-flags
[arvados.git] / services / keep / keep.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/md5"
7         "errors"
8         "flag"
9         "fmt"
10         "github.com/gorilla/mux"
11         "io/ioutil"
12         "log"
13         "net/http"
14         "os"
15         "strconv"
16         "strings"
17         "syscall"
18         "time"
19 )
20
21 // Default TCP address on which to listen for requests.
22 const DEFAULT_ADDR = ":25107"
23
24 // A Keep "block" is 64MB.
25 const BLOCKSIZE = 64 * 1024 * 1024
26
27 // A Keep volume must have at least MIN_FREE_KILOBYTES available
28 // in order to permit writes.
29 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
30
31 var PROC_MOUNTS = "/proc/mounts"
32
33 var KeepVolumes []string
34
35 type KeepError struct {
36         HTTPCode int
37         Err      error
38 }
39
40 const (
41         ErrCollision = 400
42         ErrMD5Fail   = 401
43         ErrCorrupt   = 402
44         ErrNotFound  = 404
45         ErrOther     = 500
46         ErrFull      = 503
47 )
48
49 func (e *KeepError) Error() string {
50         return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
51 }
52
53 func main() {
54         // Parse command-line flags.
55         var listen, keepvols string
56         flag.StringVar(&listen, "listen", DEFAULT_ADDR,
57                 "interface on which to listen for requests")
58         flag.StringVar(&keepvols, "volumes", "",
59                 "comma-separated list of directories to use for Keep volumes")
60         flag.Parse()
61
62         // Look for local keep volumes.
63         if keepvols == "" {
64                 KeepVolumes = FindKeepVolumes()
65         } else {
66                 KeepVolumes = strings.Split(keepvols, ",")
67         }
68
69         if len(KeepVolumes) == 0 {
70                 log.Fatal("could not find any keep volumes")
71         }
72         for _, v := range KeepVolumes {
73                 log.Println("keep volume:", v)
74         }
75
76         // Set up REST handlers.
77         //
78         // Start with a router that will route each URL path to an
79         // appropriate handler.
80         //
81         rest := mux.NewRouter()
82         rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
83         rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
84
85         // Tell the built-in HTTP server to direct all requests to the REST
86         // router.
87         http.Handle("/", rest)
88
89         // Start listening for requests.
90         http.ListenAndServe(listen, nil)
91 }
92
93 // FindKeepVolumes
94 //     Returns a list of Keep volumes mounted on this system.
95 //
96 //     A Keep volume is a normal or tmpfs volume with a /keep
97 //     directory at the top level of the mount point.
98 //
99 func FindKeepVolumes() []string {
100         vols := make([]string, 0)
101
102         if f, err := os.Open(PROC_MOUNTS); err != nil {
103                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
104         } else {
105                 scanner := bufio.NewScanner(f)
106                 for scanner.Scan() {
107                         args := strings.Fields(scanner.Text())
108                         dev, mount := args[0], args[1]
109                         if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
110                                 keep := mount + "/keep"
111                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
112                                         vols = append(vols, keep)
113                                 }
114                         }
115                 }
116                 if err := scanner.Err(); err != nil {
117                         log.Fatal(err)
118                 }
119         }
120         return vols
121 }
122
123 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
124         hash := mux.Vars(req)["hash"]
125
126         block, err := GetBlock(hash)
127         if err != nil {
128                 http.Error(w, err.Error(), 404)
129                 return
130         }
131
132         _, err = w.Write(block)
133         if err != nil {
134                 log.Printf("GetBlockHandler: writing response: %s", err)
135         }
136
137         return
138 }
139
140 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
141         hash := mux.Vars(req)["hash"]
142
143         // Read the block data to be stored.
144         // TODO(twp): decide what to do when the input stream contains
145         // more than BLOCKSIZE bytes.
146         //
147         buf := make([]byte, BLOCKSIZE)
148         if nread, err := req.Body.Read(buf); err == nil {
149                 if err := PutBlock(buf[:nread], hash); err == nil {
150                         w.WriteHeader(http.StatusOK)
151                 } else {
152                         ke := err.(*KeepError)
153                         http.Error(w, ke.Error(), ke.HTTPCode)
154                 }
155         } else {
156                 log.Println("error reading request: ", err)
157                 http.Error(w, err.Error(), 500)
158         }
159 }
160
161 func GetBlock(hash string) ([]byte, error) {
162         var buf = make([]byte, BLOCKSIZE)
163
164         // Attempt to read the requested hash from a keep volume.
165         for _, vol := range KeepVolumes {
166                 var f *os.File
167                 var err error
168                 var nread int
169
170                 blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
171
172                 f, err = os.Open(blockFilename)
173                 if err != nil {
174                         if !os.IsNotExist(err) {
175                                 // A block is stored on only one Keep disk,
176                                 // so os.IsNotExist is expected.  Report any other errors.
177                                 log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
178                         }
179                         continue
180                 }
181
182                 nread, err = f.Read(buf)
183                 if err != nil {
184                         log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
185                         continue
186                 }
187
188                 // Double check the file checksum.
189                 //
190                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
191                 if filehash != hash {
192                         // TODO(twp): this condition probably represents a bad disk and
193                         // should raise major alarm bells for an administrator: e.g.
194                         // they should be sent directly to an event manager at high
195                         // priority or logged as urgent problems.
196                         //
197                         log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
198                                 vol, blockFilename, filehash)
199                         return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")}
200                 }
201
202                 // Success!
203                 return buf[:nread], nil
204         }
205
206         log.Printf("%s: not found on any volumes, giving up\n", hash)
207         return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)}
208 }
209
210 /* PutBlock(block, hash)
211    Stores the BLOCK (identified by the content id HASH) in Keep.
212
213    The MD5 checksum of the block must be identical to the content id HASH.
214    If not, an error is returned.
215
216    PutBlock stores the BLOCK on the first Keep volume with free space.
217    A failure code is returned to the user only if all volumes fail.
218
219    On success, PutBlock returns nil.
220    On failure, it returns a KeepError with one of the following codes:
221
222    400 Collision
223           A different block with the same hash already exists on this
224           Keep server.
225    401 MD5Fail
226           The MD5 hash of the BLOCK does not match the argument HASH.
227    503 Full
228           There was not enough space left in any Keep volume to store
229           the object.
230    500 Fail
231           The object could not be stored for some other reason (e.g.
232           all writes failed). The text of the error message should
233           provide as much detail as possible.
234 */
235
236 func PutBlock(block []byte, hash string) error {
237         // Check that BLOCK's checksum matches HASH.
238         blockhash := fmt.Sprintf("%x", md5.Sum(block))
239         if blockhash != hash {
240                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
241                 return &KeepError{ErrMD5Fail, errors.New("MD5Fail")}
242         }
243
244         // If we already have a block on disk under this identifier, return
245         // success (but check for MD5 collisions).
246         // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
247         // In either case, we want to write our new (good) block to disk, so there is
248         // nothing special to do if err != nil.
249         if oldblock, err := GetBlock(hash); err == nil {
250                 if bytes.Compare(block, oldblock) == 0 {
251                         return nil
252                 } else {
253                         return &KeepError{ErrCollision, errors.New("Collision")}
254                 }
255         }
256
257         // Store the block on the first available Keep volume.
258         allFull := true
259         for _, vol := range KeepVolumes {
260                 if IsFull(vol) {
261                         continue
262                 }
263                 allFull = false
264                 blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
265                 if err := os.MkdirAll(blockDir, 0755); err != nil {
266                         log.Printf("%s: could not create directory %s: %s",
267                                 hash, blockDir, err)
268                         continue
269                 }
270
271                 tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
272                 if tmperr != nil {
273                         log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
274                         continue
275                 }
276                 blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
277
278                 if _, err := tmpfile.Write(block); err != nil {
279                         log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
280                         continue
281                 }
282                 if err := tmpfile.Close(); err != nil {
283                         log.Printf("closing %s: %s\n", tmpfile.Name(), err)
284                         os.Remove(tmpfile.Name())
285                         continue
286                 }
287                 if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
288                         log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
289                         os.Remove(tmpfile.Name())
290                         continue
291                 }
292                 return nil
293         }
294
295         if allFull {
296                 log.Printf("all Keep volumes full")
297                 return &KeepError{ErrFull, errors.New("Full")}
298         } else {
299                 log.Printf("all Keep volumes failed")
300                 return &KeepError{ErrOther, errors.New("Fail")}
301         }
302 }
303
304 func IsFull(volume string) (isFull bool) {
305         fullSymlink := volume + "/full"
306
307         // Check if the volume has been marked as full in the last hour.
308         if link, err := os.Readlink(fullSymlink); err == nil {
309                 if ts, err := strconv.Atoi(link); err == nil {
310                         fulltime := time.Unix(int64(ts), 0)
311                         if time.Since(fulltime).Hours() < 1.0 {
312                                 return true
313                         }
314                 }
315         }
316
317         if avail, err := FreeDiskSpace(volume); err == nil {
318                 isFull = avail < MIN_FREE_KILOBYTES
319         } else {
320                 log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
321                 isFull = false
322         }
323
324         // If the volume is full, timestamp it.
325         if isFull {
326                 now := fmt.Sprintf("%d", time.Now().Unix())
327                 os.Symlink(now, fullSymlink)
328         }
329         return
330 }
331
332 // FreeDiskSpace(volume)
333 //     Returns the amount of available disk space on VOLUME,
334 //     as a number of 1k blocks.
335 //
336 func FreeDiskSpace(volume string) (free uint64, err error) {
337         var fs syscall.Statfs_t
338         err = syscall.Statfs(volume, &fs)
339         if err == nil {
340                 // Statfs output is not guaranteed to measure free
341                 // space in terms of 1K blocks.
342                 free = fs.Bavail * uint64(fs.Bsize) / 1024
343         }
344
345         return
346 }