Updates
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Set;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.nio.ByteBuffer;
17
18
19 /**
20  * IoTTable data structure.  Provides client inferface.
21  * @author Brian Demsky
22  * @version 1.0
23  */
24
25 final public class Table {
26         private int numslots;   //number of slots stored in buffer
27
28         // machine id -> (sequence number, Slot or LastMessage); records last message by each client
29         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
30         // machine id -> ...
31         private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
32         private Vector<Long> rejectedmessagelist = new Vector<Long>();
33         private SlotBuffer buffer;
34         private CloudComm cloud;
35         private long sequencenumber; //Largest sequence number a client has received
36         private long localmachineid;
37         private TableStatus lastTableStatus;
38         static final int FREE_SLOTS = 10; //number of slots that should be kept free
39         static final int SKIP_THRESHOLD = 10;
40         private long liveslotcount = 0;
41         private int chance;
42         static final double RESIZE_MULTIPLE = 1.2;
43         static final double RESIZE_THRESHOLD = 0.75;
44         static final int REJECTED_THRESHOLD = 5;
45         private int resizethreshold;
46         private long lastliveslotseqn;  //smallest sequence number with a live entry
47         private Random random = new Random();
48         private long lastUncommittedTransaction = 0;
49
50         private int smallestTableStatusSeen = -1;
51         private int largestTableStatusSeen = -1;
52         private int lastSeenPendingTransactionSpeculateIndex = 0;
53         private int commitSequenceNumber = 0;
54         private long localTransactionSequenceNumber = 0;
55
56         private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
57         private LinkedList<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
58         private Map<Long, Map<Long, Commit>> commitMap = null; // List of all the most recent live commits
59         private Map<Long, Abort> abortMap = null; // Set of the live aborts
60         private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV
61         private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
62         private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
63         private Map<Long, Transaction> uncommittedTransactionsMap = null;
64         private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
65         private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
66         private Map<Long, Map<Long, Commit>> newCommitMap = null; // Map of all the new commits
67         private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
68         private Map<Long, Long> lastCommitSeenTransSeqNumMap = null; // transaction sequence number of the last commit that was seen grouped by arbitrator
69         private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last abort that was seen grouped by arbitrator
70         private Map<IoTString, KeyValue> pendingTransSpeculativeTable = null;
71         private List<Commit> pendingCommitsList = null;
72         private List<Commit> pendingCommitsToDelete = null;
73         private Map<Long, LocalComm> localCommunicationChannels;
74         private Map<Long, TransactionStatus> transactionStatusMap = null;
75         private Map<Long, TransactionStatus> transactionStatusNotSentMap = null;
76
77
78         public Table(String hostname, String baseurl, String password, long _localmachineid) {
79                 localmachineid = _localmachineid;
80                 buffer = new SlotBuffer();
81                 numslots = buffer.capacity();
82                 setResizeThreshold();
83                 sequencenumber = 0;
84                 cloud = new CloudComm(this, hostname, baseurl, password);
85                 lastliveslotseqn = 1;
86
87                 setupDataStructs();
88         }
89
90         public Table(CloudComm _cloud, long _localmachineid) {
91                 localmachineid = _localmachineid;
92                 buffer = new SlotBuffer();
93                 numslots = buffer.capacity();
94                 setResizeThreshold();
95                 sequencenumber = 0;
96                 cloud = _cloud;
97
98                 setupDataStructs();
99         }
100
101         private void setupDataStructs() {
102                 pendingTransQueue = new LinkedList<PendingTransaction>();
103                 commitMap = new HashMap<Long, Map<Long, Commit>>();
104                 abortMap = new HashMap<Long, Abort>();
105                 committedMapByKey = new HashMap<IoTString, Commit>();
106                 commitedTable = new HashMap<IoTString, KeyValue>();
107                 speculativeTable = new HashMap<IoTString, KeyValue>();
108                 uncommittedTransactionsMap = new HashMap<Long, Transaction>();
109                 arbitratorTable = new HashMap<IoTString, Long>();
110                 newKeyTable = new HashMap<IoTString, NewKey>();
111                 newCommitMap = new HashMap<Long, Map<Long, Commit>>();
112                 lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
113                 lastCommitSeenTransSeqNumMap = new HashMap<Long, Long>();
114                 lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
115                 pendingTransSpeculativeTable = new HashMap<IoTString, KeyValue>();
116                 pendingCommitsList = new LinkedList<Commit>();
117                 pendingCommitsToDelete = new LinkedList<Commit>();
118                 localCommunicationChannels = new HashMap<Long, LocalComm>();
119                 transactionStatusMap = new HashMap<Long, TransactionStatus>();
120                 transactionStatusNotSentMap = new HashMap<Long, TransactionStatus>();
121         }
122
123         public void initTable() throws ServerException {
124                 cloud.setSalt();//Set the salt
125                 Slot s = new Slot(this, 1, localmachineid);
126                 TableStatus status = new TableStatus(s, numslots);
127                 s.addEntry(status);
128                 Slot[] array = cloud.putSlot(s, numslots);
129                 if (array == null) {
130                         array = new Slot[] {s};
131                         /* update data structure */
132                         validateandupdate(array, true);
133                 } else {
134                         throw new Error("Error on initialization");
135                 }
136         }
137
138         public void rebuild() throws ServerException {
139                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
140                 validateandupdate(newslots, true);
141         }
142
143         // TODO: delete method
144         public void printSlots() {
145                 long o = buffer.getOldestSeqNum();
146                 long n = buffer.getNewestSeqNum();
147
148                 int[] types = new int[10];
149
150                 int num = 0;
151
152                 int livec = 0;
153                 int deadc = 0;
154                 for (long i = o; i < (n + 1); i++) {
155                         Slot s = buffer.getSlot(i);
156
157                         Vector<Entry> entries = s.getEntries();
158
159                         for (Entry e : entries) {
160                                 if (e.isLive()) {
161                                         int type = e.getType();
162                                         types[type] = types[type] + 1;
163                                         num++;
164                                         livec++;
165                                 } else {
166                                         deadc++;
167                                 }
168                         }
169                 }
170
171                 for (int i = 0; i < 10; i++) {
172                         System.out.println(i + "    " + types[i]);
173                 }
174                 System.out.println("Live count:   " + livec);
175                 System.out.println("Dead count:   " + deadc);
176                 System.out.println("Old:   " + o);
177                 System.out.println("New:   " + n);
178                 System.out.println("Size:   " + buffer.size());
179                 System.out.println("Commits Key Map:   " + commitedTable.size());
180                 // System.out.println("Commits Live Map:   " + commitMap.size());
181                 System.out.println("Pending:   " + pendingTransQueue.size());
182
183                 // List<IoTString> strList = new ArrayList<IoTString>();
184                 // for (int i = 0; i < 100; i++) {
185                 //      String keyA = "a" + i;
186                 //      String keyB = "b" + i;
187                 //      String keyC = "c" + i;
188                 //      String keyD = "d" + i;
189
190                 //      IoTString iKeyA = new IoTString(keyA);
191                 //      IoTString iKeyB = new IoTString(keyB);
192                 //      IoTString iKeyC = new IoTString(keyC);
193                 //      IoTString iKeyD = new IoTString(keyD);
194
195                 //      strList.add(iKeyA);
196                 //      strList.add(iKeyB);
197                 //      strList.add(iKeyC);
198                 //      strList.add(iKeyD);
199                 // }
200
201
202                 // for (Long l : commitMap.keySet()) {
203                 //      for (Long l2 : commitMap.get(l).keySet()) {
204                 //              for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) {
205                 //                      strList.remove(kv.getKey());
206                 //                      System.out.print(kv.getKey() + "    ");
207                 //              }
208                 //      }
209                 // }
210
211                 // System.out.println();
212                 // System.out.println();
213
214                 // for (IoTString s : strList) {
215                 //      System.out.print(s + "    ");
216                 // }
217                 // System.out.println();
218                 // System.out.println(strList.size());
219         }
220
221         public long getId() {
222                 return localmachineid;
223         }
224
225         public boolean hasConnection() {
226                 return cloud.hasConnection();
227         }
228
229         public String toString() {
230                 String retString = " Committed Table: \n";
231                 retString += "---------------------------\n";
232                 retString += commitedTable.toString();
233
234                 retString += "\n\n";
235
236                 retString += " Speculative Table: \n";
237                 retString += "---------------------------\n";
238                 retString += speculativeTable.toString();
239
240                 return retString;
241         }
242
243         public void addLocalComm(long machineId, LocalComm lc) {
244                 localCommunicationChannels.put(machineId, lc);
245         }
246         public Long getArbitrator(IoTString key) {
247                 return arbitratorTable.get(key);
248         }
249
250         public IoTString getCommitted(IoTString key) {
251                 KeyValue kv = commitedTable.get(key);
252                 if (kv != null) {
253                         return kv.getValue();
254                 } else {
255                         return null;
256                 }
257         }
258
259         public IoTString getSpeculative(IoTString key) {
260                 KeyValue kv = pendingTransSpeculativeTable.get(key);
261
262                 if (kv == null) {
263                         kv = speculativeTable.get(key);
264                 }
265
266                 if (kv == null) {
267                         kv = commitedTable.get(key);
268                 }
269
270                 if (kv != null) {
271                         return kv.getValue();
272                 } else {
273                         return null;
274                 }
275         }
276
277         public IoTString getCommittedAtomic(IoTString key) {
278                 KeyValue kv = commitedTable.get(key);
279
280                 if (arbitratorTable.get(key) == null) {
281                         throw new Error("Key not Found.");
282                 }
283
284                 // Make sure new key value pair matches the current arbitrator
285                 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
286                         // TODO: Maybe not throw en error
287                         throw new Error("Not all Key Values Match Arbitrator.");
288                 }
289
290                 if (kv != null) {
291                         pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
292                         return kv.getValue();
293                 } else {
294                         pendingTransBuild.addKVGuard(new KeyValue(key, null));
295                         return null;
296                 }
297         }
298
299         public IoTString getSpeculativeAtomic(IoTString key) {
300
301                 if (arbitratorTable.get(key) == null) {
302                         throw new Error("Key not Found.");
303                 }
304
305                 // Make sure new key value pair matches the current arbitrator
306                 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
307                         // TODO: Maybe not throw en error
308                         throw new Error("Not all Key Values Match Arbitrator.");
309                 }
310
311                 KeyValue kv = pendingTransSpeculativeTable.get(key);
312
313                 if (kv == null) {
314                         kv = speculativeTable.get(key);
315                 }
316
317                 if (kv == null) {
318                         kv = commitedTable.get(key);
319                 }
320
321                 if (kv != null) {
322                         pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
323                         return kv.getValue();
324                 } else {
325                         pendingTransBuild.addKVGuard(new KeyValue(key, null));
326                         return null;
327                 }
328         }
329
330         public Pair<Boolean, Boolean> update() {
331
332                 boolean gotLatestFromServer = false;
333                 boolean didSendLocal = false;
334
335                 try {
336                         Slot[] newslots = cloud.getSlots(sequencenumber + 1);
337                         validateandupdate(newslots, false);
338                         gotLatestFromServer = true;
339
340                         if (!pendingTransQueue.isEmpty()) {
341
342                                 // We have a pending transaction so do full insertion
343                                 processPendingTrans();
344                         } else {
345
346                                 // We dont have a pending transaction so do minimal effort
347                                 updateWithNotPendingTrans();
348                         }
349
350                         didSendLocal = true;
351
352                 } catch (Exception e) {
353                         // could not update so do nothing
354                 }
355
356                 return new Pair<Boolean, Boolean>(gotLatestFromServer, didSendLocal);
357         }
358
359         public Boolean updateFromLocal(long arb) {
360                 LocalComm lc = localCommunicationChannels.get(arb);
361                 if (lc == null) {
362                         // Cant talk directly to arbitrator so cant do anything
363                         return false;
364                 }
365
366                 byte[] array = new byte[Long.BYTES ];
367                 ByteBuffer bbEncode = ByteBuffer.wrap(array);
368                 Long lastSeenCommit = lastCommitSeenSeqNumMap.get(arb);
369                 if (lastSeenCommit != null) {
370                         bbEncode.putLong(lastSeenCommit);
371                 } else {
372                         bbEncode.putLong(0);
373                 }
374
375                 byte[] data = lc.sendDataToLocalDevice(arb, bbEncode.array());
376
377                 // Decode the data
378                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
379                 boolean didCommit = bbDecode.get() == 1;
380                 int numberOfCommites = bbDecode.getInt();
381
382                 List<Commit> newCommits = new LinkedList<Commit>();
383                 for (int i = 0; i < numberOfCommites; i++ ) {
384                         bbDecode.get();
385                         Commit com = (Commit)Commit.decode(null, bbDecode);
386                         newCommits.add(com);
387                 }
388
389
390                 for (Commit commit : newCommits) {
391                         // Prepare to process the commit
392                         processEntry(commit);
393                 }
394
395                 boolean didCommitOrSpeculate = proccessAllNewCommits();
396
397                 // Go through all uncommitted transactions and kill the ones that are dead
398                 deleteDeadUncommittedTransactions();
399
400                 // Speculate on key value pairs
401                 didCommitOrSpeculate |= createSpeculativeTable();
402                 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
403
404                 return true;
405         }
406
407         public void startTransaction() {
408                 // Create a new transaction, invalidates any old pending transactions.
409                 pendingTransBuild = new PendingTransaction();
410         }
411
412         public void addKV(IoTString key, IoTString value) {
413
414                 if (arbitratorTable.get(key) == null) {
415                         throw new Error("Key not Found.");
416                 }
417
418                 // Make sure new key value pair matches the current arbitrator
419                 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
420                         // TODO: Maybe not throw en error
421                         throw new Error("Not all Key Values Match Arbitrator.");
422                 }
423
424                 KeyValue kv = new KeyValue(key, value);
425                 pendingTransBuild.addKV(kv);
426         }
427
428         public TransactionStatus commitTransaction() {
429
430                 if (pendingTransBuild.getKVUpdates().size() == 0) {
431
432                         // transaction with no updates will have no effect on the system
433                         return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
434                 }
435
436                 TransactionStatus transStatus = null;
437
438                 if (pendingTransBuild.getArbitrator() != localmachineid) {
439
440                         // set the local sequence number so we can recognize this transaction later
441                         pendingTransBuild.setMachineLocalTransSeqNum(localTransactionSequenceNumber);
442                         localTransactionSequenceNumber++;
443
444                         transStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransBuild.getArbitrator());
445                         transactionStatusNotSentMap.put(pendingTransBuild.getMachineLocalTransSeqNum(), transStatus);
446
447                         // Add the pending transaction to the queue
448                         pendingTransQueue.add(pendingTransBuild);
449
450
451                         for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) {
452                                 PendingTransaction pt = pendingTransQueue.get(i);
453
454                                 if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
455
456                                         lastSeenPendingTransactionSpeculateIndex = i;
457
458                                         for (KeyValue kv : pt.getKVUpdates()) {
459                                                 pendingTransSpeculativeTable.put(kv.getKey(), kv);
460                                         }
461
462                                 }
463                         }
464                 } else {
465                         Transaction ut = new Transaction(null,
466                                                          -1,
467                                                          localmachineid,
468                                                          pendingTransBuild.getArbitrator(),
469                                                          pendingTransBuild.getKVUpdates(),
470                                                          pendingTransBuild.getKVGuard());
471
472                         Pair<Boolean, List<Commit>> retData = doLocalUpdateAndArbitrate(ut, lastCommitSeenSeqNumMap.get(localmachineid));
473
474                         if (retData.getFirst()) {
475                                 transStatus = new TransactionStatus(TransactionStatus.StatusCommitted, pendingTransBuild.getArbitrator());
476                         } else {
477                                 transStatus = new TransactionStatus(TransactionStatus.StatusAborted, pendingTransBuild.getArbitrator());
478                         }
479                 }
480
481                 // Try to insert transactions if possible
482                 if (!pendingTransQueue.isEmpty()) {
483                         // We have a pending transaction so do full insertion
484                         processPendingTrans();
485                 } else {
486                         try {
487                                 // We dont have a pending transaction so do minimal effort
488                                 updateWithNotPendingTrans();
489                         } catch (Exception e) {
490                                 // Do nothing
491                         }
492                 }
493
494                 // reset it so next time is fresh
495                 pendingTransBuild = new PendingTransaction();
496
497                 return transStatus;
498         }
499
500         public boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
501
502                 while (true) {
503                         if (arbitratorTable.get(keyName) != null) {
504                                 // There is already an arbitrator
505                                 return false;
506                         }
507
508                         if (tryput(keyName, machineId, false)) {
509                                 // If successfully inserted
510                                 return true;
511                         }
512                 }
513         }
514
515         private void processPendingTrans() {
516
517                 boolean sentAllPending = false;
518                 try {
519                         while (!pendingTransQueue.isEmpty()) {
520                                 if (tryput( pendingTransQueue.peek(), false)) {
521                                         pendingTransQueue.poll();
522                                 }
523                         }
524
525                         // if got here then all pending transactions were sent
526                         sentAllPending = true;
527                 } catch (Exception e) {
528                         // There was a connection error
529                         sentAllPending = false;
530                 }
531
532                 if (!sentAllPending) {
533
534                         for (Iterator<PendingTransaction> i = pendingTransQueue.iterator(); i.hasNext(); ) {
535                                 PendingTransaction pt = i.next();
536                                 LocalComm lc = localCommunicationChannels.get(pt.getArbitrator());
537                                 if (lc == null) {
538                                         // Cant talk directly to arbitrator so cant do anything
539                                         continue;
540                                 }
541
542
543                                 Transaction ut = new Transaction(null,
544                                                                  -1,
545                                                                  localmachineid,
546                                                                  pendingTransBuild.getArbitrator(),
547                                                                  pendingTransBuild.getKVUpdates(),
548                                                                  pendingTransBuild.getKVGuard());
549
550
551                                 Pair<Boolean, List<Commit>> retData = sendTransactionToLocal(ut, lc);
552
553                                 for (Commit commit : retData.getSecond()) {
554                                         // Prepare to process the commit
555                                         processEntry(commit);
556                                 }
557
558                                 boolean didCommitOrSpeculate = proccessAllNewCommits();
559
560                                 // Go through all uncommitted transactions and kill the ones that are dead
561                                 deleteDeadUncommittedTransactions();
562
563                                 // Speculate on key value pairs
564                                 didCommitOrSpeculate |= createSpeculativeTable();
565                                 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
566
567                                 if (retData.getFirst()) {
568                                         TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
569                                         if (transStatus != null) {
570                                                 transStatus.setStatus(TransactionStatus.StatusCommitted);
571                                         }
572
573                                 } else {
574                                         TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
575                                         if (transStatus != null) {
576                                                 transStatus.setStatus(TransactionStatus.StatusAborted);
577                                         }
578                                 }
579                                 i.remove();
580                         }
581                 }
582         }
583
584         private void updateWithNotPendingTrans() throws ServerException {
585
586                 boolean doEnd = false;
587                 boolean needResize = false;
588                 while (!doEnd && ((uncommittedTransactionsMap.keySet().size() > 0)  || (pendingCommitsList.size() > 0))   ) {
589                         boolean resize = needResize;
590                         needResize = false;
591
592                         Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
593                         int newsize = 0;
594                         if (liveslotcount > resizethreshold) {
595                                 resize = true; //Resize is forced
596                         }
597
598                         if (resize) {
599                                 newsize = (int) (numslots * RESIZE_MULTIPLE);
600                                 TableStatus status = new TableStatus(s, newsize);
601                                 s.addEntry(status);
602                         }
603
604                         doRejectedMessages(s);
605
606                         ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
607
608                         // Resize was needed so redo call
609                         if (retTup.getFirst()) {
610                                 needResize = true;
611                                 continue;
612                         }
613
614                         // Extract working variables
615                         boolean seenliveslot = retTup.getSecond();
616                         long seqn = retTup.getThird();
617
618                         // Did need to arbitrate
619                         doEnd = !doArbitration(s);
620
621                         doOptionalRescue(s, seenliveslot, seqn, resize);
622
623                         int max = 0;
624                         if (resize) {
625                                 max = newsize;
626                         }
627
628                         Slot[] array = cloud.putSlot(s, max);
629                         if (array == null) {
630                                 array = new Slot[] {s};
631                                 rejectedmessagelist.clear();
632
633                                 // Delete pending commits that were sent to the cloud
634                                 deletePendingCommits();
635
636                         }       else {
637                                 if (array.length == 0)
638                                         throw new Error("Server Error: Did not send any slots");
639                                 rejectedmessagelist.add(s.getSequenceNumber());
640                                 doEnd = false;
641                         }
642
643                         /* update data structure */
644                         validateandupdate(array, true);
645                 }
646         }
647
648         private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) {
649
650                 // encode the request
651                 byte[] array = new byte[Long.BYTES + ut.getSize()];
652                 ByteBuffer bbEncode = ByteBuffer.wrap(array);
653                 Long lastSeenCommit = lastCommitSeenSeqNumMap.get(ut.getArbitrator());
654                 if (lastSeenCommit != null) {
655                         bbEncode.putLong(lastSeenCommit);
656                 } else {
657                         bbEncode.putLong(0);
658                 }
659                 ut.encode(bbEncode);
660
661                 byte[] data = lc.sendDataToLocalDevice(ut.getArbitrator(), bbEncode.array());
662
663                 // Decode the data
664                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
665                 boolean didCommit = bbDecode.get() == 1;
666                 int numberOfCommites = bbDecode.getInt();
667
668                 List<Commit> newCommits = new LinkedList<Commit>();
669                 for (int i = 0; i < numberOfCommites; i++ ) {
670                         bbDecode.get();
671                         Commit com = (Commit)Commit.decode(null, bbDecode);
672                         newCommits.add(com);
673                 }
674
675                 return new Pair<Boolean, List<Commit>>(didCommit, newCommits);
676         }
677
678         public byte[] localCommInput(byte[] data) {
679
680                 // Decode the data
681                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
682                 long lastSeenCommit = bbDecode.getLong();
683
684                 Transaction ut = null;
685                 if (data.length != Long.BYTES) {
686                         bbDecode.get();
687                         ut = (Transaction)Transaction.decode(null, bbDecode);
688                 }
689                 // Do the local update and arbitrate
690                 Pair<Boolean, List<Commit>> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit);
691
692                 // Calculate the size of the response
693                 int size = Byte.BYTES + Integer.BYTES;
694                 for (Commit com : returnData.getSecond()) {
695                         size += com.getSize();
696                 }
697
698                 // encode the response
699                 byte[] array = new byte[size];
700                 ByteBuffer bbEncode = ByteBuffer.wrap(array);
701                 if (returnData.getFirst()) {
702                         bbEncode.put((byte)1);
703                 } else {
704                         bbEncode.put((byte)0);
705                 }
706                 bbEncode.putInt(returnData.getSecond().size());
707
708                 for (Commit com : returnData.getSecond()) {
709                         com.encode(bbEncode);
710                 }
711
712                 return bbEncode.array();
713         }
714
715         private Pair<Boolean, List<Commit>> doLocalUpdateAndArbitrate(Transaction ut, Long lastCommitSeen) {
716
717                 List<Commit> returnCommits = new ArrayList<Commit>();
718
719                 if ((lastCommitSeenSeqNumMap.get(localmachineid) != null) && (lastCommitSeenSeqNumMap.get(localmachineid) > lastCommitSeen)) {
720                         // There is a commit that the other client has not seen yet
721
722                         Map<Long, Commit> cm = commitMap.get(localmachineid);
723                         if (cm != null) {
724
725                                 List<Long> commitKeys = new ArrayList<Long>(cm.keySet());
726                                 Collections.sort(commitKeys);
727
728
729                                 for (int i = (commitKeys.size() - 1); i >= 0; i--) {
730                                         Commit com = cm.get(commitKeys.get(i));
731
732                                         if (com.getSequenceNumber() <= lastCommitSeen) {
733                                                 break;
734                                         }
735                                         returnCommits.add((Commit)com.getCopy(null));
736                                 }
737                         }
738                 }
739
740
741                 if ((ut == null) || (ut.getArbitrator() != localmachineid)) {
742                         // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator
743                         // or there is no transaction to process
744                         return new Pair<Boolean, List<Commit>>(false, returnCommits);
745                 }
746
747                 if (!ut.evaluateGuard(commitedTable, null)) {
748                         // Guard evaluated as false so return only the commits that the other device has not seen yet
749                         return new Pair<Boolean, List<Commit>>(false, returnCommits);
750                 }
751
752                 // create the commit
753                 Commit commit = new Commit(null,
754                                            -1,
755                                            commitSequenceNumber,
756                                            ut.getArbitrator(),
757                                            ut.getkeyValueUpdateSet());
758                 commitSequenceNumber = commitSequenceNumber + 1;
759
760                 // Add to the pending commits list
761                 pendingCommitsList.add(commit);
762
763                 // Add this commit so we can send it back
764                 returnCommits.add(commit);
765
766                 // Prepare to process the commit
767                 processEntry(commit);
768
769                 boolean didCommitOrSpeculate = proccessAllNewCommits();
770
771                 // Go through all uncommitted transactions and kill the ones that are dead
772                 deleteDeadUncommittedTransactions();
773
774                 // Speculate on key value pairs
775                 didCommitOrSpeculate |= createSpeculativeTable();
776                 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
777
778                 return new Pair<Boolean, List<Commit>>(true, returnCommits);
779         }
780
781         public void decrementLiveCount() {
782                 liveslotcount--;
783         }
784
785         private void setResizeThreshold() {
786                 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
787                 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
788         }
789
790         private boolean tryput(PendingTransaction pendingTrans, boolean resize) throws ServerException {
791                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
792
793                 int newsize = 0;
794                 if (liveslotcount > resizethreshold) {
795                         resize = true; //Resize is forced
796                 }
797
798                 if (resize) {
799                         newsize = (int) (numslots * RESIZE_MULTIPLE);
800                         TableStatus status = new TableStatus(s, newsize);
801                         s.addEntry(status);
802                 }
803
804                 doRejectedMessages(s);
805
806                 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
807
808                 // Resize was needed so redo call
809                 if (retTup.getFirst()) {
810                         return tryput(pendingTrans, true);
811                 }
812
813                 // Extract working variables
814                 boolean seenliveslot = retTup.getSecond();
815                 long seqn = retTup.getThird();
816
817                 doArbitration(s);
818
819                 Transaction trans = new Transaction(s,
820                                                     s.getSequenceNumber(),
821                                                     localmachineid,
822                                                     pendingTrans.getArbitrator(),
823                                                     pendingTrans.getKVUpdates(),
824                                                     pendingTrans.getKVGuard());
825                 boolean insertedTrans = false;
826                 if (s.hasSpace(trans)) {
827                         s.addEntry(trans);
828                         insertedTrans = true;
829                 }
830
831                 doOptionalRescue(s, seenliveslot, seqn, resize);
832                 Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedTrans, resize, newsize);
833
834                 if (sendRetData.getFirst()) {
835                         // update the status and change what the sequence number is for the
836                         TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTrans.getMachineLocalTransSeqNum());
837                         transStatus.setStatus(TransactionStatus.StatusSent);
838                         transStatus.setSentTransaction();
839                         transactionStatusMap.put(trans.getSequenceNumber(), transStatus);
840                 }
841
842
843                 if (sendRetData.getSecond().length != 0) {
844                         // insert into the local block chain
845                         validateandupdate(sendRetData.getSecond(), true);
846                 }
847
848                 return sendRetData.getFirst();
849         }
850
851         private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) throws ServerException {
852                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
853                 int newsize = 0;
854                 if (liveslotcount > resizethreshold) {
855                         resize = true; //Resize is forced
856                 }
857
858                 if (resize) {
859                         newsize = (int) (numslots * RESIZE_MULTIPLE);
860                         TableStatus status = new TableStatus(s, newsize);
861                         s.addEntry(status);
862                 }
863
864                 doRejectedMessages(s);
865                 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
866
867                 // Resize was needed so redo call
868                 if (retTup.getFirst()) {
869                         return tryput(keyName, arbMachineid, true);
870                 }
871
872                 // Extract working variables
873                 boolean seenliveslot = retTup.getSecond();
874                 long seqn = retTup.getThird();
875
876                 doArbitration(s);
877
878                 NewKey newKey = new NewKey(s, keyName, arbMachineid);
879
880                 boolean insertedNewKey = false;
881                 if (s.hasSpace(newKey)) {
882                         s.addEntry(newKey);
883                         insertedNewKey = true;
884                 }
885
886                 doOptionalRescue(s, seenliveslot, seqn, resize);
887                 Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedNewKey, resize, newsize);
888
889                 if (sendRetData.getSecond().length != 0) {
890                         // insert into the local block chain
891                         validateandupdate(sendRetData.getSecond(), true);
892                 }
893
894                 return sendRetData.getFirst();
895         }
896
897         private void doRejectedMessages(Slot s) {
898                 if (! rejectedmessagelist.isEmpty()) {
899                         /* TODO: We should avoid generating a rejected message entry if
900                          * there is already a sufficient entry in the queue (e.g.,
901                          * equalsto value of true and same sequence number).  */
902
903                         long old_seqn = rejectedmessagelist.firstElement();
904                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
905                                 long new_seqn = rejectedmessagelist.lastElement();
906                                 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
907                                 s.addEntry(rm);
908                         } else {
909                                 long prev_seqn = -1;
910                                 int i = 0;
911                                 /* Go through list of missing messages */
912                                 for (; i < rejectedmessagelist.size(); i++) {
913                                         long curr_seqn = rejectedmessagelist.get(i);
914                                         Slot s_msg = buffer.getSlot(curr_seqn);
915                                         if (s_msg != null)
916                                                 break;
917                                         prev_seqn = curr_seqn;
918                                 }
919                                 /* Generate rejected message entry for missing messages */
920                                 if (prev_seqn != -1) {
921                                         RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
922                                         s.addEntry(rm);
923                                 }
924                                 /* Generate rejected message entries for present messages */
925                                 for (; i < rejectedmessagelist.size(); i++) {
926                                         long curr_seqn = rejectedmessagelist.get(i);
927                                         Slot s_msg = buffer.getSlot(curr_seqn);
928                                         long machineid = s_msg.getMachineID();
929                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
930                                         s.addEntry(rm);
931                                 }
932                         }
933                 }
934         }
935
936         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
937                 long newestseqnum = buffer.getNewestSeqNum();
938                 long oldestseqnum = buffer.getOldestSeqNum();
939                 if (lastliveslotseqn < oldestseqnum)
940                         lastliveslotseqn = oldestseqnum;
941
942                 long seqn = lastliveslotseqn;
943                 boolean seenliveslot = false;
944                 long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
945                 long threshold = firstiffull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
946
947
948                 // Mandatory Rescue
949                 for (; seqn < threshold; seqn++) {
950                         Slot prevslot = buffer.getSlot(seqn);
951                         // Push slot number forward
952                         if (! seenliveslot)
953                                 lastliveslotseqn = seqn;
954
955                         if (! prevslot.isLive())
956                                 continue;
957                         seenliveslot = true;
958                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
959                         for (Entry liveentry : liveentries) {
960                                 if (s.hasSpace(liveentry)) {
961                                         s.addEntry(liveentry);
962                                 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
963                                         if (!resize) {
964                                                 System.out.println("B"); //?
965                                                 return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
966                                         }
967                                 }
968                         }
969                 }
970
971                 // Did not resize
972                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
973         }
974
975         private boolean doArbitration(Slot s) {
976
977                 // flag whether we have finished all arbitration
978                 boolean stillHasArbitration = false;
979
980                 pendingCommitsToDelete.clear();
981
982                 // First add queue commits
983                 for (Commit commit : pendingCommitsList) {
984                         if (s.hasSpace(commit)) {
985                                 s.addEntry(commit);
986                                 pendingCommitsToDelete.add(commit);
987                         } else {
988                                 // Ran out of space so move on but still not done
989                                 stillHasArbitration = true;
990                                 return stillHasArbitration;
991                         }
992                 }
993
994                 // Arbitrate
995                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
996                 List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
997
998                 // Sort from oldest to newest
999                 Collections.sort(transSeqNums);
1000
1001                 for (Long transNum : transSeqNums) {
1002                         Transaction ut = uncommittedTransactionsMap.get(transNum);
1003
1004                         // Check if this machine arbitrates for this transaction
1005                         if (ut.getArbitrator() != localmachineid ) {
1006                                 continue;
1007                         }
1008
1009                         // we did have something to arbitrate on
1010                         stillHasArbitration = true;
1011
1012                         Entry newEntry = null;
1013
1014                         if (ut.evaluateGuard(commitedTable, speculativeTableTmp)) {
1015                                 // Guard evaluated as true
1016
1017                                 // update the local tmp current key set
1018                                 for (KeyValue kv : ut.getkeyValueUpdateSet()) {
1019                                         speculativeTableTmp.put(kv.getKey(), kv);
1020                                 }
1021
1022                                 // create the commit
1023                                 newEntry = new Commit(s,
1024                                                       ut.getSequenceNumber(),
1025                                                       commitSequenceNumber,
1026                                                       ut.getArbitrator(),
1027                                                       ut.getkeyValueUpdateSet());
1028                                 commitSequenceNumber = commitSequenceNumber + 1;
1029                         } else {
1030                                 // Guard was false
1031
1032                                 // create the abort
1033                                 newEntry = new Abort(s,
1034                                                      ut.getSequenceNumber(),
1035                                                      ut.getMachineID(),
1036                                                      ut.getArbitrator());
1037                         }
1038
1039                         if ((newEntry != null) && s.hasSpace(newEntry)) {
1040                                 s.addEntry(newEntry);
1041                         } else {
1042                                 break;
1043                         }
1044                 }
1045
1046                 return stillHasArbitration;
1047         }
1048
1049         private void deletePendingCommits() {
1050                 for (Commit com : pendingCommitsToDelete) {
1051                         pendingCommitsList.remove(com);
1052                 }
1053                 pendingCommitsToDelete.clear();
1054         }
1055
1056         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
1057                 /* now go through live entries from least to greatest sequence number until
1058                  * either all live slots added, or the slot doesn't have enough room
1059                  * for SKIP_THRESHOLD consecutive entries*/
1060                 int skipcount = 0;
1061                 long newestseqnum = buffer.getNewestSeqNum();
1062                 search:
1063                 for (; seqn <= newestseqnum; seqn++) {
1064                         Slot prevslot = buffer.getSlot(seqn);
1065                         //Push slot number forward
1066                         if (!seenliveslot)
1067                                 lastliveslotseqn = seqn;
1068
1069                         if (!prevslot.isLive())
1070                                 continue;
1071                         seenliveslot = true;
1072                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1073                         for (Entry liveentry : liveentries) {
1074                                 if (s.hasSpace(liveentry))
1075                                         s.addEntry(liveentry);
1076                                 else {
1077                                         skipcount++;
1078                                         if (skipcount > SKIP_THRESHOLD)
1079                                                 break search;
1080                                 }
1081                         }
1082                 }
1083         }
1084
1085         private Pair<Boolean, Slot[]> doSendSlots(Slot s, boolean inserted, boolean resize, int newsize)  throws ServerException {
1086                 int max = 0;
1087                 if (resize)
1088                         max = newsize;
1089
1090                 Slot[] array = cloud.putSlot(s, max);
1091                 if (array == null) {
1092                         array = new Slot[] {s};
1093                         rejectedmessagelist.clear();
1094
1095                         // Delete pending commits that were sent to the cloud
1096                         deletePendingCommits();
1097                 }       else {
1098                         // if (array.length == 0)
1099                         // throw new Error("Server Error: Did not send any slots");
1100                         rejectedmessagelist.add(s.getSequenceNumber());
1101                         inserted = false;
1102                 }
1103
1104                 return new Pair<Boolean, Slot[]>(inserted, array);
1105         }
1106
1107         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
1108                 /* The cloud communication layer has checked slot HMACs already
1109                          before decoding */
1110                 if (newslots.length == 0) return;
1111
1112                 // Reset the table status declared sizes
1113                 smallestTableStatusSeen = -1;
1114                 largestTableStatusSeen = -1;
1115
1116                 long firstseqnum = newslots[0].getSequenceNumber();
1117                 if (firstseqnum <= sequencenumber) {
1118                         throw new Error("Server Error: Sent older slots!");
1119                 }
1120
1121                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
1122                 checkHMACChain(indexer, newslots);
1123
1124                 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
1125
1126                 // initExpectedSize(firstseqnum);
1127                 for (Slot slot : newslots) {
1128                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
1129                         // updateExpectedSize();
1130                 }
1131
1132                 /* If there is a gap, check to see if the server sent us everything. */
1133                 if (firstseqnum != (sequencenumber + 1)) {
1134
1135                         // TODO: Check size
1136                         checkNumSlots(newslots.length);
1137                         if (!machineSet.isEmpty()) {
1138                                 throw new Error("Missing record for machines: " + machineSet);
1139                         }
1140                 }
1141
1142
1143                 commitNewMaxSize();
1144
1145                 /* Commit new to slots. */
1146                 for (Slot slot : newslots) {
1147                         buffer.putSlot(slot);
1148                         liveslotcount++;
1149                 }
1150                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
1151
1152                 // Process all on key value pairs
1153                 boolean didCommitOrSpeculate = proccessAllNewCommits();
1154
1155                 // Go through all uncommitted transactions and kill the ones that are dead
1156                 deleteDeadUncommittedTransactions();
1157
1158                 // Speculate on key value pairs
1159                 didCommitOrSpeculate |= createSpeculativeTable();
1160
1161                 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
1162         }
1163
1164         private boolean proccessAllNewCommits() {
1165                 // Process only if there are commit
1166                 if (newCommitMap.keySet().size() == 0) {
1167                         return false;
1168                 }
1169                 boolean didProcessNewCommit = false;
1170
1171                 for (Long arb : newCommitMap.keySet()) {
1172
1173                         List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.get(arb).keySet());
1174
1175                         // Sort from oldest to newest commit
1176                         Collections.sort(commitSeqNums);
1177
1178                         // Go through each new commit one by one
1179                         for (Long entrySeqNum : commitSeqNums) {
1180                                 Commit entry = newCommitMap.get(arb).get(entrySeqNum);
1181
1182                                 long lastCommitSeenSeqNum = -1;
1183                                 if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
1184                                         lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
1185                                 }
1186
1187                                 if (entry.getSequenceNumber() <= lastCommitSeenSeqNum) {
1188                                         Map<Long, Commit> cm = commitMap.get(arb);
1189                                         if (cm == null) {
1190                                                 cm = new HashMap<Long, Commit>();
1191                                         }
1192
1193                                         Commit prevCommit = cm.put(entry.getSequenceNumber(), entry);
1194                                         commitMap.put(arb, cm);
1195
1196                                         if (prevCommit != null) {
1197                                                 prevCommit.setDead();
1198
1199                                                 for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
1200                                                         committedMapByKey.put(kv.getKey(), entry);
1201                                                 }
1202                                         }
1203
1204                                         continue;
1205                                 }
1206
1207                                 Set<Commit> commitsToEditSet = new HashSet<Commit>();
1208
1209                                 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
1210                                         commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
1211                                 }
1212
1213                                 commitsToEditSet.remove(null);
1214
1215                                 for (Commit prevCommit : commitsToEditSet) {
1216
1217                                         Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
1218
1219                                         if (!prevCommit.isLive()) {
1220                                                 Map<Long, Commit> cm = commitMap.get(arb);
1221
1222                                                 // remove it from the map so that it can be set as dead
1223                                                 if (cm != null) {
1224                                                         cm.remove(prevCommit.getSequenceNumber());
1225                                                         commitMap.put(arb, cm);
1226                                                 }
1227                                         }
1228                                 }
1229
1230                                 // Add the new commit
1231                                 Map<Long, Commit> cm = commitMap.get(arb);
1232                                 if (cm == null) {
1233                                         cm = new HashMap<Long, Commit>();
1234                                 }
1235                                 cm.put(entry.getSequenceNumber(), entry);
1236                                 commitMap.put(arb, cm);
1237
1238                                 lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getSequenceNumber());
1239
1240                                 // set the trans sequence number if we are able to
1241                                 if (entry.getTransSequenceNumber() != -1) {
1242                                         lastCommitSeenTransSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
1243                                 }
1244
1245                                 didProcessNewCommit = true;
1246
1247                                 // Update the committed table list
1248                                 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
1249                                         IoTString key = kv.getKey();
1250                                         commitedTable.put(key, kv);
1251                                         committedMapByKey.put(key, entry);
1252                                 }
1253                         }
1254                 }
1255                 // Clear the new commits storage so we can use it later
1256                 newCommitMap.clear();
1257
1258
1259                 // go through all saved transactions and update the status of those that can be updated
1260                 for (Iterator<Map.Entry<Long, TransactionStatus>> i = transactionStatusMap.entrySet().iterator(); i.hasNext();) {
1261                         Map.Entry<Long, TransactionStatus> entry = i.next();
1262                         long seqnum = entry.getKey();
1263                         TransactionStatus status = entry.getValue();
1264
1265                         if (status.getSentTransaction()) {
1266
1267                                 Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(status.getArbitrator());
1268                                 Long abortSeqNum = lastAbortSeenSeqNumMap.get(status.getArbitrator());
1269
1270                                 if (((commitSeqNum != null) && (seqnum <= commitSeqNum)) ||
1271                                         ((abortSeqNum != null) && (seqnum <= abortSeqNum))) {
1272                                         status.setStatus(TransactionStatus.StatusCommitted);
1273                                         i.remove();
1274                                 }
1275                         }
1276                 }
1277
1278                 return didProcessNewCommit;
1279         }
1280
1281         private void deleteDeadUncommittedTransactions() {
1282                 // Make dead the transactions
1283                 for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
1284                         Transaction prevtrans = i.next().getValue();
1285                         long transArb = prevtrans.getArbitrator();
1286
1287                         Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(transArb);
1288                         Long abortSeqNum = lastAbortSeenSeqNumMap.get(transArb);
1289
1290                         if (((commitSeqNum != null) && (prevtrans.getSequenceNumber() <= commitSeqNum)) ||
1291                                 ((abortSeqNum != null) && (prevtrans.getSequenceNumber() <= abortSeqNum))) {
1292                                 i.remove();
1293                                 prevtrans.setDead();
1294                         }
1295                 }
1296         }
1297
1298         private boolean createSpeculativeTable() {
1299                 if (uncommittedTransactionsMap.keySet().size() == 0) {
1300                         return false;
1301                 }
1302
1303                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1304                 List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
1305
1306                 // Sort from oldest to newest commit
1307                 Collections.sort(utSeqNums);
1308
1309                 if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
1310
1311                         speculativeTable.clear();
1312                         lastUncommittedTransaction = -1;
1313
1314                         for (Long key : utSeqNums) {
1315                                 Transaction trans = uncommittedTransactionsMap.get(key);
1316
1317                                 lastUncommittedTransaction = key;
1318
1319                                 if (trans.evaluateGuard(commitedTable, speculativeTableTmp)) {
1320                                         for (KeyValue kv : trans.getkeyValueUpdateSet()) {
1321                                                 speculativeTableTmp.put(kv.getKey(), kv);
1322                                         }
1323                                 }
1324
1325                         }
1326                 } else {
1327                         for (Long key : utSeqNums) {
1328
1329                                 if (key <= lastUncommittedTransaction) {
1330                                         continue;
1331                                 }
1332
1333                                 lastUncommittedTransaction = key;
1334
1335                                 Transaction trans = uncommittedTransactionsMap.get(key);
1336
1337                                 if (trans.evaluateGuard(speculativeTable, speculativeTableTmp)) {
1338                                         for (KeyValue kv : trans.getkeyValueUpdateSet()) {
1339                                                 speculativeTableTmp.put(kv.getKey(), kv);
1340                                         }
1341                                 }
1342                         }
1343                 }
1344
1345                 for (IoTString key : speculativeTableTmp.keySet()) {
1346                         speculativeTable.put(key, speculativeTableTmp.get(key));
1347                 }
1348
1349                 return true;
1350         }
1351
1352         private void createPendingTransactionSpeculativeTable(boolean didCommitOrSpeculate) {
1353
1354                 if (didCommitOrSpeculate) {
1355                         pendingTransSpeculativeTable.clear();
1356                         lastSeenPendingTransactionSpeculateIndex = 0;
1357
1358                         int index = 0;
1359                         for (PendingTransaction pt : pendingTransQueue) {
1360                                 if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
1361
1362                                         lastSeenPendingTransactionSpeculateIndex = index;
1363                                         index++;
1364
1365                                         for (KeyValue kv : pt.getKVUpdates()) {
1366                                                 pendingTransSpeculativeTable.put(kv.getKey(), kv);
1367                                         }
1368
1369                                 }
1370                         }
1371                 }
1372         }
1373
1374         private int expectedsize, currmaxsize;
1375
1376         private void checkNumSlots(int numslots) {
1377
1378
1379                 // We only have 1 size so we must have this many slots
1380                 if (largestTableStatusSeen == smallestTableStatusSeen) {
1381                         if (numslots != smallestTableStatusSeen) {
1382                                 throw new Error("Server Error: Server did not send all slots.  Expected: " + smallestTableStatusSeen + " Received:" + numslots);
1383                         }
1384                 } else {
1385                         // We have more than 1
1386                         if (numslots < smallestTableStatusSeen) {
1387                                 throw new Error("Server Error: Server did not send all slots.  Expected at least: " + smallestTableStatusSeen + " Received:" + numslots);
1388                         }
1389                 }
1390
1391                 // if (numslots != expectedsize) {
1392                 // throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
1393                 // }
1394         }
1395
1396         private void initExpectedSize(long firstsequencenumber) {
1397                 long prevslots = firstsequencenumber;
1398                 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
1399                 currmaxsize = numslots;
1400         }
1401
1402         private void updateExpectedSize() {
1403                 expectedsize++;
1404                 if (expectedsize > currmaxsize) {
1405                         expectedsize = currmaxsize;
1406                 }
1407         }
1408
1409         private void updateCurrMaxSize(int newmaxsize) {
1410                 currmaxsize = newmaxsize;
1411         }
1412
1413         private void commitNewMaxSize() {
1414
1415                 if (largestTableStatusSeen == -1) {
1416                         currmaxsize = numslots;
1417                 } else {
1418                         currmaxsize = largestTableStatusSeen;
1419                 }
1420
1421                 if (numslots != currmaxsize) {
1422                         buffer.resize(currmaxsize);
1423                 }
1424
1425                 numslots = currmaxsize;
1426                 setResizeThreshold();
1427         }
1428
1429         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
1430                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
1431         }
1432
1433         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
1434                 long oldseqnum = entry.getOldSeqNum();
1435                 long newseqnum = entry.getNewSeqNum();
1436                 boolean isequal = entry.getEqual();
1437                 long machineid = entry.getMachineID();
1438                 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
1439                         Slot slot = indexer.getSlot(seqnum);
1440                         if (slot != null) {
1441                                 long slotmachineid = slot.getMachineID();
1442                                 if (isequal != (slotmachineid == machineid)) {
1443                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
1444                                 }
1445                         }
1446                 }
1447
1448                 HashSet<Long> watchset = new HashSet<Long>();
1449                 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
1450                         long entry_mid = lastmsg_entry.getKey();
1451                         /* We've seen it, don't need to continue to watch.  Our next
1452                          * message will implicitly acknowledge it. */
1453                         if (entry_mid == localmachineid)
1454                                 continue;
1455                         Pair<Long, Liveness> v = lastmsg_entry.getValue();
1456                         long entry_seqn = v.getFirst();
1457                         if (entry_seqn < newseqnum) {
1458                                 addWatchList(entry_mid, entry);
1459                                 watchset.add(entry_mid);
1460                         }
1461                 }
1462                 if (watchset.isEmpty())
1463                         entry.setDead();
1464                 else
1465                         entry.setWatchSet(watchset);
1466         }
1467
1468         private void processEntry(NewKey entry) {
1469                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
1470
1471                 NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
1472
1473                 if (oldNewKey != null) {
1474                         oldNewKey.setDead();
1475                 }
1476         }
1477
1478         private void processEntry(Transaction entry) {
1479
1480                 long arb = entry.getArbitrator();
1481                 Long comLast = lastCommitSeenTransSeqNumMap.get(arb);
1482                 Long abLast = lastAbortSeenSeqNumMap.get(arb);
1483
1484                 Transaction prevTrans = null;
1485
1486                 if ((comLast != null) && (comLast >= entry.getSequenceNumber())) {
1487                         prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
1488                 } else if ((abLast != null) && (abLast >= entry.getSequenceNumber())) {
1489                         prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
1490                 } else {
1491                         prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
1492                 }
1493
1494                 // Duplicate so delete old copy
1495                 if (prevTrans != null) {
1496                         prevTrans.setDead();
1497                 }
1498         }
1499
1500         private void processEntry(Abort entry) {
1501                 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
1502                         // Abort has not been seen yet so we need to keep track of it
1503
1504                         Abort prevAbort = abortMap.put(entry.getTransSequenceNumber(), entry);
1505                         if (prevAbort != null) {
1506                                 prevAbort.setDead(); // delete old version of the duplicate
1507                         }
1508
1509                         if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) &&   (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
1510                                 lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
1511                         }
1512                 } else {
1513                         // The machine already saw this so it is dead
1514                         entry.setDead();
1515                 }
1516
1517                 // Update the status of the transaction and remove it since we are done with this transaction
1518                 TransactionStatus status = transactionStatusMap.remove(entry.getTransSequenceNumber());
1519                 if (status != null) {
1520                         status.setStatus(TransactionStatus.StatusAborted);
1521                 }
1522         }
1523
1524         private void processEntry(Commit entry) {
1525                 Map<Long, Commit> arbMap = newCommitMap.get(entry.getTransArbitrator());
1526
1527                 if (arbMap == null) {
1528                         arbMap = new HashMap<Long, Commit>();
1529                 }
1530
1531                 Commit prevCommit = arbMap.put(entry.getSequenceNumber(), entry);
1532                 newCommitMap.put(entry.getTransArbitrator(), arbMap);
1533
1534                 if (prevCommit != null) {
1535                         prevCommit.setDead();
1536                 }
1537         }
1538
1539         private void processEntry(TableStatus entry) {
1540                 int newnumslots = entry.getMaxSlots();
1541                 // updateCurrMaxSize(newnumslots);
1542                 if (lastTableStatus != null)
1543                         lastTableStatus.setDead();
1544                 lastTableStatus = entry;
1545
1546                 if ((smallestTableStatusSeen == -1) || (newnumslots < smallestTableStatusSeen)) {
1547                         smallestTableStatusSeen = newnumslots;
1548                 }
1549
1550                 if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) {
1551                         largestTableStatusSeen = newnumslots;
1552                 }
1553         }
1554
1555         private void addWatchList(long machineid, RejectedMessage entry) {
1556                 HashSet<RejectedMessage> entries = watchlist.get(machineid);
1557                 if (entries == null)
1558                         watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
1559                 entries.add(entry);
1560         }
1561
1562         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1563                 machineSet.remove(machineid);
1564
1565                 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
1566                 if (watchset != null) {
1567                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
1568                                 RejectedMessage rm = rmit.next();
1569                                 if (rm.getNewSeqNum() <= seqnum) {
1570                                         /* Remove it from our watchlist */
1571                                         rmit.remove();
1572                                         /* Decrement machines that need to see this notification */
1573                                         rm.removeWatcher(machineid);
1574                                 }
1575                         }
1576                 }
1577
1578                 if (machineid == localmachineid) {
1579                         /* Our own messages are immediately dead. */
1580                         if (liveness instanceof LastMessage) {
1581                                 ((LastMessage)liveness).setDead();
1582                         } else if (liveness instanceof Slot) {
1583                                 ((Slot)liveness).setDead();
1584                         } else {
1585                                 throw new Error("Unrecognized type");
1586                         }
1587                 }
1588
1589                 // Set dead the abort
1590                 for (Iterator<Map.Entry<Long, Abort>> i = abortMap.entrySet().iterator(); i.hasNext();) {
1591                         Abort abort = i.next().getValue();
1592
1593                         if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
1594                                 abort.setDead();
1595                                 i.remove();
1596                         }
1597                 }
1598
1599                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
1600                 if (lastmsgentry == null)
1601                         return;
1602
1603                 long lastmsgseqnum = lastmsgentry.getFirst();
1604                 Liveness lastentry = lastmsgentry.getSecond();
1605                 if (machineid != localmachineid) {
1606                         if (lastentry instanceof LastMessage) {
1607                                 ((LastMessage)lastentry).setDead();
1608                         } else if (lastentry instanceof Slot) {
1609                                 ((Slot)lastentry).setDead();
1610                         } else {
1611                                 throw new Error("Unrecognized type");
1612                         }
1613                 }
1614
1615                 if (machineid == localmachineid) {
1616                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
1617                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  seqnum + " got: " + lastmsgseqnum);
1618                 } else {
1619                         if (lastmsgseqnum > seqnum)
1620                                 throw new Error("Server Error: Rollback on remote machine sequence number");
1621                 }
1622         }
1623
1624         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1625                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
1626                 for (Entry entry : slot.getEntries()) {
1627                         switch (entry.getType()) {
1628
1629                         case Entry.TypeNewKey:
1630                                 processEntry((NewKey)entry);
1631                                 break;
1632
1633                         case Entry.TypeCommit:
1634                                 processEntry((Commit)entry);
1635                                 break;
1636
1637                         case Entry.TypeAbort:
1638                                 processEntry((Abort)entry);
1639                                 break;
1640
1641                         case Entry.TypeTransaction:
1642                                 processEntry((Transaction)entry);
1643                                 break;
1644
1645                         case Entry.TypeLastMessage:
1646                                 processEntry((LastMessage)entry, machineSet);
1647                                 break;
1648
1649                         case Entry.TypeRejectedMessage:
1650                                 processEntry((RejectedMessage)entry, indexer);
1651                                 break;
1652
1653                         case Entry.TypeTableStatus:
1654                                 processEntry((TableStatus)entry);
1655                                 break;
1656
1657                         default:
1658                                 throw new Error("Unrecognized type: " + entry.getType());
1659                         }
1660                 }
1661         }
1662
1663         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
1664                 for (int i = 0; i < newslots.length; i++) {
1665                         Slot currslot = newslots[i];
1666                         Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
1667                         if (prevslot != null &&
1668                                 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1669                                 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);
1670                 }
1671         }
1672 }