9629ddb6573217d1b47dbdcca2a7af97cc13731e
[iotcloud.git] / version2 / src / Control / app / src / main / java / iotcloud / Table.java
1 package iotcloud;
2
3 import java.util.Iterator;
4 import java.util.Random;
5 import java.util.Arrays;
6 import java.util.Map;
7 import java.util.Set;
8 import java.util.List;
9 import java.util.Vector;
10 import java.util.HashMap;
11 import java.util.HashSet;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.nio.ByteBuffer;
15 import android.content.*;
16
17 /**
18  * IoTTable data structure.  Provides client interface.
19  * @author Brian Demsky
20  * @version 1.0
21  */
22
23 final public class Table {
24
25         /* Constants */
26         static final int FREE_SLOTS = 10; // Number of slots that should be kept free
27         static final int SKIP_THRESHOLD = 10;
28         static final double RESIZE_MULTIPLE = 1.2;
29         static final double RESIZE_THRESHOLD = 0.75;
30         static final int REJECTED_THRESHOLD = 5;
31
32         /* Helper Objects */
33         private SlotBuffer buffer = null;
34         private CloudComm cloud = null;
35         private Random random = null;
36         private TableStatus liveTableStatus = null;
37         private PendingTransaction pendingTransactionBuilder = null; // Pending Transaction used in building a Pending Transaction
38         private Transaction lastPendingTransactionSpeculatedOn = null; // Last transaction that was speculated on from the pending transaction
39         private Transaction firstPendingTransaction = null; // first transaction in the pending transaction list
40
41         /* Variables */
42         private int numberOfSlots = 0;  // Number of slots stored in buffer
43         private int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
44         private long liveSlotCount = 0; // Number of currently live slots
45         private long oldestLiveSlotSequenceNumver = 0;  // Smallest sequence number of the slot with a live entry
46         private long localMachineId = 0; // Machine ID of this client device
47         private long sequenceNumber = 0; // Largest sequence number a client has received
48         // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
49         // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
50         private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
51         private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
52         private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
53         private long localArbitrationSequenceNumber = 0;
54         private boolean hadPartialSendToServer = false;
55         private boolean attemptedToSendToServer = false;
56         private long expectedsize;
57         private boolean didFindTableStatus = false;
58         private long currMaxSize = 0;
59
60         private Slot lastSlotAttemptedToSend = null;
61         private boolean lastIsNewKey = false;
62         private int lastNewSize = 0;
63         private Map<Transaction, List<Integer>> lastTransactionPartsSent = null;
64         private List<Entry> lastPendingSendArbitrationEntriesToDelete = null;
65         private NewKey lastNewKey = null;
66
67
68         /* Data Structures  */
69         private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
70         private Map<IoTString, KeyValue> speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value
71         private Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
72         private Map<IoTString, NewKey> liveNewKeyTable = null; // Table of live new keys
73         private HashMap<Long, Pair<Long, Liveness>> lastMessageTable = null; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
74         private HashMap<Long, HashSet<RejectedMessage>> rejectedMessageWatchListTable = null; // Table of machine Ids and the set of rejected messages they have not seen yet
75         private Map<IoTString, Long> arbitratorTable = null; // Table of keys and their arbitrators
76         private Map<Pair<Long, Long>, Abort> liveAbortTable = null; // Table live abort messages
77         private Map<Long, Map<Pair<Long, Integer>, TransactionPart>> newTransactionParts = null; // transaction parts that are seen in this latest round of slots from the server
78         private Map<Long, Map<Pair<Long, Integer>, CommitPart>> newCommitParts = null; // commit parts that are seen in this latest round of slots from the server
79         private Map<Long, Long> lastArbitratedTransactionNumberByArbitratorTable = null; // Last transaction sequence number that an arbitrator arbitrated on
80         private Map<Long, Transaction> liveTransactionBySequenceNumberTable = null; // live transaction grouped by the sequence number
81         private Map<Pair<Long, Long>, Transaction> liveTransactionByTransactionIdTable = null; // live transaction grouped by the transaction ID
82         private Map<Long, Map<Long, Commit>> liveCommitsTable = null;
83         private Map<IoTString, Commit> liveCommitsByKeyTable = null;
84         private Map<Long, Long> lastCommitSeenSequenceNumberByArbitratorTable = null;
85         private Vector<Long> rejectedSlotList = null; // List of rejected slots that have yet to be sent to the server
86         private List<Transaction> pendingTransactionQueue = null;
87         private List<ArbitrationRound> pendingSendArbitrationRounds = null;
88         private List<Entry> pendingSendArbitrationEntriesToDelete = null;
89         private Map<Transaction, List<Integer>> transactionPartsSent = null;
90         private Map<Long, TransactionStatus> outstandingTransactionStatus = null;
91         private Map<Long, Abort> liveAbortsGeneratedByLocal = null;
92         private Set<Pair<Long, Long>> offlineTransactionsCommittedAndAtServer = null;
93         private Map<Long, Pair<String, Integer>> localCommunicationTable = null;
94         private Map<Long, Long> lastTransactionSeenFromMachineFromServer = null;
95         private Map<Long, Long> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = null;
96
97
98         public Table(String baseurl, String password, long _localMachineId, int listeningPort, Context context) {
99                 localMachineId = _localMachineId;
100                 cloud = new CloudComm(this, baseurl, password, listeningPort, context);
101
102                 init();
103         }
104
105         public Table(CloudComm _cloud, long _localMachineId) {
106                 localMachineId = _localMachineId;
107                 cloud = _cloud;
108
109                 init();
110         }
111
112         /**
113          * Init all the stuff needed for for table usage
114          */
115         private void init() {
116
117                 // Init helper objects
118                 random = new Random();
119                 buffer = new SlotBuffer();
120
121                 // Set Variables
122                 oldestLiveSlotSequenceNumver = 1;
123
124                 // init data structs
125                 committedKeyValueTable = new HashMap<IoTString, KeyValue>();
126                 speculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
127                 pendingTransactionSpeculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
128                 liveNewKeyTable = new HashMap<IoTString, NewKey>();
129                 lastMessageTable = new HashMap<Long, Pair<Long, Liveness>>();
130                 rejectedMessageWatchListTable = new HashMap<Long, HashSet<RejectedMessage>>();
131                 arbitratorTable = new HashMap<IoTString, Long>();
132                 liveAbortTable = new HashMap<Pair<Long, Long>, Abort>();
133                 newTransactionParts = new HashMap<Long, Map<Pair<Long, Integer>, TransactionPart>>();
134                 newCommitParts = new HashMap<Long, Map<Pair<Long, Integer>, CommitPart>>();
135                 lastArbitratedTransactionNumberByArbitratorTable = new HashMap<Long, Long>();
136                 liveTransactionBySequenceNumberTable = new HashMap<Long, Transaction>();
137                 liveTransactionByTransactionIdTable = new HashMap<Pair<Long, Long>, Transaction>();
138                 liveCommitsTable = new HashMap<Long, Map<Long, Commit>>();
139                 liveCommitsByKeyTable = new HashMap<IoTString, Commit>();
140                 lastCommitSeenSequenceNumberByArbitratorTable = new HashMap<Long, Long>();
141                 rejectedSlotList = new Vector<Long>();
142                 pendingTransactionQueue = new ArrayList<Transaction>();
143                 pendingSendArbitrationEntriesToDelete = new ArrayList<Entry>();
144                 transactionPartsSent = new HashMap<Transaction, List<Integer>>();
145                 outstandingTransactionStatus = new HashMap<Long, TransactionStatus>();
146                 liveAbortsGeneratedByLocal = new HashMap<Long, Abort>();
147                 offlineTransactionsCommittedAndAtServer = new HashSet<Pair<Long, Long>>();
148                 localCommunicationTable = new HashMap<Long, Pair<String, Integer>>();
149                 lastTransactionSeenFromMachineFromServer = new HashMap<Long, Long>();
150                 pendingSendArbitrationRounds = new ArrayList<ArbitrationRound>();
151                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new HashMap<Long, Long>();
152
153
154                 // Other init stuff
155                 numberOfSlots = buffer.capacity();
156                 setResizeThreshold();
157         }
158
159         // TODO: delete method
160         public synchronized void printSlots() {
161                 long o = buffer.getOldestSeqNum();
162                 long n = buffer.getNewestSeqNum();
163
164                 int[] types = new int[10];
165
166                 int num = 0;
167
168                 int livec = 0;
169                 int deadc = 0;
170                 for (long i = o; i < (n + 1); i++) {
171                         Slot s = buffer.getSlot(i);
172
173                         Vector<Entry> entries = s.getEntries();
174
175                         for (Entry e : entries) {
176                                 if (e.isLive()) {
177                                         int type = e.getType();
178                                         types[type] = types[type] + 1;
179                                         num++;
180                                         livec++;
181                                 } else {
182                                         deadc++;
183                                 }
184                         }
185                 }
186
187                 for (int i = 0; i < 10; i++) {
188                         System.out.println(i + "    " + types[i]);
189                 }
190                 System.out.println("Live count:   " + livec);
191                 System.out.println("Dead count:   " + deadc);
192                 System.out.println("Old:   " + o);
193                 System.out.println("New:   " + n);
194                 System.out.println("Size:   " + buffer.size());
195                 // System.out.println("Commits:   " + liveCommitsTable.size());
196                 System.out.println("pendingTrans:   " + pendingTransactionQueue.size());
197                 System.out.println("Trans Status Out:   " + outstandingTransactionStatus.size());
198
199                 for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
200                         System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
201                 }
202
203
204                 for (Long a : liveCommitsTable.keySet()) {
205                         for (Long b : liveCommitsTable.get(a).keySet()) {
206                                 for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) {
207                                         System.out.print(kv + " ");
208                                 }
209                                 System.out.print("|| ");
210                         }
211                         System.out.println();
212                 }
213
214         }
215
216         /**
217          * Initialize the table by inserting a table status as the first entry into the table status
218          * also initialize the crypto stuff.
219          */
220         public synchronized void initTable() throws ServerException {
221                 cloud.initSecurity();
222
223                 // Create the first insertion into the block chain which is the table status
224                 Slot s = new Slot(this, 1, localMachineId);
225                 TableStatus status = new TableStatus(s, numberOfSlots);
226                 s.addEntry(status);
227                 Slot[] array = cloud.putSlot(s, numberOfSlots);
228
229                 if (array == null) {
230                         array = new Slot[] {s};
231                         // update local block chain
232                         validateAndUpdate(array, true);
233                 } else if (array.length == 1) {
234                         // in case we did push the slot BUT we failed to init it
235                         validateAndUpdate(array, true);
236                 } else {
237                         throw new Error("Error on initialization");
238                 }
239         }
240
241         /**
242          * Rebuild the table from scratch by pulling the latest block chain from the server.
243          */
244         public synchronized void rebuild() throws ServerException {
245                 // Just pull the latest slots from the server
246                 Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
247                 validateAndUpdate(newslots, true);
248         }
249
250         // public String toString() {
251         //      String retString = " Committed Table: \n";
252         //      retString += "---------------------------\n";
253         //      retString += commitedTable.toString();
254
255         //      retString += "\n\n";
256
257         //      retString += " Speculative Table: \n";
258         //      retString += "---------------------------\n";
259         //      retString += speculativeTable.toString();
260
261         //      return retString;
262         // }
263
264         public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) {
265                 localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
266         }
267
268         public synchronized Long getArbitrator(IoTString key) {
269                 return arbitratorTable.get(key);
270         }
271
272         public synchronized void close() {
273                 cloud.close();
274         }
275
276         public synchronized IoTString getCommitted(IoTString key)  {
277                 KeyValue kv = committedKeyValueTable.get(key);
278
279                 if (kv != null) {
280                         return kv.getValue();
281                 } else {
282                         return null;
283                 }
284         }
285
286         public synchronized IoTString getSpeculative(IoTString key) {
287                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
288
289                 if (kv == null) {
290                         kv = speculatedKeyValueTable.get(key);
291                 }
292
293                 if (kv == null) {
294                         kv = committedKeyValueTable.get(key);
295                 }
296
297                 if (kv != null) {
298                         return kv.getValue();
299                 } else {
300                         return null;
301                 }
302         }
303
304         public synchronized IoTString getCommittedAtomic(IoTString key) {
305                 KeyValue kv = committedKeyValueTable.get(key);
306
307                 if (arbitratorTable.get(key) == null) {
308                         throw new Error("Key not Found.");
309                 }
310
311                 // Make sure new key value pair matches the current arbitrator
312                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
313                         // TODO: Maybe not throw en error
314                         throw new Error("Not all Key Values Match Arbitrator.");
315                 }
316
317                 if (kv != null) {
318                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
319                         return kv.getValue();
320                 } else {
321                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
322                         return null;
323                 }
324         }
325
326         public synchronized IoTString getSpeculativeAtomic(IoTString key) {
327                 if (arbitratorTable.get(key) == null) {
328                         throw new Error("Key not Found.");
329                 }
330
331                 // Make sure new key value pair matches the current arbitrator
332                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
333                         // TODO: Maybe not throw en error
334                         throw new Error("Not all Key Values Match Arbitrator.");
335                 }
336
337                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
338
339                 if (kv == null) {
340                         kv = speculatedKeyValueTable.get(key);
341                 }
342
343                 if (kv == null) {
344                         kv = committedKeyValueTable.get(key);
345                 }
346
347                 if (kv != null) {
348                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
349                         return kv.getValue();
350                 } else {
351                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
352                         return null;
353                 }
354         }
355
356         public synchronized boolean update()  {
357                 try {
358                         Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
359                         validateAndUpdate(newSlots, false);
360                         sendToServer(null);
361
362
363                         updateLiveTransactionsAndStatus();
364
365                         return true;
366                 } catch (Exception e) {
367                         // e.printStackTrace();
368
369                         for (Long m : localCommunicationTable.keySet()) {
370                                 updateFromLocal(m);
371                         }
372                 }
373
374                 return false;
375         }
376
377         public synchronized boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
378                 while (true) {
379                         if (arbitratorTable.get(keyName) != null) {
380                                 // There is already an arbitrator
381                                 return false;
382                         }
383
384                         NewKey newKey = new NewKey(null, keyName, machineId);
385                         if (sendToServer(newKey)) {
386                                 // If successfully inserted
387                                 return true;
388                         }
389                 }
390         }
391
392         public synchronized void startTransaction() {
393                 // Create a new transaction, invalidates any old pending transactions.
394                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
395         }
396
397         public synchronized void addKV(IoTString key, IoTString value) {
398
399                 // Make sure it is a valid key
400                 if (arbitratorTable.get(key) == null) {
401                         throw new Error("Key not Found.");
402                 }
403
404                 // Make sure new key value pair matches the current arbitrator
405                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
406                         // TODO: Maybe not throw en error
407                         throw new Error("Not all Key Values Match Arbitrator.");
408                 }
409
410                 // Add the key value to this transaction
411                 KeyValue kv = new KeyValue(key, value);
412                 pendingTransactionBuilder.addKV(kv);
413         }
414
415         public synchronized TransactionStatus commitTransaction() {
416
417                 if (pendingTransactionBuilder.getKVUpdates().size() == 0) {
418                         // transaction with no updates will have no effect on the system
419                         return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
420                 }
421
422                 // Set the local transaction sequence number and increment
423                 pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber);
424                 localTransactionSequenceNumber++;
425
426                 // Create the transaction status
427                 TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator());
428
429                 // Create the new transaction
430                 Transaction newTransaction = pendingTransactionBuilder.createTransaction();
431                 newTransaction.setTransactionStatus(transactionStatus);
432
433                 if (pendingTransactionBuilder.getArbitrator() != localMachineId) {
434                         // Add it to the queue and invalidate the builder for safety
435                         pendingTransactionQueue.add(newTransaction);
436                 } else {
437                         arbitrateOnLocalTransaction(newTransaction);
438                         updateLiveStateFromLocal();
439                 }
440
441                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
442
443                 try {
444                         sendToServer(null);
445                 } catch (ServerException e) {
446
447                         Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
448                         for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
449                                 Transaction transaction = iter.next();
450
451                                 if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) {
452                                         // Already contacted this client so ignore all attempts to contact this client
453                                         // to preserve ordering for arbitrator
454                                         continue;
455                                 }
456
457                                 Pair<Boolean, Boolean> sendReturn = sendTransactionToLocal(transaction);
458
459                                 if (sendReturn.getFirst()) {
460                                         // Failed to contact over local
461                                         arbitratorTriedAndFailed.add(transaction.getArbitrator());
462                                 } else {
463                                         // Successful contact or should not contact
464
465                                         if (sendReturn.getSecond()) {
466                                                 // did arbitrate
467                                                 iter.remove();
468                                         }
469                                 }
470                         }
471                 }
472
473                 updateLiveStateFromLocal();
474
475                 return transactionStatus;
476         }
477
478         /**
479          * Get the machine ID for this client
480          */
481         public long getMachineId() {
482                 return localMachineId;
483         }
484
485         /**
486          * Decrement the number of live slots that we currently have
487          */
488         public void decrementLiveCount() {
489                 liveSlotCount--;
490         }
491
492         /**
493          * Recalculate the new resize threshold
494          */
495         private void setResizeThreshold() {
496                 int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots);
497                 bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
498         }
499
500
501         boolean lastInsertedNewKey = false;
502
503         private boolean sendToServer(NewKey newKey) throws ServerException {
504
505                 boolean fromRetry = false;
506
507                 try {
508                         if (hadPartialSendToServer) {
509                                 Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
510                                 if (newSlots.length == 0) {
511                                         fromRetry = true;
512                                         ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
513
514                                         if (sendSlotsReturn.getFirst()) {
515                                                 if (newKey != null) {
516                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
517                                                                 newKey = null;
518                                                         }
519                                                 }
520
521                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
522                                                         transaction.resetServerFailure();
523
524                                                         // Update which transactions parts still need to be sent
525                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
526
527                                                         // Add the transaction status to the outstanding list
528                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
529
530                                                         // Update the transaction status
531                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
532
533                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
534                                                         if (transaction.didSendAllParts()) {
535                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
536                                                                 pendingTransactionQueue.remove(transaction);
537                                                         }
538                                                 }
539                                         } else {
540
541                                                 newSlots = sendSlotsReturn.getThird();
542
543                                                 boolean isInserted = false;
544                                                 for (Slot s : newSlots) {
545                                                         if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
546                                                                 isInserted = true;
547                                                                 break;
548                                                         }
549                                                 }
550
551                                                 for (Slot s : newSlots) {
552                                                         if (isInserted) {
553                                                                 break;
554                                                         }
555
556                                                         // Process each entry in the slot
557                                                         for (Entry entry : s.getEntries()) {
558
559                                                                 if (entry.getType() == Entry.TypeLastMessage) {
560                                                                         LastMessage lastMessage = (LastMessage)entry;
561                                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
562                                                                                 isInserted = true;
563                                                                                 break;
564                                                                         }
565                                                                 }
566                                                         }
567                                                 }
568
569                                                 if (isInserted) {
570                                                         if (newKey != null) {
571                                                                 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
572                                                                         newKey = null;
573                                                                 }
574                                                         }
575
576                                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
577                                                                 transaction.resetServerFailure();
578
579                                                                 // Update which transactions parts still need to be sent
580                                                                 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
581
582                                                                 // Add the transaction status to the outstanding list
583                                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
584
585                                                                 // Update the transaction status
586                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
587
588                                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
589                                                                 if (transaction.didSendAllParts()) {
590                                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
591                                                                         pendingTransactionQueue.remove(transaction);
592                                                                 } else {
593                                                                         transaction.resetServerFailure();
594                                                                         // Set the transaction sequence number back to nothing
595                                                                         if (!transaction.didSendAPartToServer()) {
596                                                                                 transaction.setSequenceNumber(-1);
597                                                                         }
598                                                                 }
599                                                         }
600                                                 }
601                                         }
602
603                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
604                                                 transaction.resetServerFailure();
605                                                 // Set the transaction sequence number back to nothing
606                                                 if (!transaction.didSendAPartToServer()) {
607                                                         transaction.setSequenceNumber(-1);
608                                                 }
609                                         }
610
611                                         if (sendSlotsReturn.getThird().length != 0) {
612                                                 // insert into the local block chain
613                                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
614                                         }
615                                         // continue;
616                                 } else {
617                                         boolean isInserted = false;
618                                         for (Slot s : newSlots) {
619                                                 if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
620                                                         isInserted = true;
621                                                         break;
622                                                 }
623                                         }
624
625                                         for (Slot s : newSlots) {
626                                                 if (isInserted) {
627                                                         break;
628                                                 }
629
630                                                 // Process each entry in the slot
631                                                 for (Entry entry : s.getEntries()) {
632
633                                                         if (entry.getType() == Entry.TypeLastMessage) {
634                                                                 LastMessage lastMessage = (LastMessage)entry;
635                                                                 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
636                                                                         isInserted = true;
637                                                                         break;
638                                                                 }
639                                                         }
640                                                 }
641                                         }
642
643                                         if (isInserted) {
644                                                 if (newKey != null) {
645                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
646                                                                 newKey = null;
647                                                         }
648                                                 }
649
650                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
651                                                         transaction.resetServerFailure();
652
653                                                         // Update which transactions parts still need to be sent
654                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
655
656                                                         // Add the transaction status to the outstanding list
657                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
658
659                                                         // Update the transaction status
660                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
661
662                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
663                                                         if (transaction.didSendAllParts()) {
664                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
665                                                                 pendingTransactionQueue.remove(transaction);
666                                                         } else {
667                                                                 transaction.resetServerFailure();
668                                                                 // Set the transaction sequence number back to nothing
669                                                                 if (!transaction.didSendAPartToServer()) {
670                                                                         transaction.setSequenceNumber(-1);
671                                                                 }
672                                                         }
673                                                 }
674                                         } else {
675                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
676                                                         transaction.resetServerFailure();
677                                                         // Set the transaction sequence number back to nothing
678                                                         if (!transaction.didSendAPartToServer()) {
679                                                                 transaction.setSequenceNumber(-1);
680                                                         }
681                                                 }
682                                         }
683
684                                         // insert into the local block chain
685                                         validateAndUpdate(newSlots, true);
686                                 }
687                         }
688                 } catch (ServerException e) {
689                         throw e;
690                 }
691
692
693                 try {
694                         // While we have stuff that needs inserting into the block chain
695                         while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) {
696                                 fromRetry = false;
697
698                                 if (hadPartialSendToServer) {
699                                         throw new Error("Should Be error free");
700                                 }
701
702
703
704                                 // If there is a new key with same name then end
705                                 if ((newKey != null) && (arbitratorTable.get(newKey.getKey()) != null)) {
706                                         return false;
707                                 }
708
709                                 // Create the slot
710                                 Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC());
711
712                                 // Try to fill the slot with data
713                                 ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
714                                 boolean needsResize = fillSlotsReturn.getFirst();
715                                 int newSize = fillSlotsReturn.getSecond();
716                                 Boolean insertedNewKey = fillSlotsReturn.getThird();
717
718                                 if (needsResize) {
719                                         // Reset which transaction to send
720                                         for (Transaction transaction : transactionPartsSent.keySet()) {
721                                                 transaction.resetNextPartToSend();
722
723                                                 // Set the transaction sequence number back to nothing
724                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
725                                                         transaction.setSequenceNumber(-1);
726                                                 }
727                                         }
728
729                                         // Clear the sent data since we are trying again
730                                         pendingSendArbitrationEntriesToDelete.clear();
731                                         transactionPartsSent.clear();
732
733                                         // We needed a resize so try again
734                                         fillSlot(slot, true, newKey);
735                                 }
736
737                                 lastSlotAttemptedToSend = slot;
738                                 lastIsNewKey = (newKey != null);
739                                 lastInsertedNewKey = insertedNewKey;
740                                 lastNewSize = newSize;
741                                 lastNewKey = newKey;
742                                 lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
743                                 lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
744
745
746                                 ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
747
748                                 if (sendSlotsReturn.getFirst()) {
749
750                                         // Did insert into the block chain
751
752                                         if (insertedNewKey) {
753                                                 // This slot was what was inserted not a previous slot
754
755                                                 // New Key was successfully inserted into the block chain so dont want to insert it again
756                                                 newKey = null;
757                                         }
758
759                                         // Remove the aborts and commit parts that were sent from the pending to send queue
760                                         for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
761                                                 ArbitrationRound round = iter.next();
762                                                 round.removeParts(pendingSendArbitrationEntriesToDelete);
763
764                                                 if (round.isDoneSending()) {
765                                                         // Sent all the parts
766                                                         iter.remove();
767                                                 }
768                                         }
769
770                                         for (Transaction transaction : transactionPartsSent.keySet()) {
771                                                 transaction.resetServerFailure();
772
773                                                 // Update which transactions parts still need to be sent
774                                                 transaction.removeSentParts(transactionPartsSent.get(transaction));
775
776                                                 // Add the transaction status to the outstanding list
777                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
778
779                                                 // Update the transaction status
780                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
781
782                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
783                                                 if (transaction.didSendAllParts()) {
784                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
785                                                         pendingTransactionQueue.remove(transaction);
786                                                 }
787                                         }
788                                 } else {
789
790                                         // if (!sendSlotsReturn.getSecond()) {
791                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
792                                         //              transaction.resetServerFailure();
793                                         //      }
794                                         // } else {
795                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
796                                         //              transaction.resetServerFailure();
797
798                                         //              // Update which transactions parts still need to be sent
799                                         //              transaction.removeSentParts(transactionPartsSent.get(transaction));
800
801                                         //              // Add the transaction status to the outstanding list
802                                         //              outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
803
804                                         //              // Update the transaction status
805                                         //              transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
806
807                                         //              // Check if all the transaction parts were successfully sent and if so then remove it from pending
808                                         //              if (transaction.didSendAllParts()) {
809                                         //                      transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
810                                         //                      pendingTransactionQueue.remove(transaction);
811
812                                         //                      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
813                                         //                              System.out.println("Sent: " + kv + "  from: " + localMachineId + "   Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + "  Claimed:" + transaction.getSequenceNumber());
814                                         //                      }
815                                         //              }
816                                         //      }
817                                         // }
818
819                                         // Reset which transaction to send
820                                         for (Transaction transaction : transactionPartsSent.keySet()) {
821                                                 transaction.resetNextPartToSend();
822                                                 // transaction.resetNextPartToSend();
823
824                                                 // Set the transaction sequence number back to nothing
825                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
826                                                         transaction.setSequenceNumber(-1);
827                                                 }
828                                         }
829                                 }
830
831                                 // Clear the sent data in preparation for next send
832                                 pendingSendArbitrationEntriesToDelete.clear();
833                                 transactionPartsSent.clear();
834
835                                 if (sendSlotsReturn.getThird().length != 0) {
836                                         // insert into the local block chain
837                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
838                                 }
839                         }
840
841                 } catch (ServerException e) {
842
843                         if (e.getType() != ServerException.TypeInputTimeout) {
844                                 // e.printStackTrace();
845
846                                 // Nothing was able to be sent to the server so just clear these data structures
847                                 for (Transaction transaction : transactionPartsSent.keySet()) {
848                                         transaction.resetNextPartToSend();
849
850                                         // Set the transaction sequence number back to nothing
851                                         if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
852                                                 transaction.setSequenceNumber(-1);
853                                         }
854                                 }
855                         } else {
856                                 // There was a partial send to the server
857                                 hadPartialSendToServer = true;
858
859
860                                 // if (!fromRetry) {
861                                 //      lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
862                                 //      lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
863                                 // }
864
865                                 // Nothing was able to be sent to the server so just clear these data structures
866                                 for (Transaction transaction : transactionPartsSent.keySet()) {
867                                         transaction.resetNextPartToSend();
868                                         transaction.setServerFailure();
869                                 }
870                         }
871
872                         pendingSendArbitrationEntriesToDelete.clear();
873                         transactionPartsSent.clear();
874
875                         throw e;
876                 }
877
878                 return newKey == null;
879         }
880
881         private synchronized boolean updateFromLocal(long machineId) {
882                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
883                 if (localCommunicationInformation == null) {
884                         // Cant talk to that device locally so do nothing
885                         return false;
886                 }
887
888                 // Get the size of the send data
889                 int sendDataSize = Integer.BYTES + Long.BYTES;
890
891                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
892                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != null) {
893                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
894                 }
895
896                 byte[] sendData = new byte[sendDataSize];
897                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
898
899                 // Encode the data
900                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
901                 bbEncode.putInt(0);
902
903                 // Send by local
904                 byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
905
906                 if (returnData == null) {
907                         // Could not contact server
908                         return false;
909                 }
910
911                 // Decode the data
912                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
913                 int numberOfEntries = bbDecode.getInt();
914
915                 for (int i = 0; i < numberOfEntries; i++) {
916                         byte type = bbDecode.get();
917                         if (type == Entry.TypeAbort) {
918                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
919                                 processEntry(abort);
920                         } else if (type == Entry.TypeCommitPart) {
921                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
922                                 processEntry(commitPart);
923                         }
924                 }
925
926                 updateLiveStateFromLocal();
927
928                 return true;
929         }
930
931         private Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
932
933                 // Get the devices local communications
934                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
935
936                 if (localCommunicationInformation == null) {
937                         // Cant talk to that device locally so do nothing
938                         return new Pair<Boolean, Boolean>(true, false);
939                 }
940
941                 // Get the size of the send data
942                 int sendDataSize = Integer.BYTES + Long.BYTES;
943                 for (TransactionPart part : transaction.getParts().values()) {
944                         sendDataSize += part.getSize();
945                 }
946
947                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
948                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != null) {
949                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
950                 }
951
952                 // Make the send data size
953                 byte[] sendData = new byte[sendDataSize];
954                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
955
956                 // Encode the data
957                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
958                 bbEncode.putInt(transaction.getParts().size());
959                 for (TransactionPart part : transaction.getParts().values()) {
960                         part.encode(bbEncode);
961                 }
962
963
964                 // Send by local
965                 byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
966
967                 if (returnData == null) {
968                         // Could not contact server
969                         return new Pair<Boolean, Boolean>(true, false);
970                 }
971
972                 // Decode the data
973                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
974                 boolean didCommit = bbDecode.get() == 1;
975                 boolean couldArbitrate = bbDecode.get() == 1;
976                 int numberOfEntries = bbDecode.getInt();
977                 boolean foundAbort = false;
978
979                 for (int i = 0; i < numberOfEntries; i++) {
980                         byte type = bbDecode.get();
981                         if (type == Entry.TypeAbort) {
982                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
983
984                                 if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
985                                         foundAbort = true;
986                                 }
987
988                                 processEntry(abort);
989                         } else if (type == Entry.TypeCommitPart) {
990                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
991                                 processEntry(commitPart);
992                         }
993                 }
994
995                 updateLiveStateFromLocal();
996
997                 if (couldArbitrate) {
998                         TransactionStatus status =  transaction.getTransactionStatus();
999                         if (didCommit) {
1000                                 status.setStatus(TransactionStatus.StatusCommitted);
1001                         } else {
1002                                 status.setStatus(TransactionStatus.StatusAborted);
1003                         }
1004                 } else {
1005                         TransactionStatus status =  transaction.getTransactionStatus();
1006                         if (foundAbort) {
1007                                 status.setStatus(TransactionStatus.StatusAborted);
1008                         } else {
1009                                 status.setStatus(TransactionStatus.StatusCommitted);
1010                         }
1011                 }
1012
1013                 return new Pair<Boolean, Boolean>(false, true);
1014         }
1015
1016         public synchronized byte[] acceptDataFromLocal(byte[] data) {
1017
1018                 // Decode the data
1019                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
1020                 long lastArbitratedSequenceNumberSeen = bbDecode.getLong();
1021                 int numberOfParts = bbDecode.getInt();
1022
1023                 // If we did commit a transaction or not
1024                 boolean didCommit = false;
1025                 boolean couldArbitrate = false;
1026
1027                 if (numberOfParts != 0) {
1028
1029                         // decode the transaction
1030                         Transaction transaction = new Transaction();
1031                         for (int i = 0; i < numberOfParts; i++) {
1032                                 bbDecode.get();
1033                                 TransactionPart newPart = (TransactionPart)TransactionPart.decode(null, bbDecode);
1034                                 transaction.addPartDecode(newPart);
1035                         }
1036
1037                         // Arbitrate on transaction and pull relevant return data
1038                         Pair<Boolean, Boolean> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1039                         couldArbitrate = localArbitrateReturn.getFirst();
1040                         didCommit = localArbitrateReturn.getSecond();
1041
1042                         updateLiveStateFromLocal();
1043
1044                         // Transaction was sent to the server so keep track of it to prevent double commit
1045                         if (transaction.getSequenceNumber() != -1) {
1046                                 offlineTransactionsCommittedAndAtServer.add(transaction.getId());
1047                         }
1048                 }
1049
1050                 // The data to send back
1051                 int returnDataSize = 0;
1052                 List<Entry> unseenArbitrations = new ArrayList<Entry>();
1053
1054                 // Get the aborts to send back
1055                 List<Long> abortLocalSequenceNumbers = new ArrayList<Long >(liveAbortsGeneratedByLocal.keySet());
1056                 Collections.sort(abortLocalSequenceNumbers);
1057                 for (Long localSequenceNumber : abortLocalSequenceNumbers) {
1058                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1059                                 continue;
1060                         }
1061
1062                         Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
1063                         unseenArbitrations.add(abort);
1064                         returnDataSize += abort.getSize();
1065                 }
1066
1067                 // Get the commits to send back
1068                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
1069                 if (commitForClientTable != null) {
1070                         List<Long> commitLocalSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
1071                         Collections.sort(commitLocalSequenceNumbers);
1072
1073                         for (Long localSequenceNumber : commitLocalSequenceNumbers) {
1074                                 Commit commit = commitForClientTable.get(localSequenceNumber);
1075
1076                                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1077                                         continue;
1078                                 }
1079
1080                                 unseenArbitrations.addAll(commit.getParts().values());
1081
1082                                 for (CommitPart commitPart : commit.getParts().values()) {
1083                                         returnDataSize += commitPart.getSize();
1084                                 }
1085                         }
1086                 }
1087
1088                 // Number of arbitration entries to decode
1089                 returnDataSize += 2 * Integer.BYTES;
1090
1091                 // Boolean of did commit or not
1092                 if (numberOfParts != 0) {
1093                         returnDataSize += Byte.BYTES;
1094                 }
1095
1096                 // Data to send Back
1097                 byte[] returnData = new byte[returnDataSize];
1098                 ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
1099
1100                 if (numberOfParts != 0) {
1101                         if (didCommit) {
1102                                 bbEncode.put((byte)1);
1103                         } else {
1104                                 bbEncode.put((byte)0);
1105                         }
1106                         if (couldArbitrate) {
1107                                 bbEncode.put((byte)1);
1108                         } else {
1109                                 bbEncode.put((byte)0);
1110                         }
1111                 }
1112
1113                 bbEncode.putInt(unseenArbitrations.size());
1114                 for (Entry entry : unseenArbitrations) {
1115                         entry.encode(bbEncode);
1116                 }
1117
1118                 return returnData;
1119         }
1120
1121         private ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize, boolean isNewKey)  throws ServerException {
1122
1123                 boolean attemptedToSendToServerTmp = attemptedToSendToServer;
1124                 attemptedToSendToServer = true;
1125
1126                 boolean inserted = false;
1127                 boolean lastTryInserted = false;
1128
1129                 Slot[] array = cloud.putSlot(slot, newSize);
1130                 if (array == null) {
1131                         array = new Slot[] {slot};
1132                         rejectedSlotList.clear();
1133                         inserted = true;
1134                 }       else {
1135                         if (array.length == 0) {
1136                                 throw new Error("Server Error: Did not send any slots");
1137                         }
1138
1139                         // if (attemptedToSendToServerTmp) {
1140                         if (hadPartialSendToServer) {
1141
1142                                 boolean isInserted = false;
1143                                 for (Slot s : array) {
1144                                         if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
1145                                                 isInserted = true;
1146                                                 break;
1147                                         }
1148                                 }
1149
1150                                 for (Slot s : array) {
1151                                         if (isInserted) {
1152                                                 break;
1153                                         }
1154
1155                                         // Process each entry in the slot
1156                                         for (Entry entry : s.getEntries()) {
1157
1158                                                 if (entry.getType() == Entry.TypeLastMessage) {
1159                                                         LastMessage lastMessage = (LastMessage)entry;
1160
1161                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) {
1162                                                                 isInserted = true;
1163                                                                 break;
1164                                                         }
1165                                                 }
1166                                         }
1167                                 }
1168
1169                                 if (!isInserted) {
1170                                         rejectedSlotList.add(slot.getSequenceNumber());
1171                                         lastTryInserted = false;
1172                                 } else {
1173                                         lastTryInserted = true;
1174                                 }
1175                         } else {
1176                                 rejectedSlotList.add(slot.getSequenceNumber());
1177                                 lastTryInserted = false;
1178                         }
1179                 }
1180
1181                 return new ThreeTuple<Boolean, Boolean, Slot[]>(inserted, lastTryInserted, array);
1182         }
1183
1184         /**
1185          * Returns false if a resize was needed
1186          */
1187         private ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) {
1188                 int newSize = 0;
1189                 if (liveSlotCount > bufferResizeThreshold) {
1190                         resize = true; //Resize is forced
1191                 }
1192
1193                 if (resize) {
1194                         newSize = (int) (numberOfSlots * RESIZE_MULTIPLE);
1195                         TableStatus status = new TableStatus(slot, newSize);
1196                         slot.addEntry(status);
1197                 }
1198
1199                 // Fill with rejected slots first before doing anything else
1200                 doRejectedMessages(slot);
1201
1202                 // Do mandatory rescue of entries
1203                 ThreeTuple<Boolean, Boolean, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1204
1205                 // Extract working variables
1206                 boolean needsResize = mandatoryRescueReturn.getFirst();
1207                 boolean seenLiveSlot = mandatoryRescueReturn.getSecond();
1208                 long currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1209
1210                 if (needsResize && !resize) {
1211                         // We need to resize but we are not resizing so return false
1212                         return new ThreeTuple<Boolean, Integer, Boolean>(true, null, null);
1213                 }
1214
1215                 boolean inserted = false;
1216                 if (newKeyEntry != null) {
1217                         newKeyEntry.setSlot(slot);
1218                         if (slot.hasSpace(newKeyEntry)) {
1219                                 slot.addEntry(newKeyEntry);
1220                                 inserted = true;
1221                         }
1222                 }
1223
1224                 // Clear the transactions, aborts and commits that were sent previously
1225                 transactionPartsSent.clear();
1226                 pendingSendArbitrationEntriesToDelete.clear();
1227
1228                 for (ArbitrationRound round : pendingSendArbitrationRounds) {
1229                         boolean isFull = false;
1230                         round.generateParts();
1231                         List<Entry> parts = round.getParts();
1232
1233                         // Insert pending arbitration data
1234                         for (Entry arbitrationData : parts) {
1235
1236                                 // If it is an abort then we need to set some information
1237                                 if (arbitrationData instanceof Abort) {
1238                                         ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
1239                                 }
1240
1241                                 if (!slot.hasSpace(arbitrationData)) {
1242                                         // No space so cant do anything else with these data entries
1243                                         isFull = true;
1244                                         break;
1245                                 }
1246
1247                                 // Add to this current slot and add it to entries to delete
1248                                 slot.addEntry(arbitrationData);
1249                                 pendingSendArbitrationEntriesToDelete.add(arbitrationData);
1250                         }
1251
1252                         if (isFull) {
1253                                 break;
1254                         }
1255                 }
1256
1257                 if (pendingTransactionQueue.size() > 0) {
1258
1259                         Transaction transaction = pendingTransactionQueue.get(0);
1260
1261                         // Set the transaction sequence number if it has yet to be inserted into the block chain
1262                         // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
1263                         //      transaction.setSequenceNumber(slot.getSequenceNumber());
1264                         // }
1265
1266                         if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
1267                                 transaction.setSequenceNumber(slot.getSequenceNumber());
1268                         }
1269
1270
1271                         while (true) {
1272                                 TransactionPart part = transaction.getNextPartToSend();
1273
1274                                 if (part == null) {
1275                                         // Ran out of parts to send for this transaction so move on
1276                                         break;
1277                                 }
1278
1279                                 if (slot.hasSpace(part)) {
1280                                         slot.addEntry(part);
1281                                         List<Integer> partsSent = transactionPartsSent.get(transaction);
1282                                         if (partsSent == null) {
1283                                                 partsSent = new ArrayList<Integer>();
1284                                                 transactionPartsSent.put(transaction, partsSent);
1285                                         }
1286                                         partsSent.add(part.getPartNumber());
1287                                         transactionPartsSent.put(transaction, partsSent);
1288                                 } else {
1289                                         break;
1290                                 }
1291                         }
1292                 }
1293
1294                 // Fill the remainder of the slot with rescue data
1295                 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1296
1297                 return new ThreeTuple<Boolean, Integer, Boolean>(false, newSize, inserted);
1298         }
1299
1300         private void doRejectedMessages(Slot s) {
1301                 if (! rejectedSlotList.isEmpty()) {
1302                         /* TODO: We should avoid generating a rejected message entry if
1303                          * there is already a sufficient entry in the queue (e.g.,
1304                          * equalsto value of true and same sequence number).  */
1305
1306                         long old_seqn = rejectedSlotList.firstElement();
1307                         if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
1308                                 long new_seqn = rejectedSlotList.lastElement();
1309                                 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1310                                 s.addEntry(rm);
1311                         } else {
1312                                 long prev_seqn = -1;
1313                                 int i = 0;
1314                                 /* Go through list of missing messages */
1315                                 for (; i < rejectedSlotList.size(); i++) {
1316                                         long curr_seqn = rejectedSlotList.get(i);
1317                                         Slot s_msg = buffer.getSlot(curr_seqn);
1318                                         if (s_msg != null)
1319                                                 break;
1320                                         prev_seqn = curr_seqn;
1321                                 }
1322                                 /* Generate rejected message entry for missing messages */
1323                                 if (prev_seqn != -1) {
1324                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1325                                         s.addEntry(rm);
1326                                 }
1327                                 /* Generate rejected message entries for present messages */
1328                                 for (; i < rejectedSlotList.size(); i++) {
1329                                         long curr_seqn = rejectedSlotList.get(i);
1330                                         Slot s_msg = buffer.getSlot(curr_seqn);
1331                                         long machineid = s_msg.getMachineID();
1332                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1333                                         s.addEntry(rm);
1334                                 }
1335                         }
1336                 }
1337         }
1338
1339         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot slot, boolean resize) {
1340                 long newestSequenceNumber = buffer.getNewestSeqNum();
1341                 long oldestSequenceNumber = buffer.getOldestSeqNum();
1342                 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1343                         oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1344                 }
1345
1346                 long currentSequenceNumber = oldestLiveSlotSequenceNumver;
1347                 boolean seenLiveSlot = false;
1348                 long firstIfFull = newestSequenceNumber + 1 - numberOfSlots;    // smallest seq number in the buffer if it is full
1349                 long threshold = firstIfFull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
1350
1351
1352                 // Mandatory Rescue
1353                 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1354                         Slot previousSlot = buffer.getSlot(currentSequenceNumber);
1355                         // Push slot number forward
1356                         if (! seenLiveSlot) {
1357                                 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1358                         }
1359
1360                         if (!previousSlot.isLive()) {
1361                                 continue;
1362                         }
1363
1364                         // We have seen a live slot
1365                         seenLiveSlot = true;
1366
1367                         // Get all the live entries for a slot
1368                         Vector<Entry> liveEntries = previousSlot.getLiveEntries(resize);
1369
1370                         // Iterate over all the live entries and try to rescue them
1371                         for (Entry liveEntry : liveEntries) {
1372                                 if (slot.hasSpace(liveEntry)) {
1373
1374                                         // Enough space to rescue the entry
1375                                         slot.addEntry(liveEntry);
1376                                 } else if (currentSequenceNumber == firstIfFull) {
1377                                         //if there's no space but the entry is about to fall off the queue
1378                                         System.out.println("B"); //?
1379                                         return new ThreeTuple<Boolean, Boolean, Long>(true, seenLiveSlot, currentSequenceNumber);
1380
1381                                 }
1382                         }
1383                 }
1384
1385                 // Did not resize
1386                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenLiveSlot, currentSequenceNumber);
1387         }
1388
1389         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
1390                 /* now go through live entries from least to greatest sequence number until
1391                  * either all live slots added, or the slot doesn't have enough room
1392                  * for SKIP_THRESHOLD consecutive entries*/
1393                 int skipcount = 0;
1394                 long newestseqnum = buffer.getNewestSeqNum();
1395                 search:
1396                 for (; seqn <= newestseqnum; seqn++) {
1397                         Slot prevslot = buffer.getSlot(seqn);
1398                         //Push slot number forward
1399                         if (!seenliveslot)
1400                                 oldestLiveSlotSequenceNumver = seqn;
1401
1402                         if (!prevslot.isLive())
1403                                 continue;
1404                         seenliveslot = true;
1405                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1406                         for (Entry liveentry : liveentries) {
1407                                 if (s.hasSpace(liveentry))
1408                                         s.addEntry(liveentry);
1409                                 else {
1410                                         skipcount++;
1411                                         if (skipcount > SKIP_THRESHOLD)
1412                                                 break search;
1413                                 }
1414                         }
1415                 }
1416         }
1417
1418         /**
1419          * Checks for malicious activity and updates the local copy of the block chain.
1420          */
1421         private void validateAndUpdate(Slot[] newSlots, boolean acceptUpdatesToLocal) {
1422
1423                 // The cloud communication layer has checked slot HMACs already before decoding
1424                 if (newSlots.length == 0) {
1425                         return;
1426                 }
1427
1428                 // Make sure all slots are newer than the last largest slot this client has seen
1429                 long firstSeqNum = newSlots[0].getSequenceNumber();
1430                 if (firstSeqNum <= sequenceNumber) {
1431                         throw new Error("Server Error: Sent older slots!");
1432                 }
1433
1434                 // Create an object that can access both new slots and slots in our local chain
1435                 // without committing slots to our local chain
1436                 SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
1437
1438                 // Check that the HMAC chain is not broken
1439                 checkHMACChain(indexer, newSlots);
1440
1441                 // Set to keep track of messages from clients
1442                 HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
1443
1444                 // Process each slots data
1445                 for (Slot slot : newSlots) {
1446                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1447                         updateExpectedSize();
1448                 }
1449
1450                 // If there is a gap, check to see if the server sent us everything.
1451                 if (firstSeqNum != (sequenceNumber + 1)) {
1452
1453                         // Check the size of the slots that were sent down by the server.
1454                         // Can only check the size if there was a gap
1455                         checkNumSlots(newSlots.length);
1456
1457                         // Since there was a gap every machine must have pushed a slot or must have
1458                         // a last message message.  If not then the server is hiding slots
1459                         if (!machineSet.isEmpty()) {
1460                                 throw new Error("Missing record for machines: " + machineSet);
1461                         }
1462                 }
1463
1464                 // Update the size of our local block chain.
1465                 commitNewMaxSize();
1466
1467                 // Commit new to slots to the local block chain.
1468                 for (Slot slot : newSlots) {
1469
1470                         // Insert this slot into our local block chain copy.
1471                         buffer.putSlot(slot);
1472
1473                         // Keep track of how many slots are currently live (have live data in them).
1474                         liveSlotCount++;
1475                 }
1476
1477                 // Get the sequence number of the latest slot in the system
1478                 sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
1479
1480                 updateLiveStateFromServer();
1481
1482                 // No Need to remember after we pulled from the server
1483                 offlineTransactionsCommittedAndAtServer.clear();
1484
1485                 // This is invalidated now
1486                 hadPartialSendToServer = false;
1487         }
1488
1489         private void updateLiveStateFromServer() {
1490                 // Process the new transaction parts
1491                 processNewTransactionParts();
1492
1493                 // Do arbitration on new transactions that were received
1494                 arbitrateFromServer();
1495
1496                 // Update all the committed keys
1497                 boolean didCommitOrSpeculate = updateCommittedTable();
1498
1499                 // Delete the transactions that are now dead
1500                 updateLiveTransactionsAndStatus();
1501
1502                 // Do speculations
1503                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1504                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1505         }
1506
1507         private void updateLiveStateFromLocal() {
1508                 // Update all the committed keys
1509                 boolean didCommitOrSpeculate = updateCommittedTable();
1510
1511                 // Delete the transactions that are now dead
1512                 updateLiveTransactionsAndStatus();
1513
1514                 // Do speculations
1515                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1516                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1517         }
1518
1519         private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) {
1520
1521                 long prevslots = firstSequenceNumber;
1522                 if (didFindTableStatus) {
1523 //                      expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : expectedsize;
1524                 } else {
1525                         expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1526                 }
1527                 didFindTableStatus = true;
1528
1529                 currMaxSize = numberOfSlots;
1530         }
1531
1532         private void updateExpectedSize() {
1533                 expectedsize++;
1534                 if (expectedsize > currMaxSize) {
1535                         expectedsize = currMaxSize;
1536                 }
1537         }
1538
1539
1540         /**
1541          * Check the size of the block chain to make sure there are enough slots sent back by the server.
1542          * This is only called when we have a gap between the slots that we have locally and the slots
1543          * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1544          * status message
1545          */
1546         private void checkNumSlots(int numberOfSlots) {
1547                 if (numberOfSlots != expectedsize) {
1548                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numberOfSlots);
1549                 }
1550         }
1551
1552         private void updateCurrMaxSize(int newmaxsize) {
1553                 currMaxSize = newmaxsize;
1554         }
1555
1556
1557         /**
1558          * Update the size of of the local buffer if it is needed.
1559          */
1560         private void commitNewMaxSize() {
1561                 didFindTableStatus = false;
1562
1563                 // Resize the local slot buffer
1564                 if (numberOfSlots != currMaxSize) {
1565                         buffer.resize((int)currMaxSize);
1566                 }
1567
1568                 // Change the number of local slots to the new size
1569                 numberOfSlots = (int)currMaxSize;
1570
1571                 // Recalculate the resize threshold since the size of the local buffer has changed
1572                 setResizeThreshold();
1573         }
1574
1575         /**
1576          * Process the new transaction parts from this latest round of slots received from the server
1577          */
1578         private void processNewTransactionParts() {
1579
1580                 if (newTransactionParts.size() == 0) {
1581                         // Nothing new to process
1582                         return;
1583                 }
1584
1585                 // Iterate through all the machine Ids that we received new parts for
1586                 for (Long machineId : newTransactionParts.keySet()) {
1587                         Map<Pair<Long, Integer>, TransactionPart> parts = newTransactionParts.get(machineId);
1588
1589                         // Iterate through all the parts for that machine Id
1590                         for (Pair<Long, Integer> partId : parts.keySet()) {
1591                                 TransactionPart part = parts.get(partId);
1592
1593                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
1594                                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= part.getSequenceNumber())) {
1595                                         // Set dead the transaction part
1596                                         part.setDead();
1597                                         continue;
1598                                 }
1599
1600                                 // Get the transaction object for that sequence number
1601                                 Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber());
1602
1603                                 if (transaction == null) {
1604                                         // This is a new transaction that we dont have so make a new one
1605                                         transaction = new Transaction();
1606
1607                                         // Insert this new transaction into the live tables
1608                                         liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction);
1609                                         liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction);
1610                                 }
1611
1612                                 // Add that part to the transaction
1613                                 transaction.addPartDecode(part);
1614                         }
1615                 }
1616
1617                 // Clear all the new transaction parts in preparation for the next time the server sends slots
1618                 newTransactionParts.clear();
1619         }
1620
1621
1622         private long lastSeqNumArbOn = 0;
1623
1624         private void arbitrateFromServer() {
1625
1626                 if (liveTransactionBySequenceNumberTable.size() == 0) {
1627                         // Nothing to arbitrate on so move on
1628                         return;
1629                 }
1630
1631                 // Get the transaction sequence numbers and sort from oldest to newest
1632                 List<Long> transactionSequenceNumbers = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
1633                 Collections.sort(transactionSequenceNumbers);
1634
1635                 // Collection of key value pairs that are
1636                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1637
1638                 // The last transaction arbitrated on
1639                 long lastTransactionCommitted = -1;
1640                 Set<Abort> generatedAborts = new HashSet<Abort>();
1641
1642                 for (Long transactionSequenceNumber : transactionSequenceNumbers) {
1643                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1644
1645
1646
1647                         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1648                         if (transaction.getArbitrator() != localMachineId) {
1649                                 continue;
1650                         }
1651
1652                         if (transactionSequenceNumber < lastSeqNumArbOn) {
1653                                 continue;
1654                         }
1655
1656                         if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
1657                                 // We have seen this already locally so dont commit again
1658                                 continue;
1659                         }
1660
1661
1662                         if (!transaction.isComplete()) {
1663                                 // Will arbitrate in incorrect order if we continue so just break
1664                                 // Most likely this
1665                                 break;
1666                         }
1667
1668
1669                         // update the largest transaction seen by arbitrator from server
1670                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) {
1671                                 lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1672                         } else {
1673                                 Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId());
1674                                 if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1675                                         lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1676                                 }
1677                         }
1678
1679                         if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
1680                                 // Guard evaluated as true
1681
1682                                 // Update the local changes so we can make the commit
1683                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1684                                         speculativeTableTmp.put(kv.getKey(), kv);
1685                                 }
1686
1687                                 // Update what the last transaction committed was for use in batch commit
1688                                 lastTransactionCommitted = transactionSequenceNumber;
1689                         } else {
1690                                 // Guard evaluated was false so create abort
1691
1692                                 // Create the abort
1693                                 Abort newAbort = new Abort(null,
1694                                                            transaction.getClientLocalSequenceNumber(),
1695                                                            transaction.getSequenceNumber(),
1696                                                            transaction.getMachineId(),
1697                                                            transaction.getArbitrator(),
1698                                                            localArbitrationSequenceNumber);
1699                                 localArbitrationSequenceNumber++;
1700
1701                                 generatedAborts.add(newAbort);
1702
1703                                 // Insert the abort so we can process
1704                                 processEntry(newAbort);
1705                         }
1706
1707                         lastSeqNumArbOn = transactionSequenceNumber;
1708
1709                         // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
1710                 }
1711
1712                 Commit newCommit = null;
1713
1714                 // If there is something to commit
1715                 if (speculativeTableTmp.size() != 0) {
1716
1717                         // Create the commit and increment the commit sequence number
1718                         newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1719                         localArbitrationSequenceNumber++;
1720
1721                         // Add all the new keys to the commit
1722                         for (KeyValue kv : speculativeTableTmp.values()) {
1723                                 newCommit.addKV(kv);
1724                         }
1725
1726                         // create the commit parts
1727                         newCommit.createCommitParts();
1728
1729                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1730
1731                         // Insert the commit so we can process it
1732                         for (CommitPart commitPart : newCommit.getParts().values()) {
1733                                 processEntry(commitPart);
1734                         }
1735                 }
1736
1737                 if ((newCommit != null) || (generatedAborts.size() > 0)) {
1738                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1739                         pendingSendArbitrationRounds.add(arbitrationRound);
1740
1741                         if (compactArbitrationData()) {
1742                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1743                                 if (newArbitrationRound.getCommit() != null) {
1744                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1745                                                 processEntry(commitPart);
1746                                         }
1747                                 }
1748                         }
1749                 }
1750         }
1751
1752         private Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction) {
1753
1754                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1755                 if (transaction.getArbitrator() != localMachineId) {
1756                         return new Pair<Boolean, Boolean>(false, false);
1757                 }
1758
1759                 if (!transaction.isComplete()) {
1760                         // Will arbitrate in incorrect order if we continue so just break
1761                         // Most likely this
1762                         return new Pair<Boolean, Boolean>(false, false);
1763                 }
1764
1765                 if (transaction.getMachineId() != localMachineId) {
1766                         // dont do this check for local transactions
1767                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) {
1768                                 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
1769                                         // We've have already seen this from the server
1770                                         return new Pair<Boolean, Boolean>(false, false);
1771                                 }
1772                         }
1773                 }
1774
1775                 if (transaction.evaluateGuard(committedKeyValueTable, null, null)) {
1776                         // Guard evaluated as true
1777
1778                         // Create the commit and increment the commit sequence number
1779                         Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1780                         localArbitrationSequenceNumber++;
1781
1782                         // Update the local changes so we can make the commit
1783                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1784                                 newCommit.addKV(kv);
1785                         }
1786
1787                         // create the commit parts
1788                         newCommit.createCommitParts();
1789
1790                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1791                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
1792                         pendingSendArbitrationRounds.add(arbitrationRound);
1793
1794                         if (compactArbitrationData()) {
1795                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1796                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1797                                         processEntry(commitPart);
1798                                 }
1799                         } else {
1800                                 // Insert the commit so we can process it
1801                                 for (CommitPart commitPart : newCommit.getParts().values()) {
1802                                         processEntry(commitPart);
1803                                 }
1804                         }
1805
1806                         if (transaction.getMachineId() == localMachineId) {
1807                                 TransactionStatus status = transaction.getTransactionStatus();
1808                                 if (status != null) {
1809                                         status.setStatus(TransactionStatus.StatusCommitted);
1810                                 }
1811                         }
1812
1813                         updateLiveStateFromLocal();
1814                         return new Pair<Boolean, Boolean>(true, true);
1815                 } else {
1816
1817                         if (transaction.getMachineId() == localMachineId) {
1818                                 // For locally created messages update the status
1819
1820                                 // Guard evaluated was false so create abort
1821                                 TransactionStatus status = transaction.getTransactionStatus();
1822                                 if (status != null) {
1823                                         status.setStatus(TransactionStatus.StatusAborted);
1824                                 }
1825                         } else {
1826                                 Set addAbortSet = new HashSet<Abort>();
1827
1828
1829                                 // Create the abort
1830                                 Abort newAbort = new Abort(null,
1831                                                            transaction.getClientLocalSequenceNumber(),
1832                                                            -1,
1833                                                            transaction.getMachineId(),
1834                                                            transaction.getArbitrator(),
1835                                                            localArbitrationSequenceNumber);
1836                                 localArbitrationSequenceNumber++;
1837
1838                                 addAbortSet.add(newAbort);
1839
1840
1841                                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1842                                 ArbitrationRound arbitrationRound = new ArbitrationRound(null, addAbortSet);
1843                                 pendingSendArbitrationRounds.add(arbitrationRound);
1844
1845                                 if (compactArbitrationData()) {
1846                                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1847                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1848                                                 processEntry(commitPart);
1849                                         }
1850                                 }
1851                         }
1852
1853                         updateLiveStateFromLocal();
1854                         return new Pair<Boolean, Boolean>(true, false);
1855                 }
1856         }
1857
1858         /**
1859          * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1860          */
1861         private boolean compactArbitrationData() {
1862
1863                 if (pendingSendArbitrationRounds.size() < 2) {
1864                         // Nothing to compact so do nothing
1865                         return false;
1866                 }
1867
1868                 ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1869                 if (lastRound.didSendPart()) {
1870                         return false;
1871                 }
1872
1873                 boolean hadCommit = (lastRound.getCommit() == null);
1874                 boolean gotNewCommit = false;
1875
1876                 int numberToDelete = 1;
1877                 while (numberToDelete < pendingSendArbitrationRounds.size()) {
1878                         ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1);
1879
1880                         if (round.isFull() || round.didSendPart()) {
1881                                 // Stop since there is a part that cannot be compacted and we need to compact in order
1882                                 break;
1883                         }
1884
1885                         if (round.getCommit() == null) {
1886
1887                                 // Try compacting aborts only
1888                                 int newSize = round.getCurrentSize() + lastRound.getAbortsCount();
1889                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1890                                         // Cant compact since it would be too large
1891                                         break;
1892                                 }
1893                                 lastRound.addAborts(round.getAborts());
1894                         } else {
1895
1896                                 // Create a new larger commit
1897                                 Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber);
1898                                 localArbitrationSequenceNumber++;
1899
1900                                 // Create the commit parts so that we can count them
1901                                 newCommit.createCommitParts();
1902
1903                                 // Calculate the new size of the parts
1904                                 int newSize = newCommit.getNumberOfParts();
1905                                 newSize += lastRound.getAbortsCount();
1906                                 newSize += round.getAbortsCount();
1907
1908                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1909                                         // Cant compact since it would be too large
1910                                         break;
1911                                 }
1912
1913                                 // Set the new compacted part
1914                                 lastRound.setCommit(newCommit);
1915                                 lastRound.addAborts(round.getAborts());
1916                                 gotNewCommit = true;
1917                         }
1918
1919                         numberToDelete++;
1920                 }
1921
1922                 if (numberToDelete != 1) {
1923                         // If there is a compaction
1924
1925                         // Delete the previous pieces that are now in the new compacted piece
1926                         if (numberToDelete == pendingSendArbitrationRounds.size()) {
1927                                 pendingSendArbitrationRounds.clear();
1928                         } else {
1929                                 for (int i = 0; i < numberToDelete; i++) {
1930                                         pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1);
1931                                 }
1932                         }
1933
1934                         // Add the new compacted into the pending to send list
1935                         pendingSendArbitrationRounds.add(lastRound);
1936
1937                         // Should reinsert into the commit processor
1938                         if (hadCommit && gotNewCommit) {
1939                                 return true;
1940                         }
1941                 }
1942
1943                 return false;
1944         }
1945         // private boolean compactArbitrationData() {
1946         //      return false;
1947         // }
1948
1949         /**
1950          * Update all the commits and the committed tables, sets dead the dead transactions
1951          */
1952         private boolean updateCommittedTable() {
1953
1954                 if (newCommitParts.size() == 0) {
1955                         // Nothing new to process
1956                         return false;
1957                 }
1958
1959                 // Iterate through all the machine Ids that we received new parts for
1960                 for (Long machineId : newCommitParts.keySet()) {
1961                         Map<Pair<Long, Integer>, CommitPart> parts = newCommitParts.get(machineId);
1962
1963                         // Iterate through all the parts for that machine Id
1964                         for (Pair<Long, Integer> partId : parts.keySet()) {
1965                                 CommitPart part = parts.get(partId);
1966
1967                                 // Get the transaction object for that sequence number
1968                                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
1969
1970                                 if (commitForClientTable == null) {
1971                                         // This is the first commit from this device
1972                                         commitForClientTable = new HashMap<Long, Commit>();
1973                                         liveCommitsTable.put(part.getMachineId(), commitForClientTable);
1974                                 }
1975
1976                                 Commit commit = commitForClientTable.get(part.getSequenceNumber());
1977
1978                                 if (commit == null) {
1979                                         // This is a new commit that we dont have so make a new one
1980                                         commit = new Commit();
1981
1982                                         // Insert this new commit into the live tables
1983                                         commitForClientTable.put(part.getSequenceNumber(), commit);
1984                                 }
1985
1986                                 // Add that part to the commit
1987                                 commit.addPartDecode(part);
1988                         }
1989                 }
1990
1991                 // Clear all the new commits parts in preparation for the next time the server sends slots
1992                 newCommitParts.clear();
1993
1994                 // If we process a new commit keep track of it for future use
1995                 boolean didProcessANewCommit = false;
1996
1997                 // Process the commits one by one
1998                 for (Long arbitratorId : liveCommitsTable.keySet()) {
1999
2000                         // Get all the commits for a specific arbitrator
2001                         Map<Long, Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
2002
2003                         // Sort the commits in order
2004                         List<Long> commitSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
2005                         Collections.sort(commitSequenceNumbers);
2006
2007                         // Get the last commit seen from this arbitrator
2008                         long lastCommitSeenSequenceNumber = -1;
2009                         if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != null) {
2010                                 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId);
2011                         }
2012
2013                         // Go through each new commit one by one
2014                         for (int i = 0; i < commitSequenceNumbers.size(); i++) {
2015                                 Long commitSequenceNumber = commitSequenceNumbers.get(i);
2016                                 Commit commit = commitForClientTable.get(commitSequenceNumber);
2017
2018                                 // Special processing if a commit is not complete
2019                                 if (!commit.isComplete()) {
2020                                         if (i == (commitSequenceNumbers.size() - 1)) {
2021                                                 // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
2022                                                 break;
2023                                         } else {
2024                                                 // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
2025                                                 // Delete it and move on
2026                                                 commit.setDead();
2027                                                 commitForClientTable.remove(commit.getSequenceNumber());
2028                                                 continue;
2029                                         }
2030                                 }
2031
2032                                 // Update the last transaction that was updated if we can
2033                                 if (commit.getTransactionSequenceNumber() != -1) {
2034                                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2035
2036                                         // Update the last transaction sequence number that the arbitrator arbitrated on
2037                                         if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2038                                                 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2039                                         }
2040                                 }
2041
2042                                 // Update the last arbitration data that we have seen so far
2043                                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) {
2044
2045                                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId());
2046                                         if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) {
2047                                                 // Is larger
2048                                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2049                                         }
2050                                 } else {
2051                                         // Never seen any data from this arbitrator so record the first one
2052                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2053                                 }
2054
2055                                 // We have already seen this commit before so need to do the full processing on this commit
2056                                 if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2057
2058                                         // Update the last transaction that was updated if we can
2059                                         if (commit.getTransactionSequenceNumber() != -1) {
2060                                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2061
2062                                                 // Update the last transaction sequence number that the arbitrator arbitrated on
2063                                                 if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2064                                                         lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2065                                                 }
2066                                         }
2067
2068                                         continue;
2069                                 }
2070
2071                                 // If we got here then this is a brand new commit and needs full processing
2072
2073                                 // Get what commits should be edited, these are the commits that have live values for their keys
2074                                 Set<Commit> commitsToEdit = new HashSet<Commit>();
2075                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2076                                         commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
2077                                 }
2078                                 commitsToEdit.remove(null); // remove null since it could be in this set
2079
2080                                 // Update each previous commit that needs to be updated
2081                                 for (Commit previousCommit : commitsToEdit) {
2082
2083                                         // Only bother with live commits (TODO: Maybe remove this check)
2084                                         if (previousCommit.isLive()) {
2085
2086                                                 // Update which keys in the old commits are still live
2087                                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2088                                                         previousCommit.invalidateKey(kv.getKey());
2089                                                 }
2090
2091                                                 // if the commit is now dead then remove it
2092                                                 if (!previousCommit.isLive()) {
2093                                                         commitForClientTable.remove(previousCommit);
2094                                                 }
2095                                         }
2096                                 }
2097
2098                                 // Update the last seen sequence number from this arbitrator
2099                                 if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) {
2100                                         if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
2101                                                 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2102                                         }
2103                                 } else {
2104                                         lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2105                                 }
2106
2107                                 // We processed a new commit that we havent seen before
2108                                 didProcessANewCommit = true;
2109
2110                                 // Update the committed table of keys and which commit is using which key
2111                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2112                                         committedKeyValueTable.put(kv.getKey(), kv);
2113                                         liveCommitsByKeyTable.put(kv.getKey(), commit);
2114                                 }
2115                         }
2116                 }
2117
2118                 return didProcessANewCommit;
2119         }
2120
2121         /**
2122          * Create the speculative table from transactions that are still live and have come from the cloud
2123          */
2124         private boolean updateSpeculativeTable(boolean didProcessNewCommits) {
2125                 if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
2126                         // There is nothing to speculate on
2127                         return false;
2128                 }
2129
2130                 // Create a list of the transaction sequence numbers and sort them from oldest to newest
2131                 List<Long> transactionSequenceNumbersSorted = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
2132                 Collections.sort(transactionSequenceNumbersSorted);
2133
2134                 boolean hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2135
2136
2137                 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2138                         // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2139                         // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
2140
2141                         // Start from scratch
2142                         speculatedKeyValueTable.clear();
2143                         lastTransactionSequenceNumberSpeculatedOn = -1;
2144                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2145
2146                 }
2147
2148                 // Remember the front of the transaction list
2149                 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);
2150
2151                 // Find where to start arbitration from
2152                 int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2153
2154                 if (startIndex >= transactionSequenceNumbersSorted.size()) {
2155                         // Make sure we are not out of bounds
2156                         return false; // did not speculate
2157                 }
2158
2159                 Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
2160                 boolean didSkip = true;
2161
2162                 for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
2163                         long transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
2164                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
2165
2166                         if (!transaction.isComplete()) {
2167                                 // If there is an incomplete transaction then there is nothing we can do
2168                                 // add this transactions arbitrator to the list of arbitrators we should ignore
2169                                 incompleteTransactionArbitrator.add(transaction.getArbitrator());
2170                                 didSkip = true;
2171                                 continue;
2172                         }
2173
2174                         if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
2175                                 continue;
2176                         }
2177
2178                         lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2179
2180                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, null)) {
2181                                 // Guard evaluated to true so update the speculative table
2182                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2183                                         speculatedKeyValueTable.put(kv.getKey(), kv);
2184                                 }
2185                         }
2186                 }
2187
2188                 if (didSkip) {
2189                         // Since there was a skip we need to redo the speculation next time around
2190                         lastTransactionSequenceNumberSpeculatedOn = -1;
2191                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2192                 }
2193
2194                 // We did some speculation
2195                 return true;
2196         }
2197
2198         /**
2199          * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
2200          */
2201         private void updatePendingTransactionSpeculativeTable(boolean didProcessNewCommitsOrSpeculate) {
2202                 if (pendingTransactionQueue.size() == 0) {
2203                         // There is nothing to speculate on
2204                         return;
2205                 }
2206
2207
2208                 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
2209                         // need to reset on the pending speculation
2210                         lastPendingTransactionSpeculatedOn = null;
2211                         firstPendingTransaction = pendingTransactionQueue.get(0);
2212                         pendingTransactionSpeculatedKeyValueTable.clear();
2213                 }
2214
2215                 // Find where to start arbitration from
2216                 int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1;
2217
2218                 if (startIndex >= pendingTransactionQueue.size()) {
2219                         // Make sure we are not out of bounds
2220                         return;
2221                 }
2222
2223                 for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
2224                         Transaction transaction = pendingTransactionQueue.get(i);
2225
2226                         lastPendingTransactionSpeculatedOn = transaction;
2227
2228                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2229                                 // Guard evaluated to true so update the speculative table
2230                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2231                                         pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
2232                                 }
2233                         }
2234                 }
2235         }
2236
2237         /**
2238          * Set dead and remove from the live transaction tables the transactions that are dead
2239          */
2240         private void updateLiveTransactionsAndStatus() {
2241
2242                 // Go through each of the transactions
2243                 for (Iterator<Map.Entry<Long, Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
2244                         Transaction transaction = iter.next().getValue();
2245
2246                         // Check if the transaction is dead
2247                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
2248                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
2249
2250                                 // Set dead the transaction
2251                                 transaction.setDead();
2252
2253                                 // Remove the transaction from the live table
2254                                 iter.remove();
2255                                 liveTransactionByTransactionIdTable.remove(transaction.getId());
2256                         }
2257                 }
2258
2259                 // Go through each of the transactions
2260                 for (Iterator<Map.Entry<Long, TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
2261                         TransactionStatus status = iter.next().getValue();
2262
2263                         // Check if the transaction is dead
2264                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
2265                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
2266
2267                                 // Set committed
2268                                 status.setStatus(TransactionStatus.StatusCommitted);
2269
2270                                 // Remove
2271                                 iter.remove();
2272                         }
2273                 }
2274         }
2275
2276         /**
2277          * Process this slot, entry by entry.  Also update the latest message sent by slot
2278          */
2279         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2280
2281                 // Update the last message seen
2282                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2283
2284                 // Process each entry in the slot
2285                 for (Entry entry : slot.getEntries()) {
2286                         switch (entry.getType()) {
2287
2288                         case Entry.TypeCommitPart:
2289                                 processEntry((CommitPart)entry);
2290                                 break;
2291
2292                         case Entry.TypeAbort:
2293                                 processEntry((Abort)entry);
2294                                 break;
2295
2296                         case Entry.TypeTransactionPart:
2297                                 processEntry((TransactionPart)entry);
2298                                 break;
2299
2300                         case Entry.TypeNewKey:
2301                                 processEntry((NewKey)entry);
2302                                 break;
2303
2304                         case Entry.TypeLastMessage:
2305                                 processEntry((LastMessage)entry, machineSet);
2306                                 break;
2307
2308                         case Entry.TypeRejectedMessage:
2309                                 processEntry((RejectedMessage)entry, indexer);
2310                                 break;
2311
2312                         case Entry.TypeTableStatus:
2313                                 processEntry((TableStatus)entry, slot.getSequenceNumber());
2314                                 break;
2315
2316                         default:
2317                                 throw new Error("Unrecognized type: " + entry.getType());
2318                         }
2319                 }
2320         }
2321
2322         /**
2323          * Update the last message that was sent for a machine Id
2324          */
2325         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
2326                 // Update what the last message received by a machine was
2327                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
2328         }
2329
2330         /**
2331          * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2332          */
2333         private void processEntry(NewKey entry) {
2334
2335                 // Update the arbitrator table with the new key information
2336                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
2337
2338                 // Update what the latest live new key is
2339                 NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry);
2340                 if (oldNewKey != null) {
2341                         // Delete the old new key messages
2342                         oldNewKey.setDead();
2343                 }
2344         }
2345
2346         /**
2347          * Process new table status entries and set dead the old ones as new ones come in.
2348          * keeps track of the largest and smallest table status seen in this current round
2349          * of updating the local copy of the block chain
2350          */
2351         private void processEntry(TableStatus entry, long seq) {
2352                 int newNumSlots = entry.getMaxSlots();
2353                 updateCurrMaxSize(newNumSlots);
2354
2355                 initExpectedSize(seq, newNumSlots);
2356
2357                 if (liveTableStatus != null) {
2358                         // We have a larger table status so the old table status is no longer alive
2359                         liveTableStatus.setDead();
2360                 }
2361
2362                 // Make this new table status the latest alive table status
2363                 liveTableStatus = entry;
2364         }
2365
2366         /**
2367          * Check old messages to see if there is a block chain violation. Also
2368          */
2369         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
2370                 long oldSeqNum = entry.getOldSeqNum();
2371                 long newSeqNum = entry.getNewSeqNum();
2372                 boolean isequal = entry.getEqual();
2373                 long machineId = entry.getMachineID();
2374                 long seq = entry.getSequenceNumber();
2375
2376
2377                 // Check if we have messages that were supposed to be rejected in our local block chain
2378                 for (long seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2379
2380                         // Get the slot
2381                         Slot slot = indexer.getSlot(seqNum);
2382
2383                         if (slot != null) {
2384                                 // If we have this slot make sure that it was not supposed to be a rejected slot
2385
2386                                 long slotMachineId = slot.getMachineID();
2387                                 if (isequal != (slotMachineId == machineId)) {
2388                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2389                                 }
2390                         }
2391                 }
2392
2393
2394                 // Create a list of clients to watch until they see this rejected message entry.
2395                 HashSet<Long> deviceWatchSet = new HashSet<Long>();
2396                 for (Map.Entry<Long, Pair<Long, Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
2397
2398                         // Machine ID for the last message entry
2399                         long lastMessageEntryMachineId = lastMessageEntry.getKey();
2400
2401                         // We've seen it, don't need to continue to watch.  Our next
2402                         // message will implicitly acknowledge it.
2403                         if (lastMessageEntryMachineId == localMachineId) {
2404                                 continue;
2405                         }
2406
2407                         Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
2408                         long entrySequenceNumber = lastMessageValue.getFirst();
2409
2410                         if (entrySequenceNumber < seq) {
2411
2412                                 // Add this rejected message to the set of messages that this machine ID did not see yet
2413                                 addWatchList(lastMessageEntryMachineId, entry);
2414
2415                                 // This client did not see this rejected message yet so add it to the watch set to monitor
2416                                 deviceWatchSet.add(lastMessageEntryMachineId);
2417                         }
2418                 }
2419
2420                 if (deviceWatchSet.isEmpty()) {
2421                         // This rejected message has been seen by all the clients so
2422                         entry.setDead();
2423                 } else {
2424                         // We need to watch this rejected message
2425                         entry.setWatchSet(deviceWatchSet);
2426                 }
2427         }
2428
2429         /**
2430          * Check if this abort is live, if not then save it so we can kill it later.
2431          * update the last transaction number that was arbitrated on.
2432          */
2433         private void processEntry(Abort entry) {
2434
2435
2436                 if (entry.getTransactionSequenceNumber() != -1) {
2437                         // update the transaction status if it was sent to the server
2438                         TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
2439                         if (status != null) {
2440                                 status.setStatus(TransactionStatus.StatusAborted);
2441                         }
2442                 }
2443
2444                 // Abort has not been seen by the client it is for yet so we need to keep track of it
2445                 Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
2446                 if (previouslySeenAbort != null) {
2447                         previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
2448                 }
2449
2450                 if (entry.getTransactionArbitrator() == localMachineId) {
2451                         liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry);
2452                 }
2453
2454                 if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
2455
2456                         // The machine already saw this so it is dead
2457                         entry.setDead();
2458                         liveAbortTable.remove(entry.getAbortId());
2459
2460                         if (entry.getTransactionArbitrator() == localMachineId) {
2461                                 liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber());
2462                         }
2463
2464                         return;
2465                 }
2466
2467
2468
2469
2470                 // Update the last arbitration data that we have seen so far
2471                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) {
2472
2473                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator());
2474                         if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) {
2475                                 // Is larger
2476                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2477                         }
2478                 } else {
2479                         // Never seen any data from this arbitrator so record the first one
2480                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2481                 }
2482
2483
2484                 // Set dead a transaction if we can
2485                 Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair<Long, Long>(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber()));
2486                 if (transactionToSetDead != null) {
2487                         liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber());
2488                 }
2489
2490                 // Update the last transaction sequence number that the arbitrator arbitrated on
2491                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator());
2492                 if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2493
2494                         // Is a valid one
2495                         if (entry.getTransactionSequenceNumber() != -1) {
2496                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber());
2497                         }
2498                 }
2499         }
2500
2501         /**
2502          * Set dead the transaction part if that transaction is dead and keep track of all new parts
2503          */
2504         private void processEntry(TransactionPart entry) {
2505                 // Check if we have already seen this transaction and set it dead OR if it is not alive
2506                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId());
2507                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= entry.getSequenceNumber())) {
2508                         // This transaction is dead, it was already committed or aborted
2509                         entry.setDead();
2510                         return;
2511                 }
2512
2513                 // This part is still alive
2514                 Map<Pair<Long, Integer>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
2515
2516                 if (transactionPart == null) {
2517                         // Dont have a table for this machine Id yet so make one
2518                         transactionPart = new HashMap<Pair<Long, Integer>, TransactionPart>();
2519                         newTransactionParts.put(entry.getMachineId(), transactionPart);
2520                 }
2521
2522                 // Update the part and set dead ones we have already seen (got a rescued version)
2523                 TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry);
2524                 if (previouslySeenPart != null) {
2525                         previouslySeenPart.setDead();
2526                 }
2527         }
2528
2529         /**
2530          * Process new commit entries and save them for future use.  Delete duplicates
2531          */
2532         private void processEntry(CommitPart entry) {
2533
2534
2535                 // Update the last transaction that was updated if we can
2536                 if (entry.getTransactionSequenceNumber() != -1) {
2537                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
2538
2539                         // Update the last transaction sequence number that the arbitrator arbitrated on
2540                         if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2541                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
2542                         }
2543                 }
2544
2545
2546
2547
2548                 Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
2549
2550                 if (commitPart == null) {
2551                         // Don't have a table for this machine Id yet so make one
2552                         commitPart = new HashMap<Pair<Long, Integer>, CommitPart>();
2553                         newCommitParts.put(entry.getMachineId(), commitPart);
2554                 }
2555
2556                 // Update the part and set dead ones we have already seen (got a rescued version)
2557                 CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry);
2558                 if (previouslySeenPart != null) {
2559                         previouslySeenPart.setDead();
2560                 }
2561         }
2562
2563         /**
2564          * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
2565          * Updates the live aborts, removes those that are dead and sets them dead.
2566          * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2567          * other clients have not had a rollback on the last message.
2568          */
2569         private void updateLastMessage(long machineId, long seqNum, Liveness liveness, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2570
2571                 // We have seen this machine ID
2572                 machineSet.remove(machineId);
2573
2574                 // Get the set of rejected messages that this machine Id is has not seen yet
2575                 HashSet<RejectedMessage> watchset = rejectedMessageWatchListTable.get(machineId);
2576
2577                 // If there is a rejected message that this machine Id has not seen yet
2578                 if (watchset != null) {
2579
2580                         // Go through each rejected message that this machine Id has not seen yet
2581                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
2582
2583                                 RejectedMessage rm = rmit.next();
2584
2585                                 // If this machine Id has seen this rejected message...
2586                                 if (rm.getSequenceNumber() <= seqNum) {
2587
2588                                         // Remove it from our watchlist
2589                                         rmit.remove();
2590
2591                                         // Decrement machines that need to see this notification
2592                                         rm.removeWatcher(machineId);
2593                                 }
2594                         }
2595                 }
2596
2597                 // Set dead the abort
2598                 for (Iterator<Map.Entry<Pair<Long, Long>, Abort>> i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
2599                         Abort abort = i.next().getValue();
2600
2601                         if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
2602                                 abort.setDead();
2603                                 i.remove();
2604
2605                                 if (abort.getTransactionArbitrator() == localMachineId) {
2606                                         liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber());
2607                                 }
2608                         }
2609                 }
2610
2611
2612
2613                 if (machineId == localMachineId) {
2614                         // Our own messages are immediately dead.
2615                         if (liveness instanceof LastMessage) {
2616                                 ((LastMessage)liveness).setDead();
2617                         } else if (liveness instanceof Slot) {
2618                                 ((Slot)liveness).setDead();
2619                         } else {
2620                                 throw new Error("Unrecognized type");
2621                         }
2622                 }
2623
2624                 // Get the old last message for this device
2625                 Pair<Long, Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<Long, Liveness>(seqNum, liveness));
2626                 if (lastMessageEntry == null) {
2627                         // If no last message then there is nothing else to process
2628                         return;
2629                 }
2630
2631                 long lastMessageSeqNum = lastMessageEntry.getFirst();
2632                 Liveness lastEntry = lastMessageEntry.getSecond();
2633
2634                 // If it is not our machine Id since we already set ours to dead
2635                 if (machineId != localMachineId) {
2636                         if (lastEntry instanceof LastMessage) {
2637                                 ((LastMessage)lastEntry).setDead();
2638                         } else if (lastEntry instanceof Slot) {
2639                                 ((Slot)lastEntry).setDead();
2640                         } else {
2641                                 throw new Error("Unrecognized type");
2642                         }
2643                 }
2644
2645                 // Make sure the server is not playing any games
2646                 if (machineId == localMachineId) {
2647
2648                         if (hadPartialSendToServer) {
2649                                 // We were not making any updates and we had a machine mismatch
2650                                 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2651                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2652                                 }
2653
2654                         } else {
2655                                 // We were not making any updates and we had a machine mismatch
2656                                 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2657                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2658                                 }
2659                         }
2660                 } else {
2661                         if (lastMessageSeqNum > seqNum) {
2662                                 throw new Error("Server Error: Rollback on remote machine sequence number");
2663                         }
2664                 }
2665         }
2666
2667         /**
2668          * Add a rejected message entry to the watch set to keep track of which clients have seen that
2669          * rejected message entry and which have not.
2670          */
2671         private void addWatchList(long machineId, RejectedMessage entry) {
2672                 HashSet<RejectedMessage> entries = rejectedMessageWatchListTable.get(machineId);
2673                 if (entries == null) {
2674                         // There is no set for this machine ID yet so create one
2675                         entries = new HashSet<RejectedMessage>();
2676                         rejectedMessageWatchListTable.put(machineId, entries);
2677                 }
2678                 entries.add(entry);
2679         }
2680
2681         /**
2682          * Check if the HMAC chain is not violated
2683          */
2684         private void checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
2685                 for (int i = 0; i < newSlots.length; i++) {
2686                         Slot currSlot = newSlots[i];
2687                         Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
2688                         if (prevSlot != null &&
2689                                 !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
2690                                 throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
2691                 }
2692         }
2693 }