Merge remote-tracking branch 'origin/master' into origin-2228-check-filter-uuid-columns
[arvados.git] / services / keep / keep.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/md5"
7         "errors"
8         "fmt"
9         "github.com/gorilla/mux"
10         "io/ioutil"
11         "log"
12         "net/http"
13         "os"
14         "strconv"
15         "strings"
16         "syscall"
17         "time"
18 )
19
20 // Default TCP port on which to listen for requests.
21 const DEFAULT_PORT = 25107
22
23 // A Keep "block" is 64MB.
24 const BLOCKSIZE = 64 * 1024 * 1024
25
26 // A Keep volume must have at least MIN_FREE_KILOBYTES available
27 // in order to permit writes.
28 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
29
30 var PROC_MOUNTS = "/proc/mounts"
31
32 var KeepVolumes []string
33
34 type KeepError struct {
35         HTTPCode int
36         Err      error
37 }
38
39 const (
40         ErrCollision = 400
41         ErrMD5Fail   = 401
42         ErrCorrupt   = 402
43         ErrNotFound  = 404
44         ErrOther     = 500
45         ErrFull      = 503
46 )
47
48 func (e *KeepError) Error() string {
49         return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
50 }
51
52 func main() {
53         // Look for local keep volumes.
54         KeepVolumes = FindKeepVolumes()
55         if len(KeepVolumes) == 0 {
56                 log.Fatal("could not find any keep volumes")
57         }
58         for _, v := range KeepVolumes {
59                 log.Println("keep volume:", v)
60         }
61
62         // Set up REST handlers.
63         //
64         // Start with a router that will route each URL path to an
65         // appropriate handler.
66         //
67         rest := mux.NewRouter()
68         rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
69         rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
70
71         // Tell the built-in HTTP server to direct all requests to the REST
72         // router.
73         http.Handle("/", rest)
74
75         // Start listening for requests.
76         port := fmt.Sprintf(":%d", DEFAULT_PORT)
77         http.ListenAndServe(port, nil)
78 }
79
80 // FindKeepVolumes
81 //     Returns a list of Keep volumes mounted on this system.
82 //
83 //     A Keep volume is a normal or tmpfs volume with a /keep
84 //     directory at the top level of the mount point.
85 //
86 func FindKeepVolumes() []string {
87         vols := make([]string, 0)
88
89         if f, err := os.Open(PROC_MOUNTS); err != nil {
90                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
91         } else {
92                 scanner := bufio.NewScanner(f)
93                 for scanner.Scan() {
94                         args := strings.Fields(scanner.Text())
95                         dev, mount := args[0], args[1]
96                         if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
97                                 keep := mount + "/keep"
98                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
99                                         vols = append(vols, keep)
100                                 }
101                         }
102                 }
103                 if err := scanner.Err(); err != nil {
104                         log.Fatal(err)
105                 }
106         }
107         return vols
108 }
109
110 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
111         hash := mux.Vars(req)["hash"]
112
113         block, err := GetBlock(hash)
114         if err != nil {
115                 http.Error(w, err.Error(), 404)
116                 return
117         }
118
119         _, err = w.Write(block)
120         if err != nil {
121                 log.Printf("GetBlockHandler: writing response: %s", err)
122         }
123
124         return
125 }
126
127 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
128         hash := mux.Vars(req)["hash"]
129
130         // Read the block data to be stored.
131         // TODO(twp): decide what to do when the input stream contains
132         // more than BLOCKSIZE bytes.
133         //
134         buf := make([]byte, BLOCKSIZE)
135         if nread, err := req.Body.Read(buf); err == nil {
136                 if err := PutBlock(buf[:nread], hash); err == nil {
137                         w.WriteHeader(http.StatusOK)
138                 } else {
139                         ke := err.(*KeepError)
140                         http.Error(w, ke.Error(), ke.HTTPCode)
141                 }
142         } else {
143                 log.Println("error reading request: ", err)
144                 http.Error(w, err.Error(), 500)
145         }
146 }
147
148 func GetBlock(hash string) ([]byte, error) {
149         var buf = make([]byte, BLOCKSIZE)
150
151         // Attempt to read the requested hash from a keep volume.
152         for _, vol := range KeepVolumes {
153                 var f *os.File
154                 var err error
155                 var nread int
156
157                 blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
158
159                 f, err = os.Open(blockFilename)
160                 if err != nil {
161                         if !os.IsNotExist(err) {
162                                 // A block is stored on only one Keep disk,
163                                 // so os.IsNotExist is expected.  Report any other errors.
164                                 log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
165                         }
166                         continue
167                 }
168
169                 nread, err = f.Read(buf)
170                 if err != nil {
171                         log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
172                         continue
173                 }
174
175                 // Double check the file checksum.
176                 //
177                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
178                 if filehash != hash {
179                         // TODO(twp): this condition probably represents a bad disk and
180                         // should raise major alarm bells for an administrator: e.g.
181                         // they should be sent directly to an event manager at high
182                         // priority or logged as urgent problems.
183                         //
184                         log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
185                                 vol, blockFilename, filehash)
186                         return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")}
187                 }
188
189                 // Success!
190                 return buf[:nread], nil
191         }
192
193         log.Printf("%s: not found on any volumes, giving up\n", hash)
194         return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)}
195 }
196
197 /* PutBlock(block, hash)
198    Stores the BLOCK (identified by the content id HASH) in Keep.
199
200    The MD5 checksum of the block must be identical to the content id HASH.
201    If not, an error is returned.
202
203    PutBlock stores the BLOCK on the first Keep volume with free space.
204    A failure code is returned to the user only if all volumes fail.
205
206    On success, PutBlock returns nil.
207    On failure, it returns a KeepError with one of the following codes:
208
209    400 Collision
210           A different block with the same hash already exists on this
211           Keep server.
212    401 MD5Fail
213           The MD5 hash of the BLOCK does not match the argument HASH.
214    503 Full
215           There was not enough space left in any Keep volume to store
216           the object.
217    500 Fail
218           The object could not be stored for some other reason (e.g.
219           all writes failed). The text of the error message should
220           provide as much detail as possible.
221 */
222
223 func PutBlock(block []byte, hash string) error {
224         // Check that BLOCK's checksum matches HASH.
225         blockhash := fmt.Sprintf("%x", md5.Sum(block))
226         if blockhash != hash {
227                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
228                 return &KeepError{ErrMD5Fail, errors.New("MD5Fail")}
229         }
230
231         // If we already have a block on disk under this identifier, return
232         // success (but check for MD5 collisions).
233         // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
234         // In either case, we want to write our new (good) block to disk, so there is
235         // nothing special to do if err != nil.
236         if oldblock, err := GetBlock(hash); err == nil {
237                 if bytes.Compare(block, oldblock) == 0 {
238                         return nil
239                 } else {
240                         return &KeepError{ErrCollision, errors.New("Collision")}
241                 }
242         }
243
244         // Store the block on the first available Keep volume.
245         allFull := true
246         for _, vol := range KeepVolumes {
247                 if IsFull(vol) {
248                         continue
249                 }
250                 allFull = false
251                 blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
252                 if err := os.MkdirAll(blockDir, 0755); err != nil {
253                         log.Printf("%s: could not create directory %s: %s",
254                                 hash, blockDir, err)
255                         continue
256                 }
257
258                 tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
259                 if tmperr != nil {
260                         log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
261                         continue
262                 }
263                 blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
264
265                 if _, err := tmpfile.Write(block); err != nil {
266                         log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
267                         continue
268                 }
269                 if err := tmpfile.Close(); err != nil {
270                         log.Printf("closing %s: %s\n", tmpfile.Name(), err)
271                         os.Remove(tmpfile.Name())
272                         continue
273                 }
274                 if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
275                         log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
276                         os.Remove(tmpfile.Name())
277                         continue
278                 }
279                 return nil
280         }
281
282         if allFull {
283                 log.Printf("all Keep volumes full")
284                 return &KeepError{ErrFull, errors.New("Full")}
285         } else {
286                 log.Printf("all Keep volumes failed")
287                 return &KeepError{ErrOther, errors.New("Fail")}
288         }
289 }
290
291 func IsFull(volume string) (isFull bool) {
292         fullSymlink := volume + "/full"
293
294         // Check if the volume has been marked as full in the last hour.
295         if link, err := os.Readlink(fullSymlink); err == nil {
296                 if ts, err := strconv.Atoi(link); err == nil {
297                         fulltime := time.Unix(int64(ts), 0)
298                         if time.Since(fulltime).Hours() < 1.0 {
299                                 return true
300                         }
301                 }
302         }
303
304         if avail, err := FreeDiskSpace(volume); err == nil {
305                 isFull = avail < MIN_FREE_KILOBYTES
306         } else {
307                 log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
308                 isFull = false
309         }
310
311         // If the volume is full, timestamp it.
312         if isFull {
313                 now := fmt.Sprintf("%d", time.Now().Unix())
314                 os.Symlink(now, fullSymlink)
315         }
316         return
317 }
318
319 // FreeDiskSpace(volume)
320 //     Returns the amount of available disk space on VOLUME,
321 //     as a number of 1k blocks.
322 //
323 func FreeDiskSpace(volume string) (free uint64, err error) {
324         var fs syscall.Statfs_t
325         err = syscall.Statfs(volume, &fs)
326         if err == nil {
327                 // Statfs output is not guaranteed to measure free
328                 // space in terms of 1K blocks.
329                 free = fs.Bavail * uint64(fs.Bsize) / 1024
330         }
331
332         return
333 }