14325: Configurable SSH target port for cloud VMs.
[arvados.git] / lib / dispatchcloud / ssh_executor / executor.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package ssh_executor provides an implementation of pool.Executor
6 // using a long-lived multiplexed SSH session.
7 package ssh_executor
8
9 import (
10         "bytes"
11         "errors"
12         "io"
13         "net"
14         "sync"
15         "time"
16
17         "git.curoverse.com/arvados.git/lib/cloud"
18         "golang.org/x/crypto/ssh"
19 )
20
21 // New returns a new Executor, using the given target.
22 func New(t cloud.ExecutorTarget) *Executor {
23         return &Executor{target: t}
24 }
25
26 // An Executor uses a multiplexed SSH connection to execute shell
27 // commands on a remote target. It reconnects automatically after
28 // errors.
29 //
30 // When setting up a connection, the Executor accepts whatever host
31 // key is provided by the remote server, then passes the received key
32 // and the SSH connection to the target's VerifyHostKey method before
33 // executing commands on the connection.
34 //
35 // A zero Executor must not be used before calling SetTarget.
36 //
37 // An Executor must not be copied.
38 type Executor struct {
39         target     cloud.ExecutorTarget
40         targetPort string
41         signers    []ssh.Signer
42         mtx        sync.RWMutex // controls access to instance after creation
43
44         client      *ssh.Client
45         clientErr   error
46         clientOnce  sync.Once     // initialized private state
47         clientSetup chan bool     // len>0 while client setup is in progress
48         hostKey     ssh.PublicKey // most recent host key that passed verification, if any
49 }
50
51 // SetSigners updates the set of private keys that will be offered to
52 // the target next time the Executor sets up a new connection.
53 func (exr *Executor) SetSigners(signers ...ssh.Signer) {
54         exr.mtx.Lock()
55         defer exr.mtx.Unlock()
56         exr.signers = signers
57 }
58
59 // SetTarget sets the current target. The new target will be used next
60 // time a new connection is set up; until then, the Executor will
61 // continue to use the existing target.
62 //
63 // The new target is assumed to represent the same host as the
64 // previous target, although its address and host key might differ.
65 func (exr *Executor) SetTarget(t cloud.ExecutorTarget) {
66         exr.mtx.Lock()
67         defer exr.mtx.Unlock()
68         exr.target = t
69 }
70
71 // SetTargetPort sets the default port (name or number) to connect
72 // to. This is used only when the address returned by the target's
73 // Address() method does not specify a port. If the given port is
74 // empty (or SetTargetPort is not called at all), the default port is
75 // "ssh".
76 func (exr *Executor) SetTargetPort(port string) {
77         exr.mtx.Lock()
78         defer exr.mtx.Unlock()
79         exr.targetPort = port
80 }
81
82 // Target returns the current target.
83 func (exr *Executor) Target() cloud.ExecutorTarget {
84         exr.mtx.RLock()
85         defer exr.mtx.RUnlock()
86         return exr.target
87 }
88
89 // Execute runs cmd on the target. If an existing connection is not
90 // usable, it sets up a new connection to the current target.
91 func (exr *Executor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
92         session, err := exr.newSession()
93         if err != nil {
94                 return nil, nil, err
95         }
96         defer session.Close()
97         for k, v := range env {
98                 err = session.Setenv(k, v)
99                 if err != nil {
100                         return nil, nil, err
101                 }
102         }
103         var stdout, stderr bytes.Buffer
104         session.Stdin = stdin
105         session.Stdout = &stdout
106         session.Stderr = &stderr
107         err = session.Run(cmd)
108         return stdout.Bytes(), stderr.Bytes(), err
109 }
110
111 // Close shuts down any active connections.
112 func (exr *Executor) Close() {
113         // Ensure exr is initialized
114         exr.sshClient(false)
115
116         exr.clientSetup <- true
117         if exr.client != nil {
118                 defer exr.client.Close()
119         }
120         exr.client, exr.clientErr = nil, errors.New("closed")
121         <-exr.clientSetup
122 }
123
124 // Create a new SSH session. If session setup fails or the SSH client
125 // hasn't been setup yet, setup a new SSH client and try again.
126 func (exr *Executor) newSession() (*ssh.Session, error) {
127         try := func(create bool) (*ssh.Session, error) {
128                 client, err := exr.sshClient(create)
129                 if err != nil {
130                         return nil, err
131                 }
132                 return client.NewSession()
133         }
134         session, err := try(false)
135         if err != nil {
136                 session, err = try(true)
137         }
138         return session, err
139 }
140
141 // Get the latest SSH client. If another goroutine is in the process
142 // of setting one up, wait for it to finish and return its result (or
143 // the last successfully setup client, if it fails).
144 func (exr *Executor) sshClient(create bool) (*ssh.Client, error) {
145         exr.clientOnce.Do(func() {
146                 exr.clientSetup = make(chan bool, 1)
147                 exr.clientErr = errors.New("client not yet created")
148         })
149         defer func() { <-exr.clientSetup }()
150         select {
151         case exr.clientSetup <- true:
152                 if create {
153                         client, err := exr.setupSSHClient()
154                         if err == nil || exr.client == nil {
155                                 if exr.client != nil {
156                                         // Hang up the previous
157                                         // (non-working) client
158                                         go exr.client.Close()
159                                 }
160                                 exr.client, exr.clientErr = client, err
161                         }
162                         if err != nil {
163                                 return nil, err
164                         }
165                 }
166         default:
167                 // Another goroutine is doing the above case.  Wait
168                 // for it to finish and return whatever it leaves in
169                 // wkr.client.
170                 exr.clientSetup <- true
171         }
172         return exr.client, exr.clientErr
173 }
174
175 // Create a new SSH client.
176 func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
177         target := exr.Target()
178         addr := target.Address()
179         if addr == "" {
180                 return nil, errors.New("instance has no address")
181         }
182         if h, p, err := net.SplitHostPort(addr); err != nil || p == "" {
183                 // Target address does not specify a port.  Use
184                 // targetPort, or "ssh".
185                 if p = exr.targetPort; p == "" {
186                         p = "ssh"
187                 }
188                 addr = net.JoinHostPort(h, p)
189         }
190         var receivedKey ssh.PublicKey
191         client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
192                 User: "root",
193                 Auth: []ssh.AuthMethod{
194                         ssh.PublicKeys(exr.signers...),
195                 },
196                 HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
197                         receivedKey = key
198                         return nil
199                 },
200                 Timeout: time.Minute,
201         })
202         if err != nil {
203                 return nil, err
204         } else if receivedKey == nil {
205                 return nil, errors.New("BUG: key was never provided to HostKeyCallback")
206         }
207
208         if exr.hostKey == nil || !bytes.Equal(exr.hostKey.Marshal(), receivedKey.Marshal()) {
209                 err = target.VerifyHostKey(receivedKey, client)
210                 if err != nil {
211                         return nil, err
212                 }
213                 exr.hostKey = receivedKey
214         }
215         return client, nil
216 }