edits
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "Random.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
19 #include "Commit.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
22 #include <stdlib.h>
23
24 int compareInt64(const void * a, const void *b) {
25         const int64_t * pa = (const int64_t *) a;
26         const int64_t * pb = (const int64_t *) b;
27         if (*pa < *pb)
28                 return -1;
29         else if (*pa > *pb)
30                 return 1;
31         else
32                 return 0;
33 }
34
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
36         buffer(NULL),
37         cloud(new CloudComm(this, baseurl, password, listeningPort)),
38         random(NULL),
39         liveTableStatus(NULL),
40         pendingTransactionBuilder(NULL),
41         lastPendingTransactionSpeculatedOn(NULL),
42         firstPendingTransaction(NULL),
43         numberOfSlots(0),
44         bufferResizeThreshold(0),
45         liveSlotCount(0),
46         oldestLiveSlotSequenceNumver(1),
47         localMachineId(_localMachineId),
48         sequenceNumber(0),
49         localTransactionSequenceNumber(0),
50         lastTransactionSequenceNumberSpeculatedOn(0),
51         oldestTransactionSequenceNumberSpeculatedOn(0),
52         localArbitrationSequenceNumber(0),
53         hadPartialSendToServer(false),
54         attemptedToSendToServer(false),
55         expectedsize(0),
56         didFindTableStatus(false),
57         currMaxSize(0),
58         lastSlotAttemptedToSend(NULL),
59         lastIsNewKey(false),
60         lastNewSize(0),
61         lastTransactionPartsSent(NULL),
62         lastPendingSendArbitrationEntriesToDelete(NULL),
63         lastNewKey(NULL),
64         committedKeyValueTable(NULL),
65         speculatedKeyValueTable(NULL),
66         pendingTransactionSpeculatedKeyValueTable(NULL),
67         liveNewKeyTable(NULL),
68         lastMessageTable(NULL),
69         rejectedMessageWatchVectorTable(NULL),
70         arbitratorTable(NULL),
71         liveAbortTable(NULL),
72         newTransactionParts(NULL),
73         newCommitParts(NULL),
74         lastArbitratedTransactionNumberByArbitratorTable(NULL),
75         liveTransactionBySequenceNumberTable(NULL),
76         liveTransactionByTransactionIdTable(NULL),
77         liveCommitsTable(NULL),
78         liveCommitsByKeyTable(NULL),
79         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80         rejectedSlotVector(NULL),
81         pendingTransactionQueue(NULL),
82         pendingSendArbitrationRounds(NULL),
83         pendingSendArbitrationEntriesToDelete(NULL),
84         transactionPartsSent(NULL),
85         outstandingTransactionStatus(NULL),
86         liveAbortsGeneratedByLocal(NULL),
87         offlineTransactionsCommittedAndAtServer(NULL),
88         localCommunicationTable(NULL),
89         lastTransactionSeenFromMachineFromServer(NULL),
90         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91         lastInsertedNewKey(false),
92         lastSeqNumArbOn(0)
93 {
94         init();
95 }
96
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
98         buffer(NULL),
99         cloud(_cloud),
100         random(NULL),
101         liveTableStatus(NULL),
102         pendingTransactionBuilder(NULL),
103         lastPendingTransactionSpeculatedOn(NULL),
104         firstPendingTransaction(NULL),
105         numberOfSlots(0),
106         bufferResizeThreshold(0),
107         liveSlotCount(0),
108         oldestLiveSlotSequenceNumver(1),
109         localMachineId(_localMachineId),
110         sequenceNumber(0),
111         localTransactionSequenceNumber(0),
112         lastTransactionSequenceNumberSpeculatedOn(0),
113         oldestTransactionSequenceNumberSpeculatedOn(0),
114         localArbitrationSequenceNumber(0),
115         hadPartialSendToServer(false),
116         attemptedToSendToServer(false),
117         expectedsize(0),
118         didFindTableStatus(false),
119         currMaxSize(0),
120         lastSlotAttemptedToSend(NULL),
121         lastIsNewKey(false),
122         lastNewSize(0),
123         lastTransactionPartsSent(NULL),
124         lastPendingSendArbitrationEntriesToDelete(NULL),
125         lastNewKey(NULL),
126         committedKeyValueTable(NULL),
127         speculatedKeyValueTable(NULL),
128         pendingTransactionSpeculatedKeyValueTable(NULL),
129         liveNewKeyTable(NULL),
130         lastMessageTable(NULL),
131         rejectedMessageWatchVectorTable(NULL),
132         arbitratorTable(NULL),
133         liveAbortTable(NULL),
134         newTransactionParts(NULL),
135         newCommitParts(NULL),
136         lastArbitratedTransactionNumberByArbitratorTable(NULL),
137         liveTransactionBySequenceNumberTable(NULL),
138         liveTransactionByTransactionIdTable(NULL),
139         liveCommitsTable(NULL),
140         liveCommitsByKeyTable(NULL),
141         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142         rejectedSlotVector(NULL),
143         pendingTransactionQueue(NULL),
144         pendingSendArbitrationRounds(NULL),
145         pendingSendArbitrationEntriesToDelete(NULL),
146         transactionPartsSent(NULL),
147         outstandingTransactionStatus(NULL),
148         liveAbortsGeneratedByLocal(NULL),
149         offlineTransactionsCommittedAndAtServer(NULL),
150         localCommunicationTable(NULL),
151         lastTransactionSeenFromMachineFromServer(NULL),
152         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153         lastInsertedNewKey(false),
154         lastSeqNumArbOn(0)
155 {
156         init();
157 }
158
159 /**
160  * Init all the stuff needed for for table usage
161  */
162 void Table::init() {
163         // Init helper objects
164         random = new Random();
165         buffer = new SlotBuffer();
166
167         // init data structs
168         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
169         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
170         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
171         liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
172         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
173         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
174         arbitratorTable = new Hashtable<IoTString *, int64_t>();
175         liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
176         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
177         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
178         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
179         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
180         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
181         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
182         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
183         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
184         rejectedSlotVector = new Vector<int64_t>();
185         pendingTransactionQueue = new Vector<Transaction *>();
186         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
187         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
188         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
189         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
190         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
191         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
192         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
193         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
194         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
195
196         // Other init stuff
197         numberOfSlots = buffer->capacity();
198         setResizeThreshold();
199 }
200
201 /**
202  * Initialize the table by inserting a table status as the first entry
203  * into the table status also initialize the crypto stuff.
204  */
205 void Table::initTable() {
206         cloud->initSecurity();
207
208         // Create the first insertion into the block chain which is the table status
209         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
210         localSequenceNumber++;
211         TableStatus *status = new TableStatus(s, numberOfSlots);
212         s->addEntry(status);
213         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
214
215         if (array == NULL) {
216                 array = new Array<Slot *>(1);
217                 array->set(0, s);
218                 // update local block chain
219                 validateAndUpdate(array, true);
220         } else if (array->length() == 1) {
221                 // in case we did push the slot BUT we failed to init it
222                 validateAndUpdate(array, true);
223         } else {
224                 throw new Error("Error on initialization");
225         }
226 }
227
228 /**
229  * Rebuild the table from scratch by pulling the latest block chain
230  * from the server.
231  */
232 void Table::rebuild() {
233         // Just pull the latest slots from the server
234         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
235         validateAndUpdate(newslots, true);
236         sendToServer(NULL);
237         updateLiveTransactionsAndStatus();
238 }
239
240 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
241         localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
242 }
243
244 int64_t Table::getArbitrator(IoTString *key) {
245         return arbitratorTable->get(key);
246 }
247
248 void Table::close() {
249         cloud->close();
250 }
251
252 IoTString *Table::getCommitted(IoTString *key)  {
253         KeyValue *kv = committedKeyValueTable->get(key);
254
255         if (kv != NULL) {
256                 return kv->getValue();
257         } else {
258                 return NULL;
259         }
260 }
261
262 IoTString *Table::getSpeculative(IoTString *key) {
263         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
264
265         if (kv == NULL) {
266                 kv = speculatedKeyValueTable->get(key);
267         }
268
269         if (kv == NULL) {
270                 kv = committedKeyValueTable->get(key);
271         }
272
273         if (kv != NULL) {
274                 return kv->getValue();
275         } else {
276                 return NULL;
277         }
278 }
279
280 IoTString *Table::getCommittedAtomic(IoTString *key) {
281         KeyValue *kv = committedKeyValueTable->get(key);
282
283         if (!arbitratorTable->contains(key)) {
284                 throw new Error("Key not Found.");
285         }
286
287         // Make sure new key value pair matches the current arbitrator
288         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
289                 // TODO: Maybe not throw en error
290                 throw new Error("Not all Key Values Match Arbitrator.");
291         }
292
293         if (kv != NULL) {
294                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
295                 return kv->getValue();
296         } else {
297                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
298                 return NULL;
299         }
300 }
301
302 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
303         if (!arbitratorTable->contains(key)) {
304                 throw new Error("Key not Found.");
305         }
306
307         // Make sure new key value pair matches the current arbitrator
308         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
309                 // TODO: Maybe not throw en error
310                 throw new Error("Not all Key Values Match Arbitrator.");
311         }
312
313         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
314
315         if (kv == NULL) {
316                 kv = speculatedKeyValueTable->get(key);
317         }
318
319         if (kv == NULL) {
320                 kv = committedKeyValueTable->get(key);
321         }
322
323         if (kv != NULL) {
324                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
325                 return kv->getValue();
326         } else {
327                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
328                 return NULL;
329         }
330 }
331
332 bool Table::update()  {
333         try {
334                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
335                 validateAndUpdate(newSlots, false);
336                 sendToServer(NULL);
337                 updateLiveTransactionsAndStatus();
338                 return true;
339         } catch (Exception *e) {
340                 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
341                 while (kit->hasNext()) {
342                         int64_t m = kit->next();
343                         updateFromLocal(m);
344                 }
345                 delete kit;
346         }
347
348         return false;
349 }
350
351 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
352         while (true) {
353                 if (!arbitratorTable->contains(keyName)) {
354                         // There is already an arbitrator
355                         return false;
356                 }
357                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
358
359                 if (sendToServer(newKey)) {
360                         // If successfully inserted
361                         return true;
362                 }
363         }
364 }
365
366 void Table::startTransaction() {
367         // Create a new transaction, invalidates any old pending transactions.
368         pendingTransactionBuilder = new PendingTransaction(localMachineId);
369 }
370
371 void Table::addKV(IoTString *key, IoTString *value) {
372
373         // Make sure it is a valid key
374         if (!arbitratorTable->contains(key)) {
375                 throw new Error("Key not Found.");
376         }
377
378         // Make sure new key value pair matches the current arbitrator
379         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
380                 // TODO: Maybe not throw en error
381                 throw new Error("Not all Key Values Match Arbitrator.");
382         }
383
384         // Add the key value to this transaction
385         KeyValue *kv = new KeyValue(key, value);
386         pendingTransactionBuilder->addKV(kv);
387 }
388
389 TransactionStatus *Table::commitTransaction() {
390         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
391                 // transaction with no updates will have no effect on the system
392                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
393         }
394
395         // Set the local transaction sequence number and increment
396         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
397         localTransactionSequenceNumber++;
398
399         // Create the transaction status
400         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
401
402         // Create the new transaction
403         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
404         newTransaction->setTransactionStatus(transactionStatus);
405
406         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
407                 // Add it to the queue and invalidate the builder for safety
408                 pendingTransactionQueue->add(newTransaction);
409         } else {
410                 arbitrateOnLocalTransaction(newTransaction);
411                 updateLiveStateFromLocal();
412         }
413
414         pendingTransactionBuilder = new PendingTransaction(localMachineId);
415
416         try {
417                 sendToServer(NULL);
418         } catch (ServerException *e) {
419
420                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
421                 uint size = pendingTransactionQueue->size();
422                 uint oldindex = 0;
423                 for (int iter = 0; iter < size; iter++) {
424                         Transaction *transaction = pendingTransactionQueue->get(iter);
425                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
426
427                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
428                                 // Already contacted this client so ignore all attempts to contact this client
429                                 // to preserve ordering for arbitrator
430                                 continue;
431                         }
432
433                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
434
435                         if (sendReturn.getFirst()) {
436                                 // Failed to contact over local
437                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
438                         } else {
439                                 // Successful contact or should not contact
440
441                                 if (sendReturn.getSecond()) {
442                                         // did arbitrate
443                                         oldindex--;
444                                 }
445                         }
446                 }
447                 pendingTransactionQueue->setSize(oldindex);
448         }
449
450         updateLiveStateFromLocal();
451
452         return transactionStatus;
453 }
454
455 /**
456  * Recalculate the new resize threshold
457  */
458 void Table::setResizeThreshold() {
459         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
460         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
461 }
462
463 int64_t Table::getLocalSequenceNumber() {
464         return localSequenceNumber;
465 }
466
467 bool Table::sendToServer(NewKey *newKey) {
468         bool fromRetry = false;
469         try {
470                 if (hadPartialSendToServer) {
471                         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
472                         if (newSlots->length() == 0) {
473                                 fromRetry = true;
474                                 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
475
476                                 if (sendSlotsReturn.getFirst()) {
477                                         if (newKey != NULL) {
478                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
479                                                         newKey = NULL;
480                                                 }
481                                         }
482
483                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
484                                         while (trit->hasNext()) {
485                                                 Transaction *transaction = trit->next();
486                                                 transaction->resetServerFailure();
487                                                 // Update which transactions parts still need to be sent
488                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
489                                                 // Add the transaction status to the outstanding list
490                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
491
492                                                 // Update the transaction status
493                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
494
495                                                 // Check if all the transaction parts were successfully
496                                                 // sent and if so then remove it from pending
497                                                 if (transaction->didSendAllParts()) {
498                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
499                                                         pendingTransactionQueue->remove(transaction);
500                                                 }
501                                         }
502                                         delete trit;
503                                 } else {
504                                         newSlots = sendSlotsReturn.getThird();
505                                         bool isInserted = false;
506                                         for (uint si = 0; si < newSlots->length(); si++) {
507                                                 Slot *s = newSlots->get(si);
508                                                 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
509                                                         isInserted = true;
510                                                         break;
511                                                 }
512                                         }
513
514                                         for (uint si = 0; si < newSlots->length(); si++) {
515                                                 Slot *s = newSlots->get(si);
516                                                 if (isInserted) {
517                                                         break;
518                                                 }
519
520                                                 // Process each entry in the slot
521                                                 Vector<Entry *> *ventries = s->getEntries();
522                                                 uint vesize = ventries->size();
523                                                 for (uint vei = 0; vei < vesize; vei++) {
524                                                         Entry *entry = ventries->get(vei);
525                                                         if (entry->getType() == TypeLastMessage) {
526                                                                 LastMessage *lastMessage = (LastMessage *)entry;
527                                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
528                                                                         isInserted = true;
529                                                                         break;
530                                                                 }
531                                                         }
532                                                 }
533                                         }
534
535                                         if (isInserted) {
536                                                 if (newKey != NULL) {
537                                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
538                                                                 newKey = NULL;
539                                                         }
540                                                 }
541
542                                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
543                                                 while (trit->hasNext()) {
544                                                         Transaction *transaction = trit->next();
545                                                         transaction->resetServerFailure();
546
547                                                         // Update which transactions parts still need to be sent
548                                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
549
550                                                         // Add the transaction status to the outstanding list
551                                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
552
553                                                         // Update the transaction status
554                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
555
556                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
557                                                         if (transaction->didSendAllParts()) {
558                                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
559                                                                 pendingTransactionQueue->remove(transaction);
560                                                         } else {
561                                                                 transaction->resetServerFailure();
562                                                                 // Set the transaction sequence number back to nothing
563                                                                 if (!transaction->didSendAPartToServer()) {
564                                                                         transaction->setSequenceNumber(-1);
565                                                                 }
566                                                         }
567                                                 }
568                                                 delete trit;
569                                         }
570                                 }
571
572                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
573                                 while (trit->hasNext()) {
574                                         Transaction *transaction = trit->next();
575                                         transaction->resetServerFailure();
576                                         // Set the transaction sequence number back to nothing
577                                         if (!transaction->didSendAPartToServer()) {
578                                                 transaction->setSequenceNumber(-1);
579                                         }
580                                 }
581                                 delete trit;
582
583                                 if (sendSlotsReturn.getThird()->length() != 0) {
584                                         // insert into the local block chain
585                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
586                                 }
587                                 // continue;
588                         } else {
589                                 bool isInserted = false;
590                                 for (uint si = 0; si < newSlots->length(); si++) {
591                                         Slot *s = newSlots->get(si);
592                                         if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
593                                                 isInserted = true;
594                                                 break;
595                                         }
596                                 }
597
598                                 for (uint si = 0; si < newSlots->length(); si++) {
599                                         Slot *s = newSlots->get(si);
600                                         if (isInserted) {
601                                                 break;
602                                         }
603
604                                         // Process each entry in the slot
605                                         Vector<Entry *> *entries = s->getEntries();
606                                         uint eSize = entries->size();
607                                         for(uint ei=0; ei < eSize; ei++) {
608                                                 Entry * entry = entries->get(ei);
609                                                 
610                                                 if (entry->getType() == TypeLastMessage) {
611                                                         LastMessage *lastMessage = (LastMessage *)entry;
612                                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
613                                                                 isInserted = true;
614                                                                 break;
615                                                         }
616                                                 }
617                                         }
618                                 }
619
620                                 if (isInserted) {
621                                         if (newKey != NULL) {
622                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
623                                                         newKey = NULL;
624                                                 }
625                                         }
626
627                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
628                                         while (trit->hasNext()) {
629                                                 Transaction *transaction = trit->next();
630                                                 transaction->resetServerFailure();
631
632                                                 // Update which transactions parts still need to be sent
633                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
634
635                                                 // Add the transaction status to the outstanding list
636                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
637
638                                                 // Update the transaction status
639                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
640
641                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
642                                                 if (transaction->didSendAllParts()) {
643                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
644                                                         pendingTransactionQueue->remove(transaction);
645                                                 } else {
646                                                         transaction->resetServerFailure();
647                                                         // Set the transaction sequence number back to nothing
648                                                         if (!transaction->didSendAPartToServer()) {
649                                                                 transaction->setSequenceNumber(-1);
650                                                         }
651                                                 }
652                                         }
653                                         delete trit;
654                                 } else {
655                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
656                                         while (trit->hasNext()) {
657                                                 Transaction *transaction = trit->next();
658                                                 transaction->resetServerFailure();
659                                                 // Set the transaction sequence number back to nothing
660                                                 if (!transaction->didSendAPartToServer()) {
661                                                         transaction->setSequenceNumber(-1);
662                                                 }
663                                         }
664                                         delete trit;
665                                 }
666
667                                 // insert into the local block chain
668                                 validateAndUpdate(newSlots, true);
669                         }
670                 }
671         } catch (ServerException *e) {
672                 throw e;
673         }
674
675
676
677         try {
678                 // While we have stuff that needs inserting into the block chain
679                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
680
681                         fromRetry = false;
682
683                         if (hadPartialSendToServer) {
684                                 throw new Error("Should Be error free");
685                         }
686
687
688
689                         // If there is a new key with same name then end
690                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
691                                 return false;
692                         }
693
694                         // Create the slot
695                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
696                         localSequenceNumber++;
697
698                         // Try to fill the slot with data
699                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
700                         bool needsResize = fillSlotsReturn.getFirst();
701                         int newSize = fillSlotsReturn.getSecond();
702                         bool insertedNewKey = fillSlotsReturn.getThird();
703
704                         if (needsResize) {
705                                 // Reset which transaction to send
706                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
707                                 while (trit->hasNext()) {
708                                         Transaction *transaction = trit->next();
709                                         transaction->resetNextPartToSend();
710
711                                         // Set the transaction sequence number back to nothing
712                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
713                                                 transaction->setSequenceNumber(-1);
714                                         }
715                                 }
716                                 delete trit;
717
718                                 // Clear the sent data since we are trying again
719                                 pendingSendArbitrationEntriesToDelete->clear();
720                                 transactionPartsSent->clear();
721
722                                 // We needed a resize so try again
723                                 fillSlot(slot, true, newKey);
724                         }
725
726                         lastSlotAttemptedToSend = slot;
727                         lastIsNewKey = (newKey != NULL);
728                         lastInsertedNewKey = insertedNewKey;
729                         lastNewSize = newSize;
730                         lastNewKey = newKey;
731                         lastTransactionPartsSent = transactionPartsSent->clone();
732                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
733
734                         ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
735
736                         if (sendSlotsReturn.getFirst()) {
737
738                                 // Did insert into the block chain
739
740                                 if (insertedNewKey) {
741                                         // This slot was what was inserted not a previous slot
742
743                                         // New Key was successfully inserted into the block chain so dont want to insert it again
744                                         newKey = NULL;
745                                 }
746
747                                 // Remove the aborts and commit parts that were sent from the pending to send queue
748                                 uint size = pendingSendArbitrationRounds->size();
749                                 uint oldcount = 0;
750                                 for (uint i = 0; i < size; i++) {
751                                         ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
752                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
753
754                                         if (!round->isDoneSending()) {
755                                                 // Sent all the parts
756                                                 pendingSendArbitrationRounds->set(oldcount++,
757                                                                                                                                                                                         pendingSendArbitrationRounds->get(i));
758                                         }
759                                 }
760                                 pendingSendArbitrationRounds->setSize(oldcount);
761
762                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
763                                 while (trit->hasNext()) {
764                                         Transaction *transaction = trit->next();
765                                         transaction->resetServerFailure();
766
767                                         // Update which transactions parts still need to be sent
768                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
769
770                                         // Add the transaction status to the outstanding list
771                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
772
773                                         // Update the transaction status
774                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
775
776                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
777                                         if (transaction->didSendAllParts()) {
778                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
779                                                 pendingTransactionQueue->remove(transaction);
780                                         }
781                                 }
782                                 delete trit;
783                         } else {
784                                 // Reset which transaction to send
785                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
786                                 while (trit->hasNext()) {
787                                         Transaction *transaction = trit->next();
788                                         transaction->resetNextPartToSend();
789
790                                         // Set the transaction sequence number back to nothing
791                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
792                                                 transaction->setSequenceNumber(-1);
793                                         }
794                                 }
795                                 delete trit;
796                         }
797
798                         // Clear the sent data in preparation for next send
799                         pendingSendArbitrationEntriesToDelete->clear();
800                         transactionPartsSent->clear();
801
802                         if (sendSlotsReturn.getThird()->length() != 0) {
803                                 // insert into the local block chain
804                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
805                         }
806                 }
807
808         } catch (ServerException *e) {
809                 if (e->getType() != ServerException_TypeInputTimeout) {
810                         // Nothing was able to be sent to the server so just clear these data structures
811                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
812                         while (trit->hasNext()) {
813                                 Transaction *transaction = trit->next();
814                                 transaction->resetNextPartToSend();
815
816                                 // Set the transaction sequence number back to nothing
817                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
818                                         transaction->setSequenceNumber(-1);
819                                 }
820                         }
821                         delete trit;
822                 } else {
823                         // There was a partial send to the server
824                         hadPartialSendToServer = true;
825
826                         // Nothing was able to be sent to the server so just clear these data structures
827                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
828                         while (trit->hasNext()) {
829                                 Transaction *transaction = trit->next();
830                                 transaction->resetNextPartToSend();
831                                 transaction->setServerFailure();
832                         }
833                         delete trit;
834                 }
835
836                 pendingSendArbitrationEntriesToDelete->clear();
837                 transactionPartsSent->clear();
838
839                 throw e;
840         }
841
842         return newKey == NULL;
843 }
844
845 bool Table::updateFromLocal(int64_t machineId) {
846         if (!localCommunicationTable->contains(machineId))
847                 return false;
848
849         Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(machineId);
850
851         // Get the size of the send data
852         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
853
854         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
855         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
856                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
857         }
858
859         Array<char> *sendData = new Array<char>(sendDataSize);
860         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
861
862         // Encode the data
863         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
864         bbEncode->putInt(0);
865
866         // Send by local
867         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
868         localSequenceNumber++;
869
870         if (returnData == NULL) {
871                 // Could not contact server
872                 return false;
873         }
874
875         // Decode the data
876         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
877         int numberOfEntries = bbDecode->getInt();
878
879         for (int i = 0; i < numberOfEntries; i++) {
880                 char type = bbDecode->get();
881                 if (type == TypeAbort) {
882                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
883                         processEntry(abort);
884                 } else if (type == TypeCommitPart) {
885                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
886                         processEntry(commitPart);
887                 }
888         }
889
890         updateLiveStateFromLocal();
891
892         return true;
893 }
894
895 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
896
897         // Get the devices local communications
898         if (!localCommunicationTable->contains(transaction->getArbitrator()))
899                 return Pair<bool, bool>(true, false);
900         
901         Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
902
903         // Get the size of the send data
904         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
905         {
906                 Vector<TransactionPart *> * tParts = transaction->getParts();
907                 uint tPartsSize = tParts->size();
908                 for (uint i = 0; i < tPartsSize; i++) {
909                         TransactionPart * part = tParts->get(i);
910                         sendDataSize += part->getSize();
911                 }
912         }
913
914         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
915         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
916                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
917         }
918
919         // Make the send data size
920         Array<char> *sendData = new Array<char>(sendDataSize);
921         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
922
923         // Encode the data
924         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
925         bbEncode->putInt(transaction->getParts()->size());
926         {
927                 Vector<TransactionPart *> * tParts = transaction->getParts();
928                 uint tPartsSize = tParts->size();
929                 for (uint i = 0; i < tPartsSize; i++) {
930                         TransactionPart * part = tParts->get(i);
931                         part->encode(bbEncode);
932                 }
933         }
934
935         // Send by local
936         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
937         localSequenceNumber++;
938
939         if (returnData == NULL) {
940                 // Could not contact server
941                 return Pair<bool, bool>(true, false);
942         }
943
944         // Decode the data
945         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
946         bool didCommit = bbDecode->get() == 1;
947         bool couldArbitrate = bbDecode->get() == 1;
948         int numberOfEntries = bbDecode->getInt();
949         bool foundAbort = false;
950
951         for (int i = 0; i < numberOfEntries; i++) {
952                 char type = bbDecode->get();
953                 if (type == TypeAbort) {
954                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
955
956                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
957                                 foundAbort = true;
958                         }
959
960                         processEntry(abort);
961                 } else if (type == TypeCommitPart) {
962                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
963                         processEntry(commitPart);
964                 }
965         }
966
967         updateLiveStateFromLocal();
968
969         if (couldArbitrate) {
970                 TransactionStatus * status =  transaction->getTransactionStatus();
971                 if (didCommit) {
972                         status->setStatus(TransactionStatus_StatusCommitted);
973                 } else {
974                         status->setStatus(TransactionStatus_StatusAborted);
975                 }
976         } else {
977                 TransactionStatus * status =  transaction->getTransactionStatus();
978                 if (foundAbort) {
979                         status->setStatus(TransactionStatus_StatusAborted);
980                 } else {
981                         status->setStatus(TransactionStatus_StatusCommitted);
982                 }
983         }
984
985         return Pair<bool, bool>(false, true);
986 }
987
988 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
989         // Decode the data
990         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
991         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
992         int numberOfParts = bbDecode->getInt();
993
994         // If we did commit a transaction or not
995         bool didCommit = false;
996         bool couldArbitrate = false;
997
998         if (numberOfParts != 0) {
999
1000                 // decode the transaction
1001                 Transaction *transaction = new Transaction();
1002                 for (int i = 0; i < numberOfParts; i++) {
1003                         bbDecode->get();
1004                         TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1005                         transaction->addPartDecode(newPart);
1006                 }
1007
1008                 // Arbitrate on transaction and pull relevant return data
1009                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1010                 couldArbitrate = localArbitrateReturn.getFirst();
1011                 didCommit = localArbitrateReturn.getSecond();
1012
1013                 updateLiveStateFromLocal();
1014
1015                 // Transaction was sent to the server so keep track of it to prevent double commit
1016                 if (transaction->getSequenceNumber() != -1) {
1017                         offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1018                 }
1019         }
1020
1021         // The data to send back
1022         int returnDataSize = 0;
1023         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1024
1025         // Get the aborts to send back
1026         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1027         {
1028                 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1029                 while(abortit->hasNext())
1030                         abortLocalSequenceNumbers->add(abortit->next());
1031                 delete abortit;
1032         }
1033         
1034         qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1035
1036         uint asize = abortLocalSequenceNumbers->size();
1037         for(uint i=0; i<asize; i++) {
1038                 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1039                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1040                         continue;
1041                 }
1042                 
1043                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1044                 unseenArbitrations->add(abort);
1045                 returnDataSize += abort->getSize();
1046         }
1047
1048         // Get the commits to send back
1049         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1050         if (commitForClientTable != NULL) {
1051                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1052                 {
1053                         SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1054                         while(commitit->hasNext())
1055                                 commitLocalSequenceNumbers->add(commitit->next());
1056                         delete commitit;
1057                 }
1058                 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1059
1060                 uint clsSize = commitLocalSequenceNumbers->size();
1061                 for(uint clsi = 0; clsi < clsSize; clsi++) {
1062                         int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1063                         Commit *commit = commitForClientTable->get(localSequenceNumber);
1064
1065                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1066                                 continue;
1067                         }
1068
1069                         {
1070                                 Vector<CommitPart *> * parts = commit->getParts();
1071                                 uint nParts = parts->size();
1072                                 for(uint i=0; i<nParts; i++) {
1073                                         CommitPart * commitPart = parts->get(i);
1074                                         unseenArbitrations->add(commitPart);
1075                                         returnDataSize += commitPart->getSize();
1076                                 }
1077                         }
1078                 }
1079         }
1080
1081         // Number of arbitration entries to decode
1082         returnDataSize += 2 * sizeof(int32_t);
1083
1084         // bool of did commit or not
1085         if (numberOfParts != 0) {
1086                 returnDataSize += sizeof(char);
1087         }
1088
1089         // Data to send Back
1090         Array<char> *returnData = new Array<char>(returnDataSize);
1091         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1092
1093         if (numberOfParts != 0) {
1094                 if (didCommit) {
1095                         bbEncode->put((char)1);
1096                 } else {
1097                         bbEncode->put((char)0);
1098                 }
1099                 if (couldArbitrate) {
1100                         bbEncode->put((char)1);
1101                 } else {
1102                         bbEncode->put((char)0);
1103                 }
1104         }
1105
1106         bbEncode->putInt(unseenArbitrations->size());
1107         uint size = unseenArbitrations->size();
1108         for (uint i = 0; i < size; i++) {
1109                 Entry *entry = unseenArbitrations->get(i);
1110                 entry->encode(bbEncode);
1111         }
1112
1113         localSequenceNumber++;
1114         return returnData;
1115 }
1116
1117 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1118         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1119         attemptedToSendToServer = true;
1120
1121         bool inserted = false;
1122         bool lastTryInserted = false;
1123
1124         Array<Slot *> *array = cloud->putSlot(slot, newSize);
1125         if (array == NULL) {
1126                 array = new Array<Slot *>();
1127                 array->set(0, slot);
1128                 rejectedSlotVector->clear();
1129                 inserted = true;
1130         } else {
1131                 if (array->length() == 0) {
1132                         throw new Error("Server Error: Did not send any slots");
1133                 }
1134
1135                 // if (attemptedToSendToServerTmp) {
1136                 if (hadPartialSendToServer) {
1137
1138                         bool isInserted = false;
1139                         uint size = array->length();
1140                         for (uint i = 0; i < size; i++) {
1141                                 Slot *s = array->get(i);
1142                                 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1143                                         isInserted = true;
1144                                         break;
1145                                 }
1146                         }
1147
1148                         for (uint i = 0; i < size; i++) {
1149                                 Slot *s = array->get(i);
1150                                 if (isInserted) {
1151                                         break;
1152                                 }
1153
1154                                 // Process each entry in the slot
1155                                 Vector<Entry *> *entries = s->getEntries();
1156                                 uint eSize = entries->size();
1157                                 for(uint ei=0; ei < eSize; ei++) {
1158                                         Entry * entry = entries->get(ei);
1159
1160                                         if (entry->getType() == TypeLastMessage) {
1161                                                 LastMessage *lastMessage = (LastMessage *)entry;
1162
1163                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1164                                                         isInserted = true;
1165                                                         break;
1166                                                 }
1167                                         }
1168                                 }
1169                         }
1170
1171                         if (!isInserted) {
1172                                 rejectedSlotVector->add(slot->getSequenceNumber());
1173                                 lastTryInserted = false;
1174                         } else {
1175                                 lastTryInserted = true;
1176                         }
1177                 } else {
1178                         rejectedSlotVector->add(slot->getSequenceNumber());
1179                         lastTryInserted = false;
1180                 }
1181         }
1182
1183         return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1184 }
1185
1186 /**
1187  * Returns false if a resize was needed
1188  */
1189 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1190         int newSize = 0;
1191         if (liveSlotCount > bufferResizeThreshold) {
1192                 resize = true;//Resize is forced
1193         }
1194
1195         if (resize) {
1196                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1197                 TableStatus *status = new TableStatus(slot, newSize);
1198                 slot->addEntry(status);
1199         }
1200
1201         // Fill with rejected slots first before doing anything else
1202         doRejectedMessages(slot);
1203
1204         // Do mandatory rescue of entries
1205         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1206
1207         // Extract working variables
1208         bool needsResize = mandatoryRescueReturn.getFirst();
1209         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1210         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1211
1212         if (needsResize && !resize) {
1213                 // We need to resize but we are not resizing so return false
1214                 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1215         }
1216
1217         bool inserted = false;
1218         if (newKeyEntry != NULL) {
1219                 newKeyEntry->setSlot(slot);
1220                 if (slot->hasSpace(newKeyEntry)) {
1221                         slot->addEntry(newKeyEntry);
1222                         inserted = true;
1223                 }
1224         }
1225
1226         // Clear the transactions, aborts and commits that were sent previously
1227         transactionPartsSent->clear();
1228         pendingSendArbitrationEntriesToDelete->clear();
1229         uint size = pendingSendArbitrationRounds->size();
1230         for (uint i = 0; i < size; i++) {
1231                 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1232                 bool isFull = false;
1233                 round->generateParts();
1234                 Vector<Entry *> *parts = round->getParts();
1235
1236                 // Insert pending arbitration data
1237                 uint vsize = parts->size();
1238                 for (uint vi = 0; vi < vsize; vi++) {
1239                         Entry *arbitrationData = parts->get(vi);
1240
1241                         // If it is an abort then we need to set some information
1242                         if (arbitrationData->getType() == TypeAbort) {
1243                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1244                         }
1245
1246                         if (!slot->hasSpace(arbitrationData)) {
1247                                 // No space so cant do anything else with these data entries
1248                                 isFull = true;
1249                                 break;
1250                         }
1251
1252                         // Add to this current slot and add it to entries to delete
1253                         slot->addEntry(arbitrationData);
1254                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1255                 }
1256
1257                 if (isFull) {
1258                         break;
1259                 }
1260         }
1261
1262         if (pendingTransactionQueue->size() > 0) {
1263                 Transaction *transaction = pendingTransactionQueue->get(0);
1264                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1265                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1266                         transaction->setSequenceNumber(slot->getSequenceNumber());
1267                 }
1268
1269                 while (true) {
1270                         TransactionPart *part = transaction->getNextPartToSend();
1271                         if (part == NULL) {
1272                                 // Ran out of parts to send for this transaction so move on
1273                                 break;
1274                         }
1275
1276                         if (slot->hasSpace(part)) {
1277                                 slot->addEntry(part);
1278                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1279                                 if (partsSent == NULL) {
1280                                         partsSent = new Vector<int32_t>();
1281                                         transactionPartsSent->put(transaction, partsSent);
1282                                 }
1283                                 partsSent->add(part->getPartNumber());
1284                                 transactionPartsSent->put(transaction, partsSent);
1285                         } else {
1286                                 break;
1287                         }
1288                 }
1289         }
1290
1291         // Fill the remainder of the slot with rescue data
1292         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1293
1294         return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1295 }
1296
1297 void Table::doRejectedMessages(Slot *s) {
1298         if (!rejectedSlotVector->isEmpty()) {
1299                 /* TODO: We should avoid generating a rejected message entry if
1300                  * there is already a sufficient entry in the queue (e->g->,
1301                  * equalsto value of true and same sequence number)->  */
1302
1303                 int64_t old_seqn = rejectedSlotVector->get(0);
1304                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1305                         int64_t new_seqn = rejectedSlotVector->lastElement();
1306                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1307                         s->addEntry(rm);
1308                 } else {
1309                         int64_t prev_seqn = -1;
1310                         int i = 0;
1311                         /* Go through list of missing messages */
1312                         for (; i < rejectedSlotVector->size(); i++) {
1313                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1314                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1315                                 if (s_msg != NULL)
1316                                         break;
1317                                 prev_seqn = curr_seqn;
1318                         }
1319                         /* Generate rejected message entry for missing messages */
1320                         if (prev_seqn != -1) {
1321                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1322                                 s->addEntry(rm);
1323                         }
1324                         /* Generate rejected message entries for present messages */
1325                         for (; i < rejectedSlotVector->size(); i++) {
1326                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1327                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1328                                 int64_t machineid = s_msg->getMachineID();
1329                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1330                                 s->addEntry(rm);
1331                         }
1332                 }
1333         }
1334 }
1335
1336 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1337         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1338         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1339         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1340                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1341         }
1342
1343         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1344         bool seenLiveSlot = false;
1345         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1346         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1347
1348
1349         // Mandatory Rescue
1350         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1351                 Slot * previousSlot = buffer->getSlot(currentSequenceNumber);
1352                 // Push slot number forward
1353                 if (!seenLiveSlot) {
1354                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1355                 }
1356
1357                 if (!previousSlot->isLive()) {
1358                         continue;
1359                 }
1360
1361                 // We have seen a live slot
1362                 seenLiveSlot = true;
1363
1364                 // Get all the live entries for a slot
1365                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1366
1367                 // Iterate over all the live entries and try to rescue them
1368                 uint lESize = liveEntries->size();
1369                 for (uint i=0; i< lESize; i++) {
1370                         Entry * liveEntry = liveEntries->get(i);
1371                         if (slot->hasSpace(liveEntry)) {
1372                                 // Enough space to rescue the entry
1373                                 slot->addEntry(liveEntry);
1374                         } else if (currentSequenceNumber == firstIfFull) {
1375                                 //if there's no space but the entry is about to fall off the queue
1376                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1377                         }
1378                 }
1379         }
1380
1381         // Did not resize
1382         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1383 }
1384
1385 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1386         /* now go through live entries from least to greatest sequence number until
1387          * either all live slots added, or the slot doesn't have enough room
1388          * for SKIP_THRESHOLD consecutive entries*/
1389         int skipcount = 0;
1390         int64_t newestseqnum = buffer->getNewestSeqNum();
1391 search:
1392         for (; seqn <= newestseqnum; seqn++) {
1393                 Slot *prevslot = buffer->getSlot(seqn);
1394                 //Push slot number forward
1395                 if (!seenliveslot)
1396                         oldestLiveSlotSequenceNumver = seqn;
1397
1398                 if (!prevslot->isLive())
1399                         continue;
1400                 seenliveslot = true;
1401                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1402                 uint lESize = liveentries->size();
1403                 for (uint i=0; i< lESize; i++) {
1404                         Entry * liveentry = liveentries->get(i);
1405                         if (s->hasSpace(liveentry))
1406                                 s->addEntry(liveentry);
1407                         else {
1408                                 skipcount++;
1409                                 if (skipcount > Table_SKIP_THRESHOLD)
1410                                         goto donesearch;
1411                         }
1412                 }
1413         }
1414  donesearch:
1415         ;
1416 }
1417
1418 /**
1419  * Checks for malicious activity and updates the local copy of the block chain->
1420  */
1421 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1422         // The cloud communication layer has checked slot HMACs already
1423         // before decoding
1424         if (newSlots->length() == 0) {
1425                 return;
1426         }
1427
1428         // Make sure all slots are newer than the last largest slot this
1429         // client has seen
1430         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1431         if (firstSeqNum <= sequenceNumber) {
1432                 throw new Error("Server Error: Sent older slots!");
1433         }
1434
1435         // Create an object that can access both new slots and slots in our
1436         // local chain without committing slots to our local chain
1437         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1438
1439         // Check that the HMAC chain is not broken
1440         checkHMACChain(indexer, newSlots);
1441
1442         // Set to keep track of messages from clients
1443         Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1444         {
1445                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> * lmit=getKeyIterator(lastMessageTable);
1446                 while(lmit->hasNext())
1447                         machineSet->add(lmit->next());
1448                 delete lmit;
1449         }
1450
1451         // Process each slots data
1452         {
1453                 uint numSlots = newSlots->length();
1454                 for(uint i=0; i<numSlots; i++) {
1455                         Slot *slot = newSlots->get(i);
1456                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1457                         updateExpectedSize();
1458                 }
1459         }
1460
1461         // If there is a gap, check to see if the server sent us
1462         // everything->
1463         if (firstSeqNum != (sequenceNumber + 1)) {
1464
1465                 // Check the size of the slots that were sent down by the server->
1466                 // Can only check the size if there was a gap
1467                 checkNumSlots(newSlots->length());
1468
1469                 // Since there was a gap every machine must have pushed a slot or
1470                 // must have a last message message-> If not then the server is
1471                 // hiding slots
1472                 if (!machineSet->isEmpty()) {
1473                         throw new Error("Missing record for machines: ");
1474                 }
1475         }
1476
1477         // Update the size of our local block chain->
1478         commitNewMaxSize();
1479
1480         // Commit new to slots to the local block chain->
1481         {
1482                 uint numSlots = newSlots->length();
1483                 for(uint i=0; i<numSlots; i++) {
1484                         Slot *slot = newSlots->get(i);
1485                         
1486                         // Insert this slot into our local block chain copy->
1487                         buffer->putSlot(slot);
1488                         
1489                         // Keep track of how many slots are currently live (have live data
1490                         // in them)->
1491                         liveSlotCount++;
1492                 }
1493         }
1494         // Get the sequence number of the latest slot in the system
1495         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1496         updateLiveStateFromServer();
1497
1498         // No Need to remember after we pulled from the server
1499         offlineTransactionsCommittedAndAtServer->clear();
1500
1501         // This is invalidated now
1502         hadPartialSendToServer = false;
1503 }
1504
1505 void Table::updateLiveStateFromServer() {
1506         // Process the new transaction parts
1507         processNewTransactionParts();
1508
1509         // Do arbitration on new transactions that were received
1510         arbitrateFromServer();
1511
1512         // Update all the committed keys
1513         bool didCommitOrSpeculate = updateCommittedTable();
1514
1515         // Delete the transactions that are now dead
1516         updateLiveTransactionsAndStatus();
1517
1518         // Do speculations
1519         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1520         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1521 }
1522
1523 void Table::updateLiveStateFromLocal() {
1524         // Update all the committed keys
1525         bool didCommitOrSpeculate = updateCommittedTable();
1526
1527         // Delete the transactions that are now dead
1528         updateLiveTransactionsAndStatus();
1529
1530         // Do speculations
1531         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1532         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1533 }
1534
1535 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1536         int64_t prevslots = firstSequenceNumber;
1537
1538         if (didFindTableStatus) {
1539         } else {
1540                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1541         }
1542
1543         didFindTableStatus = true;
1544         currMaxSize = numberOfSlots;
1545 }
1546
1547 void Table::updateExpectedSize() {
1548         expectedsize++;
1549
1550         if (expectedsize > currMaxSize) {
1551                 expectedsize = currMaxSize;
1552         }
1553 }
1554
1555
1556 /**
1557  * Check the size of the block chain to make sure there are enough
1558  * slots sent back by the server-> This is only called when we have a
1559  * gap between the slots that we have locally and the slots sent by
1560  * the server therefore in the slots sent by the server there will be
1561  * at least 1 Table status message
1562  */
1563 void Table::checkNumSlots(int numberOfSlots) {
1564         if (numberOfSlots != expectedsize) {
1565                 throw new Error("Server Error: Server did not send all slots->  Expected: ");
1566         }
1567 }
1568
1569 /**
1570  * Update the size of of the local buffer if it is needed->
1571  */
1572 void Table::commitNewMaxSize() {
1573         didFindTableStatus = false;
1574
1575         // Resize the local slot buffer
1576         if (numberOfSlots != currMaxSize) {
1577                 buffer->resize((int32_t)currMaxSize);
1578         }
1579
1580         // Change the number of local slots to the new size
1581         numberOfSlots = (int32_t)currMaxSize;
1582
1583         // Recalculate the resize threshold since the size of the local
1584         // buffer has changed
1585         setResizeThreshold();
1586 }
1587
1588 /**
1589  * Process the new transaction parts from this latest round of slots
1590  * received from the server
1591  */
1592 void Table::processNewTransactionParts() {
1593
1594         if (newTransactionParts->size() == 0) {
1595                 // Nothing new to process
1596                 return;
1597         }
1598
1599         // Iterate through all the machine Ids that we received new parts
1600         // for
1601         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * tpit= getKeyIterator(newTransactionParts);
1602         while(tpit->hasNext()) {
1603                 int64_t machineId = tpit->next();
1604                 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1605
1606                 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1607                 // Iterate through all the parts for that machine Id
1608                 while(ptit->hasNext()) {
1609                         Pair<int64_t, int32_t> * partId = ptit->next();
1610                         TransactionPart *part = parts->get(partId);
1611
1612                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1613                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1614                                 if (lastTransactionNumber >= part->getSequenceNumber()) {
1615                                         // Set dead the transaction part
1616                                         part->setDead();
1617                                         continue;
1618                                 }
1619                         }
1620                         
1621                         // Get the transaction object for that sequence number
1622                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1623
1624                         if (transaction == NULL) {
1625                                 // This is a new transaction that we dont have so make a new one
1626                                 transaction = new Transaction();
1627
1628                                 // Insert this new transaction into the live tables
1629                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1630                                 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1631                         }
1632
1633                         // Add that part to the transaction
1634                         transaction->addPartDecode(part);
1635                 }
1636                 delete ptit;
1637         }
1638         delete tpit;
1639         // Clear all the new transaction parts in preparation for the next
1640         // time the server sends slots
1641         newTransactionParts->clear();
1642 }
1643
1644 void Table::arbitrateFromServer() {
1645
1646         if (liveTransactionBySequenceNumberTable->size() == 0) {
1647                 // Nothing to arbitrate on so move on
1648                 return;
1649         }
1650
1651         // Get the transaction sequence numbers and sort from oldest to newest
1652         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1653         {
1654                 SetIterator<int64_t, Transaction *> * trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1655                 while(trit->hasNext())
1656                         transactionSequenceNumbers->add(trit->next());
1657                 delete trit;
1658         }
1659         qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1660
1661         // Collection of key value pairs that are
1662         Hashtable<IoTString *, KeyValue *> * speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1663
1664         // The last transaction arbitrated on
1665         int64_t lastTransactionCommitted = -1;
1666         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1667         uint tsnSize = transactionSequenceNumbers->size();
1668         for(uint i=0; i<tsnSize; i++) {
1669                 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1670                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1671
1672                 // Check if this machine arbitrates for this transaction if not
1673                 // then we cant arbitrate this transaction
1674                 if (transaction->getArbitrator() != localMachineId) {
1675                         continue;
1676                 }
1677
1678                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1679                         continue;
1680                 }
1681
1682                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1683                         // We have seen this already locally so dont commit again
1684                         continue;
1685                 }
1686
1687
1688                 if (!transaction->isComplete()) {
1689                         // Will arbitrate in incorrect order if we continue so just break
1690                         // Most likely this
1691                         break;
1692                 }
1693
1694
1695                 // update the largest transaction seen by arbitrator from server
1696                 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1697                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1698                 } else {
1699                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1700                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1701                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1702                         }
1703                 }
1704
1705                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1706                         // Guard evaluated as true
1707
1708                         // Update the local changes so we can make the commit
1709                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1710                         while (kvit->hasNext()) {
1711                                 KeyValue *kv = kvit->next();
1712                                 speculativeTableTmp->put(kv->getKey(), kv);
1713                         }
1714                         delete kvit;
1715                         
1716                         // Update what the last transaction committed was for use in batch commit
1717                         lastTransactionCommitted = transactionSequenceNumber;
1718                 } else {
1719                         // Guard evaluated was false so create abort
1720                         // Create the abort
1721                         Abort *newAbort = new Abort(NULL,
1722                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1723                                                                                                                                         transaction->getSequenceNumber(),
1724                                                                                                                                         transaction->getMachineId(),
1725                                                                                                                                         transaction->getArbitrator(),
1726                                                                                                                                         localArbitrationSequenceNumber);
1727                         localArbitrationSequenceNumber++;
1728                         generatedAborts->add(newAbort);
1729
1730                         // Insert the abort so we can process
1731                         processEntry(newAbort);
1732                 }
1733
1734                 lastSeqNumArbOn = transactionSequenceNumber;
1735         }
1736
1737         Commit *newCommit = NULL;
1738
1739         // If there is something to commit
1740         if (speculativeTableTmp->size() != 0) {
1741                 // Create the commit and increment the commit sequence number
1742                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1743                 localArbitrationSequenceNumber++;
1744
1745                 // Add all the new keys to the commit
1746                 SetIterator<IoTString *, KeyValue *> * spit = getKeyIterator(speculativeTableTmp);
1747                 while(spit->hasNext()) {
1748                         IoTString * string = spit->next();
1749                         KeyValue * kv = speculativeTableTmp->get(string);
1750                         newCommit->addKV(kv);
1751                 }
1752                 delete spit;
1753                 
1754                 // create the commit parts
1755                 newCommit->createCommitParts();
1756
1757                 // Append all the commit parts to the end of the pending queue
1758                 // waiting for sending to the server
1759                 // Insert the commit so we can process it
1760                 Vector<CommitPart *> * parts = newCommit->getParts();
1761                 uint partsSize = parts->size();
1762                 for(uint i=0; i<partsSize; i++) {
1763                         CommitPart * commitPart = parts->get(i);
1764                         processEntry(commitPart);
1765                 }
1766         }
1767
1768         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1769                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1770                 pendingSendArbitrationRounds->add(arbitrationRound);
1771
1772                 if (compactArbitrationData()) {
1773                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1774                         if (newArbitrationRound->getCommit() != NULL) {
1775                                 Vector<CommitPart *> * parts = newArbitrationRound->getCommit()->getParts();
1776                                 uint partsSize = parts->size();
1777                                 for(uint i=0; i<partsSize; i++) {
1778                                         CommitPart * commitPart = parts->get(i);
1779                                         processEntry(commitPart);
1780                                 }
1781                         }
1782                 }
1783         }
1784 }
1785
1786 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1787
1788         // Check if this machine arbitrates for this transaction if not then
1789         // we cant arbitrate this transaction
1790         if (transaction->getArbitrator() != localMachineId) {
1791                 return Pair<bool, bool>(false, false);
1792         }
1793
1794         if (!transaction->isComplete()) {
1795                 // Will arbitrate in incorrect order if we continue so just break
1796                 // Most likely this
1797                 return Pair<bool, bool>(false, false);
1798         }
1799
1800         if (transaction->getMachineId() != localMachineId) {
1801                 // dont do this check for local transactions
1802                 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1803                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1804                                 // We've have already seen this from the server
1805                                 return Pair<bool, bool>(false, false);
1806                         }
1807                 }
1808         }
1809
1810         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1811                 // Guard evaluated as true Create the commit and increment the
1812                 // commit sequence number
1813                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1814                 localArbitrationSequenceNumber++;
1815
1816                 // Update the local changes so we can make the commit
1817                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1818                 while (kvit->hasNext()) {
1819                         KeyValue *kv = kvit->next();
1820                         newCommit->addKV(kv);
1821                 }
1822                 delete kvit;
1823                 
1824                 // create the commit parts
1825                 newCommit->createCommitParts();
1826
1827                 // Append all the commit parts to the end of the pending queue
1828                 // waiting for sending to the server
1829                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1830                 pendingSendArbitrationRounds->add(arbitrationRound);
1831
1832                 if (compactArbitrationData()) {
1833                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1834                         Vector<CommitPart *> * parts = newArbitrationRound->getCommit()->getParts();
1835                         uint partsSize = parts->size();
1836                         for(uint i=0; i<partsSize; i++) {
1837                                 CommitPart * commitPart = parts->get(i);
1838                                 processEntry(commitPart);
1839                         }
1840                 } else {
1841                         // Insert the commit so we can process it
1842                         Vector<CommitPart *> * parts = newCommit->getParts();
1843                         uint partsSize = parts->size();
1844                         for(uint i=0; i<partsSize; i++) {
1845                                 CommitPart * commitPart = parts->get(i);
1846                                 processEntry(commitPart);
1847                         }
1848                 }
1849
1850                 if (transaction->getMachineId() == localMachineId) {
1851                         TransactionStatus *status = transaction->getTransactionStatus();
1852                         if (status != NULL) {
1853                                 status->setStatus(TransactionStatus_StatusCommitted);
1854                         }
1855                 }
1856
1857                 updateLiveStateFromLocal();
1858                 return Pair<bool, bool>(true, true);
1859         } else {
1860                 if (transaction->getMachineId() == localMachineId) {
1861                         // For locally created messages update the status
1862                         // Guard evaluated was false so create abort
1863                         TransactionStatus * status = transaction->getTransactionStatus();
1864                         if (status != NULL) {
1865                                 status->setStatus(TransactionStatus_StatusAborted);
1866                         }
1867                 } else {
1868                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1869
1870                         // Create the abort
1871                         Abort *newAbort = new Abort(NULL,
1872                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1873                                                                                                                                         -1,
1874                                                                                                                                         transaction->getMachineId(),
1875                                                                                                                                         transaction->getArbitrator(),
1876                                                                                                                                         localArbitrationSequenceNumber);
1877                         localArbitrationSequenceNumber++;
1878                         addAbortSet->add(newAbort);
1879
1880                         // Append all the commit parts to the end of the pending queue
1881                         // waiting for sending to the server
1882                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1883                         pendingSendArbitrationRounds->add(arbitrationRound);
1884
1885                         if (compactArbitrationData()) {
1886                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1887
1888                                 Vector<CommitPart *> * parts = newArbitrationRound->getCommit()->getParts();
1889                                 uint partsSize = parts->size();
1890                                 for(uint i=0; i<partsSize; i++) {
1891                                         CommitPart * commitPart = parts->get(i);
1892                                         processEntry(commitPart);
1893                                 }
1894                         }
1895                 }
1896
1897                 updateLiveStateFromLocal();
1898                 return Pair<bool, bool>(true, false);
1899         }
1900 }
1901
1902 /**
1903  * Compacts the arbitration data my merging commits and aggregating
1904  * aborts so that a single large push of commits can be done instead
1905  * of many small updates
1906  */
1907 bool Table::compactArbitrationData() {
1908         if (pendingSendArbitrationRounds->size() < 2) {
1909                 // Nothing to compact so do nothing
1910                 return false;
1911         }
1912
1913         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1914         if (lastRound->getDidSendPart()) {
1915                 return false;
1916         }
1917
1918         bool hadCommit = (lastRound->getCommit() == NULL);
1919         bool gotNewCommit = false;
1920
1921         int numberToDelete = 1;
1922         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1923                 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1924
1925                 if (round->isFull() || round->getDidSendPart()) {
1926                         // Stop since there is a part that cannot be compacted and we
1927                         // need to compact in order
1928                         break;
1929                 }
1930
1931                 if (round->getCommit() == NULL) {
1932                         // Try compacting aborts only
1933                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1934                         if (newSize > ArbitrationRound_MAX_PARTS) {
1935                                 // Cant compact since it would be too large
1936                                 break;
1937                         }
1938                         lastRound->addAborts(round->getAborts());
1939                 } else {
1940                         // Create a new larger commit
1941                         Commit * newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1942                         localArbitrationSequenceNumber++;
1943
1944                         // Create the commit parts so that we can count them
1945                         newCommit->createCommitParts();
1946
1947                         // Calculate the new size of the parts
1948                         int newSize = newCommit->getNumberOfParts();
1949                         newSize += lastRound->getAbortsCount();
1950                         newSize += round->getAbortsCount();
1951
1952                         if (newSize > ArbitrationRound_MAX_PARTS) {
1953                                 // Cant compact since it would be too large
1954                                 break;
1955                         }
1956
1957                         // Set the new compacted part
1958                         lastRound->setCommit(newCommit);
1959                         lastRound->addAborts(round->getAborts());
1960                         gotNewCommit = true;
1961                 }
1962
1963                 numberToDelete++;
1964         }
1965
1966         if (numberToDelete != 1) {
1967                 // If there is a compaction
1968                 // Delete the previous pieces that are now in the new compacted piece
1969                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1970                         pendingSendArbitrationRounds->clear();
1971                 } else {
1972                         for (int i = 0; i < numberToDelete; i++) {
1973                                 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
1974                         }
1975                 }
1976
1977                 // Add the new compacted into the pending to send list
1978                 pendingSendArbitrationRounds->add(lastRound);
1979
1980                 // Should reinsert into the commit processor
1981                 if (hadCommit && gotNewCommit) {
1982                         return true;
1983                 }
1984         }
1985
1986         return false;
1987 }
1988
1989 /**
1990  * Update all the commits and the committed tables, sets dead the dead
1991  * transactions
1992  */
1993 bool Table::updateCommittedTable() {
1994
1995         if (newCommitParts->size() == 0) {
1996                 // Nothing new to process
1997                 return false;
1998         }
1999
2000         // Iterate through all the machine Ids that we received new parts for
2001         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * partsit=getKeyIterator(newCommitParts);
2002         while(partsit->hasNext()) {
2003                 int64_t machineId = partsit->next();
2004                 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2005
2006                 // Iterate through all the parts for that machine Id
2007                 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> * pairit=getKeyIterator(parts);
2008                 while(pairit->hasNext()) {
2009                         Pair<int64_t, int32_t> * partId = pairit->next();
2010                         CommitPart *part = parts->get(partId);
2011
2012                         // Get the transaction object for that sequence number
2013                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2014
2015                         if (commitForClientTable == NULL) {
2016                                 // This is the first commit from this device
2017                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
2018                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2019                         }
2020
2021                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2022
2023                         if (commit == NULL) {
2024                                 // This is a new commit that we dont have so make a new one
2025                                 commit = new Commit();
2026
2027                                 // Insert this new commit into the live tables
2028                                 commitForClientTable->put(part->getSequenceNumber(), commit);
2029                         }
2030
2031                         // Add that part to the commit
2032                         commit->addPartDecode(part);
2033                 }
2034                 delete pairit;
2035         }
2036         delete partsit;
2037         
2038         // Clear all the new commits parts in preparation for the next time
2039         // the server sends slots
2040         newCommitParts->clear();
2041
2042         // If we process a new commit keep track of it for future use
2043         bool didProcessANewCommit = false;
2044
2045         // Process the commits one by one
2046         for (int64_t arbitratorId : liveCommitsTable->keySet()) {
2047
2048                 // Get all the commits for a specific arbitrator
2049                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2050
2051                 // Sort the commits in order
2052                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
2053                 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2054
2055                 // Get the last commit seen from this arbitrator
2056                 int64_t lastCommitSeenSequenceNumber = -1;
2057                 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
2058                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2059                 }
2060
2061                 // Go through each new commit one by one
2062                 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
2063                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2064                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
2065
2066                         // Special processing if a commit is not complete
2067                         if (!commit->isComplete()) {
2068                                 if (i == (commitSequenceNumbers->size() - 1)) {
2069                                         // If there is an incomplete commit and this commit is the
2070                                         // latest one seen then this commit cannot be processed and
2071                                         // there are no other commits
2072                                         break;
2073                                 } else {
2074                                         // This is a commit that was already dead but parts of it
2075                                         // are still in the block chain (not flushed out yet)->
2076                                         // Delete it and move on
2077                                         commit->setDead();
2078                                         commitForClientTable->remove(commit->getSequenceNumber());
2079                                         continue;
2080                                 }
2081                         }
2082
2083                         // Update the last transaction that was updated if we can
2084                         if (commit->getTransactionSequenceNumber() != -1) {
2085                                 // Update the last transaction sequence number that the arbitrator arbitrated on1
2086                                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber())) {
2087                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2088                                 }
2089                         }
2090
2091                         // Update the last arbitration data that we have seen so far
2092                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2093                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2094                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2095                                         // Is larger
2096                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2097                                 }
2098                         } else {
2099                                 // Never seen any data from this arbitrator so record the first one
2100                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2101                         }
2102
2103                         // We have already seen this commit before so need to do the
2104                         // full processing on this commit
2105                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2106
2107                                 // Update the last transaction that was updated if we can
2108                                 if (commit->getTransactionSequenceNumber() != -1) {
2109                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2110
2111                                         // Update the last transaction sequence number that the arbitrator arbitrated on
2112                                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2113                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2114                                         }
2115                                 }
2116
2117                                 continue;
2118                         }
2119
2120                         // If we got here then this is a brand new commit and needs full
2121                         // processing
2122                         // Get what commits should be edited, these are the commits that
2123                         // have live values for their keys
2124                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2125                         {
2126                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2127                                 while (kvit->hasNext()) {
2128                                         KeyValue *kv = kvit->next();
2129                                         commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
2130                                 }
2131                                 delete kvit;
2132                         }
2133                         commitsToEdit->remove(NULL);            // remove NULL since it could be in this set
2134
2135                         // Update each previous commit that needs to be updated
2136                         SetIterator<Commit *, Commit *> * commitit = commitsToEdit->iterator();
2137                         while(commitit->hasNext()) {
2138                                 Commit *previousCommit = commitit->next();
2139
2140                                 // Only bother with live commits (TODO: Maybe remove this check)
2141                                 if (previousCommit->isLive()) {
2142
2143                                         // Update which keys in the old commits are still live
2144                                         {
2145                                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2146                                                 while (kvit->hasNext()) {
2147                                                         KeyValue *kv = kvit->next();
2148                                                         previousCommit->invalidateKey(kv->getKey());
2149                                                 }
2150                                                 delete kvit;
2151                                         }
2152                                         
2153                                         // if the commit is now dead then remove it
2154                                         if (!previousCommit->isLive()) {
2155                                                 commitForClientTable->remove(previousCommit);
2156                                         }
2157                                 }
2158                         }
2159                         delete commitit;
2160
2161                         // Update the last seen sequence number from this arbitrator
2162                         if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2163                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2164                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2165                                 }
2166                         } else {
2167                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2168                         }
2169
2170                         // We processed a new commit that we havent seen before
2171                         didProcessANewCommit = true;
2172
2173                         // Update the committed table of keys and which commit is using which key
2174                         {
2175                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2176                                 while (kvit->hasNext()) {
2177                                         KeyValue *kv = kvit->next();
2178                                         committedKeyValueTable->put(kv->getKey(), kv);
2179                                         liveCommitsByKeyTable->put(kv->getKey(), commit);
2180                                 }
2181                                 delete kvit;
2182                         }
2183                 }
2184         }
2185
2186         return didProcessANewCommit;
2187 }
2188
2189 /**
2190  * Create the speculative table from transactions that are still live
2191  * and have come from the cloud
2192  */
2193 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2194         if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2195                 // There is nothing to speculate on
2196                 return false;
2197         }
2198
2199         // Create a list of the transaction sequence numbers and sort them
2200         // from oldest to newest
2201         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2202         qsort(transactionSequenceNumberSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2203
2204         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2205
2206
2207         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2208                 // If there is a gap in the transaction sequence numbers then
2209                 // there was a commit or an abort of a transaction OR there was a
2210                 // new commit (Could be from offline commit) so a redo the
2211                 // speculation from scratch
2212
2213                 // Start from scratch
2214                 speculatedKeyValueTable->clear();
2215                 lastTransactionSequenceNumberSpeculatedOn = -1;
2216                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2217
2218         }
2219
2220         // Remember the front of the transaction list
2221         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2222
2223         // Find where to start arbitration from
2224         int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2225
2226         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2227                 // Make sure we are not out of bounds
2228                 return false;           // did not speculate
2229         }
2230
2231         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2232         bool didSkip = true;
2233
2234         for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2235                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2236                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2237
2238                 if (!transaction->isComplete()) {
2239                         // If there is an incomplete transaction then there is nothing
2240                         // we can do add this transactions arbitrator to the list of
2241                         // arbitrators we should ignore
2242                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2243                         didSkip = true;
2244                         continue;
2245                 }
2246
2247                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2248                         continue;
2249                 }
2250
2251                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2252
2253                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2254                         // Guard evaluated to true so update the speculative table
2255                         {
2256                                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2257                                 while (kvit->hasNext()) {
2258                                         KeyValue *kv = kvit->next();
2259                                         speculatedKeyValueTable->put(kv->getKey(), kv);
2260                                 }
2261                                 delete kvit;
2262                         }
2263                 }
2264         }
2265
2266         if (didSkip) {
2267                 // Since there was a skip we need to redo the speculation next time around
2268                 lastTransactionSequenceNumberSpeculatedOn = -1;
2269                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2270         }
2271
2272         // We did some speculation
2273         return true;
2274 }
2275
2276 /**
2277  * Create the pending transaction speculative table from transactions
2278  * that are still in the pending transaction buffer
2279  */
2280 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2281         if (pendingTransactionQueue->size() == 0) {
2282                 // There is nothing to speculate on
2283                 return;
2284         }
2285
2286         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2287                 // need to reset on the pending speculation
2288                 lastPendingTransactionSpeculatedOn = NULL;
2289                 firstPendingTransaction = pendingTransactionQueue->get(0);
2290                 pendingTransactionSpeculatedKeyValueTable->clear();
2291         }
2292
2293         // Find where to start arbitration from
2294         int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2295
2296         if (startIndex >= pendingTransactionQueue->size()) {
2297                 // Make sure we are not out of bounds
2298                 return;
2299         }
2300
2301         for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2302                 Transaction *transaction = pendingTransactionQueue->get(i);
2303
2304                 lastPendingTransactionSpeculatedOn = transaction;
2305
2306                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2307                         // Guard evaluated to true so update the speculative table
2308                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2309                         while (kvit->hasNext()) {
2310                                 KeyValue *kv = kvit->next();
2311                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2312                         }
2313                         delete kvit;
2314                 }
2315         }
2316 }
2317
2318 /**
2319  * Set dead and remove from the live transaction tables the
2320  * transactions that are dead
2321  */
2322 void Table::updateLiveTransactionsAndStatus() {
2323
2324         // Go through each of the transactions
2325         for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2326                 Transaction *transaction = iter->next()->getValue();
2327
2328                 // Check if the transaction is dead
2329                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2330                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2331
2332                         // Set dead the transaction
2333                         transaction->setDead();
2334
2335                         // Remove the transaction from the live table
2336                         iter->remove();
2337                         liveTransactionByTransactionIdTable->remove(transaction->getId());
2338                 }
2339         }
2340
2341         // Go through each of the transactions
2342         for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2343                 TransactionStatus *status = iter->next()->getValue();
2344
2345                 // Check if the transaction is dead
2346                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2347                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2348
2349                         // Set committed
2350                         status->setStatus(TransactionStatus_StatusCommitted);
2351
2352                         // Remove
2353                         iter->remove();
2354                 }
2355         }
2356 }
2357
2358 /**
2359  * Process this slot, entry by entry->  Also update the latest message sent by slot
2360  */
2361 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2362
2363         // Update the last message seen
2364         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2365
2366         // Process each entry in the slot
2367         Vector<Entry *> *entries = slot->getEntries();
2368         uint eSize = entries->size();
2369         for(uint ei=0; ei < eSize; ei++) {
2370                 Entry * entry = entries->get(ei);
2371                 switch (entry->getType()) {
2372                 case TypeCommitPart:
2373                         processEntry((CommitPart *)entry);
2374                         break;
2375                 case TypeAbort:
2376                         processEntry((Abort *)entry);
2377                         break;
2378                 case TypeTransactionPart:
2379                         processEntry((TransactionPart *)entry);
2380                         break;
2381                 case TypeNewKey:
2382                         processEntry((NewKey *)entry);
2383                         break;
2384                 case TypeLastMessage:
2385                         processEntry((LastMessage *)entry, machineSet);
2386                         break;
2387                 case TypeRejectedMessage:
2388                         processEntry((RejectedMessage *)entry, indexer);
2389                         break;
2390                 case TypeTableStatus:
2391                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2392                         break;
2393                 default:
2394                         throw new Error("Unrecognized type: ");
2395                 }
2396         }
2397 }
2398
2399 /**
2400  * Update the last message that was sent for a machine Id
2401  */
2402 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2403         // Update what the last message received by a machine was
2404         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2405 }
2406
2407 /**
2408  * Add the new key to the arbitrators table and update the set of live
2409  * new keys (in case of a rescued new key message)
2410  */
2411 void Table::processEntry(NewKey *entry) {
2412         // Update the arbitrator table with the new key information
2413         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2414
2415         // Update what the latest live new key is
2416         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2417         if (oldNewKey != NULL) {
2418                 // Delete the old new key messages
2419                 oldNewKey->setDead();
2420         }
2421 }
2422
2423 /**
2424  * Process new table status entries and set dead the old ones as new
2425  * ones come in-> keeps track of the largest and smallest table status
2426  * seen in this current round of updating the local copy of the block
2427  * chain
2428  */
2429 void Table::processEntry(TableStatus * entry, int64_t seq) {
2430         int newNumSlots = entry->getMaxSlots();
2431         updateCurrMaxSize(newNumSlots);
2432         initExpectedSize(seq, newNumSlots);
2433
2434         if (liveTableStatus != NULL) {
2435                 // We have a larger table status so the old table status is no
2436                 // int64_ter alive
2437                 liveTableStatus->setDead();
2438         }
2439
2440         // Make this new table status the latest alive table status
2441         liveTableStatus = entry;
2442 }
2443
2444 /**
2445  * Check old messages to see if there is a block chain violation->
2446  * Also
2447  */
2448 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2449         int64_t oldSeqNum = entry->getOldSeqNum();
2450         int64_t newSeqNum = entry->getNewSeqNum();
2451         bool isequal = entry->getEqual();
2452         int64_t machineId = entry->getMachineID();
2453         int64_t seq = entry->getSequenceNumber();
2454
2455         // Check if we have messages that were supposed to be rejected in
2456         // our local block chain
2457         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2458                 // Get the slot
2459                 Slot *slot = indexer->getSlot(seqNum);
2460
2461                 if (slot != NULL) {
2462                         // If we have this slot make sure that it was not supposed to be
2463                         // a rejected slot
2464                         int64_t slotMachineId = slot->getMachineID();
2465                         if (isequal != (slotMachineId == machineId)) {
2466                                 throw new Error("Server Error: Trying to insert rejected message for slot ");
2467                         }
2468                 }
2469         }
2470
2471         // Create a list of clients to watch until they see this rejected
2472         // message entry->
2473         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2474         for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2475                 // Machine ID for the last message entry
2476                 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2477
2478                 // We've seen it, don't need to continue to watch->  Our next
2479                 // message will implicitly acknowledge it->
2480                 if (lastMessageEntryMachineId == localMachineId) {
2481                         continue;
2482                 }
2483
2484                 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2485                 int64_t entrySequenceNumber = lastMessageValue.getFirst();
2486
2487                 if (entrySequenceNumber < seq) {
2488                         // Add this rejected message to the set of messages that this
2489                         // machine ID did not see yet
2490                         addWatchVector(lastMessageEntryMachineId, entry);
2491                         // This client did not see this rejected message yet so add it
2492                         // to the watch set to monitor
2493                         deviceWatchSet->add(lastMessageEntryMachineId);
2494                 }
2495         }
2496         if (deviceWatchSet->isEmpty()) {
2497                 // This rejected message has been seen by all the clients so
2498                 entry->setDead();
2499         } else {
2500                 // We need to watch this rejected message
2501                 entry->setWatchSet(deviceWatchSet);
2502         }
2503 }
2504
2505 /**
2506  * Check if this abort is live, if not then save it so we can kill it
2507  * later-> update the last transaction number that was arbitrated on->
2508  */
2509 void Table::processEntry(Abort *entry) {
2510         if (entry->getTransactionSequenceNumber() != -1) {
2511                 // update the transaction status if it was sent to the server
2512                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2513                 if (status != NULL) {
2514                         status->setStatus(TransactionStatus_StatusAborted);
2515                 }
2516         }
2517
2518         // Abort has not been seen by the client it is for yet so we need to
2519         // keep track of it
2520         Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2521         if (previouslySeenAbort != NULL) {
2522                 previouslySeenAbort->setDead();         // Delete old version of the abort since we got a rescued newer version
2523         }
2524
2525         if (entry->getTransactionArbitrator() == localMachineId) {
2526                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2527         }
2528
2529         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) {
2530                 // The machine already saw this so it is dead
2531                 entry->setDead();
2532                 liveAbortTable->remove(&entry->getAbortId());
2533
2534                 if (entry->getTransactionArbitrator() == localMachineId) {
2535                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2536                 }
2537                 return;
2538         }
2539
2540         // Update the last arbitration data that we have seen so far
2541         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2542                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2543                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2544                         // Is larger
2545                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2546                 }
2547         } else {
2548                 // Never seen any data from this arbitrator so record the first one
2549                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2550         }
2551
2552         // Set dead a transaction if we can
2553         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2554         if (transactionToSetDead != NULL) {
2555                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2556         }
2557
2558         // Update the last transaction sequence number that the arbitrator
2559         // arbitrated on
2560         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2561         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2562                 // Is a valid one
2563                 if (entry->getTransactionSequenceNumber() != -1) {
2564                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2565                 }
2566         }
2567 }
2568
2569 /**
2570  * Set dead the transaction part if that transaction is dead and keep
2571  * track of all new parts
2572  */
2573 void Table::processEntry(TransactionPart *entry) {
2574         // Check if we have already seen this transaction and set it dead OR
2575         // if it is not alive
2576         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2577         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2578                 // This transaction is dead, it was already committed or aborted
2579                 entry->setDead();
2580                 return;
2581         }
2582
2583         // This part is still alive
2584         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2585
2586         if (transactionPart == NULL) {
2587                 // Dont have a table for this machine Id yet so make one
2588                 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2589                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2590         }
2591
2592         // Update the part and set dead ones we have already seen (got a
2593         // rescued version)
2594         TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2595         if (previouslySeenPart != NULL) {
2596                 previouslySeenPart->setDead();
2597         }
2598 }
2599
2600 /**
2601  * Process new commit entries and save them for future use->  Delete duplicates
2602  */
2603 void Table::processEntry(CommitPart *entry) {
2604         // Update the last transaction that was updated if we can
2605         if (entry->getTransactionSequenceNumber() != -1) {
2606                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
2607                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2608                 }
2609         }
2610
2611         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2612         if (commitPart == NULL) {
2613                 // Don't have a table for this machine Id yet so make one
2614                 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2615                 newCommitParts->put(entry->getMachineId(), commitPart);
2616         }
2617         // Update the part and set dead ones we have already seen (got a
2618         // rescued version)
2619         CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2620         if (previouslySeenPart != NULL) {
2621                 previouslySeenPart->setDead();
2622         }
2623 }
2624
2625 /**
2626  * Update the last message seen table-> Update and set dead the
2627  * appropriate RejectedMessages as clients see them-> Updates the live
2628  * aborts, removes those that are dead and sets them dead-> Check that
2629  * the last message seen is correct and that there is no mismatch of
2630  * our own last message or that other clients have not had a rollback
2631  * on the last message->
2632  */
2633 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2634         // We have seen this machine ID
2635         machineSet->remove(machineId);
2636
2637         // Get the set of rejected messages that this machine Id is has not seen yet
2638         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2639         // If there is a rejected message that this machine Id has not seen yet
2640         if (watchset != NULL) {
2641                 // Go through each rejected message that this machine Id has not
2642                 // seen yet
2643
2644                 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2645                 while(rmit->hasNext()) {
2646                         RejectedMessage *rm = rmit->next();
2647                         // If this machine Id has seen this rejected message->->->
2648                         if (rm->getSequenceNumber() <= seqNum) {
2649                                 // Remove it from our watchlist
2650                                 rmit->remove();
2651                                 // Decrement machines that need to see this notification
2652                                 rm->removeWatcher(machineId);
2653                         }
2654                 }
2655                 delete rmit;
2656         }
2657
2658         // Set dead the abort
2659         for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2660                 Abort *abort = i->next()->getValue();
2661                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2662                         abort->setDead();
2663                         i->remove();
2664                         if (abort->getTransactionArbitrator() == localMachineId) {
2665                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2666                         }
2667                 }
2668         }
2669         if (machineId == localMachineId) {
2670                 // Our own messages are immediately dead->
2671                 char livenessType = liveness->getType();
2672                 if (livenessType==TypeLastMessage) {
2673                         ((LastMessage *)liveness)->setDead();
2674                 } else if (livenessType == TypeSlot) {
2675                         ((Slot *)liveness)->setDead();
2676                 } else {
2677                         throw new Error("Unrecognized type");
2678                 }
2679         }
2680         // Get the old last message for this device
2681         Pair<int64_t, Liveness *> * lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2682         if (lastMessageEntry == NULL) {
2683                 // If no last message then there is nothing else to process
2684                 return;
2685         }
2686
2687         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2688         Liveness *lastEntry = lastMessageEntry->getSecond();
2689         delete lastMessageEntry;
2690         
2691         // If it is not our machine Id since we already set ours to dead
2692         if (machineId != localMachineId) {
2693                 char lastEntryType = lastEntry->getType();
2694                 
2695                 if (lastEntryType == TypeLastMessage) {
2696                         ((LastMessage *)lastEntry)->setDead();
2697                 } else if (lastEntryType == TypeSlot) {
2698                         ((Slot *)lastEntry)->setDead();
2699                 } else {
2700                         throw new Error("Unrecognized type");
2701                 }
2702         }
2703         // Make sure the server is not playing any games
2704         if (machineId == localMachineId) {
2705                 if (hadPartialSendToServer) {
2706                         // We were not making any updates and we had a machine mismatch
2707                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2708                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2709                         }
2710                 } else {
2711                         // We were not making any updates and we had a machine mismatch
2712                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2713                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2714                         }
2715                 }
2716         } else {
2717                 if (lastMessageSeqNum > seqNum) {
2718                         throw new Error("Server Error: Rollback on remote machine sequence number");
2719                 }
2720         }
2721 }
2722
2723 /**
2724  * Add a rejected message entry to the watch set to keep track of
2725  * which clients have seen that rejected message entry and which have
2726  * not.
2727  */
2728 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2729         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2730         if (entries == NULL) {
2731                 // There is no set for this machine ID yet so create one
2732                 entries = new Hashset<RejectedMessage *>();
2733                 rejectedMessageWatchVectorTable->put(machineId, entries);
2734         }
2735         entries->add(entry);
2736 }
2737
2738 /**
2739  * Check if the HMAC chain is not violated
2740  */
2741 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2742         for (int i = 0; i < newSlots->length(); i++) {
2743                 Slot *currSlot = newSlots->get(i);
2744                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2745                 if (prevSlot != NULL &&
2746                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2747                         throw new Error("Server Error: Invalid HMAC Chain");
2748         }
2749 }