8936: update comment on keepstore and go fmt
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "bytes"
5         "flag"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         "io/ioutil"
9         "log"
10         "net"
11         "net/http"
12         "os"
13         "os/signal"
14         "strings"
15         "syscall"
16         "time"
17 )
18
19 // ======================
20 // Configuration settings
21 //
22 // TODO(twp): make all of these configurable via command line flags
23 // and/or configuration file settings.
24
25 // Default TCP address on which to listen for requests.
26 // Initialized by the --listen flag.
27 const DefaultAddr = ":25107"
28
29 // A Keep "block" is 64MB.
30 const BlockSize = 64 * 1024 * 1024
31
32 // A Keep volume must have at least MinFreeKilobytes available
33 // in order to permit writes.
34 const MinFreeKilobytes = BlockSize / 1024
35
36 // ProcMounts /proc/mounts
37 var ProcMounts = "/proc/mounts"
38
39 // enforcePermissions controls whether permission signatures
40 // should be enforced (affecting GET and DELETE requests).
41 // Initialized by the -enforce-permissions flag.
42 var enforcePermissions bool
43
44 // blobSignatureTTL is the time duration for which new permission
45 // signatures (returned by PUT requests) will be valid.
46 // Initialized by the -permission-ttl flag.
47 var blobSignatureTTL time.Duration
48
49 // dataManagerToken represents the API token used by the
50 // Data Manager, and is required on certain privileged operations.
51 // Initialized by the -data-manager-token-file flag.
52 var dataManagerToken string
53
54 // neverDelete can be used to prevent the DELETE handler from
55 // actually deleting anything.
56 var neverDelete = true
57
58 // trashLifetime is the time duration after a block is trashed
59 // during which it can be recovered using an /untrash request
60 // Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
61 var trashLifetime time.Duration
62
63 // trashCheckInterval is the time duration at which the emptyTrash goroutine
64 // will check and delete expired trashed blocks. Default is one day.
65 // Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
66 var trashCheckInterval time.Duration
67
68 var maxBuffers = 128
69 var bufs *bufferPool
70
71 // KeepError types.
72 //
73 type KeepError struct {
74         HTTPCode int
75         ErrMsg   string
76 }
77
78 var (
79         BadRequestError     = &KeepError{400, "Bad Request"}
80         UnauthorizedError   = &KeepError{401, "Unauthorized"}
81         CollisionError      = &KeepError{500, "Collision"}
82         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
83         PermissionError     = &KeepError{403, "Forbidden"}
84         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
85         ExpiredError        = &KeepError{401, "Expired permission signature"}
86         NotFoundError       = &KeepError{404, "Not Found"}
87         GenericError        = &KeepError{500, "Fail"}
88         FullError           = &KeepError{503, "Full"}
89         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
90         TooLongError        = &KeepError{413, "Block is too large"}
91         MethodDisabledError = &KeepError{405, "Method disabled"}
92         ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
93 )
94
95 func (e *KeepError) Error() string {
96         return e.ErrMsg
97 }
98
99 // ========================
100 // Internal data structures
101 //
102 // These global variables are used by multiple parts of the
103 // program. They are good candidates for moving into their own
104 // packages.
105
106 // The Keep VolumeManager maintains a list of available volumes.
107 // Initialized by the --volumes flag (or by FindKeepVolumes).
108 var KeepVM VolumeManager
109
110 // The pull list manager and trash queue are threadsafe queues which
111 // support atomic update operations. The PullHandler and TrashHandler
112 // store results from Data Manager /pull and /trash requests here.
113 //
114 // See the Keep and Data Manager design documents for more details:
115 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
116 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
117 //
118 var pullq *WorkQueue
119 var trashq *WorkQueue
120
121 type volumeSet []Volume
122
123 var (
124         flagSerializeIO bool
125         flagReadonly    bool
126         volumes         volumeSet
127 )
128
129 func (vs *volumeSet) String() string {
130         return fmt.Sprintf("%+v", (*vs)[:])
131 }
132
133 // TODO(twp): continue moving as much code as possible out of main
134 // so it can be effectively tested. Esp. handling and postprocessing
135 // of command line flags (identifying Keep volumes and initializing
136 // permission arguments).
137
138 func main() {
139         log.Println("keepstore starting, pid", os.Getpid())
140         defer log.Println("keepstore exiting, pid", os.Getpid())
141
142         var (
143                 dataManagerTokenFile string
144                 listen               string
145                 blobSigningKeyFile   string
146                 permissionTTLSec     int
147                 pidfile              string
148         )
149         flag.StringVar(
150                 &dataManagerTokenFile,
151                 "data-manager-token-file",
152                 "",
153                 "File with the API token used by the Data Manager. All DELETE "+
154                         "requests or GET /index requests must carry this token.")
155         flag.BoolVar(
156                 &enforcePermissions,
157                 "enforce-permissions",
158                 false,
159                 "Enforce permission signatures on requests.")
160         flag.StringVar(
161                 &listen,
162                 "listen",
163                 DefaultAddr,
164                 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
165         flag.BoolVar(
166                 &neverDelete,
167                 "never-delete",
168                 true,
169                 "If true, nothing will be deleted. "+
170                         "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
171                         "You should leave this option alone unless you can afford to lose data.")
172         flag.StringVar(
173                 &blobSigningKeyFile,
174                 "permission-key-file",
175                 "",
176                 "Synonym for -blob-signing-key-file.")
177         flag.StringVar(
178                 &blobSigningKeyFile,
179                 "blob-signing-key-file",
180                 "",
181                 "File containing the secret key for generating and verifying "+
182                         "blob permission signatures.")
183         flag.IntVar(
184                 &permissionTTLSec,
185                 "permission-ttl",
186                 0,
187                 "Synonym for -blob-signature-ttl.")
188         flag.IntVar(
189                 &permissionTTLSec,
190                 "blob-signature-ttl",
191                 int(time.Duration(2*7*24*time.Hour).Seconds()),
192                 "Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures"+
193                         "See services/api/config/application.default.yml.")
194         flag.BoolVar(
195                 &flagSerializeIO,
196                 "serialize",
197                 false,
198                 "Serialize read and write operations on the following volumes.")
199         flag.BoolVar(
200                 &flagReadonly,
201                 "readonly",
202                 false,
203                 "Do not write, delete, or touch anything on the following volumes.")
204         flag.StringVar(
205                 &pidfile,
206                 "pid",
207                 "",
208                 "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
209         flag.IntVar(
210                 &maxBuffers,
211                 "max-buffers",
212                 maxBuffers,
213                 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
214         flag.DurationVar(
215                 &trashLifetime,
216                 "trash-lifetime",
217                 0*time.Second,
218                 "Time duration after a block is trashed during which it can be recovered using an /untrash request")
219         flag.DurationVar(
220                 &trashCheckInterval,
221                 "trash-check-interval",
222                 24*time.Hour,
223                 "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
224
225         flag.Parse()
226
227         if maxBuffers < 0 {
228                 log.Fatal("-max-buffers must be greater than zero.")
229         }
230         bufs = newBufferPool(maxBuffers, BlockSize)
231
232         if pidfile != "" {
233                 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
234                 if err != nil {
235                         log.Fatalf("open pidfile (%s): %s", pidfile, err)
236                 }
237                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
238                 if err != nil {
239                         log.Fatalf("flock pidfile (%s): %s", pidfile, err)
240                 }
241                 err = f.Truncate(0)
242                 if err != nil {
243                         log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
244                 }
245                 _, err = fmt.Fprint(f, os.Getpid())
246                 if err != nil {
247                         log.Fatalf("write pidfile (%s): %s", pidfile, err)
248                 }
249                 err = f.Sync()
250                 if err != nil {
251                         log.Fatalf("sync pidfile (%s): %s", pidfile, err)
252                 }
253                 defer f.Close()
254                 defer os.Remove(pidfile)
255         }
256
257         if len(volumes) == 0 {
258                 if (&unixVolumeAdder{&volumes}).Discover() == 0 {
259                         log.Fatal("No volumes found.")
260                 }
261         }
262
263         for _, v := range volumes {
264                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
265         }
266
267         // Initialize data manager token and permission key.
268         // If these tokens are specified but cannot be read,
269         // raise a fatal error.
270         if dataManagerTokenFile != "" {
271                 if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
272                         dataManagerToken = strings.TrimSpace(string(buf))
273                 } else {
274                         log.Fatalf("reading data manager token: %s\n", err)
275                 }
276         }
277
278         if neverDelete != true {
279                 log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
280                         "been extensively tested. You should leave this option alone unless you can afford to lose data.")
281         }
282
283         if blobSigningKeyFile != "" {
284                 if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
285                         PermissionSecret = bytes.TrimSpace(buf)
286                 } else {
287                         log.Fatalf("reading permission key: %s\n", err)
288                 }
289         }
290
291         blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
292
293         if PermissionSecret == nil {
294                 if enforcePermissions {
295                         log.Fatal("-enforce-permissions requires a permission key")
296                 } else {
297                         log.Println("Running without a PermissionSecret. Block locators " +
298                                 "returned by this server will not be signed, and will be rejected " +
299                                 "by a server that enforces permissions.")
300                         log.Println("To fix this, use the -blob-signing-key-file flag " +
301                                 "to specify the file containing the permission key.")
302                 }
303         }
304
305         // Start a round-robin VolumeManager with the volumes we have found.
306         KeepVM = MakeRRVolumeManager(volumes)
307
308         // Tell the built-in HTTP server to direct all requests to the REST router.
309         loggingRouter := MakeLoggingRESTRouter()
310         http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
311                 loggingRouter.ServeHTTP(resp, req)
312         })
313
314         // Set up a TCP listener.
315         listener, err := net.Listen("tcp", listen)
316         if err != nil {
317                 log.Fatal(err)
318         }
319
320         // Initialize Pull queue and worker
321         keepClient := &keepclient.KeepClient{
322                 Arvados:       nil,
323                 Want_replicas: 1,
324                 Client:        &http.Client{},
325         }
326
327         // Initialize the pullq and worker
328         pullq = NewWorkQueue()
329         go RunPullWorker(pullq, keepClient)
330
331         // Initialize the trashq and worker
332         trashq = NewWorkQueue()
333         go RunTrashWorker(trashq)
334
335         // Start emptyTrash goroutine
336         doneEmptyingTrash := make(chan bool)
337         go emptyTrash(doneEmptyingTrash, trashCheckInterval)
338
339         // Shut down the server gracefully (by closing the listener)
340         // if SIGTERM is received.
341         term := make(chan os.Signal, 1)
342         go func(sig <-chan os.Signal) {
343                 s := <-sig
344                 log.Println("caught signal:", s)
345                 doneEmptyingTrash <- true
346                 listener.Close()
347         }(term)
348         signal.Notify(term, syscall.SIGTERM)
349         signal.Notify(term, syscall.SIGINT)
350
351         log.Println("listening at", listen)
352         srv := &http.Server{Addr: listen}
353         srv.Serve(listener)
354 }
355
356 // At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
357 func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
358         ticker := time.NewTicker(trashCheckInterval)
359
360         for {
361                 select {
362                 case <-ticker.C:
363                         for _, v := range volumes {
364                                 if v.Writable() {
365                                         v.EmptyTrash()
366                                 }
367                         }
368                 case <-doneEmptyingTrash:
369                         ticker.Stop()
370                         return
371                 }
372         }
373 }