Merge branch 'pr/25'
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "bytes"
5         "flag"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         "io/ioutil"
9         "log"
10         "net"
11         "net/http"
12         "os"
13         "os/signal"
14         "strings"
15         "syscall"
16         "time"
17 )
18
19 // ======================
20 // Configuration settings
21 //
22 // TODO(twp): make all of these configurable via command line flags
23 // and/or configuration file settings.
24
25 // Default TCP address on which to listen for requests.
26 // Initialized by the --listen flag.
27 const DefaultAddr = ":25107"
28
29 // A Keep "block" is 64MB.
30 const BlockSize = 64 * 1024 * 1024
31
32 // A Keep volume must have at least MinFreeKilobytes available
33 // in order to permit writes.
34 const MinFreeKilobytes = BlockSize / 1024
35
36 // Until #6221 is resolved, never_delete must be true.
37 // However, allow it to be false in testing with TestDataManagerToken
38 const TestDataManagerToken = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
39
40 // ProcMounts /proc/mounts
41 var ProcMounts = "/proc/mounts"
42
43 // enforcePermissions controls whether permission signatures
44 // should be enforced (affecting GET and DELETE requests).
45 // Initialized by the -enforce-permissions flag.
46 var enforcePermissions bool
47
48 // blobSignatureTTL is the time duration for which new permission
49 // signatures (returned by PUT requests) will be valid.
50 // Initialized by the -permission-ttl flag.
51 var blobSignatureTTL time.Duration
52
53 // dataManagerToken represents the API token used by the
54 // Data Manager, and is required on certain privileged operations.
55 // Initialized by the -data-manager-token-file flag.
56 var dataManagerToken string
57
58 // neverDelete can be used to prevent the DELETE handler from
59 // actually deleting anything.
60 var neverDelete = true
61
62 var maxBuffers = 128
63 var bufs *bufferPool
64
65 // KeepError types.
66 //
67 type KeepError struct {
68         HTTPCode int
69         ErrMsg   string
70 }
71
72 var (
73         BadRequestError     = &KeepError{400, "Bad Request"}
74         UnauthorizedError   = &KeepError{401, "Unauthorized"}
75         CollisionError      = &KeepError{500, "Collision"}
76         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
77         PermissionError     = &KeepError{403, "Forbidden"}
78         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
79         ExpiredError        = &KeepError{401, "Expired permission signature"}
80         NotFoundError       = &KeepError{404, "Not Found"}
81         GenericError        = &KeepError{500, "Fail"}
82         FullError           = &KeepError{503, "Full"}
83         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
84         TooLongError        = &KeepError{413, "Block is too large"}
85         MethodDisabledError = &KeepError{405, "Method disabled"}
86 )
87
88 func (e *KeepError) Error() string {
89         return e.ErrMsg
90 }
91
92 // ========================
93 // Internal data structures
94 //
95 // These global variables are used by multiple parts of the
96 // program. They are good candidates for moving into their own
97 // packages.
98
99 // The Keep VolumeManager maintains a list of available volumes.
100 // Initialized by the --volumes flag (or by FindKeepVolumes).
101 var KeepVM VolumeManager
102
103 // The pull list manager and trash queue are threadsafe queues which
104 // support atomic update operations. The PullHandler and TrashHandler
105 // store results from Data Manager /pull and /trash requests here.
106 //
107 // See the Keep and Data Manager design documents for more details:
108 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
109 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
110 //
111 var pullq *WorkQueue
112 var trashq *WorkQueue
113
114 type volumeSet []Volume
115
116 var (
117         flagSerializeIO bool
118         flagReadonly    bool
119         volumes         volumeSet
120 )
121
122 func (vs *volumeSet) String() string {
123         return fmt.Sprintf("%+v", (*vs)[:])
124 }
125
126 // TODO(twp): continue moving as much code as possible out of main
127 // so it can be effectively tested. Esp. handling and postprocessing
128 // of command line flags (identifying Keep volumes and initializing
129 // permission arguments).
130
131 func main() {
132         log.Println("keepstore starting, pid", os.Getpid())
133         defer log.Println("keepstore exiting, pid", os.Getpid())
134
135         var (
136                 dataManagerTokenFile string
137                 listen               string
138                 blobSigningKeyFile   string
139                 permissionTTLSec     int
140                 pidfile              string
141         )
142         flag.StringVar(
143                 &dataManagerTokenFile,
144                 "data-manager-token-file",
145                 "",
146                 "File with the API token used by the Data Manager. All DELETE "+
147                         "requests or GET /index requests must carry this token.")
148         flag.BoolVar(
149                 &enforcePermissions,
150                 "enforce-permissions",
151                 false,
152                 "Enforce permission signatures on requests.")
153         flag.StringVar(
154                 &listen,
155                 "listen",
156                 DefaultAddr,
157                 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
158         flag.BoolVar(
159                 &neverDelete,
160                 "never-delete",
161                 true,
162                 "If set, nothing will be deleted. HTTP 405 will be returned "+
163                         "for valid DELETE requests.")
164         flag.StringVar(
165                 &blobSigningKeyFile,
166                 "permission-key-file",
167                 "",
168                 "Synonym for -blob-signing-key-file.")
169         flag.StringVar(
170                 &blobSigningKeyFile,
171                 "blob-signing-key-file",
172                 "",
173                 "File containing the secret key for generating and verifying "+
174                         "blob permission signatures.")
175         flag.IntVar(
176                 &permissionTTLSec,
177                 "permission-ttl",
178                 0,
179                 "Synonym for -blob-signature-ttl.")
180         flag.IntVar(
181                 &permissionTTLSec,
182                 "blob-signature-ttl",
183                 int(time.Duration(2*7*24*time.Hour).Seconds()),
184                 "Lifetime of blob permission signatures. "+
185                         "See services/api/config/application.default.yml.")
186         flag.BoolVar(
187                 &flagSerializeIO,
188                 "serialize",
189                 false,
190                 "Serialize read and write operations on the following volumes.")
191         flag.BoolVar(
192                 &flagReadonly,
193                 "readonly",
194                 false,
195                 "Do not write, delete, or touch anything on the following volumes.")
196         flag.StringVar(
197                 &pidfile,
198                 "pid",
199                 "",
200                 "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
201         flag.IntVar(
202                 &maxBuffers,
203                 "max-buffers",
204                 maxBuffers,
205                 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
206
207         flag.Parse()
208
209         if maxBuffers < 0 {
210                 log.Fatal("-max-buffers must be greater than zero.")
211         }
212         bufs = newBufferPool(maxBuffers, BlockSize)
213
214         if pidfile != "" {
215                 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
216                 if err != nil {
217                         log.Fatalf("open pidfile (%s): %s", pidfile, err)
218                 }
219                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
220                 if err != nil {
221                         log.Fatalf("flock pidfile (%s): %s", pidfile, err)
222                 }
223                 err = f.Truncate(0)
224                 if err != nil {
225                         log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
226                 }
227                 _, err = fmt.Fprint(f, os.Getpid())
228                 if err != nil {
229                         log.Fatalf("write pidfile (%s): %s", pidfile, err)
230                 }
231                 err = f.Sync()
232                 if err != nil {
233                         log.Fatalf("sync pidfile (%s): %s", pidfile, err)
234                 }
235                 defer f.Close()
236                 defer os.Remove(pidfile)
237         }
238
239         if len(volumes) == 0 {
240                 if (&unixVolumeAdder{&volumes}).Discover() == 0 {
241                         log.Fatal("No volumes found.")
242                 }
243         }
244
245         for _, v := range volumes {
246                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
247         }
248
249         // Initialize data manager token and permission key.
250         // If these tokens are specified but cannot be read,
251         // raise a fatal error.
252         if dataManagerTokenFile != "" {
253                 if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
254                         dataManagerToken = strings.TrimSpace(string(buf))
255                 } else {
256                         log.Fatalf("reading data manager token: %s\n", err)
257                 }
258         }
259
260         if neverDelete != true && dataManagerToken != TestDataManagerToken {
261                 log.Fatal("never_delete must be true, see #6221")
262         }
263
264         if blobSigningKeyFile != "" {
265                 if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
266                         PermissionSecret = bytes.TrimSpace(buf)
267                 } else {
268                         log.Fatalf("reading permission key: %s\n", err)
269                 }
270         }
271
272         blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
273
274         if PermissionSecret == nil {
275                 if enforcePermissions {
276                         log.Fatal("-enforce-permissions requires a permission key")
277                 } else {
278                         log.Println("Running without a PermissionSecret. Block locators " +
279                                 "returned by this server will not be signed, and will be rejected " +
280                                 "by a server that enforces permissions.")
281                         log.Println("To fix this, use the -blob-signing-key-file flag " +
282                                 "to specify the file containing the permission key.")
283                 }
284         }
285
286         // Start a round-robin VolumeManager with the volumes we have found.
287         KeepVM = MakeRRVolumeManager(volumes)
288
289         // Tell the built-in HTTP server to direct all requests to the REST router.
290         loggingRouter := MakeLoggingRESTRouter()
291         http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
292                 loggingRouter.ServeHTTP(resp, req)
293         })
294
295         // Set up a TCP listener.
296         listener, err := net.Listen("tcp", listen)
297         if err != nil {
298                 log.Fatal(err)
299         }
300
301         // Initialize Pull queue and worker
302         keepClient := &keepclient.KeepClient{
303                 Arvados:       nil,
304                 Want_replicas: 1,
305                 Using_proxy:   true,
306                 Client:        &http.Client{},
307         }
308
309         // Initialize the pullq and worker
310         pullq = NewWorkQueue()
311         go RunPullWorker(pullq, keepClient)
312
313         // Initialize the trashq and worker
314         trashq = NewWorkQueue()
315         go RunTrashWorker(trashq)
316
317         // Shut down the server gracefully (by closing the listener)
318         // if SIGTERM is received.
319         term := make(chan os.Signal, 1)
320         go func(sig <-chan os.Signal) {
321                 s := <-sig
322                 log.Println("caught signal:", s)
323                 listener.Close()
324         }(term)
325         signal.Notify(term, syscall.SIGTERM)
326         signal.Notify(term, syscall.SIGINT)
327
328         log.Println("listening at", listen)
329         srv := &http.Server{Addr: listen}
330         srv.Serve(listener)
331 }