3762: undo the api server discovery document update. trash worker can instead use...
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "bytes"
5         "flag"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         "io/ioutil"
9         "log"
10         "net"
11         "net/http"
12         "os"
13         "os/signal"
14         "strings"
15         "syscall"
16         "time"
17 )
18
19 // ======================
20 // Configuration settings
21 //
22 // TODO(twp): make all of these configurable via command line flags
23 // and/or configuration file settings.
24
25 // Default TCP address on which to listen for requests.
26 // Initialized by the --listen flag.
27 const DEFAULT_ADDR = ":25107"
28
29 // A Keep "block" is 64MB.
30 const BLOCKSIZE = 64 * 1024 * 1024
31
32 // A Keep volume must have at least MIN_FREE_KILOBYTES available
33 // in order to permit writes.
34 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
35
36 var PROC_MOUNTS = "/proc/mounts"
37
38 // enforce_permissions controls whether permission signatures
39 // should be enforced (affecting GET and DELETE requests).
40 // Initialized by the --enforce-permissions flag.
41 var enforce_permissions bool
42
43 // permission_ttl is the time duration for which new permission
44 // signatures (returned by PUT requests) will be valid.
45 // Initialized by the --permission-ttl flag.
46 var permission_ttl time.Duration
47
48 // data_manager_token represents the API token used by the
49 // Data Manager, and is required on certain privileged operations.
50 // Initialized by the --data-manager-token-file flag.
51 var data_manager_token string
52
53 // never_delete can be used to prevent the DELETE handler from
54 // actually deleting anything.
55 var never_delete = false
56
57 // ==========
58 // Error types.
59 //
60 type KeepError struct {
61         HTTPCode int
62         ErrMsg   string
63 }
64
65 var (
66         BadRequestError     = &KeepError{400, "Bad Request"}
67         UnauthorizedError   = &KeepError{401, "Unauthorized"}
68         CollisionError      = &KeepError{500, "Collision"}
69         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
70         PermissionError     = &KeepError{403, "Forbidden"}
71         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
72         ExpiredError        = &KeepError{401, "Expired permission signature"}
73         NotFoundError       = &KeepError{404, "Not Found"}
74         GenericError        = &KeepError{500, "Fail"}
75         FullError           = &KeepError{503, "Full"}
76         TooLongError        = &KeepError{504, "Timeout"}
77         MethodDisabledError = &KeepError{405, "Method disabled"}
78 )
79
80 func (e *KeepError) Error() string {
81         return e.ErrMsg
82 }
83
84 // ========================
85 // Internal data structures
86 //
87 // These global variables are used by multiple parts of the
88 // program. They are good candidates for moving into their own
89 // packages.
90
91 // The Keep VolumeManager maintains a list of available volumes.
92 // Initialized by the --volumes flag (or by FindKeepVolumes).
93 var KeepVM VolumeManager
94
95 // The pull list manager and trash queue are threadsafe queues which
96 // support atomic update operations. The PullHandler and TrashHandler
97 // store results from Data Manager /pull and /trash requests here.
98 //
99 // See the Keep and Data Manager design documents for more details:
100 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
101 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
102 //
103 var pullq *WorkQueue
104 var trashq *WorkQueue
105
106 // TODO(twp): continue moving as much code as possible out of main
107 // so it can be effectively tested. Esp. handling and postprocessing
108 // of command line flags (identifying Keep volumes and initializing
109 // permission arguments).
110
111 func main() {
112         log.Println("Keep started: pid", os.Getpid())
113
114         // Parse command-line flags:
115         //
116         // -listen=ipaddr:port
117         //    Interface on which to listen for requests. Use :port without
118         //    an ipaddr to listen on all network interfaces.
119         //    Examples:
120         //      -listen=127.0.0.1:4949
121         //      -listen=10.0.1.24:8000
122         //      -listen=:25107 (to listen to port 25107 on all interfaces)
123         //
124         // -volumes
125         //    A comma-separated list of directories to use as Keep volumes.
126         //    Example:
127         //      -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
128         //
129         //    If -volumes is empty or is not present, Keep will select volumes
130         //    by looking at currently mounted filesystems for /keep top-level
131         //    directories.
132
133         var (
134                 data_manager_token_file string
135                 listen                  string
136                 permission_key_file     string
137                 permission_ttl_sec      int
138                 serialize_io            bool
139                 volumearg               string
140                 pidfile                 string
141         )
142         flag.StringVar(
143                 &data_manager_token_file,
144                 "data-manager-token-file",
145                 "",
146                 "File with the API token used by the Data Manager. All DELETE "+
147                         "requests or GET /index requests must carry this token.")
148         flag.BoolVar(
149                 &enforce_permissions,
150                 "enforce-permissions",
151                 false,
152                 "Enforce permission signatures on requests.")
153         flag.StringVar(
154                 &listen,
155                 "listen",
156                 DEFAULT_ADDR,
157                 "Interface on which to listen for requests, in the format "+
158                         "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
159                         "to listen on all network interfaces.")
160         flag.BoolVar(
161                 &never_delete,
162                 "never-delete",
163                 false,
164                 "If set, nothing will be deleted. HTTP 405 will be returned "+
165                         "for valid DELETE requests.")
166         flag.StringVar(
167                 &permission_key_file,
168                 "permission-key-file",
169                 "",
170                 "File containing the secret key for generating and verifying "+
171                         "permission signatures.")
172         flag.IntVar(
173                 &permission_ttl_sec,
174                 "permission-ttl",
175                 1209600,
176                 "Expiration time (in seconds) for newly generated permission "+
177                         "signatures.")
178         flag.BoolVar(
179                 &serialize_io,
180                 "serialize",
181                 false,
182                 "If set, all read and write operations on local Keep volumes will "+
183                         "be serialized.")
184         flag.StringVar(
185                 &volumearg,
186                 "volumes",
187                 "",
188                 "Comma-separated list of directories to use for Keep volumes, "+
189                         "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
190                         "supplied, Keep will scan mounted filesystems for volumes "+
191                         "with a /keep top-level directory.")
192
193         flag.StringVar(
194                 &pidfile,
195                 "pid",
196                 "",
197                 "Path to write pid file")
198
199         flag.Parse()
200
201         // Look for local keep volumes.
202         var keepvols []string
203         if volumearg == "" {
204                 // TODO(twp): decide whether this is desirable default behavior.
205                 // In production we may want to require the admin to specify
206                 // Keep volumes explicitly.
207                 keepvols = FindKeepVolumes()
208         } else {
209                 keepvols = strings.Split(volumearg, ",")
210         }
211
212         // Check that the specified volumes actually exist.
213         var goodvols []Volume = nil
214         for _, v := range keepvols {
215                 if _, err := os.Stat(v); err == nil {
216                         log.Println("adding Keep volume:", v)
217                         newvol := MakeUnixVolume(v, serialize_io)
218                         goodvols = append(goodvols, &newvol)
219                 } else {
220                         log.Printf("bad Keep volume: %s\n", err)
221                 }
222         }
223
224         if len(goodvols) == 0 {
225                 log.Fatal("could not find any keep volumes")
226         }
227
228         // Initialize data manager token and permission key.
229         // If these tokens are specified but cannot be read,
230         // raise a fatal error.
231         if data_manager_token_file != "" {
232                 if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
233                         data_manager_token = strings.TrimSpace(string(buf))
234                 } else {
235                         log.Fatalf("reading data manager token: %s\n", err)
236                 }
237         }
238         if permission_key_file != "" {
239                 if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
240                         PermissionSecret = bytes.TrimSpace(buf)
241                 } else {
242                         log.Fatalf("reading permission key: %s\n", err)
243                 }
244         }
245
246         // Initialize permission TTL
247         permission_ttl = time.Duration(permission_ttl_sec) * time.Second
248
249         // If --enforce-permissions is true, we must have a permission key
250         // to continue.
251         if PermissionSecret == nil {
252                 if enforce_permissions {
253                         log.Fatal("--enforce-permissions requires a permission key")
254                 } else {
255                         log.Println("Running without a PermissionSecret. Block locators " +
256                                 "returned by this server will not be signed, and will be rejected " +
257                                 "by a server that enforces permissions.")
258                         log.Println("To fix this, run Keep with --permission-key-file=<path> " +
259                                 "to define the location of a file containing the permission key.")
260                 }
261         }
262
263         // Start a round-robin VolumeManager with the volumes we have found.
264         KeepVM = MakeRRVolumeManager(goodvols)
265
266         // Tell the built-in HTTP server to direct all requests to the REST router.
267         loggingRouter := MakeLoggingRESTRouter()
268         http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
269                 loggingRouter.ServeHTTP(resp, req)
270         })
271
272         // Set up a TCP listener.
273         listener, err := net.Listen("tcp", listen)
274         if err != nil {
275                 log.Fatal(err)
276         }
277
278         // Initialize Pull queue and worker
279         keepClient := keepclient.KeepClient{
280                 Arvados:       nil,
281                 Want_replicas: 1,
282                 Using_proxy:   true,
283                 Client:        &http.Client{},
284         }
285
286         // Initialize the pullq and worker
287         pullq = NewWorkQueue()
288         go RunPullWorker(pullq, keepClient)
289
290         // Initialize the trashq and worker
291         trashq = NewWorkQueue()
292         go RunTrashWorker(trashq)
293
294         // Shut down the server gracefully (by closing the listener)
295         // if SIGTERM is received.
296         term := make(chan os.Signal, 1)
297         go func(sig <-chan os.Signal) {
298                 s := <-sig
299                 log.Println("caught signal:", s)
300                 listener.Close()
301         }(term)
302         signal.Notify(term, syscall.SIGTERM)
303
304         if pidfile != "" {
305                 f, err := os.Create(pidfile)
306                 if err == nil {
307                         fmt.Fprint(f, os.Getpid())
308                         f.Close()
309                 } else {
310                         log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
311                 }
312         }
313
314         // Start listening for requests.
315         srv := &http.Server{Addr: listen}
316         srv.Serve(listener)
317
318         log.Println("shutting down")
319
320         if pidfile != "" {
321                 os.Remove(pidfile)
322         }
323 }