#! /usr/bin/env python import arvados import argparse import cgi import csv import json import logging import math import pprint import re import threading import urllib2 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler from collections import defaultdict, Counter from functools import partial from operator import itemgetter from SocketServer import ThreadingMixIn arv = arvados.api('v1') # Adapted from http://stackoverflow.com/questions/4180980/formatting-data-quantity-capacity-as-string byteunits = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB') def fileSizeFormat(value): exponent = 0 if value == 0 else int(math.log(value, 1024)) return "%7.2f %-3s" % (float(value) / pow(1024, exponent), byteunits[exponent]) def percentageFloor(x): """ Returns a float which is the input rounded down to the neared 0.01. e.g. precentageFloor(0.941354) = 0.94 """ return math.floor(x*100) / 100.0 def byteSizeFromValidUuid(valid_uuid): return int(valid_uuid.split('+')[1]) class maxdict(dict): """A dictionary that holds the largest value entered for each key.""" def addValue(self, key, value): dict.__setitem__(self, key, max(dict.get(self, key), value)) def addValues(self, kv_pairs): for key,value in kv_pairs: self.addValue(key, value) def addDict(self, d): self.addValues(d.items()) class CollectionInfo: DEFAULT_PERSISTER_REPLICATION_LEVEL=2 all_by_uuid = {} def __init__(self, uuid): if CollectionInfo.all_by_uuid.has_key(uuid): raise ValueError('Collection for uuid "%s" already exists.' % uuid) self.uuid = uuid self.block_uuids = set() # uuids of keep blocks in this collection self.reader_uuids = set() # uuids of users who can read this collection self.persister_uuids = set() # uuids of users who want this collection saved # map from user uuid to replication level they desire self.persister_replication = maxdict() # The whole api response in case we need anything else later. self.api_response = [] CollectionInfo.all_by_uuid[uuid] = self def byteSize(self): return sum(map(byteSizeFromValidUuid, self.block_uuids)) def __str__(self): return ('CollectionInfo uuid: %s\n' ' %d block(s) containing %s\n' ' reader_uuids: %s\n' ' persister_replication: %s' % (self.uuid, len(self.block_uuids), fileSizeFormat(self.byteSize()), pprint.pformat(self.reader_uuids, indent = 15), pprint.pformat(self.persister_replication, indent = 15))) @staticmethod def get(uuid): if not CollectionInfo.all_by_uuid.has_key(uuid): CollectionInfo(uuid) return CollectionInfo.all_by_uuid[uuid] def extractUuid(candidate): """ Returns a canonical (hash+size) uuid from a valid uuid, or None if candidate is not a valid uuid.""" match = re.match('([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate) return match and match.group(1) def checkUserIsAdmin(): current_user = arv.users().current().execute() if not current_user['is_admin']: log.warning('Current user %s (%s - %s) does not have ' 'admin access and will not see much of the data.', current_user['full_name'], current_user['email'], current_user['uuid']) if args.require_admin_user: log.critical('Exiting, rerun with --no-require-admin-user ' 'if you wish to continue.') exit(1) def buildCollectionsList(): if args.uuid: return [args.uuid,] else: collections_list_response = arv.collections().list(limit=args.max_api_results).execute() print ('Returned %d of %d collections.' % (len(collections_list_response['items']), collections_list_response['items_available'])) return [item['uuid'] for item in collections_list_response['items']] def readCollections(collection_uuids): for collection_uuid in collection_uuids: collection_block_uuids = set() collection_response = arv.collections().get(uuid=collection_uuid).execute() collection_info = CollectionInfo.get(collection_uuid) collection_info.api_response = collection_response manifest_lines = collection_response['manifest_text'].split('\n') if args.verbose: print 'Manifest text for %s:' % collection_uuid pprint.pprint(manifest_lines) for manifest_line in manifest_lines: if manifest_line: manifest_tokens = manifest_line.split(' ') if args.verbose: print 'manifest tokens: ' + pprint.pformat(manifest_tokens) stream_name = manifest_tokens[0] line_block_uuids = set(filter(None, [extractUuid(candidate) for candidate in manifest_tokens[1:]])) collection_info.block_uuids.update(line_block_uuids) # file_tokens = [token # for token in manifest_tokens[1:] # if extractUuid(token) is None] # # Sort file tokens by start position in case they aren't already # file_tokens.sort(key=lambda file_token: int(file_token.split(':')[0])) # if args.verbose: # print 'line_block_uuids: ' + pprint.pformat(line_block_uuids) # print 'file_tokens: ' + pprint.pformat(file_tokens) def readLinks(): link_classes = set() for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items(): # TODO(misha): We may not be seing all the links, but since items # available does not return an accurate number, I don't knos how # to confirm that we saw all of them. collection_links_response = arv.links().list(where={'head_uuid':collection_uuid}).execute() link_classes.update([link['link_class'] for link in collection_links_response['items']]) for link in collection_links_response['items']: if link['link_class'] == 'permission': collection_info.reader_uuids.add(link['tail_uuid']) elif link['link_class'] == 'resources': replication_level = link['properties'].get( 'replication', CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL) collection_info.persister_replication.addValue( link['tail_uuid'], replication_level) collection_info.persister_uuids.add(link['tail_uuid']) print 'Found the following link classes:' pprint.pprint(link_classes) def reportMostPopularCollections(): most_popular_collections = sorted( CollectionInfo.all_by_uuid.values(), key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_replication), reverse=True)[:10] print 'Most popular Collections:' for collection_info in most_popular_collections: print collection_info def buildMaps(): for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items(): # Add the block holding the manifest itself for all calculations block_uuids = collection_info.block_uuids.union([collection_uuid,]) for block_uuid in block_uuids: block_to_collections[block_uuid].add(collection_uuid) block_to_readers[block_uuid].update(collection_info.reader_uuids) block_to_persisters[block_uuid].update(collection_info.persister_uuids) block_to_persister_replication[block_uuid].addDict( collection_info.persister_replication) for reader_uuid in collection_info.reader_uuids: reader_to_collections[reader_uuid].add(collection_uuid) reader_to_blocks[reader_uuid].update(block_uuids) for persister_uuid in collection_info.persister_uuids: persister_to_collections[persister_uuid].add(collection_uuid) persister_to_blocks[persister_uuid].update(block_uuids) def itemsByValueLength(original): return sorted(original.items(), key=lambda item:len(item[1]), reverse=True) def reportBusiestUsers(): busiest_readers = itemsByValueLength(reader_to_collections) print 'The busiest readers are:' for reader,collections in busiest_readers: print '%s reading %d collections.' % (reader, len(collections)) busiest_persisters = itemsByValueLength(persister_to_collections) print 'The busiest persisters are:' for persister,collections in busiest_persisters: print '%s reading %d collections.' % (persister, len(collections)) def blockDiskUsage(block_uuid): """Returns the disk usage of a block given its uuid. Will return 0 before reading the contents of the keep servers. """ return byteSizeFromValidUuid(block_uuid) * block_to_replication[block_uuid] def blockPersistedUsage(user_uuid, block_uuid): return (byteSizeFromValidUuid(block_uuid) * block_to_persister_replication[block_uuid].get(user_uuid, 0)) memo_computeWeightedReplicationCosts = {} def computeWeightedReplicationCosts(replication_levels): """Computes the relative cost of varied replication levels. replication_levels: a tuple of integers representing the desired replication level. If n users want a replication level of x then x should appear n times in replication_levels. Returns a dictionary from replication level to cost. The basic thinking is that the cost of replicating at level x should be shared by everyone who wants replication of level x or higher. For example, if we have two users who want 1 copy, one user who wants 3 copies and two users who want 6 copies: the input would be [1, 1, 3, 6, 6] (or any permutation) The cost of the first copy is shared by all 5 users, so they each pay 1 copy / 5 users = 0.2. The cost of the second and third copies shared by 3 users, so they each pay 2 copies / 3 users = 0.67 (plus the above costs) The cost of the fourth, fifth and sixth copies is shared by two users, so they each pay 3 copies / 2 users = 1.5 (plus the above costs) Here are some other examples: computeWeightedReplicationCosts([1,]) -> {1:1.0} computeWeightedReplicationCosts([2,]) -> {2:2.0} computeWeightedReplicationCosts([1,1]) -> {1:0.5} computeWeightedReplicationCosts([2,2]) -> {1:1.0} computeWeightedReplicationCosts([1,2]) -> {1:0.5,2:1.5} computeWeightedReplicationCosts([1,3]) -> {1:0.5,2:2.5} computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7} """ replication_level_counts = sorted(Counter(replication_levels).items()) memo_key = str(replication_level_counts) if not memo_key in memo_computeWeightedReplicationCosts: last_level = 0 current_cost = 0 total_interested = float(sum(map(itemgetter(1), replication_level_counts))) cost_for_level = {} for replication_level, count in replication_level_counts: copies_added = replication_level - last_level # compute marginal cost from last level and add it to the last cost current_cost += copies_added / total_interested cost_for_level[replication_level] = current_cost # update invariants last_level = replication_level total_interested -= count memo_computeWeightedReplicationCosts[memo_key] = cost_for_level return memo_computeWeightedReplicationCosts[memo_key] def blockPersistedWeightedUsage(user_uuid, block_uuid): persister_replication_for_block = block_to_persister_replication[block_uuid] user_replication = persister_replication_for_block[user_uuid] return ( byteSizeFromValidUuid(block_uuid) * computeWeightedReplicationCosts( persister_replication_for_block.values())[user_replication]) def computeUserStorageUsage(): for user, blocks in reader_to_blocks.items(): user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map( byteSizeFromValidUuid, blocks)) user_to_usage[user][WEIGHTED_READ_SIZE_COL] = sum(map( lambda block_uuid:(float(byteSizeFromValidUuid(block_uuid))/ len(block_to_readers[block_uuid])), blocks)) for user, blocks in persister_to_blocks.items(): user_to_usage[user][UNWEIGHTED_PERSIST_SIZE_COL] = sum(map( partial(blockPersistedUsage, user), blocks)) user_to_usage[user][WEIGHTED_PERSIST_SIZE_COL] = sum(map( partial(blockPersistedWeightedUsage, user), blocks)) def printUserStorageUsage(): print ('user: unweighted readable block size, weighted readable block size, ' 'unweighted persisted block size, weighted persisted block size:') for user, usage in user_to_usage.items(): print ('%s: %s %s %s %s' % (user, fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]), fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]), fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]), fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL]))) def logUserStorageUsage(): for user, usage in user_to_usage.items(): body = {} # user could actually represent a user or a group. We don't set # the object_type field since we don't know which we have. body['object_uuid'] = user body['event_type'] = args.user_storage_log_event_type properties = {} properties['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL] properties['read_collections_weighted_bytes'] = ( usage[WEIGHTED_READ_SIZE_COL]) properties['persisted_collections_total_bytes'] = ( usage[UNWEIGHTED_PERSIST_SIZE_COL]) properties['persisted_collections_weighted_bytes'] = ( usage[WEIGHTED_PERSIST_SIZE_COL]) body['properties'] = properties # TODO(misha): Confirm that this will throw an exception if it # fails to create the log entry. arv.logs().create(body=body).execute() def getKeepServers(): response = arv.keep_disks().list().execute() return [[keep_server['service_host'], keep_server['service_port']] for keep_server in response['items']] def getKeepBlocks(keep_servers): blocks = [] for host,port in keep_servers: response = urllib2.urlopen('http://%s:%d/index' % (host, port)) server_blocks = [line.split(' ') for line in response.read().split('\n') if line] server_blocks = [(block_id, int(mtime)) for block_id, mtime in server_blocks] blocks.append(server_blocks) return blocks def getKeepStats(keep_servers): MOUNT_COLUMN = 5 TOTAL_COLUMN = 1 FREE_COLUMN = 3 DISK_BLOCK_SIZE = 1024 stats = [] for host,port in keep_servers: response = urllib2.urlopen('http://%s:%d/status.json' % (host, port)) parsed_json = json.load(response) df_entries = [line.split() for line in parsed_json['df'].split('\n') if line] keep_volumes = [columns for columns in df_entries if 'keep' in columns[MOUNT_COLUMN]] total_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(TOTAL_COLUMN), keep_volumes))) free_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(FREE_COLUMN), keep_volumes))) stats.append([total_space, free_space]) return stats def computeReplication(keep_blocks): for server_blocks in keep_blocks: for block_uuid, _ in server_blocks: block_to_replication[block_uuid] += 1 log.debug('Seeing the following replication levels among blocks: %s', str(set(block_to_replication.values()))) def computeGarbageCollectionCandidates(): for server_blocks in keep_blocks: block_to_latest_mtime.addValues(server_blocks) empty_set = set() garbage_collection_priority = sorted( [(block,mtime) for block,mtime in block_to_latest_mtime.items() if len(block_to_persisters.get(block,empty_set)) == 0], key = itemgetter(1)) global garbage_collection_report garbage_collection_report = [] cumulative_disk_size = 0 for block,mtime in garbage_collection_priority: disk_size = blockDiskUsage(block) cumulative_disk_size += disk_size garbage_collection_report.append( (block, mtime, disk_size, cumulative_disk_size, float(free_keep_space + cumulative_disk_size)/total_keep_space)) print 'The oldest Garbage Collection Candidates: ' pprint.pprint(garbage_collection_report[:20]) def outputGarbageCollectionReport(filename): with open(filename, 'wb') as csvfile: gcwriter = csv.writer(csvfile) gcwriter.writerow(['block uuid', 'latest mtime', 'disk size', 'cumulative size', 'disk free']) for line in garbage_collection_report: gcwriter.writerow(line) def computeGarbageCollectionHistogram(): # TODO(misha): Modify this to allow users to specify the number of # histogram buckets through a flag. histogram = [] last_percentage = -1 for _,mtime,_,_,disk_free in garbage_collection_report: curr_percentage = percentageFloor(disk_free) if curr_percentage > last_percentage: histogram.append( (mtime, curr_percentage) ) last_percentage = curr_percentage log.info('Garbage collection histogram is: %s', histogram) return histogram def logGarbageCollectionHistogram(): body = {} # TODO(misha): Decide whether we should specify an object_uuid in # the body and if so, which uuid to use. body['event_type'] = args.block_age_free_space_histogram_log_event_type properties = {} properties['histogram'] = garbage_collection_histogram body['properties'] = properties # TODO(misha): Confirm that this will throw an exception if it # fails to create the log entry. arv.logs().create(body=body).execute() def detectReplicationProblems(): blocks_not_in_any_collections.update( set(block_to_replication.keys()).difference(block_to_collections.keys())) underreplicated_persisted_blocks.update( [uuid for uuid, persister_replication in block_to_persister_replication.items() if len(persister_replication) > 0 and block_to_replication[uuid] < max(persister_replication.values())]) overreplicated_persisted_blocks.update( [uuid for uuid, persister_replication in block_to_persister_replication.items() if len(persister_replication) > 0 and block_to_replication[uuid] > max(persister_replication.values())]) log.info('Found %d blocks not in any collections, e.g. %s...', len(blocks_not_in_any_collections), ','.join(list(blocks_not_in_any_collections)[:5])) log.info('Found %d underreplicated blocks, e.g. %s...', len(underreplicated_persisted_blocks), ','.join(list(underreplicated_persisted_blocks)[:5])) log.info('Found %d overreplicated blocks, e.g. %s...', len(overreplicated_persisted_blocks), ','.join(list(overreplicated_persisted_blocks)[:5])) # TODO: # Read blocks sorted by mtime # Cache window vs % free space # Collections which candidates will appear in # Youngest underreplicated read blocks that appear in collections. # Report Collections that have blocks which are missing from (or # underreplicated in) keep. # This is the main flow here parser = argparse.ArgumentParser(description='Report on keep disks.') """The command line argument parser we use. We only use it in the __main__ block, but leave it outside the block in case another package wants to use it or customize it by specifying it as a parent to their commandline parser. """ parser.add_argument('-m', '--max-api-results', type=int, default=5000, help=('The max results to get at once.')) parser.add_argument('-p', '--port', type=int, default=9090, help=('The port number to serve on. 0 means no server.')) parser.add_argument('-v', '--verbose', help='increase output verbosity', action='store_true') parser.add_argument('-u', '--uuid', help='uuid of specific collection to process') parser.add_argument('--require-admin-user', action='store_true', default=True, help='Fail if the user is not an admin [default]') parser.add_argument('--no-require-admin-user', dest='require_admin_user', action='store_false', help=('Allow users without admin permissions with ' 'only a warning.')) parser.add_argument('--log-to-workbench', action='store_true', default=False, help='Log findings to workbench') parser.add_argument('--no-log-to-workbench', dest='log_to_workbench', action='store_false', help='Don\'t log findings to workbench [default]') parser.add_argument('--user-storage-log-event-type', default='user-storage-report', help=('The event type to set when logging user ' 'storage usage to workbench.')) parser.add_argument('--block-age-free-space-histogram-log-event-type', default='block-age-free-space-histogram', help=('The event type to set when logging user ' 'storage usage to workbench.')) parser.add_argument('--garbage-collection-file', default='', help=('The file to write a garbage collection report, or ' 'leave empty for no report.')) args = None # TODO(misha): Think about moving some of this to the __main__ block. log = logging.getLogger('arvados.services.datamanager') stderr_handler = logging.StreamHandler() log.setLevel(logging.INFO) stderr_handler.setFormatter( logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s')) log.addHandler(stderr_handler) # Global Data - don't try this at home collection_uuids = [] # These maps all map from uuids to a set of uuids block_to_collections = defaultdict(set) # keep blocks reader_to_collections = defaultdict(set) # collection(s) for which the user has read access persister_to_collections = defaultdict(set) # collection(s) which the user has persisted block_to_readers = defaultdict(set) block_to_persisters = defaultdict(set) block_to_persister_replication = defaultdict(maxdict) reader_to_blocks = defaultdict(set) persister_to_blocks = defaultdict(set) UNWEIGHTED_READ_SIZE_COL = 0 WEIGHTED_READ_SIZE_COL = 1 UNWEIGHTED_PERSIST_SIZE_COL = 2 WEIGHTED_PERSIST_SIZE_COL = 3 NUM_COLS = 4 user_to_usage = defaultdict(lambda : [0,]*NUM_COLS) keep_servers = [] keep_blocks = [] keep_stats = [] total_keep_space = 0 free_keep_space = 0 block_to_replication = defaultdict(lambda: 0) block_to_latest_mtime = maxdict() garbage_collection_report = [] """A list of non-persisted blocks, sorted by increasing mtime Each entry is of the form (block uuid, latest mtime, disk size, cumulative size) * block uuid: The id of the block we want to delete * latest mtime: The latest mtime of the block across all keep servers. * disk size: The total disk space used by this block (block size multiplied by current replication level) * cumulative disk size: The sum of this block's disk size and all the blocks listed above it * disk free: The proportion of our disk space that would be free if we deleted this block and all the above. So this is (free disk space + cumulative disk size) / total disk capacity """ garbage_collection_histogram = [] """ Shows the tradeoff of keep block age vs keep disk free space. Each entry is of the form (mtime, Disk Proportion). An entry of the form (1388747781, 0.52) means that if we deleted the oldest non-presisted blocks until we had 52% of the disk free, then all blocks with an mtime greater than 1388747781 would be preserved. """ # Stuff to report on blocks_not_in_any_collections = set() underreplicated_persisted_blocks = set() overreplicated_persisted_blocks = set() all_data_loaded = False def loadAllData(): checkUserIsAdmin() log.info('Building Collection List') global collection_uuids collection_uuids = filter(None, [extractUuid(candidate) for candidate in buildCollectionsList()]) log.info('Reading Collections') readCollections(collection_uuids) if args.verbose: pprint.pprint(CollectionInfo.all_by_uuid) log.info('Reading Links') readLinks() reportMostPopularCollections() log.info('Building Maps') buildMaps() reportBusiestUsers() log.info('Getting Keep Servers') global keep_servers keep_servers = getKeepServers() print keep_servers log.info('Getting Blocks from each Keep Server.') global keep_blocks keep_blocks = getKeepBlocks(keep_servers) log.info('Getting Stats from each Keep Server.') global keep_stats, total_keep_space, free_keep_space keep_stats = getKeepStats(keep_servers) total_keep_space = sum(map(itemgetter(0), keep_stats)) free_keep_space = sum(map(itemgetter(1), keep_stats)) # TODO(misha): Delete this hack when the keep servers are fixed! # This hack deals with the fact that keep servers report each other's disks. total_keep_space /= len(keep_stats) free_keep_space /= len(keep_stats) log.info('Total disk space: %s, Free disk space: %s (%d%%).' % (fileSizeFormat(total_keep_space), fileSizeFormat(free_keep_space), 100*free_keep_space/total_keep_space)) computeReplication(keep_blocks) log.info('average replication level is %f', (float(sum(block_to_replication.values())) / len(block_to_replication))) computeGarbageCollectionCandidates() if args.garbage_collection_file: log.info('Writing garbage Collection report to %s', args.garbage_collection_file) outputGarbageCollectionReport(args.garbage_collection_file) global garbage_collection_histogram garbage_collection_histogram = computeGarbageCollectionHistogram() if args.log_to_workbench: logGarbageCollectionHistogram() detectReplicationProblems() computeUserStorageUsage() printUserStorageUsage() if args.log_to_workbench: logUserStorageUsage() global all_data_loaded all_data_loaded = True class DataManagerHandler(BaseHTTPRequestHandler): USER_PATH = 'user' COLLECTION_PATH = 'collection' BLOCK_PATH = 'block' def userLink(self, uuid): return ('%(uuid)s' % {'uuid': uuid, 'path': DataManagerHandler.USER_PATH}) def collectionLink(self, uuid): return ('%(uuid)s' % {'uuid': uuid, 'path': DataManagerHandler.COLLECTION_PATH}) def blockLink(self, uuid): return ('%(uuid)s' % {'uuid': uuid, 'path': DataManagerHandler.BLOCK_PATH}) def writeTop(self, title): self.wfile.write('%s\n' % title) def writeBottom(self): self.wfile.write('\n') def writeHomePage(self): self.send_response(200) self.end_headers() self.writeTop('Home') self.wfile.write('') self.wfile.write('\n') for user, usage in user_to_usage.items(): self.wfile.write('\n' % (self.userLink(user), fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]), fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]), fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]), fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL]))) self.wfile.write('
user' 'unweighted readable block size' 'weighted readable block size' 'unweighted persisted block size' 'weighted persisted block size
%s%s%s%s%s
\n') self.writeBottom() def userExists(self, uuid): # Currently this will return false for a user who exists but # doesn't appear on any manifests. # TODO(misha): Figure out if we need to fix this. return user_to_usage.has_key(uuid) def writeUserPage(self, uuid): if not self.userExists(uuid): self.send_error(404, 'User (%s) Not Found.' % cgi.escape(uuid, quote=False)) else: # Here we assume that since a user exists, they don't need to be # html escaped. self.send_response(200) self.end_headers() self.writeTop('User %s' % uuid) self.wfile.write('') self.wfile.write('\n') usage = user_to_usage[uuid] self.wfile.write('\n' % (self.userLink(uuid), fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]), fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]), fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]), fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL]))) self.wfile.write('
user' 'unweighted readable block size' 'weighted readable block size' 'unweighted persisted block size' 'weighted persisted block size
%s%s%s%s%s
\n') self.wfile.write('

