Merge branch '3967-improve-keepstore-logging'
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "bytes"
5         "flag"
6         "fmt"
7         "io/ioutil"
8         "log"
9         "net"
10         "net/http"
11         "os"
12         "os/signal"
13         "strings"
14         "syscall"
15         "time"
16 )
17
18 // ======================
19 // Configuration settings
20 //
21 // TODO(twp): make all of these configurable via command line flags
22 // and/or configuration file settings.
23
24 // Default TCP address on which to listen for requests.
25 // Initialized by the --listen flag.
26 const DEFAULT_ADDR = ":25107"
27
28 // A Keep "block" is 64MB.
29 const BLOCKSIZE = 64 * 1024 * 1024
30
31 // A Keep volume must have at least MIN_FREE_KILOBYTES available
32 // in order to permit writes.
33 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
34
35 var PROC_MOUNTS = "/proc/mounts"
36
37 // enforce_permissions controls whether permission signatures
38 // should be enforced (affecting GET and DELETE requests).
39 // Initialized by the --enforce-permissions flag.
40 var enforce_permissions bool
41
42 // permission_ttl is the time duration for which new permission
43 // signatures (returned by PUT requests) will be valid.
44 // Initialized by the --permission-ttl flag.
45 var permission_ttl time.Duration
46
47 // data_manager_token represents the API token used by the
48 // Data Manager, and is required on certain privileged operations.
49 // Initialized by the --data-manager-token-file flag.
50 var data_manager_token string
51
52 // never_delete can be used to prevent the DELETE handler from
53 // actually deleting anything.
54 var never_delete = false
55
56 // ==========
57 // Error types.
58 //
59 type KeepError struct {
60         HTTPCode int
61         ErrMsg   string
62 }
63
64 var (
65         BadRequestError     = &KeepError{400, "Bad Request"}
66         UnauthorizedError   = &KeepError{401, "Unauthorized"}
67         CollisionError      = &KeepError{500, "Collision"}
68         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
69         PermissionError     = &KeepError{403, "Forbidden"}
70         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
71         ExpiredError        = &KeepError{401, "Expired permission signature"}
72         NotFoundError       = &KeepError{404, "Not Found"}
73         GenericError        = &KeepError{500, "Fail"}
74         FullError           = &KeepError{503, "Full"}
75         TooLongError        = &KeepError{504, "Timeout"}
76         MethodDisabledError = &KeepError{405, "Method disabled"}
77 )
78
79 func (e *KeepError) Error() string {
80         return e.ErrMsg
81 }
82
83 // ========================
84 // Internal data structures
85 //
86 // These global variables are used by multiple parts of the
87 // program. They are good candidates for moving into their own
88 // packages.
89
90 // The Keep VolumeManager maintains a list of available volumes.
91 // Initialized by the --volumes flag (or by FindKeepVolumes).
92 var KeepVM VolumeManager
93
94 // The pull list queue is a singleton pull list (a list of blocks
95 // that the current keepstore process should be pulling from remote
96 // keepstore servers in order to increase data replication) with
97 // atomic update methods that are safe to use from multiple
98 // goroutines.
99 var pullq *WorkQueue
100
101 // TODO(twp): continue moving as much code as possible out of main
102 // so it can be effectively tested. Esp. handling and postprocessing
103 // of command line flags (identifying Keep volumes and initializing
104 // permission arguments).
105
106 func main() {
107         log.Println("Keep started: pid", os.Getpid())
108
109         // Parse command-line flags:
110         //
111         // -listen=ipaddr:port
112         //    Interface on which to listen for requests. Use :port without
113         //    an ipaddr to listen on all network interfaces.
114         //    Examples:
115         //      -listen=127.0.0.1:4949
116         //      -listen=10.0.1.24:8000
117         //      -listen=:25107 (to listen to port 25107 on all interfaces)
118         //
119         // -volumes
120         //    A comma-separated list of directories to use as Keep volumes.
121         //    Example:
122         //      -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
123         //
124         //    If -volumes is empty or is not present, Keep will select volumes
125         //    by looking at currently mounted filesystems for /keep top-level
126         //    directories.
127
128         var (
129                 data_manager_token_file string
130                 listen                  string
131                 permission_key_file     string
132                 permission_ttl_sec      int
133                 serialize_io            bool
134                 volumearg               string
135                 pidfile                 string
136         )
137         flag.StringVar(
138                 &data_manager_token_file,
139                 "data-manager-token-file",
140                 "",
141                 "File with the API token used by the Data Manager. All DELETE "+
142                         "requests or GET /index requests must carry this token.")
143         flag.BoolVar(
144                 &enforce_permissions,
145                 "enforce-permissions",
146                 false,
147                 "Enforce permission signatures on requests.")
148         flag.StringVar(
149                 &listen,
150                 "listen",
151                 DEFAULT_ADDR,
152                 "Interface on which to listen for requests, in the format "+
153                         "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
154                         "to listen on all network interfaces.")
155         flag.BoolVar(
156                 &never_delete,
157                 "never-delete",
158                 false,
159                 "If set, nothing will be deleted. HTTP 405 will be returned "+
160                         "for valid DELETE requests.")
161         flag.StringVar(
162                 &permission_key_file,
163                 "permission-key-file",
164                 "",
165                 "File containing the secret key for generating and verifying "+
166                         "permission signatures.")
167         flag.IntVar(
168                 &permission_ttl_sec,
169                 "permission-ttl",
170                 1209600,
171                 "Expiration time (in seconds) for newly generated permission "+
172                         "signatures.")
173         flag.BoolVar(
174                 &serialize_io,
175                 "serialize",
176                 false,
177                 "If set, all read and write operations on local Keep volumes will "+
178                         "be serialized.")
179         flag.StringVar(
180                 &volumearg,
181                 "volumes",
182                 "",
183                 "Comma-separated list of directories to use for Keep volumes, "+
184                         "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
185                         "supplied, Keep will scan mounted filesystems for volumes "+
186                         "with a /keep top-level directory.")
187
188         flag.StringVar(
189                 &pidfile,
190                 "pid",
191                 "",
192                 "Path to write pid file")
193
194         flag.Parse()
195
196         // Look for local keep volumes.
197         var keepvols []string
198         if volumearg == "" {
199                 // TODO(twp): decide whether this is desirable default behavior.
200                 // In production we may want to require the admin to specify
201                 // Keep volumes explicitly.
202                 keepvols = FindKeepVolumes()
203         } else {
204                 keepvols = strings.Split(volumearg, ",")
205         }
206
207         // Check that the specified volumes actually exist.
208         var goodvols []Volume = nil
209         for _, v := range keepvols {
210                 if _, err := os.Stat(v); err == nil {
211                         log.Println("adding Keep volume:", v)
212                         newvol := MakeUnixVolume(v, serialize_io)
213                         goodvols = append(goodvols, &newvol)
214                 } else {
215                         log.Printf("bad Keep volume: %s\n", err)
216                 }
217         }
218
219         if len(goodvols) == 0 {
220                 log.Fatal("could not find any keep volumes")
221         }
222
223         // Initialize data manager token and permission key.
224         // If these tokens are specified but cannot be read,
225         // raise a fatal error.
226         if data_manager_token_file != "" {
227                 if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
228                         data_manager_token = strings.TrimSpace(string(buf))
229                 } else {
230                         log.Fatalf("reading data manager token: %s\n", err)
231                 }
232         }
233         if permission_key_file != "" {
234                 if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
235                         PermissionSecret = bytes.TrimSpace(buf)
236                 } else {
237                         log.Fatalf("reading permission key: %s\n", err)
238                 }
239         }
240
241         // Initialize permission TTL
242         permission_ttl = time.Duration(permission_ttl_sec) * time.Second
243
244         // If --enforce-permissions is true, we must have a permission key
245         // to continue.
246         if PermissionSecret == nil {
247                 if enforce_permissions {
248                         log.Fatal("--enforce-permissions requires a permission key")
249                 } else {
250                         log.Println("Running without a PermissionSecret. Block locators " +
251                                 "returned by this server will not be signed, and will be rejected " +
252                                 "by a server that enforces permissions.")
253                         log.Println("To fix this, run Keep with --permission-key-file=<path> " +
254                                 "to define the location of a file containing the permission key.")
255                 }
256         }
257
258         // Start a round-robin VolumeManager with the volumes we have found.
259         KeepVM = MakeRRVolumeManager(goodvols)
260
261         // Tell the built-in HTTP server to direct all requests to the REST
262         // router.
263         http.Handle("/", MakeRESTRouter())
264
265         // Set up a TCP listener.
266         listener, err := net.Listen("tcp", listen)
267         if err != nil {
268                 log.Fatal(err)
269         }
270
271         // Shut down the server gracefully (by closing the listener)
272         // if SIGTERM is received.
273         term := make(chan os.Signal, 1)
274         go func(sig <-chan os.Signal) {
275                 s := <-sig
276                 log.Println("caught signal:", s)
277                 listener.Close()
278         }(term)
279         signal.Notify(term, syscall.SIGTERM)
280
281         if pidfile != "" {
282                 f, err := os.Create(pidfile)
283                 if err == nil {
284                         fmt.Fprint(f, os.Getpid())
285                         f.Close()
286                 } else {
287                         log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
288                 }
289         }
290
291         // Start listening for requests.
292         srv := &http.Server{Addr: listen}
293         srv.Serve(listener)
294
295         log.Println("shutting down")
296
297         if pidfile != "" {
298                 os.Remove(pidfile)
299         }
300 }