// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

// Package ssh_executor provides an implementation of pool.Executor
// using a long-lived multiplexed SSH session.
17 "git.curoverse.com/arvados.git/lib/cloud"
18 "golang.org/x/crypto/ssh"
21 // New returns a new Executor, using the given target.
22 func New(t cloud.ExecutorTarget) *Executor {
23 return &Executor{target: t}
26 // An Executor uses a multiplexed SSH connection to execute shell
27 // commands on a remote target. It reconnects automatically after
30 // When setting up a connection, the Executor accepts whatever host
31 // key is provided by the remote server, then passes the received key
32 // and the SSH connection to the target's VerifyHostKey method before
33 // executing commands on the connection.
35 // A zero Executor must not be used before calling SetTarget.
37 // An Executor must not be copied.
38 type Executor struct {
39 target cloud.ExecutorTarget
41 mtx sync.RWMutex // controls access to instance after creation
45 clientOnce sync.Once // initialized private state
46 clientSetup chan bool // len>0 while client setup is in progress
47 hostKey ssh.PublicKey // most recent host key that passed verification, if any
50 // SetSigners updates the set of private keys that will be offered to
51 // the target next time the Executor sets up a new connection.
52 func (exr *Executor) SetSigners(signers ...ssh.Signer) {
54 defer exr.mtx.Unlock()
58 // SetTarget sets the current target. The new target will be used next
59 // time a new connection is set up; until then, the Executor will
60 // continue to use the existing target.
62 // The new target is assumed to represent the same host as the
63 // previous target, although its address and host key might differ.
64 func (exr *Executor) SetTarget(t cloud.ExecutorTarget) {
66 defer exr.mtx.Unlock()
70 // Target returns the current target.
71 func (exr *Executor) Target() cloud.ExecutorTarget {
73 defer exr.mtx.RUnlock()
77 // Execute runs cmd on the target. If an existing connection is not
78 // usable, it sets up a new connection to the current target.
79 func (exr *Executor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
80 session, err := exr.newSession()
85 var stdout, stderr bytes.Buffer
87 session.Stdout = &stdout
88 session.Stderr = &stderr
89 err = session.Run(cmd)
90 return stdout.Bytes(), stderr.Bytes(), err
93 // Close shuts down any active connections.
94 func (exr *Executor) Close() {
95 // Ensure exr is initialized
98 exr.clientSetup <- true
99 if exr.client != nil {
100 defer exr.client.Close()
102 exr.client, exr.clientErr = nil, errors.New("closed")
106 // Create a new SSH session. If session setup fails or the SSH client
107 // hasn't been setup yet, setup a new SSH client and try again.
108 func (exr *Executor) newSession() (*ssh.Session, error) {
109 try := func(create bool) (*ssh.Session, error) {
110 client, err := exr.sshClient(create)
114 return client.NewSession()
116 session, err := try(false)
118 session, err = try(true)
123 // Get the latest SSH client. If another goroutine is in the process
124 // of setting one up, wait for it to finish and return its result (or
125 // the last successfully setup client, if it fails).
126 func (exr *Executor) sshClient(create bool) (*ssh.Client, error) {
127 exr.clientOnce.Do(func() {
128 exr.clientSetup = make(chan bool, 1)
129 exr.clientErr = errors.New("client not yet created")
131 defer func() { <-exr.clientSetup }()
133 case exr.clientSetup <- true:
135 client, err := exr.setupSSHClient()
136 if err == nil || exr.client == nil {
137 if exr.client != nil {
138 // Hang up the previous
139 // (non-working) client
140 go exr.client.Close()
142 exr.client, exr.clientErr = client, err
149 // Another goroutine is doing the above case. Wait
150 // for it to finish and return whatever it leaves in
152 exr.clientSetup <- true
154 return exr.client, exr.clientErr
157 // Create a new SSH client.
158 func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
159 target := exr.Target()
160 addr := target.Address()
162 return nil, errors.New("instance has no address")
164 var receivedKey ssh.PublicKey
165 client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
167 Auth: []ssh.AuthMethod{
168 ssh.PublicKeys(exr.signers...),
170 HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
174 Timeout: time.Minute,
178 } else if receivedKey == nil {
179 return nil, errors.New("BUG: key was never provided to HostKeyCallback")
182 if exr.hostKey == nil || !bytes.Equal(exr.hostKey.Marshal(), receivedKey.Marshal()) {
183 err = target.VerifyHostKey(receivedKey, client)
187 exr.hostKey = receivedKey