Merge branch 'wtsi-hgi-9231-rename-redunancy-to-replication-desired'
[arvados.git] / services / datamanager / experimental / datamanager.py
1 #! /usr/bin/env python
2
3 import arvados
4
5 import argparse
6 import cgi
7 import csv
8 import json
9 import logging
10 import math
11 import pprint
12 import re
13 import threading
14 import urllib2
15
16 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
17 from collections import defaultdict, Counter
18 from functools import partial
19 from operator import itemgetter
20 from SocketServer import ThreadingMixIn
21
# Module-wide Arvados API client (API version 'v1'). It is created when
# the module is imported and shared by every function below that talks
# to the API server.
arv = arvados.api('v1')
23
# Adapted from http://stackoverflow.com/questions/4180980/formatting-data-quantity-capacity-as-string
byteunits = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
def fileSizeFormat(value):
  """Format a byte count as a fixed-width, human readable string.

  value: a number of bytes (int or float, e.g. a weighted size).

  Returns the value scaled to the largest binary unit that keeps it at
  or above 1, formatted as '%7.2f %-3s', e.g. '   1.50 KiB'. Values
  smaller than one byte are reported in 'B' and values beyond the
  largest known unit are reported in that unit ('YiB').
  """
  largest_exponent = len(byteunits) - 1
  magnitude = abs(float(value))
  exponent = 0
  # Repeated division instead of int(math.log(value, 1024)): dividing a
  # float by 1024 is exact, so exact powers of 1024 always land in the
  # right unit, and the exponent can never leave the range of byteunits
  # (tiny fractions used to index byteunits[-1], huge values raised
  # IndexError and negative values raised ValueError).
  while magnitude >= 1024 and exponent < largest_exponent:
    magnitude /= 1024
    exponent += 1
  return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
                         byteunits[exponent])
30
def percentageFloor(x):
  """Round x down to the nearest 0.01 and return the result as a float.

  e.g. percentageFloor(0.941354) = 0.94
  """
  whole_hundredths = math.floor(x * 100)
  return whole_hundredths / 100.0
37
38
def byteSizeFromValidUuid(valid_uuid):
  """Return, as an int, the size field of a uuid.

  The size is the second '+'-separated field of valid_uuid (the one
  right after the hash); any further fields are ignored.
  """
  fields = valid_uuid.split('+')
  return int(fields[1])
41
class maxdict(dict):
  """A dictionary that holds the largest value entered for each key."""
  def addValue(self, key, value):
    """Record value under key unless an equal or larger value is stored."""
    # Test for membership explicitly instead of calling
    # max(dict.get(self, key), value): comparing a value against the
    # None returned for a missing key only works on Python 2.
    if key not in self or value > dict.__getitem__(self, key):
      dict.__setitem__(self, key, value)
  def addValues(self, kv_pairs):
    """Apply addValue to every (key, value) pair in kv_pairs."""
    for key, value in kv_pairs:
      self.addValue(key, value)
  def addDict(self, d):
    """Apply addValue to every item of the dictionary d."""
    self.addValues(d.items())
51
class CollectionInfo:
  """What we know about a single collection.

  Every instance registers itself in CollectionInfo.all_by_uuid, so at
  most one instance can exist per uuid. Use CollectionInfo.get() to
  fetch the instance for a uuid, creating it if necessary.
  """
  DEFAULT_PERSISTER_REPLICATION_LEVEL=2
  # Registry of every CollectionInfo created so far, keyed by uuid.
  all_by_uuid = {}

  def __init__(self, uuid):
    """Create and register the info object for uuid.

    Raises ValueError if an object for uuid is already registered.
    """
    if uuid in CollectionInfo.all_by_uuid:
      raise ValueError('Collection for uuid "%s" already exists.' % uuid)
    self.uuid = uuid
    self.block_uuids = set()  # uuids of keep blocks in this collection
    self.reader_uuids = set()  # uuids of users who can read this collection
    self.persister_uuids = set()  # uuids of users who want this collection saved
    # map from user uuid to replication level they desire
    self.persister_replication = maxdict()

    # The whole api response in case we need anything else later.
    self.api_response = []
    CollectionInfo.all_by_uuid[uuid] = self

  def byteSize(self):
    """Return the summed size of the blocks in this collection."""
    return sum(map(byteSizeFromValidUuid, self.block_uuids))

  def __str__(self):
    return ('CollectionInfo uuid: %s\n'
            '               %d block(s) containing %s\n'
            '               reader_uuids: %s\n'
            '               persister_replication: %s' %
            (self.uuid,
             len(self.block_uuids),
             fileSizeFormat(self.byteSize()),
             pprint.pformat(self.reader_uuids, indent = 15),
             pprint.pformat(self.persister_replication, indent = 15)))

  @staticmethod
  def get(uuid):
    """Return the CollectionInfo registered for uuid, creating it if needed."""
    if uuid not in CollectionInfo.all_by_uuid:
      CollectionInfo(uuid)
    return CollectionInfo.all_by_uuid[uuid]
89
90
def extractUuid(candidate):
  """Return the canonical (hash+size) part of candidate, or None.

  candidate: a string that may be a uuid, i.e. 32 hex digits, a '+',
  a decimal size and any number of further '+'-prefixed fields.

  Returns the leading 'hash+size' portion when candidate has that form
  and None when it does not.
  """
  # Raw string so that '\+' reaches the regex engine untouched instead
  # of being an invalid string escape sequence.
  match = re.match(r'([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate)
  if match is None:
    return None
  return match.group(1)
95
def checkUserIsAdmin():
  """Warn, and possibly exit, when the current API user is not an admin.

  Nothing happens for an admin user. Otherwise a warning is logged and,
  if args.require_admin_user is set, the process exits with status 1.
  """
  user = arv.users().current().execute()
  if user['is_admin']:
    return

  log.warning('Current user %s (%s - %s) does not have '
              'admin access and will not see much of the data.',
              user['full_name'],
              user['email'],
              user['uuid'])
  if not args.require_admin_user:
    return

  log.critical('Exiting, rerun with --no-require-admin-user '
               'if you wish to continue.')
  exit(1)
109
def buildCollectionsList():
  """Return the list of collection uuids to process.

  When args.uuid is set the list holds just that uuid. Otherwise a
  single list request (limited to args.max_api_results) is made, the
  number of returned vs. available collections is printed, and the
  uuids of the returned items are used.
  """
  if args.uuid:
    return [args.uuid]

  response = arv.collections().list(limit=args.max_api_results).execute()
  items = response['items']

  print('Returned %d of %d collections.' %
        (len(items), response['items_available']))

  return [item['uuid'] for item in items]
121
122
def readCollections(collection_uuids):
  """Fetch each collection and record the blocks its manifest mentions.

  collection_uuids: iterable of the collection uuids to fetch.

  For every uuid the matching CollectionInfo (created on demand) gets
  the whole API response stored in api_response, and the canonical uuid
  of every block token found in the manifest added to block_uuids.
  When args.verbose is set the manifest lines and tokens are printed.
  """
  for collection_uuid in collection_uuids:
    collection_response = arv.collections().get(uuid=collection_uuid).execute()
    collection_info = CollectionInfo.get(collection_uuid)
    collection_info.api_response = collection_response
    manifest_lines = collection_response['manifest_text'].split('\n')

    if args.verbose:
      print('Manifest text for %s:' % collection_uuid)
      pprint.pprint(manifest_lines)

    for manifest_line in manifest_lines:
      if not manifest_line:
        continue
      manifest_tokens = manifest_line.split(' ')
      if args.verbose:
        print('manifest tokens: ' + pprint.pformat(manifest_tokens))

      # The first token (the stream name) is skipped. Of the remaining
      # tokens only those that extractUuid recognizes are kept; the
      # others (the file tokens) are currently not processed.
      candidates = (extractUuid(token) for token in manifest_tokens[1:])
      collection_info.block_uuids.update(
        block_uuid for block_uuid in candidates if block_uuid)
157
158
def readLinks():
  """Read the links pointing at every known collection.

  'permission' links add their tail uuid to the collection's readers.
  'resources' links add their tail uuid to the collection's persisters
  and record the replication level it asks for (falling back to
  CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL). The set of all
  link classes encountered is printed at the end.
  """
  seen_link_classes = set()

  for collection_uuid, collection_info in CollectionInfo.all_by_uuid.items():
    # TODO(misha): We may not be seeing all the links, but since items
    # available does not return an accurate number, I don't know how
    # to confirm that we saw all of them.
    response = arv.links().list(where={'head_uuid':collection_uuid}).execute()
    for link in response['items']:
      link_class = link['link_class']
      seen_link_classes.add(link_class)
      if link_class == 'permission':
        collection_info.reader_uuids.add(link['tail_uuid'])
      elif link_class == 'resources':
        wanted_replication = link['properties'].get(
          'replication',
          CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL)
        collection_info.persister_replication.addValue(link['tail_uuid'],
                                                       wanted_replication)
        collection_info.persister_uuids.add(link['tail_uuid'])

  print('Found the following link classes:')
  pprint.pprint(seen_link_classes)
182
def reportMostPopularCollections():
  """Print the ten most popular collections, most popular first.

  Popularity is the number of readers plus ten times the number of
  persisters of a collection.
  """
  def popularity(info):
    return len(info.reader_uuids) + 10 * len(info.persister_replication)

  ranked = sorted(CollectionInfo.all_by_uuid.values(),
                  key=popularity,
                  reverse=True)

  print('Most popular Collections:')
  for collection_info in ranked[:10]:
    print(collection_info)
192
193
def buildMaps():
  """Fill the global block/reader/persister maps from CollectionInfo.

  Each collection contributes its blocks plus one extra block: the
  collection uuid itself, standing for the block holding the manifest.
  """
  for collection_uuid, collection_info in CollectionInfo.all_by_uuid.items():
    readers = collection_info.reader_uuids
    persisters = collection_info.persister_uuids
    # Add the block holding the manifest itself for all calculations
    block_uuids = collection_info.block_uuids.union((collection_uuid,))

    for block_uuid in block_uuids:
      block_to_collections[block_uuid].add(collection_uuid)
      block_to_readers[block_uuid].update(readers)
      block_to_persisters[block_uuid].update(persisters)
      block_to_persister_replication[block_uuid].addDict(
        collection_info.persister_replication)

    for reader_uuid in readers:
      reader_to_collections[reader_uuid].add(collection_uuid)
      reader_to_blocks[reader_uuid].update(block_uuids)

    for persister_uuid in persisters:
      persister_to_collections[persister_uuid].add(collection_uuid)
      persister_to_blocks[persister_uuid].update(block_uuids)
210
211
def itemsByValueLength(original):
  """Return the (key, value) pairs of original, longest value first.

  Pairs whose values have the same length keep the relative order in
  which original.items() produced them.
  """
  def value_length(pair):
    return len(pair[1])

  pairs = list(original.items())
  pairs.sort(key=value_length, reverse=True)
  return pairs
216
217
def reportBusiestUsers():
  """Print readers and persisters ordered by how many collections they have.

  Readers are listed first, then persisters; within each list the user
  with the most collections comes first.
  """
  busiest_readers = itemsByValueLength(reader_to_collections)
  print('The busiest readers are:')
  for reader, collections in busiest_readers:
    print('%s reading %d collections.' % (reader, len(collections)))

  busiest_persisters = itemsByValueLength(persister_to_collections)
  print('The busiest persisters are:')
  for persister, collections in busiest_persisters:
    # This line used to say "reading", copied from the readers loop.
    print('%s persisting %d collections.' % (persister, len(collections)))
227
228
def blockDiskUsage(block_uuid):
  """Return the disk space a block occupies across all its copies.

  This is the block size times the replication level recorded in
  block_to_replication, so the result is 0 until the contents of the
  keep servers have been read.
  """
  block_size = byteSizeFromValidUuid(block_uuid)
  return block_size * block_to_replication[block_uuid]
235
def blockPersistedUsage(user_uuid, block_uuid):
  """Return the block size times the replication level the user requested.

  A user with no replication level recorded for the block counts as
  requesting 0 copies.
  """
  block_size = byteSizeFromValidUuid(block_uuid)
  requested_copies = block_to_persister_replication[block_uuid].get(user_uuid, 0)
  return block_size * requested_copies
239
# Results of computeWeightedReplicationCosts, keyed by the string form
# of the sorted (replication level, number of users) pairs.
memo_computeWeightedReplicationCosts = {}
def computeWeightedReplicationCosts(replication_levels):
  """Computes the relative cost of varied replication levels.

  replication_levels: a sequence of integers, one per user, giving the
  replication level that user wants. If n users want a replication
  level of x then x should appear n times in replication_levels.

  Returns a dictionary from replication level to the cost paid by each
  user who wants that level. The dictionary is memoized and shared
  between calls with equivalent input.

  The cost of replicating at level x is shared by everyone who wants
  replication of level x or higher.

  For example, with two users who want 1 copy, one user who wants 3
  copies and two users who want 6 copies the input is [1, 1, 3, 6, 6]
  (or any permutation) and:

  The first copy is shared by all 5 users, so they each pay
  1 copy / 5 users = 0.2.
  The second and third copies are shared by 3 users, so they each pay
  2 copies / 3 users = 0.67 (plus the above costs).
  The fourth, fifth and sixth copies are shared by 2 users, so they
  each pay 3 copies / 2 users = 1.5 (plus the above costs).

  Here are some other examples:
  computeWeightedReplicationCosts([1,]) -> {1:1.0}
  computeWeightedReplicationCosts([2,]) -> {2:2.0}
  computeWeightedReplicationCosts([1,1]) -> {1:0.5}
  computeWeightedReplicationCosts([2,2]) -> {2:1.0}
  computeWeightedReplicationCosts([1,2]) -> {1:0.5,2:1.5}
  computeWeightedReplicationCosts([1,3]) -> {1:0.5,3:2.5}
  computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7}
  """
  level_counts = sorted(Counter(replication_levels).items())
  memo_key = str(level_counts)

  cached = memo_computeWeightedReplicationCosts.get(memo_key)
  if cached is not None:
    return cached

  cost_for_level = {}
  # Number of users still sharing the cost of each further copy.
  users_sharing = float(sum(count for _, count in level_counts))
  previous_level = 0
  running_cost = 0
  for level, count in level_counts:
    # The copies added since the previous level are split between all
    # users who want this level or a higher one.
    running_cost += (level - previous_level) / users_sharing
    cost_for_level[level] = running_cost
    previous_level = level
    users_sharing -= count

  memo_computeWeightedReplicationCosts[memo_key] = cost_for_level
  return cost_for_level
293
def blockPersistedWeightedUsage(user_uuid, block_uuid):
  """Return the user's weighted share of the cost of persisting a block.

  The share is the block size times the cost computeWeightedReplicationCosts
  assigns to the user's replication level, given the levels requested
  by all persisters of the block. Raises KeyError if the user has no
  replication level recorded for the block.
  """
  replication_by_user = block_to_persister_replication[block_uuid]
  user_level = replication_by_user[user_uuid]
  block_size = byteSizeFromValidUuid(block_uuid)
  cost_by_level = computeWeightedReplicationCosts(replication_by_user.values())
  return block_size * cost_by_level[user_level]
301
302
def computeUserStorageUsage():
  """Fill user_to_usage with read and persist sizes for every user.

  The unweighted read size is the summed size of the blocks a user can
  read; the weighted read size divides each block's size by its number
  of readers. The persist columns use blockPersistedUsage and
  blockPersistedWeightedUsage respectively.
  """
  for user, blocks in reader_to_blocks.items():
    total_read = sum(byteSizeFromValidUuid(block_uuid)
                     for block_uuid in blocks)
    user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = total_read
    shared_read = sum(float(byteSizeFromValidUuid(block_uuid)) /
                      len(block_to_readers[block_uuid])
                      for block_uuid in blocks)
    user_to_usage[user][WEIGHTED_READ_SIZE_COL] = shared_read

  for user, blocks in persister_to_blocks.items():
    total_persisted = sum(blockPersistedUsage(user, block_uuid)
                          for block_uuid in blocks)
    user_to_usage[user][UNWEIGHTED_PERSIST_SIZE_COL] = total_persisted
    shared_persisted = sum(blockPersistedWeightedUsage(user, block_uuid)
                           for block_uuid in blocks)
    user_to_usage[user][WEIGHTED_PERSIST_SIZE_COL] = shared_persisted
319
def printUserStorageUsage():
  """Print one line per user with the four formatted usage columns."""
  columns = (UNWEIGHTED_READ_SIZE_COL,
             WEIGHTED_READ_SIZE_COL,
             UNWEIGHTED_PERSIST_SIZE_COL,
             WEIGHTED_PERSIST_SIZE_COL)

  print('user: unweighted readable block size, weighted readable block size, '
        'unweighted persisted block size, weighted persisted block size:')
  for user, usage in user_to_usage.items():
    sizes = tuple(fileSizeFormat(usage[column]) for column in columns)
    print('%s: %s %s %s %s' % ((user,) + sizes))
330
def logUserStorageUsage():
  """Create one API log entry per user holding that user's storage usage.

  The event type of each entry comes from
  args.user_storage_log_event_type.
  """
  for user, usage in user_to_usage.items():
    properties = {
      'read_collections_total_bytes': usage[UNWEIGHTED_READ_SIZE_COL],
      'read_collections_weighted_bytes': usage[WEIGHTED_READ_SIZE_COL],
      'persisted_collections_total_bytes': usage[UNWEIGHTED_PERSIST_SIZE_COL],
      'persisted_collections_weighted_bytes': usage[WEIGHTED_PERSIST_SIZE_COL],
    }
    # user could actually represent a user or a group. We don't set
    # the object_type field since we don't know which we have.
    body = {
      'object_uuid': user,
      'event_type': args.user_storage_log_event_type,
      'properties': properties,
    }
    # TODO(misha): Confirm that this will throw an exception if it
    # fails to create the log entry.
    arv.logs().create(body=body).execute()
350
def getKeepServers():
  """Return a [service_host, service_port] list for every keep disk listed."""
  keep_disks = arv.keep_disks().list().execute()['items']
  servers = []
  for keep_disk in keep_disks:
    servers.append([keep_disk['service_host'], keep_disk['service_port']])
  return servers
355
356
def getKeepBlocks(keep_servers):
  """Fetch the block index of every keep server.

  keep_servers: iterable of (host, port) pairs.

  Returns a list with one entry per server, in the same order; each
  entry is a list of (block id, mtime) tuples with mtime as an int.
  Every non-empty line of a server's /index response must consist of
  exactly two space-separated fields.
  """
  blocks = []
  for host, port in keep_servers:
    response = urllib2.urlopen('http://%s:%d/index' % (host, port))
    # Close the connection even if reading fails; the response used to
    # be left open.
    try:
      index_text = response.read()
    finally:
      response.close()

    server_blocks = []
    for line in index_text.split('\n'):
      if line:
        block_id, mtime = line.split(' ')
        server_blocks.append((block_id, int(mtime)))
    blocks.append(server_blocks)
  return blocks
368
def getKeepStats(keep_servers):
  """Fetch total and free keep disk space from every keep server.

  keep_servers: iterable of (host, port) pairs.

  Returns a list with one [total_space, free_space] pair per server,
  in the same order. The numbers are summed over the lines of the
  'df' field of the server's /status.json whose mount column contains
  'keep', and multiplied by DISK_BLOCK_SIZE.
  """
  # Column positions within a whitespace-split line of the 'df' text
  # (presumably the output of df, which reports sizes in 1K blocks --
  # TODO confirm against the keep server).
  MOUNT_COLUMN = 5
  TOTAL_COLUMN = 1
  FREE_COLUMN = 3
  DISK_BLOCK_SIZE = 1024
  stats = []
  for host, port in keep_servers:
    response = urllib2.urlopen('http://%s:%d/status.json' % (host, port))
    # Close the connection even if parsing fails; the response used to
    # be left open.
    try:
      parsed_json = json.load(response)
    finally:
      response.close()

    df_entries = [line.split()
                  for line in parsed_json['df'].split('\n')
                  if line]
    keep_volumes = [columns
                    for columns in df_entries
                    if 'keep' in columns[MOUNT_COLUMN]]
    total_space = DISK_BLOCK_SIZE * sum(int(columns[TOTAL_COLUMN])
                                        for columns in keep_volumes)
    free_space = DISK_BLOCK_SIZE * sum(int(columns[FREE_COLUMN])
                                       for columns in keep_volumes)
    stats.append([total_space, free_space])
  return stats
391
392
def computeReplication(keep_blocks):
  """Count, in block_to_replication, how often each block is listed.

  keep_blocks: one list of (block uuid, mtime) pairs per keep server.
  The distinct replication levels found are logged at debug level.
  """
  for server_blocks in keep_blocks:
    for block_uuid, _mtime in server_blocks:
      block_to_replication[block_uuid] += 1

  distinct_levels = set(block_to_replication.values())
  log.debug('Seeing the following replication levels among blocks: %s',
            str(distinct_levels))
399
400
def computeGarbageCollectionCandidates():
  """Build garbage_collection_report from the blocks nobody persists.

  The latest mtime of every block is collected from keep_blocks, the
  blocks without persisters are ordered by increasing mtime, and one
  (block, mtime, disk size, cumulative disk size, disk free) tuple is
  appended to the report per block. The first 20 entries are printed.
  """
  global garbage_collection_report

  for server_blocks in keep_blocks:
    block_to_latest_mtime.addValues(server_blocks)

  # Look persisters up with .get() so that the defaultdict does not
  # grow an entry for every block we ask about.
  unpersisted = [(block, mtime)
                 for block, mtime in block_to_latest_mtime.items()
                 if not block_to_persisters.get(block)]
  unpersisted.sort(key=itemgetter(1))

  garbage_collection_report = []
  cumulative_disk_size = 0
  for block, mtime in unpersisted:
    disk_size = blockDiskUsage(block)
    cumulative_disk_size += disk_size
    disk_free = (float(free_keep_space + cumulative_disk_size) /
                 total_keep_space)
    garbage_collection_report.append(
      (block, mtime, disk_size, cumulative_disk_size, disk_free))

  print('The oldest Garbage Collection Candidates: ')
  pprint.pprint(garbage_collection_report[:20])
425
426
def outputGarbageCollectionReport(filename):
  """Write garbage_collection_report to filename as CSV with a header row."""
  header = ['block uuid', 'latest mtime', 'disk size',
            'cumulative size', 'disk free']
  with open(filename, 'wb') as csvfile:
    gcwriter = csv.writer(csvfile)
    gcwriter.writerow(header)
    gcwriter.writerows(garbage_collection_report)
434
def computeGarbageCollectionHistogram():
  """Return (mtime, disk free) pairs, one per disk-free step in the report.

  Walks garbage_collection_report in order and emits an entry whenever
  the disk free proportion, rounded down by percentageFloor, is larger
  than it was for the previous report line. The result is also logged.
  """
  # TODO(misha): Modify this to allow users to specify the number of
  # histogram buckets through a flag.
  histogram = []
  previous_percentage = -1
  for _block, mtime, _size, _cumulative, disk_free in garbage_collection_report:
    percentage = percentageFloor(disk_free)
    if percentage > previous_percentage:
      histogram.append((mtime, percentage))
    previous_percentage = percentage

  log.info('Garbage collection histogram is: %s', histogram)

  return histogram
449
450
def logGarbageCollectionHistogram():
  """Create an API log entry holding garbage_collection_histogram.

  The event type of the entry comes from
  args.block_age_free_space_histogram_log_event_type.
  """
  # TODO(misha): Decide whether we should specify an object_uuid in
  # the body and if so, which uuid to use.
  body = {
    'event_type': args.block_age_free_space_histogram_log_event_type,
    'properties': {'histogram': garbage_collection_histogram},
  }
  # TODO(misha): Confirm that this will throw an exception if it
  # fails to create the log entry.
  arv.logs().create(body=body).execute()
462
463
def detectReplicationProblems():
  """Find unreferenced, underreplicated and overreplicated blocks.

  Updates blocks_not_in_any_collections with the blocks seen on keep
  servers that no collection references, and the under/overreplicated
  sets with the persisted blocks whose actual replication is below or
  above the highest level any persister asked for. A summary of each
  set is logged.
  """
  seen_on_keep = set(block_to_replication.keys())
  blocks_not_in_any_collections.update(
    seen_on_keep.difference(block_to_collections.keys()))

  for uuid, persister_replication in block_to_persister_replication.items():
    if len(persister_replication) == 0:
      continue
    actual = block_to_replication[uuid]
    desired = max(persister_replication.values())
    if actual < desired:
      underreplicated_persisted_blocks.add(uuid)
    elif actual > desired:
      overreplicated_persisted_blocks.add(uuid)

  reports = (
    ('Found %d blocks not in any collections, e.g. %s...',
     blocks_not_in_any_collections),
    ('Found %d underreplicated blocks, e.g. %s...',
     underreplicated_persisted_blocks),
    ('Found %d overreplicated blocks, e.g. %s...',
     overreplicated_persisted_blocks))
  for message, problem_blocks in reports:
    log.info(message,
             len(problem_blocks),
             ','.join(list(problem_blocks)[:5]))

  # TODO:
  #  Read blocks sorted by mtime
  #  Cache window vs % free space
  #  Collections which candidates will appear in
  #  Youngest underreplicated read blocks that appear in collections.
  #  Report Collections that have blocks which are missing from (or
  #   underreplicated in) keep.
495
496
# This is the main flow here

# The command line argument parser we use.
#
# We only use it in the __main__ block, but leave it outside the block
# in case another package wants to use it or customize it by specifying
# it as a parent to their commandline parser.
parser = argparse.ArgumentParser(description='Report on keep disks.')
parser.add_argument('-m',
                    '--max-api-results',
                    type=int,
                    default=5000,
                    help=('The max results to get at once.'))
parser.add_argument('-p',
                    '--port',
                    type=int,
                    default=9090,
                    help=('The port number to serve on. 0 means no server.'))
parser.add_argument('-v',
                    '--verbose',
                    help='increase output verbosity',
                    action='store_true')
parser.add_argument('-u',
                    '--uuid',
                    help='uuid of specific collection to process')
# --require-admin-user and --no-require-admin-user share one
# destination; the former only restates the default.
parser.add_argument('--require-admin-user',
                    action='store_true',
                    default=True,
                    help='Fail if the user is not an admin [default]')
parser.add_argument('--no-require-admin-user',
                    dest='require_admin_user',
                    action='store_false',
                    help=('Allow users without admin permissions with '
                          'only a warning.'))
# Likewise --log-to-workbench and --no-log-to-workbench share one
# destination, defaulting to False.
parser.add_argument('--log-to-workbench',
                    action='store_true',
                    default=False,
                    help='Log findings to workbench')
parser.add_argument('--no-log-to-workbench',
                    dest='log_to_workbench',
                    action='store_false',
                    help='Don\'t log findings to workbench [default]')
parser.add_argument('--user-storage-log-event-type',
                    default='user-storage-report',
                    help=('The event type to set when logging user '
                          'storage usage to workbench.'))
# The help text below used to be a copy of the user storage one.
parser.add_argument('--block-age-free-space-histogram-log-event-type',
                    default='block-age-free-space-histogram',
                    help=('The event type to set when logging the block '
                          'age vs. free space histogram to workbench.'))
parser.add_argument('--garbage-collection-file',
                    default='',
                    help=('The file to write a garbage collection report, or '
                          'leave empty for no report.'))

# Holds the parsed command line arguments; None until they are parsed.
args = None
554
# TODO(misha): Think about moving some of this to the __main__ block.
# Module logger: INFO and above, written through a StreamHandler
# (stderr by default) with a timestamp and the level name in front of
# each message.
log = logging.getLogger('arvados.services.datamanager')
log.setLevel(logging.INFO)
stderr_handler = logging.StreamHandler()
stderr_handler.setFormatter(
  logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
log.addHandler(stderr_handler)
562
# Global Data - don't try this at home
collection_uuids = []

# These maps all map from uuids to a set of uuids, except for
# block_to_persister_replication whose values are maxdicts from
# persister uuid to desired replication level.
block_to_collections = defaultdict(set)  # keep blocks
reader_to_collections = defaultdict(set)  # collection(s) for which the user has read access
persister_to_collections = defaultdict(set)  # collection(s) which the user has persisted
block_to_readers = defaultdict(set)
block_to_persisters = defaultdict(set)
block_to_persister_replication = defaultdict(maxdict)
reader_to_blocks = defaultdict(set)
persister_to_blocks = defaultdict(set)

# Column indexes into the per-user rows of user_to_usage.
UNWEIGHTED_READ_SIZE_COL = 0
WEIGHTED_READ_SIZE_COL = 1
UNWEIGHTED_PERSIST_SIZE_COL = 2
WEIGHTED_PERSIST_SIZE_COL = 3
NUM_COLS = 4
# Maps a user uuid to a list of NUM_COLS sizes, all starting at 0.
user_to_usage = defaultdict(lambda: [0] * NUM_COLS)

keep_servers = []
keep_blocks = []
keep_stats = []
total_keep_space = 0
free_keep_space = 0

# Number of keep servers each block was seen on (0 for unseen blocks).
block_to_replication = defaultdict(int)
block_to_latest_mtime = maxdict()

# A list of non-persisted blocks, sorted by increasing mtime
#
# Each entry is of the form (block uuid, latest mtime, disk size,
# cumulative disk size, disk free)
#
# * block uuid: The id of the block we want to delete
# * latest mtime: The latest mtime of the block across all keep servers.
# * disk size: The total disk space used by this block (block size
#   multiplied by current replication level)
# * cumulative disk size: The sum of this block's disk size and all the
#   blocks listed above it
# * disk free: The proportion of our disk space that would be free if we
#   deleted this block and all the above. So this is (free disk space +
#   cumulative disk size) / total disk capacity
garbage_collection_report = []

# Shows the tradeoff of keep block age vs keep disk free space.
#
# Each entry is of the form (mtime, Disk Proportion).
#
# An entry of the form (1388747781, 0.52) means that if we deleted the
# oldest non-persisted blocks until we had 52% of the disk free, then
# all blocks with an mtime greater than 1388747781 would be preserved.
garbage_collection_histogram = []

# Stuff to report on
blocks_not_in_any_collections = set()
underreplicated_persisted_blocks = set()
overreplicated_persisted_blocks = set()

# Set to True by loadAllData() once everything above has been filled in.
all_data_loaded = False
625
def loadAllData():
  """Load everything from the API and keep servers and fill the globals.

  Runs the whole pipeline in order: admin check, collections, links,
  lookup maps, keep server block lists and disk stats, replication
  counts, garbage collection report and histogram, replication problem
  detection and per-user storage usage. Intermediate reports are
  printed and logged along the way. When args.log_to_workbench is set
  the histogram and user usage are also written as API log entries.
  Sets all_data_loaded to True once everything has completed.
  """
  checkUserIsAdmin()

  log.info('Building Collection List')
  global collection_uuids
  # Keep only entries extractUuid recognizes, reduced to their canonical
  # hash+size form; anything else is dropped.
  collection_uuids = filter(None, [extractUuid(candidate)
                                   for candidate in buildCollectionsList()])

  log.info('Reading Collections')
  readCollections(collection_uuids)

  if args.verbose:
    pprint.pprint(CollectionInfo.all_by_uuid)

  log.info('Reading Links')
  readLinks()

  reportMostPopularCollections()

  log.info('Building Maps')
  buildMaps()

  reportBusiestUsers()

  log.info('Getting Keep Servers')
  global keep_servers
  keep_servers = getKeepServers()

  print keep_servers

  log.info('Getting Blocks from each Keep Server.')
  global keep_blocks
  keep_blocks = getKeepBlocks(keep_servers)

  log.info('Getting Stats from each Keep Server.')
  global keep_stats, total_keep_space, free_keep_space
  keep_stats = getKeepStats(keep_servers)

  # Each keep_stats entry is a [total_space, free_space] pair.
  total_keep_space = sum(map(itemgetter(0), keep_stats))
  free_keep_space = sum(map(itemgetter(1), keep_stats))

  # TODO(misha): Delete this hack when the keep servers are fixed!
  # This hack deals with the fact that keep servers report each other's disks.
  # NOTE(review): raises ZeroDivisionError when keep_stats is empty.
  total_keep_space /= len(keep_stats)
  free_keep_space /= len(keep_stats)

  # NOTE(review): the percentage divides by total_keep_space, which is
  # zero when no keep volume was found.
  log.info('Total disk space: %s, Free disk space: %s (%d%%).' %
           (fileSizeFormat(total_keep_space),
            fileSizeFormat(free_keep_space),
            100*free_keep_space/total_keep_space))

  computeReplication(keep_blocks)

  # NOTE(review): divides by the number of blocks seen, so this raises
  # ZeroDivisionError when the keep servers list no blocks at all.
  log.info('average replication level is %f',
           (float(sum(block_to_replication.values())) /
            len(block_to_replication)))

  computeGarbageCollectionCandidates()

  if args.garbage_collection_file:
    log.info('Writing garbage Collection report to %s',
             args.garbage_collection_file)
    outputGarbageCollectionReport(args.garbage_collection_file)

  global garbage_collection_histogram
  garbage_collection_histogram = computeGarbageCollectionHistogram()

  if args.log_to_workbench:
    logGarbageCollectionHistogram()

  detectReplicationProblems()

  computeUserStorageUsage()
  printUserStorageUsage()
  if args.log_to_workbench:
    logUserStorageUsage()

  # Only set once every step above has completed without raising.
  global all_data_loaded
  all_data_loaded = True
705
706
707 class DataManagerHandler(BaseHTTPRequestHandler):
708   USER_PATH = 'user'
709   COLLECTION_PATH = 'collection'
710   BLOCK_PATH = 'block'
711
712   def userLink(self, uuid):
713     return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
714             {'uuid': uuid,
715              'path': DataManagerHandler.USER_PATH})
716
717   def collectionLink(self, uuid):
718     return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
719             {'uuid': uuid,
720              'path': DataManagerHandler.COLLECTION_PATH})
721
722   def blockLink(self, uuid):
723     return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
724             {'uuid': uuid,
725              'path': DataManagerHandler.BLOCK_PATH})
726
727   def writeTop(self, title):
728     self.wfile.write('<HTML><HEAD><TITLE>%s</TITLE></HEAD>\n<BODY>' % title)
729
730   def writeBottom(self):
731     self.wfile.write('</BODY></HTML>\n')
732
733   def writeHomePage(self):
734     self.send_response(200)
735     self.end_headers()
736     self.writeTop('Home')
737     self.wfile.write('<TABLE>')
738     self.wfile.write('<TR><TH>user'
739                      '<TH>unweighted readable block size'
740                      '<TH>weighted readable block size'
741                      '<TH>unweighted persisted block size'
742                      '<TH>weighted persisted block size</TR>\n')
743     for user, usage in user_to_usage.items():
744       self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
745                        (self.userLink(user),
746                         fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
747                         fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
748                         fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
749                         fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
750     self.wfile.write('</TABLE>\n')
751     self.writeBottom()
752
753   def userExists(self, uuid):
754     # Currently this will return false for a user who exists but
755     # doesn't appear on any manifests.
756     # TODO(misha): Figure out if we need to fix this.
757     return user_to_usage.has_key(uuid)
758
759   def writeUserPage(self, uuid):
760     if not self.userExists(uuid):
761       self.send_error(404,
762                       'User (%s) Not Found.' % cgi.escape(uuid, quote=False))
763     else:
764       # Here we assume that since a user exists, they don't need to be
765       # html escaped.
766       self.send_response(200)
767       self.end_headers()
768       self.writeTop('User %s' % uuid)
769       self.wfile.write('<TABLE>')
770       self.wfile.write('<TR><TH>user'
771                        '<TH>unweighted readable block size'
772                        '<TH>weighted readable block size'
773                        '<TH>unweighted persisted block size'
774                        '<TH>weighted persisted block size</TR>\n')
775       usage = user_to_usage[uuid]
776       self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
777                        (self.userLink(uuid),
778                         fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
779                         fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
780                         fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
781                         fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
782       self.wfile.write('</TABLE>\n')
783       self.wfile.write('<P>Persisting Collections: %s\n' %
784                        ', '.join(map(self.collectionLink,
785                                      persister_to_collections[uuid])))
786       self.wfile.write('<P>Reading Collections: %s\n' %
787                        ', '.join(map(self.collectionLink,
788                                      reader_to_collections[uuid])))
789       self.writeBottom()
790
791   def collectionExists(self, uuid):
792     return CollectionInfo.all_by_uuid.has_key(uuid)
793
794   def writeCollectionPage(self, uuid):
795     if not self.collectionExists(uuid):
796       self.send_error(404,
797                       'Collection (%s) Not Found.' % cgi.escape(uuid, quote=False))
798     else:
799       collection = CollectionInfo.get(uuid)
800       # Here we assume that since a collection exists, its id doesn't
801       # need to be html escaped.
802       self.send_response(200)
803       self.end_headers()
804       self.writeTop('Collection %s' % uuid)
805       self.wfile.write('<H1>Collection %s</H1>\n' % uuid)
806       self.wfile.write('<P>Total size %s (not factoring in replication).\n' %
807                        fileSizeFormat(collection.byteSize()))
808       self.wfile.write('<P>Readers: %s\n' %
809                        ', '.join(map(self.userLink, collection.reader_uuids)))
810
811       if len(collection.persister_replication) == 0:
812         self.wfile.write('<P>No persisters\n')
813       else:
814         replication_to_users = defaultdict(set)
815         for user,replication in collection.persister_replication.items():
816           replication_to_users[replication].add(user)
817         replication_levels = sorted(replication_to_users.keys())
818
819         self.wfile.write('<P>%d persisters in %d replication level(s) maxing '
820                          'out at %dx replication:\n' %
821                          (len(collection.persister_replication),
822                           len(replication_levels),
823                           replication_levels[-1]))
824
825         # TODO(misha): This code is used twice, let's move it to a method.
826         self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
827                          '<TH>'.join(['Replication Level ' + str(x)
828                                       for x in replication_levels]))
829         self.wfile.write('<TR>\n')
830         for replication_level in replication_levels:
831           users = replication_to_users[replication_level]
832           self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(
833               map(self.userLink, users)))
834         self.wfile.write('</TR></TABLE>\n')
835
836       replication_to_blocks = defaultdict(set)
837       for block in collection.block_uuids:
838         replication_to_blocks[block_to_replication[block]].add(block)
839       replication_levels = sorted(replication_to_blocks.keys())
840       self.wfile.write('<P>%d blocks in %d replication level(s):\n' %
841                        (len(collection.block_uuids), len(replication_levels)))
842       self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
843                        '<TH>'.join(['Replication Level ' + str(x)
844                                     for x in replication_levels]))
845       self.wfile.write('<TR>\n')
846       for replication_level in replication_levels:
847         blocks = replication_to_blocks[replication_level]
848         self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(blocks))
849       self.wfile.write('</TR></TABLE>\n')
850
851
852   def do_GET(self):
853     if not all_data_loaded:
854       self.send_error(503,
855                       'Sorry, but I am still loading all the data I need.')
856     else:
857       # Removing leading '/' and process request path
858       split_path = self.path[1:].split('/')
859       request_type = split_path[0]
860       log.debug('path (%s) split as %s with request_type %s' % (self.path,
861                                                                 split_path,
862                                                                 request_type))
863       if request_type == '':
864         self.writeHomePage()
865       elif request_type == DataManagerHandler.USER_PATH:
866         self.writeUserPage(split_path[1])
867       elif request_type == DataManagerHandler.COLLECTION_PATH:
868         self.writeCollectionPage(split_path[1])
869       else:
870         self.send_error(404, 'Unrecognized request path.')
871     return
872
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
  """HTTP server that handles each request in a separate thread.

  All behavior comes from the two base classes; nothing is overridden here.
  """
  pass
875
876
if __name__ == '__main__':
  args = parser.parse_args()

  if args.port != 0:
    # Load the data on a background thread so the HTTP server can start
    # right away.
    loader = threading.Thread(name='loader', target=loadAllData)
    loader.start()

    listen_address = ('localhost', args.port)
    server = ThreadedHTTPServer(listen_address, DataManagerHandler)
    server.serve_forever()
  else:
    # Port 0: no server is started, just load the data in the foreground.
    loadAllData()