ca796676f3964ddb10a31fd60917442e7809f9bd
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "bytes"
5         "flag"
6         "fmt"
7         "io/ioutil"
8         "log"
9         "net"
10         "net/http"
11         "os"
12         "os/signal"
13         "strings"
14         "syscall"
15         "time"
16         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
17         "git.curoverse.com/arvados.git/sdk/go/keepclient"
18 )
19
20 // ======================
21 // Configuration settings
22 //
23 // TODO(twp): make all of these configurable via command line flags
24 // and/or configuration file settings.
25
26 // Default TCP address on which to listen for requests.
27 // Initialized by the --listen flag.
28 const DEFAULT_ADDR = ":25107"
29
30 // A Keep "block" is 64MB.
31 const BLOCKSIZE = 64 * 1024 * 1024
32
33 // A Keep volume must have at least MIN_FREE_KILOBYTES available
34 // in order to permit writes.
35 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
36
37 var PROC_MOUNTS = "/proc/mounts"
38
39 // enforce_permissions controls whether permission signatures
40 // should be enforced (affecting GET and DELETE requests).
41 // Initialized by the --enforce-permissions flag.
42 var enforce_permissions bool
43
44 // permission_ttl is the time duration for which new permission
45 // signatures (returned by PUT requests) will be valid.
46 // Initialized by the --permission-ttl flag.
47 var permission_ttl time.Duration
48
49 // data_manager_token represents the API token used by the
50 // Data Manager, and is required on certain privileged operations.
51 // Initialized by the --data-manager-token-file flag.
52 var data_manager_token string
53
54 // never_delete can be used to prevent the DELETE handler from
55 // actually deleting anything.
56 var never_delete = false
57
58 // ==========
59 // Error types.
60 //
61 type KeepError struct {
62         HTTPCode int
63         ErrMsg   string
64 }
65
66 var (
67         BadRequestError     = &KeepError{400, "Bad Request"}
68         UnauthorizedError   = &KeepError{401, "Unauthorized"}
69         CollisionError      = &KeepError{500, "Collision"}
70         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
71         PermissionError     = &KeepError{403, "Forbidden"}
72         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
73         ExpiredError        = &KeepError{401, "Expired permission signature"}
74         NotFoundError       = &KeepError{404, "Not Found"}
75         GenericError        = &KeepError{500, "Fail"}
76         FullError           = &KeepError{503, "Full"}
77         TooLongError        = &KeepError{504, "Timeout"}
78         MethodDisabledError = &KeepError{405, "Method disabled"}
79 )
80
81 func (e *KeepError) Error() string {
82         return e.ErrMsg
83 }
84
85 // ========================
86 // Internal data structures
87 //
88 // These global variables are used by multiple parts of the
89 // program. They are good candidates for moving into their own
90 // packages.
91
92 // The Keep VolumeManager maintains a list of available volumes.
93 // Initialized by the --volumes flag (or by FindKeepVolumes).
94 var KeepVM VolumeManager
95
96 // The pull list manager and trash queue are threadsafe queues which
97 // support atomic update operations. The PullHandler and TrashHandler
98 // store results from Data Manager /pull and /trash requests here.
99 //
100 // See the Keep and Data Manager design documents for more details:
101 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
102 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
103 //
104 var pullq *WorkQueue
105 var trashq *WorkQueue
106
107 // TODO(twp): continue moving as much code as possible out of main
108 // so it can be effectively tested. Esp. handling and postprocessing
109 // of command line flags (identifying Keep volumes and initializing
110 // permission arguments).
111
112 func main() {
113         log.Println("Keep started: pid", os.Getpid())
114
115         // Parse command-line flags:
116         //
117         // -listen=ipaddr:port
118         //    Interface on which to listen for requests. Use :port without
119         //    an ipaddr to listen on all network interfaces.
120         //    Examples:
121         //      -listen=127.0.0.1:4949
122         //      -listen=10.0.1.24:8000
123         //      -listen=:25107 (to listen to port 25107 on all interfaces)
124         //
125         // -volumes
126         //    A comma-separated list of directories to use as Keep volumes.
127         //    Example:
128         //      -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
129         //
130         //    If -volumes is empty or is not present, Keep will select volumes
131         //    by looking at currently mounted filesystems for /keep top-level
132         //    directories.
133
134         var (
135                 data_manager_token_file string
136                 listen                  string
137                 permission_key_file     string
138                 permission_ttl_sec      int
139                 serialize_io            bool
140                 volumearg               string
141                 pidfile                 string
142         )
143         flag.StringVar(
144                 &data_manager_token_file,
145                 "data-manager-token-file",
146                 "",
147                 "File with the API token used by the Data Manager. All DELETE "+
148                         "requests or GET /index requests must carry this token.")
149         flag.BoolVar(
150                 &enforce_permissions,
151                 "enforce-permissions",
152                 false,
153                 "Enforce permission signatures on requests.")
154         flag.StringVar(
155                 &listen,
156                 "listen",
157                 DEFAULT_ADDR,
158                 "Interface on which to listen for requests, in the format "+
159                         "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
160                         "to listen on all network interfaces.")
161         flag.BoolVar(
162                 &never_delete,
163                 "never-delete",
164                 false,
165                 "If set, nothing will be deleted. HTTP 405 will be returned "+
166                         "for valid DELETE requests.")
167         flag.StringVar(
168                 &permission_key_file,
169                 "permission-key-file",
170                 "",
171                 "File containing the secret key for generating and verifying "+
172                         "permission signatures.")
173         flag.IntVar(
174                 &permission_ttl_sec,
175                 "permission-ttl",
176                 1209600,
177                 "Expiration time (in seconds) for newly generated permission "+
178                         "signatures.")
179         flag.BoolVar(
180                 &serialize_io,
181                 "serialize",
182                 false,
183                 "If set, all read and write operations on local Keep volumes will "+
184                         "be serialized.")
185         flag.StringVar(
186                 &volumearg,
187                 "volumes",
188                 "",
189                 "Comma-separated list of directories to use for Keep volumes, "+
190                         "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
191                         "supplied, Keep will scan mounted filesystems for volumes "+
192                         "with a /keep top-level directory.")
193
194         flag.StringVar(
195                 &pidfile,
196                 "pid",
197                 "",
198                 "Path to write pid file")
199
200         flag.Parse()
201
202         // Look for local keep volumes.
203         var keepvols []string
204         if volumearg == "" {
205                 // TODO(twp): decide whether this is desirable default behavior.
206                 // In production we may want to require the admin to specify
207                 // Keep volumes explicitly.
208                 keepvols = FindKeepVolumes()
209         } else {
210                 keepvols = strings.Split(volumearg, ",")
211         }
212
213         // Check that the specified volumes actually exist.
214         var goodvols []Volume = nil
215         for _, v := range keepvols {
216                 if _, err := os.Stat(v); err == nil {
217                         log.Println("adding Keep volume:", v)
218                         newvol := MakeUnixVolume(v, serialize_io)
219                         goodvols = append(goodvols, &newvol)
220                 } else {
221                         log.Printf("bad Keep volume: %s\n", err)
222                 }
223         }
224
225         if len(goodvols) == 0 {
226                 log.Fatal("could not find any keep volumes")
227         }
228
229         // Initialize data manager token and permission key.
230         // If these tokens are specified but cannot be read,
231         // raise a fatal error.
232         if data_manager_token_file != "" {
233                 if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
234                         data_manager_token = strings.TrimSpace(string(buf))
235                 } else {
236                         log.Fatalf("reading data manager token: %s\n", err)
237                 }
238         }
239         if permission_key_file != "" {
240                 if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
241                         PermissionSecret = bytes.TrimSpace(buf)
242                 } else {
243                         log.Fatalf("reading permission key: %s\n", err)
244                 }
245         }
246
247         // Initialize permission TTL
248         permission_ttl = time.Duration(permission_ttl_sec) * time.Second
249
250         // If --enforce-permissions is true, we must have a permission key
251         // to continue.
252         if PermissionSecret == nil {
253                 if enforce_permissions {
254                         log.Fatal("--enforce-permissions requires a permission key")
255                 } else {
256                         log.Println("Running without a PermissionSecret. Block locators " +
257                                 "returned by this server will not be signed, and will be rejected " +
258                                 "by a server that enforces permissions.")
259                         log.Println("To fix this, run Keep with --permission-key-file=<path> " +
260                                 "to define the location of a file containing the permission key.")
261                 }
262         }
263
264         // Start a round-robin VolumeManager with the volumes we have found.
265         KeepVM = MakeRRVolumeManager(goodvols)
266
267         // Tell the built-in HTTP server to direct all requests to the REST router.
268         loggingRouter := MakeLoggingRESTRouter()
269         http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
270                 loggingRouter.ServeHTTP(resp, req)
271         })
272
273         // Set up a TCP listener.
274         listener, err := net.Listen("tcp", listen)
275         if err != nil {
276                 log.Fatal(err)
277         }
278
279         // Initialize Pull queue and worker
280         arv, err := arvadosclient.MakeArvadosClient()
281         if err != nil {
282                 log.Fatalf("Error setting up arvados client %s", err.Error())
283         }
284
285         keepClient, err := keepclient.MakeKeepClient(&arv)
286         if err != nil {
287                 log.Fatalf("Error setting up keep client %s", err.Error())
288         }
289
290         pullq = NewWorkQueue()
291         go RunPullWorker(pullq.NextItem, keepClient)
292
293         // Shut down the server gracefully (by closing the listener)
294         // if SIGTERM is received.
295         term := make(chan os.Signal, 1)
296         go func(sig <-chan os.Signal) {
297                 s := <-sig
298                 log.Println("caught signal:", s)
299                 listener.Close()
300         }(term)
301         signal.Notify(term, syscall.SIGTERM)
302
303         if pidfile != "" {
304                 f, err := os.Create(pidfile)
305                 if err == nil {
306                         fmt.Fprint(f, os.Getpid())
307                         f.Close()
308                 } else {
309                         log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
310                 }
311         }
312
313         // Start listening for requests.
314         srv := &http.Server{Addr: listen}
315         srv.Serve(listener)
316
317         log.Println("shutting down")
318
319         if pidfile != "" {
320                 os.Remove(pidfile)
321         }
322 }