refs #7179
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "errors"
7         "flag"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "io/ioutil"
11         "log"
12         "net"
13         "net/http"
14         "os"
15         "os/signal"
16         "strings"
17         "sync"
18         "syscall"
19         "time"
20 )
21
22 // ======================
23 // Configuration settings
24 //
25 // TODO(twp): make all of these configurable via command line flags
26 // and/or configuration file settings.
27
28 // Default TCP address on which to listen for requests.
29 // Initialized by the --listen flag.
30 const DefaultAddr = ":25107"
31
32 // A Keep "block" is 64MB.
33 const BlockSize = 64 * 1024 * 1024
34
35 // A Keep volume must have at least MinFreeKilobytes available
36 // in order to permit writes.
37 const MinFreeKilobytes = BlockSize / 1024
38
39 // ProcMounts /proc/mounts
40 var ProcMounts = "/proc/mounts"
41
42 // enforcePermissions controls whether permission signatures
43 // should be enforced (affecting GET and DELETE requests).
44 // Initialized by the -enforce-permissions flag.
45 var enforcePermissions bool
46
47 // blobSignatureTTL is the time duration for which new permission
48 // signatures (returned by PUT requests) will be valid.
49 // Initialized by the -permission-ttl flag.
50 var blobSignatureTTL time.Duration
51
52 // dataManagerToken represents the API token used by the
53 // Data Manager, and is required on certain privileged operations.
54 // Initialized by the -data-manager-token-file flag.
55 var dataManagerToken string
56
57 // neverDelete can be used to prevent the DELETE handler from
58 // actually deleting anything.
59 var neverDelete = true
60
61 var maxBuffers = 128
62 var bufs *bufferPool
63
64 // KeepError types.
65 //
66 type KeepError struct {
67         HTTPCode int
68         ErrMsg   string
69 }
70
71 var (
72         BadRequestError     = &KeepError{400, "Bad Request"}
73         UnauthorizedError   = &KeepError{401, "Unauthorized"}
74         CollisionError      = &KeepError{500, "Collision"}
75         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
76         PermissionError     = &KeepError{403, "Forbidden"}
77         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
78         ExpiredError        = &KeepError{401, "Expired permission signature"}
79         NotFoundError       = &KeepError{404, "Not Found"}
80         GenericError        = &KeepError{500, "Fail"}
81         FullError           = &KeepError{503, "Full"}
82         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
83         TooLongError        = &KeepError{413, "Block is too large"}
84         MethodDisabledError = &KeepError{405, "Method disabled"}
85 )
86
87 func (e *KeepError) Error() string {
88         return e.ErrMsg
89 }
90
91 // ========================
92 // Internal data structures
93 //
94 // These global variables are used by multiple parts of the
95 // program. They are good candidates for moving into their own
96 // packages.
97
98 // The Keep VolumeManager maintains a list of available volumes.
99 // Initialized by the --volumes flag (or by FindKeepVolumes).
100 var KeepVM VolumeManager
101
102 // The pull list manager and trash queue are threadsafe queues which
103 // support atomic update operations. The PullHandler and TrashHandler
104 // store results from Data Manager /pull and /trash requests here.
105 //
106 // See the Keep and Data Manager design documents for more details:
107 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
108 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
109 //
110 var pullq *WorkQueue
111 var trashq *WorkQueue
112
113 var (
114         flagSerializeIO bool
115         flagReadonly    bool
116 )
117
118 type volumeSet []Volume
119
120 func (vs *volumeSet) Set(value string) error {
121         if dirs := strings.Split(value, ","); len(dirs) > 1 {
122                 log.Print("DEPRECATED: using comma-separated volume list.")
123                 for _, dir := range dirs {
124                         if err := vs.Set(dir); err != nil {
125                                 return err
126                         }
127                 }
128                 return nil
129         }
130         if len(value) == 0 || value[0] != '/' {
131                 return errors.New("Invalid volume: must begin with '/'.")
132         }
133         if _, err := os.Stat(value); err != nil {
134                 return err
135         }
136         var locker sync.Locker
137         if flagSerializeIO {
138                 locker = &sync.Mutex{}
139         }
140         *vs = append(*vs, &UnixVolume{
141                 root:     value,
142                 locker:   locker,
143                 readonly: flagReadonly,
144         })
145         return nil
146 }
147
148 func (vs *volumeSet) String() string {
149         s := "["
150         for i, v := range *vs {
151                 if i > 0 {
152                         s = s + " "
153                 }
154                 s = s + v.String()
155         }
156         return s + "]"
157 }
158
159 // Discover adds a volume for every directory named "keep" that is
160 // located at the top level of a device- or tmpfs-backed mount point
161 // other than "/". It returns the number of volumes added.
162 func (vs *volumeSet) Discover() int {
163         added := 0
164         f, err := os.Open(ProcMounts)
165         if err != nil {
166                 log.Fatalf("opening %s: %s", ProcMounts, err)
167         }
168         scanner := bufio.NewScanner(f)
169         for scanner.Scan() {
170                 args := strings.Fields(scanner.Text())
171                 if err := scanner.Err(); err != nil {
172                         log.Fatalf("reading %s: %s", ProcMounts, err)
173                 }
174                 dev, mount := args[0], args[1]
175                 if mount == "/" {
176                         continue
177                 }
178                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
179                         continue
180                 }
181                 keepdir := mount + "/keep"
182                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
183                         continue
184                 }
185                 // Set the -readonly flag (but only for this volume)
186                 // if the filesystem is mounted readonly.
187                 flagReadonlyWas := flagReadonly
188                 for _, fsopt := range strings.Split(args[3], ",") {
189                         if fsopt == "ro" {
190                                 flagReadonly = true
191                                 break
192                         }
193                         if fsopt == "rw" {
194                                 break
195                         }
196                 }
197                 vs.Set(keepdir)
198                 flagReadonly = flagReadonlyWas
199                 added++
200         }
201         return added
202 }
203
204 // TODO(twp): continue moving as much code as possible out of main
205 // so it can be effectively tested. Esp. handling and postprocessing
206 // of command line flags (identifying Keep volumes and initializing
207 // permission arguments).
208
209 func main() {
210         log.Println("keepstore starting, pid", os.Getpid())
211         defer log.Println("keepstore exiting, pid", os.Getpid())
212
213         var (
214                 dataManagerTokenFile string
215                 listen               string
216                 blobSigningKeyFile   string
217                 permissionTTLSec     int
218                 volumes              volumeSet
219                 pidfile              string
220         )
221         flag.StringVar(
222                 &dataManagerTokenFile,
223                 "data-manager-token-file",
224                 "",
225                 "File with the API token used by the Data Manager. All DELETE "+
226                         "requests or GET /index requests must carry this token.")
227         flag.BoolVar(
228                 &enforcePermissions,
229                 "enforce-permissions",
230                 false,
231                 "Enforce permission signatures on requests.")
232         flag.StringVar(
233                 &listen,
234                 "listen",
235                 DefaultAddr,
236                 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
237         flag.BoolVar(
238                 &neverDelete,
239                 "never-delete",
240                 true,
241                 "If set, nothing will be deleted. HTTP 405 will be returned "+
242                         "for valid DELETE requests.")
243         flag.StringVar(
244                 &blobSigningKeyFile,
245                 "permission-key-file",
246                 "",
247                 "Synonym for -blob-signing-key-file.")
248         flag.StringVar(
249                 &blobSigningKeyFile,
250                 "blob-signing-key-file",
251                 "",
252                 "File containing the secret key for generating and verifying "+
253                         "blob permission signatures.")
254         flag.IntVar(
255                 &permissionTTLSec,
256                 "permission-ttl",
257                 0,
258                 "Synonym for -blob-signature-ttl.")
259         flag.IntVar(
260                 &permissionTTLSec,
261                 "blob-signature-ttl",
262                 int(time.Duration(2*7*24*time.Hour).Seconds()),
263                 "Lifetime of blob permission signatures. "+
264                         "See services/api/config/application.default.yml.")
265         flag.BoolVar(
266                 &flagSerializeIO,
267                 "serialize",
268                 false,
269                 "Serialize read and write operations on the following volumes.")
270         flag.BoolVar(
271                 &flagReadonly,
272                 "readonly",
273                 false,
274                 "Do not write, delete, or touch anything on the following volumes.")
275         flag.Var(
276                 &volumes,
277                 "volumes",
278                 "Deprecated synonym for -volume.")
279         flag.Var(
280                 &volumes,
281                 "volume",
282                 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
283         flag.StringVar(
284                 &pidfile,
285                 "pid",
286                 "",
287                 "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
288         flag.IntVar(
289                 &maxBuffers,
290                 "max-buffers",
291                 maxBuffers,
292                 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
293
294         flag.Parse()
295
296         if neverDelete != true {
297                 log.Fatal("neverDelete must be true, see #6221")
298         }
299
300         if maxBuffers < 0 {
301                 log.Fatal("-max-buffers must be greater than zero.")
302         }
303         bufs = newBufferPool(maxBuffers, BlockSize)
304
305         if pidfile != "" {
306                 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
307                 if err != nil {
308                         log.Fatalf("open pidfile (%s): %s", pidfile, err)
309                 }
310                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
311                 if err != nil {
312                         log.Fatalf("flock pidfile (%s): %s", pidfile, err)
313                 }
314                 err = f.Truncate(0)
315                 if err != nil {
316                         log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
317                 }
318                 _, err = fmt.Fprint(f, os.Getpid())
319                 if err != nil {
320                         log.Fatalf("write pidfile (%s): %s", pidfile, err)
321                 }
322                 err = f.Sync()
323                 if err != nil {
324                         log.Fatalf("sync pidfile (%s): %s", pidfile, err)
325                 }
326                 defer f.Close()
327                 defer os.Remove(pidfile)
328         }
329
330         if len(volumes) == 0 {
331                 if volumes.Discover() == 0 {
332                         log.Fatal("No volumes found.")
333                 }
334         }
335
336         for _, v := range volumes {
337                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
338         }
339
340         // Initialize data manager token and permission key.
341         // If these tokens are specified but cannot be read,
342         // raise a fatal error.
343         if dataManagerTokenFile != "" {
344                 if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
345                         dataManagerToken = strings.TrimSpace(string(buf))
346                 } else {
347                         log.Fatalf("reading data manager token: %s\n", err)
348                 }
349         }
350         if blobSigningKeyFile != "" {
351                 if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
352                         PermissionSecret = bytes.TrimSpace(buf)
353                 } else {
354                         log.Fatalf("reading permission key: %s\n", err)
355                 }
356         }
357
358         blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
359
360         if PermissionSecret == nil {
361                 if enforcePermissions {
362                         log.Fatal("-enforce-permissions requires a permission key")
363                 } else {
364                         log.Println("Running without a PermissionSecret. Block locators " +
365                                 "returned by this server will not be signed, and will be rejected " +
366                                 "by a server that enforces permissions.")
367                         log.Println("To fix this, use the -blob-signing-key-file flag " +
368                                 "to specify the file containing the permission key.")
369                 }
370         }
371
372         // Start a round-robin VolumeManager with the volumes we have found.
373         KeepVM = MakeRRVolumeManager(volumes)
374
375         // Tell the built-in HTTP server to direct all requests to the REST router.
376         loggingRouter := MakeLoggingRESTRouter()
377         http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
378                 loggingRouter.ServeHTTP(resp, req)
379         })
380
381         // Set up a TCP listener.
382         listener, err := net.Listen("tcp", listen)
383         if err != nil {
384                 log.Fatal(err)
385         }
386
387         // Initialize Pull queue and worker
388         keepClient := &keepclient.KeepClient{
389                 Arvados:       nil,
390                 Want_replicas: 1,
391                 Using_proxy:   true,
392                 Client:        &http.Client{},
393         }
394
395         // Initialize the pullq and worker
396         pullq = NewWorkQueue()
397         go RunPullWorker(pullq, keepClient)
398
399         // Initialize the trashq and worker
400         trashq = NewWorkQueue()
401         go RunTrashWorker(trashq)
402
403         // Shut down the server gracefully (by closing the listener)
404         // if SIGTERM is received.
405         term := make(chan os.Signal, 1)
406         go func(sig <-chan os.Signal) {
407                 s := <-sig
408                 log.Println("caught signal:", s)
409                 listener.Close()
410         }(term)
411         signal.Notify(term, syscall.SIGTERM)
412         signal.Notify(term, syscall.SIGINT)
413
414         log.Println("listening at", listen)
415         srv := &http.Server{Addr: listen}
416         srv.Serve(listener)
417 }