11469: Docker-managed volumes go in "Volumes" not "Binds".
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "flag"
5         "fmt"
6         "net"
7         "net/http"
8         "os"
9         "os/signal"
10         "syscall"
11         "time"
12
13         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
14         "git.curoverse.com/arvados.git/sdk/go/config"
15         "git.curoverse.com/arvados.git/sdk/go/httpserver"
16         "git.curoverse.com/arvados.git/sdk/go/keepclient"
17         log "github.com/Sirupsen/logrus"
18         "github.com/coreos/go-systemd/daemon"
19 )
20
21 // A Keep "block" is 64MB.
22 const BlockSize = 64 * 1024 * 1024
23
24 // A Keep volume must have at least MinFreeKilobytes available
25 // in order to permit writes.
26 const MinFreeKilobytes = BlockSize / 1024
27
28 // ProcMounts /proc/mounts
29 var ProcMounts = "/proc/mounts"
30
31 var bufs *bufferPool
32
33 // KeepError types.
34 //
35 type KeepError struct {
36         HTTPCode int
37         ErrMsg   string
38 }
39
40 var (
41         BadRequestError     = &KeepError{400, "Bad Request"}
42         UnauthorizedError   = &KeepError{401, "Unauthorized"}
43         CollisionError      = &KeepError{500, "Collision"}
44         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
45         PermissionError     = &KeepError{403, "Forbidden"}
46         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
47         ExpiredError        = &KeepError{401, "Expired permission signature"}
48         NotFoundError       = &KeepError{404, "Not Found"}
49         GenericError        = &KeepError{500, "Fail"}
50         FullError           = &KeepError{503, "Full"}
51         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
52         TooLongError        = &KeepError{413, "Block is too large"}
53         MethodDisabledError = &KeepError{405, "Method disabled"}
54         ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
55         ErrClientDisconnect = &KeepError{503, "Client disconnected"}
56 )
57
58 func (e *KeepError) Error() string {
59         return e.ErrMsg
60 }
61
62 // ========================
63 // Internal data structures
64 //
65 // These global variables are used by multiple parts of the
66 // program. They are good candidates for moving into their own
67 // packages.
68
69 // The Keep VolumeManager maintains a list of available volumes.
70 // Initialized by the --volumes flag (or by FindKeepVolumes).
71 var KeepVM VolumeManager
72
73 // The pull list manager and trash queue are threadsafe queues which
74 // support atomic update operations. The PullHandler and TrashHandler
75 // store results from Data Manager /pull and /trash requests here.
76 //
77 // See the Keep and Data Manager design documents for more details:
78 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
79 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
80 //
81 var pullq *WorkQueue
82 var trashq *WorkQueue
83
84 func main() {
85         deprecated.beforeFlagParse(theConfig)
86
87         dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
88
89         defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
90         var configPath string
91         flag.StringVar(
92                 &configPath,
93                 "config",
94                 defaultConfigPath,
95                 "YAML or JSON configuration file `path`")
96         flag.Usage = usage
97         flag.Parse()
98
99         deprecated.afterFlagParse(theConfig)
100
101         err := config.LoadFile(theConfig, configPath)
102         if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
103                 log.Fatal(err)
104         }
105
106         if *dumpConfig {
107                 log.Fatal(config.DumpAndExit(theConfig))
108         }
109
110         err = theConfig.Start()
111         if err != nil {
112                 log.Fatal(err)
113         }
114
115         if pidfile := theConfig.PIDFile; pidfile != "" {
116                 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
117                 if err != nil {
118                         log.Fatalf("open pidfile (%s): %s", pidfile, err)
119                 }
120                 defer f.Close()
121                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
122                 if err != nil {
123                         log.Fatalf("flock pidfile (%s): %s", pidfile, err)
124                 }
125                 defer os.Remove(pidfile)
126                 err = f.Truncate(0)
127                 if err != nil {
128                         log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
129                 }
130                 _, err = fmt.Fprint(f, os.Getpid())
131                 if err != nil {
132                         log.Fatalf("write pidfile (%s): %s", pidfile, err)
133                 }
134                 err = f.Sync()
135                 if err != nil {
136                         log.Fatalf("sync pidfile (%s): %s", pidfile, err)
137                 }
138         }
139
140         log.Println("keepstore starting, pid", os.Getpid())
141         defer log.Println("keepstore exiting, pid", os.Getpid())
142
143         // Start a round-robin VolumeManager with the volumes we have found.
144         KeepVM = MakeRRVolumeManager(theConfig.Volumes)
145
146         // Middleware stack: logger, MaxRequests limiter, method handlers
147         router := MakeRESTRouter()
148         limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
149         router.limiter = limiter
150         http.Handle("/", &LoggingRESTRouter{router: limiter})
151
152         // Set up a TCP listener.
153         listener, err := net.Listen("tcp", theConfig.Listen)
154         if err != nil {
155                 log.Fatal(err)
156         }
157
158         // Initialize Pull queue and worker
159         keepClient := &keepclient.KeepClient{
160                 Arvados:       &arvadosclient.ArvadosClient{},
161                 Want_replicas: 1,
162                 Client:        &http.Client{},
163         }
164
165         // Initialize the pullq and worker
166         pullq = NewWorkQueue()
167         go RunPullWorker(pullq, keepClient)
168
169         // Initialize the trashq and worker
170         trashq = NewWorkQueue()
171         go RunTrashWorker(trashq)
172
173         // Start emptyTrash goroutine
174         doneEmptyingTrash := make(chan bool)
175         go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
176
177         // Shut down the server gracefully (by closing the listener)
178         // if SIGTERM is received.
179         term := make(chan os.Signal, 1)
180         go func(sig <-chan os.Signal) {
181                 s := <-sig
182                 log.Println("caught signal:", s)
183                 doneEmptyingTrash <- true
184                 listener.Close()
185         }(term)
186         signal.Notify(term, syscall.SIGTERM)
187         signal.Notify(term, syscall.SIGINT)
188
189         if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
190                 log.Printf("Error notifying init daemon: %v", err)
191         }
192         log.Println("listening at", listener.Addr())
193         srv := &http.Server{}
194         srv.Serve(listener)
195 }
196
197 // Periodically (once per interval) invoke EmptyTrash on all volumes.
198 func emptyTrash(done <-chan bool, interval time.Duration) {
199         ticker := time.NewTicker(interval)
200
201         for {
202                 select {
203                 case <-ticker.C:
204                         for _, v := range theConfig.Volumes {
205                                 if v.Writable() {
206                                         v.EmptyTrash()
207                                 }
208                         }
209                 case <-done:
210                         ticker.Stop()
211                         return
212                 }
213         }
214 }