Persisting Collections: %s\n' % ', '.join(map(self.collectionLink, persister_to_collections[uuid]))) self.wfile.write('

Reading Collections: %s\n' % ', '.join(map(self.collectionLink, reader_to_collections[uuid]))) self.writeBottom() def collectionExists(self, uuid): return CollectionInfo.all_by_uuid.has_key(uuid) def writeCollectionPage(self, uuid): if not self.collectionExists(uuid): self.send_error(404, 'Collection (%s) Not Found.' % cgi.escape(uuid, quote=False)) else: collection = CollectionInfo.get(uuid) # Here we assume that since a collection exists, its id doesn't # need to be html escaped. self.send_response(200) self.end_headers() self.writeTop('Collection %s' % uuid) self.wfile.write('

Collection %s

\n' % uuid) self.wfile.write('

Total size %s (not factoring in replication).\n' % fileSizeFormat(collection.byteSize())) self.wfile.write('

Readers: %s\n' % ', '.join(map(self.userLink, collection.reader_uuids))) if len(collection.persister_replication) == 0: self.wfile.write('

No persisters\n') else: replication_to_users = defaultdict(set) for user,replication in collection.persister_replication.items(): replication_to_users[replication].add(user) replication_levels = sorted(replication_to_users.keys()) self.wfile.write('

