14325: Propagate API env vars to crunch-run.
[arvados.git] / lib / dispatchcloud / ssh_executor / executor.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package ssh_executor provides an implementation of pool.Executor
6 // using a long-lived multiplexed SSH session.
7 package ssh_executor
8
9 import (
10         "bytes"
11         "errors"
12         "io"
13         "net"
14         "sync"
15         "time"
16
17         "git.curoverse.com/arvados.git/lib/cloud"
18         "golang.org/x/crypto/ssh"
19 )
20
21 // New returns a new Executor, using the given target.
22 func New(t cloud.ExecutorTarget) *Executor {
23         return &Executor{target: t}
24 }
25
26 // An Executor uses a multiplexed SSH connection to execute shell
27 // commands on a remote target. It reconnects automatically after
28 // errors.
29 //
30 // When setting up a connection, the Executor accepts whatever host
31 // key is provided by the remote server, then passes the received key
32 // and the SSH connection to the target's VerifyHostKey method before
33 // executing commands on the connection.
34 //
35 // A zero Executor must not be used before calling SetTarget.
36 //
37 // An Executor must not be copied.
38 type Executor struct {
39         target  cloud.ExecutorTarget
40         signers []ssh.Signer
41         mtx     sync.RWMutex // controls access to instance after creation
42
43         client      *ssh.Client
44         clientErr   error
45         clientOnce  sync.Once     // initialized private state
46         clientSetup chan bool     // len>0 while client setup is in progress
47         hostKey     ssh.PublicKey // most recent host key that passed verification, if any
48 }
49
50 // SetSigners updates the set of private keys that will be offered to
51 // the target next time the Executor sets up a new connection.
52 func (exr *Executor) SetSigners(signers ...ssh.Signer) {
53         exr.mtx.Lock()
54         defer exr.mtx.Unlock()
55         exr.signers = signers
56 }
57
58 // SetTarget sets the current target. The new target will be used next
59 // time a new connection is set up; until then, the Executor will
60 // continue to use the existing target.
61 //
62 // The new target is assumed to represent the same host as the
63 // previous target, although its address and host key might differ.
64 func (exr *Executor) SetTarget(t cloud.ExecutorTarget) {
65         exr.mtx.Lock()
66         defer exr.mtx.Unlock()
67         exr.target = t
68 }
69
70 // Target returns the current target.
71 func (exr *Executor) Target() cloud.ExecutorTarget {
72         exr.mtx.RLock()
73         defer exr.mtx.RUnlock()
74         return exr.target
75 }
76
77 // Execute runs cmd on the target. If an existing connection is not
78 // usable, it sets up a new connection to the current target.
79 func (exr *Executor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
80         session, err := exr.newSession()
81         if err != nil {
82                 return nil, nil, err
83         }
84         defer session.Close()
85         for k, v := range env {
86                 err = session.Setenv(k, v)
87                 if err != nil {
88                         return nil, nil, err
89                 }
90         }
91         var stdout, stderr bytes.Buffer
92         session.Stdin = stdin
93         session.Stdout = &stdout
94         session.Stderr = &stderr
95         err = session.Run(cmd)
96         return stdout.Bytes(), stderr.Bytes(), err
97 }
98
99 // Close shuts down any active connections.
100 func (exr *Executor) Close() {
101         // Ensure exr is initialized
102         exr.sshClient(false)
103
104         exr.clientSetup <- true
105         if exr.client != nil {
106                 defer exr.client.Close()
107         }
108         exr.client, exr.clientErr = nil, errors.New("closed")
109         <-exr.clientSetup
110 }
111
112 // Create a new SSH session. If session setup fails or the SSH client
113 // hasn't been setup yet, setup a new SSH client and try again.
114 func (exr *Executor) newSession() (*ssh.Session, error) {
115         try := func(create bool) (*ssh.Session, error) {
116                 client, err := exr.sshClient(create)
117                 if err != nil {
118                         return nil, err
119                 }
120                 return client.NewSession()
121         }
122         session, err := try(false)
123         if err != nil {
124                 session, err = try(true)
125         }
126         return session, err
127 }
128
129 // Get the latest SSH client. If another goroutine is in the process
130 // of setting one up, wait for it to finish and return its result (or
131 // the last successfully setup client, if it fails).
132 func (exr *Executor) sshClient(create bool) (*ssh.Client, error) {
133         exr.clientOnce.Do(func() {
134                 exr.clientSetup = make(chan bool, 1)
135                 exr.clientErr = errors.New("client not yet created")
136         })
137         defer func() { <-exr.clientSetup }()
138         select {
139         case exr.clientSetup <- true:
140                 if create {
141                         client, err := exr.setupSSHClient()
142                         if err == nil || exr.client == nil {
143                                 if exr.client != nil {
144                                         // Hang up the previous
145                                         // (non-working) client
146                                         go exr.client.Close()
147                                 }
148                                 exr.client, exr.clientErr = client, err
149                         }
150                         if err != nil {
151                                 return nil, err
152                         }
153                 }
154         default:
155                 // Another goroutine is doing the above case.  Wait
156                 // for it to finish and return whatever it leaves in
157                 // wkr.client.
158                 exr.clientSetup <- true
159         }
160         return exr.client, exr.clientErr
161 }
162
163 // Create a new SSH client.
164 func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
165         target := exr.Target()
166         addr := target.Address()
167         if addr == "" {
168                 return nil, errors.New("instance has no address")
169         }
170         var receivedKey ssh.PublicKey
171         client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
172                 User: "root",
173                 Auth: []ssh.AuthMethod{
174                         ssh.PublicKeys(exr.signers...),
175                 },
176                 HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
177                         receivedKey = key
178                         return nil
179                 },
180                 Timeout: time.Minute,
181         })
182         if err != nil {
183                 return nil, err
184         } else if receivedKey == nil {
185                 return nil, errors.New("BUG: key was never provided to HostKeyCallback")
186         }
187
188         if exr.hostKey == nil || !bytes.Equal(exr.hostKey.Marshal(), receivedKey.Marshal()) {
189                 err = target.VerifyHostKey(receivedKey, client)
190                 if err != nil {
191                         return nil, err
192                 }
193                 exr.hostKey = receivedKey
194         }
195         return client, nil
196 }