Merge branch 'master' into 8019-crunchrun-log-throttle
[arvados.git] / doc / _includes / _compute_ping_rb.liquid
1 #!/usr/bin/env ruby
2
3 require 'rubygems'
4
5 require 'cgi'
6 require 'fileutils'
7 require 'json'
8 require 'net/https'
9 require 'socket'
10 require 'syslog'
11
# Pings the Arvados API server on behalf of this compute node and acts on the
# reply ("pong"): renames the host when the server assigns a new name,
# re-runs puppet (unless the node is marked puppetless), and resumes the node
# in SLURM. Everything is logged to syslog; sufficiently severe messages are
# also echoed to stderr with a timestamp.
class ComputeNodePing
  @@NODEDATA_DIR = "/var/tmp/arv-node-data"
  @@PUPPET_CONFFILE = "/etc/puppet/puppet.conf"
  @@HOST_STATEFILE = "/var/run/arvados-compute-ping-hoststate.json"

  # Exit statuses treated as success when a caller does not say otherwise.
  DEFAULT_ACCEPT_CODES = [0].freeze

  # Compact SLURM states (sinfo "%t") from which this script resumes a node.
  SLURM_RESUMABLE_STATES = %w[down drain drng].freeze

  # args   - command-line arguments; "quiet" as the first one limits stderr
  #          echoing to LOG_ERR and more severe messages.
  # stdout - stored in @stdout; not written to by this class.
  # stderr - IO that receives timestamped copies of qualifying log messages.
  #
  # Opens syslog, builds the ping request, loads the puppet config (unless
  # /compute-node.puppetless exists) and any saved host state. If setup raises
  # a StandardError, syslog is closed and the error re-raised.
  def initialize(args, stdout, stderr)
    @stdout = stdout
    @stderr = stderr
    @stderr_loglevel = args.first == "quiet" ? Syslog::LOG_ERR : Syslog::LOG_DEBUG
    @puppet_disabled = false
    @syslog = Syslog.open("arvados-compute-ping",
                          Syslog::LOG_CONS | Syslog::LOG_PID,
                          Syslog::LOG_DAEMON)
    @puppetless = File.exist?("/compute-node.puppetless")

    begin
      prepare_ping
      load_puppet_conf unless @puppetless
      @host_state = load_host_state
    rescue StandardError
      @syslog.close
      raise
    end
  end

  # Sends one ping and reacts to the reply. When the reply carries hostname,
  # domain and first_ping_at, the host is renamed if needed, then puppet is
  # run and the SLURM node resumed once per host name; progress is recorded
  # in the host state file so it is not repeated on later runs.
  #
  # NOTE: this overrides Object#send; use __send__ for dynamic dispatch on
  # instances of this class.
  def send
    pong = send_raw_ping

    if pong["hostname"] && pong["domain"] && pong["first_ping_at"]
      if @host_state.nil?
        @host_state = {
          "fqdn" => current_fqdn,
          "resumed_slurm" => %w[busy idle].include?(pong["crunch_worker_state"]),
        }
        update_host_state({})
      end

      if hostname_changed?(pong)
        disable_puppet unless @puppetless
        rename_host(pong)
        update_host_state("fqdn" => fqdn_from_pong(pong),
                          "resumed_slurm" => false)
      end

      unless @host_state["resumed_slurm"]
        run_puppet_agent unless @puppetless
        resume_slurm_node(pong["hostname"])
        update_host_state("resumed_slurm" => true)
      end
    end

    log("Last ping at #{pong['last_ping_at']}")
  end

  # Re-enables puppet if this run disabled it, then closes syslog.
  def cleanup
    enable_puppet if @puppet_disabled && !@puppetless
    @syslog.close
  end

  private

  # Writes message to syslog at level and, when level is at least as severe
  # as the stderr threshold (lower syslog numbers are more severe), to stderr
  # with a timestamp. The message is passed as an argument to a "%s" format
  # so any "%" it contains (e.g. in command output) is logged literally.
  def log(message, level = Syslog::LOG_INFO)
    @syslog.log(level, "%s", message)
    return unless level <= @stderr_loglevel

    @stderr.write("#{Time.now.strftime('%Y-%m-%d %H:%M:%S')} #{message}\n")
  end

  # Logs message as an error and exits the process with code.
  def abort(message, code = 1)
    log(message, Syslog::LOG_ERR)
    exit(code)
  end

  # Runs the argv Array cmd_a via IO.popen (no shell), handing the read pipe
  # to block. Aborts unless the exit status is in accept_codes; otherwise
  # returns the block's result.
  def run_and_check(cmd_a, accept_codes, io_opts, &block)
    result = IO.popen(cmd_a, "r", io_opts, &block)
    status = $?.exitstatus
    abort("#{cmd_a} exited #{status}") unless accept_codes.include?(status)
    result
  end

  # Runs a command, checks its exit status, and returns its stdout as a String.
  def check_output(cmd_a, accept_codes = DEFAULT_ACCEPT_CODES, io_opts = {})
    run_and_check(cmd_a, accept_codes, io_opts, &:read)
  end

  # Runs a command, logs each non-empty stdout line, and checks the exit status.
  def check_command(cmd_a, accept_codes = DEFAULT_ACCEPT_CODES, io_opts = {})
    run_and_check(cmd_a, accept_codes, io_opts) do |pipe|
      pipe.each_line do |line|
        line.chomp!
        log("#{cmd_a.first}: #{line}") unless line.empty?
      end
    end
  end

  # Overwrites the file at path with body.
  def replace_file(path, body)
    File.write(path, body)
  end

  # Merges updates_h into the in-memory host state and saves it as JSON.
  def update_host_state(updates_h)
    @host_state.merge!(updates_h)
    replace_file(@@HOST_STATEFILE, @host_state.to_json)
  end

  # Returns the saved host state Hash, or nil when there is none. A file that
  # is not a JSON object (e.g. truncated by an interrupted write) is treated
  # as absent so the state gets rebuilt instead of failing every later run.
  def load_host_state
    state = JSON.parse(File.read(@@HOST_STATEFILE))
    state.is_a?(Hash) ? state : nil
  rescue Errno::ENOENT
    nil
  rescue JSON::ParserError => e
    log("Ignoring unreadable host state file #{@@HOST_STATEFILE}: #{e}",
        Syslog::LOG_WARNING)
    nil
  end

  # Best-effort lookup of this machine's canonical host name; nil on failure.
  def current_fqdn
    Socket.gethostbyname(Socket.gethostname).first
  rescue StandardError
    nil
  end

  # Disables the puppet agent, then polls once per second until `pgrep
  # puppet` exits 1 (no matching process).
  def disable_puppet
    check_command(["puppet", "agent", "--disable"])
    @puppet_disabled = true
    loop do
      check_output(["pgrep", "puppet"], 0..1)
      break if $?.exitstatus == 1

      sleep(1)
    end
  end

  # Re-enables the puppet agent.
  def enable_puppet
    check_command(["puppet", "agent", "--enable"])
    @puppet_disabled = false
  end

  # Builds @ping_req/@ping_client: reads the ping URL, moves its query
  # parameters into a form-encoded POST body, and adds one field per regular
  # file in the meta-data directory (name: file name with "-" turned into
  # "_"; value: file contents minus a trailing newline). Aborts when the ping
  # URL file does not exist yet.
  def prepare_ping
    begin
      ping_uri_s = File.read(File.join(@@NODEDATA_DIR, "arv-ping-url"))
    rescue Errno::ENOENT
      abort("ping URL file is not present yet, skipping run")
    end

    # A trailing newline in the file would make URI.parse reject the URL.
    ping_uri = URI.parse(ping_uri_s.strip)
    payload_h = query_params(ping_uri.query)

    dirname = File.join(@@NODEDATA_DIR, "meta-data")
    # Dir.foreach closes the directory handle once iteration is done.
    Dir.foreach(dirname) do |basename|
      filename = File.join(dirname, basename)
      next unless File.file?(filename)

      payload_h[basename.tr("-", "_")] = File.read(filename).chomp
    end

    ping_uri.query = nil
    @ping_req = Net::HTTP::Post.new(ping_uri.to_s)
    @ping_req.set_form_data(payload_h)
    @ping_client = Net::HTTP.new(ping_uri.host, ping_uri.port)
    @ping_client.use_ssl = ping_uri.scheme == "https"
  end

  # Parses a URL query string into { name => [values...] }, keeping every
  # value of a repeated name and accepting "&" or ";" as separators (the
  # shape CGI.parse produced). nil or "" yields an empty Hash.
  def query_params(query)
    params = {}
    return params if query.nil? || query.empty?

    query.split(/[&;]/).each do |pair|
      next if pair.empty?

      URI.decode_www_form(pair).each { |key, value| (params[key] ||= []) << value }
    end
    params
  end

  # POSTs the prepared ping and returns the parsed JSON reply, saving the raw
  # body to pong.json. Aborts on transport, HTTP-status, or JSON errors. If
  # the reply has "errors", logs them and exits 1, running `halt` first when
  # an error mentions "Incorrect ping_secret".
  def send_raw_ping
    begin
      response = @ping_client.start { |http| http.request(@ping_req) }
      # inspect shows the status code and message; to_s shows only the
      # class name and object id.
      raise "response was a #{response.inspect}" unless response.is_a?(Net::HTTPSuccess)

      pong = JSON.parse(response.body)
    rescue JSON::ParserError => error
      abort("Error sending ping: could not parse JSON response: #{error}")
    rescue StandardError => error
      abort("Error sending ping: #{error}")
    end

    replace_file(File.join(@@NODEDATA_DIR, "pong.json"), response.body)
    errors = pong["errors"]
    if errors
      log(errors.join("; "), Syslog::LOG_ERR)
      system("halt") if errors.grep(/Incorrect ping_secret/).any?
      exit(1)
    end
    pong
  end

  # Parses the puppet config for later rewriting: "certname" values go to
  # @puppet_certnames; every other non-blank, non-comment line is kept
  # verbatim in @puppet_conf.
  def load_puppet_conf
    @puppet_conf = []
    @puppet_certnames = []
    File.foreach(@@PUPPET_CONFFILE) do |line|
      key, value = line.strip.split(/\s*=\s*/, 2)
      if key == "certname"
        @puppet_certnames << value
      elsif !(key.nil? || key.empty? || key.start_with?("#"))
        @puppet_conf << line
      end
    end
  end

  # "hostname.domain" from the pong.
  def fqdn_from_pong(pong)
    "#{pong['hostname']}.#{pong['domain']}"
  end

  # Puppet certname: lowercased first_ping_at with ":" replaced by "-", then
  # the fqdn with ".compute" inserted after the short host name.
  def certname_from_pong(pong)
    fqdn = fqdn_from_pong(pong).sub(".", ".compute.")
    "#{pong['first_ping_at'].gsub(':', '-').downcase}.#{fqdn}"
  end

  # True when the saved fqdn differs from the pong's, or (with puppet) when
  # the configured certnames are not exactly the pong-derived certname.
  def hostname_changed?(pong)
    return true if @host_state["fqdn"] != fqdn_from_pong(pong)
    return false if @puppetless

    @puppet_certnames != [certname_from_pong(pong)]
  end

  # Renames this host to the pong's fqdn: /etc/hostname, the running
  # hostname, /etc/hosts, and (with puppet) the puppet certname, discarding
  # the old puppet SSL directory.
  def rename_host(pong)
    new_fqdn = fqdn_from_pong(pong)
    log("Renaming host from #{@host_state["fqdn"]} to #{new_fqdn}")

    replace_file("/etc/hostname", "#{new_fqdn.split('.', 2).first}\n")
    check_output(["hostname", new_fqdn])
    update_etc_hosts(new_fqdn)

    return if @puppetless

    new_conflines = @puppet_conf + ["\n[agent]\n",
                                    "certname=#{certname_from_pong(pong)}\n"]
    replace_file(@@PUPPET_CONFFILE, new_conflines.join)
    begin
      FileUtils.remove_entry_secure("/var/lib/puppet/ssl")
    rescue Errno::ENOENT
      # Nothing to remove. Raising here would abort before the new fqdn is
      # saved, making every later run retry (and fail) the rename.
      nil
    end
  end

  # Points the /etc/hosts line for this host's IP (as reported by facter)
  # at new_fqdn. Skips the edit when facter reports no address, because an
  # empty address pattern would rewrite every line of the file.
  def update_etc_hosts(new_fqdn)
    ip_address = check_output(["facter", "ipaddress"]).chomp
    if ip_address.empty?
      log("facter reported no IP address; not updating /etc/hosts",
          Syslog::LOG_WARNING)
      return
    end

    esc_address = Regexp.escape(ip_address)
    # Whitespace after the address keeps 10.0.0.1 from matching 10.0.0.12.
    check_command(["sed", "-i", "/etc/hosts",
                   "-e", "s/^#{esc_address}[[:space:]].*$/#{ip_address}\t#{new_fqdn}/"])
  end

  # Runs puppet once in the foreground; exit codes 0 and 2 count as success
  # (with --detailed-exitcodes, 2 means changes were applied).
  def run_puppet_agent
    log("Running puppet agent")
    enable_puppet
    check_command(["puppet", "agent", "--onetime", "--no-daemonize",
                   "--no-splay", "--detailed-exitcodes",
                   "--ignorecache", "--no-usecacheonfailure"],
                  [0, 2], {err: [:child, :out]})
  end

  # Resumes node_name in SLURM when sinfo reports a resumable state.
  def resume_slurm_node(node_name)
    sinfo_output = check_output(["sinfo", "--noheader", "-o", "%t",
                                 "-n", node_name])
    return unless slurm_resume_needed?(sinfo_output)

    log("Resuming node in SLURM")
    check_command(["scontrol", "update", "NodeName=#{node_name}",
                   "State=RESUME"], [0], {err: [:child, :out]})
  end

  # True when any line of sinfo "%t" output is a resumable state. sinfo may
  # print several lines (e.g. one per partition) and appends "*" to the state
  # of a non-responding node, so each line is normalized before matching.
  def slurm_resume_needed?(sinfo_output)
    sinfo_output.each_line.any? do |line|
      SLURM_RESUMABLE_STATES.include?(line.strip.chomp("*"))
    end
  end
end
270
# Allow only one run at a time: Dir.mkdir is atomic, so an existing lock
# directory means another run holds the lock and this one exits quietly.
# NOTE: a run that is killed outright leaves the directory behind; nothing in
# this script removes a stale lock.
LOCK_DIRNAME = "/var/lock/arvados-compute-node.lock"
begin
  Dir.mkdir(LOCK_DIRNAME)
rescue Errno::EEXIST
  exit(0)
end

ping_sender = nil
begin
  ping_sender = ComputeNodePing.new(ARGV, $stdout, $stderr)
  ping_sender.send
ensure
  # Clean up (re-enabling puppet if this run disabled it) while still holding
  # the lock, and release the lock even if cleanup raises or exits.
  begin
    ping_sender.cleanup unless ping_sender.nil?
  ensure
    Dir.rmdir(LOCK_DIRNAME)
  end
end