Merge branch 'master' of git.curoverse.com:arvados into 13330-cwl-intermediate-collec...
[arvados.git] / services / keepstore / config.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "io/ioutil"
12         "net/http"
13         "strconv"
14         "strings"
15         "time"
16
17         "git.curoverse.com/arvados.git/sdk/go/arvados"
18         "git.curoverse.com/arvados.git/sdk/go/stats"
19         "github.com/Sirupsen/logrus"
20         "github.com/golang/protobuf/jsonpb"
21         "github.com/prometheus/client_golang/prometheus"
22         "github.com/prometheus/client_golang/prometheus/promhttp"
23 )
24
25 type Config struct {
26         Debug  bool
27         Listen string
28
29         LogFormat string
30
31         PIDFile string
32
33         MaxBuffers  int
34         MaxRequests int
35
36         BlobSignatureTTL    arvados.Duration
37         BlobSigningKeyFile  string
38         RequireSignatures   bool
39         SystemAuthTokenFile string
40         EnableDelete        bool
41         TrashLifetime       arvados.Duration
42         TrashCheckInterval  arvados.Duration
43         PullWorkers         int
44         TrashWorkers        int
45         EmptyTrashWorkers   int
46         TLSCertificateFile  string
47         TLSKeyFile          string
48
49         Volumes VolumeList
50
51         blobSigningKey  []byte
52         systemAuthToken string
53         debugLogf       func(string, ...interface{})
54
55         ManagementToken string `doc: The secret key that must be provided by monitoring services
56 wishing to access the health check endpoint (/_health).`
57
58         metrics
59 }
60
61 var (
62         theConfig = DefaultConfig()
63         formatter = map[string]logrus.Formatter{
64                 "text": &logrus.TextFormatter{
65                         FullTimestamp:   true,
66                         TimestampFormat: rfc3339NanoFixed,
67                 },
68                 "json": &logrus.JSONFormatter{
69                         TimestampFormat: rfc3339NanoFixed,
70                 },
71         }
72         log = logrus.StandardLogger()
73 )
74
75 const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
76
77 // DefaultConfig returns the default configuration.
78 func DefaultConfig() *Config {
79         return &Config{
80                 Listen:             ":25107",
81                 LogFormat:          "json",
82                 MaxBuffers:         128,
83                 RequireSignatures:  true,
84                 BlobSignatureTTL:   arvados.Duration(14 * 24 * time.Hour),
85                 TrashLifetime:      arvados.Duration(14 * 24 * time.Hour),
86                 TrashCheckInterval: arvados.Duration(24 * time.Hour),
87                 Volumes:            []Volume{},
88         }
89 }
90
91 // Start should be called exactly once: after setting all public
92 // fields, and before using the config.
93 func (cfg *Config) Start() error {
94         if cfg.Debug {
95                 log.Level = logrus.DebugLevel
96                 cfg.debugLogf = log.Printf
97                 cfg.debugLogf("debugging enabled")
98         } else {
99                 log.Level = logrus.InfoLevel
100                 cfg.debugLogf = func(string, ...interface{}) {}
101         }
102
103         f := formatter[strings.ToLower(cfg.LogFormat)]
104         if f == nil {
105                 return fmt.Errorf(`unsupported log format %q (try "text" or "json")`, cfg.LogFormat)
106         }
107         log.Formatter = f
108
109         if cfg.MaxBuffers < 0 {
110                 return fmt.Errorf("MaxBuffers must be greater than zero")
111         }
112         bufs = newBufferPool(cfg.MaxBuffers, BlockSize)
113
114         if cfg.MaxRequests < 1 {
115                 cfg.MaxRequests = cfg.MaxBuffers * 2
116                 log.Printf("MaxRequests <1 or not specified; defaulting to MaxBuffers * 2 == %d", cfg.MaxRequests)
117         }
118
119         if cfg.BlobSigningKeyFile != "" {
120                 buf, err := ioutil.ReadFile(cfg.BlobSigningKeyFile)
121                 if err != nil {
122                         return fmt.Errorf("reading blob signing key file: %s", err)
123                 }
124                 cfg.blobSigningKey = bytes.TrimSpace(buf)
125                 if len(cfg.blobSigningKey) == 0 {
126                         return fmt.Errorf("blob signing key file %q is empty", cfg.BlobSigningKeyFile)
127                 }
128         } else if cfg.RequireSignatures {
129                 return fmt.Errorf("cannot enable RequireSignatures (-enforce-permissions) without a blob signing key")
130         } else {
131                 log.Println("Running without a blob signing key. Block locators " +
132                         "returned by this server will not be signed, and will be rejected " +
133                         "by a server that enforces permissions.")
134                 log.Println("To fix this, use the BlobSigningKeyFile config entry.")
135         }
136
137         if fn := cfg.SystemAuthTokenFile; fn != "" {
138                 buf, err := ioutil.ReadFile(fn)
139                 if err != nil {
140                         return fmt.Errorf("cannot read system auth token file %q: %s", fn, err)
141                 }
142                 cfg.systemAuthToken = strings.TrimSpace(string(buf))
143         }
144
145         if cfg.EnableDelete {
146                 log.Print("Trash/delete features are enabled. WARNING: this has not " +
147                         "been extensively tested. You should disable this unless you can afford to lose data.")
148         }
149
150         if len(cfg.Volumes) == 0 {
151                 if (&unixVolumeAdder{cfg}).Discover() == 0 {
152                         return fmt.Errorf("no volumes found")
153                 }
154         }
155         for _, v := range cfg.Volumes {
156                 if err := v.Start(); err != nil {
157                         return fmt.Errorf("volume %s: %s", v, err)
158                 }
159                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
160         }
161         return nil
162 }
163
164 type metrics struct {
165         registry     *prometheus.Registry
166         reqDuration  *prometheus.SummaryVec
167         timeToStatus *prometheus.SummaryVec
168         exportProm   http.Handler
169 }
170
171 func (*metrics) Levels() []logrus.Level {
172         return logrus.AllLevels
173 }
174
175 func (m *metrics) Fire(ent *logrus.Entry) error {
176         if tts, ok := ent.Data["timeToStatus"].(stats.Duration); !ok {
177         } else if method, ok := ent.Data["reqMethod"].(string); !ok {
178         } else if code, ok := ent.Data["respStatusCode"].(int); !ok {
179         } else {
180                 m.timeToStatus.WithLabelValues(strconv.Itoa(code), strings.ToLower(method)).Observe(time.Duration(tts).Seconds())
181         }
182         return nil
183 }
184
185 func (m *metrics) setup() {
186         m.registry = prometheus.NewRegistry()
187         m.timeToStatus = prometheus.NewSummaryVec(prometheus.SummaryOpts{
188                 Name: "time_to_status_seconds",
189                 Help: "Summary of request TTFB.",
190         }, []string{"code", "method"})
191         m.reqDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
192                 Name: "request_duration_seconds",
193                 Help: "Summary of request duration.",
194         }, []string{"code", "method"})
195         m.registry.MustRegister(m.timeToStatus)
196         m.registry.MustRegister(m.reqDuration)
197         m.exportProm = promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{
198                 ErrorLog: log,
199         })
200         log.AddHook(m)
201 }
202
203 func (m *metrics) exportJSON(w http.ResponseWriter, req *http.Request) {
204         jm := jsonpb.Marshaler{Indent: "  "}
205         mfs, _ := m.registry.Gather()
206         w.Write([]byte{'['})
207         for i, mf := range mfs {
208                 if i > 0 {
209                         w.Write([]byte{','})
210                 }
211                 jm.Marshal(w, mf)
212         }
213         w.Write([]byte{']'})
214 }
215
216 func (m *metrics) Instrument(next http.Handler) http.Handler {
217         return promhttp.InstrumentHandlerDuration(m.reqDuration, next)
218 }
219
220 // VolumeTypes is built up by init() funcs in the source files that
221 // define the volume types.
222 var VolumeTypes = []func() VolumeWithExamples{}
223
224 type VolumeList []Volume
225
226 // UnmarshalJSON -- given an array of objects -- deserializes each
227 // object as the volume type indicated by the object's Type field.
228 func (vl *VolumeList) UnmarshalJSON(data []byte) error {
229         typeMap := map[string]func() VolumeWithExamples{}
230         for _, factory := range VolumeTypes {
231                 t := factory().Type()
232                 if _, ok := typeMap[t]; ok {
233                         log.Fatal("volume type %+q is claimed by multiple VolumeTypes")
234                 }
235                 typeMap[t] = factory
236         }
237
238         var mapList []map[string]interface{}
239         err := json.Unmarshal(data, &mapList)
240         if err != nil {
241                 return err
242         }
243         for _, mapIn := range mapList {
244                 typeIn, ok := mapIn["Type"].(string)
245                 if !ok {
246                         return fmt.Errorf("invalid volume type %+v", mapIn["Type"])
247                 }
248                 factory, ok := typeMap[typeIn]
249                 if !ok {
250                         return fmt.Errorf("unsupported volume type %+q", typeIn)
251                 }
252                 data, err := json.Marshal(mapIn)
253                 if err != nil {
254                         return err
255                 }
256                 vol := factory()
257                 err = json.Unmarshal(data, vol)
258                 if err != nil {
259                         return err
260                 }
261                 *vl = append(*vl, vol)
262         }
263         return nil
264 }
265
266 // MarshalJSON adds a "Type" field to each volume corresponding to its
267 // Type().
268 func (vl *VolumeList) MarshalJSON() ([]byte, error) {
269         data := []byte{'['}
270         for _, vs := range *vl {
271                 j, err := json.Marshal(vs)
272                 if err != nil {
273                         return nil, err
274                 }
275                 if len(data) > 1 {
276                         data = append(data, byte(','))
277                 }
278                 t, err := json.Marshal(vs.Type())
279                 if err != nil {
280                         panic(err)
281                 }
282                 data = append(data, j[0])
283                 data = append(data, []byte(`"Type":`)...)
284                 data = append(data, t...)
285                 data = append(data, byte(','))
286                 data = append(data, j[1:]...)
287         }
288         return append(data, byte(']')), nil
289 }