Merge branch 'master' into 3454-default-docker-image
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "errors"
7         "flag"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "io/ioutil"
11         "log"
12         "net"
13         "net/http"
14         "os"
15         "os/signal"
16         "strings"
17         "syscall"
18         "time"
19 )
20
21 // ======================
22 // Configuration settings
23 //
24 // TODO(twp): make all of these configurable via command line flags
25 // and/or configuration file settings.
26
27 // Default TCP address on which to listen for requests.
28 // Initialized by the --listen flag.
29 const DEFAULT_ADDR = ":25107"
30
31 // A Keep "block" is 64MB.
32 const BLOCKSIZE = 64 * 1024 * 1024
33
34 // A Keep volume must have at least MIN_FREE_KILOBYTES available
35 // in order to permit writes.
36 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
37
38 var PROC_MOUNTS = "/proc/mounts"
39
40 // enforce_permissions controls whether permission signatures
41 // should be enforced (affecting GET and DELETE requests).
42 // Initialized by the -enforce-permissions flag.
43 var enforce_permissions bool
44
45 // blob_signature_ttl is the time duration for which new permission
46 // signatures (returned by PUT requests) will be valid.
47 // Initialized by the -permission-ttl flag.
48 var blob_signature_ttl time.Duration
49
50 // data_manager_token represents the API token used by the
51 // Data Manager, and is required on certain privileged operations.
52 // Initialized by the -data-manager-token-file flag.
53 var data_manager_token string
54
55 // never_delete can be used to prevent the DELETE handler from
56 // actually deleting anything.
57 var never_delete = false
58
59 var maxBuffers = 128
60 var bufs *bufferPool
61
62 // ==========
63 // Error types.
64 //
65 type KeepError struct {
66         HTTPCode int
67         ErrMsg   string
68 }
69
70 var (
71         BadRequestError     = &KeepError{400, "Bad Request"}
72         UnauthorizedError   = &KeepError{401, "Unauthorized"}
73         CollisionError      = &KeepError{500, "Collision"}
74         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
75         PermissionError     = &KeepError{403, "Forbidden"}
76         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
77         ExpiredError        = &KeepError{401, "Expired permission signature"}
78         NotFoundError       = &KeepError{404, "Not Found"}
79         GenericError        = &KeepError{500, "Fail"}
80         FullError           = &KeepError{503, "Full"}
81         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
82         TooLongError        = &KeepError{413, "Block is too large"}
83         MethodDisabledError = &KeepError{405, "Method disabled"}
84 )
85
86 func (e *KeepError) Error() string {
87         return e.ErrMsg
88 }
89
90 // ========================
91 // Internal data structures
92 //
93 // These global variables are used by multiple parts of the
94 // program. They are good candidates for moving into their own
95 // packages.
96
97 // The Keep VolumeManager maintains a list of available volumes.
98 // Initialized by the --volumes flag (or by FindKeepVolumes).
99 var KeepVM VolumeManager
100
101 // The pull list manager and trash queue are threadsafe queues which
102 // support atomic update operations. The PullHandler and TrashHandler
103 // store results from Data Manager /pull and /trash requests here.
104 //
105 // See the Keep and Data Manager design documents for more details:
106 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
107 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
108 //
109 var pullq *WorkQueue
110 var trashq *WorkQueue
111
112 var (
113         flagSerializeIO bool
114         flagReadonly    bool
115 )
116
117 type volumeSet []Volume
118
119 func (vs *volumeSet) Set(value string) error {
120         if dirs := strings.Split(value, ","); len(dirs) > 1 {
121                 log.Print("DEPRECATED: using comma-separated volume list.")
122                 for _, dir := range dirs {
123                         if err := vs.Set(dir); err != nil {
124                                 return err
125                         }
126                 }
127                 return nil
128         }
129         if len(value) == 0 || value[0] != '/' {
130                 return errors.New("Invalid volume: must begin with '/'.")
131         }
132         if _, err := os.Stat(value); err != nil {
133                 return err
134         }
135         *vs = append(*vs, &UnixVolume{
136                 root:      value,
137                 serialize: flagSerializeIO,
138                 readonly:  flagReadonly,
139         })
140         return nil
141 }
142
143 func (vs *volumeSet) String() string {
144         s := "["
145         for i, v := range *vs {
146                 if i > 0 {
147                         s = s + " "
148                 }
149                 s = s + v.String()
150         }
151         return s + "]"
152 }
153
154 // Discover adds a volume for every directory named "keep" that is
155 // located at the top level of a device- or tmpfs-backed mount point
156 // other than "/". It returns the number of volumes added.
157 func (vs *volumeSet) Discover() int {
158         added := 0
159         f, err := os.Open(PROC_MOUNTS)
160         if err != nil {
161                 log.Fatalf("opening %s: %s", PROC_MOUNTS, err)
162         }
163         scanner := bufio.NewScanner(f)
164         for scanner.Scan() {
165                 args := strings.Fields(scanner.Text())
166                 if err := scanner.Err(); err != nil {
167                         log.Fatalf("reading %s: %s", PROC_MOUNTS, err)
168                 }
169                 dev, mount := args[0], args[1]
170                 if mount == "/" {
171                         continue
172                 }
173                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
174                         continue
175                 }
176                 keepdir := mount + "/keep"
177                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
178                         continue
179                 }
180                 // Set the -readonly flag (but only for this volume)
181                 // if the filesystem is mounted readonly.
182                 flagReadonlyWas := flagReadonly
183                 for _, fsopt := range strings.Split(args[3], ",") {
184                         if fsopt == "ro" {
185                                 flagReadonly = true
186                                 break
187                         }
188                         if fsopt == "rw" {
189                                 break
190                         }
191                 }
192                 vs.Set(keepdir)
193                 flagReadonly = flagReadonlyWas
194                 added++
195         }
196         return added
197 }
198
199 // TODO(twp): continue moving as much code as possible out of main
200 // so it can be effectively tested. Esp. handling and postprocessing
201 // of command line flags (identifying Keep volumes and initializing
202 // permission arguments).
203
204 func main() {
205         log.Println("keepstore starting, pid", os.Getpid())
206         defer log.Println("keepstore exiting, pid", os.Getpid())
207
208         var (
209                 data_manager_token_file string
210                 listen                  string
211                 blob_signing_key_file   string
212                 permission_ttl_sec      int
213                 volumes                 volumeSet
214                 pidfile                 string
215         )
216         flag.StringVar(
217                 &data_manager_token_file,
218                 "data-manager-token-file",
219                 "",
220                 "File with the API token used by the Data Manager. All DELETE "+
221                         "requests or GET /index requests must carry this token.")
222         flag.BoolVar(
223                 &enforce_permissions,
224                 "enforce-permissions",
225                 false,
226                 "Enforce permission signatures on requests.")
227         flag.StringVar(
228                 &listen,
229                 "listen",
230                 DEFAULT_ADDR,
231                 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
232         flag.BoolVar(
233                 &never_delete,
234                 "never-delete",
235                 false,
236                 "If set, nothing will be deleted. HTTP 405 will be returned "+
237                         "for valid DELETE requests.")
238         flag.StringVar(
239                 &blob_signing_key_file,
240                 "permission-key-file",
241                 "",
242                 "Synonym for -blob-signing-key-file.")
243         flag.StringVar(
244                 &blob_signing_key_file,
245                 "blob-signing-key-file",
246                 "",
247                 "File containing the secret key for generating and verifying "+
248                         "blob permission signatures.")
249         flag.IntVar(
250                 &permission_ttl_sec,
251                 "permission-ttl",
252                 0,
253                 "Synonym for -blob-signature-ttl.")
254         flag.IntVar(
255                 &permission_ttl_sec,
256                 "blob-signature-ttl",
257                 int(time.Duration(2*7*24*time.Hour).Seconds()),
258                 "Lifetime of blob permission signatures. "+
259                         "See services/api/config/application.default.yml.")
260         flag.BoolVar(
261                 &flagSerializeIO,
262                 "serialize",
263                 false,
264                 "Serialize read and write operations on the following volumes.")
265         flag.BoolVar(
266                 &flagReadonly,
267                 "readonly",
268                 false,
269                 "Do not write, delete, or touch anything on the following volumes.")
270         flag.Var(
271                 &volumes,
272                 "volumes",
273                 "Deprecated synonym for -volume.")
274         flag.Var(
275                 &volumes,
276                 "volume",
277                 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
278         flag.StringVar(
279                 &pidfile,
280                 "pid",
281                 "",
282                 "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
283         flag.IntVar(
284                 &maxBuffers,
285                 "max-buffers",
286                 maxBuffers,
287                 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BLOCKSIZE>>20))
288
289         flag.Parse()
290
291         if maxBuffers < 0 {
292                 log.Fatal("-max-buffers must be greater than zero.")
293         }
294         bufs = newBufferPool(maxBuffers, BLOCKSIZE)
295
296         if pidfile != "" {
297                 f, err := os.OpenFile(pidfile, os.O_RDWR | os.O_CREATE, 0777)
298                 if err != nil {
299                         log.Fatalf("open pidfile (%s): %s", pidfile, err)
300                 }
301                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX | syscall.LOCK_NB)
302                 if err != nil {
303                         log.Fatalf("flock pidfile (%s): %s", pidfile, err)
304                 }
305                 err = f.Truncate(0)
306                 if err != nil {
307                         log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
308                 }
309                 _, err = fmt.Fprint(f, os.Getpid())
310                 if err != nil {
311                         log.Fatalf("write pidfile (%s): %s", pidfile, err)
312                 }
313                 err = f.Sync()
314                 if err != nil {
315                         log.Fatalf("sync pidfile (%s): %s", pidfile, err)
316                 }
317                 defer f.Close()
318                 defer os.Remove(pidfile)
319         }
320
321         if len(volumes) == 0 {
322                 if volumes.Discover() == 0 {
323                         log.Fatal("No volumes found.")
324                 }
325         }
326
327         for _, v := range volumes {
328                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
329         }
330
331         // Initialize data manager token and permission key.
332         // If these tokens are specified but cannot be read,
333         // raise a fatal error.
334         if data_manager_token_file != "" {
335                 if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
336                         data_manager_token = strings.TrimSpace(string(buf))
337                 } else {
338                         log.Fatalf("reading data manager token: %s\n", err)
339                 }
340         }
341         if blob_signing_key_file != "" {
342                 if buf, err := ioutil.ReadFile(blob_signing_key_file); err == nil {
343                         PermissionSecret = bytes.TrimSpace(buf)
344                 } else {
345                         log.Fatalf("reading permission key: %s\n", err)
346                 }
347         }
348
349         blob_signature_ttl = time.Duration(permission_ttl_sec) * time.Second
350
351         if PermissionSecret == nil {
352                 if enforce_permissions {
353                         log.Fatal("-enforce-permissions requires a permission key")
354                 } else {
355                         log.Println("Running without a PermissionSecret. Block locators " +
356                                 "returned by this server will not be signed, and will be rejected " +
357                                 "by a server that enforces permissions.")
358                         log.Println("To fix this, use the -blob-signing-key-file flag " +
359                                 "to specify the file containing the permission key.")
360                 }
361         }
362
363         // Start a round-robin VolumeManager with the volumes we have found.
364         KeepVM = MakeRRVolumeManager(volumes)
365
366         // Tell the built-in HTTP server to direct all requests to the REST router.
367         loggingRouter := MakeLoggingRESTRouter()
368         http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
369                 loggingRouter.ServeHTTP(resp, req)
370         })
371
372         // Set up a TCP listener.
373         listener, err := net.Listen("tcp", listen)
374         if err != nil {
375                 log.Fatal(err)
376         }
377
378         // Initialize Pull queue and worker
379         keepClient := &keepclient.KeepClient{
380                 Arvados:       nil,
381                 Want_replicas: 1,
382                 Using_proxy:   true,
383                 Client:        &http.Client{},
384         }
385
386         // Initialize the pullq and worker
387         pullq = NewWorkQueue()
388         go RunPullWorker(pullq, keepClient)
389
390         // Initialize the trashq and worker
391         trashq = NewWorkQueue()
392         go RunTrashWorker(trashq)
393
394         // Shut down the server gracefully (by closing the listener)
395         // if SIGTERM is received.
396         term := make(chan os.Signal, 1)
397         go func(sig <-chan os.Signal) {
398                 s := <-sig
399                 log.Println("caught signal:", s)
400                 listener.Close()
401         }(term)
402         signal.Notify(term, syscall.SIGTERM)
403         signal.Notify(term, syscall.SIGINT)
404
405         log.Println("listening at", listen)
406         srv := &http.Server{Addr: listen}
407         srv.Serve(listener)
408 }