13025: Add request time metrics at /metrics and /metrics.json.
[arvados.git] / services / keepstore / config.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "io/ioutil"
12         "net/http"
13         "strconv"
14         "strings"
15         "time"
16
17         "git.curoverse.com/arvados.git/sdk/go/arvados"
18         "git.curoverse.com/arvados.git/sdk/go/stats"
19         "github.com/Sirupsen/logrus"
20         "github.com/golang/protobuf/jsonpb"
21         "github.com/prometheus/client_golang/prometheus"
22         "github.com/prometheus/client_golang/prometheus/promhttp"
23 )
24
25 type Config struct {
26         Debug  bool
27         Listen string
28
29         LogFormat string
30
31         PIDFile string
32
33         MaxBuffers  int
34         MaxRequests int
35
36         BlobSignatureTTL    arvados.Duration
37         BlobSigningKeyFile  string
38         RequireSignatures   bool
39         SystemAuthTokenFile string
40         EnableDelete        bool
41         TrashLifetime       arvados.Duration
42         TrashCheckInterval  arvados.Duration
43
44         Volumes VolumeList
45
46         blobSigningKey  []byte
47         systemAuthToken string
48         debugLogf       func(string, ...interface{})
49
50         ManagementToken string
51
52         metrics
53 }
54
55 var (
56         theConfig = DefaultConfig()
57         formatter = map[string]logrus.Formatter{
58                 "text": &logrus.TextFormatter{
59                         FullTimestamp:   true,
60                         TimestampFormat: rfc3339NanoFixed,
61                 },
62                 "json": &logrus.JSONFormatter{
63                         TimestampFormat: rfc3339NanoFixed,
64                 },
65         }
66         log = logrus.StandardLogger()
67 )
68
69 const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
70
71 // DefaultConfig returns the default configuration.
72 func DefaultConfig() *Config {
73         return &Config{
74                 Listen:             ":25107",
75                 LogFormat:          "json",
76                 MaxBuffers:         128,
77                 RequireSignatures:  true,
78                 BlobSignatureTTL:   arvados.Duration(14 * 24 * time.Hour),
79                 TrashLifetime:      arvados.Duration(14 * 24 * time.Hour),
80                 TrashCheckInterval: arvados.Duration(24 * time.Hour),
81                 Volumes:            []Volume{},
82         }
83 }
84
85 // Start should be called exactly once: after setting all public
86 // fields, and before using the config.
87 func (cfg *Config) Start() error {
88         if cfg.Debug {
89                 log.Level = logrus.DebugLevel
90                 cfg.debugLogf = log.Printf
91                 cfg.debugLogf("debugging enabled")
92         } else {
93                 log.Level = logrus.InfoLevel
94                 cfg.debugLogf = func(string, ...interface{}) {}
95         }
96
97         if f := formatter[strings.ToLower(cfg.LogFormat)]; f == nil {
98                 return fmt.Errorf(`unsupported log format %q (try "text" or "json")`, cfg.LogFormat)
99         } else {
100                 log.Formatter = f
101         }
102
103         if cfg.MaxBuffers < 0 {
104                 return fmt.Errorf("MaxBuffers must be greater than zero")
105         }
106         bufs = newBufferPool(cfg.MaxBuffers, BlockSize)
107
108         if cfg.MaxRequests < 1 {
109                 cfg.MaxRequests = cfg.MaxBuffers * 2
110                 log.Printf("MaxRequests <1 or not specified; defaulting to MaxBuffers * 2 == %d", cfg.MaxRequests)
111         }
112
113         if cfg.BlobSigningKeyFile != "" {
114                 buf, err := ioutil.ReadFile(cfg.BlobSigningKeyFile)
115                 if err != nil {
116                         return fmt.Errorf("reading blob signing key file: %s", err)
117                 }
118                 cfg.blobSigningKey = bytes.TrimSpace(buf)
119                 if len(cfg.blobSigningKey) == 0 {
120                         return fmt.Errorf("blob signing key file %q is empty", cfg.BlobSigningKeyFile)
121                 }
122         } else if cfg.RequireSignatures {
123                 return fmt.Errorf("cannot enable RequireSignatures (-enforce-permissions) without a blob signing key")
124         } else {
125                 log.Println("Running without a blob signing key. Block locators " +
126                         "returned by this server will not be signed, and will be rejected " +
127                         "by a server that enforces permissions.")
128                 log.Println("To fix this, use the BlobSigningKeyFile config entry.")
129         }
130
131         if fn := cfg.SystemAuthTokenFile; fn != "" {
132                 buf, err := ioutil.ReadFile(fn)
133                 if err != nil {
134                         return fmt.Errorf("cannot read system auth token file %q: %s", fn, err)
135                 }
136                 cfg.systemAuthToken = strings.TrimSpace(string(buf))
137         }
138
139         if cfg.EnableDelete {
140                 log.Print("Trash/delete features are enabled. WARNING: this has not " +
141                         "been extensively tested. You should disable this unless you can afford to lose data.")
142         }
143
144         if len(cfg.Volumes) == 0 {
145                 if (&unixVolumeAdder{cfg}).Discover() == 0 {
146                         return fmt.Errorf("no volumes found")
147                 }
148         }
149         for _, v := range cfg.Volumes {
150                 if err := v.Start(); err != nil {
151                         return fmt.Errorf("volume %s: %s", v, err)
152                 }
153                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
154         }
155         return nil
156 }
157
158 type metrics struct {
159         registry     *prometheus.Registry
160         reqDuration  *prometheus.SummaryVec
161         timeToStatus *prometheus.SummaryVec
162         exportProm   http.Handler
163 }
164
165 func (*metrics) Levels() []logrus.Level {
166         return logrus.AllLevels
167 }
168
169 func (m *metrics) Fire(ent *logrus.Entry) error {
170         if tts, ok := ent.Data["timeToStatus"].(stats.Duration); !ok {
171         } else if method, ok := ent.Data["reqMethod"].(string); !ok {
172         } else if code, ok := ent.Data["respStatusCode"].(int); !ok {
173         } else {
174                 m.timeToStatus.WithLabelValues(strconv.Itoa(code), strings.ToLower(method)).Observe(time.Duration(tts).Seconds())
175         }
176         return nil
177 }
178
179 func (m *metrics) setup() {
180         m.registry = prometheus.NewRegistry()
181         m.timeToStatus = prometheus.NewSummaryVec(prometheus.SummaryOpts{
182                 Name: "time_to_status_seconds",
183                 Help: "Summary of request TTFB.",
184         }, []string{"code", "method"})
185         m.reqDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
186                 Name: "request_duration_seconds",
187                 Help: "Summary of request duration.",
188         }, []string{"code", "method"})
189         m.registry.MustRegister(m.timeToStatus)
190         m.registry.MustRegister(m.reqDuration)
191         m.exportProm = promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{
192                 ErrorLog: log,
193         })
194         log.AddHook(m)
195 }
196
197 func (m *metrics) exportJSON(w http.ResponseWriter, req *http.Request) {
198         jm := jsonpb.Marshaler{Indent: "  "}
199         mfs, _ := m.registry.Gather()
200         w.Write([]byte{'['})
201         for i, mf := range mfs {
202                 if i > 0 {
203                         w.Write([]byte{','})
204                 }
205                 jm.Marshal(w, mf)
206         }
207         w.Write([]byte{']'})
208 }
209
210 func (m *metrics) Instrument(next http.Handler) http.Handler {
211         return promhttp.InstrumentHandlerDuration(m.reqDuration, next)
212 }
213
214 // VolumeTypes is built up by init() funcs in the source files that
215 // define the volume types.
216 var VolumeTypes = []func() VolumeWithExamples{}
217
218 type VolumeList []Volume
219
220 // UnmarshalJSON, given an array of objects, deserializes each object
221 // as the volume type indicated by the object's Type field.
222 func (vols *VolumeList) UnmarshalJSON(data []byte) error {
223         typeMap := map[string]func() VolumeWithExamples{}
224         for _, factory := range VolumeTypes {
225                 t := factory().Type()
226                 if _, ok := typeMap[t]; ok {
227                         log.Fatal("volume type %+q is claimed by multiple VolumeTypes")
228                 }
229                 typeMap[t] = factory
230         }
231
232         var mapList []map[string]interface{}
233         err := json.Unmarshal(data, &mapList)
234         if err != nil {
235                 return err
236         }
237         for _, mapIn := range mapList {
238                 typeIn, ok := mapIn["Type"].(string)
239                 if !ok {
240                         return fmt.Errorf("invalid volume type %+v", mapIn["Type"])
241                 }
242                 factory, ok := typeMap[typeIn]
243                 if !ok {
244                         return fmt.Errorf("unsupported volume type %+q", typeIn)
245                 }
246                 data, err := json.Marshal(mapIn)
247                 if err != nil {
248                         return err
249                 }
250                 vol := factory()
251                 err = json.Unmarshal(data, vol)
252                 if err != nil {
253                         return err
254                 }
255                 *vols = append(*vols, vol)
256         }
257         return nil
258 }
259
260 // MarshalJSON adds a "Type" field to each volume corresponding to its
261 // Type().
262 func (vl *VolumeList) MarshalJSON() ([]byte, error) {
263         data := []byte{'['}
264         for _, vs := range *vl {
265                 j, err := json.Marshal(vs)
266                 if err != nil {
267                         return nil, err
268                 }
269                 if len(data) > 1 {
270                         data = append(data, byte(','))
271                 }
272                 t, err := json.Marshal(vs.Type())
273                 if err != nil {
274                         panic(err)
275                 }
276                 data = append(data, j[0])
277                 data = append(data, []byte(`"Type":`)...)
278                 data = append(data, t...)
279                 data = append(data, byte(','))
280                 data = append(data, j[1:]...)
281         }
282         return append(data, byte(']')), nil
283 }