1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.curoverse.com/arvados.git/sdk/go/arvados"
18 "git.curoverse.com/arvados.git/sdk/go/stats"
19 "github.com/Sirupsen/logrus"
20 "github.com/golang/protobuf/jsonpb"
21 "github.com/prometheus/client_golang/prometheus"
22 "github.com/prometheus/client_golang/prometheus/promhttp"
36 BlobSignatureTTL arvados.Duration
37 BlobSigningKeyFile string
38 RequireSignatures bool
39 SystemAuthTokenFile string
41 TrashLifetime arvados.Duration
42 TrashCheckInterval arvados.Duration
46 TLSCertificateFile string
52 systemAuthToken string
53 debugLogf func(string, ...interface{})
55 ManagementToken string `doc: The secret key that must be provided by monitoring services
56 wishing to access the health check endpoint (/_health).`
62 theConfig = DefaultConfig()
63 formatter = map[string]logrus.Formatter{
64 "text": &logrus.TextFormatter{
66 TimestampFormat: rfc3339NanoFixed,
68 "json": &logrus.JSONFormatter{
69 TimestampFormat: rfc3339NanoFixed,
72 log = logrus.StandardLogger()
75 const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
77 // DefaultConfig returns the default configuration.
78 func DefaultConfig() *Config {
83 RequireSignatures: true,
84 BlobSignatureTTL: arvados.Duration(14 * 24 * time.Hour),
85 TrashLifetime: arvados.Duration(14 * 24 * time.Hour),
86 TrashCheckInterval: arvados.Duration(24 * time.Hour),
91 // Start should be called exactly once: after setting all public
92 // fields, and before using the config.
93 func (cfg *Config) Start() error {
95 log.Level = logrus.DebugLevel
96 cfg.debugLogf = log.Printf
97 cfg.debugLogf("debugging enabled")
99 log.Level = logrus.InfoLevel
100 cfg.debugLogf = func(string, ...interface{}) {}
103 f := formatter[strings.ToLower(cfg.LogFormat)]
105 return fmt.Errorf(`unsupported log format %q (try "text" or "json")`, cfg.LogFormat)
109 if cfg.MaxBuffers < 0 {
110 return fmt.Errorf("MaxBuffers must be greater than zero")
112 bufs = newBufferPool(cfg.MaxBuffers, BlockSize)
114 if cfg.MaxRequests < 1 {
115 cfg.MaxRequests = cfg.MaxBuffers * 2
116 log.Printf("MaxRequests <1 or not specified; defaulting to MaxBuffers * 2 == %d", cfg.MaxRequests)
119 if cfg.BlobSigningKeyFile != "" {
120 buf, err := ioutil.ReadFile(cfg.BlobSigningKeyFile)
122 return fmt.Errorf("reading blob signing key file: %s", err)
124 cfg.blobSigningKey = bytes.TrimSpace(buf)
125 if len(cfg.blobSigningKey) == 0 {
126 return fmt.Errorf("blob signing key file %q is empty", cfg.BlobSigningKeyFile)
128 } else if cfg.RequireSignatures {
129 return fmt.Errorf("cannot enable RequireSignatures (-enforce-permissions) without a blob signing key")
131 log.Println("Running without a blob signing key. Block locators " +
132 "returned by this server will not be signed, and will be rejected " +
133 "by a server that enforces permissions.")
134 log.Println("To fix this, use the BlobSigningKeyFile config entry.")
137 if fn := cfg.SystemAuthTokenFile; fn != "" {
138 buf, err := ioutil.ReadFile(fn)
140 return fmt.Errorf("cannot read system auth token file %q: %s", fn, err)
142 cfg.systemAuthToken = strings.TrimSpace(string(buf))
145 if cfg.EnableDelete {
146 log.Print("Trash/delete features are enabled. WARNING: this has not " +
147 "been extensively tested. You should disable this unless you can afford to lose data.")
150 if len(cfg.Volumes) == 0 {
151 if (&unixVolumeAdder{cfg}).Discover() == 0 {
152 return fmt.Errorf("no volumes found")
155 for _, v := range cfg.Volumes {
156 if err := v.Start(); err != nil {
157 return fmt.Errorf("volume %s: %s", v, err)
159 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
164 type metrics struct {
165 registry *prometheus.Registry
166 reqDuration *prometheus.SummaryVec
167 timeToStatus *prometheus.SummaryVec
168 exportProm http.Handler
171 func (*metrics) Levels() []logrus.Level {
172 return logrus.AllLevels
175 func (m *metrics) Fire(ent *logrus.Entry) error {
176 if tts, ok := ent.Data["timeToStatus"].(stats.Duration); !ok {
177 } else if method, ok := ent.Data["reqMethod"].(string); !ok {
178 } else if code, ok := ent.Data["respStatusCode"].(int); !ok {
180 m.timeToStatus.WithLabelValues(strconv.Itoa(code), strings.ToLower(method)).Observe(time.Duration(tts).Seconds())
185 func (m *metrics) setup() {
186 m.registry = prometheus.NewRegistry()
187 m.timeToStatus = prometheus.NewSummaryVec(prometheus.SummaryOpts{
188 Name: "time_to_status_seconds",
189 Help: "Summary of request TTFB.",
190 }, []string{"code", "method"})
191 m.reqDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
192 Name: "request_duration_seconds",
193 Help: "Summary of request duration.",
194 }, []string{"code", "method"})
195 m.registry.MustRegister(m.timeToStatus)
196 m.registry.MustRegister(m.reqDuration)
197 m.exportProm = promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{
203 func (m *metrics) exportJSON(w http.ResponseWriter, req *http.Request) {
204 jm := jsonpb.Marshaler{Indent: " "}
205 mfs, _ := m.registry.Gather()
207 for i, mf := range mfs {
216 func (m *metrics) Instrument(next http.Handler) http.Handler {
217 return promhttp.InstrumentHandlerDuration(m.reqDuration, next)
220 // VolumeTypes is built up by init() funcs in the source files that
221 // define the volume types.
222 var VolumeTypes = []func() VolumeWithExamples{}
224 type VolumeList []Volume
226 // UnmarshalJSON -- given an array of objects -- deserializes each
227 // object as the volume type indicated by the object's Type field.
228 func (vl *VolumeList) UnmarshalJSON(data []byte) error {
229 typeMap := map[string]func() VolumeWithExamples{}
230 for _, factory := range VolumeTypes {
231 t := factory().Type()
232 if _, ok := typeMap[t]; ok {
233 log.Fatal("volume type %+q is claimed by multiple VolumeTypes")
238 var mapList []map[string]interface{}
239 err := json.Unmarshal(data, &mapList)
243 for _, mapIn := range mapList {
244 typeIn, ok := mapIn["Type"].(string)
246 return fmt.Errorf("invalid volume type %+v", mapIn["Type"])
248 factory, ok := typeMap[typeIn]
250 return fmt.Errorf("unsupported volume type %+q", typeIn)
252 data, err := json.Marshal(mapIn)
257 err = json.Unmarshal(data, vol)
261 *vl = append(*vl, vol)
266 // MarshalJSON adds a "Type" field to each volume corresponding to its
268 func (vl *VolumeList) MarshalJSON() ([]byte, error) {
270 for _, vs := range *vl {
271 j, err := json.Marshal(vs)
276 data = append(data, byte(','))
278 t, err := json.Marshal(vs.Type())
282 data = append(data, j[0])
283 data = append(data, []byte(`"Type":`)...)
284 data = append(data, t...)
285 data = append(data, byte(','))
286 data = append(data, j[1:]...)
288 return append(data, byte(']')), nil