5562: Use static method. Fixes "TypeError: _socket_open() takes exactly 5 arguments...
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "errors"
7         "flag"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "io/ioutil"
11         "log"
12         "net"
13         "net/http"
14         "os"
15         "os/signal"
16         "strings"
17         "syscall"
18         "time"
19 )
20
21 // ======================
22 // Configuration settings
23 //
24 // TODO(twp): make all of these configurable via command line flags
25 // and/or configuration file settings.
26
27 // Default TCP address on which to listen for requests.
28 // Initialized by the --listen flag.
29 const DEFAULT_ADDR = ":25107"
30
31 // A Keep "block" is 64MB.
32 const BLOCKSIZE = 64 * 1024 * 1024
33
34 // A Keep volume must have at least MIN_FREE_KILOBYTES available
35 // in order to permit writes.
36 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
37
38 var PROC_MOUNTS = "/proc/mounts"
39
40 // enforce_permissions controls whether permission signatures
41 // should be enforced (affecting GET and DELETE requests).
42 // Initialized by the --enforce-permissions flag.
43 var enforce_permissions bool
44
45 // permission_ttl is the time duration for which new permission
46 // signatures (returned by PUT requests) will be valid.
47 // Initialized by the --permission-ttl flag.
48 var permission_ttl time.Duration
49
50 // data_manager_token represents the API token used by the
51 // Data Manager, and is required on certain privileged operations.
52 // Initialized by the --data-manager-token-file flag.
53 var data_manager_token string
54
55 // never_delete can be used to prevent the DELETE handler from
56 // actually deleting anything.
57 var never_delete = false
58
59 // ==========
60 // Error types.
61 //
62 type KeepError struct {
63         HTTPCode int
64         ErrMsg   string
65 }
66
67 var (
68         BadRequestError     = &KeepError{400, "Bad Request"}
69         UnauthorizedError   = &KeepError{401, "Unauthorized"}
70         CollisionError      = &KeepError{500, "Collision"}
71         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
72         PermissionError     = &KeepError{403, "Forbidden"}
73         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
74         ExpiredError        = &KeepError{401, "Expired permission signature"}
75         NotFoundError       = &KeepError{404, "Not Found"}
76         GenericError        = &KeepError{500, "Fail"}
77         FullError           = &KeepError{503, "Full"}
78         TooLongError        = &KeepError{504, "Timeout"}
79         MethodDisabledError = &KeepError{405, "Method disabled"}
80 )
81
82 func (e *KeepError) Error() string {
83         return e.ErrMsg
84 }
85
86 // ========================
87 // Internal data structures
88 //
89 // These global variables are used by multiple parts of the
90 // program. They are good candidates for moving into their own
91 // packages.
92
93 // The Keep VolumeManager maintains a list of available volumes.
94 // Initialized by the --volumes flag (or by FindKeepVolumes).
95 var KeepVM VolumeManager
96
97 // The pull list manager and trash queue are threadsafe queues which
98 // support atomic update operations. The PullHandler and TrashHandler
99 // store results from Data Manager /pull and /trash requests here.
100 //
101 // See the Keep and Data Manager design documents for more details:
102 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
103 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
104 //
105 var pullq *WorkQueue
106 var trashq *WorkQueue
107
108 var (
109         flagSerializeIO bool
110         flagReadonly    bool
111 )
112 type volumeSet []Volume
113
114 func (vs *volumeSet) Set(value string) error {
115         if dirs := strings.Split(value, ","); len(dirs) > 1 {
116                 log.Print("DEPRECATED: using comma-separated volume list.")
117                 for _, dir := range dirs {
118                         if err := vs.Set(dir); err != nil {
119                                 return err
120                         }
121                 }
122                 return nil
123         }
124         if len(value) == 0 || value[0] != '/' {
125                 return errors.New("Invalid volume: must begin with '/'.")
126         }
127         if _, err := os.Stat(value); err != nil {
128                 return err
129         }
130         *vs = append(*vs, MakeUnixVolume(value, flagSerializeIO, flagReadonly))
131         return nil
132 }
133
134 func (vs *volumeSet) String() string {
135         s := "["
136         for i, v := range *vs {
137                 if i > 0 {
138                         s = s + " "
139                 }
140                 s = s + v.String()
141         }
142         return s + "]"
143 }
144
145 // Discover adds a volume for every directory named "keep" that is
146 // located at the top level of a device- or tmpfs-backed mount point
147 // other than "/". It returns the number of volumes added.
148 func (vs *volumeSet) Discover() int {
149         added := 0
150         f, err := os.Open(PROC_MOUNTS)
151         if err != nil {
152                 log.Fatalf("opening %s: %s", PROC_MOUNTS, err)
153         }
154         scanner := bufio.NewScanner(f)
155         for scanner.Scan() {
156                 args := strings.Fields(scanner.Text())
157                 if err := scanner.Err(); err != nil {
158                         log.Fatalf("reading %s: %s", PROC_MOUNTS, err)
159                 }
160                 dev, mount := args[0], args[1]
161                 if mount == "/" {
162                         continue
163                 }
164                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
165                         continue
166                 }
167                 keepdir := mount + "/keep"
168                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
169                         continue
170                 }
171                 // Set the -readonly flag (but only for this volume)
172                 // if the filesystem is mounted readonly.
173                 flagReadonlyWas := flagReadonly
174                 for _, fsopt := range strings.Split(args[3], ",") {
175                         if fsopt == "ro" {
176                                 flagReadonly = true
177                                 break
178                         }
179                         if fsopt == "rw" {
180                                 break
181                         }
182                 }
183                 vs.Set(keepdir)
184                 flagReadonly = flagReadonlyWas
185                 added++
186         }
187         return added
188 }
189
190 // TODO(twp): continue moving as much code as possible out of main
191 // so it can be effectively tested. Esp. handling and postprocessing
192 // of command line flags (identifying Keep volumes and initializing
193 // permission arguments).
194
195 func main() {
196         log.Println("Keep started: pid", os.Getpid())
197
198         var (
199                 data_manager_token_file string
200                 listen                  string
201                 permission_key_file     string
202                 permission_ttl_sec      int
203                 volumes                 volumeSet
204                 pidfile                 string
205         )
206         flag.StringVar(
207                 &data_manager_token_file,
208                 "data-manager-token-file",
209                 "",
210                 "File with the API token used by the Data Manager. All DELETE "+
211                         "requests or GET /index requests must carry this token.")
212         flag.BoolVar(
213                 &enforce_permissions,
214                 "enforce-permissions",
215                 false,
216                 "Enforce permission signatures on requests.")
217         flag.StringVar(
218                 &listen,
219                 "listen",
220                 DEFAULT_ADDR,
221                 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
222         flag.BoolVar(
223                 &never_delete,
224                 "never-delete",
225                 false,
226                 "If set, nothing will be deleted. HTTP 405 will be returned "+
227                         "for valid DELETE requests.")
228         flag.StringVar(
229                 &permission_key_file,
230                 "permission-key-file",
231                 "",
232                 "File containing the secret key for generating and verifying "+
233                         "permission signatures.")
234         flag.IntVar(
235                 &permission_ttl_sec,
236                 "permission-ttl",
237                 1209600,
238                 "Expiration time (in seconds) for newly generated permission "+
239                         "signatures.")
240         flag.BoolVar(
241                 &flagSerializeIO,
242                 "serialize",
243                 false,
244                 "Serialize read and write operations on the following volumes.")
245         flag.BoolVar(
246                 &flagReadonly,
247                 "readonly",
248                 false,
249                 "Do not write, delete, or touch anything on the following volumes.")
250         flag.Var(
251                 &volumes,
252                 "volumes",
253                 "Deprecated synonym for -volume.")
254         flag.Var(
255                 &volumes,
256                 "volume",
257                 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
258         flag.StringVar(
259                 &pidfile,
260                 "pid",
261                 "",
262                 "Path to write pid file")
263
264         flag.Parse()
265
266         if len(volumes) == 0 {
267                 if volumes.Discover() == 0 {
268                         log.Fatal("No volumes found.")
269                 }
270         }
271
272         for _, v := range volumes {
273                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
274         }
275
276         // Initialize data manager token and permission key.
277         // If these tokens are specified but cannot be read,
278         // raise a fatal error.
279         if data_manager_token_file != "" {
280                 if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
281                         data_manager_token = strings.TrimSpace(string(buf))
282                 } else {
283                         log.Fatalf("reading data manager token: %s\n", err)
284                 }
285         }
286         if permission_key_file != "" {
287                 if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
288                         PermissionSecret = bytes.TrimSpace(buf)
289                 } else {
290                         log.Fatalf("reading permission key: %s\n", err)
291                 }
292         }
293
294         // Initialize permission TTL
295         permission_ttl = time.Duration(permission_ttl_sec) * time.Second
296
297         // If --enforce-permissions is true, we must have a permission key
298         // to continue.
299         if PermissionSecret == nil {
300                 if enforce_permissions {
301                         log.Fatal("--enforce-permissions requires a permission key")
302                 } else {
303                         log.Println("Running without a PermissionSecret. Block locators " +
304                                 "returned by this server will not be signed, and will be rejected " +
305                                 "by a server that enforces permissions.")
306                         log.Println("To fix this, run Keep with --permission-key-file=<path> " +
307                                 "to define the location of a file containing the permission key.")
308                 }
309         }
310
311         // Start a round-robin VolumeManager with the volumes we have found.
312         KeepVM = MakeRRVolumeManager(volumes)
313
314         // Tell the built-in HTTP server to direct all requests to the REST router.
315         loggingRouter := MakeLoggingRESTRouter()
316         http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
317                 loggingRouter.ServeHTTP(resp, req)
318         })
319
320         // Set up a TCP listener.
321         listener, err := net.Listen("tcp", listen)
322         if err != nil {
323                 log.Fatal(err)
324         }
325
326         // Initialize Pull queue and worker
327         keepClient := &keepclient.KeepClient{
328                 Arvados:       nil,
329                 Want_replicas: 1,
330                 Using_proxy:   true,
331                 Client:        &http.Client{},
332         }
333
334         // Initialize the pullq and worker
335         pullq = NewWorkQueue()
336         go RunPullWorker(pullq, keepClient)
337
338         // Initialize the trashq and worker
339         trashq = NewWorkQueue()
340         go RunTrashWorker(trashq)
341
342         // Shut down the server gracefully (by closing the listener)
343         // if SIGTERM is received.
344         term := make(chan os.Signal, 1)
345         go func(sig <-chan os.Signal) {
346                 s := <-sig
347                 log.Println("caught signal:", s)
348                 listener.Close()
349         }(term)
350         signal.Notify(term, syscall.SIGTERM)
351
352         if pidfile != "" {
353                 f, err := os.Create(pidfile)
354                 if err == nil {
355                         fmt.Fprint(f, os.Getpid())
356                         f.Close()
357                 } else {
358                         log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
359                 }
360         }
361
362         // Start listening for requests.
363         srv := &http.Server{Addr: listen}
364         srv.Serve(listener)
365
366         log.Println("shutting down")
367
368         if pidfile != "" {
369                 os.Remove(pidfile)
370         }
371 }