10272: Fix nil pointer dereference in help message. refs #10272
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "bytes"
5         "flag"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/httpserver"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "io/ioutil"
11         "log"
12         "net"
13         "net/http"
14         "os"
15         "os/signal"
16         "strings"
17         "syscall"
18         "time"
19 )
20
21 // ======================
22 // Configuration settings
23 //
24 // TODO(twp): make all of these configurable via command line flags
25 // and/or configuration file settings.
26
27 // Default TCP address on which to listen for requests.
28 // Initialized by the --listen flag.
29 const DefaultAddr = ":25107"
30
31 // A Keep "block" is 64MB.
32 const BlockSize = 64 * 1024 * 1024
33
34 // A Keep volume must have at least MinFreeKilobytes available
35 // in order to permit writes.
36 const MinFreeKilobytes = BlockSize / 1024
37
38 // ProcMounts /proc/mounts
39 var ProcMounts = "/proc/mounts"
40
41 // enforcePermissions controls whether permission signatures
42 // should be enforced (affecting GET and DELETE requests).
43 // Initialized by the -enforce-permissions flag.
44 var enforcePermissions bool
45
46 // blobSignatureTTL is the time duration for which new permission
47 // signatures (returned by PUT requests) will be valid.
48 // Initialized by the -permission-ttl flag.
49 var blobSignatureTTL time.Duration
50
51 // dataManagerToken represents the API token used by the
52 // Data Manager, and is required on certain privileged operations.
53 // Initialized by the -data-manager-token-file flag.
54 var dataManagerToken string
55
56 // neverDelete can be used to prevent the DELETE handler from
57 // actually deleting anything.
58 var neverDelete = true
59
60 // trashLifetime is the time duration after a block is trashed
61 // during which it can be recovered using an /untrash request
62 // Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
63 var trashLifetime time.Duration
64
65 // trashCheckInterval is the time duration at which the emptyTrash goroutine
66 // will check and delete expired trashed blocks. Default is one day.
67 // Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
68 var trashCheckInterval time.Duration
69
70 var maxBuffers = 128
71 var bufs *bufferPool
72
73 // KeepError types.
74 //
75 type KeepError struct {
76         HTTPCode int
77         ErrMsg   string
78 }
79
80 var (
81         BadRequestError     = &KeepError{400, "Bad Request"}
82         UnauthorizedError   = &KeepError{401, "Unauthorized"}
83         CollisionError      = &KeepError{500, "Collision"}
84         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
85         PermissionError     = &KeepError{403, "Forbidden"}
86         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
87         ExpiredError        = &KeepError{401, "Expired permission signature"}
88         NotFoundError       = &KeepError{404, "Not Found"}
89         GenericError        = &KeepError{500, "Fail"}
90         FullError           = &KeepError{503, "Full"}
91         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
92         TooLongError        = &KeepError{413, "Block is too large"}
93         MethodDisabledError = &KeepError{405, "Method disabled"}
94         ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
95         ErrClientDisconnect = &KeepError{503, "Client disconnected"}
96 )
97
98 func (e *KeepError) Error() string {
99         return e.ErrMsg
100 }
101
102 // ========================
103 // Internal data structures
104 //
105 // These global variables are used by multiple parts of the
106 // program. They are good candidates for moving into their own
107 // packages.
108
109 // The Keep VolumeManager maintains a list of available volumes.
110 // Initialized by the --volumes flag (or by FindKeepVolumes).
111 var KeepVM VolumeManager
112
113 // The pull list manager and trash queue are threadsafe queues which
114 // support atomic update operations. The PullHandler and TrashHandler
115 // store results from Data Manager /pull and /trash requests here.
116 //
117 // See the Keep and Data Manager design documents for more details:
118 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
119 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
120 //
121 var pullq *WorkQueue
122 var trashq *WorkQueue
123
124 type volumeSet []Volume
125
126 var (
127         flagSerializeIO bool
128         flagReadonly    bool
129         volumes         volumeSet
130 )
131
132 func (vs *volumeSet) String() string {
133         if vs == nil {
134                 return "[]"
135         }
136         return fmt.Sprintf("%+v", (*vs)[:])
137 }
138
139 // TODO(twp): continue moving as much code as possible out of main
140 // so it can be effectively tested. Esp. handling and postprocessing
141 // of command line flags (identifying Keep volumes and initializing
142 // permission arguments).
143
144 func main() {
145         log.Println("keepstore starting, pid", os.Getpid())
146         defer log.Println("keepstore exiting, pid", os.Getpid())
147
148         var (
149                 dataManagerTokenFile string
150                 listen               string
151                 blobSigningKeyFile   string
152                 permissionTTLSec     int
153                 pidfile              string
154                 maxRequests          int
155         )
156         flag.StringVar(
157                 &dataManagerTokenFile,
158                 "data-manager-token-file",
159                 "",
160                 "File with the API token used by the Data Manager. All DELETE "+
161                         "requests or GET /index requests must carry this token.")
162         flag.BoolVar(
163                 &enforcePermissions,
164                 "enforce-permissions",
165                 false,
166                 "Enforce permission signatures on requests.")
167         flag.StringVar(
168                 &listen,
169                 "listen",
170                 DefaultAddr,
171                 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
172         flag.IntVar(
173                 &maxRequests,
174                 "max-requests",
175                 0,
176                 "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
177         flag.BoolVar(
178                 &neverDelete,
179                 "never-delete",
180                 true,
181                 "If true, nothing will be deleted. "+
182                         "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
183                         "You should leave this option alone unless you can afford to lose data.")
184         flag.StringVar(
185                 &blobSigningKeyFile,
186                 "permission-key-file",
187                 "",
188                 "Synonym for -blob-signing-key-file.")
189         flag.StringVar(
190                 &blobSigningKeyFile,
191                 "blob-signing-key-file",
192                 "",
193                 "File containing the secret key for generating and verifying "+
194                         "blob permission signatures.")
195         flag.IntVar(
196                 &permissionTTLSec,
197                 "permission-ttl",
198                 0,
199                 "Synonym for -blob-signature-ttl.")
200         flag.IntVar(
201                 &permissionTTLSec,
202                 "blob-signature-ttl",
203                 2*7*24*3600,
204                 "Lifetime of blob permission signatures in seconds. Modifying the ttl will invalidate all existing signatures. "+
205                         "See services/api/config/application.default.yml.")
206         flag.BoolVar(
207                 &flagSerializeIO,
208                 "serialize",
209                 false,
210                 "Serialize read and write operations on the following volumes.")
211         flag.BoolVar(
212                 &flagReadonly,
213                 "readonly",
214                 false,
215                 "Do not write, delete, or touch anything on the following volumes.")
216         flag.StringVar(
217                 &pidfile,
218                 "pid",
219                 "",
220                 "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
221         flag.IntVar(
222                 &maxBuffers,
223                 "max-buffers",
224                 maxBuffers,
225                 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
226         flag.DurationVar(
227                 &trashLifetime,
228                 "trash-lifetime",
229                 0,
230                 "Time duration after a block is trashed during which it can be recovered using an /untrash request")
231         flag.DurationVar(
232                 &trashCheckInterval,
233                 "trash-check-interval",
234                 24*time.Hour,
235                 "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
236
237         flag.Parse()
238
239         if maxBuffers < 0 {
240                 log.Fatal("-max-buffers must be greater than zero.")
241         }
242         bufs = newBufferPool(maxBuffers, BlockSize)
243
244         if pidfile != "" {
245                 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
246                 if err != nil {
247                         log.Fatalf("open pidfile (%s): %s", pidfile, err)
248                 }
249                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
250                 if err != nil {
251                         log.Fatalf("flock pidfile (%s): %s", pidfile, err)
252                 }
253                 err = f.Truncate(0)
254                 if err != nil {
255                         log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
256                 }
257                 _, err = fmt.Fprint(f, os.Getpid())
258                 if err != nil {
259                         log.Fatalf("write pidfile (%s): %s", pidfile, err)
260                 }
261                 err = f.Sync()
262                 if err != nil {
263                         log.Fatalf("sync pidfile (%s): %s", pidfile, err)
264                 }
265                 defer f.Close()
266                 defer os.Remove(pidfile)
267         }
268
269         if len(volumes) == 0 {
270                 if (&unixVolumeAdder{&volumes}).Discover() == 0 {
271                         log.Fatal("No volumes found.")
272                 }
273         }
274
275         for _, v := range volumes {
276                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
277         }
278
279         // Initialize data manager token and permission key.
280         // If these tokens are specified but cannot be read,
281         // raise a fatal error.
282         if dataManagerTokenFile != "" {
283                 if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
284                         dataManagerToken = strings.TrimSpace(string(buf))
285                 } else {
286                         log.Fatalf("reading data manager token: %s\n", err)
287                 }
288         }
289
290         if neverDelete != true {
291                 log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
292                         "been extensively tested. You should leave this option alone unless you can afford to lose data.")
293         }
294
295         if blobSigningKeyFile != "" {
296                 if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
297                         PermissionSecret = bytes.TrimSpace(buf)
298                 } else {
299                         log.Fatalf("reading permission key: %s\n", err)
300                 }
301         }
302
303         blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
304
305         if PermissionSecret == nil {
306                 if enforcePermissions {
307                         log.Fatal("-enforce-permissions requires a permission key")
308                 } else {
309                         log.Println("Running without a PermissionSecret. Block locators " +
310                                 "returned by this server will not be signed, and will be rejected " +
311                                 "by a server that enforces permissions.")
312                         log.Println("To fix this, use the -blob-signing-key-file flag " +
313                                 "to specify the file containing the permission key.")
314                 }
315         }
316
317         if maxRequests <= 0 {
318                 maxRequests = maxBuffers * 2
319                 log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
320         }
321
322         // Start a round-robin VolumeManager with the volumes we have found.
323         KeepVM = MakeRRVolumeManager(volumes)
324
325         // Middleware stack: logger, maxRequests limiter, method handlers
326         http.Handle("/", &LoggingRESTRouter{
327                 httpserver.NewRequestLimiter(maxRequests,
328                         MakeRESTRouter()),
329         })
330
331         // Set up a TCP listener.
332         listener, err := net.Listen("tcp", listen)
333         if err != nil {
334                 log.Fatal(err)
335         }
336
337         // Initialize Pull queue and worker
338         keepClient := &keepclient.KeepClient{
339                 Arvados:       &arvadosclient.ArvadosClient{},
340                 Want_replicas: 1,
341                 Client:        &http.Client{},
342         }
343
344         // Initialize the pullq and worker
345         pullq = NewWorkQueue()
346         go RunPullWorker(pullq, keepClient)
347
348         // Initialize the trashq and worker
349         trashq = NewWorkQueue()
350         go RunTrashWorker(trashq)
351
352         // Start emptyTrash goroutine
353         doneEmptyingTrash := make(chan bool)
354         go emptyTrash(doneEmptyingTrash, trashCheckInterval)
355
356         // Shut down the server gracefully (by closing the listener)
357         // if SIGTERM is received.
358         term := make(chan os.Signal, 1)
359         go func(sig <-chan os.Signal) {
360                 s := <-sig
361                 log.Println("caught signal:", s)
362                 doneEmptyingTrash <- true
363                 listener.Close()
364         }(term)
365         signal.Notify(term, syscall.SIGTERM)
366         signal.Notify(term, syscall.SIGINT)
367
368         log.Println("listening at", listen)
369         srv := &http.Server{Addr: listen}
370         srv.Serve(listener)
371 }
372
373 // At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
374 func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
375         ticker := time.NewTicker(trashCheckInterval)
376
377         for {
378                 select {
379                 case <-ticker.C:
380                         for _, v := range volumes {
381                                 if v.Writable() {
382                                         v.EmptyTrash()
383                                 }
384                         }
385                 case <-doneEmptyingTrash:
386                         ticker.Stop()
387                         return
388                 }
389         }
390 }