1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 // Package sshexecutor provides an implementation of pool.Executor
6 // using a long-lived multiplexed SSH session.
17 "git.arvados.org/arvados.git/lib/cloud"
18 "golang.org/x/crypto/ssh"
// ErrNoAddress is the error returned by setupSSHClient, before it
// dials, to report that the target instance has no address.
21 var ErrNoAddress = errors.New("instance has no address")
23 // New returns a new Executor, using the given target.
//
// Only the target field is set here; every other field of the
// returned Executor starts at its zero value.
24 func New(t cloud.ExecutorTarget) *Executor {
25 return &Executor{target: t}
28 // An Executor uses a multiplexed SSH connection to execute shell
29 // commands on a remote target. It reconnects automatically after
// a session cannot be created on the existing connection (see
// newSession).
//
32 // When setting up a connection, the Executor accepts whatever host
33 // key is provided by the remote server, then passes the received key
34 // and the SSH connection to the target's VerifyHostKey method before
35 // executing commands on the connection.
//
37 // A zero Executor must not be used before calling SetTarget.
//
39 // An Executor must not be copied.
40 type Executor struct {
// target supplies the address (Address), the login name (RemoteUser)
// and the host key check (VerifyHostKey) used by setupSSHClient.
41 target cloud.ExecutorTarget
// mtx is released via deferred Unlock in the Set* methods and via
// deferred RUnlock in Target.
45 mtx sync.RWMutex // controls access to instance after creation
// clientOnce guards the one-time creation of clientSetup and of the
// initial clientErr value in sshClient.
49 clientOnce sync.Once // initialized private state
// clientSetup is created with capacity 1: sending a value claims the
// single slot, receiving a value frees it again.
50 clientSetup chan bool // len>0 while client setup is in progress
51 hostKey ssh.PublicKey // most recent host key that passed verification, if any
54 // SetSigners updates the set of private keys that will be offered to
55 // the target next time the Executor sets up a new connection.
//
// The keys are offered through ssh.PublicKeys when setupSSHClient
// dials the target.
56 func (exr *Executor) SetSigners(signers ...ssh.Signer) {
// Release exr.mtx when the method returns.
58 defer exr.mtx.Unlock()
62 // SetTarget sets the current target. The new target will be used next
63 // time a new connection is set up; until then, the Executor will
64 // continue to use the existing target.
//
66 // The new target is assumed to represent the same host as the
67 // previous target, although its address and host key might differ.
68 func (exr *Executor) SetTarget(t cloud.ExecutorTarget) {
// Release exr.mtx when the method returns.
70 defer exr.mtx.Unlock()
74 // SetTargetPort sets the default port (name or number) to connect
75 // to. This is used only when the address returned by the target's
76 // Address() method does not specify a port. If the given port is
77 // empty (or SetTargetPort is not called at all), the default port is
// "ssh" (see TargetHostPort).
79 func (exr *Executor) SetTargetPort(port string) {
// Release exr.mtx when the method returns.
81 defer exr.mtx.Unlock()
85 // Target returns the current target.
//
// It is the accessor the other methods use to reach the target (see
// TargetHostPort and setupSSHClient).
86 func (exr *Executor) Target() cloud.ExecutorTarget {
// Release the read side of exr.mtx when the method returns.
88 defer exr.mtx.RUnlock()
92 // Execute runs cmd on the target. If an existing connection is not
93 // usable, it sets up a new connection to the current target.
//
// Each entry of env is applied to the session with Setenv, and stdin
// is attached as the command's standard input. The two returned byte
// slices are the captured stdout and stderr, in that order; they are
// returned together with the error from session.Run.
94 func (exr *Executor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
95 session, err := exr.newSession()
100 for k, v := range env {
101 err = session.Setenv(k, v)
// Capture the remote command's output in memory.
106 var stdout, stderr bytes.Buffer
107 session.Stdin = stdin
108 session.Stdout = &stdout
109 session.Stderr = &stderr
110 err = session.Run(cmd)
// Whatever output was captured is returned alongside Run's error.
111 return stdout.Bytes(), stderr.Bytes(), err
114 // Close shuts down any active connections.
//
// Afterwards the stored client is nil and the stored error is
// "closed".
115 func (exr *Executor) Close() {
116 // Ensure exr is initialized
// Claim the clientSetup slot; this send blocks while another goroutine
// holds it.
119 exr.clientSetup <- true
120 if exr.client != nil {
// The receiver is evaluated now, so the deferred Close acts on the
// client held at this point even though the field is reset below.
121 defer exr.client.Close()
123 exr.client, exr.clientErr = nil, errors.New("closed")
127 // Create a new SSH session. If session setup fails or the SSH client
128 // hasn't been setup yet, setup a new SSH client and try again.
129 func (exr *Executor) newSession() (*ssh.Session, error) {
// try fetches a client from sshClient, passing create through, and
// opens a session on it.
130 try := func(create bool) (*ssh.Session, error) {
131 client, err := exr.sshClient(create)
135 return client.NewSession()
// First attempt: do not ask for a new client.
137 session, err := try(false)
// Second attempt: allow a new client to be set up.
139 session, err = try(true)
144 // Get the latest SSH client. If another goroutine is in the process
145 // of setting one up, wait for it to finish and return its result (or
146 // the last successfully setup client, if it fails).
//
// The create argument comes from newSession: false on its first
// attempt, true on its retry.
147 func (exr *Executor) sshClient(create bool) (*ssh.Client, error) {
// One-time initialization of the setup channel and of the placeholder
// error reported before any client exists.
148 exr.clientOnce.Do(func() {
// Capacity 1: a value sitting in the channel marks setup as in
// progress.
149 exr.clientSetup = make(chan bool, 1)
150 exr.clientErr = errors.New("client not yet created")
// Take one value back out of clientSetup on the way out.
152 defer func() { <-exr.clientSetup }()
// The send succeeded, so this goroutine holds the slot and does the
// setup work itself.
154 case exr.clientSetup <- true:
156 client, err := exr.setupSSHClient()
// Store the new result if setup succeeded, or if there is no earlier
// client to fall back on.
157 if err == nil || exr.client == nil {
158 if exr.client != nil {
159 // Hang up the previous
160 // (non-working) client
// Closed in a separate goroutine so this call does not wait for it.
161 go exr.client.Close()
163 exr.client, exr.clientErr = client, err
170 // Another goroutine is doing the above case. Wait
171 // for it to finish and return whatever it leaves in
// This send blocks until the other goroutine frees the slot.
173 exr.clientSetup <- true
175 return exr.client, exr.clientErr
// TargetHostPort returns the host and port to use when connecting to
// the current target, derived from the target's Address().
//
// If the address cannot be split into host and port, or its port is
// empty, the port falls back to targetPort (see SetTargetPort) and
// then to "ssh".
178 func (exr *Executor) TargetHostPort() (string, string) {
179 addr := exr.Target().Address()
183 h, p, err := net.SplitHostPort(addr)
184 if err != nil || p == "" {
185 // Target address does not specify a port. Use
186 // targetPort, or "ssh".
190 if p = exr.targetPort; p == "" {
197 // Create a new SSH client.
//
// The client dials the host and port reported by TargetHostPort as the
// target's RemoteUser, offering the configured signers. After the
// dial, the target's VerifyHostKey is consulted unless the received
// host key equals the last key that passed verification.
198 func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
199 addr := net.JoinHostPort(exr.TargetHostPort())
201 return nil, ErrNoAddress
// Holds the host key seen during the handshake, for checking after
// the dial.
203 var receivedKey ssh.PublicKey
204 client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
205 User: exr.Target().RemoteUser(),
206 Auth: []ssh.AuthMethod{
207 ssh.PublicKeys(exr.signers...),
209 HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
// Upper bound on establishing the TCP connection.
213 Timeout: time.Minute,
// A dial that succeeded without ever delivering a host key is treated
// as a programming error.
217 } else if receivedKey == nil {
218 return nil, errors.New("BUG: key was never provided to HostKeyCallback")
// Keys are compared by their marshaled bytes; an unchanged key skips
// the VerifyHostKey call.
221 if exr.hostKey == nil || !bytes.Equal(exr.hostKey.Marshal(), receivedKey.Marshal()) {
222 err = exr.Target().VerifyHostKey(receivedKey, client)
// Remember the key so the next connection can skip re-verification.
226 exr.hostKey = receivedKey