#!/usr/bin/env ruby {% comment %} Copyright (C) The Arvados Authors. All rights reserved. SPDX-License-Identifier: CC-BY-SA-3.0 {% endcomment %} require 'rubygems' require 'cgi' require 'fileutils' require 'json' require 'net/https' require 'socket' require 'syslog' class ComputeNodePing @@NODEDATA_DIR = "/var/tmp/arv-node-data" @@PUPPET_CONFFILE = "/etc/puppet/puppet.conf" @@HOST_STATEFILE = "/var/run/arvados-compute-ping-hoststate.json" def initialize(args, stdout, stderr) @stdout = stdout @stderr = stderr @stderr_loglevel = ((args.first == "quiet") ? Syslog::LOG_ERR : Syslog::LOG_DEBUG) @puppet_disabled = false @syslog = Syslog.open("arvados-compute-ping", Syslog::LOG_CONS | Syslog::LOG_PID, Syslog::LOG_DAEMON) @puppetless = File.exist?('/compute-node.puppetless') begin prepare_ping load_puppet_conf unless @puppetless begin @host_state = JSON.parse(IO.read(@@HOST_STATEFILE)) rescue Errno::ENOENT @host_state = nil end rescue @syslog.close raise end end def send pong = send_raw_ping if pong["hostname"] and pong["domain"] and pong["first_ping_at"] if @host_state.nil? @host_state = { "fqdn" => (Socket.gethostbyname(Socket.gethostname).first rescue nil), "resumed_slurm" => ["busy", "idle"].include?(pong["crunch_worker_state"]), } update_host_state({}) end if hostname_changed?(pong) disable_puppet unless @puppetless rename_host(pong) update_host_state("fqdn" => fqdn_from_pong(pong), "resumed_slurm" => false) end unless @host_state["resumed_slurm"] run_puppet_agent unless @puppetless resume_slurm_node(pong["hostname"]) update_host_state("resumed_slurm" => true) end end log("Last ping at #{pong['last_ping_at']}") end def cleanup enable_puppet if @puppet_disabled and not @puppetless @syslog.close end private def log(message, level=Syslog::LOG_INFO) @syslog.log(level, message) if level <= @stderr_loglevel @stderr.write("#{Time.now.strftime("%Y-%m-%d %H:%M:%S")} #{message}\n") end end def abort(message, code=1) log(message, Syslog::LOG_ERR) exit(code) end def run_and_check(cmd_a, accept_codes, io_opts, &block) result = IO.popen(cmd_a, "r", io_opts, &block) unless accept_codes.include?($?.exitstatus) abort("#{cmd_a} exited #{$?.exitstatus}") end result end DEFAULT_ACCEPT_CODES=[0] def check_output(cmd_a, accept_codes=DEFAULT_ACCEPT_CODES, io_opts={}) # Run a command, check the exit status, and return its stdout as a string. run_and_check(cmd_a, accept_codes, io_opts) do |pipe| pipe.read end end def check_command(cmd_a, accept_codes=DEFAULT_ACCEPT_CODES, io_opts={}) # Run a command, send stdout to syslog, and check the exit status. run_and_check(cmd_a, accept_codes, io_opts) do |pipe| pipe.each_line do |line| line.chomp! log("#{cmd_a.first}: #{line}") unless line.empty? end end end def replace_file(path, body) open(path, "w") { |f| f.write(body) } end def update_host_state(updates_h) @host_state.merge!(updates_h) replace_file(@@HOST_STATEFILE, @host_state.to_json) end def disable_puppet check_command(["puppet", "agent", "--disable"]) @puppet_disabled = true loop do # Wait for any running puppet agents to finish. check_output(["pgrep", "puppet"], 0..1) break if $?.exitstatus == 1 sleep(1) end end def enable_puppet check_command(["puppet", "agent", "--enable"]) @puppet_disabled = false end def prepare_ping begin ping_uri_s = File.read(File.join(@@NODEDATA_DIR, "arv-ping-url")) rescue Errno::ENOENT abort("ping URL file is not present yet, skipping run") end ping_uri = URI.parse(ping_uri_s) payload_h = CGI.parse(ping_uri.query) # Collect all extra data to be sent dirname = File.join(@@NODEDATA_DIR, "meta-data") Dir.open(dirname).each do |basename| filename = File.join(dirname, basename) if File.file?(filename) payload_h[basename.gsub('-', '_')] = File.read(filename).chomp end end ping_uri.query = nil @ping_req = Net::HTTP::Post.new(ping_uri.to_s) @ping_req.set_form_data(payload_h) @ping_client = Net::HTTP.new(ping_uri.host, ping_uri.port) @ping_client.use_ssl = ping_uri.scheme == 'https' end def send_raw_ping begin response = @ping_client.start do |http| http.request(@ping_req) end if response.is_a? Net::HTTPSuccess pong = JSON.parse(response.body) else raise "response was a #{response}" end rescue JSON::ParserError => error abort("Error sending ping: could not parse JSON response: #{error}") rescue => error abort("Error sending ping: #{error}") end replace_file(File.join(@@NODEDATA_DIR, "pong.json"), response.body) if pong["errors"] then log(pong["errors"].join("; "), Syslog::LOG_ERR) if pong["errors"].grep(/Incorrect ping_secret/).any? system("halt") end exit(1) end pong end def load_puppet_conf # Parse Puppet configuration suitable for rewriting. # Save certnames in @puppet_certnames. # Save other functional configuration lines in @puppet_conf. @puppet_conf = [] @puppet_certnames = [] open(@@PUPPET_CONFFILE, "r") do |conffile| conffile.each_line do |line| key, value = line.strip.split(/\s*=\s*/, 2) if key == "certname" @puppet_certnames << value elsif not (key.nil? or key.empty? or key.start_with?("#")) @puppet_conf << line end end end end def fqdn_from_pong(pong) "#{pong['hostname']}.#{pong['domain']}" end def certname_from_pong(pong) fqdn = fqdn_from_pong(pong).sub(".", ".compute.") "#{pong['first_ping_at'].gsub(':', '-').downcase}.#{fqdn}" end def hostname_changed?(pong) if @puppetless (@host_state["fqdn"] != fqdn_from_pong(pong)) else (@host_state["fqdn"] != fqdn_from_pong(pong)) or (@puppet_certnames != [certname_from_pong(pong)]) end end def rename_host(pong) new_fqdn = fqdn_from_pong(pong) log("Renaming host from #{@host_state["fqdn"]} to #{new_fqdn}") replace_file("/etc/hostname", "#{new_fqdn.split('.', 2).first}\n") check_output(["hostname", new_fqdn]) ip_address = check_output(["facter", "ipaddress"]).chomp esc_address = Regexp.escape(ip_address) check_command(["sed", "-i", "/etc/hosts", "-e", "s/^#{esc_address}.*$/#{ip_address}\t#{new_fqdn}/"]) unless @puppetless new_conflines = @puppet_conf + ["\n[agent]\n", "certname=#{certname_from_pong(pong)}\n"] replace_file(@@PUPPET_CONFFILE, new_conflines.join("")) FileUtils.remove_entry_secure("/var/lib/puppet/ssl") end end def run_puppet_agent log("Running puppet agent") enable_puppet check_command(["puppet", "agent", "--onetime", "--no-daemonize", "--no-splay", "--detailed-exitcodes", "--ignorecache", "--no-usecacheonfailure"], [0, 2], {err: [:child, :out]}) end def resume_slurm_node(node_name) current_state = check_output(["sinfo", "--noheader", "-o", "%t", "-n", node_name]).chomp if %w(down drain drng).include?(current_state) log("Resuming node in SLURM") check_command(["scontrol", "update", "NodeName=#{node_name}", "State=RESUME"], [0], {err: [:child, :out]}) end end end LOCK_DIRNAME = "/var/lock/arvados-compute-node.lock" begin Dir.mkdir(LOCK_DIRNAME) rescue Errno::EEXIST exit(0) end ping_sender = nil begin ping_sender = ComputeNodePing.new(ARGV, $stdout, $stderr) ping_sender.send ensure Dir.rmdir(LOCK_DIRNAME) ping_sender.cleanup unless ping_sender.nil? end