Fixed bug
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
1 package iotcloud;
2
3 import java.util.Iterator;
4 import java.util.Random;
5 import java.util.Arrays;
6 import java.util.Map;
7 import java.util.Set;
8 import java.util.List;
9 import java.util.Vector;
10 import java.util.HashMap;
11 import java.util.HashSet;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.nio.ByteBuffer;
15
16 /**
17  * IoTTable data structure.  Provides client interface.
18  * @author Brian Demsky
19  * @version 1.0
20  */
21
22 final public class Table {
23
24         /* Constants */
25         static final int FREE_SLOTS = 10; // Number of slots that should be kept free
26         static final int SKIP_THRESHOLD = 10;
27         static final double RESIZE_MULTIPLE = 1.2;
28         static final double RESIZE_THRESHOLD = 0.75;
29         static final int REJECTED_THRESHOLD = 5;
30
31         /* Helper Objects */
32         private SlotBuffer buffer = null;
33         private CloudComm cloud = null;
34         private Random random = null;
35         private TableStatus liveTableStatus = null;
36         private PendingTransaction pendingTransactionBuilder = null; // Pending Transaction used in building a Pending Transaction
37         private Transaction lastPendingTransactionSpeculatedOn = null; // Last transaction that was speculated on from the pending transaction
38         private Transaction firstPendingTransaction = null; // first transaction in the pending transaction list
39
40         /* Variables */
41         private int numberOfSlots = 0;  // Number of slots stored in buffer
42         private int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
43         private long liveSlotCount = 0; // Number of currently live slots
44         private long oldestLiveSlotSequenceNumver = 0;  // Smallest sequence number of the slot with a live entry
45         private long localMachineId = 0; // Machine ID of this client device
46         private long sequenceNumber = 0; // Largest sequence number a client has received
47         // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
48         // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
49         private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
50         private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
51         private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
52         private long localArbitrationSequenceNumber = 0;
53         private boolean hadPartialSendToServer = false;
54         private boolean attemptedToSendToServer = false;
55         private long expectedsize;
56         private boolean didFindTableStatus = false;
57         private long currMaxSize = 0;
58
59         private Slot lastSlotAttemptedToSend = null;
60         private boolean lastIsNewKey = false;
61         private int lastNewSize = 0;
62         private Map<Transaction, List<Integer>> lastTransactionPartsSent = null;
63         private List<Entry> lastPendingSendArbitrationEntriesToDelete = null;
64         private NewKey lastNewKey = null;
65
66
67         /* Data Structures  */
68         private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
69         private Map<IoTString, KeyValue> speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value
70         private Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
71         private Map<IoTString, NewKey> liveNewKeyTable = null; // Table of live new keys
72         private HashMap<Long, Pair<Long, Liveness>> lastMessageTable = null; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
73         private HashMap<Long, HashSet<RejectedMessage>> rejectedMessageWatchListTable = null; // Table of machine Ids and the set of rejected messages they have not seen yet
74         private Map<IoTString, Long> arbitratorTable = null; // Table of keys and their arbitrators
75         private Map<Pair<Long, Long>, Abort> liveAbortTable = null; // Table live abort messages
76         private Map<Long, Map<Pair<Long, Integer>, TransactionPart>> newTransactionParts = null; // transaction parts that are seen in this latest round of slots from the server
77         private Map<Long, Map<Pair<Long, Integer>, CommitPart>> newCommitParts = null; // commit parts that are seen in this latest round of slots from the server
78         private Map<Long, Long> lastArbitratedTransactionNumberByArbitratorTable = null; // Last transaction sequence number that an arbitrator arbitrated on
79         private Map<Long, Transaction> liveTransactionBySequenceNumberTable = null; // live transaction grouped by the sequence number
80         private Map<Pair<Long, Long>, Transaction> liveTransactionByTransactionIdTable = null; // live transaction grouped by the transaction ID
81         private Map<Long, Map<Long, Commit>> liveCommitsTable = null;
82         private Map<IoTString, Commit> liveCommitsByKeyTable = null;
83         private Map<Long, Long> lastCommitSeenSequenceNumberByArbitratorTable = null;
84         private Vector<Long> rejectedSlotList = null; // List of rejected slots that have yet to be sent to the server
85         private List<Transaction> pendingTransactionQueue = null;
86         private List<ArbitrationRound> pendingSendArbitrationRounds = null;
87         private List<Entry> pendingSendArbitrationEntriesToDelete = null;
88         private Map<Transaction, List<Integer>> transactionPartsSent = null;
89         private Map<Long, TransactionStatus> outstandingTransactionStatus = null;
90         private Map<Long, Abort> liveAbortsGeneratedByLocal = null;
91         private Set<Pair<Long, Long>> offlineTransactionsCommittedAndAtServer = null;
92         private Map<Long, Pair<String, Integer>> localCommunicationTable = null;
93         private Map<Long, Long> lastTransactionSeenFromMachineFromServer = null;
94         private Map<Long, Long> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = null;
95
96
97         public Table(String baseurl, String password, long _localMachineId, int listeningPort) {
98                 localMachineId = _localMachineId;
99                 cloud = new CloudComm(this, baseurl, password, listeningPort);
100
101                 init();
102         }
103
104         public Table(CloudComm _cloud, long _localMachineId) {
105                 localMachineId = _localMachineId;
106                 cloud = _cloud;
107
108                 init();
109         }
110
111         /**
112          * Init all the stuff needed for for table usage
113          */
114         private void init() {
115
116                 // Init helper objects
117                 random = new Random();
118                 buffer = new SlotBuffer();
119
120                 // Set Variables
121                 oldestLiveSlotSequenceNumver = 1;
122
123                 // init data structs
124                 committedKeyValueTable = new HashMap<IoTString, KeyValue>();
125                 speculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
126                 pendingTransactionSpeculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
127                 liveNewKeyTable = new HashMap<IoTString, NewKey>();
128                 lastMessageTable = new HashMap<Long, Pair<Long, Liveness>>();
129                 rejectedMessageWatchListTable = new HashMap<Long, HashSet<RejectedMessage>>();
130                 arbitratorTable = new HashMap<IoTString, Long>();
131                 liveAbortTable = new HashMap<Pair<Long, Long>, Abort>();
132                 newTransactionParts = new HashMap<Long, Map<Pair<Long, Integer>, TransactionPart>>();
133                 newCommitParts = new HashMap<Long, Map<Pair<Long, Integer>, CommitPart>>();
134                 lastArbitratedTransactionNumberByArbitratorTable = new HashMap<Long, Long>();
135                 liveTransactionBySequenceNumberTable = new HashMap<Long, Transaction>();
136                 liveTransactionByTransactionIdTable = new HashMap<Pair<Long, Long>, Transaction>();
137                 liveCommitsTable = new HashMap<Long, Map<Long, Commit>>();
138                 liveCommitsByKeyTable = new HashMap<IoTString, Commit>();
139                 lastCommitSeenSequenceNumberByArbitratorTable = new HashMap<Long, Long>();
140                 rejectedSlotList = new Vector<Long>();
141                 pendingTransactionQueue = new ArrayList<Transaction>();
142                 pendingSendArbitrationEntriesToDelete = new ArrayList<Entry>();
143                 transactionPartsSent = new HashMap<Transaction, List<Integer>>();
144                 outstandingTransactionStatus = new HashMap<Long, TransactionStatus>();
145                 liveAbortsGeneratedByLocal = new HashMap<Long, Abort>();
146                 offlineTransactionsCommittedAndAtServer = new HashSet<Pair<Long, Long>>();
147                 localCommunicationTable = new HashMap<Long, Pair<String, Integer>>();
148                 lastTransactionSeenFromMachineFromServer = new HashMap<Long, Long>();
149                 pendingSendArbitrationRounds = new ArrayList<ArbitrationRound>();
150                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new HashMap<Long, Long>();
151
152
153                 // Other init stuff
154                 numberOfSlots = buffer.capacity();
155                 setResizeThreshold();
156         }
157
158         // TODO: delete method
159         public synchronized void printSlots() {
160                 long o = buffer.getOldestSeqNum();
161                 long n = buffer.getNewestSeqNum();
162
163                 int[] types = new int[10];
164
165                 int num = 0;
166
167                 int livec = 0;
168                 int deadc = 0;
169                 for (long i = o; i < (n + 1); i++) {
170                         Slot s = buffer.getSlot(i);
171
172                         Vector<Entry> entries = s.getEntries();
173
174                         for (Entry e : entries) {
175                                 if (e.isLive()) {
176                                         int type = e.getType();
177                                         types[type] = types[type] + 1;
178                                         num++;
179                                         livec++;
180                                 } else {
181                                         deadc++;
182                                 }
183                         }
184                 }
185
186                 for (int i = 0; i < 10; i++) {
187                         System.out.println(i + "    " + types[i]);
188                 }
189                 System.out.println("Live count:   " + livec);
190                 System.out.println("Dead count:   " + deadc);
191                 System.out.println("Old:   " + o);
192                 System.out.println("New:   " + n);
193                 System.out.println("Size:   " + buffer.size());
194                 // System.out.println("Commits:   " + liveCommitsTable.size());
195                 System.out.println("pendingTrans:   " + pendingTransactionQueue.size());
196                 System.out.println("Trans Status Out:   " + outstandingTransactionStatus.size());
197
198                 for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
199                         System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
200                 }
201
202
203                 for (Long a : liveCommitsTable.keySet()) {
204                         for (Long b : liveCommitsTable.get(a).keySet()) {
205                                 for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) {
206                                         System.out.print(kv + " ");
207                                 }
208                                 System.out.print("|| ");
209                         }
210                         System.out.println();
211                 }
212
213         }
214
215         /**
216          * Initialize the table by inserting a table status as the first entry into the table status
217          * also initialize the crypto stuff.
218          */
219         public synchronized void initTable() throws ServerException {
220                 cloud.initSecurity();
221
222                 // Create the first insertion into the block chain which is the table status
223                 Slot s = new Slot(this, 1, localMachineId);
224                 TableStatus status = new TableStatus(s, numberOfSlots);
225                 s.addEntry(status);
226                 Slot[] array = cloud.putSlot(s, numberOfSlots);
227
228                 if (array == null) {
229                         array = new Slot[] {s};
230                         // update local block chain
231                         validateAndUpdate(array, true);
232                 } else if (array.length == 1) {
233                         // in case we did push the slot BUT we failed to init it
234                         validateAndUpdate(array, true);
235                 } else {
236                         throw new Error("Error on initialization");
237                 }
238         }
239
240         /**
241          * Rebuild the table from scratch by pulling the latest block chain from the server.
242          */
243         public synchronized void rebuild() throws ServerException {
244                 // Just pull the latest slots from the server
245                 Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
246                 validateAndUpdate(newslots, true);
247         }
248
249         // public String toString() {
250         //      String retString = " Committed Table: \n";
251         //      retString += "---------------------------\n";
252         //      retString += commitedTable.toString();
253
254         //      retString += "\n\n";
255
256         //      retString += " Speculative Table: \n";
257         //      retString += "---------------------------\n";
258         //      retString += speculativeTable.toString();
259
260         //      return retString;
261         // }
262
263         public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) {
264                 localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
265         }
266
267         public synchronized Long getArbitrator(IoTString key) {
268                 return arbitratorTable.get(key);
269         }
270
271         public synchronized void close() {
272                 cloud.close();
273         }
274
275         public synchronized IoTString getCommitted(IoTString key)  {
276                 KeyValue kv = committedKeyValueTable.get(key);
277
278                 if (kv != null) {
279                         return kv.getValue();
280                 } else {
281                         return null;
282                 }
283         }
284
285         public synchronized IoTString getSpeculative(IoTString key) {
286                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
287
288                 if (kv == null) {
289                         kv = speculatedKeyValueTable.get(key);
290                 }
291
292                 if (kv == null) {
293                         kv = committedKeyValueTable.get(key);
294                 }
295
296                 if (kv != null) {
297                         return kv.getValue();
298                 } else {
299                         return null;
300                 }
301         }
302
303         public synchronized IoTString getCommittedAtomic(IoTString key) {
304                 KeyValue kv = committedKeyValueTable.get(key);
305
306                 if (arbitratorTable.get(key) == null) {
307                         throw new Error("Key not Found.");
308                 }
309
310                 // Make sure new key value pair matches the current arbitrator
311                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
312                         // TODO: Maybe not throw en error
313                         throw new Error("Not all Key Values Match Arbitrator.");
314                 }
315
316                 if (kv != null) {
317                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
318                         return kv.getValue();
319                 } else {
320                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
321                         return null;
322                 }
323         }
324
325         public synchronized IoTString getSpeculativeAtomic(IoTString key) {
326                 if (arbitratorTable.get(key) == null) {
327                         throw new Error("Key not Found.");
328                 }
329
330                 // Make sure new key value pair matches the current arbitrator
331                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
332                         // TODO: Maybe not throw en error
333                         throw new Error("Not all Key Values Match Arbitrator.");
334                 }
335
336                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
337
338                 if (kv == null) {
339                         kv = speculatedKeyValueTable.get(key);
340                 }
341
342                 if (kv == null) {
343                         kv = committedKeyValueTable.get(key);
344                 }
345
346                 if (kv != null) {
347                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
348                         return kv.getValue();
349                 } else {
350                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
351                         return null;
352                 }
353         }
354
355         public synchronized boolean update()  {
356                 try {
357                         Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
358                         validateAndUpdate(newSlots, false);
359                         sendToServer(null);
360
361
362                         updateLiveTransactionsAndStatus();
363
364                         return true;
365                 } catch (Exception e) {
366                         // e.printStackTrace();
367                 }
368
369                 return false;
370         }
371
372         public synchronized boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
373                 while (true) {
374                         if (arbitratorTable.get(keyName) != null) {
375                                 // There is already an arbitrator
376                                 return false;
377                         }
378
379                         NewKey newKey = new NewKey(null, keyName, machineId);
380                         if (sendToServer(newKey)) {
381                                 // If successfully inserted
382                                 return true;
383                         }
384                 }
385         }
386
387         public synchronized void startTransaction() {
388                 // Create a new transaction, invalidates any old pending transactions.
389                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
390         }
391
392         public synchronized void addKV(IoTString key, IoTString value) {
393
394                 // Make sure it is a valid key
395                 if (arbitratorTable.get(key) == null) {
396                         throw new Error("Key not Found.");
397                 }
398
399                 // Make sure new key value pair matches the current arbitrator
400                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
401                         // TODO: Maybe not throw en error
402                         throw new Error("Not all Key Values Match Arbitrator.");
403                 }
404
405                 // Add the key value to this transaction
406                 KeyValue kv = new KeyValue(key, value);
407                 pendingTransactionBuilder.addKV(kv);
408         }
409
410         public synchronized TransactionStatus commitTransaction() {
411
412                 if (pendingTransactionBuilder.getKVUpdates().size() == 0) {
413                         // transaction with no updates will have no effect on the system
414                         return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
415                 }
416
417                 // Set the local transaction sequence number and increment
418                 pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber);
419                 localTransactionSequenceNumber++;
420
421                 // Create the transaction status
422                 TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator());
423
424                 // Create the new transaction
425                 Transaction newTransaction = pendingTransactionBuilder.createTransaction();
426                 newTransaction.setTransactionStatus(transactionStatus);
427
428                 if (pendingTransactionBuilder.getArbitrator() != localMachineId) {
429                         // Add it to the queue and invalidate the builder for safety
430                         pendingTransactionQueue.add(newTransaction);
431                 } else {
432                         arbitrateOnLocalTransaction(newTransaction);
433                         updateLiveStateFromLocal();
434                 }
435
436                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
437
438                 try {
439                         sendToServer(null);
440                 } catch (ServerException e) {
441
442                         Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
443                         for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
444                                 Transaction transaction = iter.next();
445
446                                 if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) {
447                                         // Already contacted this client so ignore all attempts to contact this client
448                                         // to preserve ordering for arbitrator
449                                         continue;
450                                 }
451
452                                 Pair<Boolean, Boolean> sendReturn = sendTransactionToLocal(transaction);
453
454                                 if (sendReturn.getFirst()) {
455                                         // Failed to contact over local
456                                         arbitratorTriedAndFailed.add(transaction.getArbitrator());
457                                 } else {
458                                         // Successful contact or should not contact
459
460                                         if (sendReturn.getSecond()) {
461                                                 // did arbitrate
462                                                 iter.remove();
463                                         }
464                                 }
465                         }
466                 }
467
468                 updateLiveStateFromLocal();
469
470                 return transactionStatus;
471         }
472
473         /**
474          * Get the machine ID for this client
475          */
476         public long getMachineId() {
477                 return localMachineId;
478         }
479
480         /**
481          * Decrement the number of live slots that we currently have
482          */
483         public void decrementLiveCount() {
484                 liveSlotCount--;
485         }
486
487         /**
488          * Recalculate the new resize threshold
489          */
490         private void setResizeThreshold() {
491                 int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots);
492                 bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
493         }
494
495
496         boolean lastInsertedNewKey = false;
497
498         private boolean sendToServer(NewKey newKey) throws ServerException {
499
500                 boolean fromRetry = false;
501
502                 try {
503                         if (hadPartialSendToServer) {
504                                 Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
505                                 if (newSlots.length == 0) {
506                                         fromRetry = true;
507                                         ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
508
509                                         if (sendSlotsReturn.getFirst()) {
510                                                 if (newKey != null) {
511                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
512                                                                 newKey = null;
513                                                         }
514                                                 }
515
516                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
517                                                         transaction.resetServerFailure();
518
519                                                         // Update which transactions parts still need to be sent
520                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
521
522                                                         // Add the transaction status to the outstanding list
523                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
524
525                                                         // Update the transaction status
526                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
527
528                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
529                                                         if (transaction.didSendAllParts()) {
530                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
531                                                                 pendingTransactionQueue.remove(transaction);
532                                                         }
533                                                 }
534                                         } else {
535
536                                                 newSlots = sendSlotsReturn.getThird();
537
538                                                 boolean isInserted = false;
539                                                 for (Slot s : newSlots) {
540                                                         if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
541                                                                 isInserted = true;
542                                                                 break;
543                                                         }
544                                                 }
545
546                                                 for (Slot s : newSlots) {
547                                                         if (isInserted) {
548                                                                 break;
549                                                         }
550
551                                                         // Process each entry in the slot
552                                                         for (Entry entry : s.getEntries()) {
553
554                                                                 if (entry.getType() == Entry.TypeLastMessage) {
555                                                                         LastMessage lastMessage = (LastMessage)entry;
556                                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
557                                                                                 isInserted = true;
558                                                                                 break;
559                                                                         }
560                                                                 }
561                                                         }
562                                                 }
563
564                                                 if (isInserted) {
565                                                         if (newKey != null) {
566                                                                 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
567                                                                         newKey = null;
568                                                                 }
569                                                         }
570
571                                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
572                                                                 transaction.resetServerFailure();
573
574                                                                 // Update which transactions parts still need to be sent
575                                                                 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
576
577                                                                 // Add the transaction status to the outstanding list
578                                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
579
580                                                                 // Update the transaction status
581                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
582
583                                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
584                                                                 if (transaction.didSendAllParts()) {
585                                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
586                                                                         pendingTransactionQueue.remove(transaction);
587                                                                 } else {
588                                                                         transaction.resetServerFailure();
589                                                                         // Set the transaction sequence number back to nothing
590                                                                         if (!transaction.didSendAPartToServer()) {
591                                                                                 transaction.setSequenceNumber(-1);
592                                                                         }
593                                                                 }
594                                                         }
595                                                 }
596                                         }
597
598                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
599                                                 transaction.resetServerFailure();
600                                                 // Set the transaction sequence number back to nothing
601                                                 if (!transaction.didSendAPartToServer()) {
602                                                         transaction.setSequenceNumber(-1);
603                                                 }
604                                         }
605
606                                         if (sendSlotsReturn.getThird().length != 0) {
607                                                 // insert into the local block chain
608                                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
609                                         }
610                                         // continue;
611                                 } else {
612                                         boolean isInserted = false;
613                                         for (Slot s : newSlots) {
614                                                 if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
615                                                         isInserted = true;
616                                                         break;
617                                                 }
618                                         }
619
620                                         for (Slot s : newSlots) {
621                                                 if (isInserted) {
622                                                         break;
623                                                 }
624
625                                                 // Process each entry in the slot
626                                                 for (Entry entry : s.getEntries()) {
627
628                                                         if (entry.getType() == Entry.TypeLastMessage) {
629                                                                 LastMessage lastMessage = (LastMessage)entry;
630                                                                 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
631                                                                         isInserted = true;
632                                                                         break;
633                                                                 }
634                                                         }
635                                                 }
636                                         }
637
638                                         if (isInserted) {
639                                                 if (newKey != null) {
640                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
641                                                                 newKey = null;
642                                                         }
643                                                 }
644
645                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
646                                                         transaction.resetServerFailure();
647
648                                                         // Update which transactions parts still need to be sent
649                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
650
651                                                         // Add the transaction status to the outstanding list
652                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
653
654                                                         // Update the transaction status
655                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
656
657                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
658                                                         if (transaction.didSendAllParts()) {
659                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
660                                                                 pendingTransactionQueue.remove(transaction);
661                                                         } else {
662                                                                 transaction.resetServerFailure();
663                                                                 // Set the transaction sequence number back to nothing
664                                                                 if (!transaction.didSendAPartToServer()) {
665                                                                         transaction.setSequenceNumber(-1);
666                                                                 }
667                                                         }
668                                                 }
669                                         } else {
670                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
671                                                         transaction.resetServerFailure();
672                                                         // Set the transaction sequence number back to nothing
673                                                         if (!transaction.didSendAPartToServer()) {
674                                                                 transaction.setSequenceNumber(-1);
675                                                         }
676                                                 }
677                                         }
678
679                                         // insert into the local block chain
680                                         validateAndUpdate(newSlots, true);
681                                 }
682                         }
683                 } catch (ServerException e) {
684                         throw e;
685                 }
686
687
688                 try {
689                         // While we have stuff that needs inserting into the block chain
690                         while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) {
691                                 fromRetry = false;
692
693                                 if (hadPartialSendToServer) {
694                                         throw new Error("Should Be error free");
695                                 }
696
697
698
699                                 // If there is a new key with same name then end
700                                 if ((newKey != null) && (arbitratorTable.get(newKey.getKey()) != null)) {
701                                         System.out.println("New Key Fail");
702                                         return false;
703                                 }
704
705                                 // Create the slot
706                                 Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC());
707
708                                 // Try to fill the slot with data
709                                 ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
710                                 boolean needsResize = fillSlotsReturn.getFirst();
711                                 int newSize = fillSlotsReturn.getSecond();
712                                 Boolean insertedNewKey = fillSlotsReturn.getThird();
713
714                                 if (needsResize) {
715                                         // Reset which transaction to send
716                                         for (Transaction transaction : transactionPartsSent.keySet()) {
717                                                 transaction.resetNextPartToSend();
718
719                                                 // Set the transaction sequence number back to nothing
720                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
721                                                         transaction.setSequenceNumber(-1);
722                                                 }
723                                         }
724
725                                         // Clear the sent data since we are trying again
726                                         pendingSendArbitrationEntriesToDelete.clear();
727                                         transactionPartsSent.clear();
728
729                                         // We needed a resize so try again
730                                         fillSlot(slot, true, newKey);
731                                 }
732
733                                 lastSlotAttemptedToSend = slot;
734                                 lastIsNewKey = (newKey != null);
735                                 lastInsertedNewKey = insertedNewKey;
736                                 lastNewSize = newSize;
737                                 lastNewKey = newKey;
738                                 lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
739                                 lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
740
741
742                                 ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
743
744                                 if (sendSlotsReturn.getFirst()) {
745
746                                         // Did insert into the block chain
747
748                                         if (insertedNewKey) {
749                                                 // This slot was what was inserted not a previous slot
750
751                                                 // New Key was successfully inserted into the block chain so dont want to insert it again
752                                                 newKey = null;
753                                         }
754
755                                         // Remove the aborts and commit parts that were sent from the pending to send queue
756                                         for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
757                                                 ArbitrationRound round = iter.next();
758                                                 round.removeParts(pendingSendArbitrationEntriesToDelete);
759
760                                                 if (round.isDoneSending()) {
761                                                         // Sent all the parts
762                                                         iter.remove();
763                                                 }
764                                         }
765
766                                         for (Transaction transaction : transactionPartsSent.keySet()) {
767                                                 transaction.resetServerFailure();
768
769                                                 // Update which transactions parts still need to be sent
770                                                 transaction.removeSentParts(transactionPartsSent.get(transaction));
771
772                                                 // Add the transaction status to the outstanding list
773                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
774
775                                                 // Update the transaction status
776                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
777
778                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
779                                                 if (transaction.didSendAllParts()) {
780                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
781                                                         pendingTransactionQueue.remove(transaction);
782
783                                                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
784                                                                 System.out.println("Sent: " + kv + "  from: " + localMachineId + "   Slot:" + slot.getSequenceNumber() + "  Claimed:" + transaction.getSequenceNumber());
785                                                         }
786                                                 }
787                                         }
788                                 } else {
789
790                                         // if (!sendSlotsReturn.getSecond()) {
791                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
792                                         //              transaction.resetServerFailure();
793                                         //      }
794                                         // } else {
795                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
796                                         //              transaction.resetServerFailure();
797
798                                         //              // Update which transactions parts still need to be sent
799                                         //              transaction.removeSentParts(transactionPartsSent.get(transaction));
800
801                                         //              // Add the transaction status to the outstanding list
802                                         //              outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
803
804                                         //              // Update the transaction status
805                                         //              transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
806
807                                         //              // Check if all the transaction parts were successfully sent and if so then remove it from pending
808                                         //              if (transaction.didSendAllParts()) {
809                                         //                      transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
810                                         //                      pendingTransactionQueue.remove(transaction);
811
812                                         //                      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
813                                         //                              System.out.println("Sent: " + kv + "  from: " + localMachineId + "   Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + "  Claimed:" + transaction.getSequenceNumber());
814                                         //                      }
815                                         //              }
816                                         //      }
817                                         // }
818
819                                         // Reset which transaction to send
820                                         for (Transaction transaction : transactionPartsSent.keySet()) {
821                                                 transaction.resetNextPartToSend();
822                                                 // transaction.resetNextPartToSend();
823
824                                                 // Set the transaction sequence number back to nothing
825                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
826                                                         transaction.setSequenceNumber(-1);
827                                                 }
828                                         }
829                                 }
830
831                                 // Clear the sent data in preparation for next send
832                                 pendingSendArbitrationEntriesToDelete.clear();
833                                 transactionPartsSent.clear();
834
835                                 if (sendSlotsReturn.getThird().length != 0) {
836                                         // insert into the local block chain
837                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
838                                 }
839                         }
840
841                 } catch (ServerException e) {
842
843                         System.out.println("Server Failure:   " + e.getType());
844                         for (Transaction transaction : transactionPartsSent.keySet()) {
845                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
846                                         System.out.println("Sent Error: " + kv + "    " + e.getType());
847                                 }
848                         }
849
850                         if (e.getType() != ServerException.TypeInputTimeout) {
851                                 // e.printStackTrace();
852
853                                 // Nothing was able to be sent to the server so just clear these data structures
854                                 for (Transaction transaction : transactionPartsSent.keySet()) {
855                                         transaction.resetNextPartToSend();
856
857                                         // Set the transaction sequence number back to nothing
858                                         if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
859                                                 transaction.setSequenceNumber(-1);
860                                         }
861                                 }
862                         } else {
863                                 // There was a partial send to the server
864                                 hadPartialSendToServer = true;
865
866
867                                 // if (!fromRetry) {
868                                 //      lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
869                                 //      lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
870                                 // }
871
872                                 // Nothing was able to be sent to the server so just clear these data structures
873                                 for (Transaction transaction : transactionPartsSent.keySet()) {
874                                         transaction.resetNextPartToSend();
875                                         transaction.setServerFailure();
876                                 }
877                         }
878
879                         pendingSendArbitrationEntriesToDelete.clear();
880                         transactionPartsSent.clear();
881
882                         throw e;
883                 }
884
885                 return newKey == null;
886         }
887
888         public synchronized boolean updateFromLocal(long machineId) {
889                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
890                 if (localCommunicationInformation == null) {
891                         // Cant talk to that device locally so do nothing
892                         return false;
893                 }
894
895                 // Get the size of the send data
896                 int sendDataSize = Integer.BYTES + Long.BYTES;
897
898                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
899                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != null) {
900                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
901                 }
902
903                 byte[] sendData = new byte[sendDataSize];
904                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
905
906                 // Encode the data
907                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
908                 bbEncode.putInt(0);
909
910                 // Send by local
911                 byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
912
913                 if (returnData == null) {
914                         // Could not contact server
915                         return false;
916                 }
917
918                 // Decode the data
919                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
920                 int numberOfEntries = bbDecode.getInt();
921
922                 for (int i = 0; i < numberOfEntries; i++) {
923                         byte type = bbDecode.get();
924                         if (type == Entry.TypeAbort) {
925                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
926                                 processEntry(abort);
927                         } else if (type == Entry.TypeCommitPart) {
928                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
929                                 processEntry(commitPart);
930                         }
931                 }
932
933                 updateLiveStateFromLocal();
934
935                 return true;
936         }
937
938         private Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
939
940                 // Get the devices local communications
941                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
942
943                 if (localCommunicationInformation == null) {
944                         // Cant talk to that device locally so do nothing
945                         return new Pair<Boolean, Boolean>(true, false);
946                 }
947
948                 // Get the size of the send data
949                 int sendDataSize = Integer.BYTES + Long.BYTES;
950                 for (TransactionPart part : transaction.getParts().values()) {
951                         sendDataSize += part.getSize();
952                 }
953
954                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
955                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != null) {
956                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
957                 }
958
959                 // Make the send data size
960                 byte[] sendData = new byte[sendDataSize];
961                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
962
963                 // Encode the data
964                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
965                 bbEncode.putInt(transaction.getParts().size());
966                 for (TransactionPart part : transaction.getParts().values()) {
967                         part.encode(bbEncode);
968                 }
969
970
971                 // Send by local
972                 byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
973
974                 if (returnData == null) {
975                         // Could not contact server
976                         return new Pair<Boolean, Boolean>(true, false);
977                 }
978
979                 // Decode the data
980                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
981                 boolean didCommit = bbDecode.get() == 1;
982                 boolean couldArbitrate = bbDecode.get() == 1;
983                 int numberOfEntries = bbDecode.getInt();
984                 boolean foundAbort = false;
985
986                 for (int i = 0; i < numberOfEntries; i++) {
987                         byte type = bbDecode.get();
988                         if (type == Entry.TypeAbort) {
989                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
990
991                                 if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
992                                         foundAbort = true;
993                                 }
994
995                                 processEntry(abort);
996                         } else if (type == Entry.TypeCommitPart) {
997                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
998                                 processEntry(commitPart);
999                         }
1000                 }
1001
1002                 updateLiveStateFromLocal();
1003
1004                 if (couldArbitrate) {
1005                         TransactionStatus status =  transaction.getTransactionStatus();
1006                         if (didCommit) {
1007                                 status.setStatus(TransactionStatus.StatusCommitted);
1008                         } else {
1009                                 status.setStatus(TransactionStatus.StatusAborted);
1010                         }
1011                 } else {
1012                         TransactionStatus status =  transaction.getTransactionStatus();
1013                         if (foundAbort) {
1014                                 status.setStatus(TransactionStatus.StatusAborted);
1015                         } else {
1016                                 status.setStatus(TransactionStatus.StatusCommitted);
1017                         }
1018                 }
1019
1020                 return new Pair<Boolean, Boolean>(false, true);
1021         }
1022
1023         public synchronized byte[] acceptDataFromLocal(byte[] data) {
1024
1025                 // Decode the data
1026                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
1027                 long lastArbitratedSequenceNumberSeen = bbDecode.getLong();
1028                 int numberOfParts = bbDecode.getInt();
1029
1030                 // If we did commit a transaction or not
1031                 boolean didCommit = false;
1032                 boolean couldArbitrate = false;
1033
1034                 if (numberOfParts != 0) {
1035
1036                         // decode the transaction
1037                         Transaction transaction = new Transaction();
1038                         for (int i = 0; i < numberOfParts; i++) {
1039                                 bbDecode.get();
1040                                 TransactionPart newPart = (TransactionPart)TransactionPart.decode(null, bbDecode);
1041                                 transaction.addPartDecode(newPart);
1042                         }
1043
1044                         // Arbitrate on transaction and pull relevant return data
1045                         Pair<Boolean, Boolean> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1046                         couldArbitrate = localArbitrateReturn.getFirst();
1047                         didCommit = localArbitrateReturn.getSecond();
1048
1049                         updateLiveStateFromLocal();
1050
1051                         // Transaction was sent to the server so keep track of it to prevent double commit
1052                         if (transaction.getSequenceNumber() != -1) {
1053                                 offlineTransactionsCommittedAndAtServer.add(transaction.getId());
1054                         }
1055                 }
1056
1057                 // The data to send back
1058                 int returnDataSize = 0;
1059                 List<Entry> unseenArbitrations = new ArrayList<Entry>();
1060
1061                 // Get the aborts to send back
1062                 List<Long> abortLocalSequenceNumbers = new ArrayList<Long >(liveAbortsGeneratedByLocal.keySet());
1063                 Collections.sort(abortLocalSequenceNumbers);
1064                 for (Long localSequenceNumber : abortLocalSequenceNumbers) {
1065                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1066                                 continue;
1067                         }
1068
1069                         Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
1070                         unseenArbitrations.add(abort);
1071                         returnDataSize += abort.getSize();
1072                 }
1073
1074                 // Get the commits to send back
1075                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
1076                 if (commitForClientTable != null) {
1077                         List<Long> commitLocalSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
1078                         Collections.sort(commitLocalSequenceNumbers);
1079
1080                         for (Long localSequenceNumber : commitLocalSequenceNumbers) {
1081                                 Commit commit = commitForClientTable.get(localSequenceNumber);
1082
1083                                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1084                                         continue;
1085                                 }
1086
1087                                 unseenArbitrations.addAll(commit.getParts().values());
1088
1089                                 for (CommitPart commitPart : commit.getParts().values()) {
1090                                         returnDataSize += commitPart.getSize();
1091                                 }
1092                         }
1093                 }
1094
1095                 // Number of arbitration entries to decode
1096                 returnDataSize += 2 * Integer.BYTES;
1097
1098                 // Boolean of did commit or not
1099                 if (numberOfParts != 0) {
1100                         returnDataSize += Byte.BYTES;
1101                 }
1102
1103                 // Data to send Back
1104                 byte[] returnData = new byte[returnDataSize];
1105                 ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
1106
1107                 if (numberOfParts != 0) {
1108                         if (didCommit) {
1109                                 bbEncode.put((byte)1);
1110                         } else {
1111                                 bbEncode.put((byte)0);
1112                         }
1113                         if (couldArbitrate) {
1114                                 bbEncode.put((byte)1);
1115                         } else {
1116                                 bbEncode.put((byte)0);
1117                         }
1118                 }
1119
1120                 bbEncode.putInt(unseenArbitrations.size());
1121                 for (Entry entry : unseenArbitrations) {
1122                         entry.encode(bbEncode);
1123                 }
1124
1125                 return returnData;
1126         }
1127
1128         private ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize, boolean isNewKey)  throws ServerException {
1129
1130                 boolean attemptedToSendToServerTmp = attemptedToSendToServer;
1131                 attemptedToSendToServer = true;
1132
1133                 boolean inserted = false;
1134                 boolean lastTryInserted = false;
1135
1136                 Slot[] array = cloud.putSlot(slot, newSize);
1137                 if (array == null) {
1138                         array = new Slot[] {slot};
1139                         rejectedSlotList.clear();
1140                         inserted = true;
1141                 }       else {
1142                         if (array.length == 0) {
1143                                 throw new Error("Server Error: Did not send any slots");
1144                         }
1145
1146                         // if (attemptedToSendToServerTmp) {
1147                         if (hadPartialSendToServer) {
1148
1149                                 boolean isInserted = false;
1150                                 for (Slot s : array) {
1151                                         if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
1152                                                 isInserted = true;
1153                                                 break;
1154                                         }
1155                                 }
1156
1157                                 for (Slot s : array) {
1158                                         if (isInserted) {
1159                                                 break;
1160                                         }
1161
1162                                         // Process each entry in the slot
1163                                         for (Entry entry : s.getEntries()) {
1164
1165                                                 if (entry.getType() == Entry.TypeLastMessage) {
1166                                                         LastMessage lastMessage = (LastMessage)entry;
1167
1168                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) {
1169                                                                 isInserted = true;
1170                                                                 break;
1171                                                         }
1172                                                 }
1173                                         }
1174                                 }
1175
1176                                 if (!isInserted) {
1177                                         rejectedSlotList.add(slot.getSequenceNumber());
1178                                         lastTryInserted = false;
1179                                 } else {
1180                                         lastTryInserted = true;
1181                                 }
1182                         } else {
1183                                 rejectedSlotList.add(slot.getSequenceNumber());
1184                                 lastTryInserted = false;
1185                         }
1186                 }
1187
1188                 return new ThreeTuple<Boolean, Boolean, Slot[]>(inserted, lastTryInserted, array);
1189         }
1190
1191         /**
1192          * Returns false if a resize was needed
1193          */
1194         private ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) {
1195                 int newSize = 0;
1196                 if (liveSlotCount > bufferResizeThreshold) {
1197                         resize = true; //Resize is forced
1198                 }
1199
1200                 if (resize) {
1201                         newSize = (int) (numberOfSlots * RESIZE_MULTIPLE);
1202                         TableStatus status = new TableStatus(slot, newSize);
1203                         slot.addEntry(status);
1204                 }
1205
1206                 // Fill with rejected slots first before doing anything else
1207                 doRejectedMessages(slot);
1208
1209                 // Do mandatory rescue of entries
1210                 ThreeTuple<Boolean, Boolean, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1211
1212                 // Extract working variables
1213                 boolean needsResize = mandatoryRescueReturn.getFirst();
1214                 boolean seenLiveSlot = mandatoryRescueReturn.getSecond();
1215                 long currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1216
1217                 if (needsResize && !resize) {
1218                         // We need to resize but we are not resizing so return false
1219                         return new ThreeTuple<Boolean, Integer, Boolean>(true, null, null);
1220                 }
1221
1222                 boolean inserted = false;
1223                 if (newKeyEntry != null) {
1224                         newKeyEntry.setSlot(slot);
1225                         if (slot.hasSpace(newKeyEntry)) {
1226                                 slot.addEntry(newKeyEntry);
1227                                 inserted = true;
1228                         }
1229                 }
1230
1231                 // Clear the transactions, aborts and commits that were sent previously
1232                 transactionPartsSent.clear();
1233                 pendingSendArbitrationEntriesToDelete.clear();
1234
1235                 for (ArbitrationRound round : pendingSendArbitrationRounds) {
1236                         boolean isFull = false;
1237                         round.generateParts();
1238                         List<Entry> parts = round.getParts();
1239
1240                         // Insert pending arbitration data
1241                         for (Entry arbitrationData : parts) {
1242
1243                                 // If it is an abort then we need to set some information
1244                                 if (arbitrationData instanceof Abort) {
1245                                         ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
1246                                 }
1247
1248                                 if (!slot.hasSpace(arbitrationData)) {
1249                                         // No space so cant do anything else with these data entries
1250                                         isFull = true;
1251                                         break;
1252                                 }
1253
1254                                 // Add to this current slot and add it to entries to delete
1255                                 slot.addEntry(arbitrationData);
1256                                 pendingSendArbitrationEntriesToDelete.add(arbitrationData);
1257                         }
1258
1259                         if (isFull) {
1260                                 break;
1261                         }
1262                 }
1263
1264                 if (pendingTransactionQueue.size() > 0) {
1265
1266                         Transaction transaction = pendingTransactionQueue.get(0);
1267
1268                         // Set the transaction sequence number if it has yet to be inserted into the block chain
1269                         // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
1270                         //      transaction.setSequenceNumber(slot.getSequenceNumber());
1271                         // }
1272
1273                         if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
1274                                 transaction.setSequenceNumber(slot.getSequenceNumber());
1275                         }
1276
1277
1278                         while (true) {
1279                                 TransactionPart part = transaction.getNextPartToSend();
1280
1281                                 if (part == null) {
1282                                         // Ran out of parts to send for this transaction so move on
1283                                         break;
1284                                 }
1285
1286                                 if (slot.hasSpace(part)) {
1287                                         slot.addEntry(part);
1288                                         List<Integer> partsSent = transactionPartsSent.get(transaction);
1289                                         if (partsSent == null) {
1290                                                 partsSent = new ArrayList<Integer>();
1291                                                 transactionPartsSent.put(transaction, partsSent);
1292                                         }
1293                                         partsSent.add(part.getPartNumber());
1294                                         transactionPartsSent.put(transaction, partsSent);
1295                                 } else {
1296                                         break;
1297                                 }
1298                         }
1299                 }
1300
1301                 // Fill the remainder of the slot with rescue data
1302                 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1303
1304                 return new ThreeTuple<Boolean, Integer, Boolean>(false, newSize, inserted);
1305         }
1306
1307         private void doRejectedMessages(Slot s) {
1308                 if (! rejectedSlotList.isEmpty()) {
1309                         /* TODO: We should avoid generating a rejected message entry if
1310                          * there is already a sufficient entry in the queue (e.g.,
1311                          * equalsto value of true and same sequence number).  */
1312
1313                         long old_seqn = rejectedSlotList.firstElement();
1314                         if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
1315                                 long new_seqn = rejectedSlotList.lastElement();
1316                                 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1317                                 s.addEntry(rm);
1318                         } else {
1319                                 long prev_seqn = -1;
1320                                 int i = 0;
1321                                 /* Go through list of missing messages */
1322                                 for (; i < rejectedSlotList.size(); i++) {
1323                                         long curr_seqn = rejectedSlotList.get(i);
1324                                         Slot s_msg = buffer.getSlot(curr_seqn);
1325                                         if (s_msg != null)
1326                                                 break;
1327                                         prev_seqn = curr_seqn;
1328                                 }
1329                                 /* Generate rejected message entry for missing messages */
1330                                 if (prev_seqn != -1) {
1331                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1332                                         s.addEntry(rm);
1333                                 }
1334                                 /* Generate rejected message entries for present messages */
1335                                 for (; i < rejectedSlotList.size(); i++) {
1336                                         long curr_seqn = rejectedSlotList.get(i);
1337                                         Slot s_msg = buffer.getSlot(curr_seqn);
1338                                         long machineid = s_msg.getMachineID();
1339                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1340                                         s.addEntry(rm);
1341                                 }
1342                         }
1343                 }
1344         }
1345
1346         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot slot, boolean resize) {
1347                 long newestSequenceNumber = buffer.getNewestSeqNum();
1348                 long oldestSequenceNumber = buffer.getOldestSeqNum();
1349                 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1350                         oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1351                 }
1352
1353                 long currentSequenceNumber = oldestLiveSlotSequenceNumver;
1354                 boolean seenLiveSlot = false;
1355                 long firstIfFull = newestSequenceNumber + 1 - numberOfSlots;    // smallest seq number in the buffer if it is full
1356                 long threshold = firstIfFull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
1357
1358
1359                 // Mandatory Rescue
1360                 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1361                         Slot previousSlot = buffer.getSlot(currentSequenceNumber);
1362                         // Push slot number forward
1363                         if (! seenLiveSlot) {
1364                                 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1365                         }
1366
1367                         if (!previousSlot.isLive()) {
1368                                 continue;
1369                         }
1370
1371                         // We have seen a live slot
1372                         seenLiveSlot = true;
1373
1374                         // Get all the live entries for a slot
1375                         Vector<Entry> liveEntries = previousSlot.getLiveEntries(resize);
1376
1377                         // Iterate over all the live entries and try to rescue them
1378                         for (Entry liveEntry : liveEntries) {
1379                                 if (slot.hasSpace(liveEntry)) {
1380
1381                                         // Enough space to rescue the entry
1382                                         slot.addEntry(liveEntry);
1383                                 } else if (currentSequenceNumber == firstIfFull) {
1384                                         //if there's no space but the entry is about to fall off the queue
1385                                         System.out.println("B"); //?
1386                                         return new ThreeTuple<Boolean, Boolean, Long>(true, seenLiveSlot, currentSequenceNumber);
1387
1388                                 }
1389                         }
1390                 }
1391
1392                 // Did not resize
1393                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenLiveSlot, currentSequenceNumber);
1394         }
1395
1396         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
1397                 /* now go through live entries from least to greatest sequence number until
1398                  * either all live slots added, or the slot doesn't have enough room
1399                  * for SKIP_THRESHOLD consecutive entries*/
1400                 int skipcount = 0;
1401                 long newestseqnum = buffer.getNewestSeqNum();
1402                 search:
1403                 for (; seqn <= newestseqnum; seqn++) {
1404                         Slot prevslot = buffer.getSlot(seqn);
1405                         //Push slot number forward
1406                         if (!seenliveslot)
1407                                 oldestLiveSlotSequenceNumver = seqn;
1408
1409                         if (!prevslot.isLive())
1410                                 continue;
1411                         seenliveslot = true;
1412                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1413                         for (Entry liveentry : liveentries) {
1414                                 if (s.hasSpace(liveentry))
1415                                         s.addEntry(liveentry);
1416                                 else {
1417                                         skipcount++;
1418                                         if (skipcount > SKIP_THRESHOLD)
1419                                                 break search;
1420                                 }
1421                         }
1422                 }
1423         }
1424
1425         /**
1426          * Checks for malicious activity and updates the local copy of the block chain.
1427          */
1428         private void validateAndUpdate(Slot[] newSlots, boolean acceptUpdatesToLocal) {
1429
1430                 // The cloud communication layer has checked slot HMACs already before decoding
1431                 if (newSlots.length == 0) {
1432                         return;
1433                 }
1434
1435                 // Make sure all slots are newer than the last largest slot this client has seen
1436                 long firstSeqNum = newSlots[0].getSequenceNumber();
1437                 if (firstSeqNum <= sequenceNumber) {
1438                         throw new Error("Server Error: Sent older slots!");
1439                 }
1440
1441                 // Create an object that can access both new slots and slots in our local chain
1442                 // without committing slots to our local chain
1443                 SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
1444
1445                 // Check that the HMAC chain is not broken
1446                 checkHMACChain(indexer, newSlots);
1447
1448                 // Set to keep track of messages from clients
1449                 HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
1450
1451                 // Process each slots data
1452                 for (Slot slot : newSlots) {
1453                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1454                         updateExpectedSize();
1455                 }
1456
1457                 // If there is a gap, check to see if the server sent us everything.
1458                 if (firstSeqNum != (sequenceNumber + 1)) {
1459
1460                         // Check the size of the slots that were sent down by the server.
1461                         // Can only check the size if there was a gap
1462                         checkNumSlots(newSlots.length);
1463
1464                         // Since there was a gap every machine must have pushed a slot or must have
1465                         // a last message message.  If not then the server is hiding slots
1466                         if (!machineSet.isEmpty()) {
1467                                 throw new Error("Missing record for machines: " + machineSet);
1468                         }
1469                 }
1470
1471                 // Update the size of our local block chain.
1472                 commitNewMaxSize();
1473
1474                 // Commit new to slots to the local block chain.
1475                 for (Slot slot : newSlots) {
1476
1477                         // Insert this slot into our local block chain copy.
1478                         buffer.putSlot(slot);
1479
1480                         // Keep track of how many slots are currently live (have live data in them).
1481                         liveSlotCount++;
1482                 }
1483
1484                 // Get the sequence number of the latest slot in the system
1485                 sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
1486
1487                 updateLiveStateFromServer();
1488
1489                 // No Need to remember after we pulled from the server
1490                 offlineTransactionsCommittedAndAtServer.clear();
1491
1492                 // This is invalidated now
1493                 hadPartialSendToServer = false;
1494         }
1495
1496         private void updateLiveStateFromServer() {
1497                 // Process the new transaction parts
1498                 processNewTransactionParts();
1499
1500                 // Do arbitration on new transactions that were received
1501                 arbitrateFromServer();
1502
1503                 // Update all the committed keys
1504                 boolean didCommitOrSpeculate = updateCommittedTable();
1505
1506                 // Delete the transactions that are now dead
1507                 updateLiveTransactionsAndStatus();
1508
1509                 // Do speculations
1510                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1511                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1512         }
1513
1514         private void updateLiveStateFromLocal() {
1515                 // Update all the committed keys
1516                 boolean didCommitOrSpeculate = updateCommittedTable();
1517
1518                 // Delete the transactions that are now dead
1519                 updateLiveTransactionsAndStatus();
1520
1521                 // Do speculations
1522                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1523                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1524         }
1525
1526         private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) {
1527                 if (didFindTableStatus) {
1528                         return;
1529                 }
1530                 long prevslots = firstSequenceNumber;
1531                 expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1532                 currMaxSize = numberOfSlots;
1533         }
1534
1535         private void updateExpectedSize() {
1536                 expectedsize++;
1537                 if (expectedsize > currMaxSize) {
1538                         expectedsize = currMaxSize;
1539                 }
1540         }
1541
1542
1543         /**
1544          * Check the size of the block chain to make sure there are enough slots sent back by the server.
1545          * This is only called when we have a gap between the slots that we have locally and the slots
1546          * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1547          * status message
1548          */
1549         private void checkNumSlots(int numberOfSlots) {
1550                 if (numberOfSlots != expectedsize) {
1551                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numberOfSlots);
1552                 }
1553         }
1554
1555         private void updateCurrMaxSize(int newmaxsize) {
1556                 currMaxSize = newmaxsize;
1557         }
1558
1559
1560         /**
1561          * Update the size of of the local buffer if it is needed.
1562          */
1563         private void commitNewMaxSize() {
1564                 didFindTableStatus = false;
1565
1566                 // Resize the local slot buffer
1567                 if (numberOfSlots != currMaxSize) {
1568                         buffer.resize((int)currMaxSize);
1569                 }
1570
1571                 // Change the number of local slots to the new size
1572                 numberOfSlots = (int)currMaxSize;
1573
1574                 // Recalculate the resize threshold since the size of the local buffer has changed
1575                 setResizeThreshold();
1576         }
1577
1578         /**
1579          * Process the new transaction parts from this latest round of slots received from the server
1580          */
1581         private void processNewTransactionParts() {
1582
1583                 if (newTransactionParts.size() == 0) {
1584                         // Nothing new to process
1585                         return;
1586                 }
1587
1588                 // Iterate through all the machine Ids that we received new parts for
1589                 for (Long machineId : newTransactionParts.keySet()) {
1590                         Map<Pair<Long, Integer>, TransactionPart> parts = newTransactionParts.get(machineId);
1591
1592                         // Iterate through all the parts for that machine Id
1593                         for (Pair<Long, Integer> partId : parts.keySet()) {
1594                                 TransactionPart part = parts.get(partId);
1595
1596                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
1597                                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= part.getSequenceNumber())) {
1598                                         // Set dead the transaction part
1599                                         part.setDead();
1600                                         continue;
1601                                 }
1602
1603                                 // Get the transaction object for that sequence number
1604                                 Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber());
1605
1606                                 if (transaction == null) {
1607                                         // This is a new transaction that we dont have so make a new one
1608                                         transaction = new Transaction();
1609
1610                                         // Insert this new transaction into the live tables
1611                                         liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction);
1612                                         liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction);
1613                                 }
1614
1615                                 // Add that part to the transaction
1616                                 transaction.addPartDecode(part);
1617                         }
1618                 }
1619
1620                 // Clear all the new transaction parts in preparation for the next time the server sends slots
1621                 newTransactionParts.clear();
1622         }
1623
1624
1625         private long lastSeqNumArbOn = 0;
1626
1627         private void arbitrateFromServer() {
1628
1629                 if (liveTransactionBySequenceNumberTable.size() == 0) {
1630                         // Nothing to arbitrate on so move on
1631                         return;
1632                 }
1633
1634                 // Get the transaction sequence numbers and sort from oldest to newest
1635                 List<Long> transactionSequenceNumbers = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
1636                 Collections.sort(transactionSequenceNumbers);
1637
1638                 // Collection of key value pairs that are
1639                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1640
1641                 // The last transaction arbitrated on
1642                 long lastTransactionCommitted = -1;
1643                 Set<Abort> generatedAborts = new HashSet<Abort>();
1644
1645                 for (Long transactionSequenceNumber : transactionSequenceNumbers) {
1646                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1647
1648                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1649                                 System.out.println("Arb Seen: " + kv + "   " + lastSeqNumArbOn + "    " + transactionSequenceNumber + "  " + localMachineId + "   " + transaction.getArbitrator());
1650                         }
1651
1652
1653                         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1654                         if (transaction.getArbitrator() != localMachineId) {
1655                                 continue;
1656                         }
1657
1658                         if (transactionSequenceNumber < lastSeqNumArbOn) {
1659                                 continue;
1660                         }
1661
1662                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1663                                 System.out.println("Arb Seen: " + kv + "   " + lastSeqNumArbOn + "    " + transactionSequenceNumber + "  " + localMachineId);
1664                         }
1665
1666
1667                         if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
1668                                 // We have seen this already locally so dont commit again
1669                                 continue;
1670                         }
1671
1672
1673                         if (!transaction.isComplete()) {
1674                                 // Will arbitrate in incorrect order if we continue so just break
1675                                 // Most likely this
1676                                 break;
1677                         }
1678
1679                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1680                                 System.out.println("Arb on: " + kv + "   " + lastSeqNumArbOn + "    " + transactionSequenceNumber + "  " + localMachineId);
1681                         }
1682
1683
1684                         // update the largest transaction seen by arbitrator from server
1685                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) {
1686                                 lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1687                         } else {
1688                                 Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId());
1689                                 if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1690                                         lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1691                                 }
1692                         }
1693
1694                         if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
1695                                 // Guard evaluated as true
1696
1697                                 // Update the local changes so we can make the commit
1698                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1699                                         speculativeTableTmp.put(kv.getKey(), kv);
1700                                 }
1701
1702                                 // Update what the last transaction committed was for use in batch commit
1703                                 lastTransactionCommitted = transactionSequenceNumber;
1704
1705                                 System.out.println("Commit Generated: " + lastTransactionCommitted + "   " + localMachineId);
1706                         } else {
1707                                 // Guard evaluated was false so create abort
1708
1709                                 // Create the abort
1710                                 Abort newAbort = new Abort(null,
1711                                                            transaction.getClientLocalSequenceNumber(),
1712                                                            transaction.getSequenceNumber(),
1713                                                            transaction.getMachineId(),
1714                                                            transaction.getArbitrator(),
1715                                                            localArbitrationSequenceNumber);
1716                                 localArbitrationSequenceNumber++;
1717
1718                                 generatedAborts.add(newAbort);
1719
1720                                 // Insert the abort so we can process
1721                                 processEntry(newAbort);
1722                         }
1723
1724                         lastSeqNumArbOn = transactionSequenceNumber;
1725
1726                         // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
1727                 }
1728
1729                 Commit newCommit = null;
1730
1731                 // If there is something to commit
1732                 if (speculativeTableTmp.size() != 0) {
1733
1734                         // Create the commit and increment the commit sequence number
1735                         newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1736                         localArbitrationSequenceNumber++;
1737
1738                         // Add all the new keys to the commit
1739                         for (KeyValue kv : speculativeTableTmp.values()) {
1740                                 newCommit.addKV(kv);
1741                         }
1742
1743                         // create the commit parts
1744                         newCommit.createCommitParts();
1745
1746                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1747
1748                         // Insert the commit so we can process it
1749                         for (CommitPart commitPart : newCommit.getParts().values()) {
1750                                 processEntry(commitPart);
1751                         }
1752                 }
1753
1754                 if ((newCommit != null) || (generatedAborts.size() > 0)) {
1755                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1756                         pendingSendArbitrationRounds.add(arbitrationRound);
1757
1758                         if (compactArbitrationData()) {
1759                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1760                                 if (newArbitrationRound.getCommit() != null) {
1761                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1762                                                 processEntry(commitPart);
1763                                         }
1764                                 }
1765                         }
1766                 }
1767         }
1768
1769         private Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction) {
1770
1771                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1772                 if (transaction.getArbitrator() != localMachineId) {
1773                         return new Pair<Boolean, Boolean>(false, false);
1774                 }
1775
1776                 if (!transaction.isComplete()) {
1777                         // Will arbitrate in incorrect order if we continue so just break
1778                         // Most likely this
1779                         return new Pair<Boolean, Boolean>(false, false);
1780                 }
1781
1782                 if (transaction.getMachineId() != localMachineId) {
1783                         // dont do this check for local transactions
1784                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) {
1785                                 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
1786                                         // We've have already seen this from the server
1787                                         return new Pair<Boolean, Boolean>(false, false);
1788                                 }
1789                         }
1790                 }
1791
1792                 if (transaction.evaluateGuard(committedKeyValueTable, null, null)) {
1793                         // Guard evaluated as true
1794
1795                         // Create the commit and increment the commit sequence number
1796                         Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1797                         localArbitrationSequenceNumber++;
1798
1799                         // Update the local changes so we can make the commit
1800                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1801                                 newCommit.addKV(kv);
1802                         }
1803
1804                         // create the commit parts
1805                         newCommit.createCommitParts();
1806
1807                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1808                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
1809                         pendingSendArbitrationRounds.add(arbitrationRound);
1810
1811                         if (compactArbitrationData()) {
1812                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1813                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1814                                         processEntry(commitPart);
1815                                 }
1816                         } else {
1817                                 // Insert the commit so we can process it
1818                                 for (CommitPart commitPart : newCommit.getParts().values()) {
1819                                         processEntry(commitPart);
1820                                 }
1821                         }
1822
1823                         if (transaction.getMachineId() == localMachineId) {
1824                                 TransactionStatus status = transaction.getTransactionStatus();
1825                                 if (status != null) {
1826                                         status.setStatus(TransactionStatus.StatusCommitted);
1827                                 }
1828                         }
1829
1830                         updateLiveStateFromLocal();
1831                         return new Pair<Boolean, Boolean>(true, true);
1832                 } else {
1833
1834                         if (transaction.getMachineId() == localMachineId) {
1835                                 // For locally created messages update the status
1836
1837                                 // Guard evaluated was false so create abort
1838                                 TransactionStatus status = transaction.getTransactionStatus();
1839                                 if (status != null) {
1840                                         status.setStatus(TransactionStatus.StatusAborted);
1841                                 }
1842                         } else {
1843                                 Set addAbortSet = new HashSet<Abort>();
1844
1845
1846                                 // Create the abort
1847                                 Abort newAbort = new Abort(null,
1848                                                            transaction.getClientLocalSequenceNumber(),
1849                                                            -1,
1850                                                            transaction.getMachineId(),
1851                                                            transaction.getArbitrator(),
1852                                                            localArbitrationSequenceNumber);
1853                                 localArbitrationSequenceNumber++;
1854
1855                                 addAbortSet.add(newAbort);
1856
1857
1858                                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1859                                 ArbitrationRound arbitrationRound = new ArbitrationRound(null, addAbortSet);
1860                                 pendingSendArbitrationRounds.add(arbitrationRound);
1861
1862                                 if (compactArbitrationData()) {
1863                                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1864                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1865                                                 processEntry(commitPart);
1866                                         }
1867                                 }
1868                         }
1869
1870                         updateLiveStateFromLocal();
1871                         return new Pair<Boolean, Boolean>(true, false);
1872                 }
1873         }
1874
1875         /**
1876          * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1877          */
1878         private boolean compactArbitrationData() {
1879
1880                 if (pendingSendArbitrationRounds.size() < 2) {
1881                         // Nothing to compact so do nothing
1882                         return false;
1883                 }
1884
1885                 ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1886                 if (lastRound.didSendPart()) {
1887                         return false;
1888                 }
1889
1890                 boolean hadCommit = (lastRound.getCommit() == null);
1891                 boolean gotNewCommit = false;
1892
1893                 int numberToDelete = 1;
1894                 while (numberToDelete < pendingSendArbitrationRounds.size()) {
1895                         ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1);
1896
1897                         if (round.isFull() || round.didSendPart()) {
1898                                 // Stop since there is a part that cannot be compacted and we need to compact in order
1899                                 break;
1900                         }
1901
1902                         if (round.getCommit() == null) {
1903
1904                                 // Try compacting aborts only
1905                                 int newSize = round.getCurrentSize() + lastRound.getAbortsCount();
1906                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1907                                         // Cant compact since it would be too large
1908                                         break;
1909                                 }
1910                                 lastRound.addAborts(round.getAborts());
1911                         } else {
1912
1913                                 // Create a new larger commit
1914                                 Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber);
1915                                 localArbitrationSequenceNumber++;
1916
1917                                 // Create the commit parts so that we can count them
1918                                 newCommit.createCommitParts();
1919
1920                                 // Calculate the new size of the parts
1921                                 int newSize = newCommit.getNumberOfParts();
1922                                 newSize += lastRound.getAbortsCount();
1923                                 newSize += round.getAbortsCount();
1924
1925                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1926                                         // Cant compact since it would be too large
1927                                         break;
1928                                 }
1929
1930                                 // Set the new compacted part
1931                                 lastRound.setCommit(newCommit);
1932                                 lastRound.addAborts(round.getAborts());
1933                                 gotNewCommit = true;
1934                         }
1935
1936                         numberToDelete++;
1937                 }
1938
1939                 if (numberToDelete != 1) {
1940                         // If there is a compaction
1941
1942                         // Delete the previous pieces that are now in the new compacted piece
1943                         if (numberToDelete == pendingSendArbitrationRounds.size()) {
1944                                 pendingSendArbitrationRounds.clear();
1945                         } else {
1946                                 for (int i = 0; i < numberToDelete; i++) {
1947                                         pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1);
1948                                 }
1949                         }
1950
1951                         // Add the new compacted into the pending to send list
1952                         pendingSendArbitrationRounds.add(lastRound);
1953
1954                         // Should reinsert into the commit processor
1955                         if (hadCommit && gotNewCommit) {
1956                                 return true;
1957                         }
1958                 }
1959
1960                 return false;
1961         }
1962         // private boolean compactArbitrationData() {
1963         //      return false;
1964         // }
1965
1966         /**
1967          * Update all the commits and the committed tables, sets dead the dead transactions
1968          */
1969         private boolean updateCommittedTable() {
1970
1971                 if (newCommitParts.size() == 0) {
1972                         // Nothing new to process
1973                         return false;
1974                 }
1975
1976                 // Iterate through all the machine Ids that we received new parts for
1977                 for (Long machineId : newCommitParts.keySet()) {
1978                         Map<Pair<Long, Integer>, CommitPart> parts = newCommitParts.get(machineId);
1979
1980                         // Iterate through all the parts for that machine Id
1981                         for (Pair<Long, Integer> partId : parts.keySet()) {
1982                                 CommitPart part = parts.get(partId);
1983
1984                                 // Get the transaction object for that sequence number
1985                                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
1986
1987                                 if (commitForClientTable == null) {
1988                                         // This is the first commit from this device
1989                                         commitForClientTable = new HashMap<Long, Commit>();
1990                                         liveCommitsTable.put(part.getMachineId(), commitForClientTable);
1991                                 }
1992
1993                                 Commit commit = commitForClientTable.get(part.getSequenceNumber());
1994
1995                                 if (commit == null) {
1996                                         // This is a new commit that we dont have so make a new one
1997                                         commit = new Commit();
1998
1999                                         // Insert this new commit into the live tables
2000                                         commitForClientTable.put(part.getSequenceNumber(), commit);
2001                                 }
2002
2003                                 // Add that part to the commit
2004                                 commit.addPartDecode(part);
2005                         }
2006                 }
2007
2008                 // Clear all the new commits parts in preparation for the next time the server sends slots
2009                 newCommitParts.clear();
2010
2011                 // If we process a new commit keep track of it for future use
2012                 boolean didProcessANewCommit = false;
2013
2014                 // Process the commits one by one
2015                 for (Long arbitratorId : liveCommitsTable.keySet()) {
2016
2017                         // Get all the commits for a specific arbitrator
2018                         Map<Long, Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
2019
2020                         // Sort the commits in order
2021                         List<Long> commitSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
2022                         Collections.sort(commitSequenceNumbers);
2023
2024                         // Get the last commit seen from this arbitrator
2025                         long lastCommitSeenSequenceNumber = -1;
2026                         if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != null) {
2027                                 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId);
2028                         }
2029
2030                         // Go through each new commit one by one
2031                         for (int i = 0; i < commitSequenceNumbers.size(); i++) {
2032                                 Long commitSequenceNumber = commitSequenceNumbers.get(i);
2033                                 Commit commit = commitForClientTable.get(commitSequenceNumber);
2034
2035                                 // Special processing if a commit is not complete
2036                                 if (!commit.isComplete()) {
2037                                         if (i == (commitSequenceNumbers.size() - 1)) {
2038                                                 // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
2039                                                 break;
2040                                         } else {
2041                                                 // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
2042                                                 // Delete it and move on
2043                                                 commit.setDead();
2044                                                 commitForClientTable.remove(commit.getSequenceNumber());
2045                                                 continue;
2046                                         }
2047                                 }
2048
2049                                 // Update the last transaction that was updated if we can
2050                                 if (commit.getTransactionSequenceNumber() != -1) {
2051                                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2052
2053                                         // Update the last transaction sequence number that the arbitrator arbitrated on
2054                                         if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2055                                                 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2056                                         }
2057                                 }
2058
2059
2060                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2061                                         System.out.println("Commit Seen: " + kv + "   " + commit.getTransactionSequenceNumber() + "   " + localMachineId);
2062                                 }
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073                                 // Update the last arbitration data that we have seen so far
2074                                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) {
2075
2076                                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId());
2077                                         if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) {
2078                                                 // Is larger
2079                                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2080                                         }
2081                                 } else {
2082                                         // Never seen any data from this arbitrator so record the first one
2083                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2084                                 }
2085
2086                                 // We have already seen this commit before so need to do the full processing on this commit
2087                                 if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2088
2089                                         // Update the last transaction that was updated if we can
2090                                         if (commit.getTransactionSequenceNumber() != -1) {
2091                                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2092
2093                                                 // Update the last transaction sequence number that the arbitrator arbitrated on
2094                                                 if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2095                                                         lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2096                                                 }
2097                                         }
2098
2099                                         continue;
2100                                 }
2101
2102                                 // If we got here then this is a brand new commit and needs full processing
2103
2104                                 // Get what commits should be edited, these are the commits that have live values for their keys
2105                                 Set<Commit> commitsToEdit = new HashSet<Commit>();
2106                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2107                                         commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
2108                                 }
2109                                 commitsToEdit.remove(null); // remove null since it could be in this set
2110
2111                                 // Update each previous commit that needs to be updated
2112                                 for (Commit previousCommit : commitsToEdit) {
2113
2114                                         // Only bother with live commits (TODO: Maybe remove this check)
2115                                         if (previousCommit.isLive()) {
2116
2117                                                 // Update which keys in the old commits are still live
2118                                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2119                                                         previousCommit.invalidateKey(kv.getKey());
2120                                                 }
2121
2122                                                 // if the commit is now dead then remove it
2123                                                 if (!previousCommit.isLive()) {
2124                                                         commitForClientTable.remove(previousCommit);
2125                                                 }
2126                                         }
2127                                 }
2128
2129                                 // Update the last seen sequence number from this arbitrator
2130                                 if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) {
2131                                         if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
2132                                                 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2133                                         }
2134                                 } else {
2135                                         lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2136                                 }
2137
2138                                 // We processed a new commit that we havent seen before
2139                                 didProcessANewCommit = true;
2140
2141                                 // Update the committed table of keys and which commit is using which key
2142                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2143                                         committedKeyValueTable.put(kv.getKey(), kv);
2144                                         liveCommitsByKeyTable.put(kv.getKey(), commit);
2145                                 }
2146                         }
2147                 }
2148
2149                 return didProcessANewCommit;
2150         }
2151
2152         /**
2153          * Create the speculative table from transactions that are still live and have come from the cloud
2154          */
2155         private boolean updateSpeculativeTable(boolean didProcessNewCommits) {
2156                 if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
2157                         // There is nothing to speculate on
2158                         return false;
2159                 }
2160
2161                 // Create a list of the transaction sequence numbers and sort them from oldest to newest
2162                 List<Long> transactionSequenceNumbersSorted = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
2163                 Collections.sort(transactionSequenceNumbersSorted);
2164
2165                 boolean hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2166
2167
2168                 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2169                         // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2170                         // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
2171
2172                         // Start from scratch
2173                         speculatedKeyValueTable.clear();
2174                         lastTransactionSequenceNumberSpeculatedOn = -1;
2175                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2176
2177                 }
2178
2179                 // Remember the front of the transaction list
2180                 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);
2181
2182                 // Find where to start arbitration from
2183                 int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2184
2185                 if (startIndex >= transactionSequenceNumbersSorted.size()) {
2186                         // Make sure we are not out of bounds
2187                         return false; // did not speculate
2188                 }
2189
2190                 Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
2191                 boolean didSkip = true;
2192
2193                 for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
2194                         long transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
2195                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
2196
2197                         if (!transaction.isComplete()) {
2198                                 // If there is an incomplete transaction then there is nothing we can do
2199                                 // add this transactions arbitrator to the list of arbitrators we should ignore
2200                                 incompleteTransactionArbitrator.add(transaction.getArbitrator());
2201                                 didSkip = true;
2202                                 continue;
2203                         }
2204
2205                         if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
2206                                 continue;
2207                         }
2208
2209                         lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2210
2211                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, null)) {
2212                                 // Guard evaluated to true so update the speculative table
2213                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2214                                         speculatedKeyValueTable.put(kv.getKey(), kv);
2215                                 }
2216                         }
2217                 }
2218
2219                 if (didSkip) {
2220                         // Since there was a skip we need to redo the speculation next time around
2221                         lastTransactionSequenceNumberSpeculatedOn = -1;
2222                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2223                 }
2224
2225                 // We did some speculation
2226                 return true;
2227         }
2228
2229         /**
2230          * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
2231          */
2232         private void updatePendingTransactionSpeculativeTable(boolean didProcessNewCommitsOrSpeculate) {
2233                 if (pendingTransactionQueue.size() == 0) {
2234                         // There is nothing to speculate on
2235                         return;
2236                 }
2237
2238
2239                 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
2240                         // need to reset on the pending speculation
2241                         lastPendingTransactionSpeculatedOn = null;
2242                         firstPendingTransaction = pendingTransactionQueue.get(0);
2243                         pendingTransactionSpeculatedKeyValueTable.clear();
2244                 }
2245
2246                 // Find where to start arbitration from
2247                 int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1;
2248
2249                 if (startIndex >= pendingTransactionQueue.size()) {
2250                         // Make sure we are not out of bounds
2251                         return;
2252                 }
2253
2254                 for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
2255                         Transaction transaction = pendingTransactionQueue.get(i);
2256
2257                         lastPendingTransactionSpeculatedOn = transaction;
2258
2259                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2260                                 // Guard evaluated to true so update the speculative table
2261                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2262                                         pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
2263                                 }
2264                         }
2265                 }
2266         }
2267
2268         /**
2269          * Set dead and remove from the live transaction tables the transactions that are dead
2270          */
2271         private void updateLiveTransactionsAndStatus() {
2272
2273                 // Go through each of the transactions
2274                 for (Iterator<Map.Entry<Long, Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
2275                         Transaction transaction = iter.next().getValue();
2276
2277                         // Check if the transaction is dead
2278                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
2279                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
2280
2281                                 // Set dead the transaction
2282                                 transaction.setDead();
2283
2284                                 // Remove the transaction from the live table
2285                                 iter.remove();
2286                                 liveTransactionByTransactionIdTable.remove(transaction.getId());
2287                         }
2288                 }
2289
2290                 // Go through each of the transactions
2291                 for (Iterator<Map.Entry<Long, TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
2292                         TransactionStatus status = iter.next().getValue();
2293
2294                         // Check if the transaction is dead
2295                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
2296                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
2297
2298                                 // Set committed
2299                                 status.setStatus(TransactionStatus.StatusCommitted);
2300
2301                                 // Remove
2302                                 iter.remove();
2303                         }
2304                 }
2305         }
2306
2307         /**
2308          * Process this slot, entry by entry.  Also update the latest message sent by slot
2309          */
2310         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2311
2312                 // Update the last message seen
2313                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2314
2315                 // Process each entry in the slot
2316                 for (Entry entry : slot.getEntries()) {
2317                         switch (entry.getType()) {
2318
2319                         case Entry.TypeCommitPart:
2320                                 processEntry((CommitPart)entry);
2321                                 break;
2322
2323                         case Entry.TypeAbort:
2324                                 processEntry((Abort)entry);
2325                                 break;
2326
2327                         case Entry.TypeTransactionPart:
2328                                 processEntry((TransactionPart)entry);
2329                                 break;
2330
2331                         case Entry.TypeNewKey:
2332                                 processEntry((NewKey)entry);
2333                                 break;
2334
2335                         case Entry.TypeLastMessage:
2336                                 processEntry((LastMessage)entry, machineSet);
2337                                 break;
2338
2339                         case Entry.TypeRejectedMessage:
2340                                 processEntry((RejectedMessage)entry, indexer);
2341                                 break;
2342
2343                         case Entry.TypeTableStatus:
2344                                 processEntry((TableStatus)entry, slot.getSequenceNumber());
2345                                 break;
2346
2347                         default:
2348                                 throw new Error("Unrecognized type: " + entry.getType());
2349                         }
2350                 }
2351         }
2352
2353         /**
2354          * Update the last message that was sent for a machine Id
2355          */
2356         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
2357                 // Update what the last message received by a machine was
2358                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
2359         }
2360
2361         /**
2362          * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2363          */
2364         private void processEntry(NewKey entry) {
2365
2366                 // Update the arbitrator table with the new key information
2367                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
2368
2369                 // Update what the latest live new key is
2370                 NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry);
2371                 if (oldNewKey != null) {
2372                         // Delete the old new key messages
2373                         oldNewKey.setDead();
2374                 }
2375         }
2376
2377         /**
2378          * Process new table status entries and set dead the old ones as new ones come in.
2379          * keeps track of the largest and smallest table status seen in this current round
2380          * of updating the local copy of the block chain
2381          */
2382         private void processEntry(TableStatus entry, long seq) {
2383                 int newNumSlots = entry.getMaxSlots();
2384                 updateCurrMaxSize(newNumSlots);
2385
2386                 initExpectedSize(seq, newNumSlots);
2387
2388                 if (liveTableStatus != null) {
2389                         // We have a larger table status so the old table status is no longer alive
2390                         liveTableStatus.setDead();
2391                 }
2392
2393                 // Make this new table status the latest alive table status
2394                 liveTableStatus = entry;
2395         }
2396
2397         /**
2398          * Check old messages to see if there is a block chain violation. Also
2399          */
2400         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
2401                 long oldSeqNum = entry.getOldSeqNum();
2402                 long newSeqNum = entry.getNewSeqNum();
2403                 boolean isequal = entry.getEqual();
2404                 long machineId = entry.getMachineID();
2405                 long seq = entry.getSequenceNumber();
2406
2407
2408                 // Check if we have messages that were supposed to be rejected in our local block chain
2409                 for (long seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2410
2411                         // Get the slot
2412                         Slot slot = indexer.getSlot(seqNum);
2413
2414                         if (slot != null) {
2415                                 // If we have this slot make sure that it was not supposed to be a rejected slot
2416
2417                                 long slotMachineId = slot.getMachineID();
2418                                 if (isequal != (slotMachineId == machineId)) {
2419                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2420                                 }
2421                         }
2422                 }
2423
2424
2425                 // Create a list of clients to watch until they see this rejected message entry.
2426                 HashSet<Long> deviceWatchSet = new HashSet<Long>();
2427                 for (Map.Entry<Long, Pair<Long, Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
2428
2429                         // Machine ID for the last message entry
2430                         long lastMessageEntryMachineId = lastMessageEntry.getKey();
2431
2432                         // We've seen it, don't need to continue to watch.  Our next
2433                         // message will implicitly acknowledge it.
2434                         if (lastMessageEntryMachineId == localMachineId) {
2435                                 continue;
2436                         }
2437
2438                         Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
2439                         long entrySequenceNumber = lastMessageValue.getFirst();
2440
2441                         if (entrySequenceNumber < seq) {
2442
2443                                 // Add this rejected message to the set of messages that this machine ID did not see yet
2444                                 addWatchList(lastMessageEntryMachineId, entry);
2445
2446                                 // This client did not see this rejected message yet so add it to the watch set to monitor
2447                                 deviceWatchSet.add(lastMessageEntryMachineId);
2448                         }
2449                 }
2450
2451                 if (deviceWatchSet.isEmpty()) {
2452                         // This rejected message has been seen by all the clients so
2453                         entry.setDead();
2454                 } else {
2455                         // We need to watch this rejected message
2456                         entry.setWatchSet(deviceWatchSet);
2457                 }
2458         }
2459
2460         /**
2461          * Check if this abort is live, if not then save it so we can kill it later.
2462          * update the last transaction number that was arbitrated on.
2463          */
2464         private void processEntry(Abort entry) {
2465
2466
2467                 if (entry.getTransactionSequenceNumber() != -1) {
2468                         // update the transaction status if it was sent to the server
2469                         TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
2470                         if (status != null) {
2471                                 status.setStatus(TransactionStatus.StatusAborted);
2472                         }
2473                 }
2474
2475                 // Abort has not been seen by the client it is for yet so we need to keep track of it
2476                 Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
2477                 if (previouslySeenAbort != null) {
2478                         previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
2479                 }
2480
2481                 if (entry.getTransactionArbitrator() == localMachineId) {
2482                         liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry);
2483                 }
2484
2485                 if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
2486
2487                         // The machine already saw this so it is dead
2488                         entry.setDead();
2489                         liveAbortTable.remove(entry.getAbortId());
2490
2491                         if (entry.getTransactionArbitrator() == localMachineId) {
2492                                 liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber());
2493                         }
2494
2495                         return;
2496                 }
2497
2498
2499
2500
2501                 // Update the last arbitration data that we have seen so far
2502                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) {
2503
2504                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator());
2505                         if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) {
2506                                 // Is larger
2507                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2508                         }
2509                 } else {
2510                         // Never seen any data from this arbitrator so record the first one
2511                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2512                 }
2513
2514
2515                 // Set dead a transaction if we can
2516                 Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair<Long, Long>(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber()));
2517                 if (transactionToSetDead != null) {
2518                         liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber());
2519                 }
2520
2521                 // Update the last transaction sequence number that the arbitrator arbitrated on
2522                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator());
2523                 if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2524
2525                         // Is a valid one
2526                         if (entry.getTransactionSequenceNumber() != -1) {
2527                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber());
2528                         }
2529                 }
2530         }
2531
2532         /**
2533          * Set dead the transaction part if that transaction is dead and keep track of all new parts
2534          */
2535         private void processEntry(TransactionPart entry) {
2536                 // Check if we have already seen this transaction and set it dead OR if it is not alive
2537                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId());
2538                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= entry.getSequenceNumber())) {
2539                         // This transaction is dead, it was already committed or aborted
2540                         entry.setDead();
2541                         return;
2542                 }
2543
2544                 // This part is still alive
2545                 Map<Pair<Long, Integer>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
2546
2547                 if (transactionPart == null) {
2548                         // Dont have a table for this machine Id yet so make one
2549                         transactionPart = new HashMap<Pair<Long, Integer>, TransactionPart>();
2550                         newTransactionParts.put(entry.getMachineId(), transactionPart);
2551                 }
2552
2553                 // Update the part and set dead ones we have already seen (got a rescued version)
2554                 TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry);
2555                 if (previouslySeenPart != null) {
2556                         previouslySeenPart.setDead();
2557                 }
2558         }
2559
2560         /**
2561          * Process new commit entries and save them for future use.  Delete duplicates
2562          */
2563         private void processEntry(CommitPart entry) {
2564
2565
2566                 // Update the last transaction that was updated if we can
2567                 if (entry.getTransactionSequenceNumber() != -1) {
2568                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
2569
2570                         // Update the last transaction sequence number that the arbitrator arbitrated on
2571                         if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2572                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
2573                         }
2574                 }
2575
2576
2577
2578
2579                 Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
2580
2581                 if (commitPart == null) {
2582                         // Don't have a table for this machine Id yet so make one
2583                         commitPart = new HashMap<Pair<Long, Integer>, CommitPart>();
2584                         newCommitParts.put(entry.getMachineId(), commitPart);
2585                 }
2586
2587                 // Update the part and set dead ones we have already seen (got a rescued version)
2588                 CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry);
2589                 if (previouslySeenPart != null) {
2590                         previouslySeenPart.setDead();
2591                 }
2592         }
2593
2594         /**
2595          * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
2596          * Updates the live aborts, removes those that are dead and sets them dead.
2597          * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2598          * other clients have not had a rollback on the last message.
2599          */
2600         private void updateLastMessage(long machineId, long seqNum, Liveness liveness, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2601
2602                 // We have seen this machine ID
2603                 machineSet.remove(machineId);
2604
2605                 // Get the set of rejected messages that this machine Id is has not seen yet
2606                 HashSet<RejectedMessage> watchset = rejectedMessageWatchListTable.get(machineId);
2607
2608                 // If there is a rejected message that this machine Id has not seen yet
2609                 if (watchset != null) {
2610
2611                         // Go through each rejected message that this machine Id has not seen yet
2612                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
2613
2614                                 RejectedMessage rm = rmit.next();
2615
2616                                 // If this machine Id has seen this rejected message...
2617                                 if (rm.getSequenceNumber() <= seqNum) {
2618
2619                                         // Remove it from our watchlist
2620                                         rmit.remove();
2621
2622                                         // Decrement machines that need to see this notification
2623                                         rm.removeWatcher(machineId);
2624                                 }
2625                         }
2626                 }
2627
2628                 // Set dead the abort
2629                 for (Iterator<Map.Entry<Pair<Long, Long>, Abort>> i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
2630                         Abort abort = i.next().getValue();
2631
2632                         if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
2633                                 abort.setDead();
2634                                 i.remove();
2635
2636                                 if (abort.getTransactionArbitrator() == localMachineId) {
2637                                         liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber());
2638                                 }
2639                         }
2640                 }
2641
2642
2643
2644                 if (machineId == localMachineId) {
2645                         // Our own messages are immediately dead.
2646                         if (liveness instanceof LastMessage) {
2647                                 ((LastMessage)liveness).setDead();
2648                         } else if (liveness instanceof Slot) {
2649                                 ((Slot)liveness).setDead();
2650                         } else {
2651                                 throw new Error("Unrecognized type");
2652                         }
2653                 }
2654
2655                 // Get the old last message for this device
2656                 Pair<Long, Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<Long, Liveness>(seqNum, liveness));
2657                 if (lastMessageEntry == null) {
2658                         // If no last message then there is nothing else to process
2659                         return;
2660                 }
2661
2662                 long lastMessageSeqNum = lastMessageEntry.getFirst();
2663                 Liveness lastEntry = lastMessageEntry.getSecond();
2664
2665                 // If it is not our machine Id since we already set ours to dead
2666                 if (machineId != localMachineId) {
2667                         if (lastEntry instanceof LastMessage) {
2668                                 ((LastMessage)lastEntry).setDead();
2669                         } else if (lastEntry instanceof Slot) {
2670                                 ((Slot)lastEntry).setDead();
2671                         } else {
2672                                 throw new Error("Unrecognized type");
2673                         }
2674                 }
2675
2676                 // Make sure the server is not playing any games
2677                 if (machineId == localMachineId) {
2678
2679                         if (hadPartialSendToServer) {
2680                                 // We were not making any updates and we had a machine mismatch
2681                                 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2682                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2683                                 }
2684
2685                         } else {
2686                                 // We were not making any updates and we had a machine mismatch
2687                                 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2688                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2689                                 }
2690                         }
2691                 } else {
2692                         if (lastMessageSeqNum > seqNum) {
2693                                 throw new Error("Server Error: Rollback on remote machine sequence number");
2694                         }
2695                 }
2696         }
2697
2698         /**
2699          * Add a rejected message entry to the watch set to keep track of which clients have seen that
2700          * rejected message entry and which have not.
2701          */
2702         private void addWatchList(long machineId, RejectedMessage entry) {
2703                 HashSet<RejectedMessage> entries = rejectedMessageWatchListTable.get(machineId);
2704                 if (entries == null) {
2705                         // There is no set for this machine ID yet so create one
2706                         entries = new HashSet<RejectedMessage>();
2707                         rejectedMessageWatchListTable.put(machineId, entries);
2708                 }
2709                 entries.add(entry);
2710         }
2711
2712         /**
2713          * Check if the HMAC chain is not violated
2714          */
2715         private void checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
2716                 for (int i = 0; i < newSlots.length; i++) {
2717                         Slot currSlot = newSlots[i];
2718                         Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
2719                         if (prevSlot != null &&
2720                                 !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
2721                                 throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
2722                 }
2723         }
2724 }