Changes
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
1 package iotcloud;
2
3 import java.util.Iterator;
4 import java.util.Random;
5 import java.util.Arrays;
6 import java.util.Map;
7 import java.util.Set;
8 import java.util.List;
9 import java.util.Vector;
10 import java.util.HashMap;
11 import java.util.HashSet;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.nio.ByteBuffer;
15
16 /**
17  * IoTTable data structure.  Provides client interface.
18  * @author Brian Demsky
19  * @version 1.0
20  */
21
22 final public class Table {
23
24         /* Constants */
25         static final int FREE_SLOTS = 2; // Number of slots that should be kept free // 10
26         static final int SKIP_THRESHOLD = 10;
27         static final double RESIZE_MULTIPLE = 1.2;
28         static final double RESIZE_THRESHOLD = 0.75;
29         static final int REJECTED_THRESHOLD = 5;
30
31         /* Helper Objects */
32         private SlotBuffer buffer = null;
33         private CloudComm cloud = null;
34         private Random random = null;
35         private TableStatus liveTableStatus = null;
36         private PendingTransaction pendingTransactionBuilder = null; // Pending Transaction used in building a Pending Transaction
37         private Transaction lastPendingTransactionSpeculatedOn = null; // Last transaction that was speculated on from the pending transaction
38         private Transaction firstPendingTransaction = null; // first transaction in the pending transaction list
39
40         /* Variables */
41         private int numberOfSlots = 0;  // Number of slots stored in buffer
42         private int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
43         private long liveSlotCount = 0; // Number of currently live slots
44         private long oldestLiveSlotSequenceNumver = 0;  // Smallest sequence number of the slot with a live entry
45         private long localMachineId = 0; // Machine ID of this client device
46         private long sequenceNumber = 0; // Largest sequence number a client has received
47         // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
48         // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
49         private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
50         private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
51         private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
52         private long localArbitrationSequenceNumber = 0;
53         private boolean hadPartialSendToServer = false;
54         private boolean attemptedToSendToServer = false;
55         private long expectedsize;
56         private boolean didFindTableStatus = false;
57         private long currMaxSize = 0;
58
59         private Slot lastSlotAttemptedToSend = null;
60         private boolean lastIsNewKey = false;
61         private int lastNewSize = 0;
62         private Map<Transaction, List<Integer>> lastTransactionPartsSent = null;
63         private List<Entry> lastPendingSendArbitrationEntriesToDelete = null;
64         private NewKey lastNewKey = null;
65
66
67         /* Data Structures  */
68         private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
69         private Map<IoTString, KeyValue> speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value
70         private Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
71         private Map<IoTString, NewKey> liveNewKeyTable = null; // Table of live new keys
72         private HashMap<Long, Pair<Long, Liveness>> lastMessageTable = null; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
73         private HashMap<Long, HashSet<RejectedMessage>> rejectedMessageWatchListTable = null; // Table of machine Ids and the set of rejected messages they have not seen yet
74         private Map<IoTString, Long> arbitratorTable = null; // Table of keys and their arbitrators
75         private Map<Pair<Long, Long>, Abort> liveAbortTable = null; // Table live abort messages
76         private Map<Long, Map<Pair<Long, Integer>, TransactionPart>> newTransactionParts = null; // transaction parts that are seen in this latest round of slots from the server
77         private Map<Long, Map<Pair<Long, Integer>, CommitPart>> newCommitParts = null; // commit parts that are seen in this latest round of slots from the server
78         private Map<Long, Long> lastArbitratedTransactionNumberByArbitratorTable = null; // Last transaction sequence number that an arbitrator arbitrated on
79         private Map<Long, Transaction> liveTransactionBySequenceNumberTable = null; // live transaction grouped by the sequence number
80         private Map<Pair<Long, Long>, Transaction> liveTransactionByTransactionIdTable = null; // live transaction grouped by the transaction ID
81         private Map<Long, Map<Long, Commit>> liveCommitsTable = null;
82         private Map<IoTString, Commit> liveCommitsByKeyTable = null;
83         private Map<Long, Long> lastCommitSeenSequenceNumberByArbitratorTable = null;
84         private Vector<Long> rejectedSlotList = null; // List of rejected slots that have yet to be sent to the server
85         private List<Transaction> pendingTransactionQueue = null;
86         private List<ArbitrationRound> pendingSendArbitrationRounds = null;
87         private List<Entry> pendingSendArbitrationEntriesToDelete = null;
88         private Map<Transaction, List<Integer>> transactionPartsSent = null;
89         private Map<Long, TransactionStatus> outstandingTransactionStatus = null;
90         private Map<Long, Abort> liveAbortsGeneratedByLocal = null;
91         private Set<Pair<Long, Long>> offlineTransactionsCommittedAndAtServer = null;
92         private Map<Long, Pair<String, Integer>> localCommunicationTable = null;
93         private Map<Long, Long> lastTransactionSeenFromMachineFromServer = null;
94         private Map<Long, Long> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = null;
95
96
/**
 * Creates a table backed by a newly constructed {@link CloudComm}.
 *
 * @param baseurl          forwarded to the CloudComm constructor
 * @param password         forwarded to the CloudComm constructor
 * @param _localMachineId  machine ID of this client device
 * @param listeningPort    forwarded to the CloudComm constructor
 */
public Table(String baseurl, String password, long _localMachineId, int listeningPort) {
        this.localMachineId = _localMachineId;
        this.cloud = new CloudComm(this, baseurl, password, listeningPort);

        // Allocate helper objects and data structures
        init();
}
103
/**
 * Creates a table that uses an already constructed {@link CloudComm}.
 *
 * @param _cloud           communication object to use
 * @param _localMachineId  machine ID of this client device
 */
public Table(CloudComm _cloud, long _localMachineId) {
        this.localMachineId = _localMachineId;
        this.cloud = _cloud;

        // Allocate helper objects and data structures
        init();
}
110
111         /**
112          * Init all the stuff needed for for table usage
113          */
114         private void init() {
115
116                 // Init helper objects
117                 random = new Random();
118                 buffer = new SlotBuffer();
119
120                 // Set Variables
121                 oldestLiveSlotSequenceNumver = 1;
122
123                 // init data structs
124                 committedKeyValueTable = new HashMap<IoTString, KeyValue>();
125                 speculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
126                 pendingTransactionSpeculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
127                 liveNewKeyTable = new HashMap<IoTString, NewKey>();
128                 lastMessageTable = new HashMap<Long, Pair<Long, Liveness>>();
129                 rejectedMessageWatchListTable = new HashMap<Long, HashSet<RejectedMessage>>();
130                 arbitratorTable = new HashMap<IoTString, Long>();
131                 liveAbortTable = new HashMap<Pair<Long, Long>, Abort>();
132                 newTransactionParts = new HashMap<Long, Map<Pair<Long, Integer>, TransactionPart>>();
133                 newCommitParts = new HashMap<Long, Map<Pair<Long, Integer>, CommitPart>>();
134                 lastArbitratedTransactionNumberByArbitratorTable = new HashMap<Long, Long>();
135                 liveTransactionBySequenceNumberTable = new HashMap<Long, Transaction>();
136                 liveTransactionByTransactionIdTable = new HashMap<Pair<Long, Long>, Transaction>();
137                 liveCommitsTable = new HashMap<Long, Map<Long, Commit>>();
138                 liveCommitsByKeyTable = new HashMap<IoTString, Commit>();
139                 lastCommitSeenSequenceNumberByArbitratorTable = new HashMap<Long, Long>();
140                 rejectedSlotList = new Vector<Long>();
141                 pendingTransactionQueue = new ArrayList<Transaction>();
142                 pendingSendArbitrationEntriesToDelete = new ArrayList<Entry>();
143                 transactionPartsSent = new HashMap<Transaction, List<Integer>>();
144                 outstandingTransactionStatus = new HashMap<Long, TransactionStatus>();
145                 liveAbortsGeneratedByLocal = new HashMap<Long, Abort>();
146                 offlineTransactionsCommittedAndAtServer = new HashSet<Pair<Long, Long>>();
147                 localCommunicationTable = new HashMap<Long, Pair<String, Integer>>();
148                 lastTransactionSeenFromMachineFromServer = new HashMap<Long, Long>();
149                 pendingSendArbitrationRounds = new ArrayList<ArbitrationRound>();
150                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new HashMap<Long, Long>();
151
152
153                 // Other init stuff
154                 numberOfSlots = buffer.capacity();
155                 setResizeThreshold();
156         }
157
158         // TODO: delete method
159         public synchronized void printSlots() {
160                 long o = buffer.getOldestSeqNum();
161                 long n = buffer.getNewestSeqNum();
162
163                 int[] types = new int[10];
164
165                 int num = 0;
166
167                 int livec = 0;
168                 int deadc = 0;
169
170                 int casdasd = 0;
171
172                 int liveslo = 0;
173
174                 for (long i = o; i < (n + 1); i++) {
175                         Slot s = buffer.getSlot(i);
176
177
178                         if (s.isLive()) {
179                                 liveslo++;
180                         }
181
182                         Vector<Entry> entries = s.getEntries();
183
184                         for (Entry e : entries) {
185                                 if (e.isLive()) {
186                                         int type = e.getType();
187
188
189                                         if (type == 6) {
190                                                 RejectedMessage rej = (RejectedMessage)e;
191                                                 casdasd++;
192
193                                                 System.out.println(rej.getMachineID());
194                                         }
195
196
197                                         types[type] = types[type] + 1;
198                                         num++;
199                                         livec++;
200                                 } else {
201                                         deadc++;
202                                 }
203                         }
204                 }
205
206                 for (int i = 0; i < 10; i++) {
207                         System.out.println(i + "    " + types[i]);
208                 }
209                 System.out.println("Live count:   " + livec);
210                 System.out.println("Live Slot count:   " + liveslo);
211
212                 System.out.println("Dead count:   " + deadc);
213                 System.out.println("Old:   " + o);
214                 System.out.println("New:   " + n);
215                 System.out.println("Size:   " + buffer.size());
216                 // System.out.println("Commits:   " + liveCommitsTable.size());
217                 System.out.println("pendingTrans:   " + pendingTransactionQueue.size());
218                 System.out.println("Trans Status Out:   " + outstandingTransactionStatus.size());
219
220                 for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
221                         System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
222                 }
223
224
225                 for (Long a : liveCommitsTable.keySet()) {
226                         for (Long b : liveCommitsTable.get(a).keySet()) {
227                                 for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) {
228                                         System.out.print(kv + " ");
229                                 }
230                                 System.out.print("|| ");
231                         }
232                         System.out.println();
233                 }
234
235         }
236
237         /**
238          * Initialize the table by inserting a table status as the first entry into the table status
239          * also initialize the crypto stuff.
240          */
241         public synchronized void initTable() throws ServerException {
242                 cloud.initSecurity();
243
244                 // Create the first insertion into the block chain which is the table status
245                 Slot s = new Slot(this, 1, localMachineId);
246                 TableStatus status = new TableStatus(s, numberOfSlots);
247                 s.addEntry(status);
248                 Slot[] array = cloud.putSlot(s, numberOfSlots);
249
250                 if (array == null) {
251                         array = new Slot[] {s};
252                         // update local block chain
253                         validateAndUpdate(array, true);
254                 } else if (array.length == 1) {
255                         // in case we did push the slot BUT we failed to init it
256                         validateAndUpdate(array, true);
257                 } else {
258                         throw new Error("Error on initialization");
259                 }
260         }
261
262         /**
263          * Rebuild the table from scratch by pulling the latest block chain from the server.
264          */
265         public synchronized void rebuild() throws ServerException {
266                 // Just pull the latest slots from the server
267                 Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
268                 validateAndUpdate(newslots, true);
269                 sendToServer(null);
270                 updateLiveTransactionsAndStatus();
271
272         }
273
274         // public String toString() {
275         //      String retString = " Committed Table: \n";
276         //      retString += "---------------------------\n";
277         //      retString += commitedTable.toString();
278
279         //      retString += "\n\n";
280
281         //      retString += " Speculative Table: \n";
282         //      retString += "---------------------------\n";
283         //      retString += speculativeTable.toString();
284
285         //      return retString;
286         // }
287
288         public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) {
289                 localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
290         }
291
292         public synchronized Long getArbitrator(IoTString key) {
293                 return arbitratorTable.get(key);
294         }
295
296         public synchronized void close() {
297                 cloud.close();
298         }
299
300         public synchronized IoTString getCommitted(IoTString key)  {
301                 KeyValue kv = committedKeyValueTable.get(key);
302
303                 if (kv != null) {
304                         return kv.getValue();
305                 } else {
306                         return null;
307                 }
308         }
309
310         public synchronized IoTString getSpeculative(IoTString key) {
311                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
312
313                 if (kv == null) {
314                         kv = speculatedKeyValueTable.get(key);
315                 }
316
317                 if (kv == null) {
318                         kv = committedKeyValueTable.get(key);
319                 }
320
321                 if (kv != null) {
322                         return kv.getValue();
323                 } else {
324                         return null;
325                 }
326         }
327
328         public synchronized IoTString getCommittedAtomic(IoTString key) {
329                 KeyValue kv = committedKeyValueTable.get(key);
330
331                 if (arbitratorTable.get(key) == null) {
332                         throw new Error("Key not Found.");
333                 }
334
335                 // Make sure new key value pair matches the current arbitrator
336                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
337                         // TODO: Maybe not throw en error
338                         throw new Error("Not all Key Values Match Arbitrator.");
339                 }
340
341                 if (kv != null) {
342                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
343                         return kv.getValue();
344                 } else {
345                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
346                         return null;
347                 }
348         }
349
350         public synchronized IoTString getSpeculativeAtomic(IoTString key) {
351                 if (arbitratorTable.get(key) == null) {
352                         throw new Error("Key not Found.");
353                 }
354
355                 // Make sure new key value pair matches the current arbitrator
356                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
357                         // TODO: Maybe not throw en error
358                         throw new Error("Not all Key Values Match Arbitrator.");
359                 }
360
361                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
362
363                 if (kv == null) {
364                         kv = speculatedKeyValueTable.get(key);
365                 }
366
367                 if (kv == null) {
368                         kv = committedKeyValueTable.get(key);
369                 }
370
371                 if (kv != null) {
372                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
373                         return kv.getValue();
374                 } else {
375                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
376                         return null;
377                 }
378         }
379
380         public synchronized boolean update()  {
381                 try {
382                         Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
383                         validateAndUpdate(newSlots, false);
384                         sendToServer(null);
385
386
387                         updateLiveTransactionsAndStatus();
388
389                         return true;
390                 } catch (Exception e) {
391                         // e.printStackTrace();
392
393                         for (Long m : localCommunicationTable.keySet()) {
394                                 updateFromLocal(m);
395                         }
396                 }
397
398                 return false;
399         }
400
401         public synchronized boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
402                 while (true) {
403                         if (arbitratorTable.get(keyName) != null) {
404                                 // There is already an arbitrator
405                                 return false;
406                         }
407
408                         NewKey newKey = new NewKey(null, keyName, machineId);
409
410                         if (sendToServer(newKey)) {
411                                 // If successfully inserted
412                                 return true;
413                         }
414                 }
415         }
416
417         public synchronized void startTransaction() {
418                 // Create a new transaction, invalidates any old pending transactions.
419                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
420         }
421
422         public synchronized void addKV(IoTString key, IoTString value) {
423
424                 // Make sure it is a valid key
425                 if (arbitratorTable.get(key) == null) {
426                         throw new Error("Key not Found.");
427                 }
428
429                 // Make sure new key value pair matches the current arbitrator
430                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
431                         // TODO: Maybe not throw en error
432                         throw new Error("Not all Key Values Match Arbitrator.");
433                 }
434
435                 // Add the key value to this transaction
436                 KeyValue kv = new KeyValue(key, value);
437                 pendingTransactionBuilder.addKV(kv);
438         }
439
440         public synchronized TransactionStatus commitTransaction() {
441
442                 if (pendingTransactionBuilder.getKVUpdates().size() == 0) {
443                         // transaction with no updates will have no effect on the system
444                         return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
445                 }
446
447                 // Set the local transaction sequence number and increment
448                 pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber);
449                 localTransactionSequenceNumber++;
450
451                 // Create the transaction status
452                 TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator());
453
454                 // Create the new transaction
455                 Transaction newTransaction = pendingTransactionBuilder.createTransaction();
456                 newTransaction.setTransactionStatus(transactionStatus);
457
458                 if (pendingTransactionBuilder.getArbitrator() != localMachineId) {
459                         // Add it to the queue and invalidate the builder for safety
460                         pendingTransactionQueue.add(newTransaction);
461                 } else {
462                         arbitrateOnLocalTransaction(newTransaction);
463                         updateLiveStateFromLocal();
464                 }
465
466                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
467
468                 try {
469                         sendToServer(null);
470                 } catch (ServerException e) {
471
472                         Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
473                         for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
474                                 Transaction transaction = iter.next();
475
476                                 if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) {
477                                         // Already contacted this client so ignore all attempts to contact this client
478                                         // to preserve ordering for arbitrator
479                                         continue;
480                                 }
481
482                                 Pair<Boolean, Boolean> sendReturn = sendTransactionToLocal(transaction);
483
484                                 if (sendReturn.getFirst()) {
485                                         // Failed to contact over local
486                                         arbitratorTriedAndFailed.add(transaction.getArbitrator());
487                                 } else {
488                                         // Successful contact or should not contact
489
490                                         if (sendReturn.getSecond()) {
491                                                 // did arbitrate
492                                                 iter.remove();
493                                         }
494                                 }
495                         }
496                 }
497
498                 updateLiveStateFromLocal();
499
500                 return transactionStatus;
501         }
502
503         /**
504          * Get the machine ID for this client
505          */
506         public long getMachineId() {
507                 return localMachineId;
508         }
509
510         /**
511          * Decrement the number of live slots that we currently have
512          */
513         public void decrementLiveCount() {
514                 liveSlotCount--;
515         }
516
517         /**
518          * Recalculate the new resize threshold
519          */
520         private void setResizeThreshold() {
521                 int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots);
522                 bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
523         }
524
525
526         boolean lastInsertedNewKey = false;
527
528         private boolean sendToServer(NewKey newKey) throws ServerException {
529
530                 boolean fromRetry = false;
531
532                 try {
533                         if (hadPartialSendToServer) {
534                                 Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
535                                 if (newSlots.length == 0) {
536                                         fromRetry = true;
537                                         ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
538
539                                         if (sendSlotsReturn.getFirst()) {
540                                                 if (newKey != null) {
541                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
542                                                                 newKey = null;
543                                                         }
544                                                 }
545
546                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
547                                                         transaction.resetServerFailure();
548
549                                                         // Update which transactions parts still need to be sent
550                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
551
552                                                         // Add the transaction status to the outstanding list
553                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
554
555                                                         // Update the transaction status
556                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
557
558                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
559                                                         if (transaction.didSendAllParts()) {
560                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
561                                                                 pendingTransactionQueue.remove(transaction);
562                                                         }
563                                                 }
564                                         } else {
565
566                                                 newSlots = sendSlotsReturn.getThird();
567
568                                                 boolean isInserted = false;
569                                                 for (Slot s : newSlots) {
570                                                         if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
571                                                                 isInserted = true;
572                                                                 break;
573                                                         }
574                                                 }
575
576                                                 for (Slot s : newSlots) {
577                                                         if (isInserted) {
578                                                                 break;
579                                                         }
580
581                                                         // Process each entry in the slot
582                                                         for (Entry entry : s.getEntries()) {
583
584                                                                 if (entry.getType() == Entry.TypeLastMessage) {
585                                                                         LastMessage lastMessage = (LastMessage)entry;
586                                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
587                                                                                 isInserted = true;
588                                                                                 break;
589                                                                         }
590                                                                 }
591                                                         }
592                                                 }
593
594                                                 if (isInserted) {
595                                                         if (newKey != null) {
596                                                                 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
597                                                                         newKey = null;
598                                                                 }
599                                                         }
600
601                                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
602                                                                 transaction.resetServerFailure();
603
604                                                                 // Update which transactions parts still need to be sent
605                                                                 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
606
607                                                                 // Add the transaction status to the outstanding list
608                                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
609
610                                                                 // Update the transaction status
611                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
612
613                                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
614                                                                 if (transaction.didSendAllParts()) {
615                                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
616                                                                         pendingTransactionQueue.remove(transaction);
617                                                                 } else {
618                                                                         transaction.resetServerFailure();
619                                                                         // Set the transaction sequence number back to nothing
620                                                                         if (!transaction.didSendAPartToServer()) {
621                                                                                 transaction.setSequenceNumber(-1);
622                                                                         }
623                                                                 }
624                                                         }
625                                                 }
626                                         }
627
628                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
629                                                 transaction.resetServerFailure();
630                                                 // Set the transaction sequence number back to nothing
631                                                 if (!transaction.didSendAPartToServer()) {
632                                                         transaction.setSequenceNumber(-1);
633                                                 }
634                                         }
635
636                                         if (sendSlotsReturn.getThird().length != 0) {
637                                                 // insert into the local block chain
638                                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
639                                         }
640                                         // continue;
641                                 } else {
642                                         boolean isInserted = false;
643                                         for (Slot s : newSlots) {
644                                                 if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
645                                                         isInserted = true;
646                                                         break;
647                                                 }
648                                         }
649
650                                         for (Slot s : newSlots) {
651                                                 if (isInserted) {
652                                                         break;
653                                                 }
654
655                                                 // Process each entry in the slot
656                                                 for (Entry entry : s.getEntries()) {
657
658                                                         if (entry.getType() == Entry.TypeLastMessage) {
659                                                                 LastMessage lastMessage = (LastMessage)entry;
660                                                                 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
661                                                                         isInserted = true;
662                                                                         break;
663                                                                 }
664                                                         }
665                                                 }
666                                         }
667
668                                         if (isInserted) {
669                                                 if (newKey != null) {
670                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
671                                                                 newKey = null;
672                                                         }
673                                                 }
674
675                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
676                                                         transaction.resetServerFailure();
677
678                                                         // Update which transactions parts still need to be sent
679                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
680
681                                                         // Add the transaction status to the outstanding list
682                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
683
684                                                         // Update the transaction status
685                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
686
687                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
688                                                         if (transaction.didSendAllParts()) {
689                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
690                                                                 pendingTransactionQueue.remove(transaction);
691                                                         } else {
692                                                                 transaction.resetServerFailure();
693                                                                 // Set the transaction sequence number back to nothing
694                                                                 if (!transaction.didSendAPartToServer()) {
695                                                                         transaction.setSequenceNumber(-1);
696                                                                 }
697                                                         }
698                                                 }
699                                         } else {
700                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
701                                                         transaction.resetServerFailure();
702                                                         // Set the transaction sequence number back to nothing
703                                                         if (!transaction.didSendAPartToServer()) {
704                                                                 transaction.setSequenceNumber(-1);
705                                                         }
706                                                 }
707                                         }
708
709                                         // insert into the local block chain
710                                         validateAndUpdate(newSlots, true);
711                                 }
712                         }
713                 } catch (ServerException e) {
714                         throw e;
715                 }
716
717
718
719                 try {
720                         // While we have stuff that needs inserting into the block chain
721                         while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) {
722
723
724                                 fromRetry = false;
725
726                                 if (hadPartialSendToServer) {
727                                         throw new Error("Should Be error free");
728                                 }
729
730
731
732                                 // If there is a new key with same name then end
733                                 if ((newKey != null) && (arbitratorTable.get(newKey.getKey()) != null)) {
734                                         return false;
735                                 }
736
737                                 // Create the slot
738                                 Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC());
739
740                                 // Try to fill the slot with data
741                                 ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
742                                 boolean needsResize = fillSlotsReturn.getFirst();
743                                 int newSize = fillSlotsReturn.getSecond();
744                                 Boolean insertedNewKey = fillSlotsReturn.getThird();
745
746                                 if (needsResize) {
747                                         // Reset which transaction to send
748                                         for (Transaction transaction : transactionPartsSent.keySet()) {
749                                                 transaction.resetNextPartToSend();
750
751                                                 // Set the transaction sequence number back to nothing
752                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
753                                                         transaction.setSequenceNumber(-1);
754                                                 }
755                                         }
756
757                                         // Clear the sent data since we are trying again
758                                         pendingSendArbitrationEntriesToDelete.clear();
759                                         transactionPartsSent.clear();
760
761                                         // We needed a resize so try again
762                                         fillSlot(slot, true, newKey);
763                                 }
764
765                                 lastSlotAttemptedToSend = slot;
766                                 lastIsNewKey = (newKey != null);
767                                 lastInsertedNewKey = insertedNewKey;
768                                 lastNewSize = newSize;
769                                 lastNewKey = newKey;
770                                 lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
771                                 lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
772
773
774                                 ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
775
776                                 if (sendSlotsReturn.getFirst()) {
777
778                                         // Did insert into the block chain
779
780                                         if (insertedNewKey) {
781                                                 // This slot was what was inserted not a previous slot
782
783                                                 // New Key was successfully inserted into the block chain so dont want to insert it again
784                                                 newKey = null;
785                                         } 
786
787                                         // Remove the aborts and commit parts that were sent from the pending to send queue
788                                         for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
789                                                 ArbitrationRound round = iter.next();
790                                                 round.removeParts(pendingSendArbitrationEntriesToDelete);
791
792                                                 if (round.isDoneSending()) {
793                                                         // Sent all the parts
794                                                         iter.remove();
795                                                 }
796                                         }
797
798                                         for (Transaction transaction : transactionPartsSent.keySet()) {
799                                                 transaction.resetServerFailure();
800
801                                                 // Update which transactions parts still need to be sent
802                                                 transaction.removeSentParts(transactionPartsSent.get(transaction));
803
804                                                 // Add the transaction status to the outstanding list
805                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
806
807                                                 // Update the transaction status
808                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
809
810                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
811                                                 if (transaction.didSendAllParts()) {
812                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
813                                                         pendingTransactionQueue.remove(transaction);
814                                                 }
815                                         }
816                                 } else {
817
818                                         // if (!sendSlotsReturn.getSecond()) {
819                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
820                                         //              transaction.resetServerFailure();
821                                         //      }
822                                         // } else {
823                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
824                                         //              transaction.resetServerFailure();
825
826                                         //              // Update which transactions parts still need to be sent
827                                         //              transaction.removeSentParts(transactionPartsSent.get(transaction));
828
829                                         //              // Add the transaction status to the outstanding list
830                                         //              outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
831
832                                         //              // Update the transaction status
833                                         //              transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
834
835                                         //              // Check if all the transaction parts were successfully sent and if so then remove it from pending
836                                         //              if (transaction.didSendAllParts()) {
837                                         //                      transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
838                                         //                      pendingTransactionQueue.remove(transaction);
839
840                                         //                      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
841                                         //                              System.out.println("Sent: " + kv + "  from: " + localMachineId + "   Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + "  Claimed:" + transaction.getSequenceNumber());
842                                         //                      }
843                                         //              }
844                                         //      }
845                                         // }
846
847                                         // Reset which transaction to send
848                                         for (Transaction transaction : transactionPartsSent.keySet()) {
849                                                 transaction.resetNextPartToSend();
850                                                 // transaction.resetNextPartToSend();
851
852                                                 // Set the transaction sequence number back to nothing
853                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
854                                                         transaction.setSequenceNumber(-1);
855                                                 }
856                                         }
857                                 }
858
859                                 // Clear the sent data in preparation for next send
860                                 pendingSendArbitrationEntriesToDelete.clear();
861                                 transactionPartsSent.clear();
862
863                                 if (sendSlotsReturn.getThird().length != 0) {
864                                         // insert into the local block chain
865                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
866                                 }
867                         }
868
869                 } catch (ServerException e) {
870
871                         if (e.getType() != ServerException.TypeInputTimeout) {
872                                 // e.printStackTrace();
873
874                                 // Nothing was able to be sent to the server so just clear these data structures
875                                 for (Transaction transaction : transactionPartsSent.keySet()) {
876                                         transaction.resetNextPartToSend();
877
878                                         // Set the transaction sequence number back to nothing
879                                         if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
880                                                 transaction.setSequenceNumber(-1);
881                                         }
882                                 }
883                         } else {
884                                 // There was a partial send to the server
885                                 hadPartialSendToServer = true;
886
887
888                                 // if (!fromRetry) {
889                                 //      lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
890                                 //      lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
891                                 // }
892
893                                 // Nothing was able to be sent to the server so just clear these data structures
894                                 for (Transaction transaction : transactionPartsSent.keySet()) {
895                                         transaction.resetNextPartToSend();
896                                         transaction.setServerFailure();
897                                 }
898                         }
899
900                         pendingSendArbitrationEntriesToDelete.clear();
901                         transactionPartsSent.clear();
902
903                         throw e;
904                 }
905
906                 return newKey == null;
907         }
908
909         private synchronized boolean updateFromLocal(long machineId) {
910                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
911                 if (localCommunicationInformation == null) {
912                         // Cant talk to that device locally so do nothing
913                         return false;
914                 }
915
916                 // Get the size of the send data
917                 int sendDataSize = Integer.BYTES + Long.BYTES;
918
919                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
920                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != null) {
921                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
922                 }
923
924                 byte[] sendData = new byte[sendDataSize];
925                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
926
927                 // Encode the data
928                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
929                 bbEncode.putInt(0);
930
931                 // Send by local
932                 byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
933
934                 if (returnData == null) {
935                         // Could not contact server
936                         return false;
937                 }
938
939                 // Decode the data
940                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
941                 int numberOfEntries = bbDecode.getInt();
942
943                 for (int i = 0; i < numberOfEntries; i++) {
944                         byte type = bbDecode.get();
945                         if (type == Entry.TypeAbort) {
946                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
947                                 processEntry(abort);
948                         } else if (type == Entry.TypeCommitPart) {
949                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
950                                 processEntry(commitPart);
951                         }
952                 }
953
954                 updateLiveStateFromLocal();
955
956                 return true;
957         }
958
959         private Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
960
961                 // Get the devices local communications
962                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
963
964                 if (localCommunicationInformation == null) {
965                         // Cant talk to that device locally so do nothing
966                         return new Pair<Boolean, Boolean>(true, false);
967                 }
968
969                 // Get the size of the send data
970                 int sendDataSize = Integer.BYTES + Long.BYTES;
971                 for (TransactionPart part : transaction.getParts().values()) {
972                         sendDataSize += part.getSize();
973                 }
974
975                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
976                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != null) {
977                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
978                 }
979
980                 // Make the send data size
981                 byte[] sendData = new byte[sendDataSize];
982                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
983
984                 // Encode the data
985                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
986                 bbEncode.putInt(transaction.getParts().size());
987                 for (TransactionPart part : transaction.getParts().values()) {
988                         part.encode(bbEncode);
989                 }
990
991
992                 // Send by local
993                 byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
994
995                 if (returnData == null) {
996                         // Could not contact server
997                         return new Pair<Boolean, Boolean>(true, false);
998                 }
999
1000                 // Decode the data
1001                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
1002                 boolean didCommit = bbDecode.get() == 1;
1003                 boolean couldArbitrate = bbDecode.get() == 1;
1004                 int numberOfEntries = bbDecode.getInt();
1005                 boolean foundAbort = false;
1006
1007                 for (int i = 0; i < numberOfEntries; i++) {
1008                         byte type = bbDecode.get();
1009                         if (type == Entry.TypeAbort) {
1010                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
1011
1012                                 if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
1013                                         foundAbort = true;
1014                                 }
1015
1016                                 processEntry(abort);
1017                         } else if (type == Entry.TypeCommitPart) {
1018                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
1019                                 processEntry(commitPart);
1020                         }
1021                 }
1022
1023                 updateLiveStateFromLocal();
1024
1025                 if (couldArbitrate) {
1026                         TransactionStatus status =  transaction.getTransactionStatus();
1027                         if (didCommit) {
1028                                 status.setStatus(TransactionStatus.StatusCommitted);
1029                         } else {
1030                                 status.setStatus(TransactionStatus.StatusAborted);
1031                         }
1032                 } else {
1033                         TransactionStatus status =  transaction.getTransactionStatus();
1034                         if (foundAbort) {
1035                                 status.setStatus(TransactionStatus.StatusAborted);
1036                         } else {
1037                                 status.setStatus(TransactionStatus.StatusCommitted);
1038                         }
1039                 }
1040
1041                 return new Pair<Boolean, Boolean>(false, true);
1042         }
1043
1044         public synchronized byte[] acceptDataFromLocal(byte[] data) {
1045
1046                 // Decode the data
1047                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
1048                 long lastArbitratedSequenceNumberSeen = bbDecode.getLong();
1049                 int numberOfParts = bbDecode.getInt();
1050
1051                 // If we did commit a transaction or not
1052                 boolean didCommit = false;
1053                 boolean couldArbitrate = false;
1054
1055                 if (numberOfParts != 0) {
1056
1057                         // decode the transaction
1058                         Transaction transaction = new Transaction();
1059                         for (int i = 0; i < numberOfParts; i++) {
1060                                 bbDecode.get();
1061                                 TransactionPart newPart = (TransactionPart)TransactionPart.decode(null, bbDecode);
1062                                 transaction.addPartDecode(newPart);
1063                         }
1064
1065                         // Arbitrate on transaction and pull relevant return data
1066                         Pair<Boolean, Boolean> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1067                         couldArbitrate = localArbitrateReturn.getFirst();
1068                         didCommit = localArbitrateReturn.getSecond();
1069
1070                         updateLiveStateFromLocal();
1071
1072                         // Transaction was sent to the server so keep track of it to prevent double commit
1073                         if (transaction.getSequenceNumber() != -1) {
1074                                 offlineTransactionsCommittedAndAtServer.add(transaction.getId());
1075                         }
1076                 }
1077
1078                 // The data to send back
1079                 int returnDataSize = 0;
1080                 List<Entry> unseenArbitrations = new ArrayList<Entry>();
1081
1082                 // Get the aborts to send back
1083                 List<Long> abortLocalSequenceNumbers = new ArrayList<Long >(liveAbortsGeneratedByLocal.keySet());
1084                 Collections.sort(abortLocalSequenceNumbers);
1085                 for (Long localSequenceNumber : abortLocalSequenceNumbers) {
1086                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1087                                 continue;
1088                         }
1089
1090                         Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
1091                         unseenArbitrations.add(abort);
1092                         returnDataSize += abort.getSize();
1093                 }
1094
1095                 // Get the commits to send back
1096                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
1097                 if (commitForClientTable != null) {
1098                         List<Long> commitLocalSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
1099                         Collections.sort(commitLocalSequenceNumbers);
1100
1101                         for (Long localSequenceNumber : commitLocalSequenceNumbers) {
1102                                 Commit commit = commitForClientTable.get(localSequenceNumber);
1103
1104                                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1105                                         continue;
1106                                 }
1107
1108                                 unseenArbitrations.addAll(commit.getParts().values());
1109
1110                                 for (CommitPart commitPart : commit.getParts().values()) {
1111                                         returnDataSize += commitPart.getSize();
1112                                 }
1113                         }
1114                 }
1115
1116                 // Number of arbitration entries to decode
1117                 returnDataSize += 2 * Integer.BYTES;
1118
1119                 // Boolean of did commit or not
1120                 if (numberOfParts != 0) {
1121                         returnDataSize += Byte.BYTES;
1122                 }
1123
1124                 // Data to send Back
1125                 byte[] returnData = new byte[returnDataSize];
1126                 ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
1127
1128                 if (numberOfParts != 0) {
1129                         if (didCommit) {
1130                                 bbEncode.put((byte)1);
1131                         } else {
1132                                 bbEncode.put((byte)0);
1133                         }
1134                         if (couldArbitrate) {
1135                                 bbEncode.put((byte)1);
1136                         } else {
1137                                 bbEncode.put((byte)0);
1138                         }
1139                 }
1140
1141                 bbEncode.putInt(unseenArbitrations.size());
1142                 for (Entry entry : unseenArbitrations) {
1143                         entry.encode(bbEncode);
1144                 }
1145
1146                 return returnData;
1147         }
1148
1149         private ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize, boolean isNewKey)  throws ServerException {
1150
1151                 boolean attemptedToSendToServerTmp = attemptedToSendToServer;
1152                 attemptedToSendToServer = true;
1153
1154                 boolean inserted = false;
1155                 boolean lastTryInserted = false;
1156
1157                 Slot[] array = cloud.putSlot(slot, newSize);
1158                 if (array == null) {
1159                         array = new Slot[] {slot};
1160                         rejectedSlotList.clear();
1161                         inserted = true;
1162                 }       else {
1163                         if (array.length == 0) {
1164                                 throw new Error("Server Error: Did not send any slots");
1165                         }
1166
1167                         // if (attemptedToSendToServerTmp) {
1168                         if (hadPartialSendToServer) {
1169
1170                                 boolean isInserted = false;
1171                                 for (Slot s : array) {
1172                                         if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
1173                                                 isInserted = true;
1174                                                 break;
1175                                         }
1176                                 }
1177
1178                                 for (Slot s : array) {
1179                                         if (isInserted) {
1180                                                 break;
1181                                         }
1182
1183                                         // Process each entry in the slot
1184                                         for (Entry entry : s.getEntries()) {
1185
1186                                                 if (entry.getType() == Entry.TypeLastMessage) {
1187                                                         LastMessage lastMessage = (LastMessage)entry;
1188
1189                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) {
1190                                                                 isInserted = true;
1191                                                                 break;
1192                                                         }
1193                                                 }
1194                                         }
1195                                 }
1196
1197                                 if (!isInserted) {
1198                                         rejectedSlotList.add(slot.getSequenceNumber());
1199                                         lastTryInserted = false;
1200                                 } else {
1201                                         lastTryInserted = true;
1202                                 }
1203                         } else {
1204                                 rejectedSlotList.add(slot.getSequenceNumber());
1205                                 lastTryInserted = false;
1206                         }
1207                 }
1208
1209                 return new ThreeTuple<Boolean, Boolean, Slot[]>(inserted, lastTryInserted, array);
1210         }
1211
1212         /**
1213          * Returns false if a resize was needed
1214          */
1215         private ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) {
1216
1217
1218                 int newSize = 0;
1219                 if (liveSlotCount > bufferResizeThreshold) {
1220                         resize = true; //Resize is forced
1221
1222                 }
1223
1224                 if (resize) {
1225                         newSize = (int) (numberOfSlots * RESIZE_MULTIPLE);
1226                         TableStatus status = new TableStatus(slot, newSize);
1227                         slot.addEntry(status);
1228                 }
1229
1230                 // Fill with rejected slots first before doing anything else
1231                 doRejectedMessages(slot);
1232
1233                 // Do mandatory rescue of entries
1234                 ThreeTuple<Boolean, Boolean, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1235
1236                 // Extract working variables
1237                 boolean needsResize = mandatoryRescueReturn.getFirst();
1238                 boolean seenLiveSlot = mandatoryRescueReturn.getSecond();
1239                 long currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1240
1241                 if (needsResize && !resize) {
1242                         // We need to resize but we are not resizing so return false
1243                         return new ThreeTuple<Boolean, Integer, Boolean>(true, null, null);
1244                 }
1245
1246                 boolean inserted = false;
1247                 if (newKeyEntry != null) {
1248                         newKeyEntry.setSlot(slot);
1249                         if (slot.hasSpace(newKeyEntry)) {
1250
1251                                 slot.addEntry(newKeyEntry);
1252                                 inserted = true;
1253                         } 
1254                 }
1255
1256                 // Clear the transactions, aborts and commits that were sent previously
1257                 transactionPartsSent.clear();
1258                 pendingSendArbitrationEntriesToDelete.clear();
1259
1260                 for (ArbitrationRound round : pendingSendArbitrationRounds) {
1261                         boolean isFull = false;
1262                         round.generateParts();
1263                         List<Entry> parts = round.getParts();
1264
1265                         // Insert pending arbitration data
1266                         for (Entry arbitrationData : parts) {
1267
1268                                 // If it is an abort then we need to set some information
1269                                 if (arbitrationData instanceof Abort) {
1270                                         ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
1271                                 }
1272
1273                                 if (!slot.hasSpace(arbitrationData)) {
1274                                         // No space so cant do anything else with these data entries
1275                                         isFull = true;
1276                                         break;
1277                                 }
1278
1279                                 // Add to this current slot and add it to entries to delete
1280                                 slot.addEntry(arbitrationData);
1281                                 pendingSendArbitrationEntriesToDelete.add(arbitrationData);
1282                         }
1283
1284                         if (isFull) {
1285                                 break;
1286                         }
1287                 }
1288
1289                 if (pendingTransactionQueue.size() > 0) {
1290
1291                         Transaction transaction = pendingTransactionQueue.get(0);
1292
1293                         // Set the transaction sequence number if it has yet to be inserted into the block chain
1294                         // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
1295                         //      transaction.setSequenceNumber(slot.getSequenceNumber());
1296                         // }
1297
1298                         if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
1299                                 transaction.setSequenceNumber(slot.getSequenceNumber());
1300                         }
1301
1302
1303                         while (true) {
1304                                 TransactionPart part = transaction.getNextPartToSend();
1305
1306                                 if (part == null) {
1307                                         // Ran out of parts to send for this transaction so move on
1308                                         break;
1309                                 }
1310
1311                                 if (slot.hasSpace(part)) {
1312                                         slot.addEntry(part);
1313                                         List<Integer> partsSent = transactionPartsSent.get(transaction);
1314                                         if (partsSent == null) {
1315                                                 partsSent = new ArrayList<Integer>();
1316                                                 transactionPartsSent.put(transaction, partsSent);
1317                                         }
1318                                         partsSent.add(part.getPartNumber());
1319                                         transactionPartsSent.put(transaction, partsSent);
1320                                 } else {
1321                                         break;
1322                                 }
1323                         }
1324                 }
1325
1326                 // Fill the remainder of the slot with rescue data
1327                 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1328
1329                 return new ThreeTuple<Boolean, Integer, Boolean>(false, newSize, inserted);
1330         }
1331
1332         private void doRejectedMessages(Slot s) {
1333                 if (! rejectedSlotList.isEmpty()) {
1334                         /* TODO: We should avoid generating a rejected message entry if
1335                          * there is already a sufficient entry in the queue (e.g.,
1336                          * equalsto value of true and same sequence number).  */
1337
1338                         long old_seqn = rejectedSlotList.firstElement();
1339                         if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
1340                                 long new_seqn = rejectedSlotList.lastElement();
1341                                 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1342                                 s.addEntry(rm);
1343                         } else {
1344                                 long prev_seqn = -1;
1345                                 int i = 0;
1346                                 /* Go through list of missing messages */
1347                                 for (; i < rejectedSlotList.size(); i++) {
1348                                         long curr_seqn = rejectedSlotList.get(i);
1349                                         Slot s_msg = buffer.getSlot(curr_seqn);
1350                                         if (s_msg != null)
1351                                                 break;
1352                                         prev_seqn = curr_seqn;
1353                                 }
1354                                 /* Generate rejected message entry for missing messages */
1355                                 if (prev_seqn != -1) {
1356                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1357                                         s.addEntry(rm);
1358                                 }
1359                                 /* Generate rejected message entries for present messages */
1360                                 for (; i < rejectedSlotList.size(); i++) {
1361                                         long curr_seqn = rejectedSlotList.get(i);
1362                                         Slot s_msg = buffer.getSlot(curr_seqn);
1363                                         long machineid = s_msg.getMachineID();
1364                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1365                                         s.addEntry(rm);
1366                                 }
1367                         }
1368                 }
1369         }
1370
1371         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot slot, boolean resize) {
1372                 long newestSequenceNumber = buffer.getNewestSeqNum();
1373                 long oldestSequenceNumber = buffer.getOldestSeqNum();
1374                 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1375                         oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1376                 }
1377
1378                 long currentSequenceNumber = oldestLiveSlotSequenceNumver;
1379                 boolean seenLiveSlot = false;
1380                 long firstIfFull = newestSequenceNumber + 1 - numberOfSlots;    // smallest seq number in the buffer if it is full
1381                 long threshold = firstIfFull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
1382
1383
1384                 // Mandatory Rescue
1385                 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1386                         Slot previousSlot = buffer.getSlot(currentSequenceNumber);
1387                         // Push slot number forward
1388                         if (! seenLiveSlot) {
1389                                 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1390                         }
1391
1392                         if (!previousSlot.isLive()) {
1393                                 continue;
1394                         }
1395
1396                         // We have seen a live slot
1397                         seenLiveSlot = true;
1398
1399                         // Get all the live entries for a slot
1400                         Vector<Entry> liveEntries = previousSlot.getLiveEntries(resize);
1401
1402                         // Iterate over all the live entries and try to rescue them
1403                         for (Entry liveEntry : liveEntries) {
1404                                 if (slot.hasSpace(liveEntry)) {
1405
1406                                         // Enough space to rescue the entry
1407                                         slot.addEntry(liveEntry);
1408                                 } else if (currentSequenceNumber == firstIfFull) {
1409                                         //if there's no space but the entry is about to fall off the queue
1410                                         System.out.println("B"); //?
1411                                         return new ThreeTuple<Boolean, Boolean, Long>(true, seenLiveSlot, currentSequenceNumber);
1412
1413                                 }
1414                         }
1415                 }
1416
1417                 // Did not resize
1418                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenLiveSlot, currentSequenceNumber);
1419         }
1420
1421         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
1422                 /* now go through live entries from least to greatest sequence number until
1423                  * either all live slots added, or the slot doesn't have enough room
1424                  * for SKIP_THRESHOLD consecutive entries*/
1425                 int skipcount = 0;
1426                 long newestseqnum = buffer.getNewestSeqNum();
1427                 search:
1428                 for (; seqn <= newestseqnum; seqn++) {
1429                         Slot prevslot = buffer.getSlot(seqn);
1430                         //Push slot number forward
1431                         if (!seenliveslot)
1432                                 oldestLiveSlotSequenceNumver = seqn;
1433
1434                         if (!prevslot.isLive())
1435                                 continue;
1436                         seenliveslot = true;
1437                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1438                         for (Entry liveentry : liveentries) {
1439                                 if (s.hasSpace(liveentry))
1440                                         s.addEntry(liveentry);
1441                                 else {
1442                                         skipcount++;
1443                                         if (skipcount > SKIP_THRESHOLD)
1444                                                 break search;
1445                                 }
1446                         }
1447                 }
1448         }
1449
1450         /**
1451          * Checks for malicious activity and updates the local copy of the block chain.
1452          */
1453         private void validateAndUpdate(Slot[] newSlots, boolean acceptUpdatesToLocal) {
1454
1455                 // The cloud communication layer has checked slot HMACs already before decoding
1456                 if (newSlots.length == 0) {
1457                         return;
1458                 }
1459
1460                 // Make sure all slots are newer than the last largest slot this client has seen
1461                 long firstSeqNum = newSlots[0].getSequenceNumber();
1462                 if (firstSeqNum <= sequenceNumber) {
1463                         throw new Error("Server Error: Sent older slots!");
1464                 }
1465
1466                 // Create an object that can access both new slots and slots in our local chain
1467                 // without committing slots to our local chain
1468                 SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
1469
1470                 // Check that the HMAC chain is not broken
1471                 checkHMACChain(indexer, newSlots);
1472
1473                 // Set to keep track of messages from clients
1474                 HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
1475
1476                 // Process each slots data
1477                 for (Slot slot : newSlots) {
1478                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1479
1480                         updateExpectedSize();
1481                 }
1482
1483                 // If there is a gap, check to see if the server sent us everything.
1484                 if (firstSeqNum != (sequenceNumber + 1)) {
1485
1486                         // Check the size of the slots that were sent down by the server.
1487                         // Can only check the size if there was a gap
1488                         checkNumSlots(newSlots.length);
1489
1490                         // Since there was a gap every machine must have pushed a slot or must have
1491                         // a last message message.  If not then the server is hiding slots
1492                         if (!machineSet.isEmpty()) {
1493                                 throw new Error("Missing record for machines: " + machineSet);
1494                         }
1495                 }
1496
1497                 // Update the size of our local block chain.
1498                 commitNewMaxSize();
1499
1500                 // Commit new to slots to the local block chain.
1501                 for (Slot slot : newSlots) {
1502
1503                         // Insert this slot into our local block chain copy.
1504                         buffer.putSlot(slot);
1505
1506                         // Keep track of how many slots are currently live (have live data in them).
1507                         liveSlotCount++;
1508                 }
1509
1510                 // Get the sequence number of the latest slot in the system
1511                 sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
1512
1513                 updateLiveStateFromServer();
1514
1515                 // No Need to remember after we pulled from the server
1516                 offlineTransactionsCommittedAndAtServer.clear();
1517
1518                 // This is invalidated now
1519                 hadPartialSendToServer = false;
1520         }
1521
1522         private void updateLiveStateFromServer() {
1523                 // Process the new transaction parts
1524                 processNewTransactionParts();
1525
1526                 // Do arbitration on new transactions that were received
1527                 arbitrateFromServer();
1528
1529                 // Update all the committed keys
1530                 boolean didCommitOrSpeculate = updateCommittedTable();
1531
1532                 // Delete the transactions that are now dead
1533                 updateLiveTransactionsAndStatus();
1534
1535                 // Do speculations
1536                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1537                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1538         }
1539
1540         private void updateLiveStateFromLocal() {
1541                 // Update all the committed keys
1542                 boolean didCommitOrSpeculate = updateCommittedTable();
1543
1544                 // Delete the transactions that are now dead
1545                 updateLiveTransactionsAndStatus();
1546
1547                 // Do speculations
1548                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1549                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1550         }
1551
1552         private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) {
1553                 // if (didFindTableStatus) {
1554                 // return;
1555                 // }
1556                 long prevslots = firstSequenceNumber;
1557
1558
1559                 if (didFindTableStatus) {
1560                         // expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : expectedsize;
1561                         // System.out.println("Here2: " + expectedsize + "    " + numberOfSlots + "   " + prevslots);
1562
1563                 } else {
1564                         expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1565                         // System.out.println("Here: " + expectedsize);
1566                 }
1567
1568                 // System.out.println(numberOfSlots);
1569
1570                 didFindTableStatus = true;
1571                 currMaxSize = numberOfSlots;
1572         }
1573
1574         private void updateExpectedSize() {
1575                 expectedsize++;
1576
1577                 if (expectedsize > currMaxSize) {
1578                         expectedsize = currMaxSize;
1579                 }
1580         }
1581
1582
1583         /**
1584          * Check the size of the block chain to make sure there are enough slots sent back by the server.
1585          * This is only called when we have a gap between the slots that we have locally and the slots
1586          * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1587          * status message
1588          */
1589         private void checkNumSlots(int numberOfSlots) {
1590                 if (numberOfSlots != expectedsize) {
1591                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numberOfSlots);
1592                 }
1593         }
1594
1595         private void updateCurrMaxSize(int newmaxsize) {
1596                 currMaxSize = newmaxsize;
1597         }
1598
1599
1600         /**
1601          * Update the size of of the local buffer if it is needed.
1602          */
1603         private void commitNewMaxSize() {
1604                 didFindTableStatus = false;
1605
1606                 // Resize the local slot buffer
1607                 if (numberOfSlots != currMaxSize) {
1608                         buffer.resize((int)currMaxSize);
1609                 }
1610
1611                 // Change the number of local slots to the new size
1612                 numberOfSlots = (int)currMaxSize;
1613
1614
1615                 // Recalculate the resize threshold since the size of the local buffer has changed
1616                 setResizeThreshold();
1617         }
1618
1619         /**
1620          * Process the new transaction parts from this latest round of slots received from the server
1621          */
1622         private void processNewTransactionParts() {
1623
1624                 if (newTransactionParts.size() == 0) {
1625                         // Nothing new to process
1626                         return;
1627                 }
1628
1629                 // Iterate through all the machine Ids that we received new parts for
1630                 for (Long machineId : newTransactionParts.keySet()) {
1631                         Map<Pair<Long, Integer>, TransactionPart> parts = newTransactionParts.get(machineId);
1632
1633                         // Iterate through all the parts for that machine Id
1634                         for (Pair<Long, Integer> partId : parts.keySet()) {
1635                                 TransactionPart part = parts.get(partId);
1636
1637                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
1638                                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= part.getSequenceNumber())) {
1639                                         // Set dead the transaction part
1640                                         part.setDead();
1641                                         continue;
1642                                 }
1643
1644                                 // Get the transaction object for that sequence number
1645                                 Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber());
1646
1647                                 if (transaction == null) {
1648                                         // This is a new transaction that we dont have so make a new one
1649                                         transaction = new Transaction();
1650
1651                                         // Insert this new transaction into the live tables
1652                                         liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction);
1653                                         liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction);
1654                                 }
1655
1656                                 // Add that part to the transaction
1657                                 transaction.addPartDecode(part);
1658                         }
1659                 }
1660
1661                 // Clear all the new transaction parts in preparation for the next time the server sends slots
1662                 newTransactionParts.clear();
1663         }
1664
1665
             // Sequence number of the transaction most recently arbitrated on by
             // arbitrateFromServer(), which skips live transactions whose sequence
             // number is smaller than this value.
1666         private long lastSeqNumArbOn = 0;
1667
1668         private void arbitrateFromServer() {
1669
1670                 if (liveTransactionBySequenceNumberTable.size() == 0) {
1671                         // Nothing to arbitrate on so move on
1672                         return;
1673                 }
1674
1675                 // Get the transaction sequence numbers and sort from oldest to newest
1676                 List<Long> transactionSequenceNumbers = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
1677                 Collections.sort(transactionSequenceNumbers);
1678
1679                 // Collection of key value pairs that are
1680                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1681
1682                 // The last transaction arbitrated on
1683                 long lastTransactionCommitted = -1;
1684                 Set<Abort> generatedAborts = new HashSet<Abort>();
1685
1686                 for (Long transactionSequenceNumber : transactionSequenceNumbers) {
1687                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1688
1689
1690
1691                         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1692                         if (transaction.getArbitrator() != localMachineId) {
1693                                 continue;
1694                         }
1695
1696                         if (transactionSequenceNumber < lastSeqNumArbOn) {
1697                                 continue;
1698                         }
1699
1700                         if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
1701                                 // We have seen this already locally so dont commit again
1702                                 continue;
1703                         }
1704
1705
1706                         if (!transaction.isComplete()) {
1707                                 // Will arbitrate in incorrect order if we continue so just break
1708                                 // Most likely this
1709                                 break;
1710                         }
1711
1712
1713                         // update the largest transaction seen by arbitrator from server
1714                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) {
1715                                 lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1716                         } else {
1717                                 Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId());
1718                                 if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1719                                         lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1720                                 }
1721                         }
1722
1723                         if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
1724                                 // Guard evaluated as true
1725
1726                                 // Update the local changes so we can make the commit
1727                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1728                                         speculativeTableTmp.put(kv.getKey(), kv);
1729                                 }
1730
1731                                 // Update what the last transaction committed was for use in batch commit
1732                                 lastTransactionCommitted = transactionSequenceNumber;
1733                         } else {
1734                                 // Guard evaluated was false so create abort
1735
1736                                 // Create the abort
1737                                 Abort newAbort = new Abort(null,
1738                                                            transaction.getClientLocalSequenceNumber(),
1739                                                            transaction.getSequenceNumber(),
1740                                                            transaction.getMachineId(),
1741                                                            transaction.getArbitrator(),
1742                                                            localArbitrationSequenceNumber);
1743                                 localArbitrationSequenceNumber++;
1744
1745                                 generatedAborts.add(newAbort);
1746
1747                                 // Insert the abort so we can process
1748                                 processEntry(newAbort);
1749                         }
1750
1751                         lastSeqNumArbOn = transactionSequenceNumber;
1752
1753                         // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
1754                 }
1755
1756                 Commit newCommit = null;
1757
1758                 // If there is something to commit
1759                 if (speculativeTableTmp.size() != 0) {
1760
1761                         // Create the commit and increment the commit sequence number
1762                         newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1763                         localArbitrationSequenceNumber++;
1764
1765                         // Add all the new keys to the commit
1766                         for (KeyValue kv : speculativeTableTmp.values()) {
1767                                 newCommit.addKV(kv);
1768                         }
1769
1770                         // create the commit parts
1771                         newCommit.createCommitParts();
1772
1773                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1774
1775                         // Insert the commit so we can process it
1776                         for (CommitPart commitPart : newCommit.getParts().values()) {
1777                                 processEntry(commitPart);
1778                         }
1779                 }
1780
1781                 if ((newCommit != null) || (generatedAborts.size() > 0)) {
1782                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1783                         pendingSendArbitrationRounds.add(arbitrationRound);
1784
1785                         if (compactArbitrationData()) {
1786                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1787                                 if (newArbitrationRound.getCommit() != null) {
1788                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1789                                                 processEntry(commitPart);
1790                                         }
1791                                 }
1792                         }
1793                 }
1794         }
1795
1796         private Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction) {
1797
1798                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1799                 if (transaction.getArbitrator() != localMachineId) {
1800                         return new Pair<Boolean, Boolean>(false, false);
1801                 }
1802
1803                 if (!transaction.isComplete()) {
1804                         // Will arbitrate in incorrect order if we continue so just break
1805                         // Most likely this
1806                         return new Pair<Boolean, Boolean>(false, false);
1807                 }
1808
1809                 if (transaction.getMachineId() != localMachineId) {
1810                         // dont do this check for local transactions
1811                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) {
1812                                 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
1813                                         // We've have already seen this from the server
1814                                         return new Pair<Boolean, Boolean>(false, false);
1815                                 }
1816                         }
1817                 }
1818
1819                 if (transaction.evaluateGuard(committedKeyValueTable, null, null)) {
1820                         // Guard evaluated as true
1821
1822                         // Create the commit and increment the commit sequence number
1823                         Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1824                         localArbitrationSequenceNumber++;
1825
1826                         // Update the local changes so we can make the commit
1827                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1828                                 newCommit.addKV(kv);
1829                         }
1830
1831                         // create the commit parts
1832                         newCommit.createCommitParts();
1833
1834                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1835                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
1836                         pendingSendArbitrationRounds.add(arbitrationRound);
1837
1838                         if (compactArbitrationData()) {
1839                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1840                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1841                                         processEntry(commitPart);
1842                                 }
1843                         } else {
1844                                 // Insert the commit so we can process it
1845                                 for (CommitPart commitPart : newCommit.getParts().values()) {
1846                                         processEntry(commitPart);
1847                                 }
1848                         }
1849
1850                         if (transaction.getMachineId() == localMachineId) {
1851                                 TransactionStatus status = transaction.getTransactionStatus();
1852                                 if (status != null) {
1853                                         status.setStatus(TransactionStatus.StatusCommitted);
1854                                 }
1855                         }
1856
1857                         updateLiveStateFromLocal();
1858                         return new Pair<Boolean, Boolean>(true, true);
1859                 } else {
1860
1861                         if (transaction.getMachineId() == localMachineId) {
1862                                 // For locally created messages update the status
1863
1864                                 // Guard evaluated was false so create abort
1865                                 TransactionStatus status = transaction.getTransactionStatus();
1866                                 if (status != null) {
1867                                         status.setStatus(TransactionStatus.StatusAborted);
1868                                 }
1869                         } else {
1870                                 Set addAbortSet = new HashSet<Abort>();
1871
1872
1873                                 // Create the abort
1874                                 Abort newAbort = new Abort(null,
1875                                                            transaction.getClientLocalSequenceNumber(),
1876                                                            -1,
1877                                                            transaction.getMachineId(),
1878                                                            transaction.getArbitrator(),
1879                                                            localArbitrationSequenceNumber);
1880                                 localArbitrationSequenceNumber++;
1881
1882                                 addAbortSet.add(newAbort);
1883
1884
1885                                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1886                                 ArbitrationRound arbitrationRound = new ArbitrationRound(null, addAbortSet);
1887                                 pendingSendArbitrationRounds.add(arbitrationRound);
1888
1889                                 if (compactArbitrationData()) {
1890                                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1891                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1892                                                 processEntry(commitPart);
1893                                         }
1894                                 }
1895                         }
1896
1897                         updateLiveStateFromLocal();
1898                         return new Pair<Boolean, Boolean>(true, false);
1899                 }
1900         }
1901
1902         /**
1903          * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1904          */
1905         private boolean compactArbitrationData() {
1906
1907                 if (pendingSendArbitrationRounds.size() < 2) {
1908                         // Nothing to compact so do nothing
1909                         return false;
1910                 }
1911
1912                 ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1913                 if (lastRound.didSendPart()) {
1914                         return false;
1915                 }
1916
1917                 boolean hadCommit = (lastRound.getCommit() == null);
1918                 boolean gotNewCommit = false;
1919
1920                 int numberToDelete = 1;
1921                 while (numberToDelete < pendingSendArbitrationRounds.size()) {
1922                         ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1);
1923
1924                         if (round.isFull() || round.didSendPart()) {
1925                                 // Stop since there is a part that cannot be compacted and we need to compact in order
1926                                 break;
1927                         }
1928
1929                         if (round.getCommit() == null) {
1930
1931                                 // Try compacting aborts only
1932                                 int newSize = round.getCurrentSize() + lastRound.getAbortsCount();
1933                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1934                                         // Cant compact since it would be too large
1935                                         break;
1936                                 }
1937                                 lastRound.addAborts(round.getAborts());
1938                         } else {
1939
1940                                 // Create a new larger commit
1941                                 Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber);
1942                                 localArbitrationSequenceNumber++;
1943
1944                                 // Create the commit parts so that we can count them
1945                                 newCommit.createCommitParts();
1946
1947                                 // Calculate the new size of the parts
1948                                 int newSize = newCommit.getNumberOfParts();
1949                                 newSize += lastRound.getAbortsCount();
1950                                 newSize += round.getAbortsCount();
1951
1952                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1953                                         // Cant compact since it would be too large
1954                                         break;
1955                                 }
1956
1957                                 // Set the new compacted part
1958                                 lastRound.setCommit(newCommit);
1959                                 lastRound.addAborts(round.getAborts());
1960                                 gotNewCommit = true;
1961                         }
1962
1963                         numberToDelete++;
1964                 }
1965
1966                 if (numberToDelete != 1) {
1967                         // If there is a compaction
1968
1969                         // Delete the previous pieces that are now in the new compacted piece
1970                         if (numberToDelete == pendingSendArbitrationRounds.size()) {
1971                                 pendingSendArbitrationRounds.clear();
1972                         } else {
1973                                 for (int i = 0; i < numberToDelete; i++) {
1974                                         pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1);
1975                                 }
1976                         }
1977
1978                         // Add the new compacted into the pending to send list
1979                         pendingSendArbitrationRounds.add(lastRound);
1980
1981                         // Should reinsert into the commit processor
1982                         if (hadCommit && gotNewCommit) {
1983                                 return true;
1984                         }
1985                 }
1986
1987                 return false;
1988         }
1989         // private boolean compactArbitrationData() {
1990         //      return false;
1991         // }
1992
1993         /**
1994          * Update all the commits and the committed tables, sets dead the dead transactions
1995          */
1996         private boolean updateCommittedTable() {
1997
1998                 if (newCommitParts.size() == 0) {
1999                         // Nothing new to process
2000                         return false;
2001                 }
2002
2003                 // Iterate through all the machine Ids that we received new parts for
2004                 for (Long machineId : newCommitParts.keySet()) {
2005                         Map<Pair<Long, Integer>, CommitPart> parts = newCommitParts.get(machineId);
2006
2007                         // Iterate through all the parts for that machine Id
2008                         for (Pair<Long, Integer> partId : parts.keySet()) {
2009                                 CommitPart part = parts.get(partId);
2010
2011                                 // Get the transaction object for that sequence number
2012                                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
2013
2014                                 if (commitForClientTable == null) {
2015                                         // This is the first commit from this device
2016                                         commitForClientTable = new HashMap<Long, Commit>();
2017                                         liveCommitsTable.put(part.getMachineId(), commitForClientTable);
2018                                 }
2019
2020                                 Commit commit = commitForClientTable.get(part.getSequenceNumber());
2021
2022                                 if (commit == null) {
2023                                         // This is a new commit that we dont have so make a new one
2024                                         commit = new Commit();
2025
2026                                         // Insert this new commit into the live tables
2027                                         commitForClientTable.put(part.getSequenceNumber(), commit);
2028                                 }
2029
2030                                 // Add that part to the commit
2031                                 commit.addPartDecode(part);
2032                         }
2033                 }
2034
2035                 // Clear all the new commits parts in preparation for the next time the server sends slots
2036                 newCommitParts.clear();
2037
2038                 // If we process a new commit keep track of it for future use
2039                 boolean didProcessANewCommit = false;
2040
2041                 // Process the commits one by one
2042                 for (Long arbitratorId : liveCommitsTable.keySet()) {
2043
2044                         // Get all the commits for a specific arbitrator
2045                         Map<Long, Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
2046
2047                         // Sort the commits in order
2048                         List<Long> commitSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
2049                         Collections.sort(commitSequenceNumbers);
2050
2051                         // Get the last commit seen from this arbitrator
2052                         long lastCommitSeenSequenceNumber = -1;
2053                         if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != null) {
2054                                 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId);
2055                         }
2056
2057                         // Go through each new commit one by one
2058                         for (int i = 0; i < commitSequenceNumbers.size(); i++) {
2059                                 Long commitSequenceNumber = commitSequenceNumbers.get(i);
2060                                 Commit commit = commitForClientTable.get(commitSequenceNumber);
2061
2062                                 // Special processing if a commit is not complete
2063                                 if (!commit.isComplete()) {
2064                                         if (i == (commitSequenceNumbers.size() - 1)) {
2065                                                 // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
2066                                                 break;
2067                                         } else {
2068                                                 // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
2069                                                 // Delete it and move on
2070                                                 commit.setDead();
2071                                                 commitForClientTable.remove(commit.getSequenceNumber());
2072                                                 continue;
2073                                         }
2074                                 }
2075
2076                                 // Update the last transaction that was updated if we can
2077                                 if (commit.getTransactionSequenceNumber() != -1) {
2078                                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2079
2080                                         // Update the last transaction sequence number that the arbitrator arbitrated on
2081                                         if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2082                                                 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2083                                         }
2084                                 }
2085
2086                                 // Update the last arbitration data that we have seen so far
2087                                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) {
2088
2089                                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId());
2090                                         if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) {
2091                                                 // Is larger
2092                                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2093                                         }
2094                                 } else {
2095                                         // Never seen any data from this arbitrator so record the first one
2096                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2097                                 }
2098
2099                                 // We have already seen this commit before so need to do the full processing on this commit
2100                                 if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2101
2102                                         // Update the last transaction that was updated if we can
2103                                         if (commit.getTransactionSequenceNumber() != -1) {
2104                                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2105
2106                                                 // Update the last transaction sequence number that the arbitrator arbitrated on
2107                                                 if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2108                                                         lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2109                                                 }
2110                                         }
2111
2112                                         continue;
2113                                 }
2114
2115                                 // If we got here then this is a brand new commit and needs full processing
2116
2117                                 // Get what commits should be edited, these are the commits that have live values for their keys
2118                                 Set<Commit> commitsToEdit = new HashSet<Commit>();
2119                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2120                                         commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
2121                                 }
2122                                 commitsToEdit.remove(null); // remove null since it could be in this set
2123
2124                                 // Update each previous commit that needs to be updated
2125                                 for (Commit previousCommit : commitsToEdit) {
2126
2127                                         // Only bother with live commits (TODO: Maybe remove this check)
2128                                         if (previousCommit.isLive()) {
2129
2130                                                 // Update which keys in the old commits are still live
2131                                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2132                                                         previousCommit.invalidateKey(kv.getKey());
2133                                                 }
2134
2135                                                 // if the commit is now dead then remove it
2136                                                 if (!previousCommit.isLive()) {
2137                                                         commitForClientTable.remove(previousCommit);
2138                                                 }
2139                                         }
2140                                 }
2141
2142                                 // Update the last seen sequence number from this arbitrator
2143                                 if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) {
2144                                         if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
2145                                                 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2146                                         }
2147                                 } else {
2148                                         lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2149                                 }
2150
2151                                 // We processed a new commit that we havent seen before
2152                                 didProcessANewCommit = true;
2153
2154                                 // Update the committed table of keys and which commit is using which key
2155                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2156                                         committedKeyValueTable.put(kv.getKey(), kv);
2157                                         liveCommitsByKeyTable.put(kv.getKey(), commit);
2158                                 }
2159                         }
2160                 }
2161
2162                 return didProcessANewCommit;
2163         }
2164
2165         /**
2166          * Create the speculative table from transactions that are still live and have come from the cloud
2167          */
2168         private boolean updateSpeculativeTable(boolean didProcessNewCommits) {
2169                 if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
2170                         // There is nothing to speculate on
2171                         return false;
2172                 }
2173
2174                 // Create a list of the transaction sequence numbers and sort them from oldest to newest
2175                 List<Long> transactionSequenceNumbersSorted = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
2176                 Collections.sort(transactionSequenceNumbersSorted);
2177
2178                 boolean hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2179
2180
2181                 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2182                         // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2183                         // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
2184
2185                         // Start from scratch
2186                         speculatedKeyValueTable.clear();
2187                         lastTransactionSequenceNumberSpeculatedOn = -1;
2188                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2189
2190                 }
2191
2192                 // Remember the front of the transaction list
2193                 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);
2194
2195                 // Find where to start arbitration from
2196                 int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2197
2198                 if (startIndex >= transactionSequenceNumbersSorted.size()) {
2199                         // Make sure we are not out of bounds
2200                         return false; // did not speculate
2201                 }
2202
2203                 Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
2204                 boolean didSkip = true;
2205
2206                 for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
2207                         long transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
2208                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
2209
2210                         if (!transaction.isComplete()) {
2211                                 // If there is an incomplete transaction then there is nothing we can do
2212                                 // add this transactions arbitrator to the list of arbitrators we should ignore
2213                                 incompleteTransactionArbitrator.add(transaction.getArbitrator());
2214                                 didSkip = true;
2215                                 continue;
2216                         }
2217
2218                         if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
2219                                 continue;
2220                         }
2221
2222                         lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2223
2224                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, null)) {
2225                                 // Guard evaluated to true so update the speculative table
2226                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2227                                         speculatedKeyValueTable.put(kv.getKey(), kv);
2228                                 }
2229                         }
2230                 }
2231
2232                 if (didSkip) {
2233                         // Since there was a skip we need to redo the speculation next time around
2234                         lastTransactionSequenceNumberSpeculatedOn = -1;
2235                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2236                 }
2237
2238                 // We did some speculation
2239                 return true;
2240         }
2241
2242         /**
2243          * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
2244          */
2245         private void updatePendingTransactionSpeculativeTable(boolean didProcessNewCommitsOrSpeculate) {
2246                 if (pendingTransactionQueue.size() == 0) {
2247                         // There is nothing to speculate on
2248                         return;
2249                 }
2250
2251
2252                 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
2253                         // need to reset on the pending speculation
2254                         lastPendingTransactionSpeculatedOn = null;
2255                         firstPendingTransaction = pendingTransactionQueue.get(0);
2256                         pendingTransactionSpeculatedKeyValueTable.clear();
2257                 }
2258
2259                 // Find where to start arbitration from
2260                 int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1;
2261
2262                 if (startIndex >= pendingTransactionQueue.size()) {
2263                         // Make sure we are not out of bounds
2264                         return;
2265                 }
2266
2267                 for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
2268                         Transaction transaction = pendingTransactionQueue.get(i);
2269
2270                         lastPendingTransactionSpeculatedOn = transaction;
2271
2272                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2273                                 // Guard evaluated to true so update the speculative table
2274                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2275                                         pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
2276                                 }
2277                         }
2278                 }
2279         }
2280
2281         /**
2282          * Set dead and remove from the live transaction tables the transactions that are dead
2283          */
2284         private void updateLiveTransactionsAndStatus() {
2285
2286                 // Go through each of the transactions
2287                 for (Iterator<Map.Entry<Long, Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
2288                         Transaction transaction = iter.next().getValue();
2289
2290                         // Check if the transaction is dead
2291                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
2292                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
2293
2294                                 // Set dead the transaction
2295                                 transaction.setDead();
2296
2297                                 // Remove the transaction from the live table
2298                                 iter.remove();
2299                                 liveTransactionByTransactionIdTable.remove(transaction.getId());
2300                         }
2301                 }
2302
2303                 // Go through each of the transactions
2304                 for (Iterator<Map.Entry<Long, TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
2305                         TransactionStatus status = iter.next().getValue();
2306
2307                         // Check if the transaction is dead
2308                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
2309                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
2310
2311                                 // Set committed
2312                                 status.setStatus(TransactionStatus.StatusCommitted);
2313
2314                                 // Remove
2315                                 iter.remove();
2316                         }
2317                 }
2318         }
2319
2320         /**
2321          * Process this slot, entry by entry.  Also update the latest message sent by slot
2322          */
2323         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2324
2325                 // Update the last message seen
2326                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2327
2328                 // Process each entry in the slot
2329                 for (Entry entry : slot.getEntries()) {
2330                         switch (entry.getType()) {
2331
2332                         case Entry.TypeCommitPart:
2333                                 processEntry((CommitPart)entry);
2334                                 break;
2335
2336                         case Entry.TypeAbort:
2337                                 processEntry((Abort)entry);
2338                                 break;
2339
2340                         case Entry.TypeTransactionPart:
2341                                 processEntry((TransactionPart)entry);
2342                                 break;
2343
2344                         case Entry.TypeNewKey:
2345                                 processEntry((NewKey)entry);
2346                                 break;
2347
2348                         case Entry.TypeLastMessage:
2349                                 processEntry((LastMessage)entry, machineSet);
2350                                 break;
2351
2352                         case Entry.TypeRejectedMessage:
2353                                 processEntry((RejectedMessage)entry, indexer);
2354                                 break;
2355
2356                         case Entry.TypeTableStatus:
2357                                 processEntry((TableStatus)entry, slot.getSequenceNumber());
2358                                 break;
2359
2360                         default:
2361                                 throw new Error("Unrecognized type: " + entry.getType());
2362                         }
2363                 }
2364         }
2365
2366         /**
2367          * Update the last message that was sent for a machine Id
2368          */
2369         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
2370                 // Update what the last message received by a machine was
2371                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
2372         }
2373
2374         /**
2375          * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2376          */
2377         private void processEntry(NewKey entry) {
2378
2379                 // Update the arbitrator table with the new key information
2380                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
2381
2382                 // Update what the latest live new key is
2383                 NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry);
2384                 if (oldNewKey != null) {
2385                         // Delete the old new key messages
2386                         oldNewKey.setDead();
2387                 }
2388         }
2389
2390         /**
2391          * Process new table status entries and set dead the old ones as new ones come in.
2392          * keeps track of the largest and smallest table status seen in this current round
2393          * of updating the local copy of the block chain
2394          */
2395         private void processEntry(TableStatus entry, long seq) {
2396                 int newNumSlots = entry.getMaxSlots();
2397                 updateCurrMaxSize(newNumSlots);
2398
2399                 initExpectedSize(seq, newNumSlots);
2400
2401                 if (liveTableStatus != null) {
2402                         // We have a larger table status so the old table status is no longer alive
2403                         liveTableStatus.setDead();
2404                 }
2405
2406                 // Make this new table status the latest alive table status
2407                 liveTableStatus = entry;
2408         }
2409
2410         /**
2411          * Check old messages to see if there is a block chain violation. Also
2412          */
2413         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
2414                 long oldSeqNum = entry.getOldSeqNum();
2415                 long newSeqNum = entry.getNewSeqNum();
2416                 boolean isequal = entry.getEqual();
2417                 long machineId = entry.getMachineID();
2418                 long seq = entry.getSequenceNumber();
2419
2420
2421                 // Check if we have messages that were supposed to be rejected in our local block chain
2422                 for (long seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2423
2424                         // Get the slot
2425                         Slot slot = indexer.getSlot(seqNum);
2426
2427                         if (slot != null) {
2428                                 // If we have this slot make sure that it was not supposed to be a rejected slot
2429
2430                                 long slotMachineId = slot.getMachineID();
2431                                 if (isequal != (slotMachineId == machineId)) {
2432                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2433                                 }
2434                         }
2435                 }
2436
2437
2438                 // Create a list of clients to watch until they see this rejected message entry.
2439                 HashSet<Long> deviceWatchSet = new HashSet<Long>();
2440                 for (Map.Entry<Long, Pair<Long, Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
2441
2442                         // Machine ID for the last message entry
2443                         long lastMessageEntryMachineId = lastMessageEntry.getKey();
2444
2445                         // We've seen it, don't need to continue to watch.  Our next
2446                         // message will implicitly acknowledge it.
2447                         if (lastMessageEntryMachineId == localMachineId) {
2448                                 continue;
2449                         }
2450
2451                         Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
2452                         long entrySequenceNumber = lastMessageValue.getFirst();
2453
2454                         if (entrySequenceNumber < seq) {
2455
2456                                 // Add this rejected message to the set of messages that this machine ID did not see yet
2457                                 addWatchList(lastMessageEntryMachineId, entry);
2458
2459                                 // This client did not see this rejected message yet so add it to the watch set to monitor
2460                                 deviceWatchSet.add(lastMessageEntryMachineId);
2461                         }
2462                 }
2463
2464                 if (deviceWatchSet.isEmpty()) {
2465                         // This rejected message has been seen by all the clients so
2466                         entry.setDead();
2467                 } else {
2468                         // We need to watch this rejected message
2469                         entry.setWatchSet(deviceWatchSet);
2470                 }
2471         }
2472
2473         /**
2474          * Check if this abort is live, if not then save it so we can kill it later.
2475          * update the last transaction number that was arbitrated on.
2476          */
2477         private void processEntry(Abort entry) {
2478
2479
2480                 if (entry.getTransactionSequenceNumber() != -1) {
2481                         // update the transaction status if it was sent to the server
2482                         TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
2483                         if (status != null) {
2484                                 status.setStatus(TransactionStatus.StatusAborted);
2485                         }
2486                 }
2487
2488                 // Abort has not been seen by the client it is for yet so we need to keep track of it
2489                 Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
2490                 if (previouslySeenAbort != null) {
2491                         previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
2492                 }
2493
2494                 if (entry.getTransactionArbitrator() == localMachineId) {
2495                         liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry);
2496                 }
2497
2498                 if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
2499
2500                         // The machine already saw this so it is dead
2501                         entry.setDead();
2502                         liveAbortTable.remove(entry.getAbortId());
2503
2504                         if (entry.getTransactionArbitrator() == localMachineId) {
2505                                 liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber());
2506                         }
2507
2508                         return;
2509                 }
2510
2511
2512
2513
2514                 // Update the last arbitration data that we have seen so far
2515                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) {
2516
2517                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator());
2518                         if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) {
2519                                 // Is larger
2520                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2521                         }
2522                 } else {
2523                         // Never seen any data from this arbitrator so record the first one
2524                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2525                 }
2526
2527
2528                 // Set dead a transaction if we can
2529                 Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair<Long, Long>(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber()));
2530                 if (transactionToSetDead != null) {
2531                         liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber());
2532                 }
2533
2534                 // Update the last transaction sequence number that the arbitrator arbitrated on
2535                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator());
2536                 if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2537
2538                         // Is a valid one
2539                         if (entry.getTransactionSequenceNumber() != -1) {
2540                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber());
2541                         }
2542                 }
2543         }
2544
2545         /**
2546          * Set dead the transaction part if that transaction is dead and keep track of all new parts
2547          */
2548         private void processEntry(TransactionPart entry) {
2549                 // Check if we have already seen this transaction and set it dead OR if it is not alive
2550                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId());
2551                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= entry.getSequenceNumber())) {
2552                         // This transaction is dead, it was already committed or aborted
2553                         entry.setDead();
2554                         return;
2555                 }
2556
2557                 // This part is still alive
2558                 Map<Pair<Long, Integer>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
2559
2560                 if (transactionPart == null) {
2561                         // Dont have a table for this machine Id yet so make one
2562                         transactionPart = new HashMap<Pair<Long, Integer>, TransactionPart>();
2563                         newTransactionParts.put(entry.getMachineId(), transactionPart);
2564                 }
2565
2566                 // Update the part and set dead ones we have already seen (got a rescued version)
2567                 TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry);
2568                 if (previouslySeenPart != null) {
2569                         previouslySeenPart.setDead();
2570                 }
2571         }
2572
2573         /**
2574          * Process new commit entries and save them for future use.  Delete duplicates
2575          */
2576         private void processEntry(CommitPart entry) {
2577
2578
2579                 // Update the last transaction that was updated if we can
2580                 if (entry.getTransactionSequenceNumber() != -1) {
2581                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
2582
2583                         // Update the last transaction sequence number that the arbitrator arbitrated on
2584                         if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2585                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
2586                         }
2587                 }
2588
2589
2590
2591
2592                 Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
2593
2594                 if (commitPart == null) {
2595                         // Don't have a table for this machine Id yet so make one
2596                         commitPart = new HashMap<Pair<Long, Integer>, CommitPart>();
2597                         newCommitParts.put(entry.getMachineId(), commitPart);
2598                 }
2599
2600                 // Update the part and set dead ones we have already seen (got a rescued version)
2601                 CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry);
2602                 if (previouslySeenPart != null) {
2603                         previouslySeenPart.setDead();
2604                 }
2605         }
2606
2607         /**
2608          * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
2609          * Updates the live aborts, removes those that are dead and sets them dead.
2610          * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2611          * other clients have not had a rollback on the last message.
2612          */
2613         private void updateLastMessage(long machineId, long seqNum, Liveness liveness, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2614
2615                 // We have seen this machine ID
2616                 machineSet.remove(machineId);
2617
2618                 // Get the set of rejected messages that this machine Id is has not seen yet
2619                 HashSet<RejectedMessage> watchset = rejectedMessageWatchListTable.get(machineId);
2620
2621                 // If there is a rejected message that this machine Id has not seen yet
2622                 if (watchset != null) {
2623
2624                         // Go through each rejected message that this machine Id has not seen yet
2625                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
2626
2627                                 RejectedMessage rm = rmit.next();
2628
2629                                 // If this machine Id has seen this rejected message...
2630                                 if (rm.getSequenceNumber() <= seqNum) {
2631
2632                                         // Remove it from our watchlist
2633                                         rmit.remove();
2634
2635                                         // Decrement machines that need to see this notification
2636                                         rm.removeWatcher(machineId);
2637                                 }
2638                         }
2639                 }
2640
2641                 // Set dead the abort
2642                 for (Iterator<Map.Entry<Pair<Long, Long>, Abort>> i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
2643                         Abort abort = i.next().getValue();
2644
2645                         if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
2646                                 abort.setDead();
2647                                 i.remove();
2648
2649                                 if (abort.getTransactionArbitrator() == localMachineId) {
2650                                         liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber());
2651                                 }
2652                         }
2653                 }
2654
2655
2656
2657                 if (machineId == localMachineId) {
2658                         // Our own messages are immediately dead.
2659                         if (liveness instanceof LastMessage) {
2660                                 ((LastMessage)liveness).setDead();
2661                         } else if (liveness instanceof Slot) {
2662                                 ((Slot)liveness).setDead();
2663                         } else {
2664                                 throw new Error("Unrecognized type");
2665                         }
2666                 }
2667
2668                 // Get the old last message for this device
2669                 Pair<Long, Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<Long, Liveness>(seqNum, liveness));
2670                 if (lastMessageEntry == null) {
2671                         // If no last message then there is nothing else to process
2672                         return;
2673                 }
2674
2675                 long lastMessageSeqNum = lastMessageEntry.getFirst();
2676                 Liveness lastEntry = lastMessageEntry.getSecond();
2677
2678                 // If it is not our machine Id since we already set ours to dead
2679                 if (machineId != localMachineId) {
2680                         if (lastEntry instanceof LastMessage) {
2681                                 ((LastMessage)lastEntry).setDead();
2682                         } else if (lastEntry instanceof Slot) {
2683                                 ((Slot)lastEntry).setDead();
2684                         } else {
2685                                 throw new Error("Unrecognized type");
2686                         }
2687                 }
2688
2689                 // Make sure the server is not playing any games
2690                 if (machineId == localMachineId) {
2691
2692                         if (hadPartialSendToServer) {
2693                                 // We were not making any updates and we had a machine mismatch
2694                                 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2695                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2696                                 }
2697
2698                         } else {
2699                                 // We were not making any updates and we had a machine mismatch
2700                                 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2701                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2702                                 }
2703                         }
2704                 } else {
2705                         if (lastMessageSeqNum > seqNum) {
2706                                 throw new Error("Server Error: Rollback on remote machine sequence number");
2707                         }
2708                 }
2709         }
2710
2711         /**
2712          * Add a rejected message entry to the watch set to keep track of which clients have seen that
2713          * rejected message entry and which have not.
2714          */
2715         private void addWatchList(long machineId, RejectedMessage entry) {
2716                 HashSet<RejectedMessage> entries = rejectedMessageWatchListTable.get(machineId);
2717                 if (entries == null) {
2718                         // There is no set for this machine ID yet so create one
2719                         entries = new HashSet<RejectedMessage>();
2720                         rejectedMessageWatchListTable.put(machineId, entries);
2721                 }
2722                 entries.add(entry);
2723         }
2724
2725         /**
2726          * Check if the HMAC chain is not violated
2727          */
2728         private void checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
2729                 for (int i = 0; i < newSlots.length; i++) {
2730                         Slot currSlot = newSlots[i];
2731                         Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
2732                         if (prevSlot != null &&
2733                                 !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
2734                                 throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
2735                 }
2736         }
2737 }