[arvados.git] / services / datamanager / datamanager.py
#! /usr/bin/env python

import arvados

import argparse
import pprint
import re
import urllib2

from collections import defaultdict
from math import log
from operator import itemgetter

arv = arvados.api('v1')

# Adapted from http://stackoverflow.com/questions/4180980/formatting-data-quantity-capacity-as-string
byteunits = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
def fileSizeFormat(value):
  exponent = 0 if value == 0 else int(log(value, 1024))
  return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
                         byteunits[exponent])
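# Example (illustrative value): fileSizeFormat(1234567) returns '   1.18 MiB',
# since 1234567 / 1024**2 is roughly 1.18.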

def byteSizeFromValidUuid(valid_uuid):
  return int(valid_uuid.split('+')[1])
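# Example (illustrative locator): byteSizeFromValidUuid('d41d8cd98f00b204e9800998ecf8427e+0') == 0,
# i.e. the byte count is read from the '+<size>' part of the block locator.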

class CollectionInfo:
  all_by_uuid = {}

  def __init__(self, uuid):
    if uuid in CollectionInfo.all_by_uuid:
      raise ValueError('Collection for uuid "%s" already exists.' % uuid)
    self.uuid = uuid
    self.block_uuids = set()  # uuids of keep blocks in this collection
    self.reader_uuids = set()  # uuids of users who can read this collection
    self.persister_uuids = set()  # uuids of users who want this collection saved
    CollectionInfo.all_by_uuid[uuid] = self

  def byte_size(self):
    return sum(map(byteSizeFromValidUuid, self.block_uuids))

  def __str__(self):
    return ('CollectionInfo uuid: %s\n'
            '               %d block(s) containing %s\n'
            '               reader_uuids: %s\n'
            '               persister_uuids: %s' %
            (self.uuid,
             len(self.block_uuids),
             fileSizeFormat(self.byte_size()),
             pprint.pformat(self.reader_uuids, indent=15),
             pprint.pformat(self.persister_uuids, indent=15)))

  @staticmethod
  def get(uuid):
    if uuid not in CollectionInfo.all_by_uuid:
      CollectionInfo(uuid)
    return CollectionInfo.all_by_uuid[uuid]


def extractUuid(candidate):
  """Returns a canonical (hash+size) uuid from a valid uuid, or None if
  candidate is not a valid uuid."""
  match = re.match(r'([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate)
  return match and match.group(1)
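# Examples (illustrative locators; the '+K@xyzzy' hint is hypothetical):
#   extractUuid('d41d8cd98f00b204e9800998ecf8427e+0+K@xyzzy') == 'd41d8cd98f00b204e9800998ecf8427e+0'
#   extractUuid('not-a-block-locator') is None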

def checkUserIsAdmin():
  current_user = arv.users().current().execute()

  if not current_user['is_admin']:
    # TODO(misha): Use a logging framework here
    print ('Warning: current user %s (%s - %s) does not have admin access '
           'and will not see much of the data.' %
           (current_user['full_name'],
            current_user['email'],
            current_user['uuid']))
    if args.require_admin_user:
      print 'Exiting; rerun with --no-require-admin-user if you wish to continue.'
      exit(1)
def buildCollectionsList():
  if args.uuid:
    return [args.uuid]
  else:
    collections_list_response = arv.collections().list(limit=args.max_api_results).execute()

    print ('Returned %d of %d collections.' %
           (len(collections_list_response['items']),
            collections_list_response['items_available']))

    return [item['uuid'] for item in collections_list_response['items']]


def readCollections(collection_uuids):
  for collection_uuid in collection_uuids:
    collection_block_uuids = set()
    collection_response = arv.collections().get(uuid=collection_uuid).execute()
    collection_info = CollectionInfo.get(collection_uuid)
    manifest_lines = collection_response['manifest_text'].split('\n')

    if args.verbose:
      print 'Manifest text for %s:' % collection_uuid
      pprint.pprint(manifest_lines)

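    # Each non-empty manifest line is roughly of the form (illustrative example):
    #   . d41d8cd98f00b204e9800998ecf8427e+0 0:0:emptyfile.txt
    # i.e. a stream name, then block locators, then position:size:filename tokens.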
    for manifest_line in manifest_lines:
      if manifest_line:
        manifest_tokens = manifest_line.split(' ')
        if args.verbose:
          print 'manifest tokens: ' + pprint.pformat(manifest_tokens)
        stream_name = manifest_tokens[0]

        line_block_uuids = set(filter(None,
                                      [extractUuid(candidate)
                                       for candidate in manifest_tokens[1:]]))
        collection_info.block_uuids.update(line_block_uuids)

        # file_tokens = [token
        #                for token in manifest_tokens[1:]
        #                if extractUuid(token) is None]

        # # Sort file tokens by start position in case they aren't already
        # file_tokens.sort(key=lambda file_token: int(file_token.split(':')[0]))

        # if args.verbose:
        #   print 'line_block_uuids: ' + pprint.pformat(line_block_uuids)
        #   print 'file_tokens: ' + pprint.pformat(file_tokens)


def readLinks():
  link_classes = set()

  for collection_uuid, collection_info in CollectionInfo.all_by_uuid.items():
    collection_links_response = arv.links().list(where={'head_uuid': collection_uuid}).execute()
    link_classes.update([link['link_class'] for link in collection_links_response['items']])
    for link in collection_links_response['items']:
      if link['link_class'] == 'permission':
        collection_info.reader_uuids.add(link['tail_uuid'])
      elif link['link_class'] == 'resources':
        collection_info.persister_uuids.add(link['tail_uuid'])

  print 'Found the following link classes:'
  pprint.pprint(link_classes)

def reportMostPopularCollections():
  most_popular_collections = sorted(
    CollectionInfo.all_by_uuid.values(),
    # Weight persisters ten times more heavily than readers.
    key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_uuids),
    reverse=True)[:10]

  print 'Most popular Collections:'
  for collection_info in most_popular_collections:
    print collection_info


def buildMaps():
  for collection_uuid, collection_info in CollectionInfo.all_by_uuid.items():
    # Add the block holding the manifest itself for all calculations
    block_uuids = collection_info.block_uuids.union([collection_uuid])
    for block_uuid in block_uuids:
      block_to_collections[block_uuid].add(collection_uuid)
      block_to_readers[block_uuid].update(collection_info.reader_uuids)
      block_to_persisters[block_uuid].update(collection_info.persister_uuids)
    for reader_uuid in collection_info.reader_uuids:
      reader_to_collections[reader_uuid].add(collection_uuid)
      reader_to_blocks[reader_uuid].update(block_uuids)
    for persister_uuid in collection_info.persister_uuids:
      persister_to_collections[persister_uuid].add(collection_uuid)
      persister_to_blocks[persister_uuid].update(block_uuids)


def itemsByValueLength(original):
  return sorted(original.items(),
                key=lambda item: len(item[1]),
                reverse=True)


def reportBusiestUsers():
  busiest_readers = itemsByValueLength(reader_to_collections)
  print 'The busiest readers are:'
  for reader, collections in busiest_readers:
    print '%s reading %d collections.' % (reader, len(collections))
  busiest_persisters = itemsByValueLength(persister_to_collections)
  print 'The busiest persisters are:'
  for persister, collections in busiest_persisters:
    print '%s persisting %d collections.' % (persister, len(collections))


def reportUserDiskUsage():
  # Weighted sizes split each block's size evenly across the users who can
  # read (or who persist) that block.
  for user, blocks in reader_to_blocks.items():
    user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map(
        byteSizeFromValidUuid,
        blocks))
    user_to_usage[user][WEIGHTED_READ_SIZE_COL] = sum(map(
        lambda block_uuid: (float(byteSizeFromValidUuid(block_uuid)) /
                            len(block_to_readers[block_uuid])),
        blocks))
  for user, blocks in persister_to_blocks.items():
    user_to_usage[user][UNWEIGHTED_PERSIST_SIZE_COL] = sum(map(
        byteSizeFromValidUuid,
        blocks))
    user_to_usage[user][WEIGHTED_PERSIST_SIZE_COL] = sum(map(
        lambda block_uuid: (float(byteSizeFromValidUuid(block_uuid)) /
                            len(block_to_persisters[block_uuid])),
        blocks))
  print ('user: unweighted readable block size, weighted readable block size, '
         'unweighted persisted block size, weighted persisted block size:')
  for user, usage in user_to_usage.items():
    print ('%s: %s %s %s %s' %
           (user,
            fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
            fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
            fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
            fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))


def getKeepServers():
  response = arv.keep_disks().list().execute()
  return [[keep_server['service_host'], keep_server['service_port']]
          for keep_server in response['items']]


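# Each keep server's /index response is expected to contain one line per
# stored block, with the block locator as the first space-separated field
# followed by one more field (e.g. a timestamp); computeReplication below
# relies on that two-field layout.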
def getKeepBlocks(keep_servers):
  blocks = []
  for host, port in keep_servers:
    response = urllib2.urlopen('http://%s:%d/index' % (host, port))
    blocks.append([line.split(' ')
                   for line in response.read().split('\n')
                   if line])
  return blocks


def computeReplication(keep_blocks):
  block_to_replication = defaultdict(int)
  for server_blocks in keep_blocks:
    for block_uuid, _ in server_blocks:
      block_to_replication[block_uuid] += 1
  return block_to_replication
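# Example: a block whose locator appears in the index of two of the keep
# servers ends up with a replication count of 2.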


# This is the main flow here

parser = argparse.ArgumentParser(description='Report on keep disks.')
parser.add_argument('-m',
                    '--max-api-results',
                    type=int,
                    default=5000,
                    help='The max results to get at once.')
parser.add_argument('-v',
                    '--verbose',
                    help='increase output verbosity',
                    action='store_true')
parser.add_argument('-u',
                    '--uuid',
                    help='uuid of specific collection to process')
parser.add_argument('--require-admin-user',
                    action='store_true',
                    help='Fail if the user is not an admin [default]')
parser.add_argument('--no-require-admin-user',
                    dest='require_admin_user',
                    action='store_false',
                    help='Allow users without admin permissions with only a warning.')
args = parser.parse_args()
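
# Example invocations (hypothetical uuid value):
#   python datamanager.py --verbose --max-api-results 1000
#   python datamanager.py --uuid <collection uuid> --no-require-admin-user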

checkUserIsAdmin()

print 'Building Collection List'
collection_uuids = filter(None, [extractUuid(candidate)
                                 for candidate in buildCollectionsList()])

print 'Reading Collections'
readCollections(collection_uuids)

if args.verbose:
  pprint.pprint(CollectionInfo.all_by_uuid)

print 'Reading Links'
readLinks()

reportMostPopularCollections()

# Each of these maps takes a uuid to a set of uuids.
block_to_collections = defaultdict(set)  # keep block -> collections containing it
reader_to_collections = defaultdict(set)  # user -> collections the user has read access to
persister_to_collections = defaultdict(set)  # user -> collections the user has persisted
block_to_readers = defaultdict(set)  # keep block -> users with read access to it
block_to_persisters = defaultdict(set)  # keep block -> users who persist it
reader_to_blocks = defaultdict(set)  # user -> keep blocks the user has read access to
persister_to_blocks = defaultdict(set)  # user -> keep blocks the user persists

print 'Building Maps'
buildMaps()

reportBusiestUsers()

UNWEIGHTED_READ_SIZE_COL = 0
WEIGHTED_READ_SIZE_COL = 1
UNWEIGHTED_PERSIST_SIZE_COL = 2
WEIGHTED_PERSIST_SIZE_COL = 3
NUM_COLS = 4
user_to_usage = defaultdict(lambda: [0] * NUM_COLS)

reportUserDiskUsage()

print 'Getting Keep Servers'
keep_servers = getKeepServers()

print keep_servers

print 'Getting Blocks from each Keep Server.'
keep_blocks = getKeepBlocks(keep_servers)

block_to_replication = computeReplication(keep_blocks)

print 'average replication level is %f' % (float(sum(block_to_replication.values())) / len(block_to_replication))