import argparse
import cgi
+import csv
+import json
import logging
import math
import pprint
return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
byteunits[exponent])
+def percentageFloor(x):
+ """ Returns a float which is the input rounded down to the neared 0.01.
+
+e.g. precentageFloor(0.941354) = 0.94
+"""
+ return math.floor(x*100) / 100.0
+
+
def byteSizeFromValidUuid(valid_uuid):
return int(valid_uuid.split('+')[1])
if not CollectionInfo.all_by_uuid.has_key(uuid):
CollectionInfo(uuid)
return CollectionInfo.all_by_uuid[uuid]
-
+
def extractUuid(candidate):
""" Returns a canonical (hash+size) uuid from a valid uuid, or None if candidate is not a valid uuid."""
# the object_type field since we don't know which we have.
body['object_uuid'] = user
body['event_type'] = args.user_storage_log_event_type
- info = {}
- info['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
- info['read_collections_weighted_bytes'] = usage[WEIGHTED_READ_SIZE_COL]
- info['persisted_collections_total_bytes'] = (
+ properties = {}
+ properties['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
+ properties['read_collections_weighted_bytes'] = (
+ usage[WEIGHTED_READ_SIZE_COL])
+ properties['persisted_collections_total_bytes'] = (
usage[UNWEIGHTED_PERSIST_SIZE_COL])
- info['persisted_collections_weighted_bytes'] = (
+ properties['persisted_collections_weighted_bytes'] = (
usage[WEIGHTED_PERSIST_SIZE_COL])
- body['properties'] = info
+ body['properties'] = properties
# TODO(misha): Confirm that this will throw an exception if it
# fails to create the log entry.
arv.logs().create(body=body).execute()
blocks = []
for host,port in keep_servers:
response = urllib2.urlopen('http://%s:%d/index' % (host, port))
- blocks.append([line.split(' ')
- for line in response.read().split('\n')
- if line])
+ server_blocks = [line.split(' ')
+ for line in response.read().split('\n')
+ if line]
+ server_blocks = [(block_id, int(mtime))
+ for block_id, mtime in server_blocks]
+ blocks.append(server_blocks)
return blocks
+def getKeepStats(keep_servers):
+ MOUNT_COLUMN = 5
+ TOTAL_COLUMN = 1
+ FREE_COLUMN = 3
+ DISK_BLOCK_SIZE = 1024
+ stats = []
+ for host,port in keep_servers:
+ response = urllib2.urlopen('http://%s:%d/status.json' % (host, port))
+
+ parsed_json = json.load(response)
+ df_entries = [line.split()
+ for line in parsed_json['df'].split('\n')
+ if line]
+ keep_volumes = [columns
+ for columns in df_entries
+ if 'keep' in columns[MOUNT_COLUMN]]
+ total_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(TOTAL_COLUMN),
+ keep_volumes)))
+ free_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(FREE_COLUMN),
+ keep_volumes)))
+ stats.append([total_space, free_space])
+ return stats
+
def computeReplication(keep_blocks):
for server_blocks in keep_blocks:
log.debug('Seeing the following replication levels among blocks: %s',
str(set(block_to_replication.values())))
+
+def computeGarbageCollectionCandidates():
+ for server_blocks in keep_blocks:
+ block_to_latest_mtime.addValues(server_blocks)
+ empty_set = set()
+ garbage_collection_priority = sorted(
+ [(block,mtime)
+ for block,mtime in block_to_latest_mtime.items()
+ if len(block_to_persisters.get(block,empty_set)) == 0],
+ key = itemgetter(1))
+ global garbage_collection_report
+ garbage_collection_report = []
+ cumulative_disk_size = 0
+ for block,mtime in garbage_collection_priority:
+ disk_size = blockDiskUsage(block)
+ cumulative_disk_size += disk_size
+ garbage_collection_report.append(
+ (block,
+ mtime,
+ disk_size,
+ cumulative_disk_size,
+ float(free_keep_space + cumulative_disk_size)/total_keep_space))
+
+ print 'The oldest Garbage Collection Candidates: '
+ pprint.pprint(garbage_collection_report[:20])
+
+
+def outputGarbageCollectionReport(filename):
+ with open(filename, 'wb') as csvfile:
+ gcwriter = csv.writer(csvfile)
+ gcwriter.writerow(['block uuid', 'latest mtime', 'disk size',
+ 'cumulative size', 'disk free'])
+ for line in garbage_collection_report:
+ gcwriter.writerow(line)
+
+def computeGarbageCollectionHistogram():
+ # TODO(misha): Modify this to allow users to specify the number of
+ # histogram buckets through a flag.
+ histogram = []
+ last_percentage = -1
+ for _,mtime,_,_,disk_free in garbage_collection_report:
+ curr_percentage = percentageFloor(disk_free)
+ if curr_percentage > last_percentage:
+ histogram.append( (mtime, curr_percentage) )
+ last_percentage = curr_percentage
+
+ log.info('Garbage collection histogram is: %s', histogram)
+
+ return histogram
+
+
+def logGarbageCollectionHistogram():
+ body = {}
+ # TODO(misha): Decide whether we should specify an object_uuid in
+ # the body and if so, which uuid to use.
+ body['event_type'] = args.block_age_free_space_histogram_log_event_type
+ properties = {}
+ properties['histogram'] = garbage_collection_histogram
+ body['properties'] = properties
+ # TODO(misha): Confirm that this will throw an exception if it
+ # fails to create the log entry.
+ arv.logs().create(body=body).execute()
+
+
def detectReplicationProblems():
blocks_not_in_any_collections.update(
set(block_to_replication.keys()).difference(block_to_collections.keys()))
default='user-storage-report',
help=('The event type to set when logging user '
'storage usage to workbench.'))
+parser.add_argument('--block-age-free-space-histogram-log-event-type',
+ default='block-age-free-space-histogram',
+ help=('The event type to set when logging user '
+ 'storage usage to workbench.'))
+parser.add_argument('--garbage-collection-file',
+ default='',
+ help=('The file to write a garbage collection report, or '
+ 'leave empty for no report.'))
args = None
keep_servers = []
keep_blocks = []
+keep_stats = []
+total_keep_space = 0
+free_keep_space = 0
+
block_to_replication = defaultdict(lambda: 0)
+block_to_latest_mtime = maxdict()
+
+garbage_collection_report = []
+"""A list of non-persisted blocks, sorted by increasing mtime
+
+Each entry is of the form (block uuid, latest mtime, disk size,
+cumulative size)
+
+* block uuid: The id of the block we want to delete
+* latest mtime: The latest mtime of the block across all keep servers.
+* disk size: The total disk space used by this block (block size
+multiplied by current replication level)
+* cumulative disk size: The sum of this block's disk size and all the
+blocks listed above it
+* disk free: The proportion of our disk space that would be free if we
+deleted this block and all the above. So this is (free disk space +
+cumulative disk size) / total disk capacity
+"""
+
+garbage_collection_histogram = []
+""" Shows the tradeoff of keep block age vs keep disk free space.
+
+Each entry is of the form (mtime, Disk Proportion).
+
+An entry of the form (1388747781, 0.52) means that if we deleted the
+oldest non-presisted blocks until we had 52% of the disk free, then
+all blocks with an mtime greater than 1388747781 would be preserved.
+"""
# Stuff to report on
blocks_not_in_any_collections = set()
global keep_blocks
keep_blocks = getKeepBlocks(keep_servers)
+ log.info('Getting Stats from each Keep Server.')
+ global keep_stats, total_keep_space, free_keep_space
+ keep_stats = getKeepStats(keep_servers)
+
+ total_keep_space = sum(map(itemgetter(0), keep_stats))
+ free_keep_space = sum(map(itemgetter(1), keep_stats))
+
+ # TODO(misha): Delete this hack when the keep servers are fixed!
+ # This hack deals with the fact that keep servers report each other's disks.
+ total_keep_space /= len(keep_stats)
+ free_keep_space /= len(keep_stats)
+
+ log.info('Total disk space: %s, Free disk space: %s (%d%%).' %
+ (fileSizeFormat(total_keep_space),
+ fileSizeFormat(free_keep_space),
+ 100*free_keep_space/total_keep_space))
+
computeReplication(keep_blocks)
log.info('average replication level is %f',
(float(sum(block_to_replication.values())) /
len(block_to_replication)))
+ computeGarbageCollectionCandidates()
+
+ if args.garbage_collection_file:
+ log.info('Writing garbage Collection report to %s',
+ args.garbage_collection_file)
+ outputGarbageCollectionReport(args.garbage_collection_file)
+
+ global garbage_collection_histogram
+ garbage_collection_histogram = computeGarbageCollectionHistogram()
+
+ if args.log_to_workbench:
+ logGarbageCollectionHistogram()
+
detectReplicationProblems()
computeUserStorageUsage()
def writeTop(self, title):
self.wfile.write('<HTML><HEAD><TITLE>%s</TITLE></HEAD>\n<BODY>' % title)
-
+
def writeBottom(self):
self.wfile.write('</BODY></HTML>\n')
-
+
def writeHomePage(self):
self.send_response(200)
self.end_headers()
blocks = replication_to_blocks[replication_level]
self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(blocks))
self.wfile.write('</TR></TABLE>\n')
-
+
def do_GET(self):
if not all_data_loaded: