8178: rename Delete api as Trash; add Untrash to volume interface; add UndeleteHandle...
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "bytes"
5         "flag"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         "io/ioutil"
9         "log"
10         "net"
11         "net/http"
12         "os"
13         "os/signal"
14         "strings"
15         "syscall"
16         "time"
17 )
18
19 // ======================
20 // Configuration settings
21 //
22 // TODO(twp): make all of these configurable via command line flags
23 // and/or configuration file settings.
24
25 // Default TCP address on which to listen for requests.
26 // Initialized by the --listen flag.
27 const DefaultAddr = ":25107"
28
29 // A Keep "block" is 64MB.
30 const BlockSize = 64 * 1024 * 1024
31
32 // A Keep volume must have at least MinFreeKilobytes available
33 // in order to permit writes.
34 const MinFreeKilobytes = BlockSize / 1024
35
36 // ProcMounts /proc/mounts
37 var ProcMounts = "/proc/mounts"
38
39 // enforcePermissions controls whether permission signatures
40 // should be enforced (affecting GET and DELETE requests).
41 // Initialized by the -enforce-permissions flag.
42 var enforcePermissions bool
43
44 // blobSignatureTTL is the time duration for which new permission
45 // signatures (returned by PUT requests) will be valid.
46 // Initialized by the -permission-ttl flag.
47 var blobSignatureTTL time.Duration
48
49 // dataManagerToken represents the API token used by the
50 // Data Manager, and is required on certain privileged operations.
51 // Initialized by the -data-manager-token-file flag.
52 var dataManagerToken string
53
54 // neverDelete can be used to prevent the DELETE handler from
55 // actually deleting anything.
56 var neverDelete = true
57
58 var maxBuffers = 128
59 var bufs *bufferPool
60
61 // KeepError types.
62 //
63 type KeepError struct {
64         HTTPCode int
65         ErrMsg   string
66 }
67
68 var (
69         BadRequestError     = &KeepError{400, "Bad Request"}
70         UnauthorizedError   = &KeepError{401, "Unauthorized"}
71         CollisionError      = &KeepError{500, "Collision"}
72         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
73         PermissionError     = &KeepError{403, "Forbidden"}
74         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
75         ExpiredError        = &KeepError{401, "Expired permission signature"}
76         NotFoundError       = &KeepError{404, "Not Found"}
77         GenericError        = &KeepError{500, "Fail"}
78         FullError           = &KeepError{503, "Full"}
79         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
80         TooLongError        = &KeepError{413, "Block is too large"}
81         MethodDisabledError = &KeepError{405, "Method disabled"}
82         ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
83 )
84
85 func (e *KeepError) Error() string {
86         return e.ErrMsg
87 }
88
89 // ========================
90 // Internal data structures
91 //
92 // These global variables are used by multiple parts of the
93 // program. They are good candidates for moving into their own
94 // packages.
95
96 // The Keep VolumeManager maintains a list of available volumes.
97 // Initialized by the --volumes flag (or by FindKeepVolumes).
98 var KeepVM VolumeManager
99
100 // The pull list manager and trash queue are threadsafe queues which
101 // support atomic update operations. The PullHandler and TrashHandler
102 // store results from Data Manager /pull and /trash requests here.
103 //
104 // See the Keep and Data Manager design documents for more details:
105 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
106 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
107 //
108 var pullq *WorkQueue
109 var trashq *WorkQueue
110
111 type volumeSet []Volume
112
113 var (
114         flagSerializeIO bool
115         flagReadonly    bool
116         volumes         volumeSet
117         trashLifetime   int
118 )
119
120 func (vs *volumeSet) String() string {
121         return fmt.Sprintf("%+v", (*vs)[:])
122 }
123
124 // TODO(twp): continue moving as much code as possible out of main
125 // so it can be effectively tested. Esp. handling and postprocessing
126 // of command line flags (identifying Keep volumes and initializing
127 // permission arguments).
128
129 func main() {
130         log.Println("keepstore starting, pid", os.Getpid())
131         defer log.Println("keepstore exiting, pid", os.Getpid())
132
133         var (
134                 dataManagerTokenFile string
135                 listen               string
136                 blobSigningKeyFile   string
137                 permissionTTLSec     int
138                 pidfile              string
139         )
140         flag.StringVar(
141                 &dataManagerTokenFile,
142                 "data-manager-token-file",
143                 "",
144                 "File with the API token used by the Data Manager. All DELETE "+
145                         "requests or GET /index requests must carry this token.")
146         flag.BoolVar(
147                 &enforcePermissions,
148                 "enforce-permissions",
149                 false,
150                 "Enforce permission signatures on requests.")
151         flag.StringVar(
152                 &listen,
153                 "listen",
154                 DefaultAddr,
155                 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
156         flag.BoolVar(
157                 &neverDelete,
158                 "never-delete",
159                 true,
160                 "If true, nothing will be deleted. "+
161                         "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
162                         "You should leave this option alone unless you can afford to lose data.")
163         flag.StringVar(
164                 &blobSigningKeyFile,
165                 "permission-key-file",
166                 "",
167                 "Synonym for -blob-signing-key-file.")
168         flag.StringVar(
169                 &blobSigningKeyFile,
170                 "blob-signing-key-file",
171                 "",
172                 "File containing the secret key for generating and verifying "+
173                         "blob permission signatures.")
174         flag.IntVar(
175                 &permissionTTLSec,
176                 "permission-ttl",
177                 0,
178                 "Synonym for -blob-signature-ttl.")
179         flag.IntVar(
180                 &permissionTTLSec,
181                 "blob-signature-ttl",
182                 int(time.Duration(2*7*24*time.Hour).Seconds()),
183                 "Lifetime of blob permission signatures. "+
184                         "See services/api/config/application.default.yml.")
185         flag.BoolVar(
186                 &flagSerializeIO,
187                 "serialize",
188                 false,
189                 "Serialize read and write operations on the following volumes.")
190         flag.BoolVar(
191                 &flagReadonly,
192                 "readonly",
193                 false,
194                 "Do not write, delete, or touch anything on the following volumes.")
195         flag.StringVar(
196                 &pidfile,
197                 "pid",
198                 "",
199                 "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
200         flag.IntVar(
201                 &maxBuffers,
202                 "max-buffers",
203                 maxBuffers,
204                 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
205         flag.IntVar(
206                 &trashLifetime,
207                 "trash-lifetime",
208                 0,
209                 fmt.Sprintf("Trashed blocks will stay in trash for trash-lifetime interval before they are actually deleted by the system."))
210
211         flag.Parse()
212
213         if maxBuffers < 0 {
214                 log.Fatal("-max-buffers must be greater than zero.")
215         }
216         bufs = newBufferPool(maxBuffers, BlockSize)
217
218         if pidfile != "" {
219                 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
220                 if err != nil {
221                         log.Fatalf("open pidfile (%s): %s", pidfile, err)
222                 }
223                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
224                 if err != nil {
225                         log.Fatalf("flock pidfile (%s): %s", pidfile, err)
226                 }
227                 err = f.Truncate(0)
228                 if err != nil {
229                         log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
230                 }
231                 _, err = fmt.Fprint(f, os.Getpid())
232                 if err != nil {
233                         log.Fatalf("write pidfile (%s): %s", pidfile, err)
234                 }
235                 err = f.Sync()
236                 if err != nil {
237                         log.Fatalf("sync pidfile (%s): %s", pidfile, err)
238                 }
239                 defer f.Close()
240                 defer os.Remove(pidfile)
241         }
242
243         if len(volumes) == 0 {
244                 if (&unixVolumeAdder{&volumes}).Discover() == 0 {
245                         log.Fatal("No volumes found.")
246                 }
247         }
248
249         for _, v := range volumes {
250                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
251         }
252
253         // Initialize data manager token and permission key.
254         // If these tokens are specified but cannot be read,
255         // raise a fatal error.
256         if dataManagerTokenFile != "" {
257                 if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
258                         dataManagerToken = strings.TrimSpace(string(buf))
259                 } else {
260                         log.Fatalf("reading data manager token: %s\n", err)
261                 }
262         }
263
264         if neverDelete != true {
265                 log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
266                         "been extensively tested. You should leave this option alone unless you can afford to lose data.")
267         }
268
269         if blobSigningKeyFile != "" {
270                 if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
271                         PermissionSecret = bytes.TrimSpace(buf)
272                 } else {
273                         log.Fatalf("reading permission key: %s\n", err)
274                 }
275         }
276
277         blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
278
279         if PermissionSecret == nil {
280                 if enforcePermissions {
281                         log.Fatal("-enforce-permissions requires a permission key")
282                 } else {
283                         log.Println("Running without a PermissionSecret. Block locators " +
284                                 "returned by this server will not be signed, and will be rejected " +
285                                 "by a server that enforces permissions.")
286                         log.Println("To fix this, use the -blob-signing-key-file flag " +
287                                 "to specify the file containing the permission key.")
288                 }
289         }
290
291         // Start a round-robin VolumeManager with the volumes we have found.
292         KeepVM = MakeRRVolumeManager(volumes)
293
294         // Tell the built-in HTTP server to direct all requests to the REST router.
295         loggingRouter := MakeLoggingRESTRouter()
296         http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
297                 loggingRouter.ServeHTTP(resp, req)
298         })
299
300         // Set up a TCP listener.
301         listener, err := net.Listen("tcp", listen)
302         if err != nil {
303                 log.Fatal(err)
304         }
305
306         // Initialize Pull queue and worker
307         keepClient := &keepclient.KeepClient{
308                 Arvados:       nil,
309                 Want_replicas: 1,
310                 Client:        &http.Client{},
311         }
312
313         // Initialize the pullq and worker
314         pullq = NewWorkQueue()
315         go RunPullWorker(pullq, keepClient)
316
317         // Initialize the trashq and worker
318         trashq = NewWorkQueue()
319         go RunTrashWorker(trashq)
320
321         // Shut down the server gracefully (by closing the listener)
322         // if SIGTERM is received.
323         term := make(chan os.Signal, 1)
324         go func(sig <-chan os.Signal) {
325                 s := <-sig
326                 log.Println("caught signal:", s)
327                 listener.Close()
328         }(term)
329         signal.Notify(term, syscall.SIGTERM)
330         signal.Notify(term, syscall.SIGINT)
331
332         log.Println("listening at", listen)
333         srv := &http.Server{Addr: listen}
334         srv.Serve(listener)
335 }