Merge branch '8886-async-permission-update' refs #8886
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "bytes"
5         "flag"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/httpserver"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "io/ioutil"
10         "log"
11         "net"
12         "net/http"
13         "os"
14         "os/signal"
15         "strings"
16         "syscall"
17         "time"
18 )
19
20 // ======================
21 // Configuration settings
22 //
23 // TODO(twp): make all of these configurable via command line flags
24 // and/or configuration file settings.
25
26 // Default TCP address on which to listen for requests.
27 // Initialized by the --listen flag.
28 const DefaultAddr = ":25107"
29
30 // A Keep "block" is 64MB.
31 const BlockSize = 64 * 1024 * 1024
32
33 // A Keep volume must have at least MinFreeKilobytes available
34 // in order to permit writes.
35 const MinFreeKilobytes = BlockSize / 1024
36
37 // ProcMounts /proc/mounts
38 var ProcMounts = "/proc/mounts"
39
40 // enforcePermissions controls whether permission signatures
41 // should be enforced (affecting GET and DELETE requests).
42 // Initialized by the -enforce-permissions flag.
43 var enforcePermissions bool
44
45 // blobSignatureTTL is the time duration for which new permission
46 // signatures (returned by PUT requests) will be valid.
47 // Initialized by the -permission-ttl flag.
48 var blobSignatureTTL time.Duration
49
50 // dataManagerToken represents the API token used by the
51 // Data Manager, and is required on certain privileged operations.
52 // Initialized by the -data-manager-token-file flag.
53 var dataManagerToken string
54
55 // neverDelete can be used to prevent the DELETE handler from
56 // actually deleting anything.
57 var neverDelete = true
58
59 // trashLifetime is the time duration after a block is trashed
60 // during which it can be recovered using an /untrash request
61 // Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
62 var trashLifetime time.Duration
63
64 // trashCheckInterval is the time duration at which the emptyTrash goroutine
65 // will check and delete expired trashed blocks. Default is one day.
66 // Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
67 var trashCheckInterval time.Duration
68
69 var maxBuffers = 128
70 var bufs *bufferPool
71
72 // KeepError types.
73 //
74 type KeepError struct {
75         HTTPCode int
76         ErrMsg   string
77 }
78
79 var (
80         BadRequestError     = &KeepError{400, "Bad Request"}
81         UnauthorizedError   = &KeepError{401, "Unauthorized"}
82         CollisionError      = &KeepError{500, "Collision"}
83         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
84         PermissionError     = &KeepError{403, "Forbidden"}
85         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
86         ExpiredError        = &KeepError{401, "Expired permission signature"}
87         NotFoundError       = &KeepError{404, "Not Found"}
88         GenericError        = &KeepError{500, "Fail"}
89         FullError           = &KeepError{503, "Full"}
90         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
91         TooLongError        = &KeepError{413, "Block is too large"}
92         MethodDisabledError = &KeepError{405, "Method disabled"}
93         ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
94         ErrClientDisconnect = &KeepError{503, "Client disconnected"}
95 )
96
97 func (e *KeepError) Error() string {
98         return e.ErrMsg
99 }
100
101 // ========================
102 // Internal data structures
103 //
104 // These global variables are used by multiple parts of the
105 // program. They are good candidates for moving into their own
106 // packages.
107
108 // The Keep VolumeManager maintains a list of available volumes.
109 // Initialized by the --volumes flag (or by FindKeepVolumes).
110 var KeepVM VolumeManager
111
112 // The pull list manager and trash queue are threadsafe queues which
113 // support atomic update operations. The PullHandler and TrashHandler
114 // store results from Data Manager /pull and /trash requests here.
115 //
116 // See the Keep and Data Manager design documents for more details:
117 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
118 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
119 //
120 var pullq *WorkQueue
121 var trashq *WorkQueue
122
123 type volumeSet []Volume
124
125 var (
126         flagSerializeIO bool
127         flagReadonly    bool
128         volumes         volumeSet
129 )
130
131 func (vs *volumeSet) String() string {
132         return fmt.Sprintf("%+v", (*vs)[:])
133 }
134
135 // TODO(twp): continue moving as much code as possible out of main
136 // so it can be effectively tested. Esp. handling and postprocessing
137 // of command line flags (identifying Keep volumes and initializing
138 // permission arguments).
139
140 func main() {
141         log.Println("keepstore starting, pid", os.Getpid())
142         defer log.Println("keepstore exiting, pid", os.Getpid())
143
144         var (
145                 dataManagerTokenFile string
146                 listen               string
147                 blobSigningKeyFile   string
148                 permissionTTLSec     int
149                 pidfile              string
150                 maxRequests          int
151         )
152         flag.StringVar(
153                 &dataManagerTokenFile,
154                 "data-manager-token-file",
155                 "",
156                 "File with the API token used by the Data Manager. All DELETE "+
157                         "requests or GET /index requests must carry this token.")
158         flag.BoolVar(
159                 &enforcePermissions,
160                 "enforce-permissions",
161                 false,
162                 "Enforce permission signatures on requests.")
163         flag.StringVar(
164                 &listen,
165                 "listen",
166                 DefaultAddr,
167                 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
168         flag.IntVar(
169                 &maxRequests,
170                 "max-requests",
171                 0,
172                 "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
173         flag.BoolVar(
174                 &neverDelete,
175                 "never-delete",
176                 true,
177                 "If true, nothing will be deleted. "+
178                         "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
179                         "You should leave this option alone unless you can afford to lose data.")
180         flag.StringVar(
181                 &blobSigningKeyFile,
182                 "permission-key-file",
183                 "",
184                 "Synonym for -blob-signing-key-file.")
185         flag.StringVar(
186                 &blobSigningKeyFile,
187                 "blob-signing-key-file",
188                 "",
189                 "File containing the secret key for generating and verifying "+
190                         "blob permission signatures.")
191         flag.IntVar(
192                 &permissionTTLSec,
193                 "permission-ttl",
194                 0,
195                 "Synonym for -blob-signature-ttl.")
196         flag.IntVar(
197                 &permissionTTLSec,
198                 "blob-signature-ttl",
199                 int(time.Duration(2*7*24*time.Hour).Seconds()),
200                 "Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures. "+
201                         "See services/api/config/application.default.yml.")
202         flag.BoolVar(
203                 &flagSerializeIO,
204                 "serialize",
205                 false,
206                 "Serialize read and write operations on the following volumes.")
207         flag.BoolVar(
208                 &flagReadonly,
209                 "readonly",
210                 false,
211                 "Do not write, delete, or touch anything on the following volumes.")
212         flag.StringVar(
213                 &pidfile,
214                 "pid",
215                 "",
216                 "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
217         flag.IntVar(
218                 &maxBuffers,
219                 "max-buffers",
220                 maxBuffers,
221                 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
222         flag.DurationVar(
223                 &trashLifetime,
224                 "trash-lifetime",
225                 0*time.Second,
226                 "Time duration after a block is trashed during which it can be recovered using an /untrash request")
227         flag.DurationVar(
228                 &trashCheckInterval,
229                 "trash-check-interval",
230                 24*time.Hour,
231                 "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
232
233         flag.Parse()
234
235         if maxBuffers < 0 {
236                 log.Fatal("-max-buffers must be greater than zero.")
237         }
238         bufs = newBufferPool(maxBuffers, BlockSize)
239
240         if pidfile != "" {
241                 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
242                 if err != nil {
243                         log.Fatalf("open pidfile (%s): %s", pidfile, err)
244                 }
245                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
246                 if err != nil {
247                         log.Fatalf("flock pidfile (%s): %s", pidfile, err)
248                 }
249                 err = f.Truncate(0)
250                 if err != nil {
251                         log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
252                 }
253                 _, err = fmt.Fprint(f, os.Getpid())
254                 if err != nil {
255                         log.Fatalf("write pidfile (%s): %s", pidfile, err)
256                 }
257                 err = f.Sync()
258                 if err != nil {
259                         log.Fatalf("sync pidfile (%s): %s", pidfile, err)
260                 }
261                 defer f.Close()
262                 defer os.Remove(pidfile)
263         }
264
265         if len(volumes) == 0 {
266                 if (&unixVolumeAdder{&volumes}).Discover() == 0 {
267                         log.Fatal("No volumes found.")
268                 }
269         }
270
271         for _, v := range volumes {
272                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
273         }
274
275         // Initialize data manager token and permission key.
276         // If these tokens are specified but cannot be read,
277         // raise a fatal error.
278         if dataManagerTokenFile != "" {
279                 if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
280                         dataManagerToken = strings.TrimSpace(string(buf))
281                 } else {
282                         log.Fatalf("reading data manager token: %s\n", err)
283                 }
284         }
285
286         if neverDelete != true {
287                 log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
288                         "been extensively tested. You should leave this option alone unless you can afford to lose data.")
289         }
290
291         if blobSigningKeyFile != "" {
292                 if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
293                         PermissionSecret = bytes.TrimSpace(buf)
294                 } else {
295                         log.Fatalf("reading permission key: %s\n", err)
296                 }
297         }
298
299         blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
300
301         if PermissionSecret == nil {
302                 if enforcePermissions {
303                         log.Fatal("-enforce-permissions requires a permission key")
304                 } else {
305                         log.Println("Running without a PermissionSecret. Block locators " +
306                                 "returned by this server will not be signed, and will be rejected " +
307                                 "by a server that enforces permissions.")
308                         log.Println("To fix this, use the -blob-signing-key-file flag " +
309                                 "to specify the file containing the permission key.")
310                 }
311         }
312
313         if maxRequests <= 0 {
314                 maxRequests = maxBuffers * 2
315                 log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
316         }
317
318         // Start a round-robin VolumeManager with the volumes we have found.
319         KeepVM = MakeRRVolumeManager(volumes)
320
321         // Middleware stack: logger, maxRequests limiter, method handlers
322         http.Handle("/", &LoggingRESTRouter{
323                 httpserver.NewRequestLimiter(maxRequests,
324                         MakeRESTRouter()),
325         })
326
327         // Set up a TCP listener.
328         listener, err := net.Listen("tcp", listen)
329         if err != nil {
330                 log.Fatal(err)
331         }
332
333         // Initialize Pull queue and worker
334         keepClient := &keepclient.KeepClient{
335                 Arvados:       nil,
336                 Want_replicas: 1,
337                 Client:        &http.Client{},
338         }
339
340         // Initialize the pullq and worker
341         pullq = NewWorkQueue()
342         go RunPullWorker(pullq, keepClient)
343
344         // Initialize the trashq and worker
345         trashq = NewWorkQueue()
346         go RunTrashWorker(trashq)
347
348         // Start emptyTrash goroutine
349         doneEmptyingTrash := make(chan bool)
350         go emptyTrash(doneEmptyingTrash, trashCheckInterval)
351
352         // Shut down the server gracefully (by closing the listener)
353         // if SIGTERM is received.
354         term := make(chan os.Signal, 1)
355         go func(sig <-chan os.Signal) {
356                 s := <-sig
357                 log.Println("caught signal:", s)
358                 doneEmptyingTrash <- true
359                 listener.Close()
360         }(term)
361         signal.Notify(term, syscall.SIGTERM)
362         signal.Notify(term, syscall.SIGINT)
363
364         log.Println("listening at", listen)
365         srv := &http.Server{Addr: listen}
366         srv.Serve(listener)
367 }
368
369 // At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
370 func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
371         ticker := time.NewTicker(trashCheckInterval)
372
373         for {
374                 select {
375                 case <-ticker.C:
376                         for _, v := range volumes {
377                                 if v.Writable() {
378                                         v.EmptyTrash()
379                                 }
380                         }
381                 case <-doneEmptyingTrash:
382                         ticker.Stop()
383                         return
384                 }
385         }
386 }