from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from collections import defaultdict, Counter
from functools import partial
from operator import itemgetter
from SocketServer import ThreadingMixIn

arv = arvados.api('v1')

# Adapted from http://stackoverflow.com/questions/4180980/formatting-data-quantity-capacity-as-string
byteunits = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
def fileSizeFormat(value):
  exponent = 0 if value == 0 else int(math.log(value, 1024))
  return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
                         byteunits[exponent])
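
# Illustrative sketch (not part of the original module): sample fixed-width
# strings produced by fileSizeFormat. Nothing calls this; safe to delete.
def _fileSizeFormatExample():
  assert fileSizeFormat(0) == '   0.00 B  '
  assert fileSizeFormat(1024) == '   1.00 KiB'
  assert fileSizeFormat(1234567).strip() == '1.18 MiB'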

def percentageFloor(x):
  """ Returns a float which is the input rounded down to the nearest 0.01.

  e.g. percentageFloor(0.941354) = 0.94
  """
  return math.floor(x*100) / 100.0

def byteSizeFromValidUuid(valid_uuid):
  return int(valid_uuid.split('+')[1])

class maxdict(dict):
  """A dictionary that holds the largest value entered for each key."""
  def addValue(self, key, value):
    dict.__setitem__(self, key, max(dict.get(self, key), value))
  def addValues(self, kv_pairs):
    for key,value in kv_pairs:
      self.addValue(key, value)
  def addDict(self, d):
    self.addValues(d.items())
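
# Illustrative sketch (not part of the original module): maxdict never lets a
# smaller value overwrite a larger one. Nothing calls this; safe to delete.
def _maxdictExample():
  d = maxdict()
  d.addValues([('a', 1), ('a', 3)])
  d.addDict({'a': 2, 'b': 5})
  assert d == {'a': 3, 'b': 5}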

class CollectionInfo(object):
  DEFAULT_PERSISTER_REPLICATION_LEVEL = 2
  all_by_uuid = {}

  def __init__(self, uuid):
    if uuid in CollectionInfo.all_by_uuid:
      raise ValueError('Collection for uuid "%s" already exists.' % uuid)
    self.uuid = uuid
    self.block_uuids = set()  # uuids of keep blocks in this collection
    self.reader_uuids = set()  # uuids of users who can read this collection
    self.persister_uuids = set()  # uuids of users who want this collection saved
    # map from user uuid to the replication level they desire
    self.persister_replication = maxdict()
    # The whole api response in case we need anything else later.
    self.api_response = []
    CollectionInfo.all_by_uuid[uuid] = self

  def byteSize(self):
    return sum(map(byteSizeFromValidUuid, self.block_uuids))

  def __str__(self):
    return ('CollectionInfo uuid: %s\n'
            '               %d block(s) containing %s\n'
            '               reader_uuids: %s\n'
            '               persister_replication: %s' %
            (self.uuid,
             len(self.block_uuids),
             fileSizeFormat(self.byteSize()),
             pprint.pformat(self.reader_uuids, indent = 15),
             pprint.pformat(self.persister_replication, indent = 15)))

  @staticmethod
  def get(uuid):
    if uuid not in CollectionInfo.all_by_uuid:
      CollectionInfo(uuid)
    return CollectionInfo.all_by_uuid[uuid]

def extractUuid(candidate):
  """Returns a canonical (hash+size) uuid from a valid uuid, or None if
  candidate is not a valid uuid."""
  match = re.match(r'([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate)
  return match and match.group(1)
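
# Illustrative sketch (not part of the original module): the locator below is
# the md5 of the empty string plus an invented size and hint. Safe to delete.
def _extractUuidExample():
  assert (extractUuid('d41d8cd98f00b204e9800998ecf8427e+0+Kexample') ==
          'd41d8cd98f00b204e9800998ecf8427e+0')
  assert extractUuid('not-a-block-locator') is None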

def checkUserIsAdmin():
  current_user = arv.users().current().execute()

  if not current_user['is_admin']:
    log.warning('Current user %s (%s - %s) does not have '
                'admin access and will not see much of the data.',
                current_user['full_name'],
                current_user['email'],
                current_user['uuid'])
    if args.require_admin_user:
      log.critical('Exiting, rerun with --no-require-admin-user '
                   'if you wish to continue.')
      exit(1)

def buildCollectionsList():
  if args.uuid:
    return [args.uuid,]
  else:
    collections_list_response = arv.collections().list(limit=args.max_api_results).execute()
    print ('Returned %d of %d collections.' %
           (len(collections_list_response['items']),
            collections_list_response['items_available']))
    return [item['uuid'] for item in collections_list_response['items']]

def readCollections(collection_uuids):
  for collection_uuid in collection_uuids:
    collection_block_uuids = set()
    collection_response = arv.collections().get(uuid=collection_uuid).execute()
    collection_info = CollectionInfo.get(collection_uuid)
    collection_info.api_response = collection_response
    manifest_lines = collection_response['manifest_text'].split('\n')

    if args.verbose:
      print 'Manifest text for %s:' % collection_uuid
      pprint.pprint(manifest_lines)

    # Each manifest line is a stream name followed by block locators
    # and file tokens.
    for manifest_line in manifest_lines:
      if manifest_line:
        manifest_tokens = manifest_line.split(' ')
        if args.verbose:
          print 'manifest tokens: ' + pprint.pformat(manifest_tokens)
        stream_name = manifest_tokens[0]

        line_block_uuids = set(filter(None,
                                      [extractUuid(candidate)
                                       for candidate in manifest_tokens[1:]]))
        collection_info.block_uuids.update(line_block_uuids)

        # file_tokens = [token
        #                for token in manifest_tokens[1:]
        #                if extractUuid(token) is None]

        # # Sort file tokens by start position in case they aren't already
        # file_tokens.sort(key=lambda file_token: int(file_token.split(':')[0]))

        # if args.verbose:
        #   print 'line_block_uuids: ' + pprint.pformat(line_block_uuids)
        #   print 'file_tokens: ' + pprint.pformat(file_tokens)

def readLinks():
  link_classes = set()

  for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
    # TODO(misha): We may not be seeing all the links, but since items
    # available does not return an accurate number, I don't know how
    # to confirm that we saw all of them.
    collection_links_response = arv.links().list(where={'head_uuid':collection_uuid}).execute()
    link_classes.update([link['link_class'] for link in collection_links_response['items']])
    for link in collection_links_response['items']:
      if link['link_class'] == 'permission':
        collection_info.reader_uuids.add(link['tail_uuid'])
      elif link['link_class'] == 'resources':
        replication_level = link['properties'].get(
          'replication',
          CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL)
        collection_info.persister_replication.addValue(
          link['tail_uuid'],
          replication_level)
        collection_info.persister_uuids.add(link['tail_uuid'])

  print 'Found the following link classes:'
  pprint.pprint(link_classes)

def reportMostPopularCollections():
  most_popular_collections = sorted(
    CollectionInfo.all_by_uuid.values(),
    key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_replication),
    reverse=True)[:10]

  print 'Most popular Collections:'
  for collection_info in most_popular_collections:
    print collection_info

def buildMaps():
  for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
    # Add the block holding the manifest itself for all calculations
    block_uuids = collection_info.block_uuids.union([collection_uuid,])
    for block_uuid in block_uuids:
      block_to_collections[block_uuid].add(collection_uuid)
      block_to_readers[block_uuid].update(collection_info.reader_uuids)
      block_to_persisters[block_uuid].update(collection_info.persister_uuids)
      block_to_persister_replication[block_uuid].addDict(
        collection_info.persister_replication)
    for reader_uuid in collection_info.reader_uuids:
      reader_to_collections[reader_uuid].add(collection_uuid)
      reader_to_blocks[reader_uuid].update(block_uuids)
    for persister_uuid in collection_info.persister_uuids:
      persister_to_collections[persister_uuid].add(collection_uuid)
      persister_to_blocks[persister_uuid].update(block_uuids)

def itemsByValueLength(original):
  return sorted(original.items(),
                key=lambda item:len(item[1]),
                reverse=True)

def reportBusiestUsers():
  busiest_readers = itemsByValueLength(reader_to_collections)
  print 'The busiest readers are:'
  for reader,collections in busiest_readers:
    print '%s reading %d collections.' % (reader, len(collections))
  busiest_persisters = itemsByValueLength(persister_to_collections)
  print 'The busiest persisters are:'
  for persister,collections in busiest_persisters:
    print '%s persisting %d collections.' % (persister, len(collections))

def blockDiskUsage(block_uuid):
  """Returns the disk usage of a block given its uuid.

  Will return 0 before reading the contents of the keep servers.
  """
  return byteSizeFromValidUuid(block_uuid) * block_to_replication[block_uuid]

def blockPersistedUsage(user_uuid, block_uuid):
  return (byteSizeFromValidUuid(block_uuid) *
          block_to_persister_replication[block_uuid].get(user_uuid, 0))
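
# Illustrative sketch (not part of the original module): a 1000-byte block
# stored on two servers occupies 2000 bytes; a persister who asked for one
# copy is charged for 1000. The locator is invented. Safe to delete.
def _blockUsageExample():
  block = 'd41d8cd98f00b204e9800998ecf8427e+1000'
  block_to_replication[block] = 2
  block_to_persister_replication[block].addValue('some-user-uuid', 1)
  assert blockDiskUsage(block) == 2000
  assert blockPersistedUsage('some-user-uuid', block) == 1000
  del block_to_replication[block], block_to_persister_replication[block]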

memo_computeWeightedReplicationCosts = {}
def computeWeightedReplicationCosts(replication_levels):
  """Computes the relative cost of varied replication levels.

  replication_levels: a tuple of integers representing the desired
  replication level. If n users want a replication level of x then x
  should appear n times in replication_levels.

  Returns a dictionary from replication level to cost.

  The basic thinking is that the cost of replicating at level x should
  be shared by everyone who wants replication of level x or higher.

  For example, if we have two users who want 1 copy, one user who
  wants 3 copies and two users who want 6 copies:
  the input would be [1, 1, 3, 6, 6] (or any permutation)

  The cost of the first copy is shared by all 5 users, so they each
  pay 1 copy / 5 users = 0.2.
  The cost of the second and third copies is shared by 3 users, so they
  each pay 2 copies / 3 users = 0.67 (plus the above costs)
  The cost of the fourth, fifth and sixth copies is shared by two
  users, so they each pay 3 copies / 2 users = 1.5 (plus the above costs)

  Here are some other examples:
  computeWeightedReplicationCosts([1,]) -> {1:1.0}
  computeWeightedReplicationCosts([2,]) -> {2:2.0}
  computeWeightedReplicationCosts([1,1]) -> {1:0.5}
  computeWeightedReplicationCosts([2,2]) -> {2:1.0}
  computeWeightedReplicationCosts([1,2]) -> {1:0.5,2:1.5}
  computeWeightedReplicationCosts([1,3]) -> {1:0.5,3:2.5}
  computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7}
  """
  replication_level_counts = sorted(Counter(replication_levels).items())

  memo_key = str(replication_level_counts)

  if memo_key not in memo_computeWeightedReplicationCosts:
    cost_for_level = {}
    last_level = 0
    current_cost = 0
    total_interested = float(sum(map(itemgetter(1), replication_level_counts)))
    for replication_level, count in replication_level_counts:
      copies_added = replication_level - last_level
      # compute marginal cost from last level and add it to the last cost
      current_cost += copies_added / total_interested
      cost_for_level[replication_level] = current_cost
      # Everyone at this level has now paid their share; the remaining
      # copies are shared only by those who want a higher level.
      last_level = replication_level
      total_interested -= count
    memo_computeWeightedReplicationCosts[memo_key] = cost_for_level

  return memo_computeWeightedReplicationCosts[memo_key]
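
# Illustrative sketch (not part of the original module): reproduces the
# [1, 1, 3, 6, 6] example from the docstring above. Safe to delete.
def _computeWeightedReplicationCostsExample():
  costs = computeWeightedReplicationCosts((1, 1, 3, 6, 6))
  assert costs[1] == 1 / 5.0                      # 0.2
  assert costs[3] == 1 / 5.0 + 2 / 3.0            # ~1.07
  assert costs[6] == 1 / 5.0 + 2 / 3.0 + 3 / 2.0  # ~2.57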

def blockPersistedWeightedUsage(user_uuid, block_uuid):
  persister_replication_for_block = block_to_persister_replication[block_uuid]
  user_replication = persister_replication_for_block[user_uuid]
  return (
    byteSizeFromValidUuid(block_uuid) *
    computeWeightedReplicationCosts(
      persister_replication_for_block.values())[user_replication])

def computeUserStorageUsage():
  for user, blocks in reader_to_blocks.items():
    user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map(
        byteSizeFromValidUuid,
        blocks))
    user_to_usage[user][WEIGHTED_READ_SIZE_COL] = sum(map(
        lambda block_uuid:(float(byteSizeFromValidUuid(block_uuid))/
                           len(block_to_readers[block_uuid])),
        blocks))
  for user, blocks in persister_to_blocks.items():
    user_to_usage[user][UNWEIGHTED_PERSIST_SIZE_COL] = sum(map(
        partial(blockPersistedUsage, user),
        blocks))
    user_to_usage[user][WEIGHTED_PERSIST_SIZE_COL] = sum(map(
        partial(blockPersistedWeightedUsage, user),
        blocks))

def printUserStorageUsage():
  print ('user: unweighted readable block size, weighted readable block size, '
         'unweighted persisted block size, weighted persisted block size:')
  for user, usage in user_to_usage.items():
    print ('%s: %s %s %s %s' %
           (user,
            fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
            fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
            fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
            fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))

def logUserStorageUsage():
  for user, usage in user_to_usage.items():
    body = {}
    # user could actually represent a user or a group. We don't set
    # the object_type field since we don't know which we have.
    body['object_uuid'] = user
    body['event_type'] = args.user_storage_log_event_type
    properties = {}
    properties['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
    properties['read_collections_weighted_bytes'] = (
      usage[WEIGHTED_READ_SIZE_COL])
    properties['persisted_collections_total_bytes'] = (
      usage[UNWEIGHTED_PERSIST_SIZE_COL])
    properties['persisted_collections_weighted_bytes'] = (
      usage[WEIGHTED_PERSIST_SIZE_COL])
    body['properties'] = properties
    # TODO(misha): Confirm that this will throw an exception if it
    # fails to create the log entry.
    arv.logs().create(body=body).execute()

def getKeepServers():
  response = arv.keep_disks().list().execute()
  return [[keep_server['service_host'], keep_server['service_port']]
          for keep_server in response['items']]

def getKeepBlocks(keep_servers):
  blocks = []
  for host,port in keep_servers:
    response = urllib2.urlopen('http://%s:%d/index' % (host, port))
    server_blocks = [line.split(' ')
                     for line in response.read().split('\n')
                     if line]
    server_blocks = [(block_id, int(mtime))
                     for block_id, mtime in server_blocks]
    blocks.append(server_blocks)
  return blocks

def getKeepStats(keep_servers):
  # Column indices in the df output reported by each keep server.
  MOUNT_COLUMN = 5
  TOTAL_COLUMN = 1
  FREE_COLUMN = 3
  DISK_BLOCK_SIZE = 1024
  stats = []
  for host,port in keep_servers:
    response = urllib2.urlopen('http://%s:%d/status.json' % (host, port))

    parsed_json = json.load(response)
    df_entries = [line.split()
                  for line in parsed_json['df'].split('\n')
                  if line]
    keep_volumes = [columns
                    for columns in df_entries
                    if 'keep' in columns[MOUNT_COLUMN]]
    total_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(TOTAL_COLUMN),
                                                  keep_volumes)))
    free_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(FREE_COLUMN),
                                                 keep_volumes)))
    stats.append([total_space, free_space])
  return stats

def computeReplication(keep_blocks):
  for server_blocks in keep_blocks:
    for block_uuid, _ in server_blocks:
      block_to_replication[block_uuid] += 1
  log.debug('Seeing the following replication levels among blocks: %s',
            str(set(block_to_replication.values())))
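
# Illustrative sketch (not part of the original module): two servers holding
# the same (invented) block give it a replication level of 2. Safe to delete.
def _computeReplicationExample():
  block = 'd41d8cd98f00b204e9800998ecf8427e+0'
  computeReplication([[(block, 1388747781)], [(block, 1388747782)]])
  assert block_to_replication[block] == 2
  del block_to_replication[block]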

def computeGarbageCollectionCandidates():
  for server_blocks in keep_blocks:
    block_to_latest_mtime.addValues(server_blocks)
  empty_set = set()
  garbage_collection_priority = sorted(
    [(block,mtime)
     for block,mtime in block_to_latest_mtime.items()
     if len(block_to_persisters.get(block,empty_set)) == 0],
    key = itemgetter(1))
  global garbage_collection_report
  garbage_collection_report = []
  cumulative_disk_size = 0
  for block,mtime in garbage_collection_priority:
    disk_size = blockDiskUsage(block)
    cumulative_disk_size += disk_size
    garbage_collection_report.append(
      (block,
       mtime,
       disk_size,
       cumulative_disk_size,
       float(free_keep_space + cumulative_disk_size)/total_keep_space))

  print 'The oldest Garbage Collection Candidates: '
  pprint.pprint(garbage_collection_report[:20])

def outputGarbageCollectionReport(filename):
  with open(filename, 'wb') as csvfile:
    gcwriter = csv.writer(csvfile)
    gcwriter.writerow(['block uuid', 'latest mtime', 'disk size',
                       'cumulative size', 'disk free'])
    for line in garbage_collection_report:
      gcwriter.writerow(line)

def computeGarbageCollectionHistogram():
  # TODO(misha): Modify this to allow users to specify the number of
  # histogram buckets through a flag.
  histogram = []
  last_percentage = -1
  for _,mtime,_,_,disk_free in garbage_collection_report:
    curr_percentage = percentageFloor(disk_free)
    if curr_percentage > last_percentage:
      histogram.append( (mtime, curr_percentage) )
    last_percentage = curr_percentage

  log.info('Garbage collection histogram is: %s', histogram)

  return histogram

def logGarbageCollectionHistogram():
  body = {}
  # TODO(misha): Decide whether we should specify an object_uuid in
  # the body and if so, which uuid to use.
  body['event_type'] = args.block_age_free_space_histogram_log_event_type
  properties = {}
  properties['histogram'] = garbage_collection_histogram
  body['properties'] = properties
  # TODO(misha): Confirm that this will throw an exception if it
  # fails to create the log entry.
  arv.logs().create(body=body).execute()

def detectReplicationProblems():
  blocks_not_in_any_collections.update(
    set(block_to_replication.keys()).difference(block_to_collections.keys()))
  underreplicated_persisted_blocks.update(
    [uuid
     for uuid, persister_replication in block_to_persister_replication.items()
     if len(persister_replication) > 0 and
     block_to_replication[uuid] < max(persister_replication.values())])
  overreplicated_persisted_blocks.update(
    [uuid
     for uuid, persister_replication in block_to_persister_replication.items()
     if len(persister_replication) > 0 and
     block_to_replication[uuid] > max(persister_replication.values())])

  log.info('Found %d blocks not in any collections, e.g. %s...',
           len(blocks_not_in_any_collections),
           ','.join(list(blocks_not_in_any_collections)[:5]))
  log.info('Found %d underreplicated blocks, e.g. %s...',
           len(underreplicated_persisted_blocks),
           ','.join(list(underreplicated_persisted_blocks)[:5]))
  log.info('Found %d overreplicated blocks, e.g. %s...',
           len(overreplicated_persisted_blocks),
           ','.join(list(overreplicated_persisted_blocks)[:5]))

# TODO:
#  Read blocks sorted by mtime
#  Cache window vs % free space
#  Collections which candidates will appear in
#  Youngest underreplicated read blocks that appear in collections.
#  Report Collections that have blocks which are missing from (or
#  underreplicated in) keep.

# This is the main flow here

parser = argparse.ArgumentParser(description='Report on keep disks.')
"""The command line argument parser we use.

We only use it in the __main__ block, but leave it outside the block
in case another package wants to use it or customize it by specifying
it as a parent to their commandline parser.
"""
parser.add_argument('-m',
                    '--max-api-results',
                    type=int,
                    default=5000,
                    help=('The max results to get at once.'))
parser.add_argument('-p',
                    '--port',
                    type=int,
                    default=9090,
                    help=('The port number to serve on. 0 means no server.'))
parser.add_argument('-v',
                    '--verbose',
                    help='increase output verbosity',
                    action='store_true')
parser.add_argument('-u',
                    '--uuid',
                    help='uuid of specific collection to process')
parser.add_argument('--require-admin-user',
                    action='store_true',
                    default=True,
                    help='Fail if the user is not an admin [default]')
parser.add_argument('--no-require-admin-user',
                    dest='require_admin_user',
                    action='store_false',
                    help=('Allow users without admin permissions with '
                          'only locally visible data.'))
parser.add_argument('--log-to-workbench',
                    action='store_true',
                    default=False,
                    help='Log findings to workbench')
parser.add_argument('--no-log-to-workbench',
                    dest='log_to_workbench',
                    action='store_false',
                    help='Don\'t log findings to workbench [default]')
parser.add_argument('--user-storage-log-event-type',
                    default='user-storage-report',
                    help=('The event type to set when logging user '
                          'storage usage to workbench.'))
parser.add_argument('--block-age-free-space-histogram-log-event-type',
                    default='block-age-free-space-histogram',
                    help=('The event type to set when logging the block '
                          'age vs. disk free space histogram to workbench.'))
parser.add_argument('--garbage-collection-file',
                    default='',
                    help=('The file to write a garbage collection report, or '
                          'leave empty for no report.'))

args = None

# TODO(misha): Think about moving some of this to the __main__ block.
log = logging.getLogger('arvados.services.datamanager')
stderr_handler = logging.StreamHandler()
log.setLevel(logging.INFO)
stderr_handler.setFormatter(
  logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
log.addHandler(stderr_handler)

# Global Data - don't try this at home
collection_uuids = []

# These maps all map from uuids to a set of uuids
block_to_collections = defaultdict(set)  # keep blocks
reader_to_collections = defaultdict(set)  # collection(s) for which the user has read access
persister_to_collections = defaultdict(set)  # collection(s) which the user has persisted
block_to_readers = defaultdict(set)
block_to_persisters = defaultdict(set)
block_to_persister_replication = defaultdict(maxdict)
reader_to_blocks = defaultdict(set)
persister_to_blocks = defaultdict(set)

UNWEIGHTED_READ_SIZE_COL = 0
WEIGHTED_READ_SIZE_COL = 1
UNWEIGHTED_PERSIST_SIZE_COL = 2
WEIGHTED_PERSIST_SIZE_COL = 3
NUM_COLS = 4
user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)

keep_servers = []
keep_blocks = []
keep_stats = []
total_keep_space = 0
free_keep_space = 0

block_to_replication = defaultdict(lambda: 0)
block_to_latest_mtime = maxdict()

garbage_collection_report = []
"""A list of non-persisted blocks, sorted by increasing mtime

Each entry is of the form (block uuid, latest mtime, disk size,
cumulative size, disk free)

* block uuid: The id of the block we want to delete
* latest mtime: The latest mtime of the block across all keep servers.
* disk size: The total disk space used by this block (block size
  multiplied by current replication level)
* cumulative disk size: The sum of this block's disk size and all the
  blocks listed above it
* disk free: The proportion of our disk space that would be free if we
  deleted this block and all the above. So this is (free disk space +
  cumulative disk size) / total disk capacity
"""
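
# An illustrative entry (values invented for this sketch): a 1000-byte block
# replicated twice, last written at epoch 1388747781, would appear as
#   ('d41d8cd98f00b204e9800998ecf8427e+1000', 1388747781, 2000, 123456, 0.52)
# meaning deleting it and everything above it frees 123456 bytes and leaves
# 52% of total keep space free.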

garbage_collection_histogram = []
""" Shows the tradeoff of keep block age vs keep disk free space.

Each entry is of the form (mtime, Disk Proportion).

An entry of the form (1388747781, 0.52) means that if we deleted the
oldest non-persisted blocks until we had 52% of the disk free, then
all blocks with an mtime greater than 1388747781 would be preserved.
"""

blocks_not_in_any_collections = set()
underreplicated_persisted_blocks = set()
overreplicated_persisted_blocks = set()

all_data_loaded = False

def loadAllData():
  checkUserIsAdmin()

  log.info('Building Collection List')
  global collection_uuids
  collection_uuids = filter(None, [extractUuid(candidate)
                                   for candidate in buildCollectionsList()])

  log.info('Reading Collections')
  readCollections(collection_uuids)

  if args.verbose:
    pprint.pprint(CollectionInfo.all_by_uuid)

  log.info('Reading Links')
  readLinks()

  reportMostPopularCollections()

  log.info('Building Maps')
  buildMaps()

  reportBusiestUsers()

  log.info('Getting Keep Servers')
  global keep_servers
  keep_servers = getKeepServers()

  log.info('Getting Blocks from each Keep Server.')
  global keep_blocks
  keep_blocks = getKeepBlocks(keep_servers)

  log.info('Getting Stats from each Keep Server.')
  global keep_stats, total_keep_space, free_keep_space
  keep_stats = getKeepStats(keep_servers)

  total_keep_space = sum(map(itemgetter(0), keep_stats))
  free_keep_space = sum(map(itemgetter(1), keep_stats))

  # TODO(misha): Delete this hack when the keep servers are fixed!
  # This hack deals with the fact that keep servers report each other's disks.
  total_keep_space /= len(keep_stats)
  free_keep_space /= len(keep_stats)

  log.info('Total disk space: %s, Free disk space: %s (%d%%).' %
           (fileSizeFormat(total_keep_space),
            fileSizeFormat(free_keep_space),
            100*free_keep_space/total_keep_space))

  computeReplication(keep_blocks)

  log.info('average replication level is %f',
           (float(sum(block_to_replication.values())) /
            len(block_to_replication)))

  computeGarbageCollectionCandidates()

  if args.garbage_collection_file:
    log.info('Writing garbage collection report to %s',
             args.garbage_collection_file)
    outputGarbageCollectionReport(args.garbage_collection_file)

  global garbage_collection_histogram
  garbage_collection_histogram = computeGarbageCollectionHistogram()

  if args.log_to_workbench:
    logGarbageCollectionHistogram()

  detectReplicationProblems()

  computeUserStorageUsage()
  printUserStorageUsage()
  if args.log_to_workbench:
    logUserStorageUsage()

  global all_data_loaded
  all_data_loaded = True

class DataManagerHandler(BaseHTTPRequestHandler):
  USER_PATH = 'user'
  COLLECTION_PATH = 'collection'
  BLOCK_PATH = 'block'

  def userLink(self, uuid):
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            {'uuid': uuid,
             'path': DataManagerHandler.USER_PATH})

  def collectionLink(self, uuid):
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            {'uuid': uuid,
             'path': DataManagerHandler.COLLECTION_PATH})

  def blockLink(self, uuid):
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            {'uuid': uuid,
             'path': DataManagerHandler.BLOCK_PATH})

  def writeTop(self, title):
    self.wfile.write('<HTML><HEAD><TITLE>%s</TITLE></HEAD>\n<BODY>' % title)

  def writeBottom(self):
    self.wfile.write('</BODY></HTML>\n')

  def writeHomePage(self):
    self.send_response(200)
    self.end_headers()
    self.writeTop('Home')
    self.wfile.write('<TABLE>')
    self.wfile.write('<TR><TH>user'
                     '<TH>unweighted readable block size'
                     '<TH>weighted readable block size'
                     '<TH>unweighted persisted block size'
                     '<TH>weighted persisted block size</TR>\n')
    for user, usage in user_to_usage.items():
      self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
                       (self.userLink(user),
                        fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
                        fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
                        fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
                        fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
    self.wfile.write('</TABLE>\n')
    self.writeBottom()

  def userExists(self, uuid):
    # Currently this will return false for a user who exists but
    # doesn't appear on any manifests.
    # TODO(misha): Figure out if we need to fix this.
    return uuid in user_to_usage

  def writeUserPage(self, uuid):
    if not self.userExists(uuid):
      self.send_error(404,
                      'User (%s) Not Found.' % cgi.escape(uuid, quote=False))
    else:
      # Here we assume that since a user exists, they don't need to be
      # html escaped.
      self.send_response(200)
      self.end_headers()
      self.writeTop('User %s' % uuid)
      self.wfile.write('<TABLE>')
      self.wfile.write('<TR><TH>user'
                       '<TH>unweighted readable block size'
                       '<TH>weighted readable block size'
                       '<TH>unweighted persisted block size'
                       '<TH>weighted persisted block size</TR>\n')
      usage = user_to_usage[uuid]
      self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
                       (self.userLink(uuid),
                        fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
                        fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
                        fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
                        fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
      self.wfile.write('</TABLE>\n')
      self.wfile.write('<P>Persisting Collections: %s\n' %
                       ', '.join(map(self.collectionLink,
                                     persister_to_collections[uuid])))
      self.wfile.write('<P>Reading Collections: %s\n' %
                       ', '.join(map(self.collectionLink,
                                     reader_to_collections[uuid])))
      self.writeBottom()

  def collectionExists(self, uuid):
    return uuid in CollectionInfo.all_by_uuid

  def writeCollectionPage(self, uuid):
    if not self.collectionExists(uuid):
      self.send_error(404,
                      'Collection (%s) Not Found.' % cgi.escape(uuid, quote=False))
    else:
      collection = CollectionInfo.get(uuid)
      # Here we assume that since a collection exists, its id doesn't
      # need to be html escaped.
      self.send_response(200)
      self.end_headers()
      self.writeTop('Collection %s' % uuid)
      self.wfile.write('<H1>Collection %s</H1>\n' % uuid)
      self.wfile.write('<P>Total size %s (not factoring in replication).\n' %
                       fileSizeFormat(collection.byteSize()))
      self.wfile.write('<P>Readers: %s\n' %
                       ', '.join(map(self.userLink, collection.reader_uuids)))

      if len(collection.persister_replication) == 0:
        self.wfile.write('<P>No persisters\n')
      else:
        replication_to_users = defaultdict(set)
        for user,replication in collection.persister_replication.items():
          replication_to_users[replication].add(user)
        replication_levels = sorted(replication_to_users.keys())

        self.wfile.write('<P>%d persisters in %d replication level(s) maxing '
                         'out at %dx replication:\n' %
                         (len(collection.persister_replication),
                          len(replication_levels),
                          replication_levels[-1]))

        # TODO(misha): This code is used twice, let's move it to a method.
        self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
                         '<TH>'.join(['Replication Level ' + str(x)
                                      for x in replication_levels]))
        self.wfile.write('<TR>\n')
        for replication_level in replication_levels:
          users = replication_to_users[replication_level]
          self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(
            map(self.userLink, users)))
        self.wfile.write('</TR></TABLE>\n')

      replication_to_blocks = defaultdict(set)
      for block in collection.block_uuids:
        replication_to_blocks[block_to_replication[block]].add(block)
      replication_levels = sorted(replication_to_blocks.keys())
      self.wfile.write('<P>%d blocks in %d replication level(s):\n' %
                       (len(collection.block_uuids), len(replication_levels)))
      self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
                       '<TH>'.join(['Replication Level ' + str(x)
                                    for x in replication_levels]))
      self.wfile.write('<TR>\n')
      for replication_level in replication_levels:
        blocks = replication_to_blocks[replication_level]
        self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(blocks))
      self.wfile.write('</TR></TABLE>\n')
      self.writeBottom()

  def do_GET(self):
    if not all_data_loaded:
      self.send_error(503,
                      'Sorry, but I am still loading all the data I need.')
    else:
      # Remove the leading '/' and process the request path
      split_path = self.path[1:].split('/')
      request_type = split_path[0]
      log.debug('path (%s) split as %s with request_type %s' % (self.path,
                                                                split_path,
                                                                request_type))

      if request_type == '':
        self.writeHomePage()
      elif request_type == DataManagerHandler.USER_PATH:
        self.writeUserPage(split_path[1])
      elif request_type == DataManagerHandler.COLLECTION_PATH:
        self.writeCollectionPage(split_path[1])
      else:
        self.send_error(404, 'Unrecognized request path.')

class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
  """Handle requests in a separate thread."""

if __name__ == '__main__':
  args = parser.parse_args()

  if args.port == 0:
    loadAllData()
  else:
    loader = threading.Thread(target = loadAllData, name = 'loader')
    loader.start()

    server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
    server.serve_forever()