send queue/worker stats while waiting for job to start. fixes #1591
[arvados.git] / services / keep / keep.rb
1 #!/usr/bin/env ruby
2
3 require 'sinatra/base'
4 require 'digest/md5'
5 require 'digest/sha1'
6 require 'arvados'
7
8 class Keep < Sinatra::Base
9   @@ssl_flag = false
10   def self.ssl_flag
11     @@ssl_flag
12   end
13
14   configure do
15     mime_type :binary, 'application/octet-stream'
16     enable :logging
17     set :port, (ENV['PORT'] || '25107').to_i
18     set :bind, (ENV['IP'] || '0.0.0.0')
19   end
20
21   def verify_hash(data, hash)
22     if hash.length == 32
23       Digest::MD5.hexdigest(data) == hash && hash
24     elsif hash.length == 40
25       Digest::SHA1.hexdigest(data) == hash && hash
26     else
27       false
28     end
29   end
30
31   def self.debuglevel
32     if ENV['DEBUG'] and ENV['DEBUG'].match /^-?\d+/
33       ENV['DEBUG'].to_i
34     else
35       0
36     end
37   end
38
39   def self.debuglog(loglevel, msg)
40     if debuglevel >= loglevel
41       $stderr.puts "[keepd/#{$$} #{Time.now}] #{msg}"
42     end
43   end
44   def debuglog(*args)
45     self.class.debuglog *args
46   end
47
48   def self.keepdirs
49     return @@keepdirs if defined? @@keepdirs
50     # Configure backing store directories
51     @@keepdirs = []
52     rootdir = (ENV['KEEP_ROOT'] || '/').sub /\/$/, ''
53     `mount`.split("\n").each do |mountline|
54       dev, on_txt, mountpoint, type_txt, fstype, opts = mountline.split
55       if on_txt == 'on' and type_txt == 'type'
56         debuglog 2, "dir #{mountpoint} is mounted"
57         if mountpoint[0..(rootdir.length)] == rootdir + '/'
58           debuglog 2, "dir #{mountpoint} is in #{rootdir}/"
59           keepdir = "#{mountpoint.sub /\/$/, ''}/keep"
60           if File.exists? "#{keepdir}/."
61             kd = {
62               :root => keepdir,
63               :arvados => {},
64               :arvados_file => File.join(keepdir, 'arvados_keep_disk.json'),
65               :readonly => false,
66               :device => dev,
67               :device_inode => File.stat(dev).ino
68             }
69             if opts.gsub(/[\(\)]/, '').split(',').index('ro')
70               kd[:readonly] = true
71             end
72             debuglog 2, "keepdir #{kd.inspect}"
73             begin
74               kd[:arvados] = JSON.parse(File.read(kd[:arvados_file]), symbolize_names: true)
75             rescue
76               debuglog 0, "keepdir #{kd.inspect} is new (no #{kd[:arvados_file]})"
77             end
78             @@keepdirs << kd
79           end
80         end
81       end
82     end
83     Dir.open('/dev/disk/by-uuid/').each do |fs_uuid|
84       next if fs_uuid.match /^\./
85       fs_root_inode = File.stat("/dev/disk/by-uuid/#{fs_uuid}").ino
86       @@keepdirs.each do |kd|
87         if kd[:device_inode] == fs_root_inode
88           kd[:filesystem_uuid] = fs_uuid
89           debuglog 0, "keepdir #{kd.reject { |k,v| k==:arvados }.inspect}"
90         end
91       end
92     end
93     @@keepdirs
94   end
95   self.keepdirs
96
97   def find_backfile(hash, opts)
98     subdir = hash[0..2]
99     @@keepdirs.each do |keepdir|
100       backfile = "#{keepdir[:root]}/#{subdir}/#{hash}"
101       if File.exists? backfile
102         data = nil
103         File.open("#{keepdir[:root]}/lock", "a+") do |f|
104           if f.flock File::LOCK_EX
105             data = File.read backfile
106           end
107         end
108         if data and (!opts[:verify_hash] or verify_hash data, hash)
109           return [backfile, data]
110         end
111       end
112     end
113     nil
114   end
115
116   get '/:locator' do |locator|
117     regs = locator.match /^([0-9a-f]{32,})/
118     if regs
119       hash = regs[1]
120       backfile, data = find_backfile hash, :verify_hash => false
121       if data
122         content_type :binary
123         body data
124       else
125         status 404
126         body 'not found'
127       end
128     else
129       pass
130     end
131     self.class.ping_arvados
132   end
133
134   put '/:locator' do |locator|
135     data = request.body.read
136     hash = verify_hash(data, locator)
137     if not hash
138       status 422
139       body "Checksum mismatch"
140       return
141     end
142     backfile, havedata = find_backfile hash, :verify_hash => true
143     if havedata
144       status 200
145       body 'OK'
146     else
147       wrote = nil
148       subdir = hash[0..2]
149       @@keepdirs.each do |keepdir|
150         next if keepdir[:readonly]
151         backdir = "#{keepdir[:root]}/#{subdir}"
152         if !File.exists? backdir
153           begin
154             Dir.mkdir backdir
155           rescue
156           end
157         end
158         backfile = "#{keepdir[:root]}/#{subdir}/#{hash}"
159         File.open("#{keepdir[:root]}/lock", "a+") do |lf|
160           if lf.flock File::LOCK_EX
161             File.open(backfile + ".tmp", "a+") do |wf|
162               if wf.flock File::LOCK_EX
163                 wf.seek 0, File::SEEK_SET
164                 wf.truncate 0
165                 wrote = wf.write data
166               end
167               if wrote == data.length
168                 File.rename backfile+".tmp", backfile
169                 break
170               else
171                 File.unlink backfile+".tmp"
172               end
173             end
174           end
175         end
176       end
177       if wrote == data.length
178         status 200
179         body 'OK'
180       else
181         status 500
182         body 'Fail'
183       end
184     end
185     self.class.ping_arvados
186   end
187
188   protected
189
190   def self.ping_arvados
191     return if defined? @@last_ping_at and @@last_ping_at > Time.now - 300
192     @@last_ping_at = Time.now
193     begin
194       @@arvados ||= Arvados.new(api_token: '')
195       @@keepdirs.each do |kd|
196         ack = @@arvados.keep_disk.ping(uuid: kd[:arvados][:uuid],
197                                        service_port: settings.port,
198                                        service_ssl_flag: Keep.ssl_flag,
199                                        ping_secret: kd[:arvados][:ping_secret],
200                                        is_readable: true,
201                                        is_writable: !kd[:readonly],
202                                        filesystem_uuid: kd[:filesystem_uuid])
203         if ack and ack[:last_ping_at]
204           debuglog 0, "device #{kd[:device]} uuid #{ack[:uuid]} last_ping_at #{ack[:last_ping_at]}"
205           if kd[:arvados].empty?
206             File.open(kd[:arvados_file]+'.tmp', 'a+', 0600) do end
207             File.open(kd[:arvados_file]+'.tmp', 'r+', 0600) do |f|
208               if f.flock File::LOCK_EX
209                 f.seek 0, File::SEEK_SET
210                 f.truncate 0
211                 f.write ack.to_json
212                 File.rename kd[:arvados_file]+'.tmp', kd[:arvados_file]
213                 kd[:arvados] = ack
214               end
215             end
216           end
217         else
218           debuglog 0, "device #{kd[:device]} ping fail"
219         end
220       end
221     rescue Exception => e
222       debuglog 0, "ping_arvados: #{e.inspect}"
223     end
224   end
225   self.ping_arvados
226
227   if app_file == $0
228     run! do |server|
229       if ENV['SSL_CERT'] and ENV['SSL_KEY']
230         ssl_options = {
231           :cert_chain_file => ENV['SSL_CERT'],
232           :private_key_file => ENV['SSL_KEY'],
233           :verify_peer => false
234         }
235         @@ssl_flag = true
236         server.ssl = true
237         server.ssl_options = ssl_options
238       end
239     end
240   end
241 end