%d persisters in %d replication level(s) maxing ' 'out at %dx replication:\n' % (len(collection.persister_replication), len(replication_levels), replication_levels[-1])) # TODO(misha): This code is used twice, let's move it to a method. self.wfile.write('\n' % '\n') for replication_level in replication_levels: users = replication_to_users[replication_level] self.wfile.write('
%s
'.join(['Replication Level ' + str(x) for x in replication_levels])) self.wfile.write('
%s\n' % '
\n'.join( map(self.userLink, users))) self.wfile.write('
\n') replication_to_blocks = defaultdict(set) for block in collection.block_uuids: replication_to_blocks[block_to_replication[block]].add(block) replication_levels = sorted(replication_to_blocks.keys()) self.wfile.write('

%d blocks in %d replication level(s):\n' % (len(collection.block_uuids), len(replication_levels))) self.wfile.write('\n' % '\n') for replication_level in replication_levels: blocks = replication_to_blocks[replication_level] self.wfile.write('
%s
'.join(['Replication Level ' + str(x) for x in replication_levels])) self.wfile.write('
%s\n' % '
\n'.join(blocks)) self.wfile.write('
\n') def do_GET(self): if not all_data_loaded: self.send_error(503, 'Sorry, but I am still loading all the data I need.') else: # Removing leading '/' and process request path split_path = self.path[1:].split('/') request_type = split_path[0] log.debug('path (%s) split as %s with request_type %s' % (self.path, split_path, request_type)) if request_type == '': self.writeHomePage() elif request_type == DataManagerHandler.USER_PATH: self.writeUserPage(split_path[1]) elif request_type == DataManagerHandler.COLLECTION_PATH: self.writeCollectionPage(split_path[1]) else: self.send_error(404, 'Unrecognized request path.') return class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): """Handle requests in a separate thread.""" if __name__ == '__main__': args = parser.parse_args() if args.port == 0: loadAllData() else: loader = threading.Thread(target = loadAllData, name = 'loader') loader.start() server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler) server.serve_forever()