#!/usr/bin/env ruby
# frozen_string_literal: true

# Compute node ping script.
#
# Provenance: arvados.git, doc/_includes/_compute_ping_rb.liquid
# (X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c04608e40b971b2fc342db8e5e44817b568a3a4c..25296def9590c8a29bdcf75c78a03444d39ca6b6:/doc/_includes/_compute_ping_rb.liquid)
#
# Usage: run with no arguments for verbose stderr logging, or with "quiet" as
# the first argument to only echo errors to stderr.  Everything is always sent
# to syslog.

require 'rubygems'

require 'cgi'
require 'fileutils'
require 'json'
require 'net/https'
require 'socket'
require 'syslog'

# Sends one ping to the URL stored in the node data directory and, depending on
# the response ("pong"), renames the host, runs the puppet agent and resumes
# the node in SLURM.  Progress is persisted in HOST_STATEFILE between runs.
class ComputeNodePing
  NODEDATA_DIR = "/var/tmp/arv-node-data"
  PUPPET_CONFFILE = "/etc/puppet/puppet.conf"
  HOST_STATEFILE = "/var/run/arvados-compute-ping-hoststate.json"
  DEFAULT_ACCEPT_CODES = [0].freeze

  # @param args [Array<String>] command line arguments; "quiet" as the first
  #   element restricts stderr output to LOG_ERR and more severe messages
  # @param stdout [IO] stored but not otherwise used by this class
  # @param stderr [IO] receives a timestamped copy of log messages
  # @raise [StandardError] anything raised while preparing the ping or loading
  #   configuration; syslog is closed before the error is re-raised
  def initialize(args, stdout, stderr)
    @stdout = stdout
    @stderr = stderr
    @stderr_loglevel = args.first == "quiet" ? Syslog::LOG_ERR : Syslog::LOG_DEBUG
    @puppet_disabled = false
    @syslog = Syslog.open("arvados-compute-ping",
                          Syslog::LOG_CONS | Syslog::LOG_PID,
                          Syslog::LOG_DAEMON)
    # The presence of this marker file turns off every puppet-related step.
    @puppetless = File.exist?('/compute-node.puppetless')

    begin
      prepare_ping
      load_puppet_conf unless @puppetless
      @host_state = load_host_state
    rescue
      @syslog.close
      raise
    end
  end

  # Send the ping and act on the response.
  #
  # NOTE: this public method shadows Object#send on instances of this class;
  # use __send__ if dynamic dispatch is ever needed.
  def send
    pong = send_raw_ping

    if pong["hostname"] && pong["domain"] && pong["first_ping_at"]
      if @host_state.nil?
        # First run (or unusable state file): record what the host looks like
        # right now so later runs can detect changes.
        @host_state = {
          "fqdn" => current_fqdn,
          "resumed_slurm" =>
            ["busy", "idle"].include?(pong["crunch_worker_state"]),
        }
        update_host_state({})
      end

      if hostname_changed?(pong)
        disable_puppet unless @puppetless
        rename_host(pong)
        update_host_state({"fqdn" => fqdn_from_pong(pong),
                           "resumed_slurm" => false})
      end

      unless @host_state["resumed_slurm"]
        run_puppet_agent unless @puppetless
        resume_slurm_node(pong["hostname"])
        update_host_state({"resumed_slurm" => true})
      end
    end

    log("Last ping at #{pong['last_ping_at']}")
  end

  # Re-enable puppet if this run disabled it, then close syslog.  Syslog is
  # closed even when re-enabling puppet fails.
  def cleanup
    enable_puppet if @puppet_disabled && !@puppetless
  ensure
    @syslog.close
  end

  private

  # Read the persisted host state.
  #
  # @return [Hash, nil] the parsed state, or nil when the file is missing or
  #   does not contain valid JSON (e.g. a write was interrupted); in both
  #   cases #send rebuilds the state from scratch
  def load_host_state
    JSON.parse(File.read(HOST_STATEFILE))
  rescue Errno::ENOENT
    nil
  rescue JSON::ParserError => error
    log("Ignoring unparseable host state file #{HOST_STATEFILE}: #{error}",
        Syslog::LOG_WARNING)
    nil
  end

  # Best-effort lookup of the host's current canonical name.
  #
  # @return [String, nil] nil when the lookup fails for any reason
  def current_fqdn
    Socket.gethostbyname(Socket.gethostname).first
  rescue StandardError
    nil
  end

  # Log to syslog, and to stderr when the level is severe enough.
  def log(message, level=Syslog::LOG_INFO)
    # Syslog#log treats its second argument as a printf-style format string,
    # so the message itself must never be passed in that position: command
    # output containing "%" would raise or be garbled.
    @syslog.log(level, "%s", message)
    if level <= @stderr_loglevel
      @stderr.write("#{Time.now.strftime("%Y-%m-%d %H:%M:%S")} #{message}\n")
    end
  end

  # Log an error and terminate the process.  Shadows Kernel#abort.
  def abort(message, code=1)
    log(message, Syslog::LOG_ERR)
    exit(code)
  end

  # Run cmd_a through IO.popen, yielding the read pipe to the block, and abort
  # unless the exit status is listed in accept_codes.  $? is left set to the
  # command's status for callers that inspect it.
  #
  # @param cmd_a [Array<String>] command and arguments (no shell involved)
  # @param accept_codes [#include?] acceptable exit statuses
  # @param io_opts [Hash{Symbol=>Object}] extra IO.popen options
  # @return whatever the block returns
  def run_and_check(cmd_a, accept_codes, io_opts, &block)
    # Options must be passed as keywords: Ruby 3 no longer converts a trailing
    # positional hash into keyword arguments.
    result = IO.popen(cmd_a, "r", **io_opts, &block)
    unless accept_codes.include?($?.exitstatus)
      abort("#{cmd_a} exited #{$?.exitstatus}")
    end
    result
  end

  # Run a command, check the exit status, and return its stdout as a string.
  def check_output(cmd_a, accept_codes=DEFAULT_ACCEPT_CODES, io_opts={})
    run_and_check(cmd_a, accept_codes, io_opts, &:read)
  end

  # Run a command, send stdout to syslog, and check the exit status.
  def check_command(cmd_a, accept_codes=DEFAULT_ACCEPT_CODES, io_opts={})
    run_and_check(cmd_a, accept_codes, io_opts) do |pipe|
      pipe.each_line do |line|
        line.chomp!
        log("#{cmd_a.first}: #{line}") unless line.empty?
      end
    end
  end

  # Overwrite the file at path with body.
  def replace_file(path, body)
    # File.open rather than Kernel#open: the latter would spawn a subprocess
    # for a path starting with "|".
    File.open(path, "w") { |f| f.write(body) }
  end

  # Merge updates_h into the in-memory host state and persist it.
  def update_host_state(updates_h)
    @host_state.merge!(updates_h)
    replace_file(HOST_STATEFILE, @host_state.to_json)
  end

  def disable_puppet
    check_command(["puppet", "agent", "--disable"])
    @puppet_disabled = true
    loop do
      # Wait for any running puppet agents to finish.
      # pgrep exits 1 when nothing matched; 0..1 are both acceptable here.
      check_output(["pgrep", "puppet"], 0..1)
      break if $?.exitstatus == 1
      sleep(1)
    end
  end

  def enable_puppet
    check_command(["puppet", "agent", "--enable"])
    @puppet_disabled = false
  end

  # Build the POST request (@ping_req) and HTTP client (@ping_client) from the
  # ping URL file and the files in the meta-data directory.  Aborts when the
  # ping URL file does not exist.
  def prepare_ping
    begin
      # strip: a trailing newline in the file would make URI.parse fail.
      ping_uri_s = File.read(File.join(NODEDATA_DIR, "arv-ping-url")).strip
    rescue Errno::ENOENT
      abort("ping URL file is not present yet, skipping run")
    end

    ping_uri = URI.parse(ping_uri_s)
    # The query string becomes form data; tolerate a URL without one.
    payload_h = CGI.parse(ping_uri.query || "")

    # Collect all extra data to be sent
    dirname = File.join(NODEDATA_DIR, "meta-data")
    # Dir.foreach closes the directory handle when done.
    Dir.foreach(dirname) do |basename|
      filename = File.join(dirname, basename)
      if File.file?(filename)
        payload_h[basename.tr('-', '_')] = File.read(filename).chomp
      end
    end

    ping_uri.query = nil
    @ping_req = Net::HTTP::Post.new(ping_uri.to_s)
    @ping_req.set_form_data(payload_h)
    @ping_client = Net::HTTP.new(ping_uri.host, ping_uri.port)
    @ping_client.use_ssl = ping_uri.scheme == 'https'
  end

  # Perform the HTTP request and return the parsed JSON response.
  #
  # Aborts on transport errors, non-2xx responses and unparseable bodies.  The
  # raw body is saved as pong.json.  When the response carries "errors" they
  # are logged and the process exits 1; an "Incorrect ping_secret" error
  # additionally halts the machine.
  #
  # @return [Hash] the parsed response
  def send_raw_ping
    begin
      response = @ping_client.start do |http|
        http.request(@ping_req)
      end
      if response.is_a? Net::HTTPSuccess
        pong = JSON.parse(response.body)
      else
        raise "response was a #{response}"
      end
    rescue JSON::ParserError => error
      abort("Error sending ping: could not parse JSON response: #{error}")
    rescue => error
      abort("Error sending ping: #{error}")
    end

    replace_file(File.join(NODEDATA_DIR, "pong.json"), response.body)
    if pong["errors"]
      log(pong["errors"].join("; "), Syslog::LOG_ERR)
      if pong["errors"].grep(/Incorrect ping_secret/).any?
        system("halt")
      end
      exit(1)
    end
    pong
  end

  def load_puppet_conf
    # Parse Puppet configuration suitable for rewriting.
    # Save certnames in @puppet_certnames.
    # Save other functional configuration lines in @puppet_conf.
    @puppet_conf = []
    @puppet_certnames = []
    File.foreach(PUPPET_CONFFILE) do |line|
      key, value = line.strip.split(/\s*=\s*/, 2)
      if key == "certname"
        @puppet_certnames << value
      elsif !(key.nil? || key.empty? || key.start_with?("#"))
        @puppet_conf << line
      end
    end
  end

  def fqdn_from_pong(pong)
    "#{pong['hostname']}.#{pong['domain']}"
  end

  # Certname is "<first_ping_at, ':' replaced by '-', lowercased>." followed by
  # the FQDN with ".compute" inserted after the host part.
  def certname_from_pong(pong)
    fqdn = fqdn_from_pong(pong).sub(".", ".compute.")
    "#{pong['first_ping_at'].gsub(':', '-').downcase}.#{fqdn}"
  end

  # True when the recorded FQDN differs from the pong's, or (with puppet) when
  # puppet.conf does not hold exactly the expected certname.
  def hostname_changed?(pong)
    fqdn_differs = @host_state["fqdn"] != fqdn_from_pong(pong)
    return fqdn_differs if @puppetless

    fqdn_differs || (@puppet_certnames != [certname_from_pong(pong)])
  end

  # Apply the hostname from the pong: /etc/hostname, the running hostname,
  # the host's own /etc/hosts entry and (with puppet) the certname in
  # puppet.conf, discarding the old puppet SSL directory.
  def rename_host(pong)
    new_fqdn = fqdn_from_pong(pong)
    log("Renaming host from #{@host_state["fqdn"]} to #{new_fqdn}")

    # Look the address up before touching anything so a failure here does not
    # leave the host half renamed.  An empty address would turn the sed
    # expression below into one that matches unrelated lines.
    ip_address = check_output(["facter", "ipaddress"]).chomp
    abort("facter did not report an IP address") if ip_address.empty?

    replace_file("/etc/hostname", "#{new_fqdn.split('.', 2).first}\n")
    check_output(["hostname", new_fqdn])

    esc_address = Regexp.escape(ip_address)
    # Require whitespace after the address so that e.g. 10.0.0.1 does not also
    # rewrite the entries for 10.0.0.10 or 10.0.0.123.
    check_command(["sed", "-i", "/etc/hosts",
                   "-e", "s/^#{esc_address}[[:space:]].*$/#{ip_address}\t#{new_fqdn}/"])

    unless @puppetless
      new_conflines = @puppet_conf + ["\n[agent]\n",
                                      "certname=#{certname_from_pong(pong)}\n"]
      replace_file(PUPPET_CONFFILE, new_conflines.join(""))
      FileUtils.remove_entry_secure("/var/lib/puppet/ssl")
    end
  end

  def run_puppet_agent
    log("Running puppet agent")
    enable_puppet
    # Exit statuses 0 and 2 are both accepted; stderr is merged into the
    # logged output.
    check_command(["puppet", "agent", "--onetime", "--no-daemonize",
                   "--no-splay", "--detailed-exitcodes",
                   "--ignorecache", "--no-usecacheonfailure"],
                  [0, 2], {err: [:child, :out]})
  end

  # Ask SLURM to resume the node when sinfo reports it as down/drain/drng.
  def resume_slurm_node(node_name)
    current_state = check_output(["sinfo", "--noheader", "-o", "%t",
                                  "-n", node_name]).chomp
    if %w(down drain drng).include?(current_state)
      log("Resuming node in SLURM")
      check_command(["scontrol", "update", "NodeName=#{node_name}",
                     "State=RESUME"], [0], {err: [:child, :out]})
    end
  end
end

# A directory is used as the lock: Dir.mkdir fails with EEXIST when another
# run holds it, in which case this run exits quietly.
LOCK_DIRNAME = "/var/lock/arvados-compute-node.lock"
begin
  Dir.mkdir(LOCK_DIRNAME)
rescue Errno::EEXIST
  exit(0)
end

ping_sender = nil
begin
  ping_sender = ComputeNodePing.new(ARGV, $stdout, $stderr)
  ping_sender.send
ensure
  # Finish cleanup (which may re-enable puppet) while still holding the lock,
  # and release the lock even if cleanup itself fails.
  begin
    ping_sender.cleanup unless ping_sender.nil?
  ensure
    Dir.rmdir(LOCK_DIRNAME)
  end
end