#!/usr/bin/env python

import arvados

import argparse
import cgi
import csv
import json
import logging
import math
import pprint
import re
import threading
import urllib2

from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from collections import defaultdict, Counter
from functools import partial
from operator import itemgetter
from SocketServer import ThreadingMixIn

arv = arvados.api('v1')

# Adapted from http://stackoverflow.com/questions/4180980/formatting-data-quantity-capacity-as-string
byteunits = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
def fileSizeFormat(value):
  exponent = 0 if value == 0 else int(math.log(value, 1024))
  return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
                         byteunits[exponent])
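# For example, fileSizeFormat(1234567) returns '   1.18 MiB'
# (1234567 / 1024**2 is roughly 1.18).
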
def percentageFloor(x):
  """ Returns a float which is the input rounded down to the nearest 0.01.

  e.g. percentageFloor(0.941354) = 0.94
  """
  return math.floor(x*100) / 100.0

def byteSizeFromValidUuid(valid_uuid):
  return int(valid_uuid.split('+')[1])

43 """A dictionary that holds the largest value entered for each key."""
44 def addValue(self, key, value):
45 dict.__setitem__(self, key, max(dict.get(self, key), value))
46 def addValues(self, kv_pairs):
47 for key,value in kv_pairs:
48 self.addValue(key, value)
50 self.addValues(d.items())
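# A quick illustration of maxdict's behavior (hypothetical values):
#   m = maxdict()
#   m.addValue('a', 1)
#   m.addValue('a', 3)
#   m['a']  # -> 3; the largest value seen for each key wins.
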
class CollectionInfo:
  DEFAULT_PERSISTER_REPLICATION_LEVEL=2
  all_by_uuid = {}

  def __init__(self, uuid):
    if CollectionInfo.all_by_uuid.has_key(uuid):
      raise ValueError('Collection for uuid "%s" already exists.' % uuid)
    self.uuid = uuid
    self.block_uuids = set()  # uuids of keep blocks in this collection
    self.reader_uuids = set()  # uuids of users who can read this collection
    self.persister_uuids = set()  # uuids of users who want this collection saved
    # map from user uuid to replication level they desire
    self.persister_replication = maxdict()

    # The whole api response in case we need anything else later.
    self.api_response = []
    CollectionInfo.all_by_uuid[uuid] = self
  def byteSize(self):
    return sum(map(byteSizeFromValidUuid, self.block_uuids))

  def __str__(self):
    return ('CollectionInfo uuid: %s\n'
            '               %d block(s) containing %s\n'
            '               reader_uuids: %s\n'
            '               persister_replication: %s' %
            (self.uuid,
             len(self.block_uuids),
             fileSizeFormat(self.byteSize()),
             pprint.pformat(self.reader_uuids, indent = 15),
             pprint.pformat(self.persister_replication, indent = 15)))

  @staticmethod
  def get(uuid):
    if not CollectionInfo.all_by_uuid.has_key(uuid):
      CollectionInfo(uuid)
    return CollectionInfo.all_by_uuid[uuid]

def extractUuid(candidate):
  """ Returns a canonical (hash+size) uuid from a valid uuid, or None if candidate is not a valid uuid."""
  match = re.match(r'([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate)
  return match and match.group(1)
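# For example (using the md5 of the empty string as a sample hash):
#   extractUuid('d41d8cd98f00b204e9800998ecf8427e+0+K@xyzzy')
# returns 'd41d8cd98f00b204e9800998ecf8427e+0', while any candidate
# that is not a valid block locator returns None.
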
def checkUserIsAdmin():
  current_user = arv.users().current().execute()

  if not current_user['is_admin']:
    log.warning('Current user %s (%s - %s) does not have '
                'admin access and will not see much of the data.',
                current_user['full_name'],
                current_user['email'],
                current_user['uuid'])
    if args.require_admin_user:
      log.critical('Exiting, rerun with --no-require-admin-user '
                   'if you wish to continue.')
      exit(1)

def buildCollectionsList():
  if args.uuid:
    return [args.uuid,]
  else:
    collections_list_response = arv.collections().list(limit=args.max_api_results).execute()

    print ('Returned %d of %d collections.' %
           (len(collections_list_response['items']),
            collections_list_response['items_available']))

    return [item['uuid'] for item in collections_list_response['items']]

def readCollections(collection_uuids):
  for collection_uuid in collection_uuids:
    collection_block_uuids = set()
    collection_response = arv.collections().get(uuid=collection_uuid).execute()
    collection_info = CollectionInfo.get(collection_uuid)
    collection_info.api_response = collection_response
    manifest_lines = collection_response['manifest_text'].split('\n')

    if args.verbose:
      print 'Manifest text for %s:' % collection_uuid
      pprint.pprint(manifest_lines)

    for manifest_line in manifest_lines:
      if manifest_line:
        manifest_tokens = manifest_line.split(' ')
        if args.verbose:
          print 'manifest tokens: ' + pprint.pformat(manifest_tokens)
        stream_name = manifest_tokens[0]

        line_block_uuids = set(filter(None,
                                      [extractUuid(candidate)
                                       for candidate in manifest_tokens[1:]]))
        collection_info.block_uuids.update(line_block_uuids)

        # file_tokens = [token
        #                for token in manifest_tokens[1:]
        #                if extractUuid(token) is None]

        # # Sort file tokens by start position in case they aren't already
        # file_tokens.sort(key=lambda file_token: int(file_token.split(':')[0]))

        # if args.verbose:
        #   print 'line_block_uuids: ' + pprint.pformat(line_block_uuids)
        #   print 'file_tokens: ' + pprint.pformat(file_tokens)

def readLinks():
  link_classes = set()

  for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
    # TODO(misha): We may not be seeing all the links, but since items
    # available does not return an accurate number, I don't know how
    # to confirm that we saw all of them.
    collection_links_response = arv.links().list(where={'head_uuid':collection_uuid}).execute()
    link_classes.update([link['link_class'] for link in collection_links_response['items']])
    for link in collection_links_response['items']:
      if link['link_class'] == 'permission':
        collection_info.reader_uuids.add(link['tail_uuid'])
      elif link['link_class'] == 'resources':
        replication_level = link['properties'].get(
          'replication',
          CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL)
        collection_info.persister_replication.addValue(
          link['tail_uuid'],
          replication_level)
        collection_info.persister_uuids.add(link['tail_uuid'])

  print 'Found the following link classes:'
  pprint.pprint(link_classes)

def reportMostPopularCollections():
  most_popular_collections = sorted(
    CollectionInfo.all_by_uuid.values(),
    key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_replication),
    reverse=True)[:10]

  print 'Most popular Collections:'
  for collection_info in most_popular_collections:
    print collection_info

def buildMaps():
  for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
    # Add the block holding the manifest itself for all calculations
    block_uuids = collection_info.block_uuids.union([collection_uuid,])
    for block_uuid in block_uuids:
      block_to_collections[block_uuid].add(collection_uuid)
      block_to_readers[block_uuid].update(collection_info.reader_uuids)
      block_to_persisters[block_uuid].update(collection_info.persister_uuids)
      block_to_persister_replication[block_uuid].addDict(
        collection_info.persister_replication)
    for reader_uuid in collection_info.reader_uuids:
      reader_to_collections[reader_uuid].add(collection_uuid)
      reader_to_blocks[reader_uuid].update(block_uuids)
    for persister_uuid in collection_info.persister_uuids:
      persister_to_collections[persister_uuid].add(collection_uuid)
      persister_to_blocks[persister_uuid].update(block_uuids)

def itemsByValueLength(original):
  return sorted(original.items(),
                key=lambda item:len(item[1]),
                reverse=True)

def reportBusiestUsers():
  busiest_readers = itemsByValueLength(reader_to_collections)
  print 'The busiest readers are:'
  for reader,collections in busiest_readers:
    print '%s reading %d collections.' % (reader, len(collections))
  busiest_persisters = itemsByValueLength(persister_to_collections)
  print 'The busiest persisters are:'
  for persister,collections in busiest_persisters:
    print '%s persisting %d collections.' % (persister, len(collections))

def blockDiskUsage(block_uuid):
  """Returns the disk usage of a block given its uuid.

  Will return 0 before reading the contents of the keep servers.
  """
  return byteSizeFromValidUuid(block_uuid) * block_to_replication[block_uuid]

def blockPersistedUsage(user_uuid, block_uuid):
  return (byteSizeFromValidUuid(block_uuid) *
          block_to_persister_replication[block_uuid].get(user_uuid, 0))

memo_computeWeightedReplicationCosts = {}
def computeWeightedReplicationCosts(replication_levels):
  """Computes the relative cost of varied replication levels.

  replication_levels: a tuple of integers representing the desired
  replication level. If n users want a replication level of x then x
  should appear n times in replication_levels.

  Returns a dictionary from replication level to cost.

  The basic thinking is that the cost of replicating at level x should
  be shared by everyone who wants replication of level x or higher.

  For example, if we have two users who want 1 copy, one user who
  wants 3 copies and two users who want 6 copies:
  the input would be [1, 1, 3, 6, 6] (or any permutation)

  The cost of the first copy is shared by all 5 users, so they each
  pay 1 copy / 5 users = 0.2.
  The cost of the second and third copies is shared by 3 users, so they
  each pay 2 copies / 3 users = 0.67 (plus the above costs)
  The cost of the fourth, fifth and sixth copies is shared by two
  users, so they each pay 3 copies / 2 users = 1.5 (plus the above costs)

  Here are some other examples:
  computeWeightedReplicationCosts([1,]) -> {1:1.0}
  computeWeightedReplicationCosts([2,]) -> {2:2.0}
  computeWeightedReplicationCosts([1,1]) -> {1:0.5}
  computeWeightedReplicationCosts([2,2]) -> {2:1.0}
  computeWeightedReplicationCosts([1,2]) -> {1:0.5,2:1.5}
  computeWeightedReplicationCosts([1,3]) -> {1:0.5,3:2.5}
  computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7}
  """
  replication_level_counts = sorted(Counter(replication_levels).items())

  memo_key = str(replication_level_counts)

  if memo_key not in memo_computeWeightedReplicationCosts:
    last_level = 0
    current_cost = 0
    cost_for_level = {}
    total_interested = float(sum(map(itemgetter(1), replication_level_counts)))
    for replication_level, count in replication_level_counts:
      copies_added = replication_level - last_level
      # compute marginal cost from last level and add it to the last cost
      current_cost += copies_added / total_interested
      cost_for_level[replication_level] = current_cost
      # Update the loop state for the next, higher, level.
      last_level = replication_level
      total_interested -= count
    memo_computeWeightedReplicationCosts[memo_key] = cost_for_level

  return memo_computeWeightedReplicationCosts[memo_key]
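# Working through the docstring example above: for [1, 1, 3, 6, 6] the
# sorted (level, count) pairs are [(1, 2), (3, 1), (6, 2)], and the loop
# yields {1: 0.2, 3: 0.2 + 2/3.0, 6: 0.2 + 2/3.0 + 3/2.0}, i.e.
# roughly {1: 0.2, 3: 0.87, 6: 2.37}.
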
def blockPersistedWeightedUsage(user_uuid, block_uuid):
  persister_replication_for_block = block_to_persister_replication[block_uuid]
  user_replication = persister_replication_for_block[user_uuid]
  return (
    byteSizeFromValidUuid(block_uuid) *
    computeWeightedReplicationCosts(
      persister_replication_for_block.values())[user_replication])
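# For instance, a 1000-byte block persisted by one user at replication
# level 1 and another at level 3 has costs {1: 0.5, 3: 2.5} (see the
# docstring above), so the first user is charged 500 weighted bytes
# and the second 2500.
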
def computeUserStorageUsage():
  for user, blocks in reader_to_blocks.items():
    user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map(
        byteSizeFromValidUuid,
        blocks))
    user_to_usage[user][WEIGHTED_READ_SIZE_COL] = sum(map(
        lambda block_uuid:(float(byteSizeFromValidUuid(block_uuid))/
                           len(block_to_readers[block_uuid])),
        blocks))
  for user, blocks in persister_to_blocks.items():
    user_to_usage[user][UNWEIGHTED_PERSIST_SIZE_COL] = sum(map(
        partial(blockPersistedUsage, user),
        blocks))
    user_to_usage[user][WEIGHTED_PERSIST_SIZE_COL] = sum(map(
        partial(blockPersistedWeightedUsage, user),
        blocks))

def printUserStorageUsage():
  print ('user: unweighted readable block size, weighted readable block size, '
         'unweighted persisted block size, weighted persisted block size:')
  for user, usage in user_to_usage.items():
    print ('%s: %s %s %s %s' %
           (user,
            fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
            fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
            fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
            fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))

def logUserStorageUsage():
  for user, usage in user_to_usage.items():
    body = {}
    # user could actually represent a user or a group. We don't set
    # the object_type field since we don't know which we have.
    body['object_uuid'] = user
    body['event_type'] = args.user_storage_log_event_type
    properties = {}
    properties['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
    properties['read_collections_weighted_bytes'] = (
      usage[WEIGHTED_READ_SIZE_COL])
    properties['persisted_collections_total_bytes'] = (
      usage[UNWEIGHTED_PERSIST_SIZE_COL])
    properties['persisted_collections_weighted_bytes'] = (
      usage[WEIGHTED_PERSIST_SIZE_COL])
    body['properties'] = properties
    # TODO(misha): Confirm that this will throw an exception if it
    # fails to create the log entry.
    arv.logs().create(body=body).execute()

def getKeepServers():
  response = arv.keep_disks().list().execute()
  return [[keep_server['service_host'], keep_server['service_port']]
          for keep_server in response['items']]

def getKeepBlocks(keep_servers):
  blocks = []
  for host,port in keep_servers:
    response = urllib2.urlopen('http://%s:%d/index' % (host, port))
    server_blocks = [line.split(' ')
                     for line in response.read().split('\n')
                     if line]
    server_blocks = [(block_id, int(mtime))
                     for block_id, mtime in server_blocks]
    blocks.append(server_blocks)
  return blocks
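# Note: the parsing above assumes each non-empty line of a keep
# server's /index response looks like '<block locator> <mtime>',
# e.g. 'd41d8cd98f00b204e9800998ecf8427e+0 1388747781'.
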
def getKeepStats(keep_servers):
  # Column positions in the df output embedded in each keep server's
  # /status.json response.
  MOUNT_COLUMN = 5
  TOTAL_COLUMN = 1
  FREE_COLUMN = 3
  DISK_BLOCK_SIZE = 1024
  stats = []
  for host,port in keep_servers:
    response = urllib2.urlopen('http://%s:%d/status.json' % (host, port))

    parsed_json = json.load(response)
    df_entries = [line.split()
                  for line in parsed_json['df'].split('\n')
                  if line]
    keep_volumes = [columns
                    for columns in df_entries
                    if 'keep' in columns[MOUNT_COLUMN]]
    total_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(TOTAL_COLUMN),
                                                  keep_volumes)))
    free_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(FREE_COLUMN),
                                                 keep_volumes)))
    stats.append([total_space, free_space])
  return stats

def computeReplication(keep_blocks):
  for server_blocks in keep_blocks:
    for block_uuid, _ in server_blocks:
      block_to_replication[block_uuid] += 1
  log.debug('Seeing the following replication levels among blocks: %s',
            str(set(block_to_replication.values())))
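# For example, a block whose locator shows up in the index of three
# keep servers ends up with block_to_replication[locator] == 3.
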
def computeGarbageCollectionCandidates():
  for server_blocks in keep_blocks:
    block_to_latest_mtime.addValues(server_blocks)
  empty_set = set()
  garbage_collection_priority = sorted(
    [(block,mtime)
     for block,mtime in block_to_latest_mtime.items()
     if len(block_to_persisters.get(block,empty_set)) == 0],
    key=itemgetter(1))
  global garbage_collection_report
  garbage_collection_report = []
  cumulative_disk_size = 0
  for block,mtime in garbage_collection_priority:
    disk_size = blockDiskUsage(block)
    cumulative_disk_size += disk_size
    garbage_collection_report.append(
      (block,
       mtime,
       disk_size,
       cumulative_disk_size,
       float(free_keep_space + cumulative_disk_size)/total_keep_space))

  print 'The oldest Garbage Collection Candidates: '
  pprint.pprint(garbage_collection_report[:20])

def outputGarbageCollectionReport(filename):
  with open(filename, 'wb') as csvfile:
    gcwriter = csv.writer(csvfile)
    gcwriter.writerow(['block uuid', 'latest mtime', 'disk size',
                       'cumulative size', 'disk free'])
    for line in garbage_collection_report:
      gcwriter.writerow(line)

def computeGarbageCollectionHistogram():
  histogram = []
  last_percentage = -1
  for _,mtime,_,_,disk_free in garbage_collection_report:
    curr_percentage = percentageFloor(disk_free)
    if curr_percentage > last_percentage:
      histogram.append((curr_percentage, mtime))
      last_percentage = curr_percentage

  log.info('Garbage collection histogram is: %s', histogram)

  return histogram

def logGarbageCollectionHistogram():
  body = {}
  # TODO(misha): Decide whether we should specify an object_uuid in
  # the body and if so, which uuid to use.
  body['event_type'] = args.block_age_free_space_histogram_log_event_type
  properties = {}
  properties['histogram'] = garbage_collection_histogram
  body['properties'] = properties
  # TODO(misha): Confirm that this will throw an exception if it
  # fails to create the log entry.
  arv.logs().create(body=body).execute()

def detectReplicationProblems():
  blocks_not_in_any_collections.update(
    set(block_to_replication.keys()).difference(block_to_collections.keys()))
  underreplicated_persisted_blocks.update(
    [uuid
     for uuid, persister_replication in block_to_persister_replication.items()
     if len(persister_replication) > 0 and
     block_to_replication[uuid] < max(persister_replication.values())])
  overreplicated_persisted_blocks.update(
    [uuid
     for uuid, persister_replication in block_to_persister_replication.items()
     if len(persister_replication) > 0 and
     block_to_replication[uuid] > max(persister_replication.values())])

  log.info('Found %d blocks not in any collections, e.g. %s...',
           len(blocks_not_in_any_collections),
           ','.join(list(blocks_not_in_any_collections)[:5]))
  log.info('Found %d underreplicated blocks, e.g. %s...',
           len(underreplicated_persisted_blocks),
           ','.join(list(underreplicated_persisted_blocks)[:5]))
  log.info('Found %d overreplicated blocks, e.g. %s...',
           len(overreplicated_persisted_blocks),
           ','.join(list(overreplicated_persisted_blocks)[:5]))
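# To make the definitions above concrete: a block whose persisters
# want at most 3 copies but which appears on only 2 keep servers is
# underreplicated, and one that appears on 4 is overreplicated.
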
# TODO(misha): Other things to report on:
#  Read blocks sorted by mtime
#  Cache window vs % free space
#  Collections which candidates will appear in
#  Youngest underreplicated read blocks that appear in collections.
#  Report Collections that have blocks which are missing from (or
#  underreplicated in) keep.

# This is the main flow here

parser = argparse.ArgumentParser(description='Report on keep disks.')
"""The command line argument parser we use.

We only use it in the __main__ block, but leave it outside the block
in case another package wants to use it or customize it by specifying
it as a parent to their commandline parser.
"""
parser.add_argument('-m',
                    '--max-api-results',
                    type=int,
                    default=5000,
                    help=('The max results to get at once.'))
parser.add_argument('-p',
                    '--port',
                    type=int,
                    default=9090,
                    help=('The port number to serve on. 0 means no server.'))
parser.add_argument('-v',
                    '--verbose',
                    help='increase output verbosity',
                    action='store_true')
parser.add_argument('-u',
                    '--uuid',
                    help='uuid of specific collection to process')
parser.add_argument('--require-admin-user',
                    action='store_true',
                    default=True,
                    help='Fail if the user is not an admin [default]')
parser.add_argument('--no-require-admin-user',
                    dest='require_admin_user',
                    action='store_false',
                    help=('Allow users without admin permissions with '
                          'only locally readable data to run.'))
parser.add_argument('--log-to-workbench',
                    action='store_true',
                    default=False,
                    help='Log findings to workbench')
parser.add_argument('--no-log-to-workbench',
                    dest='log_to_workbench',
                    action='store_false',
                    help='Don\'t log findings to workbench [default]')
parser.add_argument('--user-storage-log-event-type',
                    default='user-storage-report',
                    help=('The event type to set when logging user '
                          'storage usage to workbench.'))
parser.add_argument('--block-age-free-space-histogram-log-event-type',
                    default='block-age-free-space-histogram',
                    help=('The event type to set when logging the block '
                          'age vs free space histogram to workbench.'))
parser.add_argument('--garbage-collection-file',
                    default='',
                    help=('The file to write a garbage collection report, or '
                          'leave empty for no report.'))

args = None  # Populated by parser.parse_args() in the __main__ block.

# TODO(misha): Think about moving some of this to the __main__ block.
log = logging.getLogger('arvados.services.datamanager')
stderr_handler = logging.StreamHandler()
log.setLevel(logging.INFO)
stderr_handler.setFormatter(
  logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
log.addHandler(stderr_handler)

# Global Data - don't try this at home
collection_uuids = []

# These maps all map from uuids to a set of uuids
block_to_collections = defaultdict(set)  # keep blocks
reader_to_collections = defaultdict(set)  # collection(s) for which the user has read access
persister_to_collections = defaultdict(set)  # collection(s) which the user has persisted
block_to_readers = defaultdict(set)
block_to_persisters = defaultdict(set)
block_to_persister_replication = defaultdict(maxdict)
reader_to_blocks = defaultdict(set)
persister_to_blocks = defaultdict(set)

UNWEIGHTED_READ_SIZE_COL = 0
WEIGHTED_READ_SIZE_COL = 1
UNWEIGHTED_PERSIST_SIZE_COL = 2
WEIGHTED_PERSIST_SIZE_COL = 3
NUM_COLS = 4
user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)

keep_servers = []
keep_blocks = []
keep_stats = []
total_keep_space = 0
free_keep_space = 0

block_to_replication = defaultdict(lambda: 0)
block_to_latest_mtime = maxdict()

garbage_collection_report = []
"""A list of non-persisted blocks, sorted by increasing mtime

Each entry is of the form (block uuid, latest mtime, disk size,
cumulative disk size, disk free)

* block uuid: The id of the block we want to delete
* latest mtime: The latest mtime of the block across all keep servers.
* disk size: The total disk space used by this block (block size
  multiplied by current replication level)
* cumulative disk size: The sum of this block's disk size and all the
  blocks listed above it
* disk free: The proportion of our disk space that would be free if we
  deleted this block and all the above. So this is (free disk space +
  cumulative disk size) / total disk capacity
"""
garbage_collection_histogram = []
""" Shows the tradeoff of keep block age vs keep disk free space.

Each entry is of the form (Disk Proportion, mtime).

An entry of the form (0.52, 1388747781) means that if we deleted the
oldest non-persisted blocks until we had 52% of the disk free, then
all blocks with an mtime greater than 1388747781 would be preserved.
"""

# Stuff to report on
blocks_not_in_any_collections = set()
underreplicated_persisted_blocks = set()
overreplicated_persisted_blocks = set()

all_data_loaded = False

def loadAllData():
  checkUserIsAdmin()

  log.info('Building Collection List')
  global collection_uuids
  collection_uuids = filter(None, [extractUuid(candidate)
                                   for candidate in buildCollectionsList()])

  log.info('Reading Collections')
  readCollections(collection_uuids)

  if args.verbose:
    pprint.pprint(CollectionInfo.all_by_uuid)

  log.info('Reading Links')
  readLinks()

  reportMostPopularCollections()

  log.info('Building Maps')
  buildMaps()

  reportBusiestUsers()

  log.info('Getting Keep Servers')
  global keep_servers
  keep_servers = getKeepServers()
  log.info('Getting Blocks from each Keep Server.')
  global keep_blocks
  keep_blocks = getKeepBlocks(keep_servers)
  log.info('Getting Stats from each Keep Server.')
  global keep_stats, total_keep_space, free_keep_space
  keep_stats = getKeepStats(keep_servers)

  total_keep_space = sum(map(itemgetter(0), keep_stats))
  free_keep_space = sum(map(itemgetter(1), keep_stats))

  # TODO(misha): Delete this hack when the keep servers are fixed!
  # This hack deals with the fact that keep servers report each other's disks.
  total_keep_space /= len(keep_stats)
  free_keep_space /= len(keep_stats)

  log.info('Total disk space: %s, Free disk space: %s (%d%%).' %
           (fileSizeFormat(total_keep_space),
            fileSizeFormat(free_keep_space),
            100*free_keep_space/total_keep_space))

  computeReplication(keep_blocks)

  log.info('average replication level is %f',
           (float(sum(block_to_replication.values())) /
            len(block_to_replication)))

  computeGarbageCollectionCandidates()
  if args.garbage_collection_file:
    log.info('Writing garbage collection report to %s',
             args.garbage_collection_file)
    outputGarbageCollectionReport(args.garbage_collection_file)

  global garbage_collection_histogram
  garbage_collection_histogram = computeGarbageCollectionHistogram()

  if args.log_to_workbench:
    logGarbageCollectionHistogram()

  detectReplicationProblems()

  computeUserStorageUsage()
  printUserStorageUsage()
  if args.log_to_workbench:
    logUserStorageUsage()

  global all_data_loaded
  all_data_loaded = True

class DataManagerHandler(BaseHTTPRequestHandler):
  USER_PATH = 'user'
  COLLECTION_PATH = 'collection'
  BLOCK_PATH = 'block'

  def userLink(self, uuid):
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            {'uuid': uuid,
             'path': DataManagerHandler.USER_PATH})

  def collectionLink(self, uuid):
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            {'uuid': uuid,
             'path': DataManagerHandler.COLLECTION_PATH})

  def blockLink(self, uuid):
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            {'uuid': uuid,
             'path': DataManagerHandler.BLOCK_PATH})
  def writeTop(self, title):
    self.wfile.write('<HTML><HEAD><TITLE>%s</TITLE></HEAD>\n<BODY>' % title)

  def writeBottom(self):
    self.wfile.write('</BODY></HTML>\n')
  def writeHomePage(self):
    self.send_response(200)
    self.end_headers()
    self.writeTop('Home')
    self.wfile.write('<TABLE>')
    self.wfile.write('<TR><TH>user'
                     '<TH>unweighted readable block size'
                     '<TH>weighted readable block size'
                     '<TH>unweighted persisted block size'
                     '<TH>weighted persisted block size</TR>\n')
    for user, usage in user_to_usage.items():
      self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
                       (self.userLink(user),
                        fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
                        fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
                        fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
                        fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
    self.wfile.write('</TABLE>\n')
    self.writeBottom()
  def userExists(self, uuid):
    # Currently this will return false for a user who exists but
    # doesn't appear on any manifests.
    # TODO(misha): Figure out if we need to fix this.
    return user_to_usage.has_key(uuid)
  def writeUserPage(self, uuid):
    if not self.userExists(uuid):
      self.send_error(404,
                      'User (%s) Not Found.' % cgi.escape(uuid, quote=False))
    else:
      # Here we assume that since a user exists, they don't need to be
      # html escaped.
      self.send_response(200)
      self.end_headers()
      self.writeTop('User %s' % uuid)
      self.wfile.write('<TABLE>')
      self.wfile.write('<TR><TH>user'
                       '<TH>unweighted readable block size'
                       '<TH>weighted readable block size'
                       '<TH>unweighted persisted block size'
                       '<TH>weighted persisted block size</TR>\n')
      usage = user_to_usage[uuid]
      self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
                       (self.userLink(uuid),
                        fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
                        fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
                        fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
                        fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
      self.wfile.write('</TABLE>\n')
      self.wfile.write('<P>Persisting Collections: %s\n' %
                       ', '.join(map(self.collectionLink,
                                     persister_to_collections[uuid])))
      self.wfile.write('<P>Reading Collections: %s\n' %
                       ', '.join(map(self.collectionLink,
                                     reader_to_collections[uuid])))
      self.writeBottom()
  def collectionExists(self, uuid):
    return CollectionInfo.all_by_uuid.has_key(uuid)
  def writeCollectionPage(self, uuid):
    if not self.collectionExists(uuid):
      self.send_error(404,
                      'Collection (%s) Not Found.' % cgi.escape(uuid, quote=False))
    else:
      collection = CollectionInfo.get(uuid)
      # Here we assume that since a collection exists, its id doesn't
      # need to be html escaped.
      self.send_response(200)
      self.end_headers()
      self.writeTop('Collection %s' % uuid)
      self.wfile.write('<H1>Collection %s</H1>\n' % uuid)
      self.wfile.write('<P>Total size %s (not factoring in replication).\n' %
                       fileSizeFormat(collection.byteSize()))
      self.wfile.write('<P>Readers: %s\n' %
                       ', '.join(map(self.userLink, collection.reader_uuids)))

      if len(collection.persister_replication) == 0:
        self.wfile.write('<P>No persisters\n')
      else:
        replication_to_users = defaultdict(set)
        for user,replication in collection.persister_replication.items():
          replication_to_users[replication].add(user)
        replication_levels = sorted(replication_to_users.keys())

        self.wfile.write('<P>%d persisters in %d replication level(s) maxing '
                         'out at %dx replication:\n' %
                         (len(collection.persister_replication),
                          len(replication_levels),
                          replication_levels[-1]))

        # TODO(misha): This code is used twice, let's move it to a method.
        self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
                         '<TH>'.join(['Replication Level ' + str(x)
                                      for x in replication_levels]))
        self.wfile.write('<TR>\n')
        for replication_level in replication_levels:
          users = replication_to_users[replication_level]
          self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(
            map(self.userLink, users)))
        self.wfile.write('</TR></TABLE>\n')

      replication_to_blocks = defaultdict(set)
      for block in collection.block_uuids:
        replication_to_blocks[block_to_replication[block]].add(block)
      replication_levels = sorted(replication_to_blocks.keys())
      self.wfile.write('<P>%d blocks in %d replication level(s):\n' %
                       (len(collection.block_uuids), len(replication_levels)))
      self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
                       '<TH>'.join(['Replication Level ' + str(x)
                                    for x in replication_levels]))
      self.wfile.write('<TR>\n')
      for replication_level in replication_levels:
        blocks = replication_to_blocks[replication_level]
        self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(blocks))
      self.wfile.write('</TR></TABLE>\n')
      self.writeBottom()
  def do_GET(self):
    if not all_data_loaded:
      self.send_error(503,
                      'Sorry, but I am still loading all the data I need.')
    else:
      # Remove the leading '/' and process the request path.
      split_path = self.path[1:].split('/')
      request_type = split_path[0]
      log.debug('path (%s) split as %s with request_type %s' % (self.path,
                                                                split_path,
                                                                request_type))
      if request_type == '':
        self.writeHomePage()
      elif request_type == DataManagerHandler.USER_PATH:
        self.writeUserPage(split_path[1])
      elif request_type == DataManagerHandler.COLLECTION_PATH:
        self.writeCollectionPage(split_path[1])
      else:
        self.send_error(404, 'Unrecognized request path.')

class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
  """Handle requests in a separate thread."""

if __name__ == '__main__':
  args = parser.parse_args()

  if args.port == 0:
    loadAllData()
  else:
    loader = threading.Thread(target = loadAllData, name = 'loader')
    loader.start()

    server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
    server.serve_forever()