bug fix
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
1 package iotcloud;
2
3 import java.util.Iterator;
4 import java.util.Random;
5 import java.util.Arrays;
6 import java.util.Map;
7 import java.util.Set;
8 import java.util.List;
9 import java.util.Vector;
10 import java.util.HashMap;
11 import java.util.HashSet;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.nio.ByteBuffer;
15
16 /**
17  * IoTTable data structure.  Provides client interface.
18  * @author Brian Demsky
19  * @version 1.0
20  */
21
22 final public class Table {
23
24         /* Constants */
25         static final int FREE_SLOTS = 2; // Number of slots that should be kept free // 10
26         static final int SKIP_THRESHOLD = 10;
27         static final double RESIZE_MULTIPLE = 1.2;
28         static final double RESIZE_THRESHOLD = 0.75;
29         static final int REJECTED_THRESHOLD = 5;
30
31         /* Helper Objects */
32         private SlotBuffer buffer = null;
33         private CloudComm cloud = null;
34         private Random random = null;
35         private TableStatus liveTableStatus = null;
36         private PendingTransaction pendingTransactionBuilder = null; // Pending Transaction used in building a Pending Transaction
37         private Transaction lastPendingTransactionSpeculatedOn = null; // Last transaction that was speculated on from the pending transaction
38         private Transaction firstPendingTransaction = null; // first transaction in the pending transaction list
39
40         /* Variables */
41         private int numberOfSlots = 0;  // Number of slots stored in buffer
42         private int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
43         private long liveSlotCount = 0; // Number of currently live slots
44         private long oldestLiveSlotSequenceNumver = 1;  // Smallest sequence number of the slot with a live entry
45         private long localMachineId = 0; // Machine ID of this client device
46         private long sequenceNumber = 0; // Largest sequence number a client has received
47         private long localSequenceNumber = 0;
48
49         // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
50         // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
51         private long localTransactionSequenceNumber = 1; // Local sequence number counter for transactions
52         private long lastTransactionSequenceNumberSpeculatedOn = 0; // the last transaction that was speculated on
53         private long oldestTransactionSequenceNumberSpeculatedOn = 0; // the oldest transaction that was speculated on
54         private long localArbitrationSequenceNumber = 1;
55         private boolean hadPartialSendToServer = false;
56         private boolean attemptedToSendToServer = false;
57         private long expectedsize;
58         private boolean didFindTableStatus = false;
59         private long currMaxSize = 0;
60
61         private Slot lastSlotAttemptedToSend = null;
62         private boolean lastIsNewKey = false;
63         private int lastNewSize = 0;
64         private Map<Transaction, List<Integer>> lastTransactionPartsSent = null;
65         private List<Entry> lastPendingSendArbitrationEntriesToDelete = null;
66         private NewKey lastNewKey = null;
67
68
69         /* Data Structures  */
70         private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
71         private Map<IoTString, KeyValue> speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value
72         private Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
73         private Map<IoTString, NewKey> liveNewKeyTable = null; // Table of live new keys
74         private HashMap<Long, Pair<Long, Liveness>> lastMessageTable = null; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
75         private HashMap<Long, HashSet<RejectedMessage>> rejectedMessageWatchListTable = null; // Table of machine Ids and the set of rejected messages they have not seen yet
76         private Map<IoTString, Long> arbitratorTable = null; // Table of keys and their arbitrators
77         private Map<Pair<Long, Long>, Abort> liveAbortTable = null; // Table live abort messages
78         private Map<Long, Map<Pair<Long, Integer>, TransactionPart>> newTransactionParts = null; // transaction parts that are seen in this latest round of slots from the server
79         private Map<Long, Map<Pair<Long, Integer>, CommitPart>> newCommitParts = null; // commit parts that are seen in this latest round of slots from the server
80         private Map<Long, Long> lastArbitratedTransactionNumberByArbitratorTable = null; // Last transaction sequence number that an arbitrator arbitrated on
81         private Map<Long, Transaction> liveTransactionBySequenceNumberTable = null; // live transaction grouped by the sequence number
82         private Map<Pair<Long, Long>, Transaction> liveTransactionByTransactionIdTable = null; // live transaction grouped by the transaction ID
83         private Map<Long, Map<Long, Commit>> liveCommitsTable = null;
84         private Map<IoTString, Commit> liveCommitsByKeyTable = null;
85         private Map<Long, Long> lastCommitSeenSequenceNumberByArbitratorTable = null;
86         private Vector<Long> rejectedSlotList = null; // List of rejected slots that have yet to be sent to the server
87         private List<Transaction> pendingTransactionQueue = null;
88         private List<ArbitrationRound> pendingSendArbitrationRounds = null;
89         private List<Entry> pendingSendArbitrationEntriesToDelete = null;
90         private Map<Transaction, List<Integer>> transactionPartsSent = null;
91         private Map<Long, TransactionStatus> outstandingTransactionStatus = null;
92         private Map<Long, Abort> liveAbortsGeneratedByLocal = null;
93         private Set<Pair<Long, Long>> offlineTransactionsCommittedAndAtServer = null;
94         private Map<Long, Pair<String, Integer>> localCommunicationTable = null;
95         private Map<Long, Long> lastTransactionSeenFromMachineFromServer = null;
96         private Map<Long, Long> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = null;
97
98
99         public Table(String baseurl, String password, long _localMachineId, int listeningPort) {
100                 localMachineId = _localMachineId;
101                 cloud = new CloudComm(this, baseurl, password, listeningPort);
102
103                 init();
104         }
105
106         public Table(CloudComm _cloud, long _localMachineId) {
107                 localMachineId = _localMachineId;
108                 cloud = _cloud;
109
110                 init();
111         }
112
113         /**
114          * Init all the stuff needed for for table usage
115          */
116         private void init() {
117
118                 // Init helper objects
119                 random = new Random();
120                 buffer = new SlotBuffer();
121
122                 // Set Variables
123                 oldestLiveSlotSequenceNumver = 1;
124
125                 // init data structs
126                 committedKeyValueTable = new HashMap<IoTString, KeyValue>();
127                 speculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
128                 pendingTransactionSpeculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
129                 liveNewKeyTable = new HashMap<IoTString, NewKey>();
130                 lastMessageTable = new HashMap<Long, Pair<Long, Liveness>>();
131                 rejectedMessageWatchListTable = new HashMap<Long, HashSet<RejectedMessage>>();
132                 arbitratorTable = new HashMap<IoTString, Long>();
133                 liveAbortTable = new HashMap<Pair<Long, Long>, Abort>();
134                 newTransactionParts = new HashMap<Long, Map<Pair<Long, Integer>, TransactionPart>>();
135                 newCommitParts = new HashMap<Long, Map<Pair<Long, Integer>, CommitPart>>();
136                 lastArbitratedTransactionNumberByArbitratorTable = new HashMap<Long, Long>();
137                 liveTransactionBySequenceNumberTable = new HashMap<Long, Transaction>();
138                 liveTransactionByTransactionIdTable = new HashMap<Pair<Long, Long>, Transaction>();
139                 liveCommitsTable = new HashMap<Long, Map<Long, Commit>>();
140                 liveCommitsByKeyTable = new HashMap<IoTString, Commit>();
141                 lastCommitSeenSequenceNumberByArbitratorTable = new HashMap<Long, Long>();
142                 rejectedSlotList = new Vector<Long>();
143                 pendingTransactionQueue = new ArrayList<Transaction>();
144                 pendingSendArbitrationEntriesToDelete = new ArrayList<Entry>();
145                 transactionPartsSent = new HashMap<Transaction, List<Integer>>();
146                 outstandingTransactionStatus = new HashMap<Long, TransactionStatus>();
147                 liveAbortsGeneratedByLocal = new HashMap<Long, Abort>();
148                 offlineTransactionsCommittedAndAtServer = new HashSet<Pair<Long, Long>>();
149                 localCommunicationTable = new HashMap<Long, Pair<String, Integer>>();
150                 lastTransactionSeenFromMachineFromServer = new HashMap<Long, Long>();
151                 pendingSendArbitrationRounds = new ArrayList<ArbitrationRound>();
152                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new HashMap<Long, Long>();
153
154
155                 // Other init stuff
156                 numberOfSlots = buffer.capacity();
157                 setResizeThreshold();
158         }
159
160         // TODO: delete method
161         public synchronized void printSlots() {
162                 long o = buffer.getOldestSeqNum();
163                 long n = buffer.getNewestSeqNum();
164
165                 int[] types = new int[10];
166
167                 int num = 0;
168
169                 int livec = 0;
170                 int deadc = 0;
171
172                 int casdasd = 0;
173
174                 int liveslo = 0;
175
176                 for (long i = o; i < (n + 1); i++) {
177                         Slot s = buffer.getSlot(i);
178
179
180                         if (s.isLive()) {
181                                 liveslo++;
182                         }
183
184                         Vector<Entry> entries = s.getEntries();
185
186                         for (Entry e : entries) {
187                                 if (e.isLive()) {
188                                         int type = e.getType();
189
190
191                                         if (type == 6) {
192                                                 RejectedMessage rej = (RejectedMessage)e;
193                                                 casdasd++;
194
195                                                 System.out.println(rej.getMachineID());
196                                         }
197
198
199                                         types[type] = types[type] + 1;
200                                         num++;
201                                         livec++;
202                                 } else {
203                                         deadc++;
204                                 }
205                         }
206                 }
207
208                 for (int i = 0; i < 10; i++) {
209                         System.out.println(i + "    " + types[i]);
210                 }
211                 System.out.println("Live count:   " + livec);
212                 System.out.println("Live Slot count:   " + liveslo);
213
214                 System.out.println("Dead count:   " + deadc);
215                 System.out.println("Old:   " + o);
216                 System.out.println("New:   " + n);
217                 System.out.println("Size:   " + buffer.size());
218                 // System.out.println("Commits:   " + liveCommitsTable.size());
219                 System.out.println("pendingTrans:   " + pendingTransactionQueue.size());
220                 System.out.println("Trans Status Out:   " + outstandingTransactionStatus.size());
221
222                 for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
223                         System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
224                 }
225
226
227                 for (Long a : liveCommitsTable.keySet()) {
228                         for (Long b : liveCommitsTable.get(a).keySet()) {
229                                 for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) {
230                                         System.out.print(kv + " ");
231                                 }
232                                 System.out.print("|| ");
233                         }
234                         System.out.println();
235                 }
236
237         }
238
239         /**
240          * Initialize the table by inserting a table status as the first entry into the table status
241          * also initialize the crypto stuff.
242          */
243         public synchronized void initTable() throws ServerException {
244                 cloud.initSecurity();
245
246                 // Create the first insertion into the block chain which is the table status
247                 Slot s = new Slot(this, 1, localMachineId, localSequenceNumber);
248                 localSequenceNumber++;
249                 TableStatus status = new TableStatus(s, numberOfSlots);
250                 s.addEntry(status);
251                 Slot[] array = cloud.putSlot(s, numberOfSlots);
252
253                 if (array == null) {
254                         array = new Slot[] {s};
255                         // update local block chain
256                         validateAndUpdate(array, true);
257                 } else if (array.length == 1) {
258                         // in case we did push the slot BUT we failed to init it
259                         validateAndUpdate(array, true);
260                 } else {
261                         throw new Error("Error on initialization");
262                 }
263         }
264
265         /**
266          * Rebuild the table from scratch by pulling the latest block chain from the server.
267          */
268         public synchronized void rebuild() throws ServerException {
269                 // Just pull the latest slots from the server
270                 Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
271                 validateAndUpdate(newslots, true);
272                 sendToServer(null);
273                 updateLiveTransactionsAndStatus();
274
275         }
276
277         // public String toString() {
278         //      String retString = " Committed Table: \n";
279         //      retString += "---------------------------\n";
280         //      retString += commitedTable.toString();
281
282         //      retString += "\n\n";
283
284         //      retString += " Speculative Table: \n";
285         //      retString += "---------------------------\n";
286         //      retString += speculativeTable.toString();
287
288         //      return retString;
289         // }
290
291         public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) {
292                 localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
293         }
294
295         public synchronized Long getArbitrator(IoTString key) {
296                 return arbitratorTable.get(key);
297         }
298
299         public synchronized void close() {
300                 cloud.close();
301         }
302         
303         // Return all keys in the table
304         public synchronized Set<IoTString> getKeys() {
305                 return committedKeyValueTable.keySet();
306         }
307
308         public synchronized IoTString getCommitted(IoTString key)  {
309                 KeyValue kv = committedKeyValueTable.get(key);
310
311                 if (kv != null) {
312                         return kv.getValue();
313                 } else {
314                         return null;
315                 }
316         }
317
318         public synchronized IoTString getSpeculative(IoTString key) {
319                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
320
321                 if (kv == null) {
322                         kv = speculatedKeyValueTable.get(key);
323                 }
324
325                 if (kv == null) {
326                         kv = committedKeyValueTable.get(key);
327                 }
328
329                 if (kv != null) {
330                         return kv.getValue();
331                 } else {
332                         return null;
333                 }
334         }
335
336         public synchronized IoTString getCommittedAtomic(IoTString key) {
337                 KeyValue kv = committedKeyValueTable.get(key);
338
339                 if (arbitratorTable.get(key) == null) {
340                         throw new Error("Key not Found.");
341                 }
342
343                 // Make sure new key value pair matches the current arbitrator
344                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
345                         // TODO: Maybe not throw en error
346                         throw new Error("Not all Key Values Match Arbitrator.");
347                 }
348
349                 if (kv != null) {
350                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
351                         return kv.getValue();
352                 } else {
353                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
354                         return null;
355                 }
356         }
357
358         public synchronized IoTString getSpeculativeAtomic(IoTString key) {
359                 if (arbitratorTable.get(key) == null) {
360                         throw new Error("Key not Found.");
361                 }
362
363                 // Make sure new key value pair matches the current arbitrator
364                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
365                         // TODO: Maybe not throw en error
366                         throw new Error("Not all Key Values Match Arbitrator.");
367                 }
368
369                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
370
371                 if (kv == null) {
372                         kv = speculatedKeyValueTable.get(key);
373                 }
374
375                 if (kv == null) {
376                         kv = committedKeyValueTable.get(key);
377                 }
378
379                 if (kv != null) {
380                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
381                         return kv.getValue();
382                 } else {
383                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
384                         return null;
385                 }
386         }
387
388         public synchronized boolean update()  {
389                 try {
390                         Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
391                         validateAndUpdate(newSlots, false);
392                         sendToServer(null);
393
394
395                         updateLiveTransactionsAndStatus();
396
397                         return true;
398                 } catch (Exception e) {
399                         // e.printStackTrace();
400
401                         for (Long m : localCommunicationTable.keySet()) {
402                                 updateFromLocal(m);
403                         }
404                 }
405
406                 return false;
407         }
408
409         public synchronized boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
410                 while (true) {
411                         if (arbitratorTable.get(keyName) != null) {
412                                 // There is already an arbitrator
413                                 return false;
414                         }
415
416                         NewKey newKey = new NewKey(null, keyName, machineId);
417
418                         if (sendToServer(newKey)) {
419                                 // If successfully inserted
420                                 return true;
421                         }
422                 }
423         }
424
425         public synchronized void startTransaction() {
426                 // Create a new transaction, invalidates any old pending transactions.
427                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
428         }
429
430         public synchronized void addKV(IoTString key, IoTString value) {
431
432                 // Make sure it is a valid key
433                 if (arbitratorTable.get(key) == null) {
434                         throw new Error("Key not Found.");
435                 }
436
437                 // Make sure new key value pair matches the current arbitrator
438                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
439                         // TODO: Maybe not throw en error
440                         throw new Error("Not all Key Values Match Arbitrator.");
441                 }
442
443                 // Add the key value to this transaction
444                 KeyValue kv = new KeyValue(key, value);
445                 pendingTransactionBuilder.addKV(kv);
446         }
447
448         public synchronized TransactionStatus commitTransaction() {
449
450                 if (pendingTransactionBuilder.getKVUpdates().size() == 0) {
451                         // transaction with no updates will have no effect on the system
452                         return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
453                 }
454
455                 // Set the local transaction sequence number and increment
456                 pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber);
457                 localTransactionSequenceNumber++;
458
459                 // Create the transaction status
460                 TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator());
461
462                 // Create the new transaction
463                 Transaction newTransaction = pendingTransactionBuilder.createTransaction();
464                 newTransaction.setTransactionStatus(transactionStatus);
465
466                 if (pendingTransactionBuilder.getArbitrator() != localMachineId) {
467                         // Add it to the queue and invalidate the builder for safety
468                         pendingTransactionQueue.add(newTransaction);
469                 } else {
470                         arbitrateOnLocalTransaction(newTransaction);
471                         updateLiveStateFromLocal();
472                 }
473
474                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
475
476                 try {
477                         sendToServer(null);
478                 } catch (ServerException e) {
479
480                         Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
481                         for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
482                                 Transaction transaction = iter.next();
483
484                                 if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) {
485                                         // Already contacted this client so ignore all attempts to contact this client
486                                         // to preserve ordering for arbitrator
487                                         continue;
488                                 }
489
490                                 Pair<Boolean, Boolean> sendReturn = sendTransactionToLocal(transaction);
491
492                                 if (sendReturn.getFirst()) {
493                                         // Failed to contact over local
494                                         arbitratorTriedAndFailed.add(transaction.getArbitrator());
495                                 } else {
496                                         // Successful contact or should not contact
497
498                                         if (sendReturn.getSecond()) {
499                                                 // did arbitrate
500                                                 iter.remove();
501                                         }
502                                 }
503                         }
504                 }
505
506                 updateLiveStateFromLocal();
507
508                 return transactionStatus;
509         }
510
511         /**
512          * Get the machine ID for this client
513          */
514         public long getMachineId() {
515                 return localMachineId;
516         }
517
518         /**
519          * Decrement the number of live slots that we currently have
520          */
521         public void decrementLiveCount() {
522                 liveSlotCount--;
523         }
524
525         /**
526          * Recalculate the new resize threshold
527          */
528         private void setResizeThreshold() {
529                 int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots);
530                 bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
531         }
532
533         public long getLocalSequenceNumber() {
534                 return localSequenceNumber;
535         }
536
537
538         boolean lastInsertedNewKey = false;
539
540         private boolean sendToServer(NewKey newKey) throws ServerException {
541
542                 boolean fromRetry = false;
543
544                 try {
545                         if (hadPartialSendToServer) {
546                                 Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
547                                 if (newSlots.length == 0) {
548                                         fromRetry = true;
549                                         ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
550
551                                         if (sendSlotsReturn.getFirst()) {
552                                                 if (newKey != null) {
553                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
554                                                                 newKey = null;
555                                                         }
556                                                 }
557
558                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
559                                                         transaction.resetServerFailure();
560
561                                                         // Update which transactions parts still need to be sent
562                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
563
564                                                         // Add the transaction status to the outstanding list
565                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
566
567                                                         // Update the transaction status
568                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
569
570                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
571                                                         if (transaction.didSendAllParts()) {
572                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
573                                                                 pendingTransactionQueue.remove(transaction);
574                                                         }
575                                                 }
576                                         } else {
577
578                                                 newSlots = sendSlotsReturn.getThird();
579
580                                                 boolean isInserted = false;
581                                                 for (Slot s : newSlots) {
582                                                         if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
583                                                                 isInserted = true;
584                                                                 break;
585                                                         }
586                                                 }
587
588                                                 for (Slot s : newSlots) {
589                                                         if (isInserted) {
590                                                                 break;
591                                                         }
592
593                                                         // Process each entry in the slot
594                                                         for (Entry entry : s.getEntries()) {
595
596                                                                 if (entry.getType() == Entry.TypeLastMessage) {
597                                                                         LastMessage lastMessage = (LastMessage)entry;
598                                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
599                                                                                 isInserted = true;
600                                                                                 break;
601                                                                         }
602                                                                 }
603                                                         }
604                                                 }
605
606                                                 if (isInserted) {
607                                                         if (newKey != null) {
608                                                                 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
609                                                                         newKey = null;
610                                                                 }
611                                                         }
612
613                                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
614                                                                 transaction.resetServerFailure();
615
616                                                                 // Update which transactions parts still need to be sent
617                                                                 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
618
619                                                                 // Add the transaction status to the outstanding list
620                                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
621
622                                                                 // Update the transaction status
623                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
624
625                                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
626                                                                 if (transaction.didSendAllParts()) {
627                                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
628                                                                         pendingTransactionQueue.remove(transaction);
629                                                                 } else {
630                                                                         transaction.resetServerFailure();
631                                                                         // Set the transaction sequence number back to nothing
632                                                                         if (!transaction.didSendAPartToServer()) {
633                                                                                 transaction.setSequenceNumber(-1);
634                                                                         }
635                                                                 }
636                                                         }
637                                                 }
638                                         }
639
640                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
641                                                 transaction.resetServerFailure();
642                                                 // Set the transaction sequence number back to nothing
643                                                 if (!transaction.didSendAPartToServer()) {
644                                                         transaction.setSequenceNumber(-1);
645                                                 }
646                                         }
647
648                                         if (sendSlotsReturn.getThird().length != 0) {
649                                                 // insert into the local block chain
650                                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
651                                         }
652                                         // continue;
653                                 } else {
654                                         boolean isInserted = false;
655                                         for (Slot s : newSlots) {
656                                                 if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
657                                                         isInserted = true;
658                                                         break;
659                                                 }
660                                         }
661
662                                         for (Slot s : newSlots) {
663                                                 if (isInserted) {
664                                                         break;
665                                                 }
666
667                                                 // Process each entry in the slot
668                                                 for (Entry entry : s.getEntries()) {
669
670                                                         if (entry.getType() == Entry.TypeLastMessage) {
671                                                                 LastMessage lastMessage = (LastMessage)entry;
672                                                                 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
673                                                                         isInserted = true;
674                                                                         break;
675                                                                 }
676                                                         }
677                                                 }
678                                         }
679
680                                         if (isInserted) {
681                                                 if (newKey != null) {
682                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
683                                                                 newKey = null;
684                                                         }
685                                                 }
686
687                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
688                                                         transaction.resetServerFailure();
689
690                                                         // Update which transactions parts still need to be sent
691                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
692
693                                                         // Add the transaction status to the outstanding list
694                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
695
696                                                         // Update the transaction status
697                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
698
699                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
700                                                         if (transaction.didSendAllParts()) {
701                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
702                                                                 pendingTransactionQueue.remove(transaction);
703                                                         } else {
704                                                                 transaction.resetServerFailure();
705                                                                 // Set the transaction sequence number back to nothing
706                                                                 if (!transaction.didSendAPartToServer()) {
707                                                                         transaction.setSequenceNumber(-1);
708                                                                 }
709                                                         }
710                                                 }
711                                         } else {
712                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
713                                                         transaction.resetServerFailure();
714                                                         // Set the transaction sequence number back to nothing
715                                                         if (!transaction.didSendAPartToServer()) {
716                                                                 transaction.setSequenceNumber(-1);
717                                                         }
718                                                 }
719                                         }
720
721                                         // insert into the local block chain
722                                         validateAndUpdate(newSlots, true);
723                                 }
724                         }
725                 } catch (ServerException e) {
726                         throw e;
727                 }
728
729
730
731                 try {
732                         // While we have stuff that needs inserting into the block chain
733                         while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) {
734
735                                 fromRetry = false;
736
737                                 if (hadPartialSendToServer) {
738                                         throw new Error("Should Be error free");
739                                 }
740
741
742
743                                 // If there is a new key with same name then end
744                                 if ((newKey != null) && (arbitratorTable.get(newKey.getKey()) != null)) {
745                                         return false;
746                                 }
747
748                                 // Create the slot
749                                 Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber);
750                                 localSequenceNumber++;
751
752                                 // Try to fill the slot with data
753                                 ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
754                                 boolean needsResize = fillSlotsReturn.getFirst();
755                                 int newSize = fillSlotsReturn.getSecond();
756                                 Boolean insertedNewKey = fillSlotsReturn.getThird();
757
758                                 if (needsResize) {
759                                         // Reset which transaction to send
760                                         for (Transaction transaction : transactionPartsSent.keySet()) {
761                                                 transaction.resetNextPartToSend();
762
763                                                 // Set the transaction sequence number back to nothing
764                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
765                                                         transaction.setSequenceNumber(-1);
766                                                 }
767                                         }
768
769                                         // Clear the sent data since we are trying again
770                                         pendingSendArbitrationEntriesToDelete.clear();
771                                         transactionPartsSent.clear();
772
773                                         // We needed a resize so try again
774                                         fillSlot(slot, true, newKey);
775                                 }
776
777                                 lastSlotAttemptedToSend = slot;
778                                 lastIsNewKey = (newKey != null);
779                                 lastInsertedNewKey = insertedNewKey;
780                                 lastNewSize = newSize;
781                                 lastNewKey = newKey;
782                                 lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
783                                 lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
784
785
786                                 ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
787
788                                 if (sendSlotsReturn.getFirst()) {
789
790                                         // Did insert into the block chain
791
792                                         if (insertedNewKey) {
793                                                 // This slot was what was inserted not a previous slot
794
795                                                 // New Key was successfully inserted into the block chain so dont want to insert it again
796                                                 newKey = null;
797                                         }
798
799                                         // Remove the aborts and commit parts that were sent from the pending to send queue
800                                         for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
801                                                 ArbitrationRound round = iter.next();
802                                                 round.removeParts(pendingSendArbitrationEntriesToDelete);
803
804                                                 if (round.isDoneSending()) {
805                                                         // Sent all the parts
806                                                         iter.remove();
807                                                 }
808                                         }
809
810                                         for (Transaction transaction : transactionPartsSent.keySet()) {
811                                                 transaction.resetServerFailure();
812
813                                                 // Update which transactions parts still need to be sent
814                                                 transaction.removeSentParts(transactionPartsSent.get(transaction));
815
816                                                 // Add the transaction status to the outstanding list
817                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
818
819                                                 // Update the transaction status
820                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
821
822                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
823                                                 if (transaction.didSendAllParts()) {
824                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
825                                                         pendingTransactionQueue.remove(transaction);
826                                                 }
827                                         }
828                                 } else {
829
830                                         // if (!sendSlotsReturn.getSecond()) {
831                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
832                                         //              transaction.resetServerFailure();
833                                         //      }
834                                         // } else {
835                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
836                                         //              transaction.resetServerFailure();
837
838                                         //              // Update which transactions parts still need to be sent
839                                         //              transaction.removeSentParts(transactionPartsSent.get(transaction));
840
841                                         //              // Add the transaction status to the outstanding list
842                                         //              outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
843
844                                         //              // Update the transaction status
845                                         //              transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
846
847                                         //              // Check if all the transaction parts were successfully sent and if so then remove it from pending
848                                         //              if (transaction.didSendAllParts()) {
849                                         //                      transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
850                                         //                      pendingTransactionQueue.remove(transaction);
851
852                                         //                      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
853                                         //                              System.out.println("Sent: " + kv + "  from: " + localMachineId + "   Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + "  Claimed:" + transaction.getSequenceNumber());
854                                         //                      }
855                                         //              }
856                                         //      }
857                                         // }
858
859                                         // Reset which transaction to send
860                                         for (Transaction transaction : transactionPartsSent.keySet()) {
861                                                 transaction.resetNextPartToSend();
862                                                 // transaction.resetNextPartToSend();
863
864                                                 // Set the transaction sequence number back to nothing
865                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
866                                                         transaction.setSequenceNumber(-1);
867                                                 }
868                                         }
869                                 }
870
871                                 // Clear the sent data in preparation for next send
872                                 pendingSendArbitrationEntriesToDelete.clear();
873                                 transactionPartsSent.clear();
874
875                                 if (sendSlotsReturn.getThird().length != 0) {
876                                         // insert into the local block chain
877                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
878                                 }
879                         }
880
881                 } catch (ServerException e) {
882
883                         if (e.getType() != ServerException.TypeInputTimeout) {
884                                 // e.printStackTrace();
885
886                                 // Nothing was able to be sent to the server so just clear these data structures
887                                 for (Transaction transaction : transactionPartsSent.keySet()) {
888                                         transaction.resetNextPartToSend();
889
890                                         // Set the transaction sequence number back to nothing
891                                         if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
892                                                 transaction.setSequenceNumber(-1);
893                                         }
894                                 }
895                         } else {
896                                 // There was a partial send to the server
897                                 hadPartialSendToServer = true;
898
899
900                                 // if (!fromRetry) {
901                                 //      lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
902                                 //      lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
903                                 // }
904
905                                 // Nothing was able to be sent to the server so just clear these data structures
906                                 for (Transaction transaction : transactionPartsSent.keySet()) {
907                                         transaction.resetNextPartToSend();
908                                         transaction.setServerFailure();
909                                 }
910                         }
911
912                         pendingSendArbitrationEntriesToDelete.clear();
913                         transactionPartsSent.clear();
914
915                         throw e;
916                 }
917
918                 return newKey == null;
919         }
920
921         private synchronized boolean updateFromLocal(long machineId) {
922                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
923                 if (localCommunicationInformation == null) {
924                         // Cant talk to that device locally so do nothing
925                         return false;
926                 }
927
928                 // Get the size of the send data
929                 int sendDataSize = Integer.BYTES + Long.BYTES;
930
931                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
932                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != null) {
933                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
934                 }
935
936                 byte[] sendData = new byte[sendDataSize];
937                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
938
939                 // Encode the data
940                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
941                 bbEncode.putInt(0);
942
943                 // Send by local
944                 byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
945                 localSequenceNumber++;
946
947                 if (returnData == null) {
948                         // Could not contact server
949                         return false;
950                 }
951
952                 // Decode the data
953                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
954                 int numberOfEntries = bbDecode.getInt();
955
956                 for (int i = 0; i < numberOfEntries; i++) {
957                         byte type = bbDecode.get();
958                         if (type == Entry.TypeAbort) {
959                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
960                                 processEntry(abort);
961                         } else if (type == Entry.TypeCommitPart) {
962                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
963                                 processEntry(commitPart);
964                         }
965                 }
966
967                 updateLiveStateFromLocal();
968
969                 return true;
970         }
971
972         private Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
973
974                 // Get the devices local communications
975                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
976
977                 if (localCommunicationInformation == null) {
978                         // Cant talk to that device locally so do nothing
979                         return new Pair<Boolean, Boolean>(true, false);
980                 }
981
982                 // Get the size of the send data
983                 int sendDataSize = Integer.BYTES + Long.BYTES;
984                 for (TransactionPart part : transaction.getParts().values()) {
985                         sendDataSize += part.getSize();
986                 }
987
988                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
989                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != null) {
990                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
991                 }
992
993                 // Make the send data size
994                 byte[] sendData = new byte[sendDataSize];
995                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
996
997                 // Encode the data
998                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
999                 bbEncode.putInt(transaction.getParts().size());
1000                 for (TransactionPart part : transaction.getParts().values()) {
1001                         part.encode(bbEncode);
1002                 }
1003
1004
1005                 // Send by local
1006                 byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
1007                 localSequenceNumber++;
1008
1009                 if (returnData == null) {
1010                         // Could not contact server
1011                         return new Pair<Boolean, Boolean>(true, false);
1012                 }
1013
1014                 // Decode the data
1015                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
1016                 boolean didCommit = bbDecode.get() == 1;
1017                 boolean couldArbitrate = bbDecode.get() == 1;
1018                 int numberOfEntries = bbDecode.getInt();
1019                 boolean foundAbort = false;
1020
1021                 for (int i = 0; i < numberOfEntries; i++) {
1022                         byte type = bbDecode.get();
1023                         if (type == Entry.TypeAbort) {
1024                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
1025
1026                                 if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
1027                                         foundAbort = true;
1028                                 }
1029
1030                                 processEntry(abort);
1031                         } else if (type == Entry.TypeCommitPart) {
1032                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
1033                                 processEntry(commitPart);
1034                         }
1035                 }
1036
1037                 updateLiveStateFromLocal();
1038
1039                 if (couldArbitrate) {
1040                         TransactionStatus status =  transaction.getTransactionStatus();
1041                         if (didCommit) {
1042                                 status.setStatus(TransactionStatus.StatusCommitted);
1043                         } else {
1044                                 status.setStatus(TransactionStatus.StatusAborted);
1045                         }
1046                 } else {
1047                         TransactionStatus status =  transaction.getTransactionStatus();
1048                         if (foundAbort) {
1049                                 status.setStatus(TransactionStatus.StatusAborted);
1050                         } else {
1051                                 status.setStatus(TransactionStatus.StatusCommitted);
1052                         }
1053                 }
1054
1055                 return new Pair<Boolean, Boolean>(false, true);
1056         }
1057
1058         public synchronized byte[] acceptDataFromLocal(byte[] data) {
1059
1060                 // Decode the data
1061                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
1062                 long lastArbitratedSequenceNumberSeen = bbDecode.getLong();
1063                 int numberOfParts = bbDecode.getInt();
1064
1065                 // If we did commit a transaction or not
1066                 boolean didCommit = false;
1067                 boolean couldArbitrate = false;
1068
1069                 if (numberOfParts != 0) {
1070
1071                         // decode the transaction
1072                         Transaction transaction = new Transaction();
1073                         for (int i = 0; i < numberOfParts; i++) {
1074                                 bbDecode.get();
1075                                 TransactionPart newPart = (TransactionPart)TransactionPart.decode(null, bbDecode);
1076                                 transaction.addPartDecode(newPart);
1077                         }
1078
1079                         // Arbitrate on transaction and pull relevant return data
1080                         Pair<Boolean, Boolean> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1081                         couldArbitrate = localArbitrateReturn.getFirst();
1082                         didCommit = localArbitrateReturn.getSecond();
1083
1084                         updateLiveStateFromLocal();
1085
1086                         // Transaction was sent to the server so keep track of it to prevent double commit
1087                         if (transaction.getSequenceNumber() != -1) {
1088                                 offlineTransactionsCommittedAndAtServer.add(transaction.getId());
1089                         }
1090                 }
1091
1092                 // The data to send back
1093                 int returnDataSize = 0;
1094                 List<Entry> unseenArbitrations = new ArrayList<Entry>();
1095
1096                 // Get the aborts to send back
1097                 List<Long> abortLocalSequenceNumbers = new ArrayList<Long >(liveAbortsGeneratedByLocal.keySet());
1098                 Collections.sort(abortLocalSequenceNumbers);
1099                 for (Long localSequenceNumber : abortLocalSequenceNumbers) {
1100                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1101                                 continue;
1102                         }
1103
1104                         Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
1105                         unseenArbitrations.add(abort);
1106                         returnDataSize += abort.getSize();
1107                 }
1108
1109                 // Get the commits to send back
1110                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
1111                 if (commitForClientTable != null) {
1112                         List<Long> commitLocalSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
1113                         Collections.sort(commitLocalSequenceNumbers);
1114
1115                         for (Long localSequenceNumber : commitLocalSequenceNumbers) {
1116                                 Commit commit = commitForClientTable.get(localSequenceNumber);
1117
1118                                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1119                                         continue;
1120                                 }
1121
1122                                 unseenArbitrations.addAll(commit.getParts().values());
1123
1124                                 for (CommitPart commitPart : commit.getParts().values()) {
1125                                         returnDataSize += commitPart.getSize();
1126                                 }
1127                         }
1128                 }
1129
1130                 // Number of arbitration entries to decode
1131                 returnDataSize += 2 * Integer.BYTES;
1132
1133                 // Boolean of did commit or not
1134                 if (numberOfParts != 0) {
1135                         returnDataSize += Byte.BYTES;
1136                 }
1137
1138                 // Data to send Back
1139                 byte[] returnData = new byte[returnDataSize];
1140                 ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
1141
1142                 if (numberOfParts != 0) {
1143                         if (didCommit) {
1144                                 bbEncode.put((byte)1);
1145                         } else {
1146                                 bbEncode.put((byte)0);
1147                         }
1148                         if (couldArbitrate) {
1149                                 bbEncode.put((byte)1);
1150                         } else {
1151                                 bbEncode.put((byte)0);
1152                         }
1153                 }
1154
1155                 bbEncode.putInt(unseenArbitrations.size());
1156                 for (Entry entry : unseenArbitrations) {
1157                         entry.encode(bbEncode);
1158                 }
1159
1160
1161                 localSequenceNumber++;
1162                 return returnData;
1163         }
1164
1165         private ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize, boolean isNewKey)  throws ServerException {
1166
1167                 boolean attemptedToSendToServerTmp = attemptedToSendToServer;
1168                 attemptedToSendToServer = true;
1169
1170                 boolean inserted = false;
1171                 boolean lastTryInserted = false;
1172
1173                 Slot[] array = cloud.putSlot(slot, newSize);
1174                 if (array == null) {
1175                         array = new Slot[] {slot};
1176                         rejectedSlotList.clear();
1177                         inserted = true;
1178                 }       else {
1179                         if (array.length == 0) {
1180                                 throw new Error("Server Error: Did not send any slots");
1181                         }
1182
1183                         // if (attemptedToSendToServerTmp) {
1184                         if (hadPartialSendToServer) {
1185
1186                                 boolean isInserted = false;
1187                                 for (Slot s : array) {
1188                                         if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
1189                                                 isInserted = true;
1190                                                 break;
1191                                         }
1192                                 }
1193
1194                                 for (Slot s : array) {
1195                                         if (isInserted) {
1196                                                 break;
1197                                         }
1198
1199                                         // Process each entry in the slot
1200                                         for (Entry entry : s.getEntries()) {
1201
1202                                                 if (entry.getType() == Entry.TypeLastMessage) {
1203                                                         LastMessage lastMessage = (LastMessage)entry;
1204
1205                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) {
1206                                                                 isInserted = true;
1207                                                                 break;
1208                                                         }
1209                                                 }
1210                                         }
1211                                 }
1212
1213                                 if (!isInserted) {
1214                                         rejectedSlotList.add(slot.getSequenceNumber());
1215                                         lastTryInserted = false;
1216                                 } else {
1217                                         lastTryInserted = true;
1218                                 }
1219                         } else {
1220                                 rejectedSlotList.add(slot.getSequenceNumber());
1221                                 lastTryInserted = false;
1222                         }
1223                 }
1224
1225                 return new ThreeTuple<Boolean, Boolean, Slot[]>(inserted, lastTryInserted, array);
1226         }
1227
1228         /**
1229          * Returns false if a resize was needed
1230          */
1231         private ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) {
1232
1233
1234                 int newSize = 0;
1235                 if (liveSlotCount > bufferResizeThreshold) {
1236                         resize = true; //Resize is forced
1237
1238                 }
1239
1240                 if (resize) {
1241                         newSize = (int) (numberOfSlots * RESIZE_MULTIPLE);
1242                         TableStatus status = new TableStatus(slot, newSize);
1243                         slot.addEntry(status);
1244                 }
1245
1246                 // Fill with rejected slots first before doing anything else
1247                 doRejectedMessages(slot);
1248
1249                 // Do mandatory rescue of entries
1250                 ThreeTuple<Boolean, Boolean, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1251
1252                 // Extract working variables
1253                 boolean needsResize = mandatoryRescueReturn.getFirst();
1254                 boolean seenLiveSlot = mandatoryRescueReturn.getSecond();
1255                 long currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1256
1257                 if (needsResize && !resize) {
1258                         // We need to resize but we are not resizing so return false
1259                         return new ThreeTuple<Boolean, Integer, Boolean>(true, null, null);
1260                 }
1261
1262                 boolean inserted = false;
1263                 if (newKeyEntry != null) {
1264                         newKeyEntry.setSlot(slot);
1265                         if (slot.hasSpace(newKeyEntry)) {
1266
1267                                 slot.addEntry(newKeyEntry);
1268                                 inserted = true;
1269                         }
1270                 }
1271
1272                 // Clear the transactions, aborts and commits that were sent previously
1273                 transactionPartsSent.clear();
1274                 pendingSendArbitrationEntriesToDelete.clear();
1275
1276                 for (ArbitrationRound round : pendingSendArbitrationRounds) {
1277                         boolean isFull = false;
1278                         round.generateParts();
1279                         List<Entry> parts = round.getParts();
1280
1281                         // Insert pending arbitration data
1282                         for (Entry arbitrationData : parts) {
1283
1284                                 // If it is an abort then we need to set some information
1285                                 if (arbitrationData instanceof Abort) {
1286                                         ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
1287                                 }
1288
1289                                 if (!slot.hasSpace(arbitrationData)) {
1290                                         // No space so cant do anything else with these data entries
1291                                         isFull = true;
1292                                         break;
1293                                 }
1294
1295                                 // Add to this current slot and add it to entries to delete
1296                                 slot.addEntry(arbitrationData);
1297                                 pendingSendArbitrationEntriesToDelete.add(arbitrationData);
1298                         }
1299
1300                         if (isFull) {
1301                                 break;
1302                         }
1303                 }
1304
1305                 if (pendingTransactionQueue.size() > 0) {
1306
1307                         Transaction transaction = pendingTransactionQueue.get(0);
1308
1309                         // Set the transaction sequence number if it has yet to be inserted into the block chain
1310                         // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
1311                         //      transaction.setSequenceNumber(slot.getSequenceNumber());
1312                         // }
1313
1314                         if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
1315                                 transaction.setSequenceNumber(slot.getSequenceNumber());
1316                         }
1317
1318
1319                         while (true) {
1320                                 TransactionPart part = transaction.getNextPartToSend();
1321
1322                                 if (part == null) {
1323                                         // Ran out of parts to send for this transaction so move on
1324                                         break;
1325                                 }
1326
1327                                 if (slot.hasSpace(part)) {
1328                                         slot.addEntry(part);
1329                                         List<Integer> partsSent = transactionPartsSent.get(transaction);
1330                                         if (partsSent == null) {
1331                                                 partsSent = new ArrayList<Integer>();
1332                                                 transactionPartsSent.put(transaction, partsSent);
1333                                         }
1334                                         partsSent.add(part.getPartNumber());
1335                                         transactionPartsSent.put(transaction, partsSent);
1336                                 } else {
1337                                         break;
1338                                 }
1339                         }
1340                 }
1341
1342                 // Fill the remainder of the slot with rescue data
1343                 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1344
1345                 return new ThreeTuple<Boolean, Integer, Boolean>(false, newSize, inserted);
1346         }
1347
1348         private void doRejectedMessages(Slot s) {
1349                 if (! rejectedSlotList.isEmpty()) {
1350                         /* TODO: We should avoid generating a rejected message entry if
1351                          * there is already a sufficient entry in the queue (e.g.,
1352                          * equalsto value of true and same sequence number).  */
1353
1354                         long old_seqn = rejectedSlotList.firstElement();
1355                         if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
1356                                 long new_seqn = rejectedSlotList.lastElement();
1357                                 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1358                                 s.addEntry(rm);
1359                         } else {
1360                                 long prev_seqn = -1;
1361                                 int i = 0;
1362                                 /* Go through list of missing messages */
1363                                 for (; i < rejectedSlotList.size(); i++) {
1364                                         long curr_seqn = rejectedSlotList.get(i);
1365                                         Slot s_msg = buffer.getSlot(curr_seqn);
1366                                         if (s_msg != null)
1367                                                 break;
1368                                         prev_seqn = curr_seqn;
1369                                 }
1370                                 /* Generate rejected message entry for missing messages */
1371                                 if (prev_seqn != -1) {
1372                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1373                                         s.addEntry(rm);
1374                                 }
1375                                 /* Generate rejected message entries for present messages */
1376                                 for (; i < rejectedSlotList.size(); i++) {
1377                                         long curr_seqn = rejectedSlotList.get(i);
1378                                         Slot s_msg = buffer.getSlot(curr_seqn);
1379                                         long machineid = s_msg.getMachineID();
1380                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1381                                         s.addEntry(rm);
1382                                 }
1383                         }
1384                 }
1385         }
1386
1387         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot slot, boolean resize) {
1388                 long newestSequenceNumber = buffer.getNewestSeqNum();
1389                 long oldestSequenceNumber = buffer.getOldestSeqNum();
1390                 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1391                         oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1392                 }
1393
1394                 long currentSequenceNumber = oldestLiveSlotSequenceNumver;
1395                 boolean seenLiveSlot = false;
1396                 long firstIfFull = newestSequenceNumber + 1 - numberOfSlots;    // smallest seq number in the buffer if it is full
1397                 long threshold = firstIfFull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
1398
1399
1400                 // Mandatory Rescue
1401                 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1402                         Slot previousSlot = buffer.getSlot(currentSequenceNumber);
1403                         // Push slot number forward
1404                         if (! seenLiveSlot) {
1405                                 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1406                         }
1407
1408                         if (!previousSlot.isLive()) {
1409                                 continue;
1410                         }
1411
1412                         // We have seen a live slot
1413                         seenLiveSlot = true;
1414
1415                         // Get all the live entries for a slot
1416                         Vector<Entry> liveEntries = previousSlot.getLiveEntries(resize);
1417
1418                         // Iterate over all the live entries and try to rescue them
1419                         for (Entry liveEntry : liveEntries) {
1420                                 if (slot.hasSpace(liveEntry)) {
1421
1422                                         // Enough space to rescue the entry
1423                                         slot.addEntry(liveEntry);
1424                                 } else if (currentSequenceNumber == firstIfFull) {
1425                                         //if there's no space but the entry is about to fall off the queue
1426                                         System.out.println("B"); //?
1427                                         return new ThreeTuple<Boolean, Boolean, Long>(true, seenLiveSlot, currentSequenceNumber);
1428
1429                                 }
1430                         }
1431                 }
1432
1433                 // Did not resize
1434                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenLiveSlot, currentSequenceNumber);
1435         }
1436
1437         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
1438                 /* now go through live entries from least to greatest sequence number until
1439                  * either all live slots added, or the slot doesn't have enough room
1440                  * for SKIP_THRESHOLD consecutive entries*/
1441                 int skipcount = 0;
1442                 long newestseqnum = buffer.getNewestSeqNum();
1443                 search:
1444                 for (; seqn <= newestseqnum; seqn++) {
1445                         Slot prevslot = buffer.getSlot(seqn);
1446                         //Push slot number forward
1447                         if (!seenliveslot)
1448                                 oldestLiveSlotSequenceNumver = seqn;
1449
1450                         if (!prevslot.isLive())
1451                                 continue;
1452                         seenliveslot = true;
1453                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1454                         for (Entry liveentry : liveentries) {
1455                                 if (s.hasSpace(liveentry))
1456                                         s.addEntry(liveentry);
1457                                 else {
1458                                         skipcount++;
1459                                         if (skipcount > SKIP_THRESHOLD)
1460                                                 break search;
1461                                 }
1462                         }
1463                 }
1464         }
1465
1466         /**
1467          * Checks for malicious activity and updates the local copy of the block chain.
1468          */
1469         private void validateAndUpdate(Slot[] newSlots, boolean acceptUpdatesToLocal) {
1470
1471                 // The cloud communication layer has checked slot HMACs already before decoding
1472                 if (newSlots.length == 0) {
1473                         return;
1474                 }
1475
1476                 // Make sure all slots are newer than the last largest slot this client has seen
1477                 long firstSeqNum = newSlots[0].getSequenceNumber();
1478                 if (firstSeqNum <= sequenceNumber) {
1479                         throw new Error("Server Error: Sent older slots!");
1480                 }
1481
1482                 // Create an object that can access both new slots and slots in our local chain
1483                 // without committing slots to our local chain
1484                 SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
1485
1486                 // Check that the HMAC chain is not broken
1487                 checkHMACChain(indexer, newSlots);
1488
1489                 // Set to keep track of messages from clients
1490                 HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
1491
1492                 // Process each slots data
1493                 for (Slot slot : newSlots) {
1494                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1495
1496                         updateExpectedSize();
1497                 }
1498
1499                 // If there is a gap, check to see if the server sent us everything.
1500                 if (firstSeqNum != (sequenceNumber + 1)) {
1501
1502                         // Check the size of the slots that were sent down by the server.
1503                         // Can only check the size if there was a gap
1504                         checkNumSlots(newSlots.length);
1505
1506                         // Since there was a gap every machine must have pushed a slot or must have
1507                         // a last message message.  If not then the server is hiding slots
1508                         if (!machineSet.isEmpty()) {
1509                                 throw new Error("Missing record for machines: " + machineSet);
1510                         }
1511                 }
1512
1513                 // Update the size of our local block chain.
1514                 commitNewMaxSize();
1515
1516                 // Commit new to slots to the local block chain.
1517                 for (Slot slot : newSlots) {
1518
1519                         // Insert this slot into our local block chain copy.
1520                         buffer.putSlot(slot);
1521
1522                         // Keep track of how many slots are currently live (have live data in them).
1523                         liveSlotCount++;
1524                 }
1525
1526                 // Get the sequence number of the latest slot in the system
1527                 sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
1528
1529                 updateLiveStateFromServer();
1530
1531                 // No Need to remember after we pulled from the server
1532                 offlineTransactionsCommittedAndAtServer.clear();
1533
1534                 // This is invalidated now
1535                 hadPartialSendToServer = false;
1536         }
1537
1538         private void updateLiveStateFromServer() {
1539                 // Process the new transaction parts
1540                 processNewTransactionParts();
1541
1542                 // Do arbitration on new transactions that were received
1543                 arbitrateFromServer();
1544
1545                 // Update all the committed keys
1546                 boolean didCommitOrSpeculate = updateCommittedTable();
1547
1548                 // Delete the transactions that are now dead
1549                 updateLiveTransactionsAndStatus();
1550
1551                 // Do speculations
1552                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1553                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1554         }
1555
1556         private void updateLiveStateFromLocal() {
1557                 // Update all the committed keys
1558                 boolean didCommitOrSpeculate = updateCommittedTable();
1559
1560                 // Delete the transactions that are now dead
1561                 updateLiveTransactionsAndStatus();
1562
1563                 // Do speculations
1564                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1565                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1566         }
1567
1568         private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) {
1569                 // if (didFindTableStatus) {
1570                 // return;
1571                 // }
1572                 long prevslots = firstSequenceNumber;
1573
1574
1575                 if (didFindTableStatus) {
1576                         // expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : expectedsize;
1577                         // System.out.println("Here2: " + expectedsize + "    " + numberOfSlots + "   " + prevslots);
1578
1579                 } else {
1580                         expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1581                         // System.out.println("Here: " + expectedsize);
1582                 }
1583
1584                 // System.out.println(numberOfSlots);
1585
1586                 didFindTableStatus = true;
1587                 currMaxSize = numberOfSlots;
1588         }
1589
1590         private void updateExpectedSize() {
1591                 expectedsize++;
1592
1593                 if (expectedsize > currMaxSize) {
1594                         expectedsize = currMaxSize;
1595                 }
1596         }
1597
1598
1599         /**
1600          * Check the size of the block chain to make sure there are enough slots sent back by the server.
1601          * This is only called when we have a gap between the slots that we have locally and the slots
1602          * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1603          * status message
1604          */
1605         private void checkNumSlots(int numberOfSlots) {
1606                 if (numberOfSlots != expectedsize) {
1607                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numberOfSlots);
1608                 }
1609         }
1610
1611         private void updateCurrMaxSize(int newmaxsize) {
1612                 currMaxSize = newmaxsize;
1613         }
1614
1615
1616         /**
1617          * Update the size of of the local buffer if it is needed.
1618          */
1619         private void commitNewMaxSize() {
1620                 didFindTableStatus = false;
1621
1622                 // Resize the local slot buffer
1623                 if (numberOfSlots != currMaxSize) {
1624                         buffer.resize((int)currMaxSize);
1625                 }
1626
1627                 // Change the number of local slots to the new size
1628                 numberOfSlots = (int)currMaxSize;
1629
1630
1631                 // Recalculate the resize threshold since the size of the local buffer has changed
1632                 setResizeThreshold();
1633         }
1634
1635         /**
1636          * Process the new transaction parts from this latest round of slots received from the server
1637          */
1638         private void processNewTransactionParts() {
1639
1640                 if (newTransactionParts.size() == 0) {
1641                         // Nothing new to process
1642                         return;
1643                 }
1644
1645                 // Iterate through all the machine Ids that we received new parts for
1646                 for (Long machineId : newTransactionParts.keySet()) {
1647                         Map<Pair<Long, Integer>, TransactionPart> parts = newTransactionParts.get(machineId);
1648
1649                         // Iterate through all the parts for that machine Id
1650                         for (Pair<Long, Integer> partId : parts.keySet()) {
1651                                 TransactionPart part = parts.get(partId);
1652
1653                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
1654                                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= part.getSequenceNumber())) {
1655                                         // Set dead the transaction part
1656                                         part.setDead();
1657                                         continue;
1658                                 }
1659
1660                                 // Get the transaction object for that sequence number
1661                                 Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber());
1662
1663                                 if (transaction == null) {
1664                                         // This is a new transaction that we dont have so make a new one
1665                                         transaction = new Transaction();
1666
1667                                         // Insert this new transaction into the live tables
1668                                         liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction);
1669                                         liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction);
1670                                 }
1671
1672                                 // Add that part to the transaction
1673                                 transaction.addPartDecode(part);
1674                         }
1675                 }
1676
1677                 // Clear all the new transaction parts in preparation for the next time the server sends slots
1678                 newTransactionParts.clear();
1679         }
1680
1681
// Sequence number of the transaction most recently arbitrated by arbitrateFromServer();
// that method skips live transactions whose sequence number is smaller than this value.
1682         private long lastSeqNumArbOn = 0;
1683
1684         private void arbitrateFromServer() {
1685
1686                 if (liveTransactionBySequenceNumberTable.size() == 0) {
1687                         // Nothing to arbitrate on so move on
1688                         return;
1689                 }
1690
1691                 // Get the transaction sequence numbers and sort from oldest to newest
1692                 List<Long> transactionSequenceNumbers = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
1693                 Collections.sort(transactionSequenceNumbers);
1694
1695                 // Collection of key value pairs that are
1696                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1697
1698                 // The last transaction arbitrated on
1699                 long lastTransactionCommitted = -1;
1700                 Set<Abort> generatedAborts = new HashSet<Abort>();
1701
1702                 for (Long transactionSequenceNumber : transactionSequenceNumbers) {
1703                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1704
1705
1706
1707                         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1708                         if (transaction.getArbitrator() != localMachineId) {
1709                                 continue;
1710                         }
1711
1712                         if (transactionSequenceNumber < lastSeqNumArbOn) {
1713                                 continue;
1714                         }
1715
1716                         if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
1717                                 // We have seen this already locally so dont commit again
1718                                 continue;
1719                         }
1720
1721
1722                         if (!transaction.isComplete()) {
1723                                 // Will arbitrate in incorrect order if we continue so just break
1724                                 // Most likely this
1725                                 break;
1726                         }
1727
1728
1729                         // update the largest transaction seen by arbitrator from server
1730                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) {
1731                                 lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1732                         } else {
1733                                 Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId());
1734                                 if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1735                                         lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1736                                 }
1737                         }
1738
1739                         if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
1740                                 // Guard evaluated as true
1741
1742                                 // Update the local changes so we can make the commit
1743                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1744                                         speculativeTableTmp.put(kv.getKey(), kv);
1745                                 }
1746
1747                                 // Update what the last transaction committed was for use in batch commit
1748                                 lastTransactionCommitted = transactionSequenceNumber;
1749                         } else {
1750                                 // Guard evaluated was false so create abort
1751
1752                                 // Create the abort
1753                                 Abort newAbort = new Abort(null,
1754                                                            transaction.getClientLocalSequenceNumber(),
1755                                                            transaction.getSequenceNumber(),
1756                                                            transaction.getMachineId(),
1757                                                            transaction.getArbitrator(),
1758                                                            localArbitrationSequenceNumber);
1759                                 localArbitrationSequenceNumber++;
1760
1761                                 generatedAborts.add(newAbort);
1762
1763                                 // Insert the abort so we can process
1764                                 processEntry(newAbort);
1765                         }
1766
1767                         lastSeqNumArbOn = transactionSequenceNumber;
1768
1769                         // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
1770                 }
1771
1772                 Commit newCommit = null;
1773
1774                 // If there is something to commit
1775                 if (speculativeTableTmp.size() != 0) {
1776
1777                         // Create the commit and increment the commit sequence number
1778                         newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1779                         localArbitrationSequenceNumber++;
1780
1781                         // Add all the new keys to the commit
1782                         for (KeyValue kv : speculativeTableTmp.values()) {
1783                                 newCommit.addKV(kv);
1784                         }
1785
1786                         // create the commit parts
1787                         newCommit.createCommitParts();
1788
1789                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1790
1791                         // Insert the commit so we can process it
1792                         for (CommitPart commitPart : newCommit.getParts().values()) {
1793                                 processEntry(commitPart);
1794                         }
1795                 }
1796
1797                 if ((newCommit != null) || (generatedAborts.size() > 0)) {
1798                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1799                         pendingSendArbitrationRounds.add(arbitrationRound);
1800
1801                         if (compactArbitrationData()) {
1802                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1803                                 if (newArbitrationRound.getCommit() != null) {
1804                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1805                                                 processEntry(commitPart);
1806                                         }
1807                                 }
1808                         }
1809                 }
1810         }
1811
1812         private Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction) {
1813
1814                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1815                 if (transaction.getArbitrator() != localMachineId) {
1816                         return new Pair<Boolean, Boolean>(false, false);
1817                 }
1818
1819                 if (!transaction.isComplete()) {
1820                         // Will arbitrate in incorrect order if we continue so just break
1821                         // Most likely this
1822                         return new Pair<Boolean, Boolean>(false, false);
1823                 }
1824
1825                 if (transaction.getMachineId() != localMachineId) {
1826                         // dont do this check for local transactions
1827                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) {
1828                                 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
1829                                         // We've have already seen this from the server
1830                                         return new Pair<Boolean, Boolean>(false, false);
1831                                 }
1832                         }
1833                 }
1834
1835                 if (transaction.evaluateGuard(committedKeyValueTable, null, null)) {
1836                         // Guard evaluated as true
1837
1838                         // Create the commit and increment the commit sequence number
1839                         Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1840                         localArbitrationSequenceNumber++;
1841
1842                         // Update the local changes so we can make the commit
1843                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1844                                 newCommit.addKV(kv);
1845                         }
1846
1847                         // create the commit parts
1848                         newCommit.createCommitParts();
1849
1850                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1851                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
1852                         pendingSendArbitrationRounds.add(arbitrationRound);
1853
1854                         if (compactArbitrationData()) {
1855                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1856                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1857                                         processEntry(commitPart);
1858                                 }
1859                         } else {
1860                                 // Insert the commit so we can process it
1861                                 for (CommitPart commitPart : newCommit.getParts().values()) {
1862                                         processEntry(commitPart);
1863                                 }
1864                         }
1865
1866                         if (transaction.getMachineId() == localMachineId) {
1867                                 TransactionStatus status = transaction.getTransactionStatus();
1868                                 if (status != null) {
1869                                         status.setStatus(TransactionStatus.StatusCommitted);
1870                                 }
1871                         }
1872
1873                         updateLiveStateFromLocal();
1874                         return new Pair<Boolean, Boolean>(true, true);
1875                 } else {
1876
1877                         if (transaction.getMachineId() == localMachineId) {
1878                                 // For locally created messages update the status
1879
1880                                 // Guard evaluated was false so create abort
1881                                 TransactionStatus status = transaction.getTransactionStatus();
1882                                 if (status != null) {
1883                                         status.setStatus(TransactionStatus.StatusAborted);
1884                                 }
1885                         } else {
1886                                 Set addAbortSet = new HashSet<Abort>();
1887
1888
1889                                 // Create the abort
1890                                 Abort newAbort = new Abort(null,
1891                                                            transaction.getClientLocalSequenceNumber(),
1892                                                            -1,
1893                                                            transaction.getMachineId(),
1894                                                            transaction.getArbitrator(),
1895                                                            localArbitrationSequenceNumber);
1896                                 localArbitrationSequenceNumber++;
1897
1898                                 addAbortSet.add(newAbort);
1899
1900
1901                                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1902                                 ArbitrationRound arbitrationRound = new ArbitrationRound(null, addAbortSet);
1903                                 pendingSendArbitrationRounds.add(arbitrationRound);
1904
1905                                 if (compactArbitrationData()) {
1906                                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1907                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1908                                                 processEntry(commitPart);
1909                                         }
1910                                 }
1911                         }
1912
1913                         updateLiveStateFromLocal();
1914                         return new Pair<Boolean, Boolean>(true, false);
1915                 }
1916         }
1917
1918         /**
1919          * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1920          */
1921         private boolean compactArbitrationData() {
1922
1923                 if (pendingSendArbitrationRounds.size() < 2) {
1924                         // Nothing to compact so do nothing
1925                         return false;
1926                 }
1927
1928                 ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1929                 if (lastRound.didSendPart()) {
1930                         return false;
1931                 }
1932
1933                 boolean hadCommit = (lastRound.getCommit() == null);
1934                 boolean gotNewCommit = false;
1935
1936                 int numberToDelete = 1;
1937                 while (numberToDelete < pendingSendArbitrationRounds.size()) {
1938                         ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1);
1939
1940                         if (round.isFull() || round.didSendPart()) {
1941                                 // Stop since there is a part that cannot be compacted and we need to compact in order
1942                                 break;
1943                         }
1944
1945                         if (round.getCommit() == null) {
1946
1947                                 // Try compacting aborts only
1948                                 int newSize = round.getCurrentSize() + lastRound.getAbortsCount();
1949                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1950                                         // Cant compact since it would be too large
1951                                         break;
1952                                 }
1953                                 lastRound.addAborts(round.getAborts());
1954                         } else {
1955
1956                                 // Create a new larger commit
1957                                 Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber);
1958                                 localArbitrationSequenceNumber++;
1959
1960                                 // Create the commit parts so that we can count them
1961                                 newCommit.createCommitParts();
1962
1963                                 // Calculate the new size of the parts
1964                                 int newSize = newCommit.getNumberOfParts();
1965                                 newSize += lastRound.getAbortsCount();
1966                                 newSize += round.getAbortsCount();
1967
1968                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1969                                         // Cant compact since it would be too large
1970                                         break;
1971                                 }
1972
1973                                 // Set the new compacted part
1974                                 lastRound.setCommit(newCommit);
1975                                 lastRound.addAborts(round.getAborts());
1976                                 gotNewCommit = true;
1977                         }
1978
1979                         numberToDelete++;
1980                 }
1981
1982                 if (numberToDelete != 1) {
1983                         // If there is a compaction
1984
1985                         // Delete the previous pieces that are now in the new compacted piece
1986                         if (numberToDelete == pendingSendArbitrationRounds.size()) {
1987                                 pendingSendArbitrationRounds.clear();
1988                         } else {
1989                                 for (int i = 0; i < numberToDelete; i++) {
1990                                         pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1);
1991                                 }
1992                         }
1993
1994                         // Add the new compacted into the pending to send list
1995                         pendingSendArbitrationRounds.add(lastRound);
1996
1997                         // Should reinsert into the commit processor
1998                         if (hadCommit && gotNewCommit) {
1999                                 return true;
2000                         }
2001                 }
2002
2003                 return false;
2004         }
2005         // private boolean compactArbitrationData() {
2006         //      return false;
2007         // }
2008
2009         /**
2010          * Update all the commits and the committed tables, sets dead the dead transactions
2011          */
2012         private boolean updateCommittedTable() {
2013
2014                 if (newCommitParts.size() == 0) {
2015                         // Nothing new to process
2016                         return false;
2017                 }
2018
2019                 // Iterate through all the machine Ids that we received new parts for
2020                 for (Long machineId : newCommitParts.keySet()) {
2021                         Map<Pair<Long, Integer>, CommitPart> parts = newCommitParts.get(machineId);
2022
2023                         // Iterate through all the parts for that machine Id
2024                         for (Pair<Long, Integer> partId : parts.keySet()) {
2025                                 CommitPart part = parts.get(partId);
2026
2027                                 // Get the transaction object for that sequence number
2028                                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
2029
2030                                 if (commitForClientTable == null) {
2031                                         // This is the first commit from this device
2032                                         commitForClientTable = new HashMap<Long, Commit>();
2033                                         liveCommitsTable.put(part.getMachineId(), commitForClientTable);
2034                                 }
2035
2036                                 Commit commit = commitForClientTable.get(part.getSequenceNumber());
2037
2038                                 if (commit == null) {
2039                                         // This is a new commit that we dont have so make a new one
2040                                         commit = new Commit();
2041
2042                                         // Insert this new commit into the live tables
2043                                         commitForClientTable.put(part.getSequenceNumber(), commit);
2044                                 }
2045
2046                                 // Add that part to the commit
2047                                 commit.addPartDecode(part);
2048                         }
2049                 }
2050
2051                 // Clear all the new commits parts in preparation for the next time the server sends slots
2052                 newCommitParts.clear();
2053
2054                 // If we process a new commit keep track of it for future use
2055                 boolean didProcessANewCommit = false;
2056
2057                 // Process the commits one by one
2058                 for (Long arbitratorId : liveCommitsTable.keySet()) {
2059
2060                         // Get all the commits for a specific arbitrator
2061                         Map<Long, Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
2062
2063                         // Sort the commits in order
2064                         List<Long> commitSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
2065                         Collections.sort(commitSequenceNumbers);
2066
2067                         // Get the last commit seen from this arbitrator
2068                         long lastCommitSeenSequenceNumber = -1;
2069                         if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != null) {
2070                                 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId);
2071                         }
2072
2073                         // Go through each new commit one by one
2074                         for (int i = 0; i < commitSequenceNumbers.size(); i++) {
2075                                 Long commitSequenceNumber = commitSequenceNumbers.get(i);
2076                                 Commit commit = commitForClientTable.get(commitSequenceNumber);
2077
2078                                 // Special processing if a commit is not complete
2079                                 if (!commit.isComplete()) {
2080                                         if (i == (commitSequenceNumbers.size() - 1)) {
2081                                                 // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
2082                                                 break;
2083                                         } else {
2084                                                 // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
2085                                                 // Delete it and move on
2086                                                 commit.setDead();
2087                                                 commitForClientTable.remove(commit.getSequenceNumber());
2088                                                 continue;
2089                                         }
2090                                 }
2091
2092                                 // Update the last transaction that was updated if we can
2093                                 if (commit.getTransactionSequenceNumber() != -1) {
2094                                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2095
2096                                         // Update the last transaction sequence number that the arbitrator arbitrated on
2097                                         if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2098                                                 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2099                                         }
2100                                 }
2101
2102                                 // Update the last arbitration data that we have seen so far
2103                                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) {
2104
2105                                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId());
2106                                         if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) {
2107                                                 // Is larger
2108                                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2109                                         }
2110                                 } else {
2111                                         // Never seen any data from this arbitrator so record the first one
2112                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2113                                 }
2114
2115                                 // We have already seen this commit before so need to do the full processing on this commit
2116                                 if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2117
2118                                         // Update the last transaction that was updated if we can
2119                                         if (commit.getTransactionSequenceNumber() != -1) {
2120                                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2121
2122                                                 // Update the last transaction sequence number that the arbitrator arbitrated on
2123                                                 if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2124                                                         lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2125                                                 }
2126                                         }
2127
2128                                         continue;
2129                                 }
2130
2131                                 // If we got here then this is a brand new commit and needs full processing
2132
2133                                 // Get what commits should be edited, these are the commits that have live values for their keys
2134                                 Set<Commit> commitsToEdit = new HashSet<Commit>();
2135                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2136                                         commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
2137                                 }
2138                                 commitsToEdit.remove(null); // remove null since it could be in this set
2139
2140                                 // Update each previous commit that needs to be updated
2141                                 for (Commit previousCommit : commitsToEdit) {
2142
2143                                         // Only bother with live commits (TODO: Maybe remove this check)
2144                                         if (previousCommit.isLive()) {
2145
2146                                                 // Update which keys in the old commits are still live
2147                                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2148                                                         previousCommit.invalidateKey(kv.getKey());
2149                                                 }
2150
2151                                                 // if the commit is now dead then remove it
2152                                                 if (!previousCommit.isLive()) {
2153                                                         commitForClientTable.remove(previousCommit);
2154                                                 }
2155                                         }
2156                                 }
2157
2158                                 // Update the last seen sequence number from this arbitrator
2159                                 if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) {
2160                                         if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
2161                                                 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2162                                         }
2163                                 } else {
2164                                         lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2165                                 }
2166
2167                                 // We processed a new commit that we havent seen before
2168                                 didProcessANewCommit = true;
2169
2170                                 // Update the committed table of keys and which commit is using which key
2171                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2172                                         committedKeyValueTable.put(kv.getKey(), kv);
2173                                         liveCommitsByKeyTable.put(kv.getKey(), commit);
2174                                 }
2175                         }
2176                 }
2177
2178                 return didProcessANewCommit;
2179         }
2180
2181         /**
2182          * Create the speculative table from transactions that are still live and have come from the cloud
2183          */
2184         private boolean updateSpeculativeTable(boolean didProcessNewCommits) {
2185                 if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
2186                         // There is nothing to speculate on
2187                         return false;
2188                 }
2189
2190                 // Create a list of the transaction sequence numbers and sort them from oldest to newest
2191                 List<Long> transactionSequenceNumbersSorted = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
2192                 Collections.sort(transactionSequenceNumbersSorted);
2193
2194                 boolean hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2195
2196
2197                 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2198                         // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2199                         // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
2200
2201                         // Start from scratch
2202                         speculatedKeyValueTable.clear();
2203                         lastTransactionSequenceNumberSpeculatedOn = -1;
2204                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2205
2206                 }
2207
2208                 // Remember the front of the transaction list
2209                 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);
2210
2211                 // Find where to start arbitration from
2212                 int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2213
2214                 if (startIndex >= transactionSequenceNumbersSorted.size()) {
2215                         // Make sure we are not out of bounds
2216                         return false; // did not speculate
2217                 }
2218
2219                 Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
2220                 boolean didSkip = true;
2221
2222                 for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
2223                         long transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
2224                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
2225
2226                         if (!transaction.isComplete()) {
2227                                 // If there is an incomplete transaction then there is nothing we can do
2228                                 // add this transactions arbitrator to the list of arbitrators we should ignore
2229                                 incompleteTransactionArbitrator.add(transaction.getArbitrator());
2230                                 didSkip = true;
2231                                 continue;
2232                         }
2233
2234                         if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
2235                                 continue;
2236                         }
2237
2238                         lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2239
2240                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, null)) {
2241                                 // Guard evaluated to true so update the speculative table
2242                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2243                                         speculatedKeyValueTable.put(kv.getKey(), kv);
2244                                 }
2245                         }
2246                 }
2247
2248                 if (didSkip) {
2249                         // Since there was a skip we need to redo the speculation next time around
2250                         lastTransactionSequenceNumberSpeculatedOn = -1;
2251                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2252                 }
2253
2254                 // We did some speculation
2255                 return true;
2256         }
2257
2258         /**
2259          * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
2260          */
2261         private void updatePendingTransactionSpeculativeTable(boolean didProcessNewCommitsOrSpeculate) {
2262                 if (pendingTransactionQueue.size() == 0) {
2263                         // There is nothing to speculate on
2264                         return;
2265                 }
2266
2267
2268                 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
2269                         // need to reset on the pending speculation
2270                         lastPendingTransactionSpeculatedOn = null;
2271                         firstPendingTransaction = pendingTransactionQueue.get(0);
2272                         pendingTransactionSpeculatedKeyValueTable.clear();
2273                 }
2274
2275                 // Find where to start arbitration from
2276                 int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1;
2277
2278                 if (startIndex >= pendingTransactionQueue.size()) {
2279                         // Make sure we are not out of bounds
2280                         return;
2281                 }
2282
2283                 for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
2284                         Transaction transaction = pendingTransactionQueue.get(i);
2285
2286                         lastPendingTransactionSpeculatedOn = transaction;
2287
2288                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2289                                 // Guard evaluated to true so update the speculative table
2290                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2291                                         pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
2292                                 }
2293                         }
2294                 }
2295         }
2296
2297         /**
2298          * Set dead and remove from the live transaction tables the transactions that are dead
2299          */
2300         private void updateLiveTransactionsAndStatus() {
2301
2302                 // Go through each of the transactions
2303                 for (Iterator<Map.Entry<Long, Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
2304                         Transaction transaction = iter.next().getValue();
2305
2306                         // Check if the transaction is dead
2307                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
2308                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
2309
2310                                 // Set dead the transaction
2311                                 transaction.setDead();
2312
2313                                 // Remove the transaction from the live table
2314                                 iter.remove();
2315                                 liveTransactionByTransactionIdTable.remove(transaction.getId());
2316                         }
2317                 }
2318
2319                 // Go through each of the transactions
2320                 for (Iterator<Map.Entry<Long, TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
2321                         TransactionStatus status = iter.next().getValue();
2322
2323                         // Check if the transaction is dead
2324                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
2325                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
2326
2327                                 // Set committed
2328                                 status.setStatus(TransactionStatus.StatusCommitted);
2329
2330                                 // Remove
2331                                 iter.remove();
2332                         }
2333                 }
2334         }
2335
2336         /**
2337          * Process this slot, entry by entry.  Also update the latest message sent by slot
2338          */
2339         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2340
2341                 // Update the last message seen
2342                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2343
2344                 // Process each entry in the slot
2345                 for (Entry entry : slot.getEntries()) {
2346                         switch (entry.getType()) {
2347
2348                         case Entry.TypeCommitPart:
2349                                 processEntry((CommitPart)entry);
2350                                 break;
2351
2352                         case Entry.TypeAbort:
2353                                 processEntry((Abort)entry);
2354                                 break;
2355
2356                         case Entry.TypeTransactionPart:
2357                                 processEntry((TransactionPart)entry);
2358                                 break;
2359
2360                         case Entry.TypeNewKey:
2361                                 processEntry((NewKey)entry);
2362                                 break;
2363
2364                         case Entry.TypeLastMessage:
2365                                 processEntry((LastMessage)entry, machineSet);
2366                                 break;
2367
2368                         case Entry.TypeRejectedMessage:
2369                                 processEntry((RejectedMessage)entry, indexer);
2370                                 break;
2371
2372                         case Entry.TypeTableStatus:
2373                                 processEntry((TableStatus)entry, slot.getSequenceNumber());
2374                                 break;
2375
2376                         default:
2377                                 throw new Error("Unrecognized type: " + entry.getType());
2378                         }
2379                 }
2380         }
2381
2382         /**
2383          * Update the last message that was sent for a machine Id
2384          */
2385         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
2386                 // Update what the last message received by a machine was
2387                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
2388         }
2389
2390         /**
2391          * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2392          */
2393         private void processEntry(NewKey entry) {
2394
2395                 // Update the arbitrator table with the new key information
2396                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
2397
2398                 // Update what the latest live new key is
2399                 NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry);
2400                 if (oldNewKey != null) {
2401                         // Delete the old new key messages
2402                         oldNewKey.setDead();
2403                 }
2404         }
2405
2406         /**
2407          * Process new table status entries and set dead the old ones as new ones come in.
2408          * keeps track of the largest and smallest table status seen in this current round
2409          * of updating the local copy of the block chain
2410          */
2411         private void processEntry(TableStatus entry, long seq) {
2412                 int newNumSlots = entry.getMaxSlots();
2413                 updateCurrMaxSize(newNumSlots);
2414
2415                 initExpectedSize(seq, newNumSlots);
2416
2417                 if (liveTableStatus != null) {
2418                         // We have a larger table status so the old table status is no longer alive
2419                         liveTableStatus.setDead();
2420                 }
2421
2422                 // Make this new table status the latest alive table status
2423                 liveTableStatus = entry;
2424         }
2425
2426         /**
2427          * Check old messages to see if there is a block chain violation. Also
2428          */
2429         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
2430                 long oldSeqNum = entry.getOldSeqNum();
2431                 long newSeqNum = entry.getNewSeqNum();
2432                 boolean isequal = entry.getEqual();
2433                 long machineId = entry.getMachineID();
2434                 long seq = entry.getSequenceNumber();
2435
2436
2437                 // Check if we have messages that were supposed to be rejected in our local block chain
2438                 for (long seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2439
2440                         // Get the slot
2441                         Slot slot = indexer.getSlot(seqNum);
2442
2443                         if (slot != null) {
2444                                 // If we have this slot make sure that it was not supposed to be a rejected slot
2445
2446                                 long slotMachineId = slot.getMachineID();
2447                                 if (isequal != (slotMachineId == machineId)) {
2448                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2449                                 }
2450                         }
2451                 }
2452
2453
2454                 // Create a list of clients to watch until they see this rejected message entry.
2455                 HashSet<Long> deviceWatchSet = new HashSet<Long>();
2456                 for (Map.Entry<Long, Pair<Long, Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
2457
2458                         // Machine ID for the last message entry
2459                         long lastMessageEntryMachineId = lastMessageEntry.getKey();
2460
2461                         // We've seen it, don't need to continue to watch.  Our next
2462                         // message will implicitly acknowledge it.
2463                         if (lastMessageEntryMachineId == localMachineId) {
2464                                 continue;
2465                         }
2466
2467                         Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
2468                         long entrySequenceNumber = lastMessageValue.getFirst();
2469
2470                         if (entrySequenceNumber < seq) {
2471
2472                                 // Add this rejected message to the set of messages that this machine ID did not see yet
2473                                 addWatchList(lastMessageEntryMachineId, entry);
2474
2475                                 // This client did not see this rejected message yet so add it to the watch set to monitor
2476                                 deviceWatchSet.add(lastMessageEntryMachineId);
2477                         }
2478                 }
2479
2480                 if (deviceWatchSet.isEmpty()) {
2481                         // This rejected message has been seen by all the clients so
2482                         entry.setDead();
2483                 } else {
2484                         // We need to watch this rejected message
2485                         entry.setWatchSet(deviceWatchSet);
2486                 }
2487         }
2488
2489         /**
2490          * Check if this abort is live, if not then save it so we can kill it later.
2491          * update the last transaction number that was arbitrated on.
2492          */
2493         private void processEntry(Abort entry) {
2494
2495
2496                 if (entry.getTransactionSequenceNumber() != -1) {
2497                         // update the transaction status if it was sent to the server
2498                         TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
2499                         if (status != null) {
2500                                 status.setStatus(TransactionStatus.StatusAborted);
2501                         }
2502                 }
2503
2504                 // Abort has not been seen by the client it is for yet so we need to keep track of it
2505                 Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
2506                 if (previouslySeenAbort != null) {
2507                         previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
2508                 }
2509
2510                 if (entry.getTransactionArbitrator() == localMachineId) {
2511                         liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry);
2512                 }
2513
2514                 if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
2515
2516                         // The machine already saw this so it is dead
2517                         entry.setDead();
2518                         liveAbortTable.remove(entry.getAbortId());
2519
2520                         if (entry.getTransactionArbitrator() == localMachineId) {
2521                                 liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber());
2522                         }
2523
2524                         return;
2525                 }
2526
2527
2528
2529
2530                 // Update the last arbitration data that we have seen so far
2531                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) {
2532
2533                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator());
2534                         if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) {
2535                                 // Is larger
2536                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2537                         }
2538                 } else {
2539                         // Never seen any data from this arbitrator so record the first one
2540                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2541                 }
2542
2543
2544                 // Set dead a transaction if we can
2545                 Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair<Long, Long>(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber()));
2546                 if (transactionToSetDead != null) {
2547                         liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber());
2548                 }
2549
2550                 // Update the last transaction sequence number that the arbitrator arbitrated on
2551                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator());
2552                 if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2553
2554                         // Is a valid one
2555                         if (entry.getTransactionSequenceNumber() != -1) {
2556                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber());
2557                         }
2558                 }
2559         }
2560
2561         /**
2562          * Set dead the transaction part if that transaction is dead and keep track of all new parts
2563          */
2564         private void processEntry(TransactionPart entry) {
2565                 // Check if we have already seen this transaction and set it dead OR if it is not alive
2566                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId());
2567                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= entry.getSequenceNumber())) {
2568                         // This transaction is dead, it was already committed or aborted
2569                         entry.setDead();
2570                         return;
2571                 }
2572
2573                 // This part is still alive
2574                 Map<Pair<Long, Integer>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
2575
2576                 if (transactionPart == null) {
2577                         // Dont have a table for this machine Id yet so make one
2578                         transactionPart = new HashMap<Pair<Long, Integer>, TransactionPart>();
2579                         newTransactionParts.put(entry.getMachineId(), transactionPart);
2580                 }
2581
2582                 // Update the part and set dead ones we have already seen (got a rescued version)
2583                 TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry);
2584                 if (previouslySeenPart != null) {
2585                         previouslySeenPart.setDead();
2586                 }
2587         }
2588
2589         /**
2590          * Process new commit entries and save them for future use.  Delete duplicates
2591          */
2592         private void processEntry(CommitPart entry) {
2593
2594
2595                 // Update the last transaction that was updated if we can
2596                 if (entry.getTransactionSequenceNumber() != -1) {
2597                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
2598
2599                         // Update the last transaction sequence number that the arbitrator arbitrated on
2600                         if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2601                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
2602                         }
2603                 }
2604
2605
2606
2607
2608                 Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
2609
2610                 if (commitPart == null) {
2611                         // Don't have a table for this machine Id yet so make one
2612                         commitPart = new HashMap<Pair<Long, Integer>, CommitPart>();
2613                         newCommitParts.put(entry.getMachineId(), commitPart);
2614                 }
2615
2616                 // Update the part and set dead ones we have already seen (got a rescued version)
2617                 CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry);
2618                 if (previouslySeenPart != null) {
2619                         previouslySeenPart.setDead();
2620                 }
2621         }
2622
2623         /**
2624          * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
2625          * Updates the live aborts, removes those that are dead and sets them dead.
2626          * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2627          * other clients have not had a rollback on the last message.
2628          */
2629         private void updateLastMessage(long machineId, long seqNum, Liveness liveness, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2630
2631                 // We have seen this machine ID
2632                 machineSet.remove(machineId);
2633
2634                 // Get the set of rejected messages that this machine Id is has not seen yet
2635                 HashSet<RejectedMessage> watchset = rejectedMessageWatchListTable.get(machineId);
2636
2637                 // If there is a rejected message that this machine Id has not seen yet
2638                 if (watchset != null) {
2639
2640                         // Go through each rejected message that this machine Id has not seen yet