edits
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
1 package iotcloud;
2
3 import java.util.Iterator;
4 import java.util.Random;
5 import java.util.Arrays;
6 import java.util.Map;
7 import java.util.Set;
8 import java.util.List;
9 import java.util.Vector;
10 import java.util.HashMap;
11 import java.util.HashSet;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.nio.ByteBuffer;
15
16 /**
17  * IoTTable data structure.  Provides client interface.
18  * @author Brian Demsky
19  * @version 1.0
20  */
21
22 final public class Table {
23
24         /* Constants */
25         static final int FREE_SLOTS = 2; // Number of slots that should be kept free // 10
26         static final int SKIP_THRESHOLD = 10;
27         static final double RESIZE_MULTIPLE = 1.2;
28         static final double RESIZE_THRESHOLD = 0.75;
29         static final int REJECTED_THRESHOLD = 5;
30
31         /* Helper Objects */
32         private SlotBuffer buffer = null;
33         private CloudComm cloud = null;
34         private Random random = null;
35         private TableStatus liveTableStatus = null;
36         private PendingTransaction pendingTransactionBuilder = null; // Pending Transaction used in building a Pending Transaction
37         private Transaction lastPendingTransactionSpeculatedOn = null; // Last transaction that was speculated on from the pending transaction
38         private Transaction firstPendingTransaction = null; // first transaction in the pending transaction list
39
40         /* Variables */
41         private int numberOfSlots = 0;  // Number of slots stored in buffer
42         private int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
43         private long liveSlotCount = 0; // Number of currently live slots
44         private long oldestLiveSlotSequenceNumver = 0;  // Smallest sequence number of the slot with a live entry
45         private long localMachineId = 0; // Machine ID of this client device
46         private long sequenceNumber = 0; // Largest sequence number a client has received
47         private long localSequenceNumber = 0;
48
49         // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
50         // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
51         private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
52         private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
53         private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
54         private long localArbitrationSequenceNumber = 0;
55         private boolean hadPartialSendToServer = false;
56         private boolean attemptedToSendToServer = false;
57         private long expectedsize;
58         private boolean didFindTableStatus = false;
59         private long currMaxSize = 0;
60
61         private Slot lastSlotAttemptedToSend = null;
62         private boolean lastIsNewKey = false;
63         private int lastNewSize = 0;
64         private Map<Transaction, List<Integer>> lastTransactionPartsSent = null;
65         private List<Entry> lastPendingSendArbitrationEntriesToDelete = null;
66         private NewKey lastNewKey = null;
67
68
69         /* Data Structures  */
70         private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
71         private Map<IoTString, KeyValue> speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value
72         private Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
73         private Map<IoTString, NewKey> liveNewKeyTable = null; // Table of live new keys
74         private HashMap<Long, Pair<Long, Liveness>> lastMessageTable = null; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
75         private HashMap<Long, HashSet<RejectedMessage>> rejectedMessageWatchListTable = null; // Table of machine Ids and the set of rejected messages they have not seen yet
76         private Map<IoTString, Long> arbitratorTable = null; // Table of keys and their arbitrators
77         private Map<Pair<Long, Long>, Abort> liveAbortTable = null; // Table live abort messages
78         private Map<Long, Map<Pair<Long, Integer>, TransactionPart>> newTransactionParts = null; // transaction parts that are seen in this latest round of slots from the server
79         private Map<Long, Map<Pair<Long, Integer>, CommitPart>> newCommitParts = null; // commit parts that are seen in this latest round of slots from the server
80         private Map<Long, Long> lastArbitratedTransactionNumberByArbitratorTable = null; // Last transaction sequence number that an arbitrator arbitrated on
81         private Map<Long, Transaction> liveTransactionBySequenceNumberTable = null; // live transaction grouped by the sequence number
82         private Map<Pair<Long, Long>, Transaction> liveTransactionByTransactionIdTable = null; // live transaction grouped by the transaction ID
83         private Map<Long, Map<Long, Commit>> liveCommitsTable = null;
84         private Map<IoTString, Commit> liveCommitsByKeyTable = null;
85         private Map<Long, Long> lastCommitSeenSequenceNumberByArbitratorTable = null;
86         private Vector<Long> rejectedSlotList = null; // List of rejected slots that have yet to be sent to the server
87         private List<Transaction> pendingTransactionQueue = null;
88         private List<ArbitrationRound> pendingSendArbitrationRounds = null;
89         private List<Entry> pendingSendArbitrationEntriesToDelete = null;
90         private Map<Transaction, List<Integer>> transactionPartsSent = null;
91         private Map<Long, TransactionStatus> outstandingTransactionStatus = null;
92         private Map<Long, Abort> liveAbortsGeneratedByLocal = null;
93         private Set<Pair<Long, Long>> offlineTransactionsCommittedAndAtServer = null;
94         private Map<Long, Pair<String, Integer>> localCommunicationTable = null;
95         private Map<Long, Long> lastTransactionSeenFromMachineFromServer = null;
96         private Map<Long, Long> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = null;
97
98
99         public Table(String baseurl, String password, long _localMachineId, int listeningPort) {
100                 localMachineId = _localMachineId;
101                 cloud = new CloudComm(this, baseurl, password, listeningPort);
102
103                 init();
104         }
105
106         public Table(CloudComm _cloud, long _localMachineId) {
107                 localMachineId = _localMachineId;
108                 cloud = _cloud;
109
110                 init();
111         }
112
113         /**
114          * Init all the stuff needed for for table usage
115          */
116         private void init() {
117
118                 // Init helper objects
119                 random = new Random();
120                 buffer = new SlotBuffer();
121
122                 // Set Variables
123                 oldestLiveSlotSequenceNumver = 1;
124
125                 // init data structs
126                 committedKeyValueTable = new HashMap<IoTString, KeyValue>();
127                 speculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
128                 pendingTransactionSpeculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
129                 liveNewKeyTable = new HashMap<IoTString, NewKey>();
130                 lastMessageTable = new HashMap<Long, Pair<Long, Liveness>>();
131                 rejectedMessageWatchListTable = new HashMap<Long, HashSet<RejectedMessage>>();
132                 arbitratorTable = new HashMap<IoTString, Long>();
133                 liveAbortTable = new HashMap<Pair<Long, Long>, Abort>();
134                 newTransactionParts = new HashMap<Long, Map<Pair<Long, Integer>, TransactionPart>>();
135                 newCommitParts = new HashMap<Long, Map<Pair<Long, Integer>, CommitPart>>();
136                 lastArbitratedTransactionNumberByArbitratorTable = new HashMap<Long, Long>();
137                 liveTransactionBySequenceNumberTable = new HashMap<Long, Transaction>();
138                 liveTransactionByTransactionIdTable = new HashMap<Pair<Long, Long>, Transaction>();
139                 liveCommitsTable = new HashMap<Long, Map<Long, Commit>>();
140                 liveCommitsByKeyTable = new HashMap<IoTString, Commit>();
141                 lastCommitSeenSequenceNumberByArbitratorTable = new HashMap<Long, Long>();
142                 rejectedSlotList = new Vector<Long>();
143                 pendingTransactionQueue = new ArrayList<Transaction>();
144                 pendingSendArbitrationEntriesToDelete = new ArrayList<Entry>();
145                 transactionPartsSent = new HashMap<Transaction, List<Integer>>();
146                 outstandingTransactionStatus = new HashMap<Long, TransactionStatus>();
147                 liveAbortsGeneratedByLocal = new HashMap<Long, Abort>();
148                 offlineTransactionsCommittedAndAtServer = new HashSet<Pair<Long, Long>>();
149                 localCommunicationTable = new HashMap<Long, Pair<String, Integer>>();
150                 lastTransactionSeenFromMachineFromServer = new HashMap<Long, Long>();
151                 pendingSendArbitrationRounds = new ArrayList<ArbitrationRound>();
152                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new HashMap<Long, Long>();
153
154
155                 // Other init stuff
156                 numberOfSlots = buffer.capacity();
157                 setResizeThreshold();
158         }
159
160         // TODO: delete method
161         public synchronized void printSlots() {
162                 long o = buffer.getOldestSeqNum();
163                 long n = buffer.getNewestSeqNum();
164
165                 int[] types = new int[10];
166
167                 int num = 0;
168
169                 int livec = 0;
170                 int deadc = 0;
171
172                 int casdasd = 0;
173
174                 int liveslo = 0;
175
176                 for (long i = o; i < (n + 1); i++) {
177                         Slot s = buffer.getSlot(i);
178
179
180                         if (s.isLive()) {
181                                 liveslo++;
182                         }
183
184                         Vector<Entry> entries = s.getEntries();
185
186                         for (Entry e : entries) {
187                                 if (e.isLive()) {
188                                         int type = e.getType();
189
190
191                                         if (type == 6) {
192                                                 RejectedMessage rej = (RejectedMessage)e;
193                                                 casdasd++;
194
195                                                 System.out.println(rej.getMachineID());
196                                         }
197
198
199                                         types[type] = types[type] + 1;
200                                         num++;
201                                         livec++;
202                                 } else {
203                                         deadc++;
204                                 }
205                         }
206                 }
207
208                 for (int i = 0; i < 10; i++) {
209                         System.out.println(i + "    " + types[i]);
210                 }
211                 System.out.println("Live count:   " + livec);
212                 System.out.println("Live Slot count:   " + liveslo);
213
214                 System.out.println("Dead count:   " + deadc);
215                 System.out.println("Old:   " + o);
216                 System.out.println("New:   " + n);
217                 System.out.println("Size:   " + buffer.size());
218                 // System.out.println("Commits:   " + liveCommitsTable.size());
219                 System.out.println("pendingTrans:   " + pendingTransactionQueue.size());
220                 System.out.println("Trans Status Out:   " + outstandingTransactionStatus.size());
221
222                 for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
223                         System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
224                 }
225
226
227                 for (Long a : liveCommitsTable.keySet()) {
228                         for (Long b : liveCommitsTable.get(a).keySet()) {
229                                 for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) {
230                                         System.out.print(kv + " ");
231                                 }
232                                 System.out.print("|| ");
233                         }
234                         System.out.println();
235                 }
236
237         }
238
239         /**
240          * Initialize the table by inserting a table status as the first entry into the table status
241          * also initialize the crypto stuff.
242          */
243         public synchronized void initTable() throws ServerException {
244                 cloud.initSecurity();
245
246                 // Create the first insertion into the block chain which is the table status
247                 Slot s = new Slot(this, 1, localMachineId, localSequenceNumber);
248                 localSequenceNumber++;
249                 TableStatus status = new TableStatus(s, numberOfSlots);
250                 s.addEntry(status);
251                 Slot[] array = cloud.putSlot(s, numberOfSlots);
252
253                 if (array == null) {
254                         array = new Slot[] {s};
255                         // update local block chain
256                         validateAndUpdate(array, true);
257                 } else if (array.length == 1) {
258                         // in case we did push the slot BUT we failed to init it
259                         validateAndUpdate(array, true);
260                 } else {
261                         throw new Error("Error on initialization");
262                 }
263         }
264
265         /**
266          * Rebuild the table from scratch by pulling the latest block chain from the server.
267          */
268         public synchronized void rebuild() throws ServerException {
269                 // Just pull the latest slots from the server
270                 Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
271                 validateAndUpdate(newslots, true);
272                 sendToServer(null);
273                 updateLiveTransactionsAndStatus();
274
275         }
276
277         // public String toString() {
278         //      String retString = " Committed Table: \n";
279         //      retString += "---------------------------\n";
280         //      retString += commitedTable.toString();
281
282         //      retString += "\n\n";
283
284         //      retString += " Speculative Table: \n";
285         //      retString += "---------------------------\n";
286         //      retString += speculativeTable.toString();
287
288         //      return retString;
289         // }
290
291         public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) {
292                 localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
293         }
294
295         public synchronized Long getArbitrator(IoTString key) {
296                 return arbitratorTable.get(key);
297         }
298
299         public synchronized void close() {
300                 cloud.close();
301         }
302
303         public synchronized IoTString getCommitted(IoTString key)  {
304                 KeyValue kv = committedKeyValueTable.get(key);
305
306                 if (kv != null) {
307                         return kv.getValue();
308                 } else {
309                         return null;
310                 }
311         }
312
313         public synchronized IoTString getSpeculative(IoTString key) {
314                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
315
316                 if (kv == null) {
317                         kv = speculatedKeyValueTable.get(key);
318                 }
319
320                 if (kv == null) {
321                         kv = committedKeyValueTable.get(key);
322                 }
323
324                 if (kv != null) {
325                         return kv.getValue();
326                 } else {
327                         return null;
328                 }
329         }
330
331         public synchronized IoTString getCommittedAtomic(IoTString key) {
332                 KeyValue kv = committedKeyValueTable.get(key);
333
334                 if (arbitratorTable.get(key) == null) {
335                         throw new Error("Key not Found.");
336                 }
337
338                 // Make sure new key value pair matches the current arbitrator
339                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
340                         // TODO: Maybe not throw en error
341                         throw new Error("Not all Key Values Match Arbitrator.");
342                 }
343
344                 if (kv != null) {
345                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
346                         return kv.getValue();
347                 } else {
348                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
349                         return null;
350                 }
351         }
352
353         public synchronized IoTString getSpeculativeAtomic(IoTString key) {
354                 if (arbitratorTable.get(key) == null) {
355                         throw new Error("Key not Found.");
356                 }
357
358                 // Make sure new key value pair matches the current arbitrator
359                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
360                         // TODO: Maybe not throw en error
361                         throw new Error("Not all Key Values Match Arbitrator.");
362                 }
363
364                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
365
366                 if (kv == null) {
367                         kv = speculatedKeyValueTable.get(key);
368                 }
369
370                 if (kv == null) {
371                         kv = committedKeyValueTable.get(key);
372                 }
373
374                 if (kv != null) {
375                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
376                         return kv.getValue();
377                 } else {
378                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
379                         return null;
380                 }
381         }
382
383         public synchronized boolean update()  {
384                 try {
385                         Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
386                         validateAndUpdate(newSlots, false);
387                         sendToServer(null);
388
389
390                         updateLiveTransactionsAndStatus();
391
392                         return true;
393                 } catch (Exception e) {
394                         // e.printStackTrace();
395
396                         for (Long m : localCommunicationTable.keySet()) {
397                                 updateFromLocal(m);
398                         }
399                 }
400
401                 return false;
402         }
403
404         public synchronized boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
405                 while (true) {
406                         if (arbitratorTable.get(keyName) != null) {
407                                 // There is already an arbitrator
408                                 return false;
409                         }
410
411                         NewKey newKey = new NewKey(null, keyName, machineId);
412
413                         if (sendToServer(newKey)) {
414                                 // If successfully inserted
415                                 return true;
416                         }
417                 }
418         }
419
420         public synchronized void startTransaction() {
421                 // Create a new transaction, invalidates any old pending transactions.
422                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
423         }
424
425         public synchronized void addKV(IoTString key, IoTString value) {
426
427                 // Make sure it is a valid key
428                 if (arbitratorTable.get(key) == null) {
429                         throw new Error("Key not Found.");
430                 }
431
432                 // Make sure new key value pair matches the current arbitrator
433                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
434                         // TODO: Maybe not throw en error
435                         throw new Error("Not all Key Values Match Arbitrator.");
436                 }
437
438                 // Add the key value to this transaction
439                 KeyValue kv = new KeyValue(key, value);
440                 pendingTransactionBuilder.addKV(kv);
441         }
442
443         public synchronized TransactionStatus commitTransaction() {
444
445                 if (pendingTransactionBuilder.getKVUpdates().size() == 0) {
446                         // transaction with no updates will have no effect on the system
447                         return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
448                 }
449
450                 // Set the local transaction sequence number and increment
451                 pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber);
452                 localTransactionSequenceNumber++;
453
454                 // Create the transaction status
455                 TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator());
456
457                 // Create the new transaction
458                 Transaction newTransaction = pendingTransactionBuilder.createTransaction();
459                 newTransaction.setTransactionStatus(transactionStatus);
460
461                 if (pendingTransactionBuilder.getArbitrator() != localMachineId) {
462                         // Add it to the queue and invalidate the builder for safety
463                         pendingTransactionQueue.add(newTransaction);
464                 } else {
465                         arbitrateOnLocalTransaction(newTransaction);
466                         updateLiveStateFromLocal();
467                 }
468
469                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
470
471                 try {
472                         sendToServer(null);
473                 } catch (ServerException e) {
474
475                         Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
476                         for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
477                                 Transaction transaction = iter.next();
478
479                                 if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) {
480                                         // Already contacted this client so ignore all attempts to contact this client
481                                         // to preserve ordering for arbitrator
482                                         continue;
483                                 }
484
485                                 Pair<Boolean, Boolean> sendReturn = sendTransactionToLocal(transaction);
486
487                                 if (sendReturn.getFirst()) {
488                                         // Failed to contact over local
489                                         arbitratorTriedAndFailed.add(transaction.getArbitrator());
490                                 } else {
491                                         // Successful contact or should not contact
492
493                                         if (sendReturn.getSecond()) {
494                                                 // did arbitrate
495                                                 iter.remove();
496                                         }
497                                 }
498                         }
499                 }
500
501                 updateLiveStateFromLocal();
502
503                 return transactionStatus;
504         }
505
506         /**
507          * Get the machine ID for this client
508          */
509         public long getMachineId() {
510                 return localMachineId;
511         }
512
513         /**
514          * Decrement the number of live slots that we currently have
515          */
516         public void decrementLiveCount() {
517                 liveSlotCount--;
518         }
519
520         /**
521          * Recalculate the new resize threshold
522          */
523         private void setResizeThreshold() {
524                 int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots);
525                 bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
526         }
527
528         public long getLocalSequenceNumber() {
529                 return localSequenceNumber;
530         }
531
532
533         boolean lastInsertedNewKey = false;
534
535         private boolean sendToServer(NewKey newKey) throws ServerException {
536
537                 boolean fromRetry = false;
538
539                 try {
540                         if (hadPartialSendToServer) {
541                                 Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
542                                 if (newSlots.length == 0) {
543                                         fromRetry = true;
544                                         ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
545
546                                         if (sendSlotsReturn.getFirst()) {
547                                                 if (newKey != null) {
548                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
549                                                                 newKey = null;
550                                                         }
551                                                 }
552
553                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
554                                                         transaction.resetServerFailure();
555
556                                                         // Update which transactions parts still need to be sent
557                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
558
559                                                         // Add the transaction status to the outstanding list
560                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
561
562                                                         // Update the transaction status
563                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
564
565                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
566                                                         if (transaction.didSendAllParts()) {
567                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
568                                                                 pendingTransactionQueue.remove(transaction);
569                                                         }
570                                                 }
571                                         } else {
572
573                                                 newSlots = sendSlotsReturn.getThird();
574
575                                                 boolean isInserted = false;
576                                                 for (Slot s : newSlots) {
577                                                         if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
578                                                                 isInserted = true;
579                                                                 break;
580                                                         }
581                                                 }
582
583                                                 for (Slot s : newSlots) {
584                                                         if (isInserted) {
585                                                                 break;
586                                                         }
587
588                                                         // Process each entry in the slot
589                                                         for (Entry entry : s.getEntries()) {
590
591                                                                 if (entry.getType() == Entry.TypeLastMessage) {
592                                                                         LastMessage lastMessage = (LastMessage)entry;
593                                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
594                                                                                 isInserted = true;
595                                                                                 break;
596                                                                         }
597                                                                 }
598                                                         }
599                                                 }
600
601                                                 if (isInserted) {
602                                                         if (newKey != null) {
603                                                                 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
604                                                                         newKey = null;
605                                                                 }
606                                                         }
607
608                                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
609                                                                 transaction.resetServerFailure();
610
611                                                                 // Update which transactions parts still need to be sent
612                                                                 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
613
614                                                                 // Add the transaction status to the outstanding list
615                                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
616
617                                                                 // Update the transaction status
618                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
619
620                                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
621                                                                 if (transaction.didSendAllParts()) {
622                                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
623                                                                         pendingTransactionQueue.remove(transaction);
624                                                                 } else {
625                                                                         transaction.resetServerFailure();
626                                                                         // Set the transaction sequence number back to nothing
627                                                                         if (!transaction.didSendAPartToServer()) {
628                                                                                 transaction.setSequenceNumber(-1);
629                                                                         }
630                                                                 }
631                                                         }
632                                                 }
633                                         }
634
635                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
636                                                 transaction.resetServerFailure();
637                                                 // Set the transaction sequence number back to nothing
638                                                 if (!transaction.didSendAPartToServer()) {
639                                                         transaction.setSequenceNumber(-1);
640                                                 }
641                                         }
642
643                                         if (sendSlotsReturn.getThird().length != 0) {
644                                                 // insert into the local block chain
645                                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
646                                         }
647                                         // continue;
648                                 } else {
649                                         boolean isInserted = false;
650                                         for (Slot s : newSlots) {
651                                                 if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
652                                                         isInserted = true;
653                                                         break;
654                                                 }
655                                         }
656
657                                         for (Slot s : newSlots) {
658                                                 if (isInserted) {
659                                                         break;
660                                                 }
661
662                                                 // Process each entry in the slot
663                                                 for (Entry entry : s.getEntries()) {
664
665                                                         if (entry.getType() == Entry.TypeLastMessage) {
666                                                                 LastMessage lastMessage = (LastMessage)entry;
667                                                                 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
668                                                                         isInserted = true;
669                                                                         break;
670                                                                 }
671                                                         }
672                                                 }
673                                         }
674
675                                         if (isInserted) {
676                                                 if (newKey != null) {
677                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
678                                                                 newKey = null;
679                                                         }
680                                                 }
681
682                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
683                                                         transaction.resetServerFailure();
684
685                                                         // Update which transactions parts still need to be sent
686                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
687
688                                                         // Add the transaction status to the outstanding list
689                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
690
691                                                         // Update the transaction status
692                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
693
694                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
695                                                         if (transaction.didSendAllParts()) {
696                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
697                                                                 pendingTransactionQueue.remove(transaction);
698                                                         } else {
699                                                                 transaction.resetServerFailure();
700                                                                 // Set the transaction sequence number back to nothing
701                                                                 if (!transaction.didSendAPartToServer()) {
702                                                                         transaction.setSequenceNumber(-1);
703                                                                 }
704                                                         }
705                                                 }
706                                         } else {
707                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
708                                                         transaction.resetServerFailure();
709                                                         // Set the transaction sequence number back to nothing
710                                                         if (!transaction.didSendAPartToServer()) {
711                                                                 transaction.setSequenceNumber(-1);
712                                                         }
713                                                 }
714                                         }
715
716                                         // insert into the local block chain
717                                         validateAndUpdate(newSlots, true);
718                                 }
719                         }
720                 } catch (ServerException e) {
721                         throw e;
722                 }
723
724
725
726                 try {
727                         // While we have stuff that needs inserting into the block chain
728                         while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) {
729
730                                 fromRetry = false;
731
732                                 if (hadPartialSendToServer) {
733                                         throw new Error("Should Be error free");
734                                 }
735
736
737
738                                 // If there is a new key with same name then end
739                                 if ((newKey != null) && (arbitratorTable.get(newKey.getKey()) != null)) {
740                                         return false;
741                                 }
742
743                                 // Create the slot
744                                 Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber);
745                                 localSequenceNumber++;
746
747                                 // Try to fill the slot with data
748                                 ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
749                                 boolean needsResize = fillSlotsReturn.getFirst();
750                                 int newSize = fillSlotsReturn.getSecond();
751                                 Boolean insertedNewKey = fillSlotsReturn.getThird();
752
753                                 if (needsResize) {
754                                         // Reset which transaction to send
755                                         for (Transaction transaction : transactionPartsSent.keySet()) {
756                                                 transaction.resetNextPartToSend();
757
758                                                 // Set the transaction sequence number back to nothing
759                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
760                                                         transaction.setSequenceNumber(-1);
761                                                 }
762                                         }
763
764                                         // Clear the sent data since we are trying again
765                                         pendingSendArbitrationEntriesToDelete.clear();
766                                         transactionPartsSent.clear();
767
768                                         // We needed a resize so try again
769                                         fillSlot(slot, true, newKey);
770                                 }
771
772                                 lastSlotAttemptedToSend = slot;
773                                 lastIsNewKey = (newKey != null);
774                                 lastInsertedNewKey = insertedNewKey;
775                                 lastNewSize = newSize;
776                                 lastNewKey = newKey;
777                                 lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
778                                 lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
779
780
781                                 ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
782
783                                 if (sendSlotsReturn.getFirst()) {
784
785                                         // Did insert into the block chain
786
787                                         if (insertedNewKey) {
788                                                 // This slot was what was inserted not a previous slot
789
790                                                 // New Key was successfully inserted into the block chain so dont want to insert it again
791                                                 newKey = null;
792                                         }
793
794                                         // Remove the aborts and commit parts that were sent from the pending to send queue
795                                         for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
796                                                 ArbitrationRound round = iter.next();
797                                                 round.removeParts(pendingSendArbitrationEntriesToDelete);
798
799                                                 if (round.isDoneSending()) {
800                                                         // Sent all the parts
801                                                         iter.remove();
802                                                 }
803                                         }
804
805                                         for (Transaction transaction : transactionPartsSent.keySet()) {
806                                                 transaction.resetServerFailure();
807
808                                                 // Update which transactions parts still need to be sent
809                                                 transaction.removeSentParts(transactionPartsSent.get(transaction));
810
811                                                 // Add the transaction status to the outstanding list
812                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
813
814                                                 // Update the transaction status
815                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
816
817                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
818                                                 if (transaction.didSendAllParts()) {
819                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
820                                                         pendingTransactionQueue.remove(transaction);
821                                                 }
822                                         }
823                                 } else {
824
825                                         // if (!sendSlotsReturn.getSecond()) {
826                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
827                                         //              transaction.resetServerFailure();
828                                         //      }
829                                         // } else {
830                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
831                                         //              transaction.resetServerFailure();
832
833                                         //              // Update which transactions parts still need to be sent
834                                         //              transaction.removeSentParts(transactionPartsSent.get(transaction));
835
836                                         //              // Add the transaction status to the outstanding list
837                                         //              outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
838
839                                         //              // Update the transaction status
840                                         //              transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
841
842                                         //              // Check if all the transaction parts were successfully sent and if so then remove it from pending
843                                         //              if (transaction.didSendAllParts()) {
844                                         //                      transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
845                                         //                      pendingTransactionQueue.remove(transaction);
846
847                                         //                      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
848                                         //                              System.out.println("Sent: " + kv + "  from: " + localMachineId + "   Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + "  Claimed:" + transaction.getSequenceNumber());
849                                         //                      }
850                                         //              }
851                                         //      }
852                                         // }
853
854                                         // Reset which transaction to send
855                                         for (Transaction transaction : transactionPartsSent.keySet()) {
856                                                 transaction.resetNextPartToSend();
857                                                 // transaction.resetNextPartToSend();
858
859                                                 // Set the transaction sequence number back to nothing
860                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
861                                                         transaction.setSequenceNumber(-1);
862                                                 }
863                                         }
864                                 }
865
866                                 // Clear the sent data in preparation for next send
867                                 pendingSendArbitrationEntriesToDelete.clear();
868                                 transactionPartsSent.clear();
869
870                                 if (sendSlotsReturn.getThird().length != 0) {
871                                         // insert into the local block chain
872                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
873                                 }
874                         }
875
876                 } catch (ServerException e) {
877
878                         if (e.getType() != ServerException.TypeInputTimeout) {
879                                 // e.printStackTrace();
880
881                                 // Nothing was able to be sent to the server so just clear these data structures
882                                 for (Transaction transaction : transactionPartsSent.keySet()) {
883                                         transaction.resetNextPartToSend();
884
885                                         // Set the transaction sequence number back to nothing
886                                         if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
887                                                 transaction.setSequenceNumber(-1);
888                                         }
889                                 }
890                         } else {
891                                 // There was a partial send to the server
892                                 hadPartialSendToServer = true;
893
894
895                                 // if (!fromRetry) {
896                                 //      lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
897                                 //      lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
898                                 // }
899
900                                 // Nothing was able to be sent to the server so just clear these data structures
901                                 for (Transaction transaction : transactionPartsSent.keySet()) {
902                                         transaction.resetNextPartToSend();
903                                         transaction.setServerFailure();
904                                 }
905                         }
906
907                         pendingSendArbitrationEntriesToDelete.clear();
908                         transactionPartsSent.clear();
909
910                         throw e;
911                 }
912
913                 return newKey == null;
914         }
915
916         private synchronized boolean updateFromLocal(long machineId) {
917                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
918                 if (localCommunicationInformation == null) {
919                         // Cant talk to that device locally so do nothing
920                         return false;
921                 }
922
923                 // Get the size of the send data
924                 int sendDataSize = Integer.BYTES + Long.BYTES;
925
926                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
927                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != null) {
928                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
929                 }
930
931                 byte[] sendData = new byte[sendDataSize];
932                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
933
934                 // Encode the data
935                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
936                 bbEncode.putInt(0);
937
938                 // Send by local
939                 byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
940                 localSequenceNumber++;
941
942                 if (returnData == null) {
943                         // Could not contact server
944                         return false;
945                 }
946
947                 // Decode the data
948                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
949                 int numberOfEntries = bbDecode.getInt();
950
951                 for (int i = 0; i < numberOfEntries; i++) {
952                         byte type = bbDecode.get();
953                         if (type == Entry.TypeAbort) {
954                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
955                                 processEntry(abort);
956                         } else if (type == Entry.TypeCommitPart) {
957                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
958                                 processEntry(commitPart);
959                         }
960                 }
961
962                 updateLiveStateFromLocal();
963
964                 return true;
965         }
966
967         private Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
968
969                 // Get the devices local communications
970                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
971
972                 if (localCommunicationInformation == null) {
973                         // Cant talk to that device locally so do nothing
974                         return new Pair<Boolean, Boolean>(true, false);
975                 }
976
977                 // Get the size of the send data
978                 int sendDataSize = Integer.BYTES + Long.BYTES;
979                 for (TransactionPart part : transaction.getParts().values()) {
980                         sendDataSize += part.getSize();
981                 }
982
983                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
984                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != null) {
985                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
986                 }
987
988                 // Make the send data size
989                 byte[] sendData = new byte[sendDataSize];
990                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
991
992                 // Encode the data
993                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
994                 bbEncode.putInt(transaction.getParts().size());
995                 for (TransactionPart part : transaction.getParts().values()) {
996                         part.encode(bbEncode);
997                 }
998
999
1000                 // Send by local
1001                 byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
1002                 localSequenceNumber++;
1003
1004                 if (returnData == null) {
1005                         // Could not contact server
1006                         return new Pair<Boolean, Boolean>(true, false);
1007                 }
1008
1009                 // Decode the data
1010                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
1011                 boolean didCommit = bbDecode.get() == 1;
1012                 boolean couldArbitrate = bbDecode.get() == 1;
1013                 int numberOfEntries = bbDecode.getInt();
1014                 boolean foundAbort = false;
1015
1016                 for (int i = 0; i < numberOfEntries; i++) {
1017                         byte type = bbDecode.get();
1018                         if (type == Entry.TypeAbort) {
1019                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
1020
1021                                 if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
1022                                         foundAbort = true;
1023                                 }
1024
1025                                 processEntry(abort);
1026                         } else if (type == Entry.TypeCommitPart) {
1027                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
1028                                 processEntry(commitPart);
1029                         }
1030                 }
1031
1032                 updateLiveStateFromLocal();
1033
1034                 if (couldArbitrate) {
1035                         TransactionStatus status =  transaction.getTransactionStatus();
1036                         if (didCommit) {
1037                                 status.setStatus(TransactionStatus.StatusCommitted);
1038                         } else {
1039                                 status.setStatus(TransactionStatus.StatusAborted);
1040                         }
1041                 } else {
1042                         TransactionStatus status =  transaction.getTransactionStatus();
1043                         if (foundAbort) {
1044                                 status.setStatus(TransactionStatus.StatusAborted);
1045                         } else {
1046                                 status.setStatus(TransactionStatus.StatusCommitted);
1047                         }
1048                 }
1049
1050                 return new Pair<Boolean, Boolean>(false, true);
1051         }
1052
1053         public synchronized byte[] acceptDataFromLocal(byte[] data) {
1054
1055                 // Decode the data
1056                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
1057                 long lastArbitratedSequenceNumberSeen = bbDecode.getLong();
1058                 int numberOfParts = bbDecode.getInt();
1059
1060                 // If we did commit a transaction or not
1061                 boolean didCommit = false;
1062                 boolean couldArbitrate = false;
1063
1064                 if (numberOfParts != 0) {
1065
1066                         // decode the transaction
1067                         Transaction transaction = new Transaction();
1068                         for (int i = 0; i < numberOfParts; i++) {
1069                                 bbDecode.get();
1070                                 TransactionPart newPart = (TransactionPart)TransactionPart.decode(null, bbDecode);
1071                                 transaction.addPartDecode(newPart);
1072                         }
1073
1074                         // Arbitrate on transaction and pull relevant return data
1075                         Pair<Boolean, Boolean> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1076                         couldArbitrate = localArbitrateReturn.getFirst();
1077                         didCommit = localArbitrateReturn.getSecond();
1078
1079                         updateLiveStateFromLocal();
1080
1081                         // Transaction was sent to the server so keep track of it to prevent double commit
1082                         if (transaction.getSequenceNumber() != -1) {
1083                                 offlineTransactionsCommittedAndAtServer.add(transaction.getId());
1084                         }
1085                 }
1086
1087                 // The data to send back
1088                 int returnDataSize = 0;
1089                 List<Entry> unseenArbitrations = new ArrayList<Entry>();
1090
1091                 // Get the aborts to send back
1092                 List<Long> abortLocalSequenceNumbers = new ArrayList<Long >(liveAbortsGeneratedByLocal.keySet());
1093                 Collections.sort(abortLocalSequenceNumbers);
1094                 for (Long localSequenceNumber : abortLocalSequenceNumbers) {
1095                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1096                                 continue;
1097                         }
1098
1099                         Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
1100                         unseenArbitrations.add(abort);
1101                         returnDataSize += abort.getSize();
1102                 }
1103
1104                 // Get the commits to send back
1105                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
1106                 if (commitForClientTable != null) {
1107                         List<Long> commitLocalSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
1108                         Collections.sort(commitLocalSequenceNumbers);
1109
1110                         for (Long localSequenceNumber : commitLocalSequenceNumbers) {
1111                                 Commit commit = commitForClientTable.get(localSequenceNumber);
1112
1113                                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1114                                         continue;
1115                                 }
1116
1117                                 unseenArbitrations.addAll(commit.getParts().values());
1118
1119                                 for (CommitPart commitPart : commit.getParts().values()) {
1120                                         returnDataSize += commitPart.getSize();
1121                                 }
1122                         }
1123                 }
1124
1125                 // Number of arbitration entries to decode
1126                 returnDataSize += 2 * Integer.BYTES;
1127
1128                 // Boolean of did commit or not
1129                 if (numberOfParts != 0) {
1130                         returnDataSize += Byte.BYTES;
1131                 }
1132
1133                 // Data to send Back
1134                 byte[] returnData = new byte[returnDataSize];
1135                 ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
1136
1137                 if (numberOfParts != 0) {
1138                         if (didCommit) {
1139                                 bbEncode.put((byte)1);
1140                         } else {
1141                                 bbEncode.put((byte)0);
1142                         }
1143                         if (couldArbitrate) {
1144                                 bbEncode.put((byte)1);
1145                         } else {
1146                                 bbEncode.put((byte)0);
1147                         }
1148                 }
1149
1150                 bbEncode.putInt(unseenArbitrations.size());
1151                 for (Entry entry : unseenArbitrations) {
1152                         entry.encode(bbEncode);
1153                 }
1154
1155
1156                 localSequenceNumber++;
1157                 return returnData;
1158         }
1159
1160         private ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize, boolean isNewKey)  throws ServerException {
1161
1162                 boolean attemptedToSendToServerTmp = attemptedToSendToServer;
1163                 attemptedToSendToServer = true;
1164
1165                 boolean inserted = false;
1166                 boolean lastTryInserted = false;
1167
1168                 Slot[] array = cloud.putSlot(slot, newSize);
1169                 if (array == null) {
1170                         array = new Slot[] {slot};
1171                         rejectedSlotList.clear();
1172                         inserted = true;
1173                 }       else {
1174                         if (array.length == 0) {
1175                                 throw new Error("Server Error: Did not send any slots");
1176                         }
1177
1178                         // if (attemptedToSendToServerTmp) {
1179                         if (hadPartialSendToServer) {
1180
1181                                 boolean isInserted = false;
1182                                 for (Slot s : array) {
1183                                         if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
1184                                                 isInserted = true;
1185                                                 break;
1186                                         }
1187                                 }
1188
1189                                 for (Slot s : array) {
1190                                         if (isInserted) {
1191                                                 break;
1192                                         }
1193
1194                                         // Process each entry in the slot
1195                                         for (Entry entry : s.getEntries()) {
1196
1197                                                 if (entry.getType() == Entry.TypeLastMessage) {
1198                                                         LastMessage lastMessage = (LastMessage)entry;
1199
1200                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) {
1201                                                                 isInserted = true;
1202                                                                 break;
1203                                                         }
1204                                                 }
1205                                         }
1206                                 }
1207
1208                                 if (!isInserted) {
1209                                         rejectedSlotList.add(slot.getSequenceNumber());
1210                                         lastTryInserted = false;
1211                                 } else {
1212                                         lastTryInserted = true;
1213                                 }
1214                         } else {
1215                                 rejectedSlotList.add(slot.getSequenceNumber());
1216                                 lastTryInserted = false;
1217                         }
1218                 }
1219
1220                 return new ThreeTuple<Boolean, Boolean, Slot[]>(inserted, lastTryInserted, array);
1221         }
1222
1223         /**
1224          * Returns false if a resize was needed
1225          */
1226         private ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) {
1227
1228
1229                 int newSize = 0;
1230                 if (liveSlotCount > bufferResizeThreshold) {
1231                         resize = true; //Resize is forced
1232
1233                 }
1234
1235                 if (resize) {
1236                         newSize = (int) (numberOfSlots * RESIZE_MULTIPLE);
1237                         TableStatus status = new TableStatus(slot, newSize);
1238                         slot.addEntry(status);
1239                 }
1240
1241                 // Fill with rejected slots first before doing anything else
1242                 doRejectedMessages(slot);
1243
1244                 // Do mandatory rescue of entries
1245                 ThreeTuple<Boolean, Boolean, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1246
1247                 // Extract working variables
1248                 boolean needsResize = mandatoryRescueReturn.getFirst();
1249                 boolean seenLiveSlot = mandatoryRescueReturn.getSecond();
1250                 long currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1251
1252                 if (needsResize && !resize) {
1253                         // We need to resize but we are not resizing so return false
1254                         return new ThreeTuple<Boolean, Integer, Boolean>(true, null, null);
1255                 }
1256
1257                 boolean inserted = false;
1258                 if (newKeyEntry != null) {
1259                         newKeyEntry.setSlot(slot);
1260                         if (slot.hasSpace(newKeyEntry)) {
1261
1262                                 slot.addEntry(newKeyEntry);
1263                                 inserted = true;
1264                         }
1265                 }
1266
1267                 // Clear the transactions, aborts and commits that were sent previously
1268                 transactionPartsSent.clear();
1269                 pendingSendArbitrationEntriesToDelete.clear();
1270
1271                 for (ArbitrationRound round : pendingSendArbitrationRounds) {
1272                         boolean isFull = false;
1273                         round.generateParts();
1274                         List<Entry> parts = round.getParts();
1275
1276                         // Insert pending arbitration data
1277                         for (Entry arbitrationData : parts) {
1278
1279                                 // If it is an abort then we need to set some information
1280                                 if (arbitrationData instanceof Abort) {
1281                                         ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
1282                                 }
1283
1284                                 if (!slot.hasSpace(arbitrationData)) {
1285                                         // No space so cant do anything else with these data entries
1286                                         isFull = true;
1287                                         break;
1288                                 }
1289
1290                                 // Add to this current slot and add it to entries to delete
1291                                 slot.addEntry(arbitrationData);
1292                                 pendingSendArbitrationEntriesToDelete.add(arbitrationData);
1293                         }
1294
1295                         if (isFull) {
1296                                 break;
1297                         }
1298                 }
1299
1300                 if (pendingTransactionQueue.size() > 0) {
1301
1302                         Transaction transaction = pendingTransactionQueue.get(0);
1303
1304                         // Set the transaction sequence number if it has yet to be inserted into the block chain
1305                         // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
1306                         //      transaction.setSequenceNumber(slot.getSequenceNumber());
1307                         // }
1308
1309                         if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
1310                                 transaction.setSequenceNumber(slot.getSequenceNumber());
1311                         }
1312
1313
1314                         while (true) {
1315                                 TransactionPart part = transaction.getNextPartToSend();
1316
1317                                 if (part == null) {
1318                                         // Ran out of parts to send for this transaction so move on
1319                                         break;
1320                                 }
1321
1322                                 if (slot.hasSpace(part)) {
1323                                         slot.addEntry(part);
1324                                         List<Integer> partsSent = transactionPartsSent.get(transaction);
1325                                         if (partsSent == null) {
1326                                                 partsSent = new ArrayList<Integer>();
1327                                                 transactionPartsSent.put(transaction, partsSent);
1328                                         }
1329                                         partsSent.add(part.getPartNumber());
1330                                         transactionPartsSent.put(transaction, partsSent);
1331                                 } else {
1332                                         break;
1333                                 }
1334                         }
1335                 }
1336
1337                 // Fill the remainder of the slot with rescue data
1338                 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1339
1340                 return new ThreeTuple<Boolean, Integer, Boolean>(false, newSize, inserted);
1341         }
1342
1343         private void doRejectedMessages(Slot s) {
1344                 if (! rejectedSlotList.isEmpty()) {
1345                         /* TODO: We should avoid generating a rejected message entry if
1346                          * there is already a sufficient entry in the queue (e.g.,
1347                          * equalsto value of true and same sequence number).  */
1348
1349                         long old_seqn = rejectedSlotList.firstElement();
1350                         if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
1351                                 long new_seqn = rejectedSlotList.lastElement();
1352                                 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1353                                 s.addEntry(rm);
1354                         } else {
1355                                 long prev_seqn = -1;
1356                                 int i = 0;
1357                                 /* Go through list of missing messages */
1358                                 for (; i < rejectedSlotList.size(); i++) {
1359                                         long curr_seqn = rejectedSlotList.get(i);
1360                                         Slot s_msg = buffer.getSlot(curr_seqn);
1361                                         if (s_msg != null)
1362                                                 break;
1363                                         prev_seqn = curr_seqn;
1364                                 }
1365                                 /* Generate rejected message entry for missing messages */
1366                                 if (prev_seqn != -1) {
1367                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1368                                         s.addEntry(rm);
1369                                 }
1370                                 /* Generate rejected message entries for present messages */
1371                                 for (; i < rejectedSlotList.size(); i++) {
1372                                         long curr_seqn = rejectedSlotList.get(i);
1373                                         Slot s_msg = buffer.getSlot(curr_seqn);
1374                                         long machineid = s_msg.getMachineID();
1375                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1376                                         s.addEntry(rm);
1377                                 }
1378                         }
1379                 }
1380         }
1381
1382         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot slot, boolean resize) {
1383                 long newestSequenceNumber = buffer.getNewestSeqNum();
1384                 long oldestSequenceNumber = buffer.getOldestSeqNum();
1385                 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1386                         oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1387                 }
1388
1389                 long currentSequenceNumber = oldestLiveSlotSequenceNumver;
1390                 boolean seenLiveSlot = false;
1391                 long firstIfFull = newestSequenceNumber + 1 - numberOfSlots;    // smallest seq number in the buffer if it is full
1392                 long threshold = firstIfFull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
1393
1394
1395                 // Mandatory Rescue
1396                 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1397                         Slot previousSlot = buffer.getSlot(currentSequenceNumber);
1398                         // Push slot number forward
1399                         if (! seenLiveSlot) {
1400                                 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1401                         }
1402
1403                         if (!previousSlot.isLive()) {
1404                                 continue;
1405                         }
1406
1407                         // We have seen a live slot
1408                         seenLiveSlot = true;
1409
1410                         // Get all the live entries for a slot
1411                         Vector<Entry> liveEntries = previousSlot.getLiveEntries(resize);
1412
1413                         // Iterate over all the live entries and try to rescue them
1414                         for (Entry liveEntry : liveEntries) {
1415                                 if (slot.hasSpace(liveEntry)) {
1416
1417                                         // Enough space to rescue the entry
1418                                         slot.addEntry(liveEntry);
1419                                 } else if (currentSequenceNumber == firstIfFull) {
1420                                         //if there's no space but the entry is about to fall off the queue
1421                                         System.out.println("B"); //?
1422                                         return new ThreeTuple<Boolean, Boolean, Long>(true, seenLiveSlot, currentSequenceNumber);
1423
1424                                 }
1425                         }
1426                 }
1427
1428                 // Did not resize
1429                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenLiveSlot, currentSequenceNumber);
1430         }
1431
1432         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
1433                 /* now go through live entries from least to greatest sequence number until
1434                  * either all live slots added, or the slot doesn't have enough room
1435                  * for SKIP_THRESHOLD consecutive entries*/
1436                 int skipcount = 0;
1437                 long newestseqnum = buffer.getNewestSeqNum();
1438                 search:
1439                 for (; seqn <= newestseqnum; seqn++) {
1440                         Slot prevslot = buffer.getSlot(seqn);
1441                         //Push slot number forward
1442                         if (!seenliveslot)
1443                                 oldestLiveSlotSequenceNumver = seqn;
1444
1445                         if (!prevslot.isLive())
1446                                 continue;
1447                         seenliveslot = true;
1448                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1449                         for (Entry liveentry : liveentries) {
1450                                 if (s.hasSpace(liveentry))
1451                                         s.addEntry(liveentry);
1452                                 else {
1453                                         skipcount++;
1454                                         if (skipcount > SKIP_THRESHOLD)
1455                                                 break search;
1456                                 }
1457                         }
1458                 }
1459         }
1460
1461         /**
1462          * Checks for malicious activity and updates the local copy of the block chain.
1463          */
1464         private void validateAndUpdate(Slot[] newSlots, boolean acceptUpdatesToLocal) {
1465
1466                 // The cloud communication layer has checked slot HMACs already before decoding
1467                 if (newSlots.length == 0) {
1468                         return;
1469                 }
1470
1471                 // Make sure all slots are newer than the last largest slot this client has seen
1472                 long firstSeqNum = newSlots[0].getSequenceNumber();
1473                 if (firstSeqNum <= sequenceNumber) {
1474                         throw new Error("Server Error: Sent older slots!");
1475                 }
1476
1477                 // Create an object that can access both new slots and slots in our local chain
1478                 // without committing slots to our local chain
1479                 SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
1480
1481                 // Check that the HMAC chain is not broken
1482                 checkHMACChain(indexer, newSlots);
1483
1484                 // Set to keep track of messages from clients
1485                 HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
1486
1487                 // Process each slots data
1488                 for (Slot slot : newSlots) {
1489                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1490
1491                         updateExpectedSize();
1492                 }
1493
1494                 // If there is a gap, check to see if the server sent us everything.
1495                 if (firstSeqNum != (sequenceNumber + 1)) {
1496
1497                         // Check the size of the slots that were sent down by the server.
1498                         // Can only check the size if there was a gap
1499                         checkNumSlots(newSlots.length);
1500
1501                         // Since there was a gap every machine must have pushed a slot or must have
1502                         // a last message message.  If not then the server is hiding slots
1503                         if (!machineSet.isEmpty()) {
1504                                 throw new Error("Missing record for machines: " + machineSet);
1505                         }
1506                 }
1507
1508                 // Update the size of our local block chain.
1509                 commitNewMaxSize();
1510
1511                 // Commit new to slots to the local block chain.
1512                 for (Slot slot : newSlots) {
1513
1514                         // Insert this slot into our local block chain copy.
1515                         buffer.putSlot(slot);
1516
1517                         // Keep track of how many slots are currently live (have live data in them).
1518                         liveSlotCount++;
1519                 }
1520
1521                 // Get the sequence number of the latest slot in the system
1522                 sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
1523
1524                 updateLiveStateFromServer();
1525
1526                 // No Need to remember after we pulled from the server
1527                 offlineTransactionsCommittedAndAtServer.clear();
1528
1529                 // This is invalidated now
1530                 hadPartialSendToServer = false;
1531         }
1532
1533         private void updateLiveStateFromServer() {
1534                 // Process the new transaction parts
1535                 processNewTransactionParts();
1536
1537                 // Do arbitration on new transactions that were received
1538                 arbitrateFromServer();
1539
1540                 // Update all the committed keys
1541                 boolean didCommitOrSpeculate = updateCommittedTable();
1542
1543                 // Delete the transactions that are now dead
1544                 updateLiveTransactionsAndStatus();
1545
1546                 // Do speculations
1547                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1548                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1549         }
1550
1551         private void updateLiveStateFromLocal() {
1552                 // Update all the committed keys
1553                 boolean didCommitOrSpeculate = updateCommittedTable();
1554
1555                 // Delete the transactions that are now dead
1556                 updateLiveTransactionsAndStatus();
1557
1558                 // Do speculations
1559                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1560                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1561         }
1562
1563         private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) {
1564                 // if (didFindTableStatus) {
1565                 // return;
1566                 // }
1567                 long prevslots = firstSequenceNumber;
1568
1569
1570                 if (didFindTableStatus) {
1571                         // expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : expectedsize;
1572                         // System.out.println("Here2: " + expectedsize + "    " + numberOfSlots + "   " + prevslots);
1573
1574                 } else {
1575                         expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1576                         // System.out.println("Here: " + expectedsize);
1577                 }
1578
1579                 // System.out.println(numberOfSlots);
1580
1581                 didFindTableStatus = true;
1582                 currMaxSize = numberOfSlots;
1583         }
1584
1585         private void updateExpectedSize() {
1586                 expectedsize++;
1587
1588                 if (expectedsize > currMaxSize) {
1589                         expectedsize = currMaxSize;
1590                 }
1591         }
1592
1593
1594         /**
1595          * Check the size of the block chain to make sure there are enough slots sent back by the server.
1596          * This is only called when we have a gap between the slots that we have locally and the slots
1597          * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1598          * status message
1599          */
1600         private void checkNumSlots(int numberOfSlots) {
1601                 if (numberOfSlots != expectedsize) {
1602                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numberOfSlots);
1603                 }
1604         }
1605
1606         private void updateCurrMaxSize(int newmaxsize) {
1607                 currMaxSize = newmaxsize;
1608         }
1609
1610
1611         /**
1612          * Update the size of of the local buffer if it is needed.
1613          */
1614         private void commitNewMaxSize() {
1615                 didFindTableStatus = false;
1616
1617                 // Resize the local slot buffer
1618                 if (numberOfSlots != currMaxSize) {
1619                         buffer.resize((int)currMaxSize);
1620                 }
1621
1622                 // Change the number of local slots to the new size
1623                 numberOfSlots = (int)currMaxSize;
1624
1625
1626                 // Recalculate the resize threshold since the size of the local buffer has changed
1627                 setResizeThreshold();
1628         }
1629
1630         /**
1631          * Process the new transaction parts from this latest round of slots received from the server
1632          */
1633         private void processNewTransactionParts() {
1634
1635                 if (newTransactionParts.size() == 0) {
1636                         // Nothing new to process
1637                         return;
1638                 }
1639
1640                 // Iterate through all the machine Ids that we received new parts for
1641                 for (Long machineId : newTransactionParts.keySet()) {
1642                         Map<Pair<Long, Integer>, TransactionPart> parts = newTransactionParts.get(machineId);
1643
1644                         // Iterate through all the parts for that machine Id
1645                         for (Pair<Long, Integer> partId : parts.keySet()) {
1646                                 TransactionPart part = parts.get(partId);
1647
1648                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
1649                                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= part.getSequenceNumber())) {
1650                                         // Set dead the transaction part
1651                                         part.setDead();
1652                                         continue;
1653                                 }
1654
1655                                 // Get the transaction object for that sequence number
1656                                 Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber());
1657
1658                                 if (transaction == null) {
1659                                         // This is a new transaction that we dont have so make a new one
1660                                         transaction = new Transaction();
1661
1662                                         // Insert this new transaction into the live tables
1663                                         liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction);
1664                                         liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction);
1665                                 }
1666
1667                                 // Add that part to the transaction
1668                                 transaction.addPartDecode(part);
1669                         }
1670                 }
1671
1672                 // Clear all the new transaction parts in preparation for the next time the server sends slots
1673                 newTransactionParts.clear();
1674         }
1675
1676
1677         private long lastSeqNumArbOn = 0;
1678
1679         private void arbitrateFromServer() {
1680
1681                 if (liveTransactionBySequenceNumberTable.size() == 0) {
1682                         // Nothing to arbitrate on so move on
1683                         return;
1684                 }
1685
1686                 // Get the transaction sequence numbers and sort from oldest to newest
1687                 List<Long> transactionSequenceNumbers = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
1688                 Collections.sort(transactionSequenceNumbers);
1689
1690                 // Collection of key value pairs that are
1691                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1692
1693                 // The last transaction arbitrated on
1694                 long lastTransactionCommitted = -1;
1695                 Set<Abort> generatedAborts = new HashSet<Abort>();
1696
1697                 for (Long transactionSequenceNumber : transactionSequenceNumbers) {
1698                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1699
1700
1701
1702                         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1703                         if (transaction.getArbitrator() != localMachineId) {
1704                                 continue;
1705                         }
1706
1707                         if (transactionSequenceNumber < lastSeqNumArbOn) {
1708                                 continue;
1709                         }
1710
1711                         if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
1712                                 // We have seen this already locally so dont commit again
1713                                 continue;
1714                         }
1715
1716
1717                         if (!transaction.isComplete()) {
1718                                 // Will arbitrate in incorrect order if we continue so just break
1719                                 // Most likely this
1720                                 break;
1721                         }
1722
1723
1724                         // update the largest transaction seen by arbitrator from server
1725                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) {
1726                                 lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1727                         } else {
1728                                 Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId());
1729                                 if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1730                                         lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1731                                 }
1732                         }
1733
1734                         if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
1735                                 // Guard evaluated as true
1736
1737                                 // Update the local changes so we can make the commit
1738                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1739                                         speculativeTableTmp.put(kv.getKey(), kv);
1740                                 }
1741
1742                                 // Update what the last transaction committed was for use in batch commit
1743                                 lastTransactionCommitted = transactionSequenceNumber;
1744                         } else {
1745                                 // Guard evaluated was false so create abort
1746
1747                                 // Create the abort
1748                                 Abort newAbort = new Abort(null,
1749                                                            transaction.getClientLocalSequenceNumber(),
1750                                                            transaction.getSequenceNumber(),
1751                                                            transaction.getMachineId(),
1752                                                            transaction.getArbitrator(),
1753                                                            localArbitrationSequenceNumber);
1754                                 localArbitrationSequenceNumber++;
1755
1756                                 generatedAborts.add(newAbort);
1757
1758                                 // Insert the abort so we can process
1759                                 processEntry(newAbort);
1760                         }
1761
1762                         lastSeqNumArbOn = transactionSequenceNumber;
1763
1764                         // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
1765                 }
1766
1767                 Commit newCommit = null;
1768
1769                 // If there is something to commit
1770                 if (speculativeTableTmp.size() != 0) {
1771
1772                         // Create the commit and increment the commit sequence number
1773                         newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1774                         localArbitrationSequenceNumber++;
1775
1776                         // Add all the new keys to the commit
1777                         for (KeyValue kv : speculativeTableTmp.values()) {
1778                                 newCommit.addKV(kv);
1779                         }
1780
1781                         // create the commit parts
1782                         newCommit.createCommitParts();
1783
1784                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1785
1786                         // Insert the commit so we can process it
1787                         for (CommitPart commitPart : newCommit.getParts().values()) {
1788                                 processEntry(commitPart);
1789                         }
1790                 }
1791
1792                 if ((newCommit != null) || (generatedAborts.size() > 0)) {
1793                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1794                         pendingSendArbitrationRounds.add(arbitrationRound);
1795
1796                         if (compactArbitrationData()) {
1797                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1798                                 if (newArbitrationRound.getCommit() != null) {
1799                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1800                                                 processEntry(commitPart);
1801                                         }
1802                                 }
1803                         }
1804                 }
1805         }
1806
1807         private Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction) {
1808
1809                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1810                 if (transaction.getArbitrator() != localMachineId) {
1811                         return new Pair<Boolean, Boolean>(false, false);
1812                 }
1813
1814                 if (!transaction.isComplete()) {
1815                         // Will arbitrate in incorrect order if we continue so just break
1816                         // Most likely this
1817                         return new Pair<Boolean, Boolean>(false, false);
1818                 }
1819
1820                 if (transaction.getMachineId() != localMachineId) {
1821                         // dont do this check for local transactions
1822                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) {
1823                                 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
1824                                         // We've have already seen this from the server
1825                                         return new Pair<Boolean, Boolean>(false, false);
1826                                 }
1827                         }
1828                 }
1829
1830                 if (transaction.evaluateGuard(committedKeyValueTable, null, null)) {
1831                         // Guard evaluated as true
1832
1833                         // Create the commit and increment the commit sequence number
1834                         Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1835                         localArbitrationSequenceNumber++;
1836
1837                         // Update the local changes so we can make the commit
1838                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1839                                 newCommit.addKV(kv);
1840                         }
1841
1842                         // create the commit parts
1843                         newCommit.createCommitParts();
1844
1845                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1846                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
1847                         pendingSendArbitrationRounds.add(arbitrationRound);
1848
1849                         if (compactArbitrationData()) {
1850                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1851                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1852                                         processEntry(commitPart);
1853                                 }
1854                         } else {
1855                                 // Insert the commit so we can process it
1856                                 for (CommitPart commitPart : newCommit.getParts().values()) {
1857                                         processEntry(commitPart);
1858                                 }
1859                         }
1860
1861                         if (transaction.getMachineId() == localMachineId) {
1862                                 TransactionStatus status = transaction.getTransactionStatus();
1863                                 if (status != null) {
1864                                         status.setStatus(TransactionStatus.StatusCommitted);
1865                                 }
1866                         }
1867
1868                         updateLiveStateFromLocal();
1869                         return new Pair<Boolean, Boolean>(true, true);
1870                 } else {
1871
1872                         if (transaction.getMachineId() == localMachineId) {
1873                                 // For locally created messages update the status
1874
1875                                 // Guard evaluated was false so create abort
1876                                 TransactionStatus status = transaction.getTransactionStatus();
1877                                 if (status != null) {
1878                                         status.setStatus(TransactionStatus.StatusAborted);
1879                                 }
1880                         } else {
1881                                 Set addAbortSet = new HashSet<Abort>();
1882
1883
1884                                 // Create the abort
1885                                 Abort newAbort = new Abort(null,
1886                                                            transaction.getClientLocalSequenceNumber(),
1887                                                            -1,
1888                                                            transaction.getMachineId(),
1889                                                            transaction.getArbitrator(),
1890                                                            localArbitrationSequenceNumber);
1891                                 localArbitrationSequenceNumber++;
1892
1893                                 addAbortSet.add(newAbort);
1894
1895
1896                                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1897                                 ArbitrationRound arbitrationRound = new ArbitrationRound(null, addAbortSet);
1898                                 pendingSendArbitrationRounds.add(arbitrationRound);
1899
1900                                 if (compactArbitrationData()) {
1901                                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1902                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1903                                                 processEntry(commitPart);
1904                                         }
1905                                 }
1906                         }
1907
1908                         updateLiveStateFromLocal();
1909                         return new Pair<Boolean, Boolean>(true, false);
1910                 }
1911         }
1912
1913         /**
1914          * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1915          */
1916         private boolean compactArbitrationData() {
1917
1918                 if (pendingSendArbitrationRounds.size() < 2) {
1919                         // Nothing to compact so do nothing
1920                         return false;
1921                 }
1922
1923                 ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1924                 if (lastRound.didSendPart()) {
1925                         return false;
1926                 }
1927
1928                 boolean hadCommit = (lastRound.getCommit() == null);
1929                 boolean gotNewCommit = false;
1930
1931                 int numberToDelete = 1;
1932                 while (numberToDelete < pendingSendArbitrationRounds.size()) {
1933                         ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1);
1934
1935                         if (round.isFull() || round.didSendPart()) {
1936                                 // Stop since there is a part that cannot be compacted and we need to compact in order
1937                                 break;
1938                         }
1939
1940                         if (round.getCommit() == null) {
1941
1942                                 // Try compacting aborts only
1943                                 int newSize = round.getCurrentSize() + lastRound.getAbortsCount();
1944                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1945                                         // Cant compact since it would be too large
1946                                         break;
1947                                 }
1948                                 lastRound.addAborts(round.getAborts());
1949                         } else {
1950
1951                                 // Create a new larger commit
1952                                 Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber);
1953                                 localArbitrationSequenceNumber++;
1954
1955                                 // Create the commit parts so that we can count them
1956                                 newCommit.createCommitParts();
1957
1958                                 // Calculate the new size of the parts
1959                                 int newSize = newCommit.getNumberOfParts();
1960                                 newSize += lastRound.getAbortsCount();
1961                                 newSize += round.getAbortsCount();
1962
1963                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1964                                         // Cant compact since it would be too large
1965                                         break;
1966                                 }
1967
1968                                 // Set the new compacted part
1969                                 lastRound.setCommit(newCommit);
1970                                 lastRound.addAborts(round.getAborts());
1971                                 gotNewCommit = true;
1972                         }
1973
1974                         numberToDelete++;
1975                 }
1976
1977                 if (numberToDelete != 1) {
1978                         // If there is a compaction
1979
1980                         // Delete the previous pieces that are now in the new compacted piece
1981                         if (numberToDelete == pendingSendArbitrationRounds.size()) {
1982                                 pendingSendArbitrationRounds.clear();
1983                         } else {
1984                                 for (int i = 0; i < numberToDelete; i++) {
1985                                         pendingSendArbitrationRounds.removeIndex(pendingSendArbitrationRounds.size() - 1);
1986                                 }
1987                         }
1988
1989                         // Add the new compacted into the pending to send list
1990                         pendingSendArbitrationRounds.add(lastRound);
1991
1992                         // Should reinsert into the commit processor
1993                         if (hadCommit && gotNewCommit) {
1994                                 return true;
1995                         }
1996                 }
1997
1998                 return false;
1999         }
2000         // private boolean compactArbitrationData() {
2001         //      return false;
2002         // }
2003
2004         /**
2005          * Update all the commits and the committed tables, sets dead the dead transactions
2006          */
2007         private boolean updateCommittedTable() {
2008
2009                 if (newCommitParts.size() == 0) {
2010                         // Nothing new to process
2011                         return false;
2012                 }
2013
2014                 // Iterate through all the machine Ids that we received new parts for
2015                 for (Long machineId : newCommitParts.keySet()) {
2016                         Map<Pair<Long, Integer>, CommitPart> parts = newCommitParts.get(machineId);
2017
2018                         // Iterate through all the parts for that machine Id
2019                         for (Pair<Long, Integer> partId : parts.keySet()) {
2020                                 CommitPart part = parts.get(partId);
2021
2022                                 // Get the transaction object for that sequence number
2023                                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
2024
2025                                 if (commitForClientTable == null) {
2026                                         // This is the first commit from this device
2027                                         commitForClientTable = new HashMap<Long, Commit>();
2028                                         liveCommitsTable.put(part.getMachineId(), commitForClientTable);
2029                                 }
2030
2031                                 Commit commit = commitForClientTable.get(part.getSequenceNumber());
2032
2033                                 if (commit == null) {
2034                                         // This is a new commit that we dont have so make a new one
2035                                         commit = new Commit();
2036
2037                                         // Insert this new commit into the live tables
2038                                         commitForClientTable.put(part.getSequenceNumber(), commit);
2039                                 }
2040
2041                                 // Add that part to the commit
2042                                 commit.addPartDecode(part);
2043                         }
2044                 }
2045
2046                 // Clear all the new commits parts in preparation for the next time the server sends slots
2047                 newCommitParts.clear();
2048
2049                 // If we process a new commit keep track of it for future use
2050                 boolean didProcessANewCommit = false;
2051
2052                 // Process the commits one by one
2053                 for (Long arbitratorId : liveCommitsTable.keySet()) {
2054
2055                         // Get all the commits for a specific arbitrator
2056                         Map<Long, Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
2057
2058                         // Sort the commits in order
2059                         List<Long> commitSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
2060                         Collections.sort(commitSequenceNumbers);
2061
2062                         // Get the last commit seen from this arbitrator
2063                         long lastCommitSeenSequenceNumber = -1;
2064                         if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != null) {
2065                                 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId);
2066                         }
2067
2068                         // Go through each new commit one by one
2069                         for (int i = 0; i < commitSequenceNumbers.size(); i++) {
2070                                 Long commitSequenceNumber = commitSequenceNumbers.get(i);
2071                                 Commit commit = commitForClientTable.get(commitSequenceNumber);
2072
2073                                 // Special processing if a commit is not complete
2074                                 if (!commit.isComplete()) {
2075                                         if (i == (commitSequenceNumbers.size() - 1)) {
2076                                                 // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
2077                                                 break;
2078                                         } else {
2079                                                 // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
2080                                                 // Delete it and move on
2081                                                 commit.setDead();
2082                                                 commitForClientTable.remove(commit.getSequenceNumber());
2083                                                 continue;
2084                                         }
2085                                 }
2086
2087                                 // Update the last transaction that was updated if we can
2088                                 if (commit.getTransactionSequenceNumber() != -1) {
2089                                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2090
2091                                         // Update the last transaction sequence number that the arbitrator arbitrated on
2092                                         if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2093                                                 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2094                                         }
2095                                 }
2096
2097                                 // Update the last arbitration data that we have seen so far
2098                                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) {
2099
2100                                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId());
2101                                         if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) {
2102                                                 // Is larger
2103                                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2104                                         }
2105                                 } else {
2106                                         // Never seen any data from this arbitrator so record the first one
2107                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2108                                 }
2109
2110                                 // We have already seen this commit before so need to do the full processing on this commit
2111                                 if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2112
2113                                         // Update the last transaction that was updated if we can
2114                                         if (commit.getTransactionSequenceNumber() != -1) {
2115                                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2116
2117                                                 // Update the last transaction sequence number that the arbitrator arbitrated on
2118                                                 if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2119                                                         lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2120                                                 }
2121                                         }
2122
2123                                         continue;
2124                                 }
2125
2126                                 // If we got here then this is a brand new commit and needs full processing
2127
2128                                 // Get what commits should be edited, these are the commits that have live values for their keys
2129                                 Set<Commit> commitsToEdit = new HashSet<Commit>();
2130                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2131                                         commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
2132                                 }
2133                                 commitsToEdit.remove(null); // remove null since it could be in this set
2134
2135                                 // Update each previous commit that needs to be updated
2136                                 for (Commit previousCommit : commitsToEdit) {
2137
2138                                         // Only bother with live commits (TODO: Maybe remove this check)
2139                                         if (previousCommit.isLive()) {
2140
2141                                                 // Update which keys in the old commits are still live
2142                                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2143                                                         previousCommit.invalidateKey(kv.getKey());
2144                                                 }
2145
2146                                                 // if the commit is now dead then remove it
2147                                                 if (!previousCommit.isLive()) {
2148                                                         commitForClientTable.remove(previousCommit);
2149                                                 }
2150                                         }
2151                                 }
2152
2153                                 // Update the last seen sequence number from this arbitrator
2154                                 if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) {
2155                                         if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
2156                                                 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2157                                         }
2158                                 } else {
2159                                         lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2160                                 }
2161
2162                                 // We processed a new commit that we havent seen before
2163                                 didProcessANewCommit = true;
2164
2165                                 // Update the committed table of keys and which commit is using which key
2166                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2167                                         committedKeyValueTable.put(kv.getKey(), kv);
2168                                         liveCommitsByKeyTable.put(kv.getKey(), commit);
2169                                 }
2170                         }
2171                 }
2172
2173                 return didProcessANewCommit;
2174         }
2175
2176         /**
2177          * Create the speculative table from transactions that are still live and have come from the cloud
2178          */
2179         private boolean updateSpeculativeTable(boolean didProcessNewCommits) {
2180                 if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
2181                         // There is nothing to speculate on
2182                         return false;
2183                 }
2184
2185                 // Create a list of the transaction sequence numbers and sort them from oldest to newest
2186                 List<Long> transactionSequenceNumbersSorted = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
2187                 Collections.sort(transactionSequenceNumbersSorted);
2188
2189                 boolean hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2190
2191
2192                 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2193                         // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2194                         // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
2195
2196                         // Start from scratch
2197                         speculatedKeyValueTable.clear();
2198                         lastTransactionSequenceNumberSpeculatedOn = -1;
2199                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2200
2201                 }
2202
2203                 // Remember the front of the transaction list
2204                 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);
2205
2206                 // Find where to start arbitration from
2207                 int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2208
2209                 if (startIndex >= transactionSequenceNumbersSorted.size()) {
2210                         // Make sure we are not out of bounds
2211                         return false; // did not speculate
2212                 }
2213
2214                 Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
2215                 boolean didSkip = true;
2216
2217                 for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
2218                         long transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
2219                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
2220
2221                         if (!transaction.isComplete()) {
2222                                 // If there is an incomplete transaction then there is nothing we can do
2223                                 // add this transactions arbitrator to the list of arbitrators we should ignore
2224                                 incompleteTransactionArbitrator.add(transaction.getArbitrator());
2225                                 didSkip = true;
2226                                 continue;
2227                         }
2228
2229                         if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
2230                                 continue;
2231                         }
2232
2233                         lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2234
2235                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, null)) {
2236                                 // Guard evaluated to true so update the speculative table
2237                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2238                                         speculatedKeyValueTable.put(kv.getKey(), kv);
2239                                 }
2240                         }
2241                 }
2242
2243                 if (didSkip) {
2244                         // Since there was a skip we need to redo the speculation next time around
2245                         lastTransactionSequenceNumberSpeculatedOn = -1;
2246                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2247                 }
2248
2249                 // We did some speculation
2250                 return true;
2251         }
2252
2253         /**
2254          * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
2255          */
2256         private void updatePendingTransactionSpeculativeTable(boolean didProcessNewCommitsOrSpeculate) {
2257                 if (pendingTransactionQueue.size() == 0) {
2258                         // There is nothing to speculate on
2259                         return;
2260                 }
2261
2262
2263                 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
2264                         // need to reset on the pending speculation
2265                         lastPendingTransactionSpeculatedOn = null;
2266                         firstPendingTransaction = pendingTransactionQueue.get(0);
2267                         pendingTransactionSpeculatedKeyValueTable.clear();
2268                 }
2269
2270                 // Find where to start arbitration from
2271                 int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1;
2272
2273                 if (startIndex >= pendingTransactionQueue.size()) {
2274                         // Make sure we are not out of bounds
2275                         return;
2276                 }
2277
2278                 for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
2279                         Transaction transaction = pendingTransactionQueue.get(i);
2280
2281                         lastPendingTransactionSpeculatedOn = transaction;
2282
2283                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2284                                 // Guard evaluated to true so update the speculative table
2285                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2286                                         pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
2287                                 }
2288                         }
2289                 }
2290         }
2291
2292         /**
2293          * Set dead and remove from the live transaction tables the transactions that are dead
2294          */
2295         private void updateLiveTransactionsAndStatus() {
2296
2297                 // Go through each of the transactions
2298                 for (Iterator<Map.Entry<Long, Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
2299                         Transaction transaction = iter.next().getValue();
2300
2301                         // Check if the transaction is dead
2302                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
2303                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
2304
2305                                 // Set dead the transaction
2306                                 transaction.setDead();
2307
2308                                 // Remove the transaction from the live table
2309                                 iter.remove();
2310                                 liveTransactionByTransactionIdTable.remove(transaction.getId());
2311                         }
2312                 }
2313
2314                 // Go through each of the transactions
2315                 for (Iterator<Map.Entry<Long, TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
2316                         TransactionStatus status = iter.next().getValue();
2317
2318                         // Check if the transaction is dead
2319                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
2320                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
2321
2322                                 // Set committed
2323                                 status.setStatus(TransactionStatus.StatusCommitted);
2324
2325                                 // Remove
2326                                 iter.remove();
2327                         }
2328                 }
2329         }
2330
2331         /**
2332          * Process this slot, entry by entry.  Also update the latest message sent by slot
2333          */
2334         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2335
2336                 // Update the last message seen
2337                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2338
2339                 // Process each entry in the slot
2340                 for (Entry entry : slot.getEntries()) {
2341                         switch (entry.getType()) {
2342
2343                         case Entry.TypeCommitPart:
2344                                 processEntry((CommitPart)entry);
2345                                 break;
2346
2347                         case Entry.TypeAbort:
2348                                 processEntry((Abort)entry);
2349                                 break;
2350
2351                         case Entry.TypeTransactionPart:
2352                                 processEntry((TransactionPart)entry);
2353                                 break;
2354
2355                         case Entry.TypeNewKey:
2356                                 processEntry((NewKey)entry);
2357                                 break;
2358
2359                         case Entry.TypeLastMessage:
2360                                 processEntry((LastMessage)entry, machineSet);
2361                                 break;
2362
2363                         case Entry.TypeRejectedMessage:
2364                                 processEntry((RejectedMessage)entry, indexer);
2365                                 break;
2366
2367                         case Entry.TypeTableStatus:
2368                                 processEntry((TableStatus)entry, slot.getSequenceNumber());
2369                                 break;
2370
2371                         default:
2372                                 throw new Error("Unrecognized type: " + entry.getType());
2373                         }
2374                 }
2375         }
2376
2377         /**
2378          * Update the last message that was sent for a machine Id
2379          */
2380         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
2381                 // Update what the last message received by a machine was
2382                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
2383         }
2384
2385         /**
2386          * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2387          */
2388         private void processEntry(NewKey entry) {
2389
2390                 // Update the arbitrator table with the new key information
2391                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
2392
2393                 // Update what the latest live new key is
2394                 NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry);
2395                 if (oldNewKey != null) {
2396                         // Delete the old new key messages
2397                         oldNewKey.setDead();
2398                 }
2399         }
2400
2401         /**
2402          * Process new table status entries and set dead the old ones as new ones come in.
2403          * keeps track of the largest and smallest table status seen in this current round
2404          * of updating the local copy of the block chain
2405          */
2406         private void processEntry(TableStatus entry, long seq) {
2407                 int newNumSlots = entry.getMaxSlots();
2408                 updateCurrMaxSize(newNumSlots);
2409
2410                 initExpectedSize(seq, newNumSlots);
2411
2412                 if (liveTableStatus != null) {
2413                         // We have a larger table status so the old table status is no longer alive
2414                         liveTableStatus.setDead();
2415                 }
2416
2417                 // Make this new table status the latest alive table status
2418                 liveTableStatus = entry;
2419         }
2420
2421         /**
2422          * Check old messages to see if there is a block chain violation. Also
2423          */
2424         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
2425                 long oldSeqNum = entry.getOldSeqNum();
2426                 long newSeqNum = entry.getNewSeqNum();
2427                 boolean isequal = entry.getEqual();
2428                 long machineId = entry.getMachineID();
2429                 long seq = entry.getSequenceNumber();
2430
2431
2432                 // Check if we have messages that were supposed to be rejected in our local block chain
2433                 for (long seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2434
2435                         // Get the slot
2436                         Slot slot = indexer.getSlot(seqNum);
2437
2438                         if (slot != null) {
2439                                 // If we have this slot make sure that it was not supposed to be a rejected slot
2440
2441                                 long slotMachineId = slot.getMachineID();
2442                                 if (isequal != (slotMachineId == machineId)) {
2443                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2444                                 }
2445                         }
2446                 }
2447
2448
2449                 // Create a list of clients to watch until they see this rejected message entry.
2450                 HashSet<Long> deviceWatchSet = new HashSet<Long>();
2451                 for (Map.Entry<Long, Pair<Long, Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
2452
2453                         // Machine ID for the last message entry
2454                         long lastMessageEntryMachineId = lastMessageEntry.getKey();
2455
2456                         // We've seen it, don't need to continue to watch.  Our next
2457                         // message will implicitly acknowledge it.
2458                         if (lastMessageEntryMachineId == localMachineId) {
2459                                 continue;
2460                         }
2461
2462                         Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
2463                         long entrySequenceNumber = lastMessageValue.getFirst();
2464
2465                         if (entrySequenceNumber < seq) {
2466
2467                                 // Add this rejected message to the set of messages that this machine ID did not see yet
2468                                 addWatchList(lastMessageEntryMachineId, entry);
2469
2470                                 // This client did not see this rejected message yet so add it to the watch set to monitor
2471                                 deviceWatchSet.add(lastMessageEntryMachineId);
2472                         }
2473                 }
2474
2475                 if (deviceWatchSet.isEmpty()) {
2476                         // This rejected message has been seen by all the clients so
2477                         entry.setDead();
2478                 } else {
2479                         // We need to watch this rejected message
2480                         entry.setWatchSet(deviceWatchSet);
2481                 }
2482         }
2483
2484         /**
2485          * Check if this abort is live, if not then save it so we can kill it later.
2486          * update the last transaction number that was arbitrated on.
2487          */
2488         private void processEntry(Abort entry) {
2489
2490
2491                 if (entry.getTransactionSequenceNumber() != -1) {
2492                         // update the transaction status if it was sent to the server
2493                         TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
2494                         if (status != null) {
2495                                 status.setStatus(TransactionStatus.StatusAborted);
2496                         }
2497                 }
2498
2499                 // Abort has not been seen by the client it is for yet so we need to keep track of it
2500                 Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
2501                 if (previouslySeenAbort != null) {
2502                         previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
2503                 }
2504
2505                 if (entry.getTransactionArbitrator() == localMachineId) {
2506                         liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry);
2507                 }
2508
2509                 if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
2510
2511                         // The machine already saw this so it is dead
2512                         entry.setDead();
2513                         liveAbortTable.remove(entry.getAbortId());
2514
2515                         if (entry.getTransactionArbitrator() == localMachineId) {
2516                                 liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber());
2517                         }
2518
2519                         return;
2520                 }
2521
2522
2523
2524
2525                 // Update the last arbitration data that we have seen so far
2526                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) {
2527
2528                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator());
2529                         if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) {
2530                                 // Is larger
2531                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2532                         }
2533                 } else {
2534                         // Never seen any data from this arbitrator so record the first one
2535                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2536                 }
2537
2538
2539                 // Set dead a transaction if we can
2540                 Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair<Long, Long>(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber()));
2541                 if (transactionToSetDead != null) {
2542                         liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber());
2543                 }
2544
2545                 // Update the last transaction sequence number that the arbitrator arbitrated on
2546                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator());
2547                 if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2548
2549                         // Is a valid one
2550                         if (entry.getTransactionSequenceNumber() != -1) {
2551                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber());
2552                         }
2553                 }
2554         }
2555
2556         /**
2557          * Set dead the transaction part if that transaction is dead and keep track of all new parts
2558          */
2559         private void processEntry(TransactionPart entry) {
2560                 // Check if we have already seen this transaction and set it dead OR if it is not alive
2561                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId());
2562                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= entry.getSequenceNumber())) {
2563                         // This transaction is dead, it was already committed or aborted
2564                         entry.setDead();
2565                         return;
2566                 }
2567
2568                 // This part is still alive
2569                 Map<Pair<Long, Integer>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
2570
2571                 if (transactionPart == null) {
2572                         // Dont have a table for this machine Id yet so make one
2573                         transactionPart = new HashMap<Pair<Long, Integer>, TransactionPart>();
2574                         newTransactionParts.put(entry.getMachineId(), transactionPart);
2575                 }
2576
2577                 // Update the part and set dead ones we have already seen (got a rescued version)
2578                 TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry);
2579                 if (previouslySeenPart != null) {
2580                         previouslySeenPart.setDead();
2581                 }
2582         }
2583
2584         /**
2585          * Process new commit entries and save them for future use.  Delete duplicates
2586          */
2587         private void processEntry(CommitPart entry) {
2588
2589
2590                 // Update the last transaction that was updated if we can
2591                 if (entry.getTransactionSequenceNumber() != -1) {
2592                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
2593
2594                         // Update the last transaction sequence number that the arbitrator arbitrated on
2595                         if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2596                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
2597                         }
2598                 }
2599
2600
2601
2602
2603                 Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
2604
2605                 if (commitPart == null) {
2606                         // Don't have a table for this machine Id yet so make one
2607                         commitPart = new HashMap<Pair<Long, Integer>, CommitPart>();
2608                         newCommitParts.put(entry.getMachineId(), commitPart);
2609                 }
2610
2611                 // Update the part and set dead ones we have already seen (got a rescued version)
2612                 CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry);
2613                 if (previouslySeenPart != null) {
2614                         previouslySeenPart.setDead();
2615                 }
2616         }
2617
2618         /**
2619          * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
2620          * Updates the live aborts, removes those that are dead and sets them dead.
2621          * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2622          * other clients have not had a rollback on the last message.
2623          */
2624         private void updateLastMessage(long machineId, long seqNum, Liveness liveness, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2625
2626                 // We have seen this machine ID
2627                 machineSet.remove(machineId);
2628
2629                 // Get the set of rejected messages that this machine Id is has not seen yet
2630                 HashSet<RejectedMessage> watchset = rejectedMessageWatchListTable.get(machineId);
2631
2632                 // If there is a rejected message that this machine Id has not seen yet
2633                 if (watchset != null) {
2634
2635                         // Go through each rejected message that this machine Id has not seen yet
2636                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
2637
2638                                 RejectedMessage rm = rmit.next();
2639
2640                                 // If this machine Id has seen this rejected message...
2641                                 if (rm.getSequenceNumber() <= seqNum) {
2642
2643                                         // Remove it from our watchlist
2644                                         rmit.remove();
2645
2646                                         // Decrement machines that need to see this notification
2647                                         rm.removeWatcher(machineId);
2648                                 }
2649                         }
2650                 }
2651
2652                 // Set dead the abort
2653                 for (Iterator<Map.Entry<Pair<Long, Long>, Abort>> i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
2654                         Abort abort = i.next().getValue();
2655
2656                         if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
2657                                 abort.setDead();
2658                                 i.remove();
2659
2660                                 if (abort.getTransactionArbitrator() == localMachineId) {
2661                                         liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber());
2662                                 }
2663                         }
2664                 }
2665
2666
2667
2668                 if (machineId == localMachineId) {
2669                         // Our own messages are immediately dead.
2670                         if (liveness instanceof LastMessage) {
2671                                 ((LastMessage)liveness).setDead();
2672                         } else if (liveness instanceof Slot) {
2673                                 ((Slot)liveness).setDead();
2674                         } else {
2675                                 throw new Error("Unrecognized type");
2676                         }
2677                 }
2678
2679                 // Get the old last message for this device
2680                 Pair<Long, Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<Long, Liveness>(seqNum, liveness));
2681                 if (lastMessageEntry == null) {
2682                         // If no last message then there is nothing else to process
2683                         return;
2684                 }
2685
2686                 long lastMessageSeqNum = lastMessageEntry.getFirst();
2687                 Liveness lastEntry = lastMessageEntry.getSecond();
2688
2689                 // If it is not our machine Id since we already set ours to dead
2690                 if (machineId != localMachineId) {
2691                         if (lastEntry instanceof LastMessage) {
2692                                 ((LastMessage)lastEntry).setDead();
2693                         } else if (lastEntry instanceof Slot) {
2694                                 ((Slot)lastEntry).setDead();
2695                         } else {
2696                                 throw new Error("Unrecognized type");
2697                         }
2698                 }
2699
2700                 // Make sure the server is not playing any games
2701                 if (machineId == localMachineId) {
2702
2703                         if (hadPartialSendToServer) {
2704                                 // We were not making any updates and we had a machine mismatch
2705                                 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2706                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2707                                 }
2708
2709                         } else {
2710                                 // We were not making any updates and we had a machine mismatch
2711                                 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2712                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2713                                 }
2714                         }
2715                 } else {
2716                         if (lastMessageSeqNum > seqNum) {
2717                                 throw new Error("Server Error: Rollback on remote machine sequence number");
2718                         }
2719                 }
2720         }
2721
2722         /**
2723          * Add a rejected message entry to the watch set to keep track of which clients have seen that
2724          * rejected message entry and which have not.
2725          */
2726         private void addWatchList(long machineId, RejectedMessage entry) {
2727                 HashSet<RejectedMessage> entries = rejectedMessageWatchListTable.get(machineId);
2728                 if (entries == null) {
2729                         // There is no set for this machine ID yet so create one
2730                         entries = new HashSet<RejectedMessage>();
2731                         rejectedMessageWatchListTable.put(machineId, entries);
2732                 }
2733                 entries.add(entry);
2734         }
2735
2736         /**
2737          * Check if the HMAC chain is not violated
2738          */
2739         private void checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
2740                 for (int i = 0; i < newSlots.length; i++) {
2741                         Slot currSlot = newSlots[i];
2742                         Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
2743                         if (prevSlot != null &&
2744                                 !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
2745                                 throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
2746                 }
2747         }
2748 }