X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/de9903cfc08ea7c3da459e7c4ee5a744d52a7c89..fdc9a9308c646d23ec50073833f141ceebf78613:/services/datamanager/experimental/datamanager.py

diff --git a/services/datamanager/experimental/datamanager.py b/services/datamanager/experimental/datamanager.py
index 4bd2a264fe..8207bdcd5c 100755
--- a/services/datamanager/experimental/datamanager.py
+++ b/services/datamanager/experimental/datamanager.py
@@ -4,6 +4,8 @@
 import arvados
 import argparse
 import cgi
+import csv
+import json
 import logging
 import math
 import pprint
@@ -26,6 +28,14 @@ def fileSizeFormat(value):
   return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
                          byteunits[exponent])
 
+def percentageFloor(x):
+  """ Returns a float which is the input rounded down to the nearest 0.01.
+
+e.g. percentageFloor(0.941354) = 0.94
+"""
+  return math.floor(x*100) / 100.0
+
+
 def byteSizeFromValidUuid(valid_uuid):
   return int(valid_uuid.split('+')[1])
 
@@ -325,14 +335,15 @@ def logUserStorageUsage():
     # the object_type field since we don't know which we have.
     body['object_uuid'] = user
     body['event_type'] = args.user_storage_log_event_type
-    info = {}
-    info['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
-    info['read_collections_weighted_bytes'] = usage[WEIGHTED_READ_SIZE_COL]
-    info['persisted_collections_total_bytes'] = (
+    properties = {}
+    properties['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
+    properties['read_collections_weighted_bytes'] = (
+      usage[WEIGHTED_READ_SIZE_COL])
+    properties['persisted_collections_total_bytes'] = (
       usage[UNWEIGHTED_PERSIST_SIZE_COL])
-    info['persisted_collections_weighted_bytes'] = (
+    properties['persisted_collections_weighted_bytes'] = (
       usage[WEIGHTED_PERSIST_SIZE_COL])
-    body['properties'] = info
+    body['properties'] = properties
     # TODO(misha): Confirm that this will throw an exception if it
     # fails to create the log entry.
     arv.logs().create(body=body).execute()
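The new percentageFloor helper is small enough to sanity-check in isolation. A minimal standalone sketch follows; the assertion inputs are invented for illustration and are not part of the patch:

import math

def percentageFloor(x):
  # Round x down to the nearest 0.01, per the docstring's example.
  return math.floor(x * 100) / 100.0

# Illustrative checks with made-up inputs.
assert percentageFloor(0.941354) == 0.94
assert percentageFloor(0.5) == 0.5
assert percentageFloor(0.999999) == 0.99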
@@ -347,11 +358,37 @@ def getKeepBlocks(keep_servers):
   blocks = []
   for host,port in keep_servers:
     response = urllib2.urlopen('http://%s:%d/index' % (host, port))
-    blocks.append([line.split(' ')
-                   for line in response.read().split('\n')
-                   if line])
+    server_blocks = [line.split(' ')
+                     for line in response.read().split('\n')
+                     if line]
+    server_blocks = [(block_id, int(mtime))
+                     for block_id, mtime in server_blocks]
+    blocks.append(server_blocks)
   return blocks
 
+def getKeepStats(keep_servers):
+  MOUNT_COLUMN = 5
+  TOTAL_COLUMN = 1
+  FREE_COLUMN = 3
+  DISK_BLOCK_SIZE = 1024
+  stats = []
+  for host,port in keep_servers:
+    response = urllib2.urlopen('http://%s:%d/status.json' % (host, port))
+
+    parsed_json = json.load(response)
+    df_entries = [line.split()
+                  for line in parsed_json['df'].split('\n')
+                  if line]
+    keep_volumes = [columns
+                    for columns in df_entries
+                    if 'keep' in columns[MOUNT_COLUMN]]
+    total_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(TOTAL_COLUMN),
+                                                  keep_volumes)))
+    free_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(FREE_COLUMN),
+                                                 keep_volumes)))
+    stats.append([total_space, free_space])
+  return stats
+
 def computeReplication(keep_blocks):
   for server_blocks in keep_blocks:
@@ -376,15 +413,54 @@ def computeGarbageCollectionCandidates():
   for block,mtime in garbage_collection_priority:
     disk_size = blockDiskUsage(block)
     cumulative_disk_size += disk_size
-    garbage_collection_report.append((block,
-                                      mtime,
-                                      disk_size,
-                                      cumulative_disk_size))
+    garbage_collection_report.append(
+      (block,
+       mtime,
+       disk_size,
+       cumulative_disk_size,
+       float(free_keep_space + cumulative_disk_size)/total_keep_space))
 
   print 'The oldest Garbage Collection Candidates: '
   pprint.pprint(garbage_collection_report[:20])
 
+def outputGarbageCollectionReport(filename):
+  with open(filename, 'wb') as csvfile:
+    gcwriter = csv.writer(csvfile)
+    gcwriter.writerow(['block uuid', 'latest mtime', 'disk size',
+                       'cumulative size', 'disk free'])
+    for line in garbage_collection_report:
+      gcwriter.writerow(line)
+
+def computeGarbageCollectionHistogram():
+  # TODO(misha): Modify this to allow users to specify the number of
+  # histogram buckets through a flag.
+  histogram = []
+  last_percentage = -1
+  for _,mtime,_,_,disk_free in garbage_collection_report:
+    curr_percentage = percentageFloor(disk_free)
+    if curr_percentage > last_percentage:
+      histogram.append( (mtime, curr_percentage) )
+      last_percentage = curr_percentage
+
+  log.info('Garbage collection histogram is: %s', histogram)
+
+  return histogram
+
+
+def logGarbageCollectionHistogram():
+  body = {}
+  # TODO(misha): Decide whether we should specify an object_uuid in
+  # the body and if so, which uuid to use.
+  body['event_type'] = args.block_age_free_space_histogram_log_event_type
+  properties = {}
+  properties['histogram'] = garbage_collection_histogram
+  body['properties'] = properties
+  # TODO(misha): Confirm that this will throw an exception if it
+  # fails to create the log entry.
+  arv.logs().create(body=body).execute()
+
+
 def detectReplicationProblems():
   blocks_not_in_any_collections.update(
     set(block_to_replication.keys()).difference(block_to_collections.keys()))
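To make the column constants in getKeepStats above concrete, here is a hedged sketch of the same df parsing run against a fabricated 'df' field such as a keep server's status.json is assumed to return; every device name, size, and mount point below is invented:

from operator import itemgetter

MOUNT_COLUMN = 5
TOTAL_COLUMN = 1
FREE_COLUMN = 3
DISK_BLOCK_SIZE = 1024  # df reports 1K blocks

# Fabricated df output; only the /mnt/keep* volumes should be counted.
sample_df = ('Filesystem 1K-blocks Used Available Use% Mounted on\n'
             '/dev/sda1 103081248 24091296 73746288 25% /\n'
             '/dev/sdb1 51606140 10485760 38499068 22% /mnt/keep0\n'
             '/dev/sdc1 51606140 20971520 28013308 43% /mnt/keep1\n')

df_entries = [line.split() for line in sample_df.split('\n') if line]
keep_volumes = [columns for columns in df_entries
                if 'keep' in columns[MOUNT_COLUMN]]
total_space = DISK_BLOCK_SIZE * sum(map(int, map(itemgetter(TOTAL_COLUMN),
                                                 keep_volumes)))
free_space = DISK_BLOCK_SIZE * sum(map(int, map(itemgetter(FREE_COLUMN),
                                                keep_volumes)))
print total_space, free_space  # the header row and root volume are skipped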
@@ -465,6 +541,14 @@ parser.add_argument('--user-storage-log-event-type',
                     default='user-storage-report',
                     help=('The event type to set when logging user '
                           'storage usage to workbench.'))
+parser.add_argument('--block-age-free-space-histogram-log-event-type',
+                    default='block-age-free-space-histogram',
+                    help=('The event type to set when logging the block '
+                          'age free space histogram to workbench.'))
+parser.add_argument('--garbage-collection-file',
+                    default='',
+                    help=('The file to write a garbage collection report, or '
+                          'leave empty for no report.'))
 
 args = None
 
@@ -498,6 +582,10 @@ user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)
 
 keep_servers = []
 keep_blocks = []
+keep_stats = []
+total_keep_space = 0
+free_keep_space = 0
+
 block_to_replication = defaultdict(lambda: 0)
 block_to_latest_mtime = maxdict()
 
@@ -513,9 +601,19 @@ cumulative size)
 multiplied by current replication level)
 * cumulative disk size: The sum of this block's disk size and all the
 blocks listed above it
-* TODO: disk free: The proportion of our disk space that would be free
-if we deleted this block and all the above. So this is (current disk
-space used - cumulative disk size) / total disk capacity
+* disk free: The proportion of our disk space that would be free if we
+deleted this block and all the above. So this is (free disk space +
+cumulative disk size) / total disk capacity
+"""
+
+garbage_collection_histogram = []
+""" Shows the tradeoff of keep block age vs keep disk free space.
+
+Each entry is of the form (mtime, Disk Proportion).
+
+An entry of the form (1388747781, 0.52) means that if we deleted the
+oldest non-persisted blocks until we had 52% of the disk free, then
+all blocks with an mtime greater than 1388747781 would be preserved.
 """
 
 # Stuff to report on
@@ -559,14 +657,42 @@ def loadAllData():
 
   global keep_blocks
   keep_blocks = getKeepBlocks(keep_servers)
-  computeReplication(keep_blocks)
 
+  log.info('Getting Stats from each Keep Server.')
+  global keep_stats, total_keep_space, free_keep_space
+  keep_stats = getKeepStats(keep_servers)
 
-  computeGarbageCollectionCandidates()
+  total_keep_space = sum(map(itemgetter(0), keep_stats))
+  free_keep_space = sum(map(itemgetter(1), keep_stats))
+
+  # TODO(misha): Delete this hack when the keep servers are fixed!
+  # This hack deals with the fact that keep servers report each other's disks.
+  total_keep_space /= len(keep_stats)
+  free_keep_space /= len(keep_stats)
+
+  log.info('Total disk space: %s, Free disk space: %s (%d%%).' %
+           (fileSizeFormat(total_keep_space),
+            fileSizeFormat(free_keep_space),
+            100*free_keep_space/total_keep_space))
+
+  computeReplication(keep_blocks)
 
   log.info('average replication level is %f',
            (float(sum(block_to_replication.values())) /
             len(block_to_replication)))
 
+  computeGarbageCollectionCandidates()
+
+  if args.garbage_collection_file:
+    log.info('Writing garbage collection report to %s',
+             args.garbage_collection_file)
+    outputGarbageCollectionReport(args.garbage_collection_file)
+
+  global garbage_collection_histogram
+  garbage_collection_histogram = computeGarbageCollectionHistogram()
+
+  if args.log_to_workbench:
+    logGarbageCollectionHistogram()
+
   detectReplicationProblems()
 
   computeUserStorageUsage()
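The (1388747781, 0.52) example in the garbage_collection_histogram docstring can be reproduced with the same bucketing logic computeGarbageCollectionHistogram uses. A self-contained sketch over a fabricated report follows; every block id, mtime, size, and free-space fraction below is invented:

import math

def percentageFloor(x):
  return math.floor(x * 100) / 100.0

# Fabricated (block, mtime, disk size, cumulative size, disk free) rows,
# ordered oldest first, in the same shape as garbage_collection_report.
report = [('b1', 1388747781, 10, 10, 0.52),
          ('b2', 1388834181, 20, 30, 0.55),
          ('b3', 1388920581, 5, 35, 0.559),
          ('b4', 1389006981, 40, 75, 0.61)]

histogram = []
last_percentage = -1
for _, mtime, _, _, disk_free in report:
  curr_percentage = percentageFloor(disk_free)
  if curr_percentage > last_percentage:
    histogram.append((mtime, curr_percentage))
    last_percentage = curr_percentage

print histogram
# [(1388747781, 0.52), (1388834181, 0.55), (1389006981, 0.61)]

Since b3 floors to the same 0.55 bucket as b2, it is folded into b2's entry: deleting everything through b2 already frees 55% of the disk, so all blocks newer than b2's mtime would be preserved at that level.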