14360: Cancel containers asynchronously in Sync cleanup.
[arvados.git] / lib / dispatchcloud / dispatcher.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dispatchcloud
6
7 import (
8         "crypto/md5"
9         "encoding/json"
10         "fmt"
11         "net/http"
12         "strings"
13         "sync"
14         "time"
15
16         "git.curoverse.com/arvados.git/lib/cloud"
17         "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
18         "git.curoverse.com/arvados.git/lib/dispatchcloud/scheduler"
19         "git.curoverse.com/arvados.git/lib/dispatchcloud/ssh_executor"
20         "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
21         "git.curoverse.com/arvados.git/sdk/go/arvados"
22         "git.curoverse.com/arvados.git/sdk/go/auth"
23         "git.curoverse.com/arvados.git/sdk/go/httpserver"
24         "github.com/Sirupsen/logrus"
25         "github.com/prometheus/client_golang/prometheus"
26         "github.com/prometheus/client_golang/prometheus/promhttp"
27         "golang.org/x/crypto/ssh"
28 )
29
30 const (
31         defaultPollInterval = time.Second
32 )
33
34 type containerQueue interface {
35         scheduler.ContainerQueue
36         Update() error
37 }
38
39 type pool interface {
40         scheduler.WorkerPool
41         Instances() []worker.InstanceView
42 }
43
44 type dispatcher struct {
45         Cluster       *arvados.Cluster
46         InstanceSetID cloud.InstanceSetID
47
48         logger       logrus.FieldLogger
49         reg          *prometheus.Registry
50         instanceSet  cloud.InstanceSet
51         pool         pool
52         queue        containerQueue
53         httpHandler  http.Handler
54         pollInterval time.Duration
55         sshKey       ssh.Signer
56
57         setupOnce sync.Once
58         stop      chan struct{}
59 }
60
61 // Start starts the dispatcher. Start can be called multiple times
62 // with no ill effect.
63 func (disp *dispatcher) Start() {
64         disp.setupOnce.Do(disp.setup)
65 }
66
67 // ServeHTTP implements service.Handler.
68 func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
69         disp.Start()
70         disp.httpHandler.ServeHTTP(w, r)
71 }
72
73 // CheckHealth implements service.Handler.
74 func (disp *dispatcher) CheckHealth() error {
75         disp.Start()
76         return nil
77 }
78
79 // Stop dispatching containers and release resources. Typically used
80 // in tests.
81 func (disp *dispatcher) Close() {
82         disp.Start()
83         select {
84         case disp.stop <- struct{}{}:
85         default:
86         }
87 }
88
89 // Make a worker.Executor for the given instance.
90 func (disp *dispatcher) newExecutor(inst cloud.Instance) worker.Executor {
91         exr := ssh_executor.New(inst)
92         exr.SetSigners(disp.sshKey)
93         return exr
94 }
95
96 func (disp *dispatcher) typeChooser(ctr *arvados.Container) (arvados.InstanceType, error) {
97         return ChooseInstanceType(disp.Cluster, ctr)
98 }
99
100 func (disp *dispatcher) setup() {
101         disp.initialize()
102         go disp.run()
103 }
104
105 func (disp *dispatcher) initialize() {
106         arvClient := arvados.NewClientFromEnv()
107         if disp.InstanceSetID == "" {
108                 if strings.HasPrefix(arvClient.AuthToken, "v2/") {
109                         disp.InstanceSetID = cloud.InstanceSetID(strings.Split(arvClient.AuthToken, "/")[1])
110                 } else {
111                         // Use some other string unique to this token
112                         // that doesn't reveal the token itself.
113                         disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(arvClient.AuthToken))))
114                 }
115         }
116         disp.stop = make(chan struct{}, 1)
117         disp.logger = logrus.StandardLogger()
118
119         if key, err := ssh.ParsePrivateKey(disp.Cluster.Dispatch.PrivateKey); err != nil {
120                 disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err)
121         } else {
122                 disp.sshKey = key
123         }
124
125         instanceSet, err := newInstanceSet(disp.Cluster, disp.InstanceSetID)
126         if err != nil {
127                 disp.logger.Fatalf("error initializing driver: %s", err)
128         }
129         disp.instanceSet = &instanceSetProxy{instanceSet}
130         disp.reg = prometheus.NewRegistry()
131         disp.pool = worker.NewPool(disp.logger, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
132         disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
133
134         mux := http.NewServeMux()
135         mux.HandleFunc("/arvados/v1/dispatch/containers", disp.apiContainers)
136         mux.HandleFunc("/arvados/v1/dispatch/instances", disp.apiInstances)
137         metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
138                 ErrorLog: disp.logger,
139         })
140         mux.Handle("/metrics", metricsH)
141         mux.Handle("/metrics.json", metricsH)
142         disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
143
144         if d := disp.Cluster.Dispatch.PollInterval; d > 0 {
145                 disp.pollInterval = time.Duration(d)
146         } else {
147                 disp.pollInterval = defaultPollInterval
148         }
149 }
150
151 func (disp *dispatcher) run() {
152         defer disp.instanceSet.Stop()
153
154         t0 := time.Now()
155         disp.logger.Infof("FixStaleLocks starting.")
156         scheduler.FixStaleLocks(disp.logger, disp.queue, disp.pool, time.Duration(disp.Cluster.Dispatch.StaleLockTimeout))
157         disp.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
158
159         wp := disp.pool.Subscribe()
160         defer disp.pool.Unsubscribe(wp)
161         poll := time.NewTicker(disp.pollInterval)
162         for {
163                 scheduler.Map(disp.logger, disp.queue, disp.pool)
164                 scheduler.Sync(disp.logger, disp.queue, disp.pool)
165                 select {
166                 case <-disp.stop:
167                         return
168                 case <-wp:
169                 case <-poll.C:
170                         err := disp.queue.Update()
171                         if err != nil {
172                                 disp.logger.Errorf("error updating queue: %s", err)
173                         }
174                 }
175         }
176 }
177
178 // Management API: all active and queued containers.
179 func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
180         if r.Method != "GET" {
181                 httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed)
182                 return
183         }
184         var resp struct {
185                 Items []container.QueueEnt
186         }
187         qEntries, _ := disp.queue.Entries()
188         for _, ent := range qEntries {
189                 resp.Items = append(resp.Items, ent)
190         }
191         json.NewEncoder(w).Encode(resp)
192 }
193
194 // Management API: all active instances (cloud VMs).
195 func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) {
196         if r.Method != "GET" {
197                 httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed)
198                 return
199         }
200         var resp struct {
201                 Items []worker.InstanceView
202         }
203         resp.Items = disp.pool.Instances()
204         json.NewEncoder(w).Encode(resp)
205 }