X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d034adbaf26a19e8fd48124cadd5108d2d3de642..fa615bfa6748a0b552c4e3a33be1921f0760664e:/services/datamanager/experimental/datamanager.py

diff --git a/services/datamanager/experimental/datamanager.py b/services/datamanager/experimental/datamanager.py
index 08e688e8ac..8207bdcd5c 100755
--- a/services/datamanager/experimental/datamanager.py
+++ b/services/datamanager/experimental/datamanager.py
@@ -4,6 +4,8 @@
 import arvados
 import argparse
 import cgi
+import csv
+import json
 import logging
 import math
 import pprint
@@ -26,6 +28,14 @@ def fileSizeFormat(value):
   return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
                          byteunits[exponent])
 
+def percentageFloor(x):
+  """ Returns a float which is the input rounded down to the nearest 0.01.
+
+e.g. percentageFloor(0.941354) = 0.94
+"""
+  return math.floor(x*100) / 100.0
+
+
 def byteSizeFromValidUuid(valid_uuid):
   return int(valid_uuid.split('+')[1])
@@ -76,7 +86,7 @@ class CollectionInfo:
     if not CollectionInfo.all_by_uuid.has_key(uuid):
       CollectionInfo(uuid)
     return CollectionInfo.all_by_uuid[uuid]
-  
+
 def extractUuid(candidate):
   """ Returns a canonical (hash+size) uuid from a valid uuid, or None
   if candidate is not a valid uuid."""
@@ -325,14 +335,15 @@ def logUserStorageUsage():
     # the object_type field since we don't know which we have.
     body['object_uuid'] = user
     body['event_type'] = args.user_storage_log_event_type
-    info = {}
-    info['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
-    info['read_collections_weighted_bytes'] = usage[WEIGHTED_READ_SIZE_COL]
-    info['persisted_collections_total_bytes'] = (
+    properties = {}
+    properties['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
+    properties['read_collections_weighted_bytes'] = (
+      usage[WEIGHTED_READ_SIZE_COL])
+    properties['persisted_collections_total_bytes'] = (
       usage[UNWEIGHTED_PERSIST_SIZE_COL])
-    info['persisted_collections_weighted_bytes'] = (
+    properties['persisted_collections_weighted_bytes'] = (
       usage[WEIGHTED_PERSIST_SIZE_COL])
-    body['properties'] = info
+    body['properties'] = properties
     # TODO(misha): Confirm that this will throw an exception if it
     # fails to create the log entry.
     arv.logs().create(body=body).execute()
@@ -347,11 +358,37 @@ def getKeepBlocks(keep_servers):
   blocks = []
   for host,port in keep_servers:
     response = urllib2.urlopen('http://%s:%d/index' % (host, port))
-    blocks.append([line.split(' ')
-                   for line in response.read().split('\n')
-                   if line])
+    server_blocks = [line.split(' ')
+                     for line in response.read().split('\n')
+                     if line]
+    server_blocks = [(block_id, int(mtime))
+                     for block_id, mtime in server_blocks]
+    blocks.append(server_blocks)
   return blocks
 
+def getKeepStats(keep_servers):
+  MOUNT_COLUMN = 5
+  TOTAL_COLUMN = 1
+  FREE_COLUMN = 3
+  DISK_BLOCK_SIZE = 1024
+  stats = []
+  for host,port in keep_servers:
+    response = urllib2.urlopen('http://%s:%d/status.json' % (host, port))
+
+    parsed_json = json.load(response)
+    df_entries = [line.split()
+                  for line in parsed_json['df'].split('\n')
+                  if line]
+    keep_volumes = [columns
+                    for columns in df_entries
+                    if 'keep' in columns[MOUNT_COLUMN]]
+    total_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(TOTAL_COLUMN),
+                                                  keep_volumes)))
+    free_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(FREE_COLUMN),
+                                                 keep_volumes)))
+    stats.append([total_space, free_space])
+  return stats
+
 def computeReplication(keep_blocks):
   for server_blocks in keep_blocks:
@@ -360,6 +397,70 @@ def computeReplication(keep_blocks):
   log.debug('Seeing the following replication levels among blocks: %s',
             str(set(block_to_replication.values())))
 
+
+def computeGarbageCollectionCandidates():
+  for server_blocks in keep_blocks:
+    block_to_latest_mtime.addValues(server_blocks)
+  empty_set = set()
+  garbage_collection_priority = sorted(
+    [(block,mtime)
+     for block,mtime in block_to_latest_mtime.items()
+     if len(block_to_persisters.get(block,empty_set)) == 0],
+    key = itemgetter(1))
+  global garbage_collection_report
+  garbage_collection_report = []
+  cumulative_disk_size = 0
+  for block,mtime in garbage_collection_priority:
+    disk_size = blockDiskUsage(block)
+    cumulative_disk_size += disk_size
+    garbage_collection_report.append(
+      (block,
+       mtime,
+       disk_size,
+       cumulative_disk_size,
+       float(free_keep_space + cumulative_disk_size)/total_keep_space))
+
+  print 'The oldest Garbage Collection Candidates: '
+  pprint.pprint(garbage_collection_report[:20])
+
+
+def outputGarbageCollectionReport(filename):
+  with open(filename, 'wb') as csvfile:
+    gcwriter = csv.writer(csvfile)
+    gcwriter.writerow(['block uuid', 'latest mtime', 'disk size',
+                       'cumulative size', 'disk free'])
+    for line in garbage_collection_report:
+      gcwriter.writerow(line)
+
+def computeGarbageCollectionHistogram():
+  # TODO(misha): Modify this to allow users to specify the number of
+  # histogram buckets through a flag.
+  histogram = []
+  last_percentage = -1
+  for _,mtime,_,_,disk_free in garbage_collection_report:
+    curr_percentage = percentageFloor(disk_free)
+    if curr_percentage > last_percentage:
+      histogram.append( (mtime, curr_percentage) )
+      last_percentage = curr_percentage
+
+  log.info('Garbage collection histogram is: %s', histogram)
+
+  return histogram
+
+
+def logGarbageCollectionHistogram():
+  body = {}
+  # TODO(misha): Decide whether we should specify an object_uuid in
+  # the body and if so, which uuid to use.
+  body['event_type'] = args.block_age_free_space_histogram_log_event_type
+  properties = {}
+  properties['histogram'] = garbage_collection_histogram
+  body['properties'] = properties
+  # TODO(misha): Confirm that this will throw an exception if it
+  # fails to create the log entry.
+  arv.logs().create(body=body).execute()
+
+
 def detectReplicationProblems():
   blocks_not_in_any_collections.update(
     set(block_to_replication.keys()).difference(block_to_collections.keys()))
@@ -440,6 +541,14 @@ parser.add_argument('--user-storage-log-event-type',
                     default='user-storage-report',
                     help=('The event type to set when logging user '
                           'storage usage to workbench.'))
+parser.add_argument('--block-age-free-space-histogram-log-event-type',
+                    default='block-age-free-space-histogram',
+                    help=('The event type to set when logging the block age '
+                          'free space histogram to workbench.'))
+parser.add_argument('--garbage-collection-file',
+                    default='',
+                    help=('The file to write a garbage collection report, or '
+                          'leave empty for no report.'))
 
 args = None
@@ -473,7 +582,39 @@ user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)
 
 keep_servers = []
 keep_blocks = []
+keep_stats = []
+total_keep_space = 0
+free_keep_space = 0
+
 block_to_replication = defaultdict(lambda: 0)
+block_to_latest_mtime = maxdict()
+
+garbage_collection_report = []
+"""A list of non-persisted blocks, sorted by increasing mtime
+
+Each entry is of the form (block uuid, latest mtime, disk size,
+cumulative disk size, disk free)
+
+* block uuid: The id of the block we want to delete
+* latest mtime: The latest mtime of the block across all keep servers.
+* disk size: The total disk space used by this block (block size
+multiplied by current replication level)
+* cumulative disk size: The sum of this block's disk size and all the
+blocks listed above it
+* disk free: The proportion of our disk space that would be free if we
+deleted this block and all the above. So this is (free disk space +
+cumulative disk size) / total disk capacity
+"""
+
+garbage_collection_histogram = []
+""" Shows the tradeoff of keep block age vs keep disk free space.
+
+Each entry is of the form (mtime, Disk Proportion).
+
+An entry of the form (1388747781, 0.52) means that if we deleted the
+oldest non-persisted blocks until we had 52% of the disk free, then
+all blocks with an mtime greater than 1388747781 would be preserved.
+"""
 
 # Stuff to report on
 blocks_not_in_any_collections = set()
@@ -516,12 +657,42 @@ def loadAllData():
   global keep_blocks
   keep_blocks = getKeepBlocks(keep_servers)
 
+  log.info('Getting Stats from each Keep Server.')
+  global keep_stats, total_keep_space, free_keep_space
+  keep_stats = getKeepStats(keep_servers)
+
+  total_keep_space = sum(map(itemgetter(0), keep_stats))
+  free_keep_space = sum(map(itemgetter(1), keep_stats))
+
+  # TODO(misha): Delete this hack when the keep servers are fixed!
+  # This hack deals with the fact that keep servers report each other's disks.
+  total_keep_space /= len(keep_stats)
+  free_keep_space /= len(keep_stats)
+
+  log.info('Total disk space: %s, Free disk space: %s (%d%%).' %
+           (fileSizeFormat(total_keep_space),
+            fileSizeFormat(free_keep_space),
+            100*free_keep_space/total_keep_space))
+
   computeReplication(keep_blocks)
 
   log.info('average replication level is %f',
            (float(sum(block_to_replication.values())) /
             len(block_to_replication)))
 
+  computeGarbageCollectionCandidates()
+
+  if args.garbage_collection_file:
+    log.info('Writing garbage collection report to %s',
+             args.garbage_collection_file)
+    outputGarbageCollectionReport(args.garbage_collection_file)
+
+  global garbage_collection_histogram
+  garbage_collection_histogram = computeGarbageCollectionHistogram()
+
+  if args.log_to_workbench:
+    logGarbageCollectionHistogram()
+
   detectReplicationProblems()
 
   computeUserStorageUsage()
@@ -555,10 +726,10 @@ class DataManagerHandler(BaseHTTPRequestHandler):
 
   def writeTop(self, title):
     self.wfile.write('%s\n' % title)
-  
+
   def writeBottom(self):
     self.wfile.write('\n')
-  
+
   def writeHomePage(self):
     self.send_response(200)
     self.end_headers()
@@ -676,7 +847,7 @@ class DataManagerHandler(BaseHTTPRequestHandler):
         blocks = replication_to_blocks[replication_level]
        self.wfile.write('%s\n' % '\n'.join(blocks))
       self.wfile.write('\n')
-  
+
   def do_GET(self):
     if not all_data_loaded:
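
The block-age versus free-space tradeoff computed by the new computeGarbageCollectionHistogram() is the core of this patch. For reference, the same idea can be run in isolation. What follows is a minimal Python 3 sketch, not the patched (Python 2) script itself: percentage_floor mirrors the percentageFloor helper added above, and the sample report rows are invented for illustration.

import math

def percentage_floor(x):
    """Round a fraction down to the nearest 0.01, e.g. 0.941354 -> 0.94."""
    return math.floor(x * 100) / 100.0

def compute_gc_histogram(report):
    """Collapse report rows of (block, mtime, disk size, cumulative size,
    disk free), sorted by increasing mtime, into (mtime, disk free) pairs.
    A new pair is emitted each time the floored free-space proportion
    reaches a new 1% bucket."""
    histogram = []
    last_pct = -1
    for _block, mtime, _size, _cumulative, disk_free in report:
        pct = percentage_floor(disk_free)
        if pct > last_pct:
            histogram.append((mtime, pct))
            last_pct = pct
    return histogram

if __name__ == '__main__':
    # Invented rows: deleting successively newer blocks frees more space.
    sample_report = [
        ('blk1+1024', 1388747000, 1024, 1024, 0.5012),
        ('blk2+2048', 1388747500, 2048, 3072, 0.5090),
        ('blk3+4096', 1388747781, 4096, 7168, 0.5210),
    ]
    print(compute_gc_histogram(sample_report))
    # [(1388747000, 0.5), (1388747781, 0.52)]

Read the output the same way as the garbage_collection_histogram docstring above: the pair (1388747781, 0.52) means that stopping deletion once 52% of the disk is free would preserve every block with an mtime greater than 1388747781.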