API Changes
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Set;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.nio.ByteBuffer;
17
18
19 /**
20  * IoTTable data structure.  Provides client interface.
21  * @author Brian Demsky
22  * @version 1.0
23  */
24
25 final public class Table {
26         private int numslots;   //number of slots stored in buffer
27
28         // machine id -> (sequence number, Slot or LastMessage); records last message by each client
29         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
30         // machine id -> ...
31         private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
32         private Vector<Long> rejectedmessagelist = new Vector<Long>();
33         private SlotBuffer buffer;
34         private CloudComm cloud;
35         private long sequencenumber; //Largest sequence number a client has received
36         private long localmachineid;
37         private TableStatus lastTableStatus;
38         static final int FREE_SLOTS = 10; //number of slots that should be kept free
39         static final int SKIP_THRESHOLD = 10;
40         private long liveslotcount = 0;
41         private int chance;
42         static final double RESIZE_MULTIPLE = 1.2;
43         static final double RESIZE_THRESHOLD = 0.75;
44         static final int REJECTED_THRESHOLD = 5;
45         private int resizethreshold;
46         private long lastliveslotseqn;  //smallest sequence number with a live entry
47         private Random random = new Random();
48         private long lastUncommittedTransaction = 0;
49
50         private int smallestTableStatusSeen = -1;
51         private int largestTableStatusSeen = -1;
52         private int lastSeenPendingTransactionSpeculateIndex = 0;
53         private int commitSequenceNumber = 0;
54         private long localTransactionSequenceNumber = 0;
55
56         private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
57         private LinkedList<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
58         private Map<Long, Map<Long, Commit>> commitMap = null; // List of all the most recent live commits
59         private Map<Long, Abort> abortMap = null; // Set of the live aborts
60         private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV
61         private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
62         private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
63         private Map<Long, Transaction> uncommittedTransactionsMap = null;
64         private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
65         private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
66         private Map<Long, Map<Long, Commit>> newCommitMap = null; // Map of all the new commits
67         private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
68         private Map<Long, Long> lastCommitSeenTransSeqNumMap = null; // transaction sequence number of the last commit that was seen grouped by arbitrator
69         private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last abort that was seen grouped by arbitrator
70         private Map<IoTString, KeyValue> pendingTransSpeculativeTable = null;
71         private List<Commit> pendingCommitsList = null;
72         private List<Commit> pendingCommitsToDelete = null;
73         private Map<Long, LocalComm> localCommunicationChannels;
74         private Map<Long, TransactionStatus> transactionStatusMap = null;
75
76
/**
 * Creates a table that talks to the cloud through a freshly built CloudComm.
 *
 * @param hostname        passed through to the CloudComm constructor
 * @param baseurl         passed through to the CloudComm constructor
 * @param password        passed through to the CloudComm constructor
 * @param _localmachineid id of this machine
 */
public Table(String hostname, String baseurl, String password, long _localmachineid) {
        this.localmachineid = _localmachineid;

        // Slot storage and the values derived from its capacity.
        this.buffer = new SlotBuffer();
        this.numslots = this.buffer.capacity();
        setResizeThreshold();

        this.sequencenumber = 0;
        this.cloud = new CloudComm(this, hostname, baseurl, password);
        this.lastliveslotseqn = 1;

        setupDataStructs();
}
88
/**
 * Creates a table that uses an already constructed CloudComm.
 *
 * @param _cloud          communication object used for all cloud traffic
 * @param _localmachineid id of this machine
 */
public Table(CloudComm _cloud, long _localmachineid) {
        localmachineid = _localmachineid;
        buffer = new SlotBuffer();
        numslots = buffer.capacity();
        setResizeThreshold();
        sequencenumber = 0;
        cloud = _cloud;
        // Keep both constructors consistent: the other constructor starts
        // lastliveslotseqn at 1, whereas this one used to leave it at 0.
        lastliveslotseqn = 1;

        setupDataStructs();
}
99
100         private void setupDataStructs() {
101                 pendingTransQueue = new LinkedList<PendingTransaction>();
102                 commitMap = new HashMap<Long, Map<Long, Commit>>();
103                 abortMap = new HashMap<Long, Abort>();
104                 committedMapByKey = new HashMap<IoTString, Commit>();
105                 commitedTable = new HashMap<IoTString, KeyValue>();
106                 speculativeTable = new HashMap<IoTString, KeyValue>();
107                 uncommittedTransactionsMap = new HashMap<Long, Transaction>();
108                 arbitratorTable = new HashMap<IoTString, Long>();
109                 newKeyTable = new HashMap<IoTString, NewKey>();
110                 newCommitMap = new HashMap<Long, Map<Long, Commit>>();
111                 lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
112                 lastCommitSeenTransSeqNumMap = new HashMap<Long, Long>();
113                 lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
114                 pendingTransSpeculativeTable = new HashMap<IoTString, KeyValue>();
115                 pendingCommitsList = new LinkedList<Commit>();
116                 pendingCommitsToDelete = new LinkedList<Commit>();
117                 localCommunicationChannels = new HashMap<Long, LocalComm>();
118                 transactionStatusMap = new HashMap<Long, TransactionStatus>();
119         }
120
121         public void initTable() throws ServerException {
122                 cloud.setSalt();//Set the salt
123                 Slot s = new Slot(this, 1, localmachineid);
124                 TableStatus status = new TableStatus(s, numslots);
125                 s.addEntry(status);
126                 Slot[] array = cloud.putSlot(s, numslots);
127                 if (array == null) {
128                         array = new Slot[] {s};
129                         /* update data structure */
130                         validateandupdate(array, true);
131                 } else {
132                         throw new Error("Error on initialization");
133                 }
134         }
135
136         public void rebuild() throws ServerException {
137                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
138                 validateandupdate(newslots, true);
139         }
140
141         // TODO: delete method
142         public void printSlots() {
143                 long o = buffer.getOldestSeqNum();
144                 long n = buffer.getNewestSeqNum();
145
146                 int[] types = new int[10];
147
148                 int num = 0;
149
150                 int livec = 0;
151                 int deadc = 0;
152                 for (long i = o; i < (n + 1); i++) {
153                         Slot s = buffer.getSlot(i);
154
155                         Vector<Entry> entries = s.getEntries();
156
157                         for (Entry e : entries) {
158                                 if (e.isLive()) {
159                                         int type = e.getType();
160                                         types[type] = types[type] + 1;
161                                         num++;
162                                         livec++;
163                                 } else {
164                                         deadc++;
165                                 }
166                         }
167                 }
168
169                 for (int i = 0; i < 10; i++) {
170                         System.out.println(i + "    " + types[i]);
171                 }
172                 System.out.println("Live count:   " + livec);
173                 System.out.println("Dead count:   " + deadc);
174                 System.out.println("Old:   " + o);
175                 System.out.println("New:   " + n);
176                 System.out.println("Size:   " + buffer.size());
177                 System.out.println("Commits Key Map:   " + commitedTable.size());
178                 // System.out.println("Commits Live Map:   " + commitMap.size());
179                 System.out.println("Pending:   " + pendingTransQueue.size());
180
181                 // List<IoTString> strList = new ArrayList<IoTString>();
182                 // for (int i = 0; i < 100; i++) {
183                 //      String keyA = "a" + i;
184                 //      String keyB = "b" + i;
185                 //      String keyC = "c" + i;
186                 //      String keyD = "d" + i;
187
188                 //      IoTString iKeyA = new IoTString(keyA);
189                 //      IoTString iKeyB = new IoTString(keyB);
190                 //      IoTString iKeyC = new IoTString(keyC);
191                 //      IoTString iKeyD = new IoTString(keyD);
192
193                 //      strList.add(iKeyA);
194                 //      strList.add(iKeyB);
195                 //      strList.add(iKeyC);
196                 //      strList.add(iKeyD);
197                 // }
198
199
200                 // for (Long l : commitMap.keySet()) {
201                 //      for (Long l2 : commitMap.get(l).keySet()) {
202                 //              for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) {
203                 //                      strList.remove(kv.getKey());
204                 //                      System.out.print(kv.getKey() + "    ");
205                 //              }
206                 //      }
207                 // }
208
209                 // System.out.println();
210                 // System.out.println();
211
212                 // for (IoTString s : strList) {
213                 //      System.out.print(s + "    ");
214                 // }
215                 // System.out.println();
216                 // System.out.println(strList.size());
217         }
218
219         public long getId() {
220                 return localmachineid;
221         }
222
223         public boolean hasConnection() {
224                 return cloud.hasConnection();
225         }
226
227         public String toString() {
228                 String retString = " Committed Table: \n";
229                 retString += "---------------------------\n";
230                 retString += commitedTable.toString();
231
232                 retString += "\n\n";
233
234                 retString += " Speculative Table: \n";
235                 retString += "---------------------------\n";
236                 retString += speculativeTable.toString();
237
238                 return retString;
239         }
240
241         public void addLocalComm(long machineId, LocalComm lc) {
242                 localCommunicationChannels.put(machineId, lc);
243         }
244         public Long getArbitrator(IoTString key) {
245                 return arbitratorTable.get(key);
246         }
247
248         public IoTString getCommitted(IoTString key) {
249                 KeyValue kv = commitedTable.get(key);
250                 if (kv != null) {
251                         return kv.getValue();
252                 } else {
253                         return null;
254                 }
255         }
256
257         public IoTString getSpeculative(IoTString key) {
258                 KeyValue kv = pendingTransSpeculativeTable.get(key);
259
260                 if (kv == null) {
261                         kv = speculativeTable.get(key);
262                 }
263
264                 if (kv == null) {
265                         kv = commitedTable.get(key);
266                 }
267
268                 if (kv != null) {
269                         return kv.getValue();
270                 } else {
271                         return null;
272                 }
273         }
274
275         public IoTString getCommittedAtomic(IoTString key) {
276                 KeyValue kv = commitedTable.get(key);
277
278                 if (arbitratorTable.get(key) == null) {
279                         throw new Error("Key not Found.");
280                 }
281
282                 // Make sure new key value pair matches the current arbitrator
283                 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
284                         // TODO: Maybe not throw en error
285                         throw new Error("Not all Key Values Match Arbitrator.");
286                 }
287
288                 if (kv != null) {
289                         pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
290                         return kv.getValue();
291                 } else {
292                         pendingTransBuild.addKVGuard(new KeyValue(key, null));
293                         return null;
294                 }
295         }
296
297         public IoTString getSpeculativeAtomic(IoTString key) {
298
299                 if (arbitratorTable.get(key) == null) {
300                         throw new Error("Key not Found.");
301                 }
302
303                 // Make sure new key value pair matches the current arbitrator
304                 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
305                         // TODO: Maybe not throw en error
306                         throw new Error("Not all Key Values Match Arbitrator.");
307                 }
308
309                 KeyValue kv = pendingTransSpeculativeTable.get(key);
310
311                 if (kv == null) {
312                         kv = speculativeTable.get(key);
313                 }
314
315                 if (kv == null) {
316                         kv = commitedTable.get(key);
317                 }
318
319                 if (kv != null) {
320                         pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
321                         return kv.getValue();
322                 } else {
323                         pendingTransBuild.addKVGuard(new KeyValue(key, null));
324                         return null;
325                 }
326         }
327
328         public void update() {
329                 try {
330                         Slot[] newslots = cloud.getSlots(sequencenumber + 1);
331                         validateandupdate(newslots, false);
332
333                         if (!pendingTransQueue.isEmpty()) {
334
335                                 // We have a pending transaction so do full insertion
336                                 processPendingTrans();
337                         } else {
338
339                                 // We dont have a pending transaction so do minimal effort
340                                 updateWithNotPendingTrans();
341                         }
342
343                 } catch (Exception e) {
344                         // could not update so do nothing
345                 }
346         }
347
348         public void startTransaction() {
349                 // Create a new transaction, invalidates any old pending transactions.
350                 pendingTransBuild = new PendingTransaction();
351         }
352
353         public void addKV(IoTString key, IoTString value) {
354
355                 if (arbitratorTable.get(key) == null) {
356                         throw new Error("Key not Found.");
357                 }
358
359                 // Make sure new key value pair matches the current arbitrator
360                 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
361                         // TODO: Maybe not throw en error
362                         throw new Error("Not all Key Values Match Arbitrator.");
363                 }
364
365                 KeyValue kv = new KeyValue(key, value);
366                 pendingTransBuild.addKV(kv);
367         }
368
369         public TransactionStatus commitTransaction() {
370
371                 if (pendingTransBuild.getKVUpdates().size() == 0) {
372
373                         // transaction with no updates will have no effect on the system
374                         return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
375                 }
376
377                 TransactionStatus transStatus = null;
378
379                 if (pendingTransBuild.getArbitrator() != localmachineid) {
380
381                         // set the local sequence number so we can recognize this transaction later
382                         pendingTransBuild.setMachineLocalTransSeqNum(localTransactionSequenceNumber);
383                         localTransactionSequenceNumber++;
384
385                         transStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransBuild.getArbitrator());
386                         transactionStatusMap.put(pendingTransBuild.getMachineLocalTransSeqNum(), transStatus);
387
388                         // Add the pending transaction to the queue
389                         pendingTransQueue.add(pendingTransBuild);
390
391
392                         for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) {
393                                 PendingTransaction pt = pendingTransQueue.get(i);
394
395                                 if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
396
397                                         lastSeenPendingTransactionSpeculateIndex = i;
398
399                                         for (KeyValue kv : pt.getKVUpdates()) {
400                                                 pendingTransSpeculativeTable.put(kv.getKey(), kv);
401                                         }
402
403                                 }
404                         }
405                 } else {
406                         Transaction ut = new Transaction(null,
407                                                          -1,
408                                                          localmachineid,
409                                                          pendingTransBuild.getArbitrator(),
410                                                          pendingTransBuild.getKVUpdates(),
411                                                          pendingTransBuild.getKVGuard());
412
413                         Pair<Boolean, List<Commit>> retData = doLocalUpdateAndArbitrate(ut, lastCommitSeenSeqNumMap.get(localmachineid));
414
415                         if (retData.getFirst()) {
416                                 transStatus = new TransactionStatus(TransactionStatus.StatusCommitted, pendingTransBuild.getArbitrator());
417                         } else {
418                                 transStatus = new TransactionStatus(TransactionStatus.StatusAborted, pendingTransBuild.getArbitrator());
419                         }
420                 }
421
422                 // Try to insert transactions if possible
423                 if (!pendingTransQueue.isEmpty()) {
424                         // We have a pending transaction so do full insertion
425                         processPendingTrans();
426                 } else {
427                         try {
428                                 // We dont have a pending transaction so do minimal effort
429                                 updateWithNotPendingTrans();
430                         } catch (Exception e) {
431                                 // Do nothing
432                         }
433                 }
434
435                 // reset it so next time is fresh
436                 pendingTransBuild = new PendingTransaction();
437
438                 return transStatus;
439         }
440
441         public boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
442
443                 while (true) {
444                         if (arbitratorTable.get(keyName) != null) {
445                                 // There is already an arbitrator
446                                 return false;
447                         }
448
449                         if (tryput(keyName, machineId, false)) {
450                                 // If successfully inserted
451                                 return true;
452                         }
453                 }
454         }
455
456         private void processPendingTrans() {
457
458                 boolean sentAllPending = false;
459                 try {
460                         while (!pendingTransQueue.isEmpty()) {
461                                 if (tryput( pendingTransQueue.peek(), false)) {
462                                         pendingTransQueue.poll();
463                                 }
464                         }
465
466                         // if got here then all pending transactions were sent
467                         sentAllPending = true;
468                 } catch (Exception e) {
469                         // There was a connection error
470                         e.printStackTrace();
471                         sentAllPending = false;
472                 }
473
474
475                 if (!sentAllPending) {
476
477                         for (Iterator<PendingTransaction> i = pendingTransQueue.iterator(); i.hasNext(); ) {
478                                 PendingTransaction pt = i.next();
479                                 LocalComm lc = localCommunicationChannels.get(pt.getArbitrator());
480                                 if (lc == null) {
481                                         // Cant talk directly to arbitrator so cant do anything
482                                         continue;
483                                 }
484
485
486                                 Transaction ut = new Transaction(null,
487                                                                  -1,
488                                                                  localmachineid,
489                                                                  pendingTransBuild.getArbitrator(),
490                                                                  pendingTransBuild.getKVUpdates(),
491                                                                  pendingTransBuild.getKVGuard());
492
493
494                                 Pair<Boolean, List<Commit>> retData = sendTransactionToLocal(ut, lc);
495
496                                 for (Commit commit : retData.getSecond()) {
497                                         // Prepare to process the commit
498                                         processEntry(commit);
499                                 }
500
501                                 boolean didCommitOrSpeculate = proccessAllNewCommits();
502
503                                 // Go through all uncommitted transactions and kill the ones that are dead
504                                 deleteDeadUncommittedTransactions();
505
506                                 // Speculate on key value pairs
507                                 didCommitOrSpeculate |= createSpeculativeTable();
508                                 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
509
510
511                                 if (retData.getFirst()) {
512                                         TransactionStatus transStatus = transactionStatusMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
513                                         if (transStatus != null) {
514                                                 transStatus.setStatus(TransactionStatus.StatusCommitted);
515                                         }
516
517                                 } else {
518                                         TransactionStatus transStatus = transactionStatusMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
519                                         if (transStatus != null) {
520                                                 transStatus.setStatus(TransactionStatus.StatusAborted);
521                                         }
522                                 }
523                         }
524                 }
525         }
526
527         private void updateWithNotPendingTrans() throws ServerException {
528
529                 boolean doEnd = false;
530                 boolean needResize = false;
531                 while (!doEnd && ((uncommittedTransactionsMap.keySet().size() > 0)  || (pendingCommitsList.size() > 0))   ) {
532                         boolean resize = needResize;
533                         needResize = false;
534
535                         Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
536                         int newsize = 0;
537                         if (liveslotcount > resizethreshold) {
538                                 resize = true; //Resize is forced
539                         }
540
541                         if (resize) {
542                                 newsize = (int) (numslots * RESIZE_MULTIPLE);
543                                 TableStatus status = new TableStatus(s, newsize);
544                                 s.addEntry(status);
545                         }
546
547                         doRejectedMessages(s);
548
549                         ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
550
551                         // Resize was needed so redo call
552                         if (retTup.getFirst()) {
553                                 needResize = true;
554                                 continue;
555                         }
556
557                         // Extract working variables
558                         boolean seenliveslot = retTup.getSecond();
559                         long seqn = retTup.getThird();
560
561                         // Did need to arbitrate
562                         doEnd = !doArbitration(s);
563
564                         doOptionalRescue(s, seenliveslot, seqn, resize);
565
566                         int max = 0;
567                         if (resize) {
568                                 max = newsize;
569                         }
570
571                         Slot[] array = cloud.putSlot(s, max);
572                         if (array == null) {
573                                 array = new Slot[] {s};
574                                 rejectedmessagelist.clear();
575
576                                 // Delete pending commits that were sent to the cloud
577                                 deletePendingCommits();
578
579                         }       else {
580                                 if (array.length == 0)
581                                         throw new Error("Server Error: Did not send any slots");
582                                 rejectedmessagelist.add(s.getSequenceNumber());
583                                 doEnd = false;
584                         }
585
586                         /* update data structure */
587                         validateandupdate(array, true);
588                 }
589         }
590
591         private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) {
592
593                 // encode the request
594                 byte[] array = new byte[Long.BYTES + ut.getSize()];
595                 ByteBuffer bbEncode = ByteBuffer.wrap(array);
596                 Long lastSeenCommit = lastCommitSeenSeqNumMap.get(ut.getArbitrator());
597                 if (lastSeenCommit != null) {
598                         bbEncode.putLong(lastSeenCommit);
599                 } else {
600                         bbEncode.putLong(0);
601                 }
602                 ut.encode(bbEncode);
603
604                 byte[] data = lc.sendDataToLocalDevice(ut.getArbitrator(), bbEncode.array());
605
606                 // Decode the data
607                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
608                 boolean didCommit = bbDecode.get() == 1;
609                 int numberOfCommites = bbDecode.getInt();
610
611                 List<Commit> newCommits = new LinkedList<Commit>();
612                 for (int i = 0; i < numberOfCommites; i++ ) {
613                         bbDecode.get();
614                         Commit com = (Commit)Commit.decode(null, bbDecode);
615                         newCommits.add(com);
616                 }
617
618                 return new Pair<Boolean, List<Commit>>(didCommit, newCommits);
619         }
620
621         public byte[] localCommInput(byte[] data) {
622
623                 // Decode the data
624                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
625                 long lastSeenCommit = bbDecode.getLong();
626                 bbDecode.get();
627                 Transaction ut = (Transaction)Transaction.decode(null, bbDecode);
628
629                 // Do the local update and arbitrate
630                 Pair<Boolean, List<Commit>> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit);
631
632                 // Calculate the size of the response
633                 int size = Byte.BYTES + Integer.BYTES;
634                 for (Commit com : returnData.getSecond()) {
635                         size += com.getSize();
636                 }
637
638                 // encode the response
639                 byte[] array = new byte[size];
640                 ByteBuffer bbEncode = ByteBuffer.wrap(array);
641                 if (returnData.getFirst()) {
642                         bbEncode.put((byte)1);
643                 } else {
644                         bbEncode.put((byte)0);
645                 }
646                 bbEncode.putInt(returnData.getSecond().size());
647
648                 for (Commit com : returnData.getSecond()) {
649                         com.encode(bbEncode);
650                 }
651
652                 return bbEncode.array();
653         }
654
655         private Pair<Boolean, List<Commit>> doLocalUpdateAndArbitrate(Transaction ut, Long lastCommitSeen) {
656
657                 if (ut.getArbitrator() != localmachineid) {
658                         // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator
659                         return null;
660                 }
661
662                 List<Commit> returnCommits = new ArrayList<Commit>();
663
664                 if ((lastCommitSeenSeqNumMap.get(localmachineid) != null) && (lastCommitSeenSeqNumMap.get(localmachineid) > lastCommitSeen)) {
665                         // There is a commit that the other client has not seen yet
666
667                         Map<Long, Commit> cm = commitMap.get(localmachineid);
668                         if (cm != null) {
669
670                                 List<Long> commitKeys = new ArrayList<Long>(cm.keySet());
671                                 Collections.sort(commitKeys);
672
673
674                                 for (int i = (commitKeys.size() - 1); i >= 0; i--) {
675                                         Commit com = cm.get(commitKeys.get(i));
676
677                                         if (com.getSequenceNumber() <= lastCommitSeen) {
678                                                 break;
679                                         }
680                                         returnCommits.add((Commit)com.getCopy(null));
681                                 }
682                         }
683                 }
684
685                 if (!ut.evaluateGuard(commitedTable, null)) {
686                         // Guard evaluated as false so return only the commits that the other device has not seen yet
687                         return new Pair<Boolean, List<Commit>>(false, returnCommits);
688                 }
689
690                 // create the commit
691                 Commit commit = new Commit(null,
692                                            -1,
693                                            commitSequenceNumber,
694                                            ut.getArbitrator(),
695                                            ut.getkeyValueUpdateSet());
696                 commitSequenceNumber = commitSequenceNumber + 1;
697
698                 // Add to the pending commits list
699                 pendingCommitsList.add(commit);
700
701                 // Add this commit so we can send it back
702                 returnCommits.add(commit);
703
704                 // Prepare to process the commit
705                 processEntry(commit);
706
707                 boolean didCommitOrSpeculate = proccessAllNewCommits();
708
709                 // Go through all uncommitted transactions and kill the ones that are dead
710                 deleteDeadUncommittedTransactions();
711
712                 // Speculate on key value pairs
713                 didCommitOrSpeculate |= createSpeculativeTable();
714                 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
715
716                 return new Pair<Boolean, List<Commit>>(true, returnCommits);
717         }
718
719         public void decrementLiveCount() {
720                 liveslotcount--;
721         }
722
723         private void setResizeThreshold() {
724                 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
725                 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
726         }
727
728         private boolean tryput(PendingTransaction pendingTrans, boolean resize) throws ServerException {
729                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
730
731                 int newsize = 0;
732                 if (liveslotcount > resizethreshold) {
733                         resize = true; //Resize is forced
734                 }
735
736                 if (resize) {
737                         newsize = (int) (numslots * RESIZE_MULTIPLE);
738                         TableStatus status = new TableStatus(s, newsize);
739                         s.addEntry(status);
740                 }
741
742                 doRejectedMessages(s);
743
744                 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
745
746                 // Resize was needed so redo call
747                 if (retTup.getFirst()) {
748                         return tryput(pendingTrans, true);
749                 }
750
751                 // Extract working variables
752                 boolean seenliveslot = retTup.getSecond();
753                 long seqn = retTup.getThird();
754
755                 doArbitration(s);
756
757                 Transaction trans = new Transaction(s,
758                                                     s.getSequenceNumber(),
759                                                     localmachineid,
760                                                     pendingTrans.getArbitrator(),
761                                                     pendingTrans.getKVUpdates(),
762                                                     pendingTrans.getKVGuard());
763                 boolean insertedTrans = false;
764                 if (s.hasSpace(trans)) {
765                         s.addEntry(trans);
766                         insertedTrans = true;
767                 }
768
769                 doOptionalRescue(s, seenliveslot, seqn, resize);
770                 Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedTrans, resize, newsize);
771
772                 if (sendRetData.getFirst()) {
773                         // update the status and change what the sequence number is for the
774                         TransactionStatus transStatus = transactionStatusMap.remove(pendingTrans.getMachineLocalTransSeqNum());
775                         transStatus.setStatus(TransactionStatus.StatusSent);
776                         transStatus.setSentTransaction();
777                         transactionStatusMap.put(trans.getSequenceNumber(), transStatus);
778                 }
779
780
781                 if (sendRetData.getSecond().length != 0) {
782                         // insert into the local block chain
783                         validateandupdate(sendRetData.getSecond(), true);
784                 }
785
786                 return sendRetData.getFirst();
787         }
788
789         private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) throws ServerException {
790                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
791                 int newsize = 0;
792                 if (liveslotcount > resizethreshold) {
793                         resize = true; //Resize is forced
794                 }
795
796                 if (resize) {
797                         newsize = (int) (numslots * RESIZE_MULTIPLE);
798                         TableStatus status = new TableStatus(s, newsize);
799                         s.addEntry(status);
800                 }
801
802                 doRejectedMessages(s);
803                 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
804
805                 // Resize was needed so redo call
806                 if (retTup.getFirst()) {
807                         return tryput(keyName, arbMachineid, true);
808                 }
809
810                 // Extract working variables
811                 boolean seenliveslot = retTup.getSecond();
812                 long seqn = retTup.getThird();
813
814                 doArbitration(s);
815
816                 NewKey newKey = new NewKey(s, keyName, arbMachineid);
817
818                 boolean insertedNewKey = false;
819                 if (s.hasSpace(newKey)) {
820                         s.addEntry(newKey);
821                         insertedNewKey = true;
822                 }
823
824                 doOptionalRescue(s, seenliveslot, seqn, resize);
825                 Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedNewKey, resize, newsize);
826
827                 if (sendRetData.getSecond().length != 0) {
828                         // insert into the local block chain
829                         validateandupdate(sendRetData.getSecond(), true);
830                 }
831
832                 return sendRetData.getFirst();
833         }
834
835         private void doRejectedMessages(Slot s) {
836                 if (! rejectedmessagelist.isEmpty()) {
837                         /* TODO: We should avoid generating a rejected message entry if
838                          * there is already a sufficient entry in the queue (e.g.,
839                          * equalsto value of true and same sequence number).  */
840
841                         long old_seqn = rejectedmessagelist.firstElement();
842                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
843                                 long new_seqn = rejectedmessagelist.lastElement();
844                                 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
845                                 s.addEntry(rm);
846                         } else {
847                                 long prev_seqn = -1;
848                                 int i = 0;
849                                 /* Go through list of missing messages */
850                                 for (; i < rejectedmessagelist.size(); i++) {
851                                         long curr_seqn = rejectedmessagelist.get(i);
852                                         Slot s_msg = buffer.getSlot(curr_seqn);
853                                         if (s_msg != null)
854                                                 break;
855                                         prev_seqn = curr_seqn;
856                                 }
857                                 /* Generate rejected message entry for missing messages */
858                                 if (prev_seqn != -1) {
859                                         RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
860                                         s.addEntry(rm);
861                                 }
862                                 /* Generate rejected message entries for present messages */
863                                 for (; i < rejectedmessagelist.size(); i++) {
864                                         long curr_seqn = rejectedmessagelist.get(i);
865                                         Slot s_msg = buffer.getSlot(curr_seqn);
866                                         long machineid = s_msg.getMachineID();
867                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
868                                         s.addEntry(rm);
869                                 }
870                         }
871                 }
872         }
873
874         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
875                 long newestseqnum = buffer.getNewestSeqNum();
876                 long oldestseqnum = buffer.getOldestSeqNum();
877                 if (lastliveslotseqn < oldestseqnum)
878                         lastliveslotseqn = oldestseqnum;
879
880                 long seqn = lastliveslotseqn;
881                 boolean seenliveslot = false;
882                 long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
883                 long threshold = firstiffull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
884
885
886                 // Mandatory Rescue
887                 for (; seqn < threshold; seqn++) {
888                         Slot prevslot = buffer.getSlot(seqn);
889                         // Push slot number forward
890                         if (! seenliveslot)
891                                 lastliveslotseqn = seqn;
892
893                         if (! prevslot.isLive())
894                                 continue;
895                         seenliveslot = true;
896                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
897                         for (Entry liveentry : liveentries) {
898                                 if (s.hasSpace(liveentry)) {
899                                         s.addEntry(liveentry);
900                                 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
901                                         if (!resize) {
902                                                 System.out.println("B"); //?
903                                                 return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
904                                         }
905                                 }
906                         }
907                 }
908
909                 // Did not resize
910                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
911         }
912
913         private boolean doArbitration(Slot s) {
914
915                 // flag whether we have finished all arbitration
916                 boolean stillHasArbitration = false;
917
918                 pendingCommitsToDelete.clear();
919
920                 // First add queue commits
921                 for (Commit commit : pendingCommitsList) {
922                         if (s.hasSpace(commit)) {
923                                 s.addEntry(commit);
924                                 pendingCommitsToDelete.add(commit);
925                         } else {
926                                 // Ran out of space so move on but still not done
927                                 stillHasArbitration = true;
928                                 return stillHasArbitration;
929                         }
930                 }
931
932                 // Arbitrate
933                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
934                 List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
935
936                 // Sort from oldest to newest
937                 Collections.sort(transSeqNums);
938
939                 for (Long transNum : transSeqNums) {
940                         Transaction ut = uncommittedTransactionsMap.get(transNum);
941
942                         // Check if this machine arbitrates for this transaction
943                         if (ut.getArbitrator() != localmachineid ) {
944                                 continue;
945                         }
946
947                         // we did have something to arbitrate on
948                         stillHasArbitration = true;
949
950                         Entry newEntry = null;
951
952                         if (ut.evaluateGuard(commitedTable, speculativeTableTmp)) {
953                                 // Guard evaluated as true
954
955                                 // update the local tmp current key set
956                                 for (KeyValue kv : ut.getkeyValueUpdateSet()) {
957                                         speculativeTableTmp.put(kv.getKey(), kv);
958                                 }
959
960                                 // create the commit
961                                 newEntry = new Commit(s,
962                                                       ut.getSequenceNumber(),
963                                                       commitSequenceNumber,
964                                                       ut.getArbitrator(),
965                                                       ut.getkeyValueUpdateSet());
966                                 commitSequenceNumber = commitSequenceNumber + 1;
967                         } else {
968                                 // Guard was false
969
970                                 // create the abort
971                                 newEntry = new Abort(s,
972                                                      ut.getSequenceNumber(),
973                                                      ut.getMachineID(),
974                                                      ut.getArbitrator());
975                         }
976
977                         if ((newEntry != null) && s.hasSpace(newEntry)) {
978                                 s.addEntry(newEntry);
979                         } else {
980                                 break;
981                         }
982                 }
983
984                 return stillHasArbitration;
985         }
986
987         private void deletePendingCommits() {
988                 for (Commit com : pendingCommitsToDelete) {
989                         pendingCommitsList.remove(com);
990                 }
991                 pendingCommitsToDelete.clear();
992         }
993
994         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
995                 /* now go through live entries from least to greatest sequence number until
996                  * either all live slots added, or the slot doesn't have enough room
997                  * for SKIP_THRESHOLD consecutive entries*/
998                 int skipcount = 0;
999                 long newestseqnum = buffer.getNewestSeqNum();
1000                 search:
1001                 for (; seqn <= newestseqnum; seqn++) {
1002                         Slot prevslot = buffer.getSlot(seqn);
1003                         //Push slot number forward
1004                         if (!seenliveslot)
1005                                 lastliveslotseqn = seqn;
1006
1007                         if (!prevslot.isLive())
1008                                 continue;
1009                         seenliveslot = true;
1010                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1011                         for (Entry liveentry : liveentries) {
1012                                 if (s.hasSpace(liveentry))
1013                                         s.addEntry(liveentry);
1014                                 else {
1015                                         skipcount++;
1016                                         if (skipcount > SKIP_THRESHOLD)
1017                                                 break search;
1018                                 }
1019                         }
1020                 }
1021         }
1022
1023         private Pair<Boolean, Slot[]> doSendSlots(Slot s, boolean inserted, boolean resize, int newsize)  throws ServerException {
1024                 int max = 0;
1025                 if (resize)
1026                         max = newsize;
1027
1028                 Slot[] array = cloud.putSlot(s, max);
1029                 if (array == null) {
1030                         array = new Slot[] {s};
1031                         rejectedmessagelist.clear();
1032
1033                         // Delete pending commits that were sent to the cloud
1034                         deletePendingCommits();
1035                 }       else {
1036                         // if (array.length == 0)
1037                         // throw new Error("Server Error: Did not send any slots");
1038                         rejectedmessagelist.add(s.getSequenceNumber());
1039                         inserted = false;
1040                 }
1041
1042                 return new Pair<Boolean, Slot[]>(inserted, array);
1043         }
1044
1045         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
1046                 /* The cloud communication layer has checked slot HMACs already
1047                          before decoding */
1048                 if (newslots.length == 0) return;
1049
1050                 // Reset the table status declared sizes
1051                 smallestTableStatusSeen = -1;
1052                 largestTableStatusSeen = -1;
1053
1054                 long firstseqnum = newslots[0].getSequenceNumber();
1055                 if (firstseqnum <= sequencenumber) {
1056                         throw new Error("Server Error: Sent older slots!");
1057                 }
1058
1059                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
1060                 checkHMACChain(indexer, newslots);
1061
1062                 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
1063
1064                 // initExpectedSize(firstseqnum);
1065                 for (Slot slot : newslots) {
1066                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
1067                         // updateExpectedSize();
1068                 }
1069
1070                 /* If there is a gap, check to see if the server sent us everything. */
1071                 if (firstseqnum != (sequencenumber + 1)) {
1072
1073                         // TODO: Check size
1074                         checkNumSlots(newslots.length);
1075                         if (!machineSet.isEmpty()) {
1076                                 throw new Error("Missing record for machines: " + machineSet);
1077                         }
1078                 }
1079
1080
1081                 commitNewMaxSize();
1082
1083                 /* Commit new to slots. */
1084                 for (Slot slot : newslots) {
1085                         buffer.putSlot(slot);
1086                         liveslotcount++;
1087                 }
1088                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
1089
1090                 // Process all on key value pairs
1091                 boolean didCommitOrSpeculate = proccessAllNewCommits();
1092
1093                 // Go through all uncommitted transactions and kill the ones that are dead
1094                 deleteDeadUncommittedTransactions();
1095
1096                 // Speculate on key value pairs
1097                 didCommitOrSpeculate |= createSpeculativeTable();
1098
1099                 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
1100         }
1101
1102         private boolean proccessAllNewCommits() {
1103                 // Process only if there are commit
1104                 if (newCommitMap.keySet().size() == 0) {
1105                         return false;
1106                 }
1107                 boolean didProcessNewCommit = false;
1108
1109                 for (Long arb : newCommitMap.keySet()) {
1110
1111                         List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.get(arb).keySet());
1112
1113                         // Sort from oldest to newest commit
1114                         Collections.sort(commitSeqNums);
1115
1116                         // Go through each new commit one by one
1117                         for (Long entrySeqNum : commitSeqNums) {
1118                                 Commit entry = newCommitMap.get(arb).get(entrySeqNum);
1119
1120                                 long lastCommitSeenSeqNum = -1;
1121                                 if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
1122                                         lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
1123                                 }
1124
1125                                 if (entry.getSequenceNumber() <= lastCommitSeenSeqNum) {
1126                                         Map<Long, Commit> cm = commitMap.get(arb);
1127                                         if (cm == null) {
1128                                                 cm = new HashMap<Long, Commit>();
1129                                         }
1130
1131                                         Commit prevCommit = cm.put(entry.getSequenceNumber(), entry);
1132                                         commitMap.put(arb, cm);
1133
1134                                         if (prevCommit != null) {
1135                                                 prevCommit.setDead();
1136
1137                                                 for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
1138                                                         committedMapByKey.put(kv.getKey(), entry);
1139                                                 }
1140                                         }
1141
1142                                         continue;
1143                                 }
1144
1145                                 Set<Commit> commitsToEditSet = new HashSet<Commit>();
1146
1147                                 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
1148                                         commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
1149                                 }
1150
1151                                 commitsToEditSet.remove(null);
1152
1153                                 for (Commit prevCommit : commitsToEditSet) {
1154
1155                                         Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
1156
1157                                         if (!prevCommit.isLive()) {
1158                                                 Map<Long, Commit> cm = commitMap.get(arb);
1159
1160                                                 // remove it from the map so that it can be set as dead
1161                                                 if (cm != null) {
1162                                                         cm.remove(prevCommit.getSequenceNumber());
1163                                                         commitMap.put(arb, cm);
1164                                                 }
1165                                         }
1166                                 }
1167
1168                                 // Add the new commit
1169                                 Map<Long, Commit> cm = commitMap.get(arb);
1170                                 if (cm == null) {
1171                                         cm = new HashMap<Long, Commit>();
1172                                 }
1173                                 cm.put(entry.getSequenceNumber(), entry);
1174                                 commitMap.put(arb, cm);
1175
1176                                 lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getSequenceNumber());
1177
1178                                 // set the trans sequence number if we are able to
1179                                 if (entry.getTransSequenceNumber() != -1) {
1180                                         lastCommitSeenTransSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
1181                                 }
1182
1183                                 didProcessNewCommit = true;
1184
1185                                 // Update the committed table list
1186                                 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
1187                                         IoTString key = kv.getKey();
1188                                         commitedTable.put(key, kv);
1189                                         committedMapByKey.put(key, entry);
1190                                 }
1191                         }
1192                 }
1193                 // Clear the new commits storage so we can use it later
1194                 newCommitMap.clear();
1195
1196                 // go through all saved transactions and update the status of those that can be updated
1197                 for (Iterator<Map.Entry<Long, TransactionStatus>> i = transactionStatusMap.entrySet().iterator(); i.hasNext();) {
1198                         Map.Entry<Long, TransactionStatus> entry = i.next();
1199                         long seqnum = entry.getKey();
1200                         TransactionStatus status = entry.getValue();
1201
1202                         if ( status.getSentTransaction() && (lastCommitSeenTransSeqNumMap.get(status.getArbitrator()) != null) && (seqnum <= lastCommitSeenTransSeqNumMap.get(status.getArbitrator()))) {
1203                                 status.setStatus(TransactionStatus.StatusCommitted);
1204                                 i.remove();
1205                         }
1206                 }
1207
1208                 return didProcessNewCommit;
1209         }
1210
1211         private void deleteDeadUncommittedTransactions() {
1212                 // Make dead the transactions
1213                 for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
1214                         Transaction prevtrans = i.next().getValue();
1215                         long transArb = prevtrans.getArbitrator();
1216
1217                         Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(transArb);
1218                         Long abortSeqNum = lastAbortSeenSeqNumMap.get(transArb);
1219
1220                         if (((commitSeqNum != null) && (prevtrans.getSequenceNumber() <= commitSeqNum)) ||
1221                                 ((abortSeqNum != null) && (prevtrans.getSequenceNumber() <= abortSeqNum))) {
1222                                 i.remove();
1223                                 prevtrans.setDead();
1224                         }
1225                 }
1226         }
1227
1228         private boolean createSpeculativeTable() {
1229                 if (uncommittedTransactionsMap.keySet().size() == 0) {
1230                         return false;
1231                 }
1232
1233                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1234                 List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
1235
1236                 // Sort from oldest to newest commit
1237                 Collections.sort(utSeqNums);
1238
1239                 if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
1240
1241                         speculativeTable.clear();
1242                         lastUncommittedTransaction = -1;
1243
1244                         for (Long key : utSeqNums) {
1245                                 Transaction trans = uncommittedTransactionsMap.get(key);
1246
1247                                 lastUncommittedTransaction = key;
1248
1249                                 if (trans.evaluateGuard(commitedTable, speculativeTableTmp)) {
1250                                         for (KeyValue kv : trans.getkeyValueUpdateSet()) {
1251                                                 speculativeTableTmp.put(kv.getKey(), kv);
1252                                         }
1253                                 }
1254
1255                         }
1256                 } else {
1257                         for (Long key : utSeqNums) {
1258
1259                                 if (key <= lastUncommittedTransaction) {
1260                                         continue;
1261                                 }
1262
1263                                 lastUncommittedTransaction = key;
1264
1265                                 Transaction trans = uncommittedTransactionsMap.get(key);
1266
1267                                 if (trans.evaluateGuard(speculativeTable, speculativeTableTmp)) {
1268                                         for (KeyValue kv : trans.getkeyValueUpdateSet()) {
1269                                                 speculativeTableTmp.put(kv.getKey(), kv);
1270                                         }
1271                                 }
1272                         }
1273                 }
1274
1275                 for (IoTString key : speculativeTableTmp.keySet()) {
1276                         speculativeTable.put(key, speculativeTableTmp.get(key));
1277                 }
1278
1279                 return true;
1280         }
1281
1282         private void createPendingTransactionSpeculativeTable(boolean didCommitOrSpeculate) {
1283
1284                 if (didCommitOrSpeculate) {
1285                         pendingTransSpeculativeTable.clear();
1286                         lastSeenPendingTransactionSpeculateIndex = 0;
1287
1288                         int index = 0;
1289                         for (PendingTransaction pt : pendingTransQueue) {
1290                                 if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
1291
1292                                         lastSeenPendingTransactionSpeculateIndex = index;
1293                                         index++;
1294
1295                                         for (KeyValue kv : pt.getKVUpdates()) {
1296                                                 pendingTransSpeculativeTable.put(kv.getKey(), kv);
1297                                         }
1298
1299                                 }
1300                         }
1301                 }
1302         }
1303
1304         private int expectedsize, currmaxsize;
1305
1306         private void checkNumSlots(int numslots) {
1307
1308
1309                 // We only have 1 size so we must have this many slots
1310                 if (largestTableStatusSeen == smallestTableStatusSeen) {
1311                         if (numslots != smallestTableStatusSeen) {
1312                                 throw new Error("Server Error: Server did not send all slots.  Expected: " + smallestTableStatusSeen + " Received:" + numslots);
1313                         }
1314                 } else {
1315                         // We have more than 1
1316                         if (numslots < smallestTableStatusSeen) {
1317                                 throw new Error("Server Error: Server did not send all slots.  Expected at least: " + smallestTableStatusSeen + " Received:" + numslots);
1318                         }
1319                 }
1320
1321                 // if (numslots != expectedsize) {
1322                 // throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
1323                 // }
1324         }
1325
1326         private void initExpectedSize(long firstsequencenumber) {
1327                 long prevslots = firstsequencenumber;
1328                 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
1329                 currmaxsize = numslots;
1330         }
1331
1332         private void updateExpectedSize() {
1333                 expectedsize++;
1334                 if (expectedsize > currmaxsize) {
1335                         expectedsize = currmaxsize;
1336                 }
1337         }
1338
1339         private void updateCurrMaxSize(int newmaxsize) {
1340                 currmaxsize = newmaxsize;
1341         }
1342
1343         private void commitNewMaxSize() {
1344
1345                 if (largestTableStatusSeen == -1) {
1346                         currmaxsize = numslots;
1347                 } else {
1348                         currmaxsize = largestTableStatusSeen;
1349                 }
1350
1351                 if (numslots != currmaxsize) {
1352                         buffer.resize(currmaxsize);
1353                 }
1354
1355                 numslots = currmaxsize;
1356                 setResizeThreshold();
1357         }
1358
1359         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
1360                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
1361         }
1362
1363         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
1364                 long oldseqnum = entry.getOldSeqNum();
1365                 long newseqnum = entry.getNewSeqNum();
1366                 boolean isequal = entry.getEqual();
1367                 long machineid = entry.getMachineID();
1368                 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
1369                         Slot slot = indexer.getSlot(seqnum);
1370                         if (slot != null) {
1371                                 long slotmachineid = slot.getMachineID();
1372                                 if (isequal != (slotmachineid == machineid)) {
1373                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
1374                                 }
1375                         }
1376                 }
1377
1378                 HashSet<Long> watchset = new HashSet<Long>();
1379                 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
1380                         long entry_mid = lastmsg_entry.getKey();
1381                         /* We've seen it, don't need to continue to watch.  Our next
1382                          * message will implicitly acknowledge it. */
1383                         if (entry_mid == localmachineid)
1384                                 continue;
1385                         Pair<Long, Liveness> v = lastmsg_entry.getValue();
1386                         long entry_seqn = v.getFirst();
1387                         if (entry_seqn < newseqnum) {
1388                                 addWatchList(entry_mid, entry);
1389                                 watchset.add(entry_mid);
1390                         }
1391                 }
1392                 if (watchset.isEmpty())
1393                         entry.setDead();
1394                 else
1395                         entry.setWatchSet(watchset);
1396         }
1397
1398         private void processEntry(NewKey entry) {
1399                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
1400
1401                 NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
1402
1403                 if (oldNewKey != null) {
1404                         oldNewKey.setDead();
1405                 }
1406         }
1407
1408         private void processEntry(Transaction entry) {
1409
1410                 long arb = entry.getArbitrator();
1411                 Long comLast = lastCommitSeenTransSeqNumMap.get(arb);
1412                 Long abLast = lastAbortSeenSeqNumMap.get(arb);
1413
1414                 Transaction prevTrans = null;
1415
1416                 if ((comLast != null) && (comLast >= entry.getSequenceNumber())) {
1417                         prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
1418                 } else if ((abLast != null) && (abLast >= entry.getSequenceNumber())) {
1419                         prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
1420                 } else {
1421                         prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
1422                 }
1423
1424                 // Duplicate so delete old copy
1425                 if (prevTrans != null) {
1426                         prevTrans.setDead();
1427                 }
1428         }
1429
1430         private void processEntry(Abort entry) {
1431                 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
1432                         // Abort has not been seen yet so we need to keep track of it
1433
1434                         Abort prevAbort = abortMap.put(entry.getTransSequenceNumber(), entry);
1435                         if (prevAbort != null) {
1436                                 prevAbort.setDead(); // delete old version of the duplicate
1437                         }
1438
1439                         if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) &&   (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
1440                                 lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
1441                         }
1442                 } else {
1443                         // The machine already saw this so it is dead
1444                         entry.setDead();
1445                 }
1446
1447                 // Update the status of the transaction and remove it since we are done with this transaction
1448                 TransactionStatus status = transactionStatusMap.remove(entry.getTransSequenceNumber());
1449                 if (status != null) {
1450                         status.setStatus(TransactionStatus.StatusAborted);
1451                 }
1452         }
1453
1454         private void processEntry(Commit entry) {
1455                 Map<Long, Commit> arbMap = newCommitMap.get(entry.getTransArbitrator());
1456
1457                 if (arbMap == null) {
1458                         arbMap = new HashMap<Long, Commit>();
1459                 }
1460
1461                 Commit prevCommit = arbMap.put(entry.getSequenceNumber(), entry);
1462                 newCommitMap.put(entry.getTransArbitrator(), arbMap);
1463
1464                 if (prevCommit != null) {
1465                         prevCommit.setDead();
1466                 }
1467         }
1468
1469         private void processEntry(TableStatus entry) {
1470                 int newnumslots = entry.getMaxSlots();
1471                 // updateCurrMaxSize(newnumslots);
1472                 if (lastTableStatus != null)
1473                         lastTableStatus.setDead();
1474                 lastTableStatus = entry;
1475
1476                 if ((smallestTableStatusSeen == -1) || (newnumslots < smallestTableStatusSeen)) {
1477                         smallestTableStatusSeen = newnumslots;
1478                 }
1479
1480                 if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) {
1481                         largestTableStatusSeen = newnumslots;
1482                 }
1483         }
1484
1485         private void addWatchList(long machineid, RejectedMessage entry) {
1486                 HashSet<RejectedMessage> entries = watchlist.get(machineid);
1487                 if (entries == null)
1488                         watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
1489                 entries.add(entry);
1490         }
1491
1492         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1493                 machineSet.remove(machineid);
1494
1495                 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
1496                 if (watchset != null) {
1497                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
1498                                 RejectedMessage rm = rmit.next();
1499                                 if (rm.getNewSeqNum() <= seqnum) {
1500                                         /* Remove it from our watchlist */
1501                                         rmit.remove();
1502                                         /* Decrement machines that need to see this notification */
1503                                         rm.removeWatcher(machineid);
1504                                 }
1505                         }
1506                 }
1507
1508                 if (machineid == localmachineid) {
1509                         /* Our own messages are immediately dead. */
1510                         if (liveness instanceof LastMessage) {
1511                                 ((LastMessage)liveness).setDead();
1512                         } else if (liveness instanceof Slot) {
1513                                 ((Slot)liveness).setDead();
1514                         } else {
1515                                 throw new Error("Unrecognized type");
1516                         }
1517                 }
1518
1519                 // Set dead the abort
1520                 for (Iterator<Map.Entry<Long, Abort>> i = abortMap.entrySet().iterator(); i.hasNext();) {
1521                         Abort abort = i.next().getValue();
1522
1523                         if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
1524                                 abort.setDead();
1525                                 i.remove();
1526                         }
1527                 }
1528
1529                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
1530                 if (lastmsgentry == null)
1531                         return;
1532
1533                 long lastmsgseqnum = lastmsgentry.getFirst();
1534                 Liveness lastentry = lastmsgentry.getSecond();
1535                 if (machineid != localmachineid) {
1536                         if (lastentry instanceof LastMessage) {
1537                                 ((LastMessage)lastentry).setDead();
1538                         } else if (lastentry instanceof Slot) {
1539                                 ((Slot)lastentry).setDead();
1540                         } else {
1541                                 throw new Error("Unrecognized type");
1542                         }
1543                 }
1544
1545                 if (machineid == localmachineid) {
1546                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
1547                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  seqnum + " got: " + lastmsgseqnum);
1548                 } else {
1549                         if (lastmsgseqnum > seqnum)
1550                                 throw new Error("Server Error: Rollback on remote machine sequence number");
1551                 }
1552         }
1553
1554         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1555                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
1556                 for (Entry entry : slot.getEntries()) {
1557                         switch (entry.getType()) {
1558
1559                         case Entry.TypeNewKey:
1560                                 processEntry((NewKey)entry);
1561                                 break;
1562
1563                         case Entry.TypeCommit:
1564                                 processEntry((Commit)entry);
1565                                 break;
1566
1567                         case Entry.TypeAbort:
1568                                 processEntry((Abort)entry);
1569                                 break;
1570
1571                         case Entry.TypeTransaction:
1572                                 processEntry((Transaction)entry);
1573                                 break;
1574
1575                         case Entry.TypeLastMessage:
1576                                 processEntry((LastMessage)entry, machineSet);
1577                                 break;
1578
1579                         case Entry.TypeRejectedMessage:
1580                                 processEntry((RejectedMessage)entry, indexer);
1581                                 break;
1582
1583                         case Entry.TypeTableStatus:
1584                                 processEntry((TableStatus)entry);
1585                                 break;
1586
1587                         default:
1588                                 throw new Error("Unrecognized type: " + entry.getType());
1589                         }
1590                 }
1591         }
1592
1593         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
1594                 for (int i = 0; i < newslots.length; i++) {
1595                         Slot currslot = newslots[i];
1596                         Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
1597                         if (prevslot != null &&
1598                                 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1599                                 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);
1600                 }
1601         }
1602 }