Merge branch '5745-serialize-content-only' closes #5745
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "errors"
7         "flag"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "io/ioutil"
11         "log"
12         "net"
13         "net/http"
14         "os"
15         "os/signal"
16         "strings"
17         "syscall"
18         "time"
19 )
20
21 // ======================
22 // Configuration settings
23 //
24 // TODO(twp): make all of these configurable via command line flags
25 // and/or configuration file settings.
26
27 // Default TCP address on which to listen for requests.
28 // Initialized by the --listen flag.
29 const DEFAULT_ADDR = ":25107"
30
31 // A Keep "block" is 64MB.
32 const BLOCKSIZE = 64 * 1024 * 1024
33
34 // A Keep volume must have at least MIN_FREE_KILOBYTES available
35 // in order to permit writes.
36 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
37
38 var PROC_MOUNTS = "/proc/mounts"
39
40 // enforce_permissions controls whether permission signatures
41 // should be enforced (affecting GET and DELETE requests).
42 // Initialized by the -enforce-permissions flag.
43 var enforce_permissions bool
44
45 // blob_signature_ttl is the time duration for which new permission
46 // signatures (returned by PUT requests) will be valid.
47 // Initialized by the -permission-ttl flag.
48 var blob_signature_ttl time.Duration
49
50 // data_manager_token represents the API token used by the
51 // Data Manager, and is required on certain privileged operations.
52 // Initialized by the -data-manager-token-file flag.
53 var data_manager_token string
54
55 // never_delete can be used to prevent the DELETE handler from
56 // actually deleting anything.
57 var never_delete = false
58
59 // ==========
60 // Error types.
61 //
62 type KeepError struct {
63         HTTPCode int
64         ErrMsg   string
65 }
66
67 var (
68         BadRequestError     = &KeepError{400, "Bad Request"}
69         UnauthorizedError   = &KeepError{401, "Unauthorized"}
70         CollisionError      = &KeepError{500, "Collision"}
71         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
72         PermissionError     = &KeepError{403, "Forbidden"}
73         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
74         ExpiredError        = &KeepError{401, "Expired permission signature"}
75         NotFoundError       = &KeepError{404, "Not Found"}
76         GenericError        = &KeepError{500, "Fail"}
77         FullError           = &KeepError{503, "Full"}
78         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
79         TooLongError        = &KeepError{413, "Block is too large"}
80         MethodDisabledError = &KeepError{405, "Method disabled"}
81 )
82
83 func (e *KeepError) Error() string {
84         return e.ErrMsg
85 }
86
87 // ========================
88 // Internal data structures
89 //
90 // These global variables are used by multiple parts of the
91 // program. They are good candidates for moving into their own
92 // packages.
93
94 // The Keep VolumeManager maintains a list of available volumes.
95 // Initialized by the --volumes flag (or by FindKeepVolumes).
96 var KeepVM VolumeManager
97
98 // The pull list manager and trash queue are threadsafe queues which
99 // support atomic update operations. The PullHandler and TrashHandler
100 // store results from Data Manager /pull and /trash requests here.
101 //
102 // See the Keep and Data Manager design documents for more details:
103 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
104 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
105 //
106 var pullq *WorkQueue
107 var trashq *WorkQueue
108
109 var (
110         flagSerializeIO bool
111         flagReadonly    bool
112 )
113
114 type volumeSet []Volume
115
116 func (vs *volumeSet) Set(value string) error {
117         if dirs := strings.Split(value, ","); len(dirs) > 1 {
118                 log.Print("DEPRECATED: using comma-separated volume list.")
119                 for _, dir := range dirs {
120                         if err := vs.Set(dir); err != nil {
121                                 return err
122                         }
123                 }
124                 return nil
125         }
126         if len(value) == 0 || value[0] != '/' {
127                 return errors.New("Invalid volume: must begin with '/'.")
128         }
129         if _, err := os.Stat(value); err != nil {
130                 return err
131         }
132         *vs = append(*vs, &UnixVolume{
133                 root:      value,
134                 serialize: flagSerializeIO,
135                 readonly:  flagReadonly,
136         })
137         return nil
138 }
139
140 func (vs *volumeSet) String() string {
141         s := "["
142         for i, v := range *vs {
143                 if i > 0 {
144                         s = s + " "
145                 }
146                 s = s + v.String()
147         }
148         return s + "]"
149 }
150
151 // Discover adds a volume for every directory named "keep" that is
152 // located at the top level of a device- or tmpfs-backed mount point
153 // other than "/". It returns the number of volumes added.
154 func (vs *volumeSet) Discover() int {
155         added := 0
156         f, err := os.Open(PROC_MOUNTS)
157         if err != nil {
158                 log.Fatalf("opening %s: %s", PROC_MOUNTS, err)
159         }
160         scanner := bufio.NewScanner(f)
161         for scanner.Scan() {
162                 args := strings.Fields(scanner.Text())
163                 if err := scanner.Err(); err != nil {
164                         log.Fatalf("reading %s: %s", PROC_MOUNTS, err)
165                 }
166                 dev, mount := args[0], args[1]
167                 if mount == "/" {
168                         continue
169                 }
170                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
171                         continue
172                 }
173                 keepdir := mount + "/keep"
174                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
175                         continue
176                 }
177                 // Set the -readonly flag (but only for this volume)
178                 // if the filesystem is mounted readonly.
179                 flagReadonlyWas := flagReadonly
180                 for _, fsopt := range strings.Split(args[3], ",") {
181                         if fsopt == "ro" {
182                                 flagReadonly = true
183                                 break
184                         }
185                         if fsopt == "rw" {
186                                 break
187                         }
188                 }
189                 vs.Set(keepdir)
190                 flagReadonly = flagReadonlyWas
191                 added++
192         }
193         return added
194 }
195
196 // TODO(twp): continue moving as much code as possible out of main
197 // so it can be effectively tested. Esp. handling and postprocessing
198 // of command line flags (identifying Keep volumes and initializing
199 // permission arguments).
200
201 func main() {
202         log.Println("Keep started: pid", os.Getpid())
203
204         var (
205                 data_manager_token_file string
206                 listen                  string
207                 blob_signing_key_file   string
208                 permission_ttl_sec      int
209                 volumes                 volumeSet
210                 pidfile                 string
211         )
212         flag.StringVar(
213                 &data_manager_token_file,
214                 "data-manager-token-file",
215                 "",
216                 "File with the API token used by the Data Manager. All DELETE "+
217                         "requests or GET /index requests must carry this token.")
218         flag.BoolVar(
219                 &enforce_permissions,
220                 "enforce-permissions",
221                 false,
222                 "Enforce permission signatures on requests.")
223         flag.StringVar(
224                 &listen,
225                 "listen",
226                 DEFAULT_ADDR,
227                 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
228         flag.BoolVar(
229                 &never_delete,
230                 "never-delete",
231                 false,
232                 "If set, nothing will be deleted. HTTP 405 will be returned "+
233                         "for valid DELETE requests.")
234         flag.StringVar(
235                 &blob_signing_key_file,
236                 "permission-key-file",
237                 "",
238                 "Synonym for -blob-signing-key-file.")
239         flag.StringVar(
240                 &blob_signing_key_file,
241                 "blob-signing-key-file",
242                 "",
243                 "File containing the secret key for generating and verifying "+
244                         "blob permission signatures.")
245         flag.IntVar(
246                 &permission_ttl_sec,
247                 "permission-ttl",
248                 0,
249                 "Synonym for -blob-signature-ttl.")
250         flag.IntVar(
251                 &permission_ttl_sec,
252                 "blob-signature-ttl",
253                 int(time.Duration(2*7*24*time.Hour).Seconds()),
254                 "Lifetime of blob permission signatures. "+
255                         "See services/api/config/application.default.yml.")
256         flag.BoolVar(
257                 &flagSerializeIO,
258                 "serialize",
259                 false,
260                 "Serialize read and write operations on the following volumes.")
261         flag.BoolVar(
262                 &flagReadonly,
263                 "readonly",
264                 false,
265                 "Do not write, delete, or touch anything on the following volumes.")
266         flag.Var(
267                 &volumes,
268                 "volumes",
269                 "Deprecated synonym for -volume.")
270         flag.Var(
271                 &volumes,
272                 "volume",
273                 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
274         flag.StringVar(
275                 &pidfile,
276                 "pid",
277                 "",
278                 "Path to write pid file")
279
280         flag.Parse()
281
282         if len(volumes) == 0 {
283                 if volumes.Discover() == 0 {
284                         log.Fatal("No volumes found.")
285                 }
286         }
287
288         for _, v := range volumes {
289                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
290         }
291
292         // Initialize data manager token and permission key.
293         // If these tokens are specified but cannot be read,
294         // raise a fatal error.
295         if data_manager_token_file != "" {
296                 if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
297                         data_manager_token = strings.TrimSpace(string(buf))
298                 } else {
299                         log.Fatalf("reading data manager token: %s\n", err)
300                 }
301         }
302         if blob_signing_key_file != "" {
303                 if buf, err := ioutil.ReadFile(blob_signing_key_file); err == nil {
304                         PermissionSecret = bytes.TrimSpace(buf)
305                 } else {
306                         log.Fatalf("reading permission key: %s\n", err)
307                 }
308         }
309
310         blob_signature_ttl = time.Duration(permission_ttl_sec) * time.Second
311
312         if PermissionSecret == nil {
313                 if enforce_permissions {
314                         log.Fatal("-enforce-permissions requires a permission key")
315                 } else {
316                         log.Println("Running without a PermissionSecret. Block locators " +
317                                 "returned by this server will not be signed, and will be rejected " +
318                                 "by a server that enforces permissions.")
319                         log.Println("To fix this, use the -permission-key-file flag " +
320                                 "to specify the file containing the permission key.")
321                 }
322         }
323
324         // Start a round-robin VolumeManager with the volumes we have found.
325         KeepVM = MakeRRVolumeManager(volumes)
326
327         // Tell the built-in HTTP server to direct all requests to the REST router.
328         loggingRouter := MakeLoggingRESTRouter()
329         http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
330                 loggingRouter.ServeHTTP(resp, req)
331         })
332
333         // Set up a TCP listener.
334         listener, err := net.Listen("tcp", listen)
335         if err != nil {
336                 log.Fatal(err)
337         }
338
339         // Initialize Pull queue and worker
340         keepClient := &keepclient.KeepClient{
341                 Arvados:       nil,
342                 Want_replicas: 1,
343                 Using_proxy:   true,
344                 Client:        &http.Client{},
345         }
346
347         // Initialize the pullq and worker
348         pullq = NewWorkQueue()
349         go RunPullWorker(pullq, keepClient)
350
351         // Initialize the trashq and worker
352         trashq = NewWorkQueue()
353         go RunTrashWorker(trashq)
354
355         // Shut down the server gracefully (by closing the listener)
356         // if SIGTERM is received.
357         term := make(chan os.Signal, 1)
358         go func(sig <-chan os.Signal) {
359                 s := <-sig
360                 log.Println("caught signal:", s)
361                 listener.Close()
362         }(term)
363         signal.Notify(term, syscall.SIGTERM)
364
365         if pidfile != "" {
366                 f, err := os.Create(pidfile)
367                 if err == nil {
368                         fmt.Fprint(f, os.Getpid())
369                         f.Close()
370                 } else {
371                         log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
372                 }
373         }
374
375         // Start listening for requests.
376         srv := &http.Server{Addr: listen}
377         srv.Serve(listener)
378
379         log.Println("shutting down")
380
381         if pidfile != "" {
382                 os.Remove(pidfile)
383         }
384 }