5748: Use a buffer pool instead of calling runtime.GC() during each GET.
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "errors"
7         "flag"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "io/ioutil"
11         "log"
12         "net"
13         "net/http"
14         "os"
15         "os/signal"
16         "strings"
17         "syscall"
18         "time"
19 )
20
21 // ======================
22 // Configuration settings
23 //
24 // TODO(twp): make all of these configurable via command line flags
25 // and/or configuration file settings.
26
27 // Default TCP address on which to listen for requests.
28 // Initialized by the --listen flag.
29 const DEFAULT_ADDR = ":25107"
30
31 // A Keep "block" is 64MB.
32 const BLOCKSIZE = 64 * 1024 * 1024
33
34 // A Keep volume must have at least MIN_FREE_KILOBYTES available
35 // in order to permit writes.
36 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
37
38 var PROC_MOUNTS = "/proc/mounts"
39
40 // enforce_permissions controls whether permission signatures
41 // should be enforced (affecting GET and DELETE requests).
42 // Initialized by the -enforce-permissions flag.
43 var enforce_permissions bool
44
45 // blob_signature_ttl is the time duration for which new permission
46 // signatures (returned by PUT requests) will be valid.
47 // Initialized by the -permission-ttl flag.
48 var blob_signature_ttl time.Duration
49
50 // data_manager_token represents the API token used by the
51 // Data Manager, and is required on certain privileged operations.
52 // Initialized by the -data-manager-token-file flag.
53 var data_manager_token string
54
55 // never_delete can be used to prevent the DELETE handler from
56 // actually deleting anything.
57 var never_delete = false
58
59 var maxBuffers = 128
60 var bufs *bufferPool
61
62 // ==========
63 // Error types.
64 //
65 type KeepError struct {
66         HTTPCode int
67         ErrMsg   string
68 }
69
70 var (
71         BadRequestError     = &KeepError{400, "Bad Request"}
72         UnauthorizedError   = &KeepError{401, "Unauthorized"}
73         CollisionError      = &KeepError{500, "Collision"}
74         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
75         PermissionError     = &KeepError{403, "Forbidden"}
76         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
77         ExpiredError        = &KeepError{401, "Expired permission signature"}
78         NotFoundError       = &KeepError{404, "Not Found"}
79         GenericError        = &KeepError{500, "Fail"}
80         FullError           = &KeepError{503, "Full"}
81         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
82         TooLongError        = &KeepError{413, "Block is too large"}
83         MethodDisabledError = &KeepError{405, "Method disabled"}
84 )
85
86 func (e *KeepError) Error() string {
87         return e.ErrMsg
88 }
89
90 // ========================
91 // Internal data structures
92 //
93 // These global variables are used by multiple parts of the
94 // program. They are good candidates for moving into their own
95 // packages.
96
97 // The Keep VolumeManager maintains a list of available volumes.
98 // Initialized by the --volumes flag (or by FindKeepVolumes).
99 var KeepVM VolumeManager
100
101 // The pull list manager and trash queue are threadsafe queues which
102 // support atomic update operations. The PullHandler and TrashHandler
103 // store results from Data Manager /pull and /trash requests here.
104 //
105 // See the Keep and Data Manager design documents for more details:
106 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
107 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
108 //
109 var pullq *WorkQueue
110 var trashq *WorkQueue
111
112 var (
113         flagSerializeIO bool
114         flagReadonly    bool
115 )
116
117 type volumeSet []Volume
118
119 func (vs *volumeSet) Set(value string) error {
120         if dirs := strings.Split(value, ","); len(dirs) > 1 {
121                 log.Print("DEPRECATED: using comma-separated volume list.")
122                 for _, dir := range dirs {
123                         if err := vs.Set(dir); err != nil {
124                                 return err
125                         }
126                 }
127                 return nil
128         }
129         if len(value) == 0 || value[0] != '/' {
130                 return errors.New("Invalid volume: must begin with '/'.")
131         }
132         if _, err := os.Stat(value); err != nil {
133                 return err
134         }
135         *vs = append(*vs, &UnixVolume{
136                 root:      value,
137                 serialize: flagSerializeIO,
138                 readonly:  flagReadonly,
139         })
140         return nil
141 }
142
143 func (vs *volumeSet) String() string {
144         s := "["
145         for i, v := range *vs {
146                 if i > 0 {
147                         s = s + " "
148                 }
149                 s = s + v.String()
150         }
151         return s + "]"
152 }
153
154 // Discover adds a volume for every directory named "keep" that is
155 // located at the top level of a device- or tmpfs-backed mount point
156 // other than "/". It returns the number of volumes added.
157 func (vs *volumeSet) Discover() int {
158         added := 0
159         f, err := os.Open(PROC_MOUNTS)
160         if err != nil {
161                 log.Fatalf("opening %s: %s", PROC_MOUNTS, err)
162         }
163         scanner := bufio.NewScanner(f)
164         for scanner.Scan() {
165                 args := strings.Fields(scanner.Text())
166                 if err := scanner.Err(); err != nil {
167                         log.Fatalf("reading %s: %s", PROC_MOUNTS, err)
168                 }
169                 dev, mount := args[0], args[1]
170                 if mount == "/" {
171                         continue
172                 }
173                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
174                         continue
175                 }
176                 keepdir := mount + "/keep"
177                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
178                         continue
179                 }
180                 // Set the -readonly flag (but only for this volume)
181                 // if the filesystem is mounted readonly.
182                 flagReadonlyWas := flagReadonly
183                 for _, fsopt := range strings.Split(args[3], ",") {
184                         if fsopt == "ro" {
185                                 flagReadonly = true
186                                 break
187                         }
188                         if fsopt == "rw" {
189                                 break
190                         }
191                 }
192                 vs.Set(keepdir)
193                 flagReadonly = flagReadonlyWas
194                 added++
195         }
196         return added
197 }
198
199 // TODO(twp): continue moving as much code as possible out of main
200 // so it can be effectively tested. Esp. handling and postprocessing
201 // of command line flags (identifying Keep volumes and initializing
202 // permission arguments).
203
204 func main() {
205         log.Println("Keep started: pid", os.Getpid())
206
207         var (
208                 data_manager_token_file string
209                 listen                  string
210                 blob_signing_key_file   string
211                 permission_ttl_sec      int
212                 volumes                 volumeSet
213                 pidfile                 string
214         )
215         flag.StringVar(
216                 &data_manager_token_file,
217                 "data-manager-token-file",
218                 "",
219                 "File with the API token used by the Data Manager. All DELETE "+
220                         "requests or GET /index requests must carry this token.")
221         flag.BoolVar(
222                 &enforce_permissions,
223                 "enforce-permissions",
224                 false,
225                 "Enforce permission signatures on requests.")
226         flag.StringVar(
227                 &listen,
228                 "listen",
229                 DEFAULT_ADDR,
230                 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
231         flag.BoolVar(
232                 &never_delete,
233                 "never-delete",
234                 false,
235                 "If set, nothing will be deleted. HTTP 405 will be returned "+
236                         "for valid DELETE requests.")
237         flag.StringVar(
238                 &blob_signing_key_file,
239                 "permission-key-file",
240                 "",
241                 "Synonym for -blob-signing-key-file.")
242         flag.StringVar(
243                 &blob_signing_key_file,
244                 "blob-signing-key-file",
245                 "",
246                 "File containing the secret key for generating and verifying "+
247                         "blob permission signatures.")
248         flag.IntVar(
249                 &permission_ttl_sec,
250                 "permission-ttl",
251                 0,
252                 "Synonym for -blob-signature-ttl.")
253         flag.IntVar(
254                 &permission_ttl_sec,
255                 "blob-signature-ttl",
256                 int(time.Duration(2*7*24*time.Hour).Seconds()),
257                 "Lifetime of blob permission signatures. "+
258                         "See services/api/config/application.default.yml.")
259         flag.BoolVar(
260                 &flagSerializeIO,
261                 "serialize",
262                 false,
263                 "Serialize read and write operations on the following volumes.")
264         flag.BoolVar(
265                 &flagReadonly,
266                 "readonly",
267                 false,
268                 "Do not write, delete, or touch anything on the following volumes.")
269         flag.Var(
270                 &volumes,
271                 "volumes",
272                 "Deprecated synonym for -volume.")
273         flag.Var(
274                 &volumes,
275                 "volume",
276                 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
277         flag.StringVar(
278                 &pidfile,
279                 "pid",
280                 "",
281                 "Path to write pid file")
282         flag.IntVar(
283                 &maxBuffers,
284                 "max-buffers",
285                 maxBuffers,
286                 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BLOCKSIZE>>20))
287
288         flag.Parse()
289
290         if maxBuffers < 0 {
291                 log.Fatal("-max-buffers must be greater than zero.")
292         }
293         bufs = newBufferPool(maxBuffers, BLOCKSIZE)
294
295         if len(volumes) == 0 {
296                 if volumes.Discover() == 0 {
297                         log.Fatal("No volumes found.")
298                 }
299         }
300
301         for _, v := range volumes {
302                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
303         }
304
305         // Initialize data manager token and permission key.
306         // If these tokens are specified but cannot be read,
307         // raise a fatal error.
308         if data_manager_token_file != "" {
309                 if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
310                         data_manager_token = strings.TrimSpace(string(buf))
311                 } else {
312                         log.Fatalf("reading data manager token: %s\n", err)
313                 }
314         }
315         if blob_signing_key_file != "" {
316                 if buf, err := ioutil.ReadFile(blob_signing_key_file); err == nil {
317                         PermissionSecret = bytes.TrimSpace(buf)
318                 } else {
319                         log.Fatalf("reading permission key: %s\n", err)
320                 }
321         }
322
323         blob_signature_ttl = time.Duration(permission_ttl_sec) * time.Second
324
325         if PermissionSecret == nil {
326                 if enforce_permissions {
327                         log.Fatal("-enforce-permissions requires a permission key")
328                 } else {
329                         log.Println("Running without a PermissionSecret. Block locators " +
330                                 "returned by this server will not be signed, and will be rejected " +
331                                 "by a server that enforces permissions.")
332                         log.Println("To fix this, use the -permission-key-file flag " +
333                                 "to specify the file containing the permission key.")
334                 }
335         }
336
337         // Start a round-robin VolumeManager with the volumes we have found.
338         KeepVM = MakeRRVolumeManager(volumes)
339
340         // Tell the built-in HTTP server to direct all requests to the REST router.
341         loggingRouter := MakeLoggingRESTRouter()
342         http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
343                 loggingRouter.ServeHTTP(resp, req)
344         })
345
346         // Set up a TCP listener.
347         listener, err := net.Listen("tcp", listen)
348         if err != nil {
349                 log.Fatal(err)
350         }
351
352         // Initialize Pull queue and worker
353         keepClient := &keepclient.KeepClient{
354                 Arvados:       nil,
355                 Want_replicas: 1,
356                 Using_proxy:   true,
357                 Client:        &http.Client{},
358         }
359
360         // Initialize the pullq and worker
361         pullq = NewWorkQueue()
362         go RunPullWorker(pullq, keepClient)
363
364         // Initialize the trashq and worker
365         trashq = NewWorkQueue()
366         go RunTrashWorker(trashq)
367
368         // Shut down the server gracefully (by closing the listener)
369         // if SIGTERM is received.
370         term := make(chan os.Signal, 1)
371         go func(sig <-chan os.Signal) {
372                 s := <-sig
373                 log.Println("caught signal:", s)
374                 listener.Close()
375         }(term)
376         signal.Notify(term, syscall.SIGTERM)
377
378         if pidfile != "" {
379                 f, err := os.Create(pidfile)
380                 if err == nil {
381                         fmt.Fprint(f, os.Getpid())
382                         f.Close()
383                 } else {
384                         log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
385                 }
386         }
387
388         // Start listening for requests.
389         srv := &http.Server{Addr: listen}
390         srv.Serve(listener)
391
392         log.Println("shutting down")
393
394         if pidfile != "" {
395                 os.Remove(pidfile)
396         }
397 }