Merge branch '8232-docker-remove-obsolete' closes #8232
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "bytes"
5         "flag"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/httpserver"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "io/ioutil"
11         "log"
12         "net"
13         "net/http"
14         "os"
15         "os/signal"
16         "strings"
17         "syscall"
18         "time"
19 )
20
21 // ======================
22 // Configuration settings
23 //
24 // TODO(twp): make all of these configurable via command line flags
25 // and/or configuration file settings.
26
27 // Default TCP address on which to listen for requests.
28 // Initialized by the --listen flag.
29 const DefaultAddr = ":25107"
30
31 // A Keep "block" is 64MB.
32 const BlockSize = 64 * 1024 * 1024
33
34 // A Keep volume must have at least MinFreeKilobytes available
35 // in order to permit writes.
36 const MinFreeKilobytes = BlockSize / 1024
37
38 // ProcMounts /proc/mounts
39 var ProcMounts = "/proc/mounts"
40
41 // enforcePermissions controls whether permission signatures
42 // should be enforced (affecting GET and DELETE requests).
43 // Initialized by the -enforce-permissions flag.
44 var enforcePermissions bool
45
46 // blobSignatureTTL is the time duration for which new permission
47 // signatures (returned by PUT requests) will be valid.
48 // Initialized by the -permission-ttl flag.
49 var blobSignatureTTL time.Duration
50
51 // dataManagerToken represents the API token used by the
52 // Data Manager, and is required on certain privileged operations.
53 // Initialized by the -data-manager-token-file flag.
54 var dataManagerToken string
55
56 // neverDelete can be used to prevent the DELETE handler from
57 // actually deleting anything.
58 var neverDelete = true
59
60 // trashLifetime is the time duration after a block is trashed
61 // during which it can be recovered using an /untrash request
62 // Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
63 var trashLifetime time.Duration
64
65 // trashCheckInterval is the time duration at which the emptyTrash goroutine
66 // will check and delete expired trashed blocks. Default is one day.
67 // Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
68 var trashCheckInterval time.Duration
69
70 var maxBuffers = 128
71 var bufs *bufferPool
72
73 // KeepError types.
74 //
75 type KeepError struct {
76         HTTPCode int
77         ErrMsg   string
78 }
79
80 var (
81         BadRequestError     = &KeepError{400, "Bad Request"}
82         UnauthorizedError   = &KeepError{401, "Unauthorized"}
83         CollisionError      = &KeepError{500, "Collision"}
84         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
85         PermissionError     = &KeepError{403, "Forbidden"}
86         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
87         ExpiredError        = &KeepError{401, "Expired permission signature"}
88         NotFoundError       = &KeepError{404, "Not Found"}
89         GenericError        = &KeepError{500, "Fail"}
90         FullError           = &KeepError{503, "Full"}
91         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
92         TooLongError        = &KeepError{413, "Block is too large"}
93         MethodDisabledError = &KeepError{405, "Method disabled"}
94         ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
95         ErrClientDisconnect = &KeepError{503, "Client disconnected"}
96 )
97
98 func (e *KeepError) Error() string {
99         return e.ErrMsg
100 }
101
102 // ========================
103 // Internal data structures
104 //
105 // These global variables are used by multiple parts of the
106 // program. They are good candidates for moving into their own
107 // packages.
108
109 // The Keep VolumeManager maintains a list of available volumes.
110 // Initialized by the --volumes flag (or by FindKeepVolumes).
111 var KeepVM VolumeManager
112
113 // The pull list manager and trash queue are threadsafe queues which
114 // support atomic update operations. The PullHandler and TrashHandler
115 // store results from Data Manager /pull and /trash requests here.
116 //
117 // See the Keep and Data Manager design documents for more details:
118 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
119 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
120 //
121 var pullq *WorkQueue
122 var trashq *WorkQueue
123
124 type volumeSet []Volume
125
126 var (
127         flagSerializeIO bool
128         flagReadonly    bool
129         volumes         volumeSet
130 )
131
132 func (vs *volumeSet) String() string {
133         return fmt.Sprintf("%+v", (*vs)[:])
134 }
135
136 // TODO(twp): continue moving as much code as possible out of main
137 // so it can be effectively tested. Esp. handling and postprocessing
138 // of command line flags (identifying Keep volumes and initializing
139 // permission arguments).
140
141 func main() {
142         log.Println("keepstore starting, pid", os.Getpid())
143         defer log.Println("keepstore exiting, pid", os.Getpid())
144
145         var (
146                 dataManagerTokenFile string
147                 listen               string
148                 blobSigningKeyFile   string
149                 permissionTTLSec     int
150                 pidfile              string
151                 maxRequests          int
152         )
153         flag.StringVar(
154                 &dataManagerTokenFile,
155                 "data-manager-token-file",
156                 "",
157                 "File with the API token used by the Data Manager. All DELETE "+
158                         "requests or GET /index requests must carry this token.")
159         flag.BoolVar(
160                 &enforcePermissions,
161                 "enforce-permissions",
162                 false,
163                 "Enforce permission signatures on requests.")
164         flag.StringVar(
165                 &listen,
166                 "listen",
167                 DefaultAddr,
168                 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
169         flag.IntVar(
170                 &maxRequests,
171                 "max-requests",
172                 0,
173                 "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
174         flag.BoolVar(
175                 &neverDelete,
176                 "never-delete",
177                 true,
178                 "If true, nothing will be deleted. "+
179                         "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
180                         "You should leave this option alone unless you can afford to lose data.")
181         flag.StringVar(
182                 &blobSigningKeyFile,
183                 "permission-key-file",
184                 "",
185                 "Synonym for -blob-signing-key-file.")
186         flag.StringVar(
187                 &blobSigningKeyFile,
188                 "blob-signing-key-file",
189                 "",
190                 "File containing the secret key for generating and verifying "+
191                         "blob permission signatures.")
192         flag.IntVar(
193                 &permissionTTLSec,
194                 "permission-ttl",
195                 0,
196                 "Synonym for -blob-signature-ttl.")
197         flag.IntVar(
198                 &permissionTTLSec,
199                 "blob-signature-ttl",
200                 2*7*24*3600,
201                 "Lifetime of blob permission signatures in seconds. Modifying the ttl will invalidate all existing signatures. "+
202                         "See services/api/config/application.default.yml.")
203         flag.BoolVar(
204                 &flagSerializeIO,
205                 "serialize",
206                 false,
207                 "Serialize read and write operations on the following volumes.")
208         flag.BoolVar(
209                 &flagReadonly,
210                 "readonly",
211                 false,
212                 "Do not write, delete, or touch anything on the following volumes.")
213         flag.StringVar(
214                 &pidfile,
215                 "pid",
216                 "",
217                 "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
218         flag.IntVar(
219                 &maxBuffers,
220                 "max-buffers",
221                 maxBuffers,
222                 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
223         flag.DurationVar(
224                 &trashLifetime,
225                 "trash-lifetime",
226                 0,
227                 "Time duration after a block is trashed during which it can be recovered using an /untrash request")
228         flag.DurationVar(
229                 &trashCheckInterval,
230                 "trash-check-interval",
231                 24*time.Hour,
232                 "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
233
234         flag.Parse()
235
236         if maxBuffers < 0 {
237                 log.Fatal("-max-buffers must be greater than zero.")
238         }
239         bufs = newBufferPool(maxBuffers, BlockSize)
240
241         if pidfile != "" {
242                 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
243                 if err != nil {
244                         log.Fatalf("open pidfile (%s): %s", pidfile, err)
245                 }
246                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
247                 if err != nil {
248                         log.Fatalf("flock pidfile (%s): %s", pidfile, err)
249                 }
250                 err = f.Truncate(0)
251                 if err != nil {
252                         log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
253                 }
254                 _, err = fmt.Fprint(f, os.Getpid())
255                 if err != nil {
256                         log.Fatalf("write pidfile (%s): %s", pidfile, err)
257                 }
258                 err = f.Sync()
259                 if err != nil {
260                         log.Fatalf("sync pidfile (%s): %s", pidfile, err)
261                 }
262                 defer f.Close()
263                 defer os.Remove(pidfile)
264         }
265
266         if len(volumes) == 0 {
267                 if (&unixVolumeAdder{&volumes}).Discover() == 0 {
268                         log.Fatal("No volumes found.")
269                 }
270         }
271
272         for _, v := range volumes {
273                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
274         }
275
276         // Initialize data manager token and permission key.
277         // If these tokens are specified but cannot be read,
278         // raise a fatal error.
279         if dataManagerTokenFile != "" {
280                 if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
281                         dataManagerToken = strings.TrimSpace(string(buf))
282                 } else {
283                         log.Fatalf("reading data manager token: %s\n", err)
284                 }
285         }
286
287         if neverDelete != true {
288                 log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
289                         "been extensively tested. You should leave this option alone unless you can afford to lose data.")
290         }
291
292         if blobSigningKeyFile != "" {
293                 if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
294                         PermissionSecret = bytes.TrimSpace(buf)
295                 } else {
296                         log.Fatalf("reading permission key: %s\n", err)
297                 }
298         }
299
300         blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
301
302         if PermissionSecret == nil {
303                 if enforcePermissions {
304                         log.Fatal("-enforce-permissions requires a permission key")
305                 } else {
306                         log.Println("Running without a PermissionSecret. Block locators " +
307                                 "returned by this server will not be signed, and will be rejected " +
308                                 "by a server that enforces permissions.")
309                         log.Println("To fix this, use the -blob-signing-key-file flag " +
310                                 "to specify the file containing the permission key.")
311                 }
312         }
313
314         if maxRequests <= 0 {
315                 maxRequests = maxBuffers * 2
316                 log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
317         }
318
319         // Start a round-robin VolumeManager with the volumes we have found.
320         KeepVM = MakeRRVolumeManager(volumes)
321
322         // Middleware stack: logger, maxRequests limiter, method handlers
323         http.Handle("/", &LoggingRESTRouter{
324                 httpserver.NewRequestLimiter(maxRequests,
325                         MakeRESTRouter()),
326         })
327
328         // Set up a TCP listener.
329         listener, err := net.Listen("tcp", listen)
330         if err != nil {
331                 log.Fatal(err)
332         }
333
334         // Initialize Pull queue and worker
335         keepClient := &keepclient.KeepClient{
336                 Arvados:       &arvadosclient.ArvadosClient{},
337                 Want_replicas: 1,
338                 Client:        &http.Client{},
339         }
340
341         // Initialize the pullq and worker
342         pullq = NewWorkQueue()
343         go RunPullWorker(pullq, keepClient)
344
345         // Initialize the trashq and worker
346         trashq = NewWorkQueue()
347         go RunTrashWorker(trashq)
348
349         // Start emptyTrash goroutine
350         doneEmptyingTrash := make(chan bool)
351         go emptyTrash(doneEmptyingTrash, trashCheckInterval)
352
353         // Shut down the server gracefully (by closing the listener)
354         // if SIGTERM is received.
355         term := make(chan os.Signal, 1)
356         go func(sig <-chan os.Signal) {
357                 s := <-sig
358                 log.Println("caught signal:", s)
359                 doneEmptyingTrash <- true
360                 listener.Close()
361         }(term)
362         signal.Notify(term, syscall.SIGTERM)
363         signal.Notify(term, syscall.SIGINT)
364
365         log.Println("listening at", listen)
366         srv := &http.Server{Addr: listen}
367         srv.Serve(listener)
368 }
369
370 // At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
371 func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
372         ticker := time.NewTicker(trashCheckInterval)
373
374         for {
375                 select {
376                 case <-ticker.C:
377                         for _, v := range volumes {
378                                 if v.Writable() {
379                                         v.EmptyTrash()
380                                 }
381                         }
382                 case <-doneEmptyingTrash:
383                         ticker.Stop()
384                         return
385                 }
386         }
387 }