93ee43c446cf96624a09a0ff7660d198cacdd3cd
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "bytes"
5         "flag"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/httpserver"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "io/ioutil"
10         "log"
11         "net"
12         "net/http"
13         "os"
14         "os/signal"
15         "strings"
16         "syscall"
17         "time"
18 )
19
20 // ======================
21 // Configuration settings
22 //
23 // TODO(twp): make all of these configurable via command line flags
24 // and/or configuration file settings.
25
26 // Default TCP address on which to listen for requests.
27 // Initialized by the --listen flag.
28 const DefaultAddr = ":25107"
29
30 // A Keep "block" is 64MB.
31 const BlockSize = 64 * 1024 * 1024
32
33 // A Keep volume must have at least MinFreeKilobytes available
34 // in order to permit writes.
35 const MinFreeKilobytes = BlockSize / 1024
36
37 // ProcMounts /proc/mounts
38 var ProcMounts = "/proc/mounts"
39
40 // enforcePermissions controls whether permission signatures
41 // should be enforced (affecting GET and DELETE requests).
42 // Initialized by the -enforce-permissions flag.
43 var enforcePermissions bool
44
45 // blobSignatureTTL is the time duration for which new permission
46 // signatures (returned by PUT requests) will be valid.
47 // Initialized by the -permission-ttl flag.
48 var blobSignatureTTL time.Duration
49
50 // dataManagerToken represents the API token used by the
51 // Data Manager, and is required on certain privileged operations.
52 // Initialized by the -data-manager-token-file flag.
53 var dataManagerToken string
54
55 // neverDelete can be used to prevent the DELETE handler from
56 // actually deleting anything.
57 var neverDelete = true
58
59 // trashLifetime is the time duration after a block is trashed
60 // during which it can be recovered using an /untrash request
61 // Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
62 var trashLifetime time.Duration
63
64 // trashCheckInterval is the time duration at which the emptyTrash goroutine
65 // will check and delete expired trashed blocks. Default is one day.
66 // Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
67 var trashCheckInterval time.Duration
68
69 var maxBuffers = 128
70 var bufs *bufferPool
71
72 // KeepError types.
73 //
74 type KeepError struct {
75         HTTPCode int
76         ErrMsg   string
77 }
78
79 var (
80         BadRequestError     = &KeepError{400, "Bad Request"}
81         UnauthorizedError   = &KeepError{401, "Unauthorized"}
82         CollisionError      = &KeepError{500, "Collision"}
83         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
84         PermissionError     = &KeepError{403, "Forbidden"}
85         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
86         ExpiredError        = &KeepError{401, "Expired permission signature"}
87         NotFoundError       = &KeepError{404, "Not Found"}
88         GenericError        = &KeepError{500, "Fail"}
89         FullError           = &KeepError{503, "Full"}
90         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
91         TooLongError        = &KeepError{413, "Block is too large"}
92         MethodDisabledError = &KeepError{405, "Method disabled"}
93         ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
94 )
95
96 func (e *KeepError) Error() string {
97         return e.ErrMsg
98 }
99
100 // ========================
101 // Internal data structures
102 //
103 // These global variables are used by multiple parts of the
104 // program. They are good candidates for moving into their own
105 // packages.
106
107 // The Keep VolumeManager maintains a list of available volumes.
108 // Initialized by the --volumes flag (or by FindKeepVolumes).
109 var KeepVM VolumeManager
110
111 // The pull list manager and trash queue are threadsafe queues which
112 // support atomic update operations. The PullHandler and TrashHandler
113 // store results from Data Manager /pull and /trash requests here.
114 //
115 // See the Keep and Data Manager design documents for more details:
116 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
117 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
118 //
119 var pullq *WorkQueue
120 var trashq *WorkQueue
121
122 type volumeSet []Volume
123
124 var (
125         flagSerializeIO bool
126         flagReadonly    bool
127         volumes         volumeSet
128 )
129
130 func (vs *volumeSet) String() string {
131         return fmt.Sprintf("%+v", (*vs)[:])
132 }
133
134 // TODO(twp): continue moving as much code as possible out of main
135 // so it can be effectively tested. Esp. handling and postprocessing
136 // of command line flags (identifying Keep volumes and initializing
137 // permission arguments).
138
139 func main() {
140         log.Println("keepstore starting, pid", os.Getpid())
141         defer log.Println("keepstore exiting, pid", os.Getpid())
142
143         var (
144                 dataManagerTokenFile string
145                 listen               string
146                 blobSigningKeyFile   string
147                 permissionTTLSec     int
148                 pidfile              string
149                 maxRequests          int
150         )
151         flag.StringVar(
152                 &dataManagerTokenFile,
153                 "data-manager-token-file",
154                 "",
155                 "File with the API token used by the Data Manager. All DELETE "+
156                         "requests or GET /index requests must carry this token.")
157         flag.BoolVar(
158                 &enforcePermissions,
159                 "enforce-permissions",
160                 false,
161                 "Enforce permission signatures on requests.")
162         flag.StringVar(
163                 &listen,
164                 "listen",
165                 DefaultAddr,
166                 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
167         flag.IntVar(
168                 &maxRequests,
169                 "max-requests",
170                 0,
171                 "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
172         flag.BoolVar(
173                 &neverDelete,
174                 "never-delete",
175                 true,
176                 "If true, nothing will be deleted. "+
177                         "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
178                         "You should leave this option alone unless you can afford to lose data.")
179         flag.StringVar(
180                 &blobSigningKeyFile,
181                 "permission-key-file",
182                 "",
183                 "Synonym for -blob-signing-key-file.")
184         flag.StringVar(
185                 &blobSigningKeyFile,
186                 "blob-signing-key-file",
187                 "",
188                 "File containing the secret key for generating and verifying "+
189                         "blob permission signatures.")
190         flag.IntVar(
191                 &permissionTTLSec,
192                 "permission-ttl",
193                 0,
194                 "Synonym for -blob-signature-ttl.")
195         flag.IntVar(
196                 &permissionTTLSec,
197                 "blob-signature-ttl",
198                 int(time.Duration(2*7*24*time.Hour).Seconds()),
199                 "Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures. "+
200                         "See services/api/config/application.default.yml.")
201         flag.BoolVar(
202                 &flagSerializeIO,
203                 "serialize",
204                 false,
205                 "Serialize read and write operations on the following volumes.")
206         flag.BoolVar(
207                 &flagReadonly,
208                 "readonly",
209                 false,
210                 "Do not write, delete, or touch anything on the following volumes.")
211         flag.StringVar(
212                 &pidfile,
213                 "pid",
214                 "",
215                 "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
216         flag.IntVar(
217                 &maxBuffers,
218                 "max-buffers",
219                 maxBuffers,
220                 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
221         flag.DurationVar(
222                 &trashLifetime,
223                 "trash-lifetime",
224                 0*time.Second,
225                 "Time duration after a block is trashed during which it can be recovered using an /untrash request")
226         flag.DurationVar(
227                 &trashCheckInterval,
228                 "trash-check-interval",
229                 24*time.Hour,
230                 "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
231
232         flag.Parse()
233
234         if maxBuffers < 0 {
235                 log.Fatal("-max-buffers must be greater than zero.")
236         }
237         bufs = newBufferPool(maxBuffers, BlockSize)
238
239         if pidfile != "" {
240                 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
241                 if err != nil {
242                         log.Fatalf("open pidfile (%s): %s", pidfile, err)
243                 }
244                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
245                 if err != nil {
246                         log.Fatalf("flock pidfile (%s): %s", pidfile, err)
247                 }
248                 err = f.Truncate(0)
249                 if err != nil {
250                         log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
251                 }
252                 _, err = fmt.Fprint(f, os.Getpid())
253                 if err != nil {
254                         log.Fatalf("write pidfile (%s): %s", pidfile, err)
255                 }
256                 err = f.Sync()
257                 if err != nil {
258                         log.Fatalf("sync pidfile (%s): %s", pidfile, err)
259                 }
260                 defer f.Close()
261                 defer os.Remove(pidfile)
262         }
263
264         if len(volumes) == 0 {
265                 if (&unixVolumeAdder{&volumes}).Discover() == 0 {
266                         log.Fatal("No volumes found.")
267                 }
268         }
269
270         for _, v := range volumes {
271                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
272         }
273
274         // Initialize data manager token and permission key.
275         // If these tokens are specified but cannot be read,
276         // raise a fatal error.
277         if dataManagerTokenFile != "" {
278                 if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
279                         dataManagerToken = strings.TrimSpace(string(buf))
280                 } else {
281                         log.Fatalf("reading data manager token: %s\n", err)
282                 }
283         }
284
285         if neverDelete != true {
286                 log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
287                         "been extensively tested. You should leave this option alone unless you can afford to lose data.")
288         }
289
290         if blobSigningKeyFile != "" {
291                 if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
292                         PermissionSecret = bytes.TrimSpace(buf)
293                 } else {
294                         log.Fatalf("reading permission key: %s\n", err)
295                 }
296         }
297
298         blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
299
300         if PermissionSecret == nil {
301                 if enforcePermissions {
302                         log.Fatal("-enforce-permissions requires a permission key")
303                 } else {
304                         log.Println("Running without a PermissionSecret. Block locators " +
305                                 "returned by this server will not be signed, and will be rejected " +
306                                 "by a server that enforces permissions.")
307                         log.Println("To fix this, use the -blob-signing-key-file flag " +
308                                 "to specify the file containing the permission key.")
309                 }
310         }
311
312         if maxRequests <= 0 {
313                 maxRequests = maxBuffers * 2
314                 log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
315         }
316
317         // Start a round-robin VolumeManager with the volumes we have found.
318         KeepVM = MakeRRVolumeManager(volumes)
319
320         // Middleware stack: logger, maxRequests limiter, method handlers
321         http.Handle("/", &LoggingRESTRouter{
322                 httpserver.NewRequestLimiter(maxRequests,
323                         MakeRESTRouter()),
324         })
325
326         // Set up a TCP listener.
327         listener, err := net.Listen("tcp", listen)
328         if err != nil {
329                 log.Fatal(err)
330         }
331
332         // Initialize Pull queue and worker
333         keepClient := &keepclient.KeepClient{
334                 Arvados:       nil,
335                 Want_replicas: 1,
336                 Client:        &http.Client{},
337         }
338
339         // Initialize the pullq and worker
340         pullq = NewWorkQueue()
341         go RunPullWorker(pullq, keepClient)
342
343         // Initialize the trashq and worker
344         trashq = NewWorkQueue()
345         go RunTrashWorker(trashq)
346
347         // Start emptyTrash goroutine
348         doneEmptyingTrash := make(chan bool)
349         go emptyTrash(doneEmptyingTrash, trashCheckInterval)
350
351         // Shut down the server gracefully (by closing the listener)
352         // if SIGTERM is received.
353         term := make(chan os.Signal, 1)
354         go func(sig <-chan os.Signal) {
355                 s := <-sig
356                 log.Println("caught signal:", s)
357                 doneEmptyingTrash <- true
358                 listener.Close()
359         }(term)
360         signal.Notify(term, syscall.SIGTERM)
361         signal.Notify(term, syscall.SIGINT)
362
363         log.Println("listening at", listen)
364         srv := &http.Server{Addr: listen}
365         srv.Serve(listener)
366 }
367
368 // At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
369 func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
370         ticker := time.NewTicker(trashCheckInterval)
371
372         for {
373                 select {
374                 case <-ticker.C:
375                         for _, v := range volumes {
376                                 if v.Writable() {
377                                         v.EmptyTrash()
378                                 }
379                         }
380                 case <-doneEmptyingTrash:
381                         ticker.Stop()
382                         return
383                 }
384         }
385 }