Block Chain Transactions, Commits multiple parts version
[iotcloud.git] / version2 / backup / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Set;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.nio.ByteBuffer;
17 import java.util.concurrent.Semaphore;
18
19
20 /**
21  * IoTTable data structure.  Provides client inferface.
22  * @author Brian Demsky
23  * @version 1.0
24  */
25
26 final public class Table {
27         private int numslots;   //number of slots stored in buffer
28
29         // machine id -> (sequence number, Slot or LastMessage); records last message by each client
30         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
31         // machine id -> ...
32         private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
33         private Vector<Long> rejectedmessagelist = new Vector<Long>();
34         private SlotBuffer buffer;
35         private CloudComm cloud;
36         private long sequencenumber; //Largest sequence number a client has received
37         private long localmachineid;
38         private TableStatus lastTableStatus;
39         static final int FREE_SLOTS = 10; //number of slots that should be kept free
40         static final int SKIP_THRESHOLD = 10;
41         private long liveslotcount = 0;
42         private int chance;
43         static final double RESIZE_MULTIPLE = 1.2;
44         static final double RESIZE_THRESHOLD = 0.75;
45         static final int REJECTED_THRESHOLD = 5;
46         private int resizethreshold;
47         private long lastliveslotseqn;  //smallest sequence number with a live entry
48         private Random random = new Random();
49         private long lastUncommittedTransaction = 0;
50
51         private int smallestTableStatusSeen = -1;
52         private int largestTableStatusSeen = -1;
53         private int lastSeenPendingTransactionSpeculateIndex = 0;
54         private int commitSequenceNumber = 0;
55         private long localTransactionSequenceNumber = 0;
56
57         private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
58         private LinkedList<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
59         private Map<Long, Map<Long, Commit>> commitMap = null; // List of all the most recent live commits
60         private Map<Long, Abort> abortMap = null; // Set of the live aborts
61         private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV
62         private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
63         private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
64         private Map<Long, Transaction> uncommittedTransactionsMap = null;
65         private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
66         private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
67         private Map<Long, Map<Long, Commit>> newCommitMap = null; // Map of all the new commits
68         private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
69         private Map<Long, Long> lastCommitSeenTransSeqNumMap = null; // transaction sequence number of the last commit that was seen grouped by arbitrator
70         private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last abort that was seen grouped by arbitrator
71         private Map<IoTString, KeyValue> pendingTransSpeculativeTable = null;
72         private List<Commit> pendingCommitsList = null;
73         private List<Commit> pendingCommitsToDelete = null;
74         private Map<Long, LocalComm> localCommunicationChannels;
75         private Map<Long, TransactionStatus> transactionStatusMap = null;
76         private Map<Long, TransactionStatus> transactionStatusNotSentMap = null;
77
78         private Semaphore mutex = null;
79
80
81
82         public Table(String hostname, String baseurl, String password, long _localmachineid) {
83                 localmachineid = _localmachineid;
84                 buffer = new SlotBuffer();
85                 numslots = buffer.capacity();
86                 setResizeThreshold();
87                 sequencenumber = 0;
88                 cloud = new CloudComm(this, hostname, baseurl, password);
89                 lastliveslotseqn = 1;
90
91                 setupDataStructs();
92         }
93
94         public Table(CloudComm _cloud, long _localmachineid) {
95                 localmachineid = _localmachineid;
96                 buffer = new SlotBuffer();
97                 numslots = buffer.capacity();
98                 setResizeThreshold();
99                 sequencenumber = 0;
100                 cloud = _cloud;
101
102                 setupDataStructs();
103         }
104
105         private void setupDataStructs() {
106                 pendingTransQueue = new LinkedList<PendingTransaction>();
107                 commitMap = new HashMap<Long, Map<Long, Commit>>();
108                 abortMap = new HashMap<Long, Abort>();
109                 committedMapByKey = new HashMap<IoTString, Commit>();
110                 commitedTable = new HashMap<IoTString, KeyValue>();
111                 speculativeTable = new HashMap<IoTString, KeyValue>();
112                 uncommittedTransactionsMap = new HashMap<Long, Transaction>();
113                 arbitratorTable = new HashMap<IoTString, Long>();
114                 newKeyTable = new HashMap<IoTString, NewKey>();
115                 newCommitMap = new HashMap<Long, Map<Long, Commit>>();
116                 lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
117                 lastCommitSeenTransSeqNumMap = new HashMap<Long, Long>();
118                 lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
119                 pendingTransSpeculativeTable = new HashMap<IoTString, KeyValue>();
120                 pendingCommitsList = new LinkedList<Commit>();
121                 pendingCommitsToDelete = new LinkedList<Commit>();
122                 localCommunicationChannels = new HashMap<Long, LocalComm>();
123                 transactionStatusMap = new HashMap<Long, TransactionStatus>();
124                 transactionStatusNotSentMap = new HashMap<Long, TransactionStatus>();
125                 mutex = new Semaphore(1);
126         }
127
128         public void initTable() throws ServerException {
129                 cloud.setSalt();//Set the salt
130                 Slot s = new Slot(this, 1, localmachineid);
131                 TableStatus status = new TableStatus(s, numslots);
132                 s.addEntry(status);
133                 Slot[] array = cloud.putSlot(s, numslots);
134                 if (array == null) {
135                         array = new Slot[] {s};
136                         /* update data structure */
137                         validateandupdate(array, true);
138                 } else {
139                         throw new Error("Error on initialization");
140                 }
141         }
142
143         public void rebuild() throws ServerException, InterruptedException {
144                 mutex.acquire();
145                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
146                 validateandupdate(newslots, true);
147                 mutex.release();
148         }
149
150         // TODO: delete method
151         public void printSlots() {
152                 long o = buffer.getOldestSeqNum();
153                 long n = buffer.getNewestSeqNum();
154
155                 int[] types = new int[10];
156
157                 int num = 0;
158
159                 int livec = 0;
160                 int deadc = 0;
161                 for (long i = o; i < (n + 1); i++) {
162                         Slot s = buffer.getSlot(i);
163
164                         Vector<Entry> entries = s.getEntries();
165
166                         for (Entry e : entries) {
167                                 if (e.isLive()) {
168                                         int type = e.getType();
169                                         types[type] = types[type] + 1;
170                                         num++;
171                                         livec++;
172                                 } else {
173                                         deadc++;
174                                 }
175                         }
176                 }
177
178                 for (int i = 0; i < 10; i++) {
179                         System.out.println(i + "    " + types[i]);
180                 }
181                 System.out.println("Live count:   " + livec);
182                 System.out.println("Dead count:   " + deadc);
183                 System.out.println("Old:   " + o);
184                 System.out.println("New:   " + n);
185                 System.out.println("Size:   " + buffer.size());
186                 System.out.println("Commits Key Map:   " + commitedTable.size());
187                 // System.out.println("Commits Live Map:   " + commitMap.size());
188                 System.out.println("Pending:   " + pendingTransQueue.size());
189
190                 // List<IoTString> strList = new ArrayList<IoTString>();
191                 // for (int i = 0; i < 100; i++) {
192                 //      String keyA = "a" + i;
193                 //      String keyB = "b" + i;
194                 //      String keyC = "c" + i;
195                 //      String keyD = "d" + i;
196
197                 //      IoTString iKeyA = new IoTString(keyA);
198                 //      IoTString iKeyB = new IoTString(keyB);
199                 //      IoTString iKeyC = new IoTString(keyC);
200                 //      IoTString iKeyD = new IoTString(keyD);
201
202                 //      strList.add(iKeyA);
203                 //      strList.add(iKeyB);
204                 //      strList.add(iKeyC);
205                 //      strList.add(iKeyD);
206                 // }
207
208
209                 // for (Long l : commitMap.keySet()) {
210                 //      for (Long l2 : commitMap.get(l).keySet()) {
211                 //              for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) {
212                 //                      strList.remove(kv.getKey());
213                 //                      System.out.print(kv.getKey() + "    ");
214                 //              }
215                 //      }
216                 // }
217
218                 // System.out.println();
219                 // System.out.println();
220
221                 // for (IoTString s : strList) {
222                 //      System.out.print(s + "    ");
223                 // }
224                 // System.out.println();
225                 // System.out.println(strList.size());
226         }
227
228         public long getId() {
229                 return localmachineid;
230         }
231
232         public boolean hasConnection() {
233                 return cloud.hasConnection();
234         }
235
236         public String toString() {
237                 String retString = " Committed Table: \n";
238                 retString += "---------------------------\n";
239                 retString += commitedTable.toString();
240
241                 retString += "\n\n";
242
243                 retString += " Speculative Table: \n";
244                 retString += "---------------------------\n";
245                 retString += speculativeTable.toString();
246
247                 return retString;
248         }
249
250         public void addLocalComm(long machineId, LocalComm lc) {
251                 localCommunicationChannels.put(machineId, lc);
252         }
253         public Long getArbitrator(IoTString key) throws InterruptedException {
254
255                 mutex.acquire();
256                 Long arb = arbitratorTable.get(key);
257                 mutex.release();
258
259                 return arb;
260         }
261
262         public IoTString getCommitted(IoTString key) throws InterruptedException {
263
264                 mutex.acquire();
265                 KeyValue kv = commitedTable.get(key);
266                 mutex.release();
267
268
269                 if (kv != null) {
270                         return kv.getValue();
271                 } else {
272                         return null;
273                 }
274         }
275
276         public IoTString getSpeculative(IoTString key) throws InterruptedException {
277
278                 mutex.acquire();
279
280                 KeyValue kv = pendingTransSpeculativeTable.get(key);
281
282                 if (kv == null) {
283                         kv = speculativeTable.get(key);
284                 }
285
286                 if (kv == null) {
287                         kv = commitedTable.get(key);
288                 }
289                 mutex.release();
290
291
292                 if (kv != null) {
293                         return kv.getValue();
294                 } else {
295                         return null;
296                 }
297         }
298
299         public IoTString getCommittedAtomic(IoTString key) throws InterruptedException {
300
301                 mutex.acquire();
302
303                 KeyValue kv = commitedTable.get(key);
304
305                 if (arbitratorTable.get(key) == null) {
306                         throw new Error("Key not Found.");
307                 }
308
309                 // Make sure new key value pair matches the current arbitrator
310                 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
311                         // TODO: Maybe not throw en error
312                         throw new Error("Not all Key Values Match Arbitrator.");
313                 }
314
315                 mutex.release();
316
317                 if (kv != null) {
318                         pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
319                         return kv.getValue();
320                 } else {
321                         pendingTransBuild.addKVGuard(new KeyValue(key, null));
322                         return null;
323                 }
324         }
325
326         public IoTString getSpeculativeAtomic(IoTString key) throws InterruptedException {
327
328                 mutex.acquire();
329
330                 if (arbitratorTable.get(key) == null) {
331                         throw new Error("Key not Found.");
332                 }
333
334                 // Make sure new key value pair matches the current arbitrator
335                 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
336                         // TODO: Maybe not throw en error
337                         throw new Error("Not all Key Values Match Arbitrator.");
338                 }
339
340                 KeyValue kv = pendingTransSpeculativeTable.get(key);
341
342                 if (kv == null) {
343                         kv = speculativeTable.get(key);
344                 }
345
346                 if (kv == null) {
347                         kv = commitedTable.get(key);
348                 }
349
350                 mutex.release();
351
352                 if (kv != null) {
353                         pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
354                         return kv.getValue();
355                 } else {
356                         pendingTransBuild.addKVGuard(new KeyValue(key, null));
357                         return null;
358                 }
359         }
360
361         public Pair<Boolean, Boolean> update() throws InterruptedException {
362
363                 mutex.acquire();
364
365                 boolean gotLatestFromServer = false;
366                 boolean didSendLocal = false;
367
368                 try {
369                         Slot[] newslots = cloud.getSlots(sequencenumber + 1);
370                         validateandupdate(newslots, false);
371                         gotLatestFromServer = true;
372
373                         if (!pendingTransQueue.isEmpty()) {
374
375                                 // We have a pending transaction so do full insertion
376                                 processPendingTrans();
377                         } else {
378
379                                 // We dont have a pending transaction so do minimal effort
380                                 updateWithNotPendingTrans();
381                         }
382
383                         didSendLocal = true;
384
385
386                 } catch (Exception e) {
387                         // could not update so do nothing
388                 }
389
390
391                 mutex.release();
392                 return new Pair<Boolean, Boolean>(gotLatestFromServer, didSendLocal);
393         }
394
395         public Boolean updateFromLocal(long arb) throws InterruptedException {
396                 LocalComm lc = localCommunicationChannels.get(arb);
397                 if (lc == null) {
398                         // Cant talk directly to arbitrator so cant do anything
399                         return false;
400                 }
401
402                 byte[] array = new byte[Long.BYTES ];
403                 ByteBuffer bbEncode = ByteBuffer.wrap(array);
404                 Long lastSeenCommit = lastCommitSeenSeqNumMap.get(arb);
405                 if (lastSeenCommit != null) {
406                         bbEncode.putLong(lastSeenCommit);
407                 } else {
408                         bbEncode.putLong(0);
409                 }
410
411                 mutex.acquire();
412                 byte[] data = lc.sendDataToLocalDevice(arb, bbEncode.array());
413
414                 // Decode the data
415                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
416                 boolean didCommit = bbDecode.get() == 1;
417                 int numberOfCommites = bbDecode.getInt();
418
419                 List<Commit> newCommits = new LinkedList<Commit>();
420                 for (int i = 0; i < numberOfCommites; i++ ) {
421                         bbDecode.get();
422                         Commit com = (Commit)Commit.decode(null, bbDecode);
423                         newCommits.add(com);
424                 }
425
426                 for (Commit commit : newCommits) {
427                         // Prepare to process the commit
428                         processEntry(commit);
429                 }
430
431                 boolean didCommitOrSpeculate = proccessAllNewCommits();
432
433                 // Go through all uncommitted transactions and kill the ones that are dead
434                 deleteDeadUncommittedTransactions();
435
436                 // Speculate on key value pairs
437                 didCommitOrSpeculate |= createSpeculativeTable();
438                 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
439
440
441                 mutex.release();
442
443                 return true;
444         }
445
446         public void startTransaction() throws InterruptedException {
447                 // Create a new transaction, invalidates any old pending transactions.
448                 pendingTransBuild = new PendingTransaction();
449         }
450
451         public void addKV(IoTString key, IoTString value) {
452
453                 if (arbitratorTable.get(key) == null) {
454                         throw new Error("Key not Found.");
455                 }
456
457                 // Make sure new key value pair matches the current arbitrator
458                 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
459                         // TODO: Maybe not throw en error
460                         throw new Error("Not all Key Values Match Arbitrator.");
461                 }
462
463                 KeyValue kv = new KeyValue(key, value);
464                 pendingTransBuild.addKV(kv);
465         }
466
467         public TransactionStatus commitTransaction() throws InterruptedException {
468
469                 if (pendingTransBuild.getKVUpdates().size() == 0) {
470
471                         // transaction with no updates will have no effect on the system
472                         return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
473                 }
474
475                 mutex.acquire();
476
477                 TransactionStatus transStatus = null;
478
479                 if (pendingTransBuild.getArbitrator() != localmachineid) {
480
481                         // set the local sequence number so we can recognize this transaction later
482                         pendingTransBuild.setMachineLocalTransSeqNum(localTransactionSequenceNumber);
483                         localTransactionSequenceNumber++;
484
485                         transStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransBuild.getArbitrator());
486                         transactionStatusNotSentMap.put(pendingTransBuild.getMachineLocalTransSeqNum(), transStatus);
487
488                         // Add the pending transaction to the queue
489                         pendingTransQueue.add(pendingTransBuild);
490
491
492                         for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) {
493                                 PendingTransaction pt = pendingTransQueue.get(i);
494
495                                 if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
496
497                                         lastSeenPendingTransactionSpeculateIndex = i;
498
499                                         for (KeyValue kv : pt.getKVUpdates()) {
500                                                 pendingTransSpeculativeTable.put(kv.getKey(), kv);
501                                         }
502
503                                 }
504                         }
505                 } else {
506                         Transaction ut = new Transaction(null,
507                                                          -1,
508                                                          localmachineid,
509                                                          pendingTransBuild.getArbitrator(),
510                                                          pendingTransBuild.getKVUpdates(),
511                                                          pendingTransBuild.getKVGuard());
512
513                         Pair<Boolean, List<Commit>> retData = doLocalUpdateAndArbitrate(ut, lastCommitSeenSeqNumMap.get(localmachineid));
514
515                         if (retData.getFirst()) {
516                                 transStatus = new TransactionStatus(TransactionStatus.StatusCommitted, pendingTransBuild.getArbitrator());
517                         } else {
518                                 transStatus = new TransactionStatus(TransactionStatus.StatusAborted, pendingTransBuild.getArbitrator());
519                         }
520                 }
521
522                 // Try to insert transactions if possible
523                 if (!pendingTransQueue.isEmpty()) {
524                         // We have a pending transaction so do full insertion
525                         processPendingTrans();
526                 } else {
527                         try {
528                                 // We dont have a pending transaction so do minimal effort
529                                 updateWithNotPendingTrans();
530                         } catch (Exception e) {
531                                 // Do nothing
532                         }
533                 }
534
535                 // reset it so next time is fresh
536                 pendingTransBuild = new PendingTransaction();
537
538
539                 mutex.release();
540                 return transStatus;
541         }
542
543         public boolean createNewKey(IoTString keyName, long machineId) throws ServerException, InterruptedException {
544                 try {
545                         mutex.acquire();
546
547                         while (true) {
548                                 if (arbitratorTable.get(keyName) != null) {
549                                         // There is already an arbitrator
550                                         mutex.release();
551                                         return false;
552                                 }
553
554                                 if (tryput(keyName, machineId, false)) {
555                                         // If successfully inserted
556                                         mutex.release();
557                                         return true;
558                                 }
559                         }
560                 } catch (ServerException e) {
561                         mutex.release();
562                         throw e;
563                 }
564         }
565
566         private void processPendingTrans() throws InterruptedException {
567
568                 boolean sentAllPending = false;
569                 try {
570                         while (!pendingTransQueue.isEmpty()) {
571                                 if (tryput( pendingTransQueue.peek(), false)) {
572                                         pendingTransQueue.poll();
573                                 }
574                         }
575
576                         // if got here then all pending transactions were sent
577                         sentAllPending = true;
578                 } catch (Exception e) {
579                         // There was a connection error
580                         sentAllPending = false;
581                 }
582
583                 if (!sentAllPending) {
584
585                         for (Iterator<PendingTransaction> i = pendingTransQueue.iterator(); i.hasNext(); ) {
586                                 PendingTransaction pt = i.next();
587                                 LocalComm lc = localCommunicationChannels.get(pt.getArbitrator());
588                                 if (lc == null) {
589                                         // Cant talk directly to arbitrator so cant do anything
590                                         continue;
591                                 }
592
593
594                                 Transaction ut = new Transaction(null,
595                                                                  -1,
596                                                                  localmachineid,
597                                                                  pendingTransBuild.getArbitrator(),
598                                                                  pendingTransBuild.getKVUpdates(),
599                                                                  pendingTransBuild.getKVGuard());
600
601
602                                 Pair<Boolean, List<Commit>> retData = sendTransactionToLocal(ut, lc);
603
604                                 for (Commit commit : retData.getSecond()) {
605                                         // Prepare to process the commit
606                                         processEntry(commit);
607                                 }
608
609                                 boolean didCommitOrSpeculate = proccessAllNewCommits();
610
611                                 // Go through all uncommitted transactions and kill the ones that are dead
612                                 deleteDeadUncommittedTransactions();
613
614                                 // Speculate on key value pairs
615                                 didCommitOrSpeculate |= createSpeculativeTable();
616                                 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
617
618                                 if (retData.getFirst()) {
619                                         TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
620                                         if (transStatus != null) {
621                                                 transStatus.setStatus(TransactionStatus.StatusCommitted);
622                                         }
623
624                                 } else {
625                                         TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
626                                         if (transStatus != null) {
627                                                 transStatus.setStatus(TransactionStatus.StatusAborted);
628                                         }
629                                 }
630                                 i.remove();
631                         }
632                 }
633         }
634
635         private void updateWithNotPendingTrans() throws ServerException, InterruptedException {
636                 boolean doEnd = false;
637                 boolean needResize = false;
638                 while (!doEnd && ((uncommittedTransactionsMap.keySet().size() > 0)  || (pendingCommitsList.size() > 0))   ) {
639                         boolean resize = needResize;
640                         needResize = false;
641
642                         Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
643                         int newsize = 0;
644                         if (liveslotcount > resizethreshold) {
645                                 resize = true; //Resize is forced
646                         }
647
648                         if (resize) {
649                                 newsize = (int) (numslots * RESIZE_MULTIPLE);
650                                 TableStatus status = new TableStatus(s, newsize);
651                                 s.addEntry(status);
652                         }
653
654                         doRejectedMessages(s);
655
656                         ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
657
658                         // Resize was needed so redo call
659                         if (retTup.getFirst()) {
660                                 needResize = true;
661                                 continue;
662                         }
663
664                         // Extract working variables
665                         boolean seenliveslot = retTup.getSecond();
666                         long seqn = retTup.getThird();
667
668                         // Did need to arbitrate
669                         doEnd = !doArbitration(s);
670
671                         doOptionalRescue(s, seenliveslot, seqn, resize);
672
673                         int max = 0;
674                         if (resize) {
675                                 max = newsize;
676                         }
677
678                         Slot[] array = cloud.putSlot(s, max);
679                         if (array == null) {
680                                 array = new Slot[] {s};
681                                 rejectedmessagelist.clear();
682
683                                 // Delete pending commits that were sent to the cloud
684                                 deletePendingCommits();
685
686                         }       else {
687                                 if (array.length == 0)
688                                         throw new Error("Server Error: Did not send any slots");
689                                 rejectedmessagelist.add(s.getSequenceNumber());
690                                 doEnd = false;
691                         }
692
693                         /* update data structure */
694                         validateandupdate(array, true);
695                 }
696         }
697
698         private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) throws InterruptedException {
699
700                 // encode the request
701                 byte[] array = new byte[Long.BYTES + ut.getSize()];
702                 ByteBuffer bbEncode = ByteBuffer.wrap(array);
703                 Long lastSeenCommit = lastCommitSeenSeqNumMap.get(ut.getArbitrator());
704                 if (lastSeenCommit != null) {
705                         bbEncode.putLong(lastSeenCommit);
706                 } else {
707                         bbEncode.putLong(0);
708                 }
709                 ut.encode(bbEncode);
710
711                 byte[] data = lc.sendDataToLocalDevice(ut.getArbitrator(), bbEncode.array());
712
713                 // Decode the data
714                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
715                 boolean didCommit = bbDecode.get() == 1;
716                 int numberOfCommites = bbDecode.getInt();
717
718                 List<Commit> newCommits = new LinkedList<Commit>();
719                 for (int i = 0; i < numberOfCommites; i++ ) {
720                         bbDecode.get();
721                         Commit com = (Commit)Commit.decode(null, bbDecode);
722                         newCommits.add(com);
723                 }
724
725                 return new Pair<Boolean, List<Commit>>(didCommit, newCommits);
726         }
727
728         public byte[] localCommInput(byte[] data) throws InterruptedException {
729
730
731
732                 // Decode the data
733                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
734                 long lastSeenCommit = bbDecode.getLong();
735
736                 Transaction ut = null;
737                 if (data.length != Long.BYTES) {
738                         bbDecode.get();
739                         ut = (Transaction)Transaction.decode(null, bbDecode);
740                 }
741
742                 mutex.acquire();
743                 // Do the local update and arbitrate
744                 Pair<Boolean, List<Commit>> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit);
745                 mutex.release();
746
747
748                 // Calculate the size of the response
749                 int size = Byte.BYTES + Integer.BYTES;
750                 for (Commit com : returnData.getSecond()) {
751                         size += com.getSize();
752                 }
753
754                 // encode the response
755                 byte[] array = new byte[size];
756                 ByteBuffer bbEncode = ByteBuffer.wrap(array);
757                 if (returnData.getFirst()) {
758                         bbEncode.put((byte)1);
759                 } else {
760                         bbEncode.put((byte)0);
761                 }
762                 bbEncode.putInt(returnData.getSecond().size());
763
764                 for (Commit com : returnData.getSecond()) {
765                         com.encode(bbEncode);
766                 }
767
768                 return bbEncode.array();
769         }
770
771         private Pair<Boolean, List<Commit>> doLocalUpdateAndArbitrate(Transaction ut, Long lastCommitSeen) {
772
773                 List<Commit> returnCommits = new ArrayList<Commit>();
774
775                 if ((lastCommitSeenSeqNumMap.get(localmachineid) != null) && (lastCommitSeenSeqNumMap.get(localmachineid) > lastCommitSeen)) {
776                         // There is a commit that the other client has not seen yet
777
778                         Map<Long, Commit> cm = commitMap.get(localmachineid);
779                         if (cm != null) {
780
781                                 List<Long> commitKeys = new ArrayList<Long>(cm.keySet());
782                                 Collections.sort(commitKeys);
783
784
785                                 for (int i = (commitKeys.size() - 1); i >= 0; i--) {
786                                         Commit com = cm.get(commitKeys.get(i));
787
788                                         if (com.getSequenceNumber() <= lastCommitSeen) {
789                                                 break;
790                                         }
791                                         returnCommits.add((Commit)com.getCopy(null));
792                                 }
793                         }
794                 }
795
796
797                 if ((ut == null) || (ut.getArbitrator() != localmachineid)) {
798                         // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator
799                         // or there is no transaction to process
800                         return new Pair<Boolean, List<Commit>>(false, returnCommits);
801                 }
802
803                 if (!ut.evaluateGuard(commitedTable, null)) {
804                         // Guard evaluated as false so return only the commits that the other device has not seen yet
805                         return new Pair<Boolean, List<Commit>>(false, returnCommits);
806                 }
807
808                 // create the commit
809                 Commit commit = new Commit(null,
810                                            -1,
811                                            commitSequenceNumber,
812                                            ut.getArbitrator(),
813                                            ut.getkeyValueUpdateSet());
814                 commitSequenceNumber = commitSequenceNumber + 1;
815
816                 // Add to the pending commits list
817                 pendingCommitsList.add(commit);
818
819                 // Add this commit so we can send it back
820                 returnCommits.add(commit);
821
822                 // Prepare to process the commit
823                 processEntry(commit);
824
825                 boolean didCommitOrSpeculate = proccessAllNewCommits();
826
827                 // Go through all uncommitted transactions and kill the ones that are dead
828                 deleteDeadUncommittedTransactions();
829
830                 // Speculate on key value pairs
831                 didCommitOrSpeculate |= createSpeculativeTable();
832                 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
833
834                 return new Pair<Boolean, List<Commit>>(true, returnCommits);
835         }
836
837         public void decrementLiveCount() {
838                 liveslotcount--;
839         }
840
841         private void setResizeThreshold() {
842                 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
843                 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
844         }
845
846         private boolean tryput(PendingTransaction pendingTrans, boolean resize) throws ServerException {
847                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
848
849                 int newsize = 0;
850                 if (liveslotcount > resizethreshold) {
851                         resize = true; //Resize is forced
852                 }
853
854                 if (resize) {
855                         newsize = (int) (numslots * RESIZE_MULTIPLE);
856                         TableStatus status = new TableStatus(s, newsize);
857                         s.addEntry(status);
858                 }
859
860                 doRejectedMessages(s);
861
862                 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
863
864                 // Resize was needed so redo call
865                 if (retTup.getFirst()) {
866                         return tryput(pendingTrans, true);
867                 }
868
869                 // Extract working variables
870                 boolean seenliveslot = retTup.getSecond();
871                 long seqn = retTup.getThird();
872
873                 doArbitration(s);
874
875                 Transaction trans = new Transaction(s,
876                                                     s.getSequenceNumber(),
877                                                     localmachineid,
878                                                     pendingTrans.getArbitrator(),
879                                                     pendingTrans.getKVUpdates(),
880                                                     pendingTrans.getKVGuard());
881                 boolean insertedTrans = false;
882                 if (s.hasSpace(trans)) {
883                         s.addEntry(trans);
884                         insertedTrans = true;
885                 }
886
887                 doOptionalRescue(s, seenliveslot, seqn, resize);
888                 Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedTrans, resize, newsize);
889
890                 if (sendRetData.getFirst()) {
891                         // update the status and change what the sequence number is for the
892                         TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTrans.getMachineLocalTransSeqNum());
893                         transStatus.setStatus(TransactionStatus.StatusSent);
894                         transStatus.setSentTransaction();
895                         transactionStatusMap.put(trans.getSequenceNumber(), transStatus);
896                 }
897
898
899                 if (sendRetData.getSecond().length != 0) {
900                         // insert into the local block chain
901                         validateandupdate(sendRetData.getSecond(), true);
902                 }
903
904                 return sendRetData.getFirst();
905         }
906
907         private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) throws ServerException {
908                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
909                 int newsize = 0;
910                 if (liveslotcount > resizethreshold) {
911                         resize = true; //Resize is forced
912                 }
913
914                 if (resize) {
915                         newsize = (int) (numslots * RESIZE_MULTIPLE);
916                         TableStatus status = new TableStatus(s, newsize);
917                         s.addEntry(status);
918                 }
919
920                 doRejectedMessages(s);
921                 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
922
923                 // Resize was needed so redo call
924                 if (retTup.getFirst()) {
925                         return tryput(keyName, arbMachineid, true);
926                 }
927
928                 // Extract working variables
929                 boolean seenliveslot = retTup.getSecond();
930                 long seqn = retTup.getThird();
931
932                 doArbitration(s);
933
934                 NewKey newKey = new NewKey(s, keyName, arbMachineid);
935
936                 boolean insertedNewKey = false;
937                 if (s.hasSpace(newKey)) {
938                         s.addEntry(newKey);
939                         insertedNewKey = true;
940                 }
941
942                 doOptionalRescue(s, seenliveslot, seqn, resize);
943                 Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedNewKey, resize, newsize);
944
945                 if (sendRetData.getSecond().length != 0) {
946                         // insert into the local block chain
947                         validateandupdate(sendRetData.getSecond(), true);
948                 }
949
950                 return sendRetData.getFirst();
951         }
952
953         private void doRejectedMessages(Slot s) {
954                 if (! rejectedmessagelist.isEmpty()) {
955                         /* TODO: We should avoid generating a rejected message entry if
956                          * there is already a sufficient entry in the queue (e.g.,
957                          * equalsto value of true and same sequence number).  */
958
959                         long old_seqn = rejectedmessagelist.firstElement();
960                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
961                                 long new_seqn = rejectedmessagelist.lastElement();
962                                 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
963                                 s.addEntry(rm);
964                         } else {
965                                 long prev_seqn = -1;
966                                 int i = 0;
967                                 /* Go through list of missing messages */
968                                 for (; i < rejectedmessagelist.size(); i++) {
969                                         long curr_seqn = rejectedmessagelist.get(i);
970                                         Slot s_msg = buffer.getSlot(curr_seqn);
971                                         if (s_msg != null)
972                                                 break;
973                                         prev_seqn = curr_seqn;
974                                 }
975                                 /* Generate rejected message entry for missing messages */
976                                 if (prev_seqn != -1) {
977                                         RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
978                                         s.addEntry(rm);
979                                 }
980                                 /* Generate rejected message entries for present messages */
981                                 for (; i < rejectedmessagelist.size(); i++) {
982                                         long curr_seqn = rejectedmessagelist.get(i);
983                                         Slot s_msg = buffer.getSlot(curr_seqn);
984                                         long machineid = s_msg.getMachineID();
985                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
986                                         s.addEntry(rm);
987                                 }
988                         }
989                 }
990         }
991
992         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
993                 long newestseqnum = buffer.getNewestSeqNum();
994                 long oldestseqnum = buffer.getOldestSeqNum();
995                 if (lastliveslotseqn < oldestseqnum)
996                         lastliveslotseqn = oldestseqnum;
997
998                 long seqn = lastliveslotseqn;
999                 boolean seenliveslot = false;
1000                 long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
1001                 long threshold = firstiffull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
1002
1003
1004                 // Mandatory Rescue
1005                 for (; seqn < threshold; seqn++) {
1006                         Slot prevslot = buffer.getSlot(seqn);
1007                         // Push slot number forward
1008                         if (! seenliveslot)
1009                                 lastliveslotseqn = seqn;
1010
1011                         if (! prevslot.isLive())
1012                                 continue;
1013                         seenliveslot = true;
1014                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1015                         for (Entry liveentry : liveentries) {
1016                                 if (s.hasSpace(liveentry)) {
1017                                         s.addEntry(liveentry);
1018                                 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
1019                                         if (!resize) {
1020                                                 System.out.println("B"); //?
1021                                                 return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
1022                                         }
1023                                 }
1024                         }
1025                 }
1026
1027                 // Did not resize
1028                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
1029         }
1030
1031         private boolean doArbitration(Slot s) {
1032
1033                 // flag whether we have finished all arbitration
1034                 boolean stillHasArbitration = false;
1035
1036                 pendingCommitsToDelete.clear();
1037
1038                 // First add queue commits
1039                 for (Commit commit : pendingCommitsList) {
1040                         if (s.hasSpace(commit)) {
1041                                 s.addEntry(commit);
1042                                 pendingCommitsToDelete.add(commit);
1043                         } else {
1044                                 // Ran out of space so move on but still not done
1045                                 stillHasArbitration = true;
1046                                 return stillHasArbitration;
1047                         }
1048                 }
1049
1050                 // Arbitrate
1051                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1052                 List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
1053
1054                 // Sort from oldest to newest
1055                 Collections.sort(transSeqNums);
1056
1057                 for (Long transNum : transSeqNums) {
1058                         Transaction ut = uncommittedTransactionsMap.get(transNum);
1059
1060                         // Check if this machine arbitrates for this transaction
1061                         if (ut.getArbitrator() != localmachineid ) {
1062                                 continue;
1063                         }
1064
1065                         // we did have something to arbitrate on
1066                         stillHasArbitration = true;
1067
1068                         Entry newEntry = null;
1069
1070                         if (ut.evaluateGuard(commitedTable, speculativeTableTmp)) {
1071                                 // Guard evaluated as true
1072
1073                                 // update the local tmp current key set
1074                                 for (KeyValue kv : ut.getkeyValueUpdateSet()) {
1075                                         speculativeTableTmp.put(kv.getKey(), kv);
1076                                 }
1077
1078                                 // create the commit
1079                                 newEntry = new Commit(s,
1080                                                       ut.getSequenceNumber(),
1081                                                       commitSequenceNumber,
1082                                                       ut.getArbitrator(),
1083                                                       ut.getkeyValueUpdateSet());
1084                                 commitSequenceNumber = commitSequenceNumber + 1;
1085                         } else {
1086                                 // Guard was false
1087
1088                                 // create the abort
1089                                 newEntry = new Abort(s,
1090                                                      ut.getSequenceNumber(),
1091                                                      ut.getMachineID(),
1092                                                      ut.getArbitrator());
1093                         }
1094
1095                         if ((newEntry != null) && s.hasSpace(newEntry)) {
1096                                 s.addEntry(newEntry);
1097                         } else {
1098                                 break;
1099                         }
1100                 }
1101
1102                 return stillHasArbitration;
1103         }
1104
1105         private void deletePendingCommits() {
1106                 for (Commit com : pendingCommitsToDelete) {
1107                         pendingCommitsList.remove(com);
1108                 }
1109                 pendingCommitsToDelete.clear();
1110         }
1111
1112         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
1113                 /* now go through live entries from least to greatest sequence number until
1114                  * either all live slots added, or the slot doesn't have enough room
1115                  * for SKIP_THRESHOLD consecutive entries*/
1116                 int skipcount = 0;
1117                 long newestseqnum = buffer.getNewestSeqNum();
1118                 search:
1119                 for (; seqn <= newestseqnum; seqn++) {
1120                         Slot prevslot = buffer.getSlot(seqn);
1121                         //Push slot number forward
1122                         if (!seenliveslot)
1123                                 lastliveslotseqn = seqn;
1124
1125                         if (!prevslot.isLive())
1126                                 continue;
1127                         seenliveslot = true;
1128                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1129                         for (Entry liveentry : liveentries) {
1130                                 if (s.hasSpace(liveentry))
1131                                         s.addEntry(liveentry);
1132                                 else {
1133                                         skipcount++;
1134                                         if (skipcount > SKIP_THRESHOLD)
1135                                                 break search;
1136                                 }
1137                         }
1138                 }
1139         }
1140
1141         private Pair<Boolean, Slot[]> doSendSlots(Slot s, boolean inserted, boolean resize, int newsize)  throws ServerException {
1142                 int max = 0;
1143                 if (resize)
1144                         max = newsize;
1145
1146                 Slot[] array = cloud.putSlot(s, max);
1147                 if (array == null) {
1148                         array = new Slot[] {s};
1149                         rejectedmessagelist.clear();
1150
1151                         // Delete pending commits that were sent to the cloud
1152                         deletePendingCommits();
1153                 }       else {
1154                         // if (array.length == 0)
1155                         // throw new Error("Server Error: Did not send any slots");
1156                         rejectedmessagelist.add(s.getSequenceNumber());
1157                         inserted = false;
1158                 }
1159
1160                 return new Pair<Boolean, Slot[]>(inserted, array);
1161         }
1162
1163         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
1164                 /* The cloud communication layer has checked slot HMACs already
1165                          before decoding */
1166                 if (newslots.length == 0) return;
1167
1168                 // Reset the table status declared sizes
1169                 smallestTableStatusSeen = -1;
1170                 largestTableStatusSeen = -1;
1171
1172                 long firstseqnum = newslots[0].getSequenceNumber();
1173                 if (firstseqnum <= sequencenumber) {
1174                         throw new Error("Server Error: Sent older slots!");
1175                 }
1176
1177                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
1178                 checkHMACChain(indexer, newslots);
1179
1180                 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
1181
1182                 // initExpectedSize(firstseqnum);
1183                 for (Slot slot : newslots) {
1184                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
1185                         // updateExpectedSize();
1186                 }
1187
1188                 /* If there is a gap, check to see if the server sent us everything. */
1189                 if (firstseqnum != (sequencenumber + 1)) {
1190
1191                         // TODO: Check size
1192                         checkNumSlots(newslots.length);
1193                         if (!machineSet.isEmpty()) {
1194                                 throw new Error("Missing record for machines: " + machineSet);
1195                         }
1196                 }
1197
1198
1199                 commitNewMaxSize();
1200
1201                 /* Commit new to slots. */
1202                 for (Slot slot : newslots) {
1203                         buffer.putSlot(slot);
1204                         liveslotcount++;
1205                 }
1206                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
1207
1208                 // Process all on key value pairs
1209                 boolean didCommitOrSpeculate = proccessAllNewCommits();
1210
1211                 // Go through all uncommitted transactions and kill the ones that are dead
1212                 deleteDeadUncommittedTransactions();
1213
1214                 // Speculate on key value pairs
1215                 didCommitOrSpeculate |= createSpeculativeTable();
1216
1217                 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
1218         }
1219
1220         private boolean proccessAllNewCommits() {
1221                 // Process only if there are commit
1222                 if (newCommitMap.keySet().size() == 0) {
1223                         return false;
1224                 }
1225                 boolean didProcessNewCommit = false;
1226
1227                 for (Long arb : newCommitMap.keySet()) {
1228
1229                         List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.get(arb).keySet());
1230
1231                         // Sort from oldest to newest commit
1232                         Collections.sort(commitSeqNums);
1233
1234                         // Go through each new commit one by one
1235                         for (Long entrySeqNum : commitSeqNums) {
1236                                 Commit entry = newCommitMap.get(arb).get(entrySeqNum);
1237
1238                                 long lastCommitSeenSeqNum = -1;
1239                                 if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
1240                                         lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
1241                                 }
1242
1243                                 if (entry.getSequenceNumber() <= lastCommitSeenSeqNum) {
1244                                         Map<Long, Commit> cm = commitMap.get(arb);
1245                                         if (cm == null) {
1246                                                 cm = new HashMap<Long, Commit>();
1247                                         }
1248
1249                                         Commit prevCommit = cm.put(entry.getSequenceNumber(), entry);
1250                                         commitMap.put(arb, cm);
1251
1252                                         if (prevCommit != null) {
1253                                                 prevCommit.setDead();
1254
1255                                                 for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
1256                                                         committedMapByKey.put(kv.getKey(), entry);
1257                                                 }
1258                                         }
1259
1260                                         continue;
1261                                 }
1262
1263                                 Set<Commit> commitsToEditSet = new HashSet<Commit>();
1264
1265                                 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
1266                                         commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
1267                                 }
1268
1269                                 commitsToEditSet.remove(null);
1270
1271                                 for (Commit prevCommit : commitsToEditSet) {
1272
1273                                         Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
1274
1275                                         if (!prevCommit.isLive()) {
1276                                                 Map<Long, Commit> cm = commitMap.get(arb);
1277
1278                                                 // remove it from the map so that it can be set as dead
1279                                                 if (cm != null) {
1280                                                         cm.remove(prevCommit.getSequenceNumber());
1281                                                         commitMap.put(arb, cm);
1282                                                 }
1283                                         }
1284                                 }
1285
1286                                 // Add the new commit
1287                                 Map<Long, Commit> cm = commitMap.get(arb);
1288                                 if (cm == null) {
1289                                         cm = new HashMap<Long, Commit>();
1290                                 }
1291                                 cm.put(entry.getSequenceNumber(), entry);
1292                                 commitMap.put(arb, cm);
1293
1294                                 lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getSequenceNumber());
1295
1296                                 // set the trans sequence number if we are able to
1297                                 if (entry.getTransSequenceNumber() != -1) {
1298                                         lastCommitSeenTransSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
1299                                 }
1300
1301                                 didProcessNewCommit = true;
1302
1303                                 // Update the committed table list
1304                                 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
1305                                         IoTString key = kv.getKey();
1306                                         commitedTable.put(key, kv);
1307                                         committedMapByKey.put(key, entry);
1308                                 }
1309                         }
1310                 }
1311                 // Clear the new commits storage so we can use it later
1312                 newCommitMap.clear();
1313
1314
1315                 // go through all saved transactions and update the status of those that can be updated
1316                 for (Iterator<Map.Entry<Long, TransactionStatus>> i = transactionStatusMap.entrySet().iterator(); i.hasNext();) {
1317                         Map.Entry<Long, TransactionStatus> entry = i.next();
1318                         long seqnum = entry.getKey();
1319                         TransactionStatus status = entry.getValue();
1320
1321                         if (status.getSentTransaction()) {
1322
1323                                 Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(status.getArbitrator());
1324                                 Long abortSeqNum = lastAbortSeenSeqNumMap.get(status.getArbitrator());
1325
1326                                 if (((commitSeqNum != null) && (seqnum <= commitSeqNum)) ||
1327                                         ((abortSeqNum != null) && (seqnum <= abortSeqNum))) {
1328                                         status.setStatus(TransactionStatus.StatusCommitted);
1329                                         i.remove();
1330                                 }
1331                         }
1332                 }
1333
1334                 return didProcessNewCommit;
1335         }
1336
1337         private void deleteDeadUncommittedTransactions() {
1338                 // Make dead the transactions
1339                 for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
1340                         Transaction prevtrans = i.next().getValue();
1341                         long transArb = prevtrans.getArbitrator();
1342
1343                         Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(transArb);
1344                         Long abortSeqNum = lastAbortSeenSeqNumMap.get(transArb);
1345
1346                         if (((commitSeqNum != null) && (prevtrans.getSequenceNumber() <= commitSeqNum)) ||
1347                                 ((abortSeqNum != null) && (prevtrans.getSequenceNumber() <= abortSeqNum))) {
1348                                 i.remove();
1349                                 prevtrans.setDead();
1350                         }
1351                 }
1352         }
1353
1354         private boolean createSpeculativeTable() {
1355                 if (uncommittedTransactionsMap.keySet().size() == 0) {
1356                         return false;
1357                 }
1358
1359                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1360                 List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
1361
1362                 // Sort from oldest to newest commit
1363                 Collections.sort(utSeqNums);
1364
1365                 if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
1366
1367                         speculativeTable.clear();
1368                         lastUncommittedTransaction = -1;
1369
1370                         for (Long key : utSeqNums) {
1371                                 Transaction trans = uncommittedTransactionsMap.get(key);
1372
1373                                 lastUncommittedTransaction = key;
1374
1375                                 if (trans.evaluateGuard(commitedTable, speculativeTableTmp)) {
1376                                         for (KeyValue kv : trans.getkeyValueUpdateSet()) {
1377                                                 speculativeTableTmp.put(kv.getKey(), kv);
1378                                         }
1379                                 }
1380
1381                         }
1382                 } else {
1383                         for (Long key : utSeqNums) {
1384
1385                                 if (key <= lastUncommittedTransaction) {
1386                                         continue;
1387                                 }
1388
1389                                 lastUncommittedTransaction = key;
1390
1391                                 Transaction trans = uncommittedTransactionsMap.get(key);
1392
1393                                 if (trans.evaluateGuard(speculativeTable, speculativeTableTmp)) {
1394                                         for (KeyValue kv : trans.getkeyValueUpdateSet()) {
1395                                                 speculativeTableTmp.put(kv.getKey(), kv);
1396                                         }
1397                                 }
1398                         }
1399                 }
1400
1401                 for (IoTString key : speculativeTableTmp.keySet()) {
1402                         speculativeTable.put(key, speculativeTableTmp.get(key));
1403                 }
1404
1405                 return true;
1406         }
1407
1408         private void createPendingTransactionSpeculativeTable(boolean didCommitOrSpeculate) {
1409
1410                 if (didCommitOrSpeculate) {
1411                         pendingTransSpeculativeTable.clear();
1412                         lastSeenPendingTransactionSpeculateIndex = 0;
1413
1414                         int index = 0;
1415                         for (PendingTransaction pt : pendingTransQueue) {
1416                                 if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
1417
1418                                         lastSeenPendingTransactionSpeculateIndex = index;
1419                                         index++;
1420
1421                                         for (KeyValue kv : pt.getKVUpdates()) {
1422                                                 pendingTransSpeculativeTable.put(kv.getKey(), kv);
1423                                         }
1424
1425                                 }
1426                         }
1427                 }
1428         }
1429
1430         private int expectedsize, currmaxsize;
1431
1432         private void checkNumSlots(int numslots) {
1433
1434
1435                 // We only have 1 size so we must have this many slots
1436                 if (largestTableStatusSeen == smallestTableStatusSeen) {
1437                         if (numslots != smallestTableStatusSeen) {
1438                                 throw new Error("Server Error: Server did not send all slots.  Expected: " + smallestTableStatusSeen + " Received:" + numslots);
1439                         }
1440                 } else {
1441                         // We have more than 1
1442                         if (numslots < smallestTableStatusSeen) {
1443                                 throw new Error("Server Error: Server did not send all slots.  Expected at least: " + smallestTableStatusSeen + " Received:" + numslots);
1444                         }
1445                 }
1446
1447                 // if (numslots != expectedsize) {
1448                 // throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
1449                 // }
1450         }
1451
1452         private void initExpectedSize(long firstsequencenumber) {
1453                 long prevslots = firstsequencenumber;
1454                 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
1455                 currmaxsize = numslots;
1456         }
1457
1458         private void updateExpectedSize() {
1459                 expectedsize++;
1460                 if (expectedsize > currmaxsize) {
1461                         expectedsize = currmaxsize;
1462                 }
1463         }
1464
1465         private void updateCurrMaxSize(int newmaxsize) {
1466                 currmaxsize = newmaxsize;
1467         }
1468
1469         private void commitNewMaxSize() {
1470
1471                 if (largestTableStatusSeen == -1) {
1472                         currmaxsize = numslots;
1473                 } else {
1474                         currmaxsize = largestTableStatusSeen;
1475                 }
1476
1477                 if (numslots != currmaxsize) {
1478                         buffer.resize(currmaxsize);
1479                 }
1480
1481                 numslots = currmaxsize;
1482                 setResizeThreshold();
1483         }
1484
1485         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
1486                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
1487         }
1488
1489         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
1490                 long oldseqnum = entry.getOldSeqNum();
1491                 long newseqnum = entry.getNewSeqNum();
1492                 boolean isequal = entry.getEqual();
1493                 long machineid = entry.getMachineID();
1494                 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
1495                         Slot slot = indexer.getSlot(seqnum);
1496                         if (slot != null) {
1497                                 long slotmachineid = slot.getMachineID();
1498                                 if (isequal != (slotmachineid == machineid)) {
1499                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
1500                                 }
1501                         }
1502                 }
1503
1504                 HashSet<Long> watchset = new HashSet<Long>();
1505                 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
1506                         long entry_mid = lastmsg_entry.getKey();
1507                         /* We've seen it, don't need to continue to watch.  Our next
1508                          * message will implicitly acknowledge it. */
1509                         if (entry_mid == localmachineid)
1510                                 continue;
1511                         Pair<Long, Liveness> v = lastmsg_entry.getValue();
1512                         long entry_seqn = v.getFirst();
1513                         if (entry_seqn < newseqnum) {
1514                                 addWatchList(entry_mid, entry);
1515                                 watchset.add(entry_mid);
1516                         }
1517                 }
1518                 if (watchset.isEmpty())
1519                         entry.setDead();
1520                 else
1521                         entry.setWatchSet(watchset);
1522         }
1523
1524         private void processEntry(NewKey entry) {
1525                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
1526
1527                 NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
1528
1529                 if (oldNewKey != null) {
1530                         oldNewKey.setDead();
1531                 }
1532         }
1533
1534         private void processEntry(Transaction entry) {
1535
1536                 long arb = entry.getArbitrator();
1537                 Long comLast = lastCommitSeenTransSeqNumMap.get(arb);
1538                 Long abLast = lastAbortSeenSeqNumMap.get(arb);
1539
1540                 Transaction prevTrans = null;
1541
1542                 if ((comLast != null) && (comLast >= entry.getSequenceNumber())) {
1543                         prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
1544                 } else if ((abLast != null) && (abLast >= entry.getSequenceNumber())) {
1545                         prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
1546                 } else {
1547                         prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
1548                 }
1549
1550                 // Duplicate so delete old copy
1551                 if (prevTrans != null) {
1552                         prevTrans.setDead();
1553                 }
1554         }
1555
1556         private void processEntry(Abort entry) {
1557                 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
1558                         // Abort has not been seen yet so we need to keep track of it
1559
1560                         Abort prevAbort = abortMap.put(entry.getTransSequenceNumber(), entry);
1561                         if (prevAbort != null) {
1562                                 prevAbort.setDead(); // delete old version of the duplicate
1563                         }
1564
1565                         if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) &&   (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
1566                                 lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
1567                         }
1568                 } else {
1569                         // The machine already saw this so it is dead
1570                         entry.setDead();
1571                 }
1572
1573                 // Update the status of the transaction and remove it since we are done with this transaction
1574                 TransactionStatus status = transactionStatusMap.remove(entry.getTransSequenceNumber());
1575                 if (status != null) {
1576                         status.setStatus(TransactionStatus.StatusAborted);
1577                 }
1578         }
1579
1580         private void processEntry(Commit entry) {
1581                 Map<Long, Commit> arbMap = newCommitMap.get(entry.getTransArbitrator());
1582
1583                 if (arbMap == null) {
1584                         arbMap = new HashMap<Long, Commit>();
1585                 }
1586
1587                 Commit prevCommit = arbMap.put(entry.getSequenceNumber(), entry);
1588                 newCommitMap.put(entry.getTransArbitrator(), arbMap);
1589
1590                 if (prevCommit != null) {
1591                         prevCommit.setDead();
1592                 }
1593         }
1594
1595         private void processEntry(TableStatus entry) {
1596                 int newnumslots = entry.getMaxSlots();
1597                 // updateCurrMaxSize(newnumslots);
1598                 if (lastTableStatus != null)
1599                         lastTableStatus.setDead();
1600                 lastTableStatus = entry;
1601
1602                 if ((smallestTableStatusSeen == -1) || (newnumslots < smallestTableStatusSeen)) {
1603                         smallestTableStatusSeen = newnumslots;
1604                 }
1605
1606                 if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) {
1607                         largestTableStatusSeen = newnumslots;
1608                 }
1609         }
1610
1611         private void addWatchList(long machineid, RejectedMessage entry) {
1612                 HashSet<RejectedMessage> entries = watchlist.get(machineid);
1613                 if (entries == null)
1614                         watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
1615                 entries.add(entry);
1616         }
1617
1618         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1619                 machineSet.remove(machineid);
1620
1621                 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
1622                 if (watchset != null) {
1623                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
1624                                 RejectedMessage rm = rmit.next();
1625                                 if (rm.getNewSeqNum() <= seqnum) {
1626                                         /* Remove it from our watchlist */
1627                                         rmit.remove();
1628                                         /* Decrement machines that need to see this notification */
1629                                         rm.removeWatcher(machineid);
1630                                 }
1631                         }
1632                 }
1633
1634                 if (machineid == localmachineid) {
1635                         /* Our own messages are immediately dead. */
1636                         if (liveness instanceof LastMessage) {
1637                                 ((LastMessage)liveness).setDead();
1638                         } else if (liveness instanceof Slot) {
1639                                 ((Slot)liveness).setDead();
1640                         } else {
1641                                 throw new Error("Unrecognized type");
1642                         }
1643                 }
1644
1645                 // Set dead the abort
1646                 for (Iterator<Map.Entry<Long, Abort>> i = abortMap.entrySet().iterator(); i.hasNext();) {
1647                         Abort abort = i.next().getValue();
1648
1649                         if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
1650                                 abort.setDead();
1651                                 i.remove();
1652                         }
1653                 }
1654
1655                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
1656                 if (lastmsgentry == null)
1657                         return;
1658
1659                 long lastmsgseqnum = lastmsgentry.getFirst();
1660                 Liveness lastentry = lastmsgentry.getSecond();
1661                 if (machineid != localmachineid) {
1662                         if (lastentry instanceof LastMessage) {
1663                                 ((LastMessage)lastentry).setDead();
1664                         } else if (lastentry instanceof Slot) {
1665                                 ((Slot)lastentry).setDead();
1666                         } else {
1667                                 throw new Error("Unrecognized type");
1668                         }
1669                 }
1670
1671                 if (machineid == localmachineid) {
1672                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
1673                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  seqnum + " got: " + lastmsgseqnum);
1674                 } else {
1675                         if (lastmsgseqnum > seqnum)
1676                                 throw new Error("Server Error: Rollback on remote machine sequence number");
1677                 }
1678         }
1679
1680         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1681                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
1682                 for (Entry entry : slot.getEntries()) {
1683                         switch (entry.getType()) {
1684
1685                         case Entry.TypeNewKey:
1686                                 processEntry((NewKey)entry);
1687                                 break;
1688
1689                         case Entry.TypeCommit:
1690                                 processEntry((Commit)entry);
1691                                 break;
1692
1693                         case Entry.TypeAbort:
1694                                 processEntry((Abort)entry);
1695                                 break;
1696
1697                         case Entry.TypeTransaction:
1698                                 processEntry((Transaction)entry);
1699                                 break;
1700
1701                         case Entry.TypeLastMessage:
1702                                 processEntry((LastMessage)entry, machineSet);
1703                                 break;
1704
1705                         case Entry.TypeRejectedMessage:
1706                                 processEntry((RejectedMessage)entry, indexer);
1707                                 break;
1708
1709                         case Entry.TypeTableStatus:
1710                                 processEntry((TableStatus)entry);
1711                                 break;
1712
1713                         default:
1714                                 throw new Error("Unrecognized type: " + entry.getType());
1715                         }
1716                 }
1717         }
1718
1719         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
1720                 for (int i = 0; i < newslots.length; i++) {
1721                         Slot currslot = newslots[i];
1722                         Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
1723                         if (prevslot != null &&
1724                                 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1725                                 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);
1726                 }
1727         }
1728 }