import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.Vector;
14 * IoTTable data structure. Provides the client interface.
15 * @author Brian Demsky
19 final public class Table {
20 private int numslots; //number of slots stored in buffer
22 //table of key-value pairs
23 private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
25 // machine id -> (sequence number, Slot or LastMessage); records last message by each client
26 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
28 private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
29 private Vector<Long> rejectedmessagelist = new Vector<Long>();
30 private SlotBuffer buffer;
31 private CloudComm cloud;
32 private long sequencenumber; //Largest sequence number a client has received
33 private long localmachineid;
34 private TableStatus lastTableStatus;
35 static final int FREE_SLOTS = 10; //number of slots that should be kept free
36 static final int SKIP_THRESHOLD = 10;
37 private long liveslotcount = 0;
39 static final double RESIZE_MULTIPLE = 1.2;
40 static final double RESIZE_THRESHOLD = 0.75;
41 static final int REJECTED_THRESHOLD = 5;
42 private int resizethreshold;
43 private long lastliveslotseqn; //smallest sequence number with a live entry
44 private Random random = new Random();
46 private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
47 private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
48 private List<Commit> commitList = null; // List of all the most recent live commits
49 private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
50 private List<Transaction> uncommittedTransactionsList = null; //
51 private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
52 private Set<Abort> arbitratorTable = null; // Table of arbitrators
/**
 * Creates a table whose cloud connection is built from a base URL and password.
 *
 * @param baseurl base URL handed to {@link CloudComm}
 * @param password password handed to {@link CloudComm}
 * @param _localmachineid identifier of this machine
 */
public Table(String baseurl, String password, long _localmachineid) {
    this.localmachineid = _localmachineid;
    this.buffer = new SlotBuffer();
    this.numslots = this.buffer.capacity();

    // CloudComm is given a reference to this table while it is still being constructed.
    this.cloud = new CloudComm(this, baseurl, password);

    // Transaction bookkeeping starts out empty.
    this.pendingTransQueue = new LinkedList<PendingTransaction>();
    this.commitList = new LinkedList<Commit>();
    this.commitedTable = new HashMap<IoTString, KeyValue>();
    this.uncommittedTransactionsList = new LinkedList<Transaction>();
    this.arbitratorTable = new HashMap<IoTString, Long>();
}
/**
 * Creates a table that uses an already-constructed cloud connection.
 *
 * @param _cloud cloud connection used by rebuild(), update(), initTable() and tryput()
 * @param _localmachineid identifier of this machine
 */
public Table(CloudComm _cloud, long _localmachineid) {
    localmachineid = _localmachineid;
    buffer = new SlotBuffer();
    numslots = buffer.capacity();
    // Store the injected connection. Previously it was ignored, which left
    // cloud null so every call that talks to the server would throw a
    // NullPointerException.
    cloud = _cloud;

    pendingTransQueue = new LinkedList<PendingTransaction>();
    commitList = new LinkedList<Commit>();
    commitedTable = new HashMap<IoTString, KeyValue>();
    uncommittedTransactionsList = new LinkedList<Transaction>();
    arbitratorTable = new HashMap<IoTString, Long>();
}
86 public void rebuild() {
87 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
88 validateandupdate(newslots, true);
91 public void update() {
92 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
94 validateandupdate(newslots, false);
97 public IoTString get(IoTString key) {
98 KeyValue kv = table.get(key);
100 return kv.getValue();
/**
 * Creates a brand-new table on the server. It sets the salt, builds the
 * first slot (sequence number 1) and a TableStatus for numslots, uploads the
 * slot with putSlot, and applies the slot locally via validateandupdate.
 *
 * NOTE(review): lines are missing from this fragment. As shown:
 * - status is never added to s;
 * - the putSlot result is overwritten unconditionally;
 * - the Error is thrown unconditionally.
 * Presumably a check on putSlot's result chooses between the local update
 * and the Error (compare tryput, whose success path also wraps s alone).
 * Confirm against the complete source.
 *
 * @throws Error "Error on initialization" if the upload is not accepted (presumably)
 */
105 public void initTable() {
106 cloud.setSalt();//Set the salt
107 Slot s = new Slot(this, 1, localmachineid);
108 TableStatus status = new TableStatus(s, numslots);
110 Slot[] array = cloud.putSlot(s, numslots);
// Apply only our own slot locally.
112 array = new Slot[] {s};
113 /* update data structure */
114 validateandupdate(array, true);
116 throw new Error("Error on initialization");
120 public String toString() {
121 return table.toString();
124 public void startTransaction() {
125 // Create a new transaction, invalidates any old pending transactions.
126 pendingTransBuild = new PendingTransaction();
129 public void commitTransaction() {
131 // Add the pending transaction to the queue
132 pendingTransQueue.add(pendingTransBuild);
134 while (!pendingTransQueue.isEmpty()) {
135 if (tryput( pendingTransQueue.peek(), false)) {
136 pendingTransQueue.poll();
141 public void addKV(IoTString key, IoTString value) {
142 KeyValue kv = new KeyValue(key, value);
143 pendingTransBuild.addKV(kv);
146 public void addGuard(IoTString key, IoTString value) {
147 KeyValue kv = new KeyValue(key, value);
148 pendingTransBuild.addKV(kv);
/**
 * Package-private hook on the live-slot count.
 * NOTE(review): the body is not present in this fragment. By its name it
 * presumably decrements liveslotcount, the counter tryput compares against
 * resizethreshold to force a resize. Confirm.
 */
151 void decrementLiveCount() {
156 private void setResizeThreshold() {
157 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
158 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
/**
 * Tries to append one slot (sequence number sequencenumber + 1) that carries
 * the given pending transaction, then applies the server's reply.
 *
 * Steps visible below:
 * - optionally force a resize and announce it with a TableStatus entry;
 * - emit RejectedMessage entries for earlier puts that were not accepted;
 * - copy live entries out of old slots so they survive the buffer wrapping;
 * - insert the transaction if it fits;
 * - upload with putSlot;
 * - feed the resulting slots to validateandupdate.
 *
 * NOTE(review): this fragment is missing lines.
 * - newsize, i, prev_seqn, skipcount and max are used without a visible declaration.
 * - status, each rm and trans are constructed, but the statements adding them to s are absent.
 * - The check on putSlot's result and all closing braces are absent.
 * Confirm against the complete source.
 *
 * @param pendingTrans transaction to insert
 * @param resize resize flag passed to getLiveEntries; forced to true when
 *        liveslotcount exceeds resizethreshold
 * @return true if the transaction fit in the slot and the put was not
 *         (presumably) rejected; false otherwise (commitTransaction retries)
 */
161 private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
// New slot, chained to the HMAC of the newest slot this client has.
162 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
// Too many live slots: force a resize and announce size numslots * RESIZE_MULTIPLE.
164 if (liveslotcount > resizethreshold) {
165 resize = true; //Resize is forced
169 newsize = (int) (numslots * RESIZE_MULTIPLE);
170 TableStatus status = new TableStatus(s, newsize);
// Report earlier puts of ours that were not accepted.
174 if (! rejectedmessagelist.isEmpty()) {
175 /* TODO: We should avoid generating a rejected message entry if
176 * there is already a sufficient entry in the queue (e.g.,
177 * equalsto value of true and same sequence number). */
179 long old_seqn = rejectedmessagelist.firstElement();
// More than REJECTED_THRESHOLD entries: one message covers first..last.
180 if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
181 long new_seqn = rejectedmessagelist.lastElement();
182 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
187 /* Go through list of missing messages */
188 for (; i < rejectedmessagelist.size(); i++) {
189 long curr_seqn = rejectedmessagelist.get(i);
190 Slot s_msg = buffer.getSlot(curr_seqn);
193 prev_seqn = curr_seqn;
195 /* Generate rejected message entry for missing messages */
196 if (prev_seqn != -1) {
197 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
200 /* Generate rejected message entries for present messages */
201 for (; i < rejectedmessagelist.size(); i++) {
202 long curr_seqn = rejectedmessagelist.get(i);
203 Slot s_msg = buffer.getSlot(curr_seqn);
204 long machineid = s_msg.getMachineID();
205 RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
// Rescue live entries from slots that will soon fall out of the buffer.
211 long newestseqnum = buffer.getNewestSeqNum();
212 long oldestseqnum = buffer.getOldestSeqNum();
213 if (lastliveslotseqn < oldestseqnum)
214 lastliveslotseqn = oldestseqnum;
216 long seqn = lastliveslotseqn;
217 boolean seenliveslot = false;
218 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
219 long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
// Live entries in slots below threshold must move into s now.
221 for (; seqn < threshold; seqn++) {
222 Slot prevslot = buffer.getSlot(seqn);
223 //Push slot number forward
225 lastliveslotseqn = seqn;
227 if (! prevslot.isLive())
230 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
231 for (Entry liveentry : liveentries) {
232 if (s.hasSpace(liveentry)) {
233 s.addEntry(liveentry);
234 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
// NOTE(review): debug output to stdout; consider removing or logging.
236 System.out.print("B"); //?
// Retry with the resize flag forced on.
237 return tryput(pendingTrans, true);
// Insert the transaction itself if there is room.
// NOTE(review): one constructor-argument line is missing from this fragment.
244 Transaction trans = new Transaction(s,
245 s.getSequenceNumber(),
247 pendingTrans.getKVUpdates(),
248 pendingTrans.getGuard());
249 boolean insertedTrans = false;
250 if (s.hasSpace(trans)) {
252 insertedTrans = true;
255 /* now go through live entries from least to greatest sequence number until
256 * either all live slots added, or the slot doesn't have enough room
257 * for SKIP_THRESHOLD consecutive entries*/
260 for (; seqn <= newestseqnum; seqn++) {
261 Slot prevslot = buffer.getSlot(seqn);
262 //Push slot number forward
264 lastliveslotseqn = seqn;
266 if (!prevslot.isLive())
269 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
270 for (Entry liveentry : liveentries) {
271 if (s.hasSpace(liveentry))
272 s.addEntry(liveentry);
275 if (skipcount > SKIP_THRESHOLD)
// Upload. Presumably putSlot signals acceptance with a null result (the path
// that wraps s alone); otherwise it returns the server's newer slots.
284 Slot[] array = cloud.putSlot(s, max);
286 array = new Slot[] {s};
287 rejectedmessagelist.clear();
289 if (array.length == 0)
290 throw new Error("Server Error: Did not send any slots");
// Remember this sequence number so the next put reports it as rejected.
291 rejectedmessagelist.add(s.getSequenceNumber());
292 insertedTrans = false;
295 /* update data structure */
296 validateandupdate(array, true);
298 return insertedTrans;
/**
 * Validates a batch of slots fetched from the server (getSlots) or returned
 * by it (putSlot), and applies the batch to local state.
 *
 * The batch must be newer than sequencenumber and its HMAC chain must be
 * consistent. Every slot is then processed. When the batch does not start
 * right after sequencenumber (a gap), the slot count must equal expectedsize
 * and every machine in lastmessagetable must appear in the batch. Finally the
 * slots are stored in the buffer and sequencenumber advances to the last one.
 *
 * NOTE(review): the block comment opened on the first body line is never
 * closed in this fragment (its closing line appears to be missing). As
 * written, the code up to the next comment terminator is commented out.
 * Lines between the gap check and the buffer commit are also missing.
 *
 * @param newslots slots to apply; assumed in increasing sequence-number order
 *        (only the first and last are inspected for ordering here)
 * @param acceptupdatestolocal forwarded to processSlot / updateLastMessage
 * @throws Error on stale slots, a broken HMAC chain, a short batch, or a
 *         machine missing from the batch
 */
301 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
302 /* The cloud communication layer has checked slot HMACs already
304 if (newslots.length == 0) return;
306 long firstseqnum = newslots[0].getSequenceNumber();
307 if (firstseqnum <= sequencenumber)
308 throw new Error("Server Error: Sent older slots!");
310 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
311 checkHMACChain(indexer, newslots);
// machine ids not yet seen in this batch; updateLastMessage removes each one it sees
313 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
315 initExpectedSize(firstseqnum);
316 for (Slot slot : newslots) {
317 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
318 updateExpectedSize();
321 /* If there is a gap, check to see if the server sent us everything. */
322 if (firstseqnum != (sequencenumber + 1)) {
323 checkNumSlots(newslots.length);
324 if (!machineSet.isEmpty())
325 throw new Error("Missing record for machines: " + machineSet);
330 /* Commit new to slots. */
331 for (Slot slot : newslots) {
332 buffer.putSlot(slot);
// Advance to the newest sequence number in the batch.
335 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
338 private int expectedsize, currmaxsize;
340 private void checkNumSlots(int numslots) {
341 if (numslots != expectedsize)
342 throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
345 private void initExpectedSize(long firstsequencenumber) {
346 long prevslots = firstsequencenumber;
347 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
348 currmaxsize = numslots;
351 private void updateExpectedSize() {
353 if (expectedsize > currmaxsize)
354 expectedsize = currmaxsize;
357 private void updateCurrMaxSize(int newmaxsize) {
358 currmaxsize = newmaxsize;
361 private void commitNewMaxSize() {
362 if (numslots != currmaxsize)
363 buffer.resize(currmaxsize);
365 numslots = currmaxsize;
366 setResizeThreshold();
369 private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
370 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
/**
 * Handles a RejectedMessage entry in two steps.
 *
 * 1. Checks it against the slots it covers. For every sequence number in
 *    [getOldSeqNum(), getNewSeqNum()], the slot found by the indexer must have
 *    been written by getMachineID() exactly when getEqual() is true.
 * 2. Registers every other machine whose last recorded sequence number is
 *    below getNewSeqNum() as a watcher, in the watch list and in the entry's
 *    watch set. updateLastMessage later drops each watcher once that machine's
 *    sequence number reaches getNewSeqNum().
 *
 * NOTE(review): lines are missing from this fragment:
 * - a presumable null check on slot (indexer.getSlot can return null; see checkHMACChain);
 * - the statement controlled by "if (entry_mid == localmachineid)" (its comment suggests a continue);
 * - the body of "if (watchset.isEmpty())".
 *
 * @throws Error if a covered slot contradicts the entry
 */
373 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
374 long oldseqnum = entry.getOldSeqNum();
375 long newseqnum = entry.getNewSeqNum();
376 boolean isequal = entry.getEqual();
377 long machineid = entry.getMachineID();
// Validate every slot the entry claims to cover.
378 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
379 Slot slot = indexer.getSlot(seqnum);
381 long slotmachineid = slot.getMachineID();
382 if (isequal != (slotmachineid == machineid)) {
383 throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
// Collect the machines that still need to see this message.
388 HashSet<Long> watchset = new HashSet<Long>();
389 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
390 long entry_mid = lastmsg_entry.getKey();
391 /* We've seen it, don't need to continue to watch. Our next
392 * message will implicitly acknowledge it. */
393 if (entry_mid == localmachineid)
395 Pair<Long, Liveness> v = lastmsg_entry.getValue();
396 long entry_seqn = v.getFirst();
397 if (entry_seqn < newseqnum) {
398 addWatchList(entry_mid, entry);
399 watchset.add(entry_mid);
402 if (watchset.isEmpty())
405 entry.setWatchSet(watchset);
408 private void processEntry(NewKey entry) {
409 arbitratorTable.put(entry.getKey(), entry.getMachineID());
412 private void processEntry(Transaction entry) {
413 uncommittedTransactionsList.add(entry);
416 private void processEntry(Abort entry) {
417 for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
418 Transaction prevtrans = i.next();
419 if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) {
420 uncommittedTransactionsList.remove(prevtrans);
427 private void processEntry(Commit entry) {
429 for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
430 Commit prevcommit = i.next();
431 prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
433 if (!prevcommit.isLive()) {
434 commitList.remove(prevcommit);
438 commitList.add(entry);
440 // Update the committed table list
441 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
442 IoTString key = kv.getKey();
443 commitedTable.put(key, kv);
446 long committedTransSeq = entry.getTransSequenceNumber();
448 // Make dead the transactions
449 for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
450 Transaction prevtrans = i.next();
452 if (prevtrans.getSequenceNumber() <= committedTransSeq) {
453 uncommittedTransactionsList.remove(prevtrans);
459 private void addWatchList(long machineid, RejectedMessage entry) {
460 HashSet<RejectedMessage> entries = watchlist.get(machineid);
462 watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
466 private void processEntry(TableStatus entry) {
467 int newnumslots = entry.getMaxSlots();
468 updateCurrMaxSize(newnumslots);
469 if (lastTableStatus != null)
470 lastTableStatus.setDead();
471 lastTableStatus = entry;
/**
 * Records that machineid has sent a message with sequence number seqnum.
 *
 * In order, it:
 * - removes machineid from machineSet;
 * - drops machineid as a watcher from every watched RejectedMessage whose new
 *   sequence number is <= seqnum;
 * - marks the liveness object dead immediately if it is ours;
 * - stores (seqnum, liveness) in lastmessagetable;
 * - marks a remote machine's previous record dead;
 * - checks the sequence numbers for consistency.
 *
 * NOTE(review): this fragment is missing lines, among them:
 * - the statement under "Remove it from our watchlist" (presumably rmit.remove());
 * - the "else" before each "Unrecognized type" throw;
 * - whatever follows "if (lastmsgentry == null)" (presumably a return for a
 *   machine's first message);
 * - the "else" pairing the local and remote checks at the end.
 *
 * @param liveness the Slot or LastMessage that carried the message
 * @param acceptupdatestolocal when true, a sequence-number mismatch for this
 *        machine is tolerated
 * @throws Error on an unrecognized liveness type, a local mismatch, or a
 *         remote rollback
 */
474 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
475 machineSet.remove(machineid);
// This machine has now seen every watched rejection up to seqnum.
477 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
478 if (watchset != null) {
479 for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
480 RejectedMessage rm = rmit.next();
481 if (rm.getNewSeqNum() <= seqnum) {
482 /* Remove it from our watchlist */
484 /* Decrement machines that need to see this notification */
485 rm.removeWatcher(machineid);
490 if (machineid == localmachineid) {
491 /* Our own messages are immediately dead. */
492 if (liveness instanceof LastMessage) {
493 ((LastMessage)liveness).setDead();
494 } else if (liveness instanceof Slot) {
495 ((Slot)liveness).setDead();
497 throw new Error("Unrecognized type");
// put() returns the previous (seqnum, liveness) record, or null if none.
502 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
503 if (lastmsgentry == null)
506 long lastmsgseqnum = lastmsgentry.getFirst();
507 Liveness lastentry = lastmsgentry.getSecond();
// A remote machine's older record is superseded, so it becomes dead.
508 if (machineid != localmachineid) {
509 if (lastentry instanceof LastMessage) {
510 ((LastMessage)lastentry).setDead();
511 } else if (lastentry instanceof Slot) {
512 ((Slot)lastentry).setDead();
514 throw new Error("Unrecognized type");
// Consistency checks: no local mismatch (unless allowed), no remote rollback.
518 if (machineid == localmachineid) {
519 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
520 throw new Error("Server Error: Mismatch on local machine sequence number");
522 if (lastmsgseqnum > seqnum)
523 throw new Error("Server Error: Rollback on remote machine sequence number");
527 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
528 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
529 for (Entry entry : slot.getEntries()) {
530 switch (entry.getType()) {
532 case Entry.TypeNewKey:
533 processEntry((NewKey)entry);
536 case Entry.TypeCommit:
537 processEntry((Commit)entry);
540 case Entry.TypeAbort:
541 processEntry((Abort)entry);
544 case Entry.TypeTransaction:
545 processEntry((Transaction)entry);
548 case Entry.TypeLastMessage:
549 processEntry((LastMessage)entry, machineSet);
552 case Entry.TypeRejectedMessage:
553 processEntry((RejectedMessage)entry, indexer);
556 case Entry.TypeTableStatus:
557 processEntry((TableStatus)entry);
561 throw new Error("Unrecognized type: " + entry.getType());
/**
 * Verifies the HMAC chain of a batch. Each slot's stored previous-HMAC must
 * equal the HMAC of the slot before it (sequence number - 1), as found by the
 * indexer over the batch and the local buffer. Slots whose predecessor is not
 * available (getSlot returns null) are not checked.
 *
 * @param indexer lookup over the new slots and the local buffer
 * @param newslots slots received from the server
 * @throws Error on the first broken link
 */
566 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
567 for (int i = 0; i < newslots.length; i++) {
568 Slot currslot = newslots[i];
569 Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
// Compare HMAC bytes by content (Arrays.equals), not by reference.
570 if (prevslot != null &&
571 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
572 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);