Merge branch '12764-writable-file' refs #12764
[arvados.git] / doc / _includes / _compute_ping_rb.liquid
1 #!/usr/bin/env ruby
2 {% comment %}
3 Copyright (C) The Arvados Authors. All rights reserved.
4
5 SPDX-License-Identifier: CC-BY-SA-3.0
6 {% endcomment %}
7
8 require 'rubygems'
9
10 require 'cgi'
11 require 'fileutils'
12 require 'json'
13 require 'net/https'
14 require 'socket'
15 require 'syslog'
16
17 class ComputeNodePing
18   @@NODEDATA_DIR = "/var/tmp/arv-node-data"
19   @@PUPPET_CONFFILE = "/etc/puppet/puppet.conf"
20   @@HOST_STATEFILE = "/var/run/arvados-compute-ping-hoststate.json"
21
22   def initialize(args, stdout, stderr)
23     @stdout = stdout
24     @stderr = stderr
25     @stderr_loglevel = ((args.first == "quiet") ?
26                         Syslog::LOG_ERR : Syslog::LOG_DEBUG)
27     @puppet_disabled = false
28     @syslog = Syslog.open("arvados-compute-ping",
29                           Syslog::LOG_CONS | Syslog::LOG_PID,
30                           Syslog::LOG_DAEMON)
31     @puppetless = File.exist?('/compute-node.puppetless')
32
33     begin
34       prepare_ping
35       load_puppet_conf unless @puppetless
36       begin
37         @host_state = JSON.parse(IO.read(@@HOST_STATEFILE))
38       rescue Errno::ENOENT
39         @host_state = nil
40       end
41     rescue
42       @syslog.close
43       raise
44     end
45   end
46
47   def send
48     pong = send_raw_ping
49
50     if pong["hostname"] and pong["domain"] and pong["first_ping_at"]
51       if @host_state.nil?
52         @host_state = {
53           "fqdn" => (Socket.gethostbyname(Socket.gethostname).first rescue nil),
54           "resumed_slurm" =>
55             ["busy", "idle"].include?(pong["crunch_worker_state"]),
56         }
57         update_host_state({})
58       end
59
60       if hostname_changed?(pong)
61         disable_puppet unless @puppetless
62         rename_host(pong)
63         update_host_state("fqdn" => fqdn_from_pong(pong),
64                           "resumed_slurm" => false)
65       end
66
67       unless @host_state["resumed_slurm"]
68         run_puppet_agent unless @puppetless
69         resume_slurm_node(pong["hostname"])
70         update_host_state("resumed_slurm" => true)
71       end
72     end
73
74     log("Last ping at #{pong['last_ping_at']}")
75   end
76
77   def cleanup
78     enable_puppet if @puppet_disabled and not @puppetless
79     @syslog.close
80   end
81
82   private
83
84   def log(message, level=Syslog::LOG_INFO)
85     @syslog.log(level, message)
86     if level <= @stderr_loglevel
87       @stderr.write("#{Time.now.strftime("%Y-%m-%d %H:%M:%S")} #{message}\n")
88     end
89   end
90
91   def abort(message, code=1)
92     log(message, Syslog::LOG_ERR)
93     exit(code)
94   end
95
96   def run_and_check(cmd_a, accept_codes, io_opts, &block)
97     result = IO.popen(cmd_a, "r", io_opts, &block)
98     unless accept_codes.include?($?.exitstatus)
99       abort("#{cmd_a} exited #{$?.exitstatus}")
100     end
101     result
102   end
103
104   DEFAULT_ACCEPT_CODES=[0]
105   def check_output(cmd_a, accept_codes=DEFAULT_ACCEPT_CODES, io_opts={})
106     # Run a command, check the exit status, and return its stdout as a string.
107     run_and_check(cmd_a, accept_codes, io_opts) do |pipe|
108       pipe.read
109     end
110   end
111
112   def check_command(cmd_a, accept_codes=DEFAULT_ACCEPT_CODES, io_opts={})
113     # Run a command, send stdout to syslog, and check the exit status.
114     run_and_check(cmd_a, accept_codes, io_opts) do |pipe|
115       pipe.each_line do |line|
116         line.chomp!
117         log("#{cmd_a.first}: #{line}") unless line.empty?
118       end
119     end
120   end
121
122   def replace_file(path, body)
123     open(path, "w") { |f| f.write(body) }
124   end
125
126   def update_host_state(updates_h)
127     @host_state.merge!(updates_h)
128     replace_file(@@HOST_STATEFILE, @host_state.to_json)
129   end
130
131   def disable_puppet
132     check_command(["puppet", "agent", "--disable"])
133     @puppet_disabled = true
134     loop do
135       # Wait for any running puppet agents to finish.
136       check_output(["pgrep", "puppet"], 0..1)
137       break if $?.exitstatus == 1
138       sleep(1)
139     end
140   end
141
142   def enable_puppet
143     check_command(["puppet", "agent", "--enable"])
144     @puppet_disabled = false
145   end
146
147   def prepare_ping
148     begin
149       ping_uri_s = File.read(File.join(@@NODEDATA_DIR, "arv-ping-url"))
150     rescue Errno::ENOENT
151       abort("ping URL file is not present yet, skipping run")
152     end
153
154     ping_uri = URI.parse(ping_uri_s)
155     payload_h = CGI.parse(ping_uri.query)
156
157     # Collect all extra data to be sent
158     dirname = File.join(@@NODEDATA_DIR, "meta-data")
159     Dir.open(dirname).each do |basename|
160       filename = File.join(dirname, basename)
161       if File.file?(filename)
162         payload_h[basename.gsub('-', '_')] = File.read(filename).chomp
163       end
164     end
165
166     ping_uri.query = nil
167     @ping_req = Net::HTTP::Post.new(ping_uri.to_s)
168     @ping_req.set_form_data(payload_h)
169     @ping_client = Net::HTTP.new(ping_uri.host, ping_uri.port)
170     @ping_client.use_ssl = ping_uri.scheme == 'https'
171   end
172
173   def send_raw_ping
174     begin
175       response = @ping_client.start do |http|
176         http.request(@ping_req)
177       end
178       if response.is_a? Net::HTTPSuccess
179         pong = JSON.parse(response.body)
180       else
181         raise "response was a #{response}"
182       end
183     rescue JSON::ParserError => error
184       abort("Error sending ping: could not parse JSON response: #{error}")
185     rescue => error
186       abort("Error sending ping: #{error}")
187     end
188
189     replace_file(File.join(@@NODEDATA_DIR, "pong.json"), response.body)
190     if pong["errors"] then
191       log(pong["errors"].join("; "), Syslog::LOG_ERR)
192       if pong["errors"].grep(/Incorrect ping_secret/).any?
193         system("halt")
194       end
195       exit(1)
196     end
197     pong
198   end
199
200   def load_puppet_conf
201     # Parse Puppet configuration suitable for rewriting.
202     # Save certnames in @puppet_certnames.
203     # Save other functional configuration lines in @puppet_conf.
204     @puppet_conf = []
205     @puppet_certnames = []
206     open(@@PUPPET_CONFFILE, "r") do |conffile|
207       conffile.each_line do |line|
208         key, value = line.strip.split(/\s*=\s*/, 2)
209         if key == "certname"
210           @puppet_certnames << value
211         elsif not (key.nil? or key.empty? or key.start_with?("#"))
212           @puppet_conf << line
213         end
214       end
215     end
216   end
217
218   def fqdn_from_pong(pong)
219     "#{pong['hostname']}.#{pong['domain']}"
220   end
221
222   def certname_from_pong(pong)
223     fqdn = fqdn_from_pong(pong).sub(".", ".compute.")
224     "#{pong['first_ping_at'].gsub(':', '-').downcase}.#{fqdn}"
225   end
226
227   def hostname_changed?(pong)
228     if @puppetless
229       (@host_state["fqdn"] != fqdn_from_pong(pong))
230     else
231       (@host_state["fqdn"] != fqdn_from_pong(pong)) or
232         (@puppet_certnames != [certname_from_pong(pong)])
233     end
234   end
235
236   def rename_host(pong)
237     new_fqdn = fqdn_from_pong(pong)
238     log("Renaming host from #{@host_state["fqdn"]} to #{new_fqdn}")
239
240     replace_file("/etc/hostname", "#{new_fqdn.split('.', 2).first}\n")
241     check_output(["hostname", new_fqdn])
242
243     ip_address = check_output(["facter", "ipaddress"]).chomp
244     esc_address = Regexp.escape(ip_address)
245     check_command(["sed", "-i", "/etc/hosts",
246                    "-e", "s/^#{esc_address}.*$/#{ip_address}\t#{new_fqdn}/"])
247
248     unless @puppetless
249       new_conflines = @puppet_conf + ["\n[agent]\n",
250                                       "certname=#{certname_from_pong(pong)}\n"]
251       replace_file(@@PUPPET_CONFFILE, new_conflines.join(""))
252       FileUtils.remove_entry_secure("/var/lib/puppet/ssl")
253     end
254   end
255
256   def run_puppet_agent
257     log("Running puppet agent")
258     enable_puppet
259     check_command(["puppet", "agent", "--onetime", "--no-daemonize",
260                    "--no-splay", "--detailed-exitcodes",
261                    "--ignorecache", "--no-usecacheonfailure"],
262                   [0, 2], {err: [:child, :out]})
263   end
264
265   def resume_slurm_node(node_name)
266     current_state = check_output(["sinfo", "--noheader", "-o", "%t",
267                                   "-n", node_name]).chomp
268     if %w(down drain drng).include?(current_state)
269       log("Resuming node in SLURM")
270       check_command(["scontrol", "update", "NodeName=#{node_name}",
271                      "State=RESUME"], [0], {err: [:child, :out]})
272     end
273   end
274 end
275
# Directory used as a run lock: Dir.mkdir either creates it or raises
# Errno::EEXIST, so creating it doubles as the "acquire" step.
# NOTE(review): nothing here removes a stale lock left by a killed process;
# confirm that something else (e.g. a tmpfs /var/lock) takes care of that.
LOCK_DIRNAME = "/var/lock/arvados-compute-node.lock"
begin
  Dir.mkdir(LOCK_DIRNAME)
rescue Errno::EEXIST
  # The lock directory already exists (presumably another run is in
  # progress); exit quietly with success.
  exit(0)
end

ping_sender = nil
begin
  ping_sender = ComputeNodePing.new(ARGV, $stdout, $stderr)
  ping_sender.send
ensure
  # Clean up while still holding the lock, and release the lock even if the
  # cleanup itself raises or exits, so neither step can cause the other to be
  # skipped.
  begin
    ping_sender.cleanup unless ping_sender.nil?
  ensure
    Dir.rmdir(LOCK_DIRNAME)
  end
end