Merge branch 'master' into 6260-test-datamanager
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "errors"
7         "flag"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "io/ioutil"
11         "log"
12         "net"
13         "net/http"
14         "os"
15         "os/signal"
16         "strings"
17         "sync"
18         "syscall"
19         "time"
20 )
21
22 // ======================
23 // Configuration settings
24 //
25 // TODO(twp): make all of these configurable via command line flags
26 // and/or configuration file settings.
27
28 // Default TCP address on which to listen for requests.
29 // Initialized by the --listen flag.
30 const DefaultAddr = ":25107"
31
32 // A Keep "block" is 64MB.
33 const BlockSize = 64 * 1024 * 1024
34
35 // A Keep volume must have at least MinFreeKilobytes available
36 // in order to permit writes.
37 const MinFreeKilobytes = BlockSize / 1024
38
39 // Until #6221 is resolved, never_delete must be true.
40 // However, allow it to be false in testing with TEST_DATA_MANAGER_TOKEN
41 const TEST_DATA_MANAGER_TOKEN = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
42
43 // ProcMounts /proc/mounts
44 var ProcMounts = "/proc/mounts"
45
46 // enforcePermissions controls whether permission signatures
47 // should be enforced (affecting GET and DELETE requests).
48 // Initialized by the -enforce-permissions flag.
49 var enforcePermissions bool
50
51 // blobSignatureTTL is the time duration for which new permission
52 // signatures (returned by PUT requests) will be valid.
53 // Initialized by the -permission-ttl flag.
54 var blobSignatureTTL time.Duration
55
56 // dataManagerToken represents the API token used by the
57 // Data Manager, and is required on certain privileged operations.
58 // Initialized by the -data-manager-token-file flag.
59 var dataManagerToken string
60
61 // neverDelete can be used to prevent the DELETE handler from
62 // actually deleting anything.
63 var neverDelete = true
64
65 var maxBuffers = 128
66 var bufs *bufferPool
67
68 // KeepError types.
69 //
70 type KeepError struct {
71         HTTPCode int
72         ErrMsg   string
73 }
74
75 var (
76         BadRequestError     = &KeepError{400, "Bad Request"}
77         UnauthorizedError   = &KeepError{401, "Unauthorized"}
78         CollisionError      = &KeepError{500, "Collision"}
79         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
80         PermissionError     = &KeepError{403, "Forbidden"}
81         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
82         ExpiredError        = &KeepError{401, "Expired permission signature"}
83         NotFoundError       = &KeepError{404, "Not Found"}
84         GenericError        = &KeepError{500, "Fail"}
85         FullError           = &KeepError{503, "Full"}
86         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
87         TooLongError        = &KeepError{413, "Block is too large"}
88         MethodDisabledError = &KeepError{405, "Method disabled"}
89 )
90
91 func (e *KeepError) Error() string {
92         return e.ErrMsg
93 }
94
95 // ========================
96 // Internal data structures
97 //
98 // These global variables are used by multiple parts of the
99 // program. They are good candidates for moving into their own
100 // packages.
101
102 // The Keep VolumeManager maintains a list of available volumes.
103 // Initialized by the --volumes flag (or by FindKeepVolumes).
104 var KeepVM VolumeManager
105
106 // The pull list manager and trash queue are threadsafe queues which
107 // support atomic update operations. The PullHandler and TrashHandler
108 // store results from Data Manager /pull and /trash requests here.
109 //
110 // See the Keep and Data Manager design documents for more details:
111 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
112 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
113 //
114 var pullq *WorkQueue
115 var trashq *WorkQueue
116
117 var (
118         flagSerializeIO bool
119         flagReadonly    bool
120 )
121
122 type volumeSet []Volume
123
124 func (vs *volumeSet) Set(value string) error {
125         if dirs := strings.Split(value, ","); len(dirs) > 1 {
126                 log.Print("DEPRECATED: using comma-separated volume list.")
127                 for _, dir := range dirs {
128                         if err := vs.Set(dir); err != nil {
129                                 return err
130                         }
131                 }
132                 return nil
133         }
134         if len(value) == 0 || value[0] != '/' {
135                 return errors.New("Invalid volume: must begin with '/'.")
136         }
137         if _, err := os.Stat(value); err != nil {
138                 return err
139         }
140         var locker sync.Locker
141         if flagSerializeIO {
142                 locker = &sync.Mutex{}
143         }
144         *vs = append(*vs, &UnixVolume{
145                 root:     value,
146                 locker:   locker,
147                 readonly: flagReadonly,
148         })
149         return nil
150 }
151
152 func (vs *volumeSet) String() string {
153         s := "["
154         for i, v := range *vs {
155                 if i > 0 {
156                         s = s + " "
157                 }
158                 s = s + v.String()
159         }
160         return s + "]"
161 }
162
163 // Discover adds a volume for every directory named "keep" that is
164 // located at the top level of a device- or tmpfs-backed mount point
165 // other than "/". It returns the number of volumes added.
166 func (vs *volumeSet) Discover() int {
167         added := 0
168         f, err := os.Open(ProcMounts)
169         if err != nil {
170                 log.Fatalf("opening %s: %s", ProcMounts, err)
171         }
172         scanner := bufio.NewScanner(f)
173         for scanner.Scan() {
174                 args := strings.Fields(scanner.Text())
175                 if err := scanner.Err(); err != nil {
176                         log.Fatalf("reading %s: %s", ProcMounts, err)
177                 }
178                 dev, mount := args[0], args[1]
179                 if mount == "/" {
180                         continue
181                 }
182                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
183                         continue
184                 }
185                 keepdir := mount + "/keep"
186                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
187                         continue
188                 }
189                 // Set the -readonly flag (but only for this volume)
190                 // if the filesystem is mounted readonly.
191                 flagReadonlyWas := flagReadonly
192                 for _, fsopt := range strings.Split(args[3], ",") {
193                         if fsopt == "ro" {
194                                 flagReadonly = true
195                                 break
196                         }
197                         if fsopt == "rw" {
198                                 break
199                         }
200                 }
201                 vs.Set(keepdir)
202                 flagReadonly = flagReadonlyWas
203                 added++
204         }
205         return added
206 }
207
208 // TODO(twp): continue moving as much code as possible out of main
209 // so it can be effectively tested. Esp. handling and postprocessing
210 // of command line flags (identifying Keep volumes and initializing
211 // permission arguments).
212
213 func main() {
214         log.Println("keepstore starting, pid", os.Getpid())
215         defer log.Println("keepstore exiting, pid", os.Getpid())
216
217         var (
218                 dataManagerTokenFile string
219                 listen               string
220                 blobSigningKeyFile   string
221                 permissionTTLSec     int
222                 volumes              volumeSet
223                 pidfile              string
224         )
225         flag.StringVar(
226                 &dataManagerTokenFile,
227                 "data-manager-token-file",
228                 "",
229                 "File with the API token used by the Data Manager. All DELETE "+
230                         "requests or GET /index requests must carry this token.")
231         flag.BoolVar(
232                 &enforcePermissions,
233                 "enforce-permissions",
234                 false,
235                 "Enforce permission signatures on requests.")
236         flag.StringVar(
237                 &listen,
238                 "listen",
239                 DefaultAddr,
240                 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
241         flag.BoolVar(
242                 &neverDelete,
243                 "never-delete",
244                 true,
245                 "If set, nothing will be deleted. HTTP 405 will be returned "+
246                         "for valid DELETE requests.")
247         flag.StringVar(
248                 &blobSigningKeyFile,
249                 "permission-key-file",
250                 "",
251                 "Synonym for -blob-signing-key-file.")
252         flag.StringVar(
253                 &blobSigningKeyFile,
254                 "blob-signing-key-file",
255                 "",
256                 "File containing the secret key for generating and verifying "+
257                         "blob permission signatures.")
258         flag.IntVar(
259                 &permissionTTLSec,
260                 "permission-ttl",
261                 0,
262                 "Synonym for -blob-signature-ttl.")
263         flag.IntVar(
264                 &permissionTTLSec,
265                 "blob-signature-ttl",
266                 int(time.Duration(2*7*24*time.Hour).Seconds()),
267                 "Lifetime of blob permission signatures. "+
268                         "See services/api/config/application.default.yml.")
269         flag.BoolVar(
270                 &flagSerializeIO,
271                 "serialize",
272                 false,
273                 "Serialize read and write operations on the following volumes.")
274         flag.BoolVar(
275                 &flagReadonly,
276                 "readonly",
277                 false,
278                 "Do not write, delete, or touch anything on the following volumes.")
279         flag.Var(
280                 &volumes,
281                 "volumes",
282                 "Deprecated synonym for -volume.")
283         flag.Var(
284                 &volumes,
285                 "volume",
286                 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
287         flag.StringVar(
288                 &pidfile,
289                 "pid",
290                 "",
291                 "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
292         flag.IntVar(
293                 &maxBuffers,
294                 "max-buffers",
295                 maxBuffers,
296                 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
297
298         flag.Parse()
299
300         if maxBuffers < 0 {
301                 log.Fatal("-max-buffers must be greater than zero.")
302         }
303         bufs = newBufferPool(maxBuffers, BlockSize)
304
305         if pidfile != "" {
306                 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
307                 if err != nil {
308                         log.Fatalf("open pidfile (%s): %s", pidfile, err)
309                 }
310                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
311                 if err != nil {
312                         log.Fatalf("flock pidfile (%s): %s", pidfile, err)
313                 }
314                 err = f.Truncate(0)
315                 if err != nil {
316                         log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
317                 }
318                 _, err = fmt.Fprint(f, os.Getpid())
319                 if err != nil {
320                         log.Fatalf("write pidfile (%s): %s", pidfile, err)
321                 }
322                 err = f.Sync()
323                 if err != nil {
324                         log.Fatalf("sync pidfile (%s): %s", pidfile, err)
325                 }
326                 defer f.Close()
327                 defer os.Remove(pidfile)
328         }
329
330         if len(volumes) == 0 {
331                 if volumes.Discover() == 0 {
332                         log.Fatal("No volumes found.")
333                 }
334         }
335
336         for _, v := range volumes {
337                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
338         }
339
340         // Initialize data manager token and permission key.
341         // If these tokens are specified but cannot be read,
342         // raise a fatal error.
343         if dataManagerTokenFile != "" {
344                 if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
345                         dataManagerToken = strings.TrimSpace(string(buf))
346                 } else {
347                         log.Fatalf("reading data manager token: %s\n", err)
348                 }
349         }
350
351         if neverDelete != true && dataManagerToken != TEST_DATA_MANAGER_TOKEN {
352                 log.Fatal("never_delete must be true, see #6221")
353         }
354
355         if blobSigningKeyFile != "" {
356                 if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
357                         PermissionSecret = bytes.TrimSpace(buf)
358                 } else {
359                         log.Fatalf("reading permission key: %s\n", err)
360                 }
361         }
362
363         blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
364
365         if PermissionSecret == nil {
366                 if enforcePermissions {
367                         log.Fatal("-enforce-permissions requires a permission key")
368                 } else {
369                         log.Println("Running without a PermissionSecret. Block locators " +
370                                 "returned by this server will not be signed, and will be rejected " +
371                                 "by a server that enforces permissions.")
372                         log.Println("To fix this, use the -blob-signing-key-file flag " +
373                                 "to specify the file containing the permission key.")
374                 }
375         }
376
377         // Start a round-robin VolumeManager with the volumes we have found.
378         KeepVM = MakeRRVolumeManager(volumes)
379
380         // Tell the built-in HTTP server to direct all requests to the REST router.
381         loggingRouter := MakeLoggingRESTRouter()
382         http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
383                 loggingRouter.ServeHTTP(resp, req)
384         })
385
386         // Set up a TCP listener.
387         listener, err := net.Listen("tcp", listen)
388         if err != nil {
389                 log.Fatal(err)
390         }
391
392         // Initialize Pull queue and worker
393         keepClient := &keepclient.KeepClient{
394                 Arvados:       nil,
395                 Want_replicas: 1,
396                 Using_proxy:   true,
397                 Client:        &http.Client{},
398         }
399
400         // Initialize the pullq and worker
401         pullq = NewWorkQueue()
402         go RunPullWorker(pullq, keepClient)
403
404         // Initialize the trashq and worker
405         trashq = NewWorkQueue()
406         go RunTrashWorker(trashq)
407
408         // Shut down the server gracefully (by closing the listener)
409         // if SIGTERM is received.
410         term := make(chan os.Signal, 1)
411         go func(sig <-chan os.Signal) {
412                 s := <-sig
413                 log.Println("caught signal:", s)
414                 listener.Close()
415         }(term)
416         signal.Notify(term, syscall.SIGTERM)
417         signal.Notify(term, syscall.SIGINT)
418
419         log.Println("listening at", listen)
420         srv := &http.Server{Addr: listen}
421         srv.Serve(listener)
422 }