From: bdemsky Date: Fri, 19 Jan 2018 06:40:12 +0000 (-0800) Subject: edits X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=commitdiff_plain;h=bd73414f3ef76dbb9d060e67b6d31f32278b2ffb edits --- diff --git a/version2/src/C/CloudComm.cc b/version2/src/C/CloudComm.cc index 82982ba..4674af2 100644 --- a/version2/src/C/CloudComm.cc +++ b/version2/src/C/CloudComm.cc @@ -113,7 +113,7 @@ void CloudComm::setSalt() { } try { - char[] saltTmp = new char[SALT_SIZE]; + Array *saltTmp = new char[SALT_SIZE]; random->nextBytes(saltTmp); for (int i = 0; i < SALT_SIZE; i++) { @@ -237,9 +237,9 @@ Array *CloudComm::encryptSlotAndPrependIV(Array *rawData, Arrayinit(Cipher.ENCRYPT_MODE, key, ivSpec); - char[] encryptedBytes = cipher->doFinal(rawData); + Array *encryptedBytes = cipher->doFinal(rawData); - char[] chars = new char[encryptedBytes->length + IV_SIZE]; + Array *chars = new char[encryptedBytes->length + IV_SIZE]; System.arraycopy(ivBytes, 0, chars, 0, ivBytes.length); System.arraycopy(encryptedBytes, 0, chars, IV_SIZE, encryptedBytes.length); @@ -291,17 +291,17 @@ Array *CloudComm::putSlot(Slot *slot, int max) { } int64_t sequencenumber = slot->getSequenceNumber(); - char[] slotBytes = slot->encode(mac); + Array *slotBytes = slot->encode(mac); // slotBytes = encryptCipher.doFinal(slotBytes); - // char[] iVBytes = slot.getSlotCryptIV(); + // Array * iVBytes = slot.getSlotCryptIV(); - // char[] chars = new char[slotBytes.length + IV_SIZE]; + // Array * chars = new char[slotBytes.length + IV_SIZE]; // System.arraycopy(iVBytes, 0, chars, 0, iVBytes.length); // System.arraycopy(slotBytes, 0, chars, IV_SIZE, slotBytes.length); - char[] chars = encryptSlotAndPrependIV(slotBytes, slot->getSlotCryptIV()); + Array *chars = encryptSlotAndPrependIV(slotBytes, slot->getSlotCryptIV()); url = buildRequest(true, sequencenumber, max); @@ -343,7 +343,7 @@ Array *CloudComm::putSlot(Slot *slot, int max) { timer->startTime(); InputStream is = http->getInputStream(); DataInputStream dis = new DataInputStream(is); - char[] resptype = new char[7]; + Array *resptype = new char[7]; dis->readFully(resptype); timer->endTime(); @@ -411,7 +411,7 @@ Array *CloudComm::getSlots(int64_t sequencenumber) { timer->startTime(); InputStream is = http->getInputStream(); DataInputStream dis = new DataInputStream(is); - char[] resptype = new char[7]; + Array *resptype = new char[7]; dis->readFully(resptype); timer->endTime(); @@ -444,15 +444,15 @@ Array *CloudComm::processSlots(DataInputStream dis) { for (int i = 0; i < numberofslots; i++) { - char[] rawData = new char[sizesofslots[i]]; + Array *rawData = new char[sizesofslots[i]]; dis->readFully(rawData); - // char[] data = new char[rawData.length - IV_SIZE]; + // Array * data = new char[rawData.length - IV_SIZE]; // System.arraycopy(rawData, IV_SIZE, data, 0, data.length); - char[] data = stripIVAndDecryptSlot(rawData); + Array *data = stripIVAndDecryptSlot(rawData); // data = decryptCipher.doFinal(data); @@ -471,16 +471,16 @@ Array *sendLocalData(Array *sendData, int64_t localSequenceNumber, S System.out.println("Passing Locally"); mac->update(sendData); - char[] genmac = mac->doFinal(); - char[] totalData = new char[sendData->length + genmac->length]; + Array *genmac = mac->doFinal(); + Array *totalData = new char[sendData->length + genmac->length]; System.arraycopy(sendData, 0, totalData, 0, sendData.length); System.arraycopy(genmac, 0, totalData, sendData.length, genmac->length); // Encrypt the data for sending - // char[] encryptedData = encryptCipher.doFinal(totalData); - // char[] encryptedData = encryptCipher.doFinal(totalData); - char[] iv = createIV(table->getMachineId(), table->getLocalSequenceNumber()); - char[] encryptedData = encryptSlotAndPrependIV(totalData, iv); + // Array * encryptedData = encryptCipher.doFinal(totalData); + // Array * encryptedData = encryptCipher.doFinal(totalData); + Array *iv = createIV(table->getMachineId(), table->getLocalSequenceNumber()); + Array *encryptedData = encryptSlotAndPrependIV(totalData, iv); // Open a TCP socket connection to a local device Socket socket = new Socket(host, port); @@ -496,7 +496,7 @@ Array *sendLocalData(Array *sendData, int64_t localSequenceNumber, S output->flush(); int lengthOfReturnData = input->readInt(); - char[] returnData = new char[lengthOfReturnData]; + Array *returnData = new char[lengthOfReturnData]; input->readFully(returnData); timer->endTime(); @@ -509,14 +509,14 @@ Array *sendLocalData(Array *sendData, int64_t localSequenceNumber, S socket->close(); mac->update(returnData, 0, returnData->length - HMAC_SIZE); - char[] realmac = mac->doFinal(); - char[] recmac = new char[HMAC_SIZE]; + Array *realmac = mac->doFinal(); + Array *recmac = new char[HMAC_SIZE]; System->arraycopy(returnData, returnData->length - realmac->length, recmac, 0, realmac->length); if (!Arrays->equals(recmac, realmac)) throw new Error("Local Error: Invalid HMAC! Potential Attack!"); - char[] returnData2 = new char[lengthOfReturnData - recmac->length]; + Array *returnData2 = new char[lengthOfReturnData - recmac->length]; System->arraycopy(returnData, 0, returnData2, 0, returnData2->length); return returnData2; @@ -554,7 +554,7 @@ void CloudComm::localServerWorkerFunction() { // Get the encrypted data from the server int dataSize = input->readInt(); - char[] readData = new char[dataSize]; + Array *readData = new char[dataSize]; input->readFully(readData); timer->endTime(); @@ -564,31 +564,31 @@ void CloudComm::localServerWorkerFunction() { readData = stripIVAndDecryptSlot(readData); mac->update(readData, 0, readData->length - HMAC_SIZE); - char[] genmac = mac->doFinal(); - char[] recmac = new char[HMAC_SIZE]; + Array *genmac = mac->doFinal(); + Array *recmac = new char[HMAC_SIZE]; System->arraycopy(readData, readData->length - recmac->length, recmac, 0, recmac->length); if (!Arrays->equals(recmac, genmac)) throw new Error("Local Error: Invalid HMAC! Potential Attack!"); - char[] returnData = new char[readData->length - recmac->length]; + Array *returnData = new char[readData->length - recmac->length]; System->arraycopy(readData, 0, returnData, 0, returnData->length); // Process the data - // char[] sendData = table->acceptDataFromLocal(readData); - char[] sendData = table->acceptDataFromLocal(returnData); + // Array * sendData = table->acceptDataFromLocal(readData); + Array *sendData = table->acceptDataFromLocal(returnData); mac->update(sendData); - char[] realmac = mac->doFinal(); - char[] totalData = new char[sendData->length + realmac->length]; + Array *realmac = mac->doFinal(); + Array *totalData = new char[sendData->length + realmac->length]; System->arraycopy(sendData, 0, totalData, 0, sendData->length); System->arraycopy(realmac, 0, totalData, sendData->length, realmac->length); // Encrypt the data for sending - // char[] encryptedData = encryptCipher->doFinal(totalData); - char[] iv = createIV(table->getMachineId(), table->getLocalSequenceNumber()); - char[] encryptedData = encryptSlotAndPrependIV(totalData, iv); + // Array * encryptedData = encryptCipher->doFinal(totalData); + Array *iv = createIV(table->getMachineId(), table->getLocalSequenceNumber()); + Array *encryptedData = encryptSlotAndPrependIV(totalData, iv); timer->startTime(); diff --git a/version2/src/C/LocalComm.cc b/version2/src/C/LocalComm.cc index 213fb3e..9a18ee0 100644 --- a/version2/src/C/LocalComm.cc +++ b/version2/src/C/LocalComm.cc @@ -8,7 +8,7 @@ class LocalComm { t2 = _t2; } - char[] sendDataToLocalDevice(Long deviceId, char[] data) throws InterruptedException { + Array *sendDataToLocalDevice(Long deviceId, Array *data) throws InterruptedException { System.out.println("Passing Locally"); if (deviceId == t1.getMachineId()) { diff --git a/version2/src/C/LocalComm.h b/version2/src/C/LocalComm.h index 10c90b0..4ea7458 100644 --- a/version2/src/C/LocalComm.h +++ b/version2/src/C/LocalComm.h @@ -8,7 +8,7 @@ class LocalComm { t2 = _t2; } - public char[] sendDataToLocalDevice(Long deviceId, char[] data) throws InterruptedException { + public Array *sendDataToLocalDevice(Long deviceId, Array *data) throws InterruptedException { System.out.println("Passing Locally"); if (deviceId == t1.getMachineId()) { diff --git a/version2/src/C/PendingTransaction.cc b/version2/src/C/PendingTransaction.cc index 5386815..b3da754 100644 --- a/version2/src/C/PendingTransaction.cc +++ b/version2/src/C/PendingTransaction.cc @@ -114,7 +114,7 @@ Transaction *PendingTransaction::createTransaction() { } // Copy to a smaller version - char[] partData = new char[copySize]; + Array *partData = new char[copySize]; System.arraycopy(charData, currentPosition, partData, 0, copySize); TransactionPart part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart); diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index b01cc0e..de0a1da 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -1,638 +1,481 @@ +#include "Table.h" +Table::Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) { + localMachineId = _localMachineId; + cloud = new CloudComm(this, baseurl, password, listeningPort); + + init(); +} + +Table::Table(CloudComm _cloud, int64_t _localMachineId) { + localMachineId = _localMachineId; + cloud = _cloud; + + init(); +} /** - * IoTTable data structure. Provides client interface. - * @author Brian Demsky - * @version 1.0 + * Init all the stuff needed for for table usage */ +void Table::init() { + + // Init helper objects + random = new Random(); + buffer = new SlotBuffer(); + + // Set Variables + oldestLiveSlotSequenceNumver = 1; + + // init data structs + committedKeyValueTable = new Hashtable(); + speculatedKeyValueTable = new Hashtable(); + pendingTransactionSpeculatedKeyValueTable = new Hashtable(); + liveNewKeyTable = new Hashtable(); + lastMessageTable = new Hashtable >(); + rejectedMessageWatchVectorTable = new Hashtable >(); + arbitratorTable = new Hashtable(); + liveAbortTable = new Hashtable, Abort>(); + newTransactionParts = new Hashtable, TransactionPart> >(); + newCommitParts = new Hashtable, CommitPart> >(); + lastArbitratedTransactionNumberByArbitratorTable = new Hashtable(); + liveTransactionBySequenceNumberTable = new Hashtable(); + liveTransactionByTransactionIdTable = new Hashtable, Transaction>(); + liveCommitsTable = new Hashtable >(); + liveCommitsByKeyTable = new Hashtable(); + lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable(); + rejectedSlotVector = new Vector(); + pendingTransactionQueue = new Vector(); + pendingSendArbitrationEntriesToDelete = new Vector(); + transactionPartsSent = new Hashtable >(); + outstandingTransactionStatus = new Hashtable(); + liveAbortsGeneratedByLocal = new Hashtable(); + offlineTransactionsCommittedAndAtServer = new HashSet >(); + localCommunicationTable = new Hashtable >(); + lastTransactionSeenFromMachineFromServer = new Hashtable(); + pendingSendArbitrationRounds = new Vector(); + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable(); + + + // Other init stuff + numberOfSlots = buffer.capacity(); + setResizeThreshold(); +} -final class Table { - - /* Constants */ - static final int FREE_SLOTS = 2;// Number of slots that should be kept free // 10 - static final int SKIP_THRESHOLD = 10; - static final double RESIZE_MULTIPLE = 1.2; - static final double RESIZE_THRESHOLD = 0.75; - static final int REJECTED_THRESHOLD = 5; - - /* Helper Objects */ - SlotBuffer buffer = NULL; - CloudComm cloud = NULL; - Random random = NULL; - TableStatus liveTableStatus = NULL; - PendingTransaction pendingTransactionBuilder = NULL;// Pending Transaction used in building a Pending Transaction - Transaction lastPendingTransactionSpeculatedOn = NULL;// Last transaction that was speculated on from the pending transaction - Transaction firstPendingTransaction = NULL; // first transaction in the pending transaction list - - /* Variables */ - int numberOfSlots = 0; // Number of slots stored in buffer - int bufferResizeThreshold = 0;// Threshold on the number of live slots before a resize is needed - int64_t liveSlotCount = 0;// Number of currently live slots - int64_t oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry - int64_t localMachineId = 0; // Machine ID of this client device - int64_t sequenceNumber = 0; // Largest sequence number a client has received - int64_t localSequenceNumber = 0; - - // int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server - // int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server - int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions - int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on - int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on - int64_t localArbitrationSequenceNumber = 0; - bool hadPartialSendToServer = false; - bool attemptedToSendToServer = false; - int64_t expectedsize; - bool didFindTableStatus = false; - int64_t currMaxSize = 0; - - Slot lastSlotAttemptedToSend = NULL; - bool lastIsNewKey = false; - int lastNewSize = 0; - Hashtable > lastTransactionPartsSent = NULL; - Vector lastPendingSendArbitrationEntriesToDelete = NULL; - NewKey lastNewKey = NULL; - - - /* Data Structures */ - Hashtable committedKeyValueTable = NULL; // Table of committed key value pairs - Hashtable speculatedKeyValueTable = NULL;// Table of speculated key value pairs, if there is a speculative value - Hashtable pendingTransactionSpeculatedKeyValueTable = NULL;// Table of speculated key value pairs, if there is a speculative value from the pending transactions - Hashtable liveNewKeyTable = NULL;// Table of live new keys - Hashtable > lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage); - Hashtable > rejectedMessageWatchVectorTable = NULL;// Table of machine Ids and the set of rejected messages they have not seen yet - Hashtable arbitratorTable = NULL;// Table of keys and their arbitrators - Hashtable, Abort> liveAbortTable = NULL; // Table live abort messages - Hashtable, TransactionPart> > newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server - Hashtable, CommitPart> > newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server - Hashtable lastArbitratedTransactionNumberByArbitratorTable = NULL;// Last transaction sequence number that an arbitrator arbitrated on - Hashtable liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number - Hashtable, Transaction> liveTransactionByTransactionIdTable = NULL;// live transaction grouped by the transaction ID - Hashtable > liveCommitsTable = NULL; - Hashtable liveCommitsByKeyTable = NULL; - Hashtable lastCommitSeenSequenceNumberByArbitratorTable = NULL; - Vector rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server - Vector pendingTransactionQueue = NULL; - Vector pendingSendArbitrationRounds = NULL; - Vector pendingSendArbitrationEntriesToDelete = NULL; - Hashtable > transactionPartsSent = NULL; - Hashtable outstandingTransactionStatus = NULL; - Hashtable liveAbortsGeneratedByLocal = NULL; - Set > offlineTransactionsCommittedAndAtServer = NULL; - Hashtable > localCommunicationTable = NULL; - Hashtable lastTransactionSeenFromMachineFromServer = NULL; - Hashtable lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL; - - - Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) { - localMachineId = _localMachineId; - cloud = new CloudComm(this, baseurl, password, listeningPort); - - init(); - } - - Table(CloudComm _cloud, int64_t _localMachineId) { - localMachineId = _localMachineId; - cloud = _cloud; - - init(); - } - - /** - * Init all the stuff needed for for table usage - */ - void init() { - - // Init helper objects - random = new Random(); - buffer = new SlotBuffer(); - - // Set Variables - oldestLiveSlotSequenceNumver = 1; - - // init data structs - committedKeyValueTable = new Hashtable(); - speculatedKeyValueTable = new Hashtable(); - pendingTransactionSpeculatedKeyValueTable = new Hashtable(); - liveNewKeyTable = new Hashtable(); - lastMessageTable = new Hashtable >(); - rejectedMessageWatchVectorTable = new Hashtable >(); - arbitratorTable = new Hashtable(); - liveAbortTable = new Hashtable, Abort>(); - newTransactionParts = new Hashtable, TransactionPart> >(); - newCommitParts = new Hashtable, CommitPart> >(); - lastArbitratedTransactionNumberByArbitratorTable = new Hashtable(); - liveTransactionBySequenceNumberTable = new Hashtable(); - liveTransactionByTransactionIdTable = new Hashtable, Transaction>(); - liveCommitsTable = new Hashtable >(); - liveCommitsByKeyTable = new Hashtable(); - lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable(); - rejectedSlotVector = new Vector(); - pendingTransactionQueue = new Vector(); - pendingSendArbitrationEntriesToDelete = new Vector(); - transactionPartsSent = new Hashtable >(); - outstandingTransactionStatus = new Hashtable(); - liveAbortsGeneratedByLocal = new Hashtable(); - offlineTransactionsCommittedAndAtServer = new HashSet >(); - localCommunicationTable = new Hashtable >(); - lastTransactionSeenFromMachineFromServer = new Hashtable(); - pendingSendArbitrationRounds = new Vector(); - lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable(); - - - // Other init stuff - numberOfSlots = buffer.capacity(); - setResizeThreshold(); - } - - // TODO: delete method - synchronized void printSlots() { - int64_t o = buffer.getOldestSeqNum(); - int64_t n = buffer.getNewestSeqNum(); - - int[] types = new int[10]; - - int num = 0; - - int livec = 0; - int deadc = 0; - - int casdasd = 0; - - int liveslo = 0; - - for (int64_t i = o; i < (n + 1); i++) { - Slot s = buffer.getSlot(i); - - - if (s.isLive()) { - liveslo++; - } +// TODO: delete method +synchronized void Table::printSlots() { + int64_t o = buffer.getOldestSeqNum(); + int64_t n = buffer.getNewestSeqNum(); - Vector entries = s.getEntries(); + int[] types = new int[10]; - for (Entry e : entries) { - if (e.isLive()) { - int type = e.getType(); + int num = 0; + int livec = 0; + int deadc = 0; - if (type == 6) { - RejectedMessage rej = (RejectedMessage)e; - casdasd++; + int casdasd = 0; - System.out.println(rej.getMachineID()); - } + int liveslo = 0; + for (int64_t i = o; i < (n + 1); i++) { + Slot s = buffer.getSlot(i); - types[type] = types[type] + 1; - num++; - livec++; - } else { - deadc++; - } - } - } - for (int i = 0; i < 10; i++) { - System.out.println(i + " " + types[i]); + if (s.isLive()) { + liveslo++; } - System.out.println("Live count: " + livec); - System.out.println("Live Slot count: " + liveslo); - System.out.println("Dead count: " + deadc); - System.out.println("Old: " + o); - System.out.println("New: " + n); - System.out.println("Size: " + buffer.size()); - // System.out.println("Commits: " + liveCommitsTable.size()); - System.out.println("pendingTrans: " + pendingTransactionQueue.size()); - System.out.println("Trans Status Out: " + outstandingTransactionStatus.size()); + Vector entries = s.getEntries(); + + for (Entry e : entries) { + if (e.isLive()) { + int type = e.getType(); - for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) { - System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k)); - } + if (type == 6) { + RejectedMessage rej = (RejectedMessage)e; + casdasd++; - for (Long a : liveCommitsTable.keySet()) { - for (Long b : liveCommitsTable.get(a).keySet()) { - for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) { - System.out.print(kv + " "); + System.out.println(rej.getMachineID()); } - System.out.print("|| "); + + + types[type] = types[type] + 1; + num++; + livec++; + } else { + deadc++; } - System.out.println(); } - } - /** - * Initialize the table by inserting a table status as the first entry into the table status - * also initialize the crypto stuff. - */ - synchronized void initTable() throws ServerException { - cloud.initSecurity(); + for (int i = 0; i < 10; i++) { + System.out.println(i + " " + types[i]); + } + System.out.println("Live count: " + livec); + System.out.println("Live Slot count: " + liveslo); + + System.out.println("Dead count: " + deadc); + System.out.println("Old: " + o); + System.out.println("New: " + n); + System.out.println("Size: " + buffer.size()); + // System.out.println("Commits: " + liveCommitsTable.size()); + System.out.println("pendingTrans: " + pendingTransactionQueue.size()); + System.out.println("Trans Status Out: " + outstandingTransactionStatus.size()); + + for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) { + System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k)); + } - // Create the first insertion into the block chain which is the table status - Slot s = new Slot(this, 1, localMachineId, localSequenceNumber); - localSequenceNumber++; - TableStatus status = new TableStatus(s, numberOfSlots); - s.addEntry(status); - Slot[] array = cloud.putSlot(s, numberOfSlots); - if (array == NULL) { - array = new Slot[] {s}; - // update local block chain - validateAndUpdate(array, true); - } else if (array.length == 1) { - // in case we did push the slot BUT we failed to init it - validateAndUpdate(array, true); - } else { - throw new Error("Error on initialization"); + for (Long a : liveCommitsTable.keySet()) { + for (Long b : liveCommitsTable.get(a).keySet()) { + for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) { + System.out.print(kv + " "); + } + System.out.print("|| "); } + System.out.println(); } - /** - * Rebuild the table from scratch by pulling the latest block chain from the server. - */ - synchronized void rebuild() throws ServerException { - // Just pull the latest slots from the server - Slot[] newslots = cloud.getSlots(sequenceNumber + 1); - validateAndUpdate(newslots, true); - sendToServer(NULL); - updateLiveTransactionsAndStatus(); +} +/** + * Initialize the table by inserting a table status as the first entry into the table status + * also initialize the crypto stuff. + */ +synchronized void Table::initTable() throws ServerException { + cloud.initSecurity(); + + // Create the first insertion into the block chain which is the table status + Slot s = new Slot(this, 1, localMachineId, localSequenceNumber); + localSequenceNumber++; + TableStatus status = new TableStatus(s, numberOfSlots); + s.addEntry(status); + Slot[] array = cloud.putSlot(s, numberOfSlots); + + if (array == NULL) { + array = new Slot[] {s}; + // update local block chain + validateAndUpdate(array, true); + } else if (array.length == 1) { + // in case we did push the slot BUT we failed to init it + validateAndUpdate(array, true); + } else { + throw new Error("Error on initialization"); } +} - // String toString() { - // String retString = " Committed Table: \n"; - // retString += "---------------------------\n"; - // retString += commitedTable.toString(); +/** + * Rebuild the table from scratch by pulling the latest block chain from the server. + */ +synchronized void Table::rebuild() throws ServerException { + // Just pull the latest slots from the server + Slot[] newslots = cloud.getSlots(sequenceNumber + 1); + validateAndUpdate(newslots, true); + sendToServer(NULL); + updateLiveTransactionsAndStatus(); - // retString += "\n\n"; +} - // retString += " Speculative Table: \n"; - // retString += "---------------------------\n"; - // retString += speculativeTable.toString(); +// String toString() { +// String retString = " Committed Table: \n"; +// retString += "---------------------------\n"; +// retString += commitedTable.toString(); - // return retString; - // } +// retString += "\n\n"; - synchronized void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber) { - localCommunicationTable.put(arbitrator, new Pair(hostName, portNumber)); - } +// retString += " Speculative Table: \n"; +// retString += "---------------------------\n"; +// retString += speculativeTable.toString(); + +// return retString; +// } + +synchronized void Table::addLocalCommunication(int64_t arbitrator, String hostName, int portNumber) { + localCommunicationTable.put(arbitrator, new Pair(hostName, portNumber)); +} + +synchronized Long Table::getArbitrator(IoTString key) { + return arbitratorTable.get(key); +} + +synchronized void Table::close() { + cloud.close(); +} - synchronized Long getArbitrator(IoTString key) { - return arbitratorTable.get(key); +synchronized IoTString Table::getCommitted(IoTString key) { + KeyValue kv = committedKeyValueTable.get(key); + + if (kv != NULL) { + return kv.getValue(); + } else { + return NULL; } +} - synchronized void close() { - cloud.close(); +synchronized IoTString Table::getSpeculative(IoTString key) { + KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key); + + if (kv == NULL) { + kv = speculatedKeyValueTable.get(key); } - synchronized IoTString getCommitted(IoTString key) { - KeyValue kv = committedKeyValueTable.get(key); + if (kv == NULL) { + kv = committedKeyValueTable.get(key); + } - if (kv != NULL) { - return kv.getValue(); - } else { - return NULL; - } + if (kv != NULL) { + return kv.getValue(); + } else { + return NULL; } +} - synchronized IoTString getSpeculative(IoTString key) { - KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key); +synchronized IoTString Table::getCommittedAtomic(IoTString key) { + KeyValue kv = committedKeyValueTable.get(key); - if (kv == NULL) { - kv = speculatedKeyValueTable.get(key); - } + if (arbitratorTable.get(key) == NULL) { + throw new Error("Key not Found."); + } - if (kv == NULL) { - kv = committedKeyValueTable.get(key); - } + // Make sure new key value pair matches the current arbitrator + if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) { + // TODO: Maybe not throw en error + throw new Error("Not all Key Values Match Arbitrator."); + } - if (kv != NULL) { - return kv.getValue(); - } else { - return NULL; - } + if (kv != NULL) { + pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue())); + return kv.getValue(); + } else { + pendingTransactionBuilder.addKVGuard(new KeyValue(key, NULL)); + return NULL; } +} - synchronized IoTString getCommittedAtomic(IoTString key) { - KeyValue kv = committedKeyValueTable.get(key); +synchronized IoTString Table::getSpeculativeAtomic(IoTString key) { + if (arbitratorTable.get(key) == NULL) { + throw new Error("Key not Found."); + } - if (arbitratorTable.get(key) == NULL) { - throw new Error("Key not Found."); - } + // Make sure new key value pair matches the current arbitrator + if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) { + // TODO: Maybe not throw en error + throw new Error("Not all Key Values Match Arbitrator."); + } - // Make sure new key value pair matches the current arbitrator - if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) { - // TODO: Maybe not throw en error - throw new Error("Not all Key Values Match Arbitrator."); - } + KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key); - if (kv != NULL) { - pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue())); - return kv.getValue(); - } else { - pendingTransactionBuilder.addKVGuard(new KeyValue(key, NULL)); - return NULL; - } + if (kv == NULL) { + kv = speculatedKeyValueTable.get(key); } - synchronized IoTString getSpeculativeAtomic(IoTString key) { - if (arbitratorTable.get(key) == NULL) { - throw new Error("Key not Found."); - } + if (kv == NULL) { + kv = committedKeyValueTable.get(key); + } - // Make sure new key value pair matches the current arbitrator - if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) { - // TODO: Maybe not throw en error - throw new Error("Not all Key Values Match Arbitrator."); - } + if (kv != NULL) { + pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue())); + return kv.getValue(); + } else { + pendingTransactionBuilder.addKVGuard(new KeyValue(key, NULL)); + return NULL; + } +} - KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key); +synchronized bool Table::update() { + try { + Slot[] newSlots = cloud.getSlots(sequenceNumber + 1); + validateAndUpdate(newSlots, false); + sendToServer(NULL); - if (kv == NULL) { - kv = speculatedKeyValueTable.get(key); - } - if (kv == NULL) { - kv = committedKeyValueTable.get(key); - } + updateLiveTransactionsAndStatus(); - if (kv != NULL) { - pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue())); - return kv.getValue(); - } else { - pendingTransactionBuilder.addKVGuard(new KeyValue(key, NULL)); - return NULL; + return true; + } catch (Exception e) { + // e.printStackTrace(); + + for (Long m : localCommunicationTable.keySet()) { + updateFromLocal(m); } } - synchronized bool update() { - try { - Slot[] newSlots = cloud.getSlots(sequenceNumber + 1); - validateAndUpdate(newSlots, false); - sendToServer(NULL); + return false; +} +synchronized bool Table::createNewKey(IoTString keyName, int64_t machineId) throws ServerException { + while (true) { + if (arbitratorTable.get(keyName) != NULL) { + // There is already an arbitrator + return false; + } - updateLiveTransactionsAndStatus(); + NewKey newKey = new NewKey(NULL, keyName, machineId); + if (sendToServer(newKey)) { + // If successfully inserted return true; - } catch (Exception e) { - // e.printStackTrace(); - - for (Long m : localCommunicationTable.keySet()) { - updateFromLocal(m); - } } - - return false; } +} - synchronized bool createNewKey(IoTString keyName, int64_t machineId) throws ServerException { - while (true) { - if (arbitratorTable.get(keyName) != NULL) { - // There is already an arbitrator - return false; - } +synchronized void Table::startTransaction() { + // Create a new transaction, invalidates any old pending transactions. + pendingTransactionBuilder = new PendingTransaction(localMachineId); +} - NewKey newKey = new NewKey(NULL, keyName, machineId); +synchronized void Table::addKV(IoTString key, IoTString value) { - if (sendToServer(newKey)) { - // If successfully inserted - return true; - } - } + // Make sure it is a valid key + if (arbitratorTable.get(key) == NULL) { + throw new Error("Key not Found."); } - synchronized void startTransaction() { - // Create a new transaction, invalidates any old pending transactions. - pendingTransactionBuilder = new PendingTransaction(localMachineId); + // Make sure new key value pair matches the current arbitrator + if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) { + // TODO: Maybe not throw en error + throw new Error("Not all Key Values Match Arbitrator."); } - synchronized void addKV(IoTString key, IoTString value) { - - // Make sure it is a valid key - if (arbitratorTable.get(key) == NULL) { - throw new Error("Key not Found."); - } + // Add the key value to this transaction + KeyValue kv = new KeyValue(key, value); + pendingTransactionBuilder.addKV(kv); +} - // Make sure new key value pair matches the current arbitrator - if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) { - // TODO: Maybe not throw en error - throw new Error("Not all Key Values Match Arbitrator."); - } +synchronized TransactionStatus Table::commitTransaction() { - // Add the key value to this transaction - KeyValue kv = new KeyValue(key, value); - pendingTransactionBuilder.addKV(kv); + if (pendingTransactionBuilder.getKVUpdates().size() == 0) { + // transaction with no updates will have no effect on the system + return new TransactionStatus(TransactionStatus.StatusNoEffect, -1); } - synchronized TransactionStatus commitTransaction() { + // Set the local transaction sequence number and increment + pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber); + localTransactionSequenceNumber++; - if (pendingTransactionBuilder.getKVUpdates().size() == 0) { - // transaction with no updates will have no effect on the system - return new TransactionStatus(TransactionStatus.StatusNoEffect, -1); - } - - // Set the local transaction sequence number and increment - pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber); - localTransactionSequenceNumber++; + // Create the transaction status + TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator()); - // Create the transaction status - TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator()); + // Create the new transaction + Transaction newTransaction = pendingTransactionBuilder.createTransaction(); + newTransaction.setTransactionStatus(transactionStatus); - // Create the new transaction - Transaction newTransaction = pendingTransactionBuilder.createTransaction(); - newTransaction.setTransactionStatus(transactionStatus); - - if (pendingTransactionBuilder.getArbitrator() != localMachineId) { - // Add it to the queue and invalidate the builder for safety - pendingTransactionQueue.add(newTransaction); - } else { - arbitrateOnLocalTransaction(newTransaction); - updateLiveStateFromLocal(); - } + if (pendingTransactionBuilder.getArbitrator() != localMachineId) { + // Add it to the queue and invalidate the builder for safety + pendingTransactionQueue.add(newTransaction); + } else { + arbitrateOnLocalTransaction(newTransaction); + updateLiveStateFromLocal(); + } - pendingTransactionBuilder = new PendingTransaction(localMachineId); + pendingTransactionBuilder = new PendingTransaction(localMachineId); - try { - sendToServer(NULL); - } catch (ServerException e) { + try { + sendToServer(NULL); + } catch (ServerException e) { - Set arbitratorTriedAndFailed = new HashSet(); - for (Iterator iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) { - Transaction transaction = iter.next(); + Set arbitratorTriedAndFailed = new HashSet(); + for (Iterator iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) { + Transaction transaction = iter.next(); - if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) { - // Already contacted this client so ignore all attempts to contact this client - // to preserve ordering for arbitrator - continue; - } + if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) { + // Already contacted this client so ignore all attempts to contact this client + // to preserve ordering for arbitrator + continue; + } - Pair sendReturn = sendTransactionToLocal(transaction); + Pair sendReturn = sendTransactionToLocal(transaction); - if (sendReturn.getFirst()) { - // Failed to contact over local - arbitratorTriedAndFailed.add(transaction.getArbitrator()); - } else { - // Successful contact or should not contact + if (sendReturn.getFirst()) { + // Failed to contact over local + arbitratorTriedAndFailed.add(transaction.getArbitrator()); + } else { + // Successful contact or should not contact - if (sendReturn.getSecond()) { - // did arbitrate - iter.remove(); - } + if (sendReturn.getSecond()) { + // did arbitrate + iter.remove(); } } } - - updateLiveStateFromLocal(); - - return transactionStatus; - } - - /** - * Get the machine ID for this client - */ - int64_t getMachineId() { - return localMachineId; - } - - /** - * Decrement the number of live slots that we currently have - */ - void decrementLiveCount() { - liveSlotCount--; - } - - /** - * Recalculate the new resize threshold - */ - void setResizeThreshold() { - int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots); - bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower); - } - - int64_t getLocalSequenceNumber() { - return localSequenceNumber; } + updateLiveStateFromLocal(); - bool lastInsertedNewKey = false; - - bool sendToServer(NewKey newKey) throws ServerException { + return transactionStatus; +} - bool fromRetry = false; +/** + * Get the machine ID for this client + */ +int64_t Table::getMachineId() { + return localMachineId; +} - try { - if (hadPartialSendToServer) { - Slot[] newSlots = cloud.getSlots(sequenceNumber + 1); - if (newSlots.length == 0) { - fromRetry = true; - ThreeTuple sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); +/** + * Decrement the number of live slots that we currently have + */ +void Table::decrementLiveCount() { + liveSlotCount--; +} - if (sendSlotsReturn.getFirst()) { - if (newKey != NULL) { - if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) { - newKey = NULL; - } - } +/** + * Recalculate the new resize threshold + */ +void Table::setResizeThreshold() { + int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots); + bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower); +} - for (Transaction transaction : lastTransactionPartsSent.keySet()) { - transaction.resetServerFailure(); +int64_t Table::getLocalSequenceNumber() { + return localSequenceNumber; +} - // Update which transactions parts still need to be sent - transaction.removeSentParts(lastTransactionPartsSent.get(transaction)); - // Add the transaction status to the outstanding list - outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); +bool lastInsertedNewKey = false; - // Update the transaction status - transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); +bool Table::sendToServer(NewKey newKey) throws ServerException { - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction.didSendAllParts()) { - transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); - pendingTransactionQueue.remove(transaction); - } - } - } else { + bool fromRetry = false; - newSlots = sendSlotsReturn.getThird(); + try { + if (hadPartialSendToServer) { + Slot[] newSlots = cloud.getSlots(sequenceNumber + 1); + if (newSlots.length == 0) { + fromRetry = true; + ThreeTuple sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); - bool isInserted = false; - for (Slot s : newSlots) { - if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) { - isInserted = true; - break; - } + if (sendSlotsReturn.getFirst()) { + if (newKey != NULL) { + if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) { + newKey = NULL; } + } - for (Slot s : newSlots) { - if (isInserted) { - break; - } - - // Process each entry in the slot - for (Entry entry : s.getEntries()) { + for (Transaction transaction : lastTransactionPartsSent.keySet()) { + transaction.resetServerFailure(); - if (entry.getType() == Entry.TypeLastMessage) { - LastMessage lastMessage = (LastMessage)entry; - if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) { - isInserted = true; - break; - } - } - } - } + // Update which transactions parts still need to be sent + transaction.removeSentParts(lastTransactionPartsSent.get(transaction)); - if (isInserted) { - if (newKey != NULL) { - if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) { - newKey = NULL; - } - } + // Add the transaction status to the outstanding list + outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); - for (Transaction transaction : lastTransactionPartsSent.keySet()) { - transaction.resetServerFailure(); + // Update the transaction status + transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); - // Update which transactions parts still need to be sent - transaction.removeSentParts(lastTransactionPartsSent.get(transaction)); - - // Add the transaction status to the outstanding list - outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); - - // Update the transaction status - transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); - - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction.didSendAllParts()) { - transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); - pendingTransactionQueue.remove(transaction); - } else { - transaction.resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer()) { - transaction.setSequenceNumber(-1); - } - } - } + // Check if all the transaction parts were successfully sent and if so then remove it from pending + if (transaction.didSendAllParts()) { + transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); + pendingTransactionQueue.remove(transaction); } } + } else { - for (Transaction transaction : lastTransactionPartsSent.keySet()) { - transaction.resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer()) { - transaction.setSequenceNumber(-1); - } - } + newSlots = sendSlotsReturn.getThird(); - if (sendSlotsReturn.getThird().length != 0) { - // insert into the local block chain - validateAndUpdate(sendSlotsReturn.getThird(), true); - } - // continue; - } else { bool isInserted = false; for (Slot s : newSlots) { if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) { @@ -690,110 +533,61 @@ final class Table { } } } - } else { - for (Transaction transaction : lastTransactionPartsSent.keySet()) { - transaction.resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer()) { - transaction.setSequenceNumber(-1); - } - } } - - // insert into the local block chain - validateAndUpdate(newSlots, true); } - } - } catch (ServerException e) { - throw e; - } - - - - try { - // While we have stuff that needs inserting into the block chain - while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != NULL)) { - fromRetry = false; - - if (hadPartialSendToServer) { - throw new Error("Should Be error free"); + for (Transaction transaction : lastTransactionPartsSent.keySet()) { + transaction.resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction.didSendAPartToServer()) { + transaction.setSequenceNumber(-1); + } } - - - // If there is a new key with same name then end - if ((newKey != NULL) && (arbitratorTable.get(newKey.getKey()) != NULL)) { - return false; + if (sendSlotsReturn.getThird().length != 0) { + // insert into the local block chain + validateAndUpdate(sendSlotsReturn.getThird(), true); } - - // Create the slot - Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber); - localSequenceNumber++; - - // Try to fill the slot with data - ThreeTuple fillSlotsReturn = fillSlot(slot, false, newKey); - bool needsResize = fillSlotsReturn.getFirst(); - int newSize = fillSlotsReturn.getSecond(); - bool insertedNewKey = fillSlotsReturn.getThird(); - - if (needsResize) { - // Reset which transaction to send - for (Transaction transaction : transactionPartsSent.keySet()) { - transaction.resetNextPartToSend(); - - // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) { - transaction.setSequenceNumber(-1); - } + // continue; + } else { + bool isInserted = false; + for (Slot s : newSlots) { + if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) { + isInserted = true; + break; } - - // Clear the sent data since we are trying again - pendingSendArbitrationEntriesToDelete.clear(); - transactionPartsSent.clear(); - - // We needed a resize so try again - fillSlot(slot, true, newKey); } - lastSlotAttemptedToSend = slot; - lastIsNewKey = (newKey != NULL); - lastInsertedNewKey = insertedNewKey; - lastNewSize = newSize; - lastNewKey = newKey; - lastTransactionPartsSent = new Hashtable >(transactionPartsSent); - lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); - - - ThreeTuple sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); - - if (sendSlotsReturn.getFirst()) { - - // Did insert into the block chain + for (Slot s : newSlots) { + if (isInserted) { + break; + } - if (insertedNewKey) { - // This slot was what was inserted not a previous slot + // Process each entry in the slot + for (Entry entry : s.getEntries()) { - // New Key was successfully inserted into the block chain so dont want to insert it again - newKey = NULL; + if (entry.getType() == Entry.TypeLastMessage) { + LastMessage lastMessage = (LastMessage)entry; + if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) { + isInserted = true; + break; + } + } } + } - // Remove the aborts and commit parts that were sent from the pending to send queue - for (Iterator iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) { - ArbitrationRound round = iter.next(); - round.removeParts(pendingSendArbitrationEntriesToDelete); - - if (round.isDoneSending()) { - // Sent all the parts - iter.remove(); + if (isInserted) { + if (newKey != NULL) { + if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) { + newKey = NULL; } } - for (Transaction transaction : transactionPartsSent.keySet()) { + for (Transaction transaction : lastTransactionPartsSent.keySet()) { transaction.resetServerFailure(); // Update which transactions parts still need to be sent - transaction.removeSentParts(transactionPartsSent.get(transaction)); + transaction.removeSentParts(lastTransactionPartsSent.get(transaction)); // Add the transaction status to the outstanding list outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); @@ -805,67 +599,63 @@ final class Table { if (transaction.didSendAllParts()) { transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); pendingTransactionQueue.remove(transaction); + } else { + transaction.resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction.didSendAPartToServer()) { + transaction.setSequenceNumber(-1); + } } } } else { - - // if (!sendSlotsReturn.getSecond()) { - // for (Transaction transaction : lastTransactionPartsSent.keySet()) { - // transaction.resetServerFailure(); - // } - // } else { - // for (Transaction transaction : lastTransactionPartsSent.keySet()) { - // transaction.resetServerFailure(); - - // // Update which transactions parts still need to be sent - // transaction.removeSentParts(transactionPartsSent.get(transaction)); - - // // Add the transaction status to the outstanding list - // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); - - // // Update the transaction status - // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); - - // // Check if all the transaction parts were successfully sent and if so then remove it from pending - // if (transaction.didSendAllParts()) { - // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); - // pendingTransactionQueue.remove(transaction); - - // for (KeyValue kv : transaction.getKeyValueUpdateSet()) { - // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber()); - // } - // } - // } - // } - - // Reset which transaction to send - for (Transaction transaction : transactionPartsSent.keySet()) { - transaction.resetNextPartToSend(); - // transaction.resetNextPartToSend(); - + for (Transaction transaction : lastTransactionPartsSent.keySet()) { + transaction.resetServerFailure(); // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) { + if (!transaction.didSendAPartToServer()) { transaction.setSequenceNumber(-1); } } } - // Clear the sent data in preparation for next send - pendingSendArbitrationEntriesToDelete.clear(); - transactionPartsSent.clear(); + // insert into the local block chain + validateAndUpdate(newSlots, true); + } + } + } catch (ServerException e) { + throw e; + } - if (sendSlotsReturn.getThird().length != 0) { - // insert into the local block chain - validateAndUpdate(sendSlotsReturn.getThird(), true); - } + + + try { + // While we have stuff that needs inserting into the block chain + while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != NULL)) { + + fromRetry = false; + + if (hadPartialSendToServer) { + throw new Error("Should Be error free"); } - } catch (ServerException e) { - if (e.getType() != ServerException.TypeInputTimeout) { - // e.printStackTrace(); - // Nothing was able to be sent to the server so just clear these data structures + // If there is a new key with same name then end + if ((newKey != NULL) && (arbitratorTable.get(newKey.getKey()) != NULL)) { + return false; + } + + // Create the slot + Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber); + localSequenceNumber++; + + // Try to fill the slot with data + ThreeTuple fillSlotsReturn = fillSlot(slot, false, newKey); + bool needsResize = fillSlotsReturn.getFirst(); + int newSize = fillSlotsReturn.getSecond(); + bool insertedNewKey = fillSlotsReturn.getThird(); + + if (needsResize) { + // Reset which transaction to send for (Transaction transaction : transactionPartsSent.keySet()) { transaction.resetNextPartToSend(); @@ -874,963 +664,1141 @@ final class Table { transaction.setSequenceNumber(-1); } } - } else { - // There was a partial send to the server - hadPartialSendToServer = true; + // Clear the sent data since we are trying again + pendingSendArbitrationEntriesToDelete.clear(); + transactionPartsSent.clear(); + + // We needed a resize so try again + fillSlot(slot, true, newKey); + } + + lastSlotAttemptedToSend = slot; + lastIsNewKey = (newKey != NULL); + lastInsertedNewKey = insertedNewKey; + lastNewSize = newSize; + lastNewKey = newKey; + lastTransactionPartsSent = new Hashtable >(transactionPartsSent); + lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); + + + ThreeTuple sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); + + if (sendSlotsReturn.getFirst()) { + + // Did insert into the block chain + + if (insertedNewKey) { + // This slot was what was inserted not a previous slot + + // New Key was successfully inserted into the block chain so dont want to insert it again + newKey = NULL; + } + + // Remove the aborts and commit parts that were sent from the pending to send queue + for (Iterator iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) { + ArbitrationRound round = iter.next(); + round.removeParts(pendingSendArbitrationEntriesToDelete); + + if (round.isDoneSending()) { + // Sent all the parts + iter.remove(); + } + } + + for (Transaction transaction : transactionPartsSent.keySet()) { + transaction.resetServerFailure(); + + // Update which transactions parts still need to be sent + transaction.removeSentParts(transactionPartsSent.get(transaction)); + + // Add the transaction status to the outstanding list + outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); - // if (!fromRetry) { - // lastTransactionPartsSent = new Hashtable>(transactionPartsSent); - // lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); + // Update the transaction status + transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); + + // Check if all the transaction parts were successfully sent and if so then remove it from pending + if (transaction.didSendAllParts()) { + transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); + pendingTransactionQueue.remove(transaction); + } + } + } else { + + // if (!sendSlotsReturn.getSecond()) { + // for (Transaction transaction : lastTransactionPartsSent.keySet()) { + // transaction.resetServerFailure(); + // } + // } else { + // for (Transaction transaction : lastTransactionPartsSent.keySet()) { + // transaction.resetServerFailure(); + + // // Update which transactions parts still need to be sent + // transaction.removeSentParts(transactionPartsSent.get(transaction)); + + // // Add the transaction status to the outstanding list + // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); + + // // Update the transaction status + // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); + + // // Check if all the transaction parts were successfully sent and if so then remove it from pending + // if (transaction.didSendAllParts()) { + // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); + // pendingTransactionQueue.remove(transaction); + + // for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber()); + // } + // } + // } // } - // Nothing was able to be sent to the server so just clear these data structures + // Reset which transaction to send for (Transaction transaction : transactionPartsSent.keySet()) { transaction.resetNextPartToSend(); - transaction.setServerFailure(); + // transaction.resetNextPartToSend(); + + // Set the transaction sequence number back to nothing + if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) { + transaction.setSequenceNumber(-1); + } } } + // Clear the sent data in preparation for next send pendingSendArbitrationEntriesToDelete.clear(); transactionPartsSent.clear(); - throw e; - } - - return newKey == NULL; - } - - synchronized bool updateFromLocal(int64_t machineId) { - Pair localCommunicationInformation = localCommunicationTable.get(machineId); - if (localCommunicationInformation == NULL) { - // Cant talk to that device locally so do nothing - return false; + if (sendSlotsReturn.getThird().length != 0) { + // insert into the local block chain + validateAndUpdate(sendSlotsReturn.getThird(), true); + } } - // Get the size of the send data - int sendDataSize = sizeof(int32_t) + sizeof(int64_t); + } catch (ServerException e) { - Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1; - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != NULL) { - lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId); - } - - char[] sendData = new char[sendDataSize]; - ByteBuffer bbEncode = ByteBuffer.wrap(sendData); + if (e.getType() != ServerException.TypeInputTimeout) { + // e.printStackTrace(); - // Encode the data - bbEncode.putLong(lastArbitrationDataLocalSequenceNumber); - bbEncode.putInt(0); + // Nothing was able to be sent to the server so just clear these data structures + for (Transaction transaction : transactionPartsSent.keySet()) { + transaction.resetNextPartToSend(); - // Send by local - char[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); - localSequenceNumber++; + // Set the transaction sequence number back to nothing + if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) { + transaction.setSequenceNumber(-1); + } + } + } else { + // There was a partial send to the server + hadPartialSendToServer = true; - if (returnData == NULL) { - // Could not contact server - return false; - } - // Decode the data - ByteBuffer bbDecode = ByteBuffer.wrap(returnData); - int numberOfEntries = bbDecode.getInt(); + // if (!fromRetry) { + // lastTransactionPartsSent = new Hashtable>(transactionPartsSent); + // lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); + // } - for (int i = 0; i < numberOfEntries; i++) { - char type = bbDecode.get(); - if (type == Entry.TypeAbort) { - Abort abort = (Abort)Abort.decode(NULL, bbDecode); - processEntry(abort); - } else if (type == Entry.TypeCommitPart) { - CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode); - processEntry(commitPart); + // Nothing was able to be sent to the server so just clear these data structures + for (Transaction transaction : transactionPartsSent.keySet()) { + transaction.resetNextPartToSend(); + transaction.setServerFailure(); } } - updateLiveStateFromLocal(); + pendingSendArbitrationEntriesToDelete.clear(); + transactionPartsSent.clear(); - return true; + throw e; } - Pair sendTransactionToLocal(Transaction transaction) { + return newKey == NULL; +} - // Get the devices local communications - Pair localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator()); +synchronized bool Table::updateFromLocal(int64_t machineId) { + Pair localCommunicationInformation = localCommunicationTable.get(machineId); + if (localCommunicationInformation == NULL) { + // Cant talk to that device locally so do nothing + return false; + } - if (localCommunicationInformation == NULL) { - // Cant talk to that device locally so do nothing - return new Pair(true, false); - } + // Get the size of the send data + int sendDataSize = sizeof(int32_t) + sizeof(int64_t); - // Get the size of the send data - int sendDataSize = sizeof(int32_t) + sizeof(int64_t); - for (TransactionPart part : transaction.getParts().values()) { - sendDataSize += part.getSize(); - } + Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1; + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != NULL) { + lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId); + } - Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1; - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != NULL) { - lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()); - } + Array *sendData = new char[sendDataSize]; + ByteBuffer bbEncode = ByteBuffer.wrap(sendData); - // Make the send data size - char[] sendData = new char[sendDataSize]; - ByteBuffer bbEncode = ByteBuffer.wrap(sendData); + // Encode the data + bbEncode.putLong(lastArbitrationDataLocalSequenceNumber); + bbEncode.putInt(0); - // Encode the data - bbEncode.putLong(lastArbitrationDataLocalSequenceNumber); - bbEncode.putInt(transaction.getParts().size()); - for (TransactionPart part : transaction.getParts().values()) { - part.encode(bbEncode); - } + // Send by local + Array *returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); + localSequenceNumber++; + if (returnData == NULL) { + // Could not contact server + return false; + } - // Send by local - char[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); - localSequenceNumber++; + // Decode the data + ByteBuffer bbDecode = ByteBuffer.wrap(returnData); + int numberOfEntries = bbDecode.getInt(); - if (returnData == NULL) { - // Could not contact server - return new Pair(true, false); + for (int i = 0; i < numberOfEntries; i++) { + char type = bbDecode.get(); + if (type == Entry.TypeAbort) { + Abort abort = (Abort)Abort.decode(NULL, bbDecode); + processEntry(abort); + } else if (type == Entry.TypeCommitPart) { + CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode); + processEntry(commitPart); } + } - // Decode the data - ByteBuffer bbDecode = ByteBuffer.wrap(returnData); - bool didCommit = bbDecode.get() == 1; - bool couldArbitrate = bbDecode.get() == 1; - int numberOfEntries = bbDecode.getInt(); - bool foundAbort = false; + updateLiveStateFromLocal(); - for (int i = 0; i < numberOfEntries; i++) { - char type = bbDecode.get(); - if (type == Entry.TypeAbort) { - Abort abort = (Abort)Abort.decode(NULL, bbDecode); + return true; +} - if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) { - foundAbort = true; - } +Pair Table::sendTransactionToLocal(Transaction transaction) { - processEntry(abort); - } else if (type == Entry.TypeCommitPart) { - CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode); - processEntry(commitPart); - } - } + // Get the devices local communications + Pair localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator()); - updateLiveStateFromLocal(); + if (localCommunicationInformation == NULL) { + // Cant talk to that device locally so do nothing + return new Pair(true, false); + } - if (couldArbitrate) { - TransactionStatus status = transaction.getTransactionStatus(); - if (didCommit) { - status.setStatus(TransactionStatus.StatusCommitted); - } else { - status.setStatus(TransactionStatus.StatusAborted); - } - } else { - TransactionStatus status = transaction.getTransactionStatus(); - if (foundAbort) { - status.setStatus(TransactionStatus.StatusAborted); - } else { - status.setStatus(TransactionStatus.StatusCommitted); - } - } + // Get the size of the send data + int sendDataSize = sizeof(int32_t) + sizeof(int64_t); + for (TransactionPart part : transaction.getParts().values()) { + sendDataSize += part.getSize(); + } - return new Pair(false, true); + Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1; + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != NULL) { + lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()); } - synchronized char[] acceptDataFromLocal(char[] data) { + // Make the send data size + Array *sendData = new char[sendDataSize]; + ByteBuffer bbEncode = ByteBuffer.wrap(sendData); - // Decode the data - ByteBuffer bbDecode = ByteBuffer.wrap(data); - int64_t lastArbitratedSequenceNumberSeen = bbDecode.getLong(); - int numberOfParts = bbDecode.getInt(); + // Encode the data + bbEncode.putLong(lastArbitrationDataLocalSequenceNumber); + bbEncode.putInt(transaction.getParts().size()); + for (TransactionPart part : transaction.getParts().values()) { + part.encode(bbEncode); + } - // If we did commit a transaction or not - bool didCommit = false; - bool couldArbitrate = false; - if (numberOfParts != 0) { + // Send by local + Array *returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); + localSequenceNumber++; - // decode the transaction - Transaction transaction = new Transaction(); - for (int i = 0; i < numberOfParts; i++) { - bbDecode.get(); - TransactionPart newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode); - transaction.addPartDecode(newPart); - } + if (returnData == NULL) { + // Could not contact server + return new Pair(true, false); + } - // Arbitrate on transaction and pull relevant return data - Pair localArbitrateReturn = arbitrateOnLocalTransaction(transaction); - couldArbitrate = localArbitrateReturn.getFirst(); - didCommit = localArbitrateReturn.getSecond(); + // Decode the data + ByteBuffer bbDecode = ByteBuffer.wrap(returnData); + bool didCommit = bbDecode.get() == 1; + bool couldArbitrate = bbDecode.get() == 1; + int numberOfEntries = bbDecode.getInt(); + bool foundAbort = false; - updateLiveStateFromLocal(); + for (int i = 0; i < numberOfEntries; i++) { + char type = bbDecode.get(); + if (type == Entry.TypeAbort) { + Abort abort = (Abort)Abort.decode(NULL, bbDecode); - // Transaction was sent to the server so keep track of it to prevent double commit - if (transaction.getSequenceNumber() != -1) { - offlineTransactionsCommittedAndAtServer.add(transaction.getId()); + if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) { + foundAbort = true; } - } - // The data to send back - int returnDataSize = 0; - Vector unseenArbitrations = new Vector(); + processEntry(abort); + } else if (type == Entry.TypeCommitPart) { + CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode); + processEntry(commitPart); + } + } - // Get the aborts to send back - Vector abortLocalSequenceNumbers = new Vector(liveAbortsGeneratedByLocal.keySet()); - Collections.sort(abortLocalSequenceNumbers); - for (Long localSequenceNumber : abortLocalSequenceNumbers) { - if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { - continue; - } + updateLiveStateFromLocal(); - Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber); - unseenArbitrations.add(abort); - returnDataSize += abort.getSize(); + if (couldArbitrate) { + TransactionStatus status = transaction.getTransactionStatus(); + if (didCommit) { + status.setStatus(TransactionStatus.StatusCommitted); + } else { + status.setStatus(TransactionStatus.StatusAborted); + } + } else { + TransactionStatus status = transaction.getTransactionStatus(); + if (foundAbort) { + status.setStatus(TransactionStatus.StatusAborted); + } else { + status.setStatus(TransactionStatus.StatusCommitted); } + } - // Get the commits to send back - Hashtable commitForClientTable = liveCommitsTable.get(localMachineId); - if (commitForClientTable != NULL) { - Vector commitLocalSequenceNumbers = new Vector(commitForClientTable.keySet()); - Collections.sort(commitLocalSequenceNumbers); + return new Pair(false, true); +} - for (Long localSequenceNumber : commitLocalSequenceNumbers) { - Commit commit = commitForClientTable.get(localSequenceNumber); +synchronized Array *Table::acceptDataFromLocal(Array *data) { - if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { - continue; - } + // Decode the data + ByteBuffer bbDecode = ByteBuffer.wrap(data); + int64_t lastArbitratedSequenceNumberSeen = bbDecode.getLong(); + int numberOfParts = bbDecode.getInt(); - unseenArbitrations.addAll(commit.getParts().values()); + // If we did commit a transaction or not + bool didCommit = false; + bool couldArbitrate = false; - for (CommitPart commitPart : commit.getParts().values()) { - returnDataSize += commitPart.getSize(); - } - } + if (numberOfParts != 0) { + + // decode the transaction + Transaction transaction = new Transaction(); + for (int i = 0; i < numberOfParts; i++) { + bbDecode.get(); + TransactionPart newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode); + transaction.addPartDecode(newPart); } - // Number of arbitration entries to decode - returnDataSize += 2 * sizeof(int32_t); + // Arbitrate on transaction and pull relevant return data + Pair localArbitrateReturn = arbitrateOnLocalTransaction(transaction); + couldArbitrate = localArbitrateReturn.getFirst(); + didCommit = localArbitrateReturn.getSecond(); + + updateLiveStateFromLocal(); - // bool of did commit or not - if (numberOfParts != 0) { - returnDataSize += sizeof(char); + // Transaction was sent to the server so keep track of it to prevent double commit + if (transaction.getSequenceNumber() != -1) { + offlineTransactionsCommittedAndAtServer.add(transaction.getId()); } + } - // Data to send Back - char[] returnData = new char[returnDataSize]; - ByteBuffer bbEncode = ByteBuffer.wrap(returnData); + // The data to send back + int returnDataSize = 0; + Vector unseenArbitrations = new Vector(); - if (numberOfParts != 0) { - if (didCommit) { - bbEncode.put((char)1); - } else { - bbEncode.put((char)0); + // Get the aborts to send back + Vector abortLocalSequenceNumbers = new Vector(liveAbortsGeneratedByLocal.keySet()); + Collections.sort(abortLocalSequenceNumbers); + for (Long localSequenceNumber : abortLocalSequenceNumbers) { + if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { + continue; + } + + Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber); + unseenArbitrations.add(abort); + returnDataSize += abort.getSize(); + } + + // Get the commits to send back + Hashtable commitForClientTable = liveCommitsTable.get(localMachineId); + if (commitForClientTable != NULL) { + Vector commitLocalSequenceNumbers = new Vector(commitForClientTable.keySet()); + Collections.sort(commitLocalSequenceNumbers); + + for (Long localSequenceNumber : commitLocalSequenceNumbers) { + Commit commit = commitForClientTable.get(localSequenceNumber); + + if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { + continue; } - if (couldArbitrate) { - bbEncode.put((char)1); - } else { - bbEncode.put((char)0); + + unseenArbitrations.addAll(commit.getParts().values()); + + for (CommitPart commitPart : commit.getParts().values()) { + returnDataSize += commitPart.getSize(); } } + } - bbEncode.putInt(unseenArbitrations.size()); - for (Entry entry : unseenArbitrations) { - entry.encode(bbEncode); - } + // Number of arbitration entries to decode + returnDataSize += 2 * sizeof(int32_t); + + // bool of did commit or not + if (numberOfParts != 0) { + returnDataSize += sizeof(char); + } + // Data to send Back + Array *returnData = new char[returnDataSize]; + ByteBuffer bbEncode = ByteBuffer.wrap(returnData); - localSequenceNumber++; - return returnData; + if (numberOfParts != 0) { + if (didCommit) { + bbEncode.put((char)1); + } else { + bbEncode.put((char)0); + } + if (couldArbitrate) { + bbEncode.put((char)1); + } else { + bbEncode.put((char)0); + } } - ThreeTuple sendSlotsToServer(Slot slot, int newSize, bool isNewKey) throws ServerException { + bbEncode.putInt(unseenArbitrations.size()); + for (Entry entry : unseenArbitrations) { + entry.encode(bbEncode); + } - bool attemptedToSendToServerTmp = attemptedToSendToServer; - attemptedToSendToServer = true; - bool inserted = false; - bool lastTryInserted = false; + localSequenceNumber++; + return returnData; +} - Slot[] array = cloud.putSlot(slot, newSize); - if (array == NULL) { - array = new Slot[] {slot}; - rejectedSlotVector.clear(); - inserted = true; - } else { - if (array.length == 0) { - throw new Error("Server Error: Did not send any slots"); - } +ThreeTuple Table::sendSlotsToServer(Slot slot, int newSize, bool isNewKey) throws ServerException { - // if (attemptedToSendToServerTmp) { - if (hadPartialSendToServer) { + bool attemptedToSendToServerTmp = attemptedToSendToServer; + attemptedToSendToServer = true; - bool isInserted = false; - for (Slot s : array) { - if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) { - isInserted = true; - break; - } + bool inserted = false; + bool lastTryInserted = false; + + Slot[] array = cloud.putSlot(slot, newSize); + if (array == NULL) { + array = new Slot[] {slot}; + rejectedSlotVector.clear(); + inserted = true; + } else { + if (array.length == 0) { + throw new Error("Server Error: Did not send any slots"); + } + + // if (attemptedToSendToServerTmp) { + if (hadPartialSendToServer) { + + bool isInserted = false; + for (Slot s : array) { + if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) { + isInserted = true; + break; } + } - for (Slot s : array) { - if (isInserted) { - break; - } + for (Slot s : array) { + if (isInserted) { + break; + } - // Process each entry in the slot - for (Entry entry : s.getEntries()) { + // Process each entry in the slot + for (Entry entry : s.getEntries()) { - if (entry.getType() == Entry.TypeLastMessage) { - LastMessage lastMessage = (LastMessage)entry; + if (entry.getType() == Entry.TypeLastMessage) { + LastMessage lastMessage = (LastMessage)entry; - if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) { - isInserted = true; - break; - } + if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) { + isInserted = true; + break; } } } + } - if (!isInserted) { - rejectedSlotVector.add(slot.getSequenceNumber()); - lastTryInserted = false; - } else { - lastTryInserted = true; - } - } else { + if (!isInserted) { rejectedSlotVector.add(slot.getSequenceNumber()); lastTryInserted = false; + } else { + lastTryInserted = true; } + } else { + rejectedSlotVector.add(slot.getSequenceNumber()); + lastTryInserted = false; } - - return new ThreeTuple(inserted, lastTryInserted, array); } - /** - * Returns false if a resize was needed - */ - ThreeTuple fillSlot(Slot slot, bool resize, NewKey newKeyEntry) { - + return new ThreeTuple(inserted, lastTryInserted, array); +} - int newSize = 0; - if (liveSlotCount > bufferResizeThreshold) { - resize = true;//Resize is forced +/** + * Returns false if a resize was needed + */ +ThreeTuple *Table::fillSlot(Slot slot, bool resize, NewKey newKeyEntry) { - } - if (resize) { - newSize = (int) (numberOfSlots * RESIZE_MULTIPLE); - TableStatus status = new TableStatus(slot, newSize); - slot.addEntry(status); - } + int newSize = 0; + if (liveSlotCount > bufferResizeThreshold) { + resize = true; //Resize is forced - // Fill with rejected slots first before doing anything else - doRejectedMessages(slot); + } - // Do mandatory rescue of entries - ThreeTuple mandatoryRescueReturn = doMandatoryResuce(slot, resize); + if (resize) { + newSize = (int) (numberOfSlots * RESIZE_MULTIPLE); + TableStatus status = new TableStatus(slot, newSize); + slot.addEntry(status); + } - // Extract working variables - bool needsResize = mandatoryRescueReturn.getFirst(); - bool seenLiveSlot = mandatoryRescueReturn.getSecond(); - int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird(); + // Fill with rejected slots first before doing anything else + doRejectedMessages(slot); - if (needsResize && !resize) { - // We need to resize but we are not resizing so return false - return new ThreeTuple(true, NULL, NULL); - } + // Do mandatory rescue of entries + ThreeTuple mandatoryRescueReturn = doMandatoryResuce(slot, resize); - bool inserted = false; - if (newKeyEntry != NULL) { - newKeyEntry.setSlot(slot); - if (slot.hasSpace(newKeyEntry)) { + // Extract working variables + bool needsResize = mandatoryRescueReturn.getFirst(); + bool seenLiveSlot = mandatoryRescueReturn.getSecond(); + int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird(); - slot.addEntry(newKeyEntry); - inserted = true; - } - } + if (needsResize && !resize) { + // We need to resize but we are not resizing so return false + return new ThreeTuple(true, NULL, NULL); + } - // Clear the transactions, aborts and commits that were sent previously - transactionPartsSent.clear(); - pendingSendArbitrationEntriesToDelete.clear(); + bool inserted = false; + if (newKeyEntry != NULL) { + newKeyEntry.setSlot(slot); + if (slot.hasSpace(newKeyEntry)) { - for (ArbitrationRound round : pendingSendArbitrationRounds) { - bool isFull = false; - round.generateParts(); - Vector parts = round.getParts(); + slot.addEntry(newKeyEntry); + inserted = true; + } + } - // Insert pending arbitration data - for (Entry arbitrationData : parts) { + // Clear the transactions, aborts and commits that were sent previously + transactionPartsSent.clear(); + pendingSendArbitrationEntriesToDelete.clear(); - // If it is an abort then we need to set some information - if (arbitrationData instanceof Abort) { - ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber()); - } + for (ArbitrationRound round : pendingSendArbitrationRounds) { + bool isFull = false; + round.generateParts(); + Vector parts = round.getParts(); - if (!slot.hasSpace(arbitrationData)) { - // No space so cant do anything else with these data entries - isFull = true; - break; - } + // Insert pending arbitration data + for (Entry arbitrationData : parts) { - // Add to this current slot and add it to entries to delete - slot.addEntry(arbitrationData); - pendingSendArbitrationEntriesToDelete.add(arbitrationData); + // If it is an abort then we need to set some information + if (arbitrationData instanceof Abort) { + ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber()); } - if (isFull) { + if (!slot.hasSpace(arbitrationData)) { + // No space so cant do anything else with these data entries + isFull = true; break; } + + // Add to this current slot and add it to entries to delete + slot.addEntry(arbitrationData); + pendingSendArbitrationEntriesToDelete.add(arbitrationData); } - if (pendingTransactionQueue.size() > 0) { + if (isFull) { + break; + } + } - Transaction transaction = pendingTransactionQueue.get(0); + if (pendingTransactionQueue.size() > 0) { - // Set the transaction sequence number if it has yet to be inserted into the block chain - // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) { - // transaction.setSequenceNumber(slot.getSequenceNumber()); - // } + Transaction transaction = pendingTransactionQueue.get(0); - if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) { - transaction.setSequenceNumber(slot.getSequenceNumber()); - } + // Set the transaction sequence number if it has yet to be inserted into the block chain + // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) { + // transaction.setSequenceNumber(slot.getSequenceNumber()); + // } + if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) { + transaction.setSequenceNumber(slot.getSequenceNumber()); + } - while (true) { - TransactionPart part = transaction.getNextPartToSend(); - if (part == NULL) { - // Ran out of parts to send for this transaction so move on - break; - } + while (true) { + TransactionPart part = transaction.getNextPartToSend(); - if (slot.hasSpace(part)) { - slot.addEntry(part); - Vector partsSent = transactionPartsSent.get(transaction); - if (partsSent == NULL) { - partsSent = new Vector(); - transactionPartsSent.put(transaction, partsSent); - } - partsSent.add(part.getPartNumber()); + if (part == NULL) { + // Ran out of parts to send for this transaction so move on + break; + } + + if (slot.hasSpace(part)) { + slot.addEntry(part); + Vector partsSent = transactionPartsSent.get(transaction); + if (partsSent == NULL) { + partsSent = new Vector(); transactionPartsSent.put(transaction, partsSent); - } else { - break; } + partsSent.add(part.getPartNumber()); + transactionPartsSent.put(transaction, partsSent); + } else { + break; } } - - // Fill the remainder of the slot with rescue data - doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize); - - return new ThreeTuple(false, newSize, inserted); } - void doRejectedMessages(Slot s) { - if (!rejectedSlotVector.isEmpty()) { - /* TODO: We should avoid generating a rejected message entry if - * there is already a sufficient entry in the queue (e.g., - * equalsto value of true and same sequence number). */ + // Fill the remainder of the slot with rescue data + doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize); - int64_t old_seqn = rejectedSlotVector.firstElement(); - if (rejectedSlotVector.size() > REJECTED_THRESHOLD) { - int64_t new_seqn = rejectedSlotVector.lastElement(); - RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); + return new ThreeTuple(false, newSize, inserted); +} + +void Table::doRejectedMessages(Slot s) { + if (!rejectedSlotVector.isEmpty()) { + /* TODO: We should avoid generating a rejected message entry if + * there is already a sufficient entry in the queue (e.g., + * equalsto value of true and same sequence number). */ + + int64_t old_seqn = rejectedSlotVector.firstElement(); + if (rejectedSlotVector.size() > REJECTED_THRESHOLD) { + int64_t new_seqn = rejectedSlotVector.lastElement(); + RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); + s.addEntry(rm); + } else { + int64_t prev_seqn = -1; + int i = 0; + /* Go through list of missing messages */ + for (; i < rejectedSlotVector.size(); i++) { + int64_t curr_seqn = rejectedSlotVector.get(i); + Slot s_msg = buffer.getSlot(curr_seqn); + if (s_msg != NULL) + break; + prev_seqn = curr_seqn; + } + /* Generate rejected message entry for missing messages */ + if (prev_seqn != -1) { + RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false); + s.addEntry(rm); + } + /* Generate rejected message entries for present messages */ + for (; i < rejectedSlotVector.size(); i++) { + int64_t curr_seqn = rejectedSlotVector.get(i); + Slot s_msg = buffer.getSlot(curr_seqn); + int64_t machineid = s_msg.getMachineID(); + RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true); s.addEntry(rm); - } else { - int64_t prev_seqn = -1; - int i = 0; - /* Go through list of missing messages */ - for (; i < rejectedSlotVector.size(); i++) { - int64_t curr_seqn = rejectedSlotVector.get(i); - Slot s_msg = buffer.getSlot(curr_seqn); - if (s_msg != NULL) - break; - prev_seqn = curr_seqn; - } - /* Generate rejected message entry for missing messages */ - if (prev_seqn != -1) { - RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false); - s.addEntry(rm); - } - /* Generate rejected message entries for present messages */ - for (; i < rejectedSlotVector.size(); i++) { - int64_t curr_seqn = rejectedSlotVector.get(i); - Slot s_msg = buffer.getSlot(curr_seqn); - int64_t machineid = s_msg.getMachineID(); - RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true); - s.addEntry(rm); - } } } } +} - ThreeTuple doMandatoryResuce(Slot slot, bool resize) { - int64_t newestSequenceNumber = buffer.getNewestSeqNum(); - int64_t oldestSequenceNumber = buffer.getOldestSeqNum(); - if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) { - oldestLiveSlotSequenceNumver = oldestSequenceNumber; - } +ThreeTuple Table::doMandatoryResuce(Slot slot, bool resize) { + int64_t newestSequenceNumber = buffer.getNewestSeqNum(); + int64_t oldestSequenceNumber = buffer.getOldestSeqNum(); + if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) { + oldestLiveSlotSequenceNumver = oldestSequenceNumber; + } - int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver; - bool seenLiveSlot = false; - int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full - int64_t threshold = firstIfFull + FREE_SLOTS; // we want the buffer to be clear of live entries up to this point + int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver; + bool seenLiveSlot = false; + int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full + int64_t threshold = firstIfFull + FREE_SLOTS; // we want the buffer to be clear of live entries up to this point - // Mandatory Rescue - for (; currentSequenceNumber < threshold; currentSequenceNumber++) { - Slot previousSlot = buffer.getSlot(currentSequenceNumber); - // Push slot number forward - if (!seenLiveSlot) { - oldestLiveSlotSequenceNumver = currentSequenceNumber; - } + // Mandatory Rescue + for (; currentSequenceNumber < threshold; currentSequenceNumber++) { + Slot previousSlot = buffer.getSlot(currentSequenceNumber); + // Push slot number forward + if (!seenLiveSlot) { + oldestLiveSlotSequenceNumver = currentSequenceNumber; + } - if (!previousSlot.isLive()) { - continue; - } + if (!previousSlot.isLive()) { + continue; + } - // We have seen a live slot - seenLiveSlot = true; + // We have seen a live slot + seenLiveSlot = true; - // Get all the live entries for a slot - Vector liveEntries = previousSlot.getLiveEntries(resize); + // Get all the live entries for a slot + Vector liveEntries = previousSlot.getLiveEntries(resize); - // Iterate over all the live entries and try to rescue them - for (Entry liveEntry : liveEntries) { - if (slot.hasSpace(liveEntry)) { + // Iterate over all the live entries and try to rescue them + for (Entry liveEntry : liveEntries) { + if (slot.hasSpace(liveEntry)) { - // Enough space to rescue the entry - slot.addEntry(liveEntry); - } else if (currentSequenceNumber == firstIfFull) { - //if there's no space but the entry is about to fall off the queue - System.out.println("B");//? - return new ThreeTuple(true, seenLiveSlot, currentSequenceNumber); + // Enough space to rescue the entry + slot.addEntry(liveEntry); + } else if (currentSequenceNumber == firstIfFull) { + //if there's no space but the entry is about to fall off the queue + System.out.println("B"); //? + return new ThreeTuple(true, seenLiveSlot, currentSequenceNumber); - } } } - - // Did not resize - return new ThreeTuple(false, seenLiveSlot, currentSequenceNumber); } - void doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize) { - /* now go through live entries from least to greatest sequence number until - * either all live slots added, or the slot doesn't have enough room - * for SKIP_THRESHOLD consecutive entries*/ - int skipcount = 0; - int64_t newestseqnum = buffer.getNewestSeqNum(); -search: - for (; seqn <= newestseqnum; seqn++) { - Slot prevslot = buffer.getSlot(seqn); - //Push slot number forward - if (!seenliveslot) - oldestLiveSlotSequenceNumver = seqn; + // Did not resize + return new ThreeTuple(false, seenLiveSlot, currentSequenceNumber); +} - if (!prevslot.isLive()) - continue; - seenliveslot = true; - Vector liveentries = prevslot.getLiveEntries(resize); - for (Entry liveentry : liveentries) { - if (s.hasSpace(liveentry)) - s.addEntry(liveentry); - else { - skipcount++; - if (skipcount > SKIP_THRESHOLD) - break search; - } +void Table::doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize) { + /* now go through live entries from least to greatest sequence number until + * either all live slots added, or the slot doesn't have enough room + * for SKIP_THRESHOLD consecutive entries*/ + int skipcount = 0; + int64_t newestseqnum = buffer.getNewestSeqNum(); +search: + for (; seqn <= newestseqnum; seqn++) { + Slot prevslot = buffer.getSlot(seqn); + //Push slot number forward + if (!seenliveslot) + oldestLiveSlotSequenceNumver = seqn; + + if (!prevslot.isLive()) + continue; + seenliveslot = true; + Vector liveentries = prevslot.getLiveEntries(resize); + for (Entry liveentry : liveentries) { + if (s.hasSpace(liveentry)) + s.addEntry(liveentry); + else { + skipcount++; + if (skipcount > SKIP_THRESHOLD) + break search; } } } +} - /** - * Checks for malicious activity and updates the local copy of the block chain. - */ - void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal) { +/** + * Checks for malicious activity and updates the local copy of the block chain. + */ +void Table::validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal) { - // The cloud communication layer has checked slot HMACs already before decoding - if (newSlots.length == 0) { - return; - } + // The cloud communication layer has checked slot HMACs already before decoding + if (newSlots.length == 0) { + return; + } - // Make sure all slots are newer than the last largest slot this client has seen - int64_t firstSeqNum = newSlots[0].getSequenceNumber(); - if (firstSeqNum <= sequenceNumber) { - throw new Error("Server Error: Sent older slots!"); - } + // Make sure all slots are newer than the last largest slot this client has seen + int64_t firstSeqNum = newSlots[0].getSequenceNumber(); + if (firstSeqNum <= sequenceNumber) { + throw new Error("Server Error: Sent older slots!"); + } - // Create an object that can access both new slots and slots in our local chain - // without committing slots to our local chain - SlotIndexer indexer = new SlotIndexer(newSlots, buffer); + // Create an object that can access both new slots and slots in our local chain + // without committing slots to our local chain + SlotIndexer indexer = new SlotIndexer(newSlots, buffer); - // Check that the HMAC chain is not broken - checkHMACChain(indexer, newSlots); + // Check that the HMAC chain is not broken + checkHMACChain(indexer, newSlots); - // Set to keep track of messages from clients - HashSet machineSet = new HashSet(lastMessageTable.keySet()); + // Set to keep track of messages from clients + HashSet machineSet = new HashSet(lastMessageTable.keySet()); - // Process each slots data - for (Slot slot : newSlots) { - processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); + // Process each slots data + for (Slot slot : newSlots) { + processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); - updateExpectedSize(); - } + updateExpectedSize(); + } - // If there is a gap, check to see if the server sent us everything. - if (firstSeqNum != (sequenceNumber + 1)) { + // If there is a gap, check to see if the server sent us everything. + if (firstSeqNum != (sequenceNumber + 1)) { - // Check the size of the slots that were sent down by the server. - // Can only check the size if there was a gap - checkNumSlots(newSlots.length); + // Check the size of the slots that were sent down by the server. + // Can only check the size if there was a gap + checkNumSlots(newSlots.length); - // Since there was a gap every machine must have pushed a slot or must have - // a last message message. If not then the server is hiding slots - if (!machineSet.isEmpty()) { - throw new Error("Missing record for machines: " + machineSet); - } + // Since there was a gap every machine must have pushed a slot or must have + // a last message message. If not then the server is hiding slots + if (!machineSet.isEmpty()) { + throw new Error("Missing record for machines: " + machineSet); } + } - // Update the size of our local block chain. - commitNewMaxSize(); - - // Commit new to slots to the local block chain. - for (Slot slot : newSlots) { + // Update the size of our local block chain. + commitNewMaxSize(); - // Insert this slot into our local block chain copy. - buffer.putSlot(slot); + // Commit new to slots to the local block chain. + for (Slot slot : newSlots) { - // Keep track of how many slots are currently live (have live data in them). - liveSlotCount++; - } + // Insert this slot into our local block chain copy. + buffer.putSlot(slot); - // Get the sequence number of the latest slot in the system - sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber(); - - updateLiveStateFromServer(); + // Keep track of how many slots are currently live (have live data in them). + liveSlotCount++; + } - // No Need to remember after we pulled from the server - offlineTransactionsCommittedAndAtServer.clear(); + // Get the sequence number of the latest slot in the system + sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber(); - // This is invalidated now - hadPartialSendToServer = false; - } + updateLiveStateFromServer(); - void updateLiveStateFromServer() { - // Process the new transaction parts - processNewTransactionParts(); + // No Need to remember after we pulled from the server + offlineTransactionsCommittedAndAtServer.clear(); - // Do arbitration on new transactions that were received - arbitrateFromServer(); + // This is invalidated now + hadPartialSendToServer = false; +} - // Update all the committed keys - bool didCommitOrSpeculate = updateCommittedTable(); +void Table::updateLiveStateFromServer() { + // Process the new transaction parts + processNewTransactionParts(); - // Delete the transactions that are now dead - updateLiveTransactionsAndStatus(); + // Do arbitration on new transactions that were received + arbitrateFromServer(); - // Do speculations - didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate); - updatePendingTransactionSpeculativeTable(didCommitOrSpeculate); - } + // Update all the committed keys + bool didCommitOrSpeculate = updateCommittedTable(); - void updateLiveStateFromLocal() { - // Update all the committed keys - bool didCommitOrSpeculate = updateCommittedTable(); + // Delete the transactions that are now dead + updateLiveTransactionsAndStatus(); - // Delete the transactions that are now dead - updateLiveTransactionsAndStatus(); + // Do speculations + didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate); + updatePendingTransactionSpeculativeTable(didCommitOrSpeculate); +} - // Do speculations - didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate); - updatePendingTransactionSpeculativeTable(didCommitOrSpeculate); - } +void Table::updateLiveStateFromLocal() { + // Update all the committed keys + bool didCommitOrSpeculate = updateCommittedTable(); - void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) { - // if (didFindTableStatus) { - // return; - // } - int64_t prevslots = firstSequenceNumber; + // Delete the transactions that are now dead + updateLiveTransactionsAndStatus(); + // Do speculations + didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate); + updatePendingTransactionSpeculativeTable(didCommitOrSpeculate); +} - if (didFindTableStatus) { - // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize; - // System.out.println("Here2: " + expectedsize + " " + numberOfSlots + " " + prevslots); +void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) { + // if (didFindTableStatus) { + // return; + // } + int64_t prevslots = firstSequenceNumber; - } else { - expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots; - // System.out.println("Here: " + expectedsize); - } - // System.out.println(numberOfSlots); + if (didFindTableStatus) { + // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize; + // System.out.println("Here2: " + expectedsize + " " + numberOfSlots + " " + prevslots); - didFindTableStatus = true; - currMaxSize = numberOfSlots; + } else { + expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots; + // System.out.println("Here: " + expectedsize); } - void updateExpectedSize() { - expectedsize++; + // System.out.println(numberOfSlots); - if (expectedsize > currMaxSize) { - expectedsize = currMaxSize; - } - } + didFindTableStatus = true; + currMaxSize = numberOfSlots; +} +void Table::updateExpectedSize() { + expectedsize++; - /** - * Check the size of the block chain to make sure there are enough slots sent back by the server. - * This is only called when we have a gap between the slots that we have locally and the slots - * sent by the server therefore in the slots sent by the server there will be at least 1 Table - * status message - */ - void checkNumSlots(int numberOfSlots) { - if (numberOfSlots != expectedsize) { - throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numberOfSlots); - } + if (expectedsize > currMaxSize) { + expectedsize = currMaxSize; } +} - void updateCurrMaxSize(int newmaxsize) { - currMaxSize = newmaxsize; + +/** + * Check the size of the block chain to make sure there are enough slots sent back by the server. + * This is only called when we have a gap between the slots that we have locally and the slots + * sent by the server therefore in the slots sent by the server there will be at least 1 Table + * status message + */ +void Table::checkNumSlots(int numberOfSlots) { + if (numberOfSlots != expectedsize) { + throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numberOfSlots); } +} +void Table::updateCurrMaxSize(int newmaxsize) { + currMaxSize = newmaxsize; +} - /** - * Update the size of of the local buffer if it is needed. - */ - void commitNewMaxSize() { - didFindTableStatus = false; - // Resize the local slot buffer - if (numberOfSlots != currMaxSize) { - buffer.resize((int)currMaxSize); - } +/** + * Update the size of of the local buffer if it is needed. + */ +void Table::commitNewMaxSize() { + didFindTableStatus = false; - // Change the number of local slots to the new size - numberOfSlots = (int)currMaxSize; + // Resize the local slot buffer + if (numberOfSlots != currMaxSize) { + buffer.resize((int)currMaxSize); + } + // Change the number of local slots to the new size + numberOfSlots = (int)currMaxSize; - // Recalculate the resize threshold since the size of the local buffer has changed - setResizeThreshold(); - } - /** - * Process the new transaction parts from this latest round of slots received from the server - */ - void processNewTransactionParts() { + // Recalculate the resize threshold since the size of the local buffer has changed + setResizeThreshold(); +} - if (newTransactionParts.size() == 0) { - // Nothing new to process - return; - } +/** + * Process the new transaction parts from this latest round of slots received from the server + */ +void Table::processNewTransactionParts() { - // Iterate through all the machine Ids that we received new parts for - for (Long machineId : newTransactionParts.keySet()) { - Hashtable, TransactionPart> parts = newTransactionParts.get(machineId); + if (newTransactionParts.size() == 0) { + // Nothing new to process + return; + } - // Iterate through all the parts for that machine Id - for (Pair partId : parts.keySet()) { - TransactionPart part = parts.get(partId); + // Iterate through all the machine Ids that we received new parts for + for (Long machineId : newTransactionParts.keySet()) { + Hashtable, TransactionPart> parts = newTransactionParts.get(machineId); - Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part.getSequenceNumber())) { - // Set dead the transaction part - part.setDead(); - continue; - } + // Iterate through all the parts for that machine Id + for (Pair partId : parts.keySet()) { + TransactionPart part = parts.get(partId); - // Get the transaction object for that sequence number - Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber()); + Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId()); + if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part.getSequenceNumber())) { + // Set dead the transaction part + part.setDead(); + continue; + } - if (transaction == NULL) { - // This is a new transaction that we dont have so make a new one - transaction = new Transaction(); + // Get the transaction object for that sequence number + Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber()); - // Insert this new transaction into the live tables - liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction); - liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction); - } + if (transaction == NULL) { + // This is a new transaction that we dont have so make a new one + transaction = new Transaction(); - // Add that part to the transaction - transaction.addPartDecode(part); + // Insert this new transaction into the live tables + liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction); + liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction); } - } - // Clear all the new transaction parts in preparation for the next time the server sends slots - newTransactionParts.clear(); + // Add that part to the transaction + transaction.addPartDecode(part); + } } + // Clear all the new transaction parts in preparation for the next time the server sends slots + newTransactionParts.clear(); +} - int64_t lastSeqNumArbOn = 0; - void arbitrateFromServer() { +int64_t lastSeqNumArbOn = 0; - if (liveTransactionBySequenceNumberTable.size() == 0) { - // Nothing to arbitrate on so move on - return; - } +void Table::arbitrateFromServer() { + + if (liveTransactionBySequenceNumberTable.size() == 0) { + // Nothing to arbitrate on so move on + return; + } - // Get the transaction sequence numbers and sort from oldest to newest - Vector transactionSequenceNumbers = new Vector(liveTransactionBySequenceNumberTable.keySet()); - Collections.sort(transactionSequenceNumbers); + // Get the transaction sequence numbers and sort from oldest to newest + Vector transactionSequenceNumbers = new Vector(liveTransactionBySequenceNumberTable.keySet()); + Collections.sort(transactionSequenceNumbers); - // Collection of key value pairs that are - Hashtable speculativeTableTmp = new Hashtable(); + // Collection of key value pairs that are + Hashtable speculativeTableTmp = new Hashtable(); - // The last transaction arbitrated on - int64_t lastTransactionCommitted = -1; - Set generatedAborts = new HashSet(); + // The last transaction arbitrated on + int64_t lastTransactionCommitted = -1; + Set generatedAborts = new HashSet(); - for (Long transactionSequenceNumber : transactionSequenceNumbers) { - Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber); + for (Long transactionSequenceNumber : transactionSequenceNumbers) { + Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber); - // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction - if (transaction.getArbitrator() != localMachineId) { - continue; - } + // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction + if (transaction.getArbitrator() != localMachineId) { + continue; + } - if (transactionSequenceNumber < lastSeqNumArbOn) { - continue; - } + if (transactionSequenceNumber < lastSeqNumArbOn) { + continue; + } - if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) { - // We have seen this already locally so dont commit again - continue; - } + if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) { + // We have seen this already locally so dont commit again + continue; + } - if (!transaction.isComplete()) { - // Will arbitrate in incorrect order if we continue so just break - // Most likely this - break; - } + if (!transaction.isComplete()) { + // Will arbitrate in incorrect order if we continue so just break + // Most likely this + break; + } - // update the largest transaction seen by arbitrator from server - if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == NULL) { + // update the largest transaction seen by arbitrator from server + if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == NULL) { + lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber()); + } else { + Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()); + if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) { lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber()); - } else { - Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()); - if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) { - lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber()); - } } + } - if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) { - // Guard evaluated as true + if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) { + // Guard evaluated as true - // Update the local changes so we can make the commit - for (KeyValue kv : transaction.getKeyValueUpdateSet()) { - speculativeTableTmp.put(kv.getKey(), kv); - } + // Update the local changes so we can make the commit + for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + speculativeTableTmp.put(kv.getKey(), kv); + } - // Update what the last transaction committed was for use in batch commit - lastTransactionCommitted = transactionSequenceNumber; - } else { - // Guard evaluated was false so create abort + // Update what the last transaction committed was for use in batch commit + lastTransactionCommitted = transactionSequenceNumber; + } else { + // Guard evaluated was false so create abort + + // Create the abort + Abort newAbort = new Abort(NULL, + transaction.getClientLocalSequenceNumber(), + transaction.getSequenceNumber(), + transaction.getMachineId(), + transaction.getArbitrator(), + localArbitrationSequenceNumber); + localArbitrationSequenceNumber++; - // Create the abort - Abort newAbort = new Abort(NULL, - transaction.getClientLocalSequenceNumber(), - transaction.getSequenceNumber(), - transaction.getMachineId(), - transaction.getArbitrator(), - localArbitrationSequenceNumber); - localArbitrationSequenceNumber++; + generatedAborts.add(newAbort); - generatedAborts.add(newAbort); + // Insert the abort so we can process + processEntry(newAbort); + } - // Insert the abort so we can process - processEntry(newAbort); - } + lastSeqNumArbOn = transactionSequenceNumber; - lastSeqNumArbOn = transactionSequenceNumber; + // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber); + } - // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber); - } + Commit newCommit = NULL; - Commit newCommit = NULL; + // If there is something to commit + if (speculativeTableTmp.size() != 0) { - // If there is something to commit - if (speculativeTableTmp.size() != 0) { + // Create the commit and increment the commit sequence number + newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted); + localArbitrationSequenceNumber++; - // Create the commit and increment the commit sequence number - newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted); - localArbitrationSequenceNumber++; + // Add all the new keys to the commit + for (KeyValue kv : speculativeTableTmp.values()) { + newCommit.addKV(kv); + } - // Add all the new keys to the commit - for (KeyValue kv : speculativeTableTmp.values()) { - newCommit.addKV(kv); - } + // create the commit parts + newCommit.createCommitParts(); - // create the commit parts - newCommit.createCommitParts(); + // Append all the commit parts to the end of the pending queue waiting for sending to the server - // Append all the commit parts to the end of the pending queue waiting for sending to the server + // Insert the commit so we can process it + for (CommitPart commitPart : newCommit.getParts().values()) { + processEntry(commitPart); + } + } - // Insert the commit so we can process it - for (CommitPart commitPart : newCommit.getParts().values()) { - processEntry(commitPart); + if ((newCommit != NULL) || (generatedAborts.size() > 0)) { + ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts); + pendingSendArbitrationRounds.add(arbitrationRound); + + if (compactArbitrationData()) { + ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); + if (newArbitrationRound.getCommit() != NULL) { + for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) { + processEntry(commitPart); + } } } + } +} - if ((newCommit != NULL) || (generatedAborts.size() > 0)) { - ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts); - pendingSendArbitrationRounds.add(arbitrationRound); +Pair Table::arbitrateOnLocalTransaction(Transaction transaction) { - if (compactArbitrationData()) { - ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); - if (newArbitrationRound.getCommit() != NULL) { - for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) { - processEntry(commitPart); - } - } + // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction + if (transaction.getArbitrator() != localMachineId) { + return new Pair(false, false); + } + + if (!transaction.isComplete()) { + // Will arbitrate in incorrect order if we continue so just break + // Most likely this + return new Pair(false, false); + } + + if (transaction.getMachineId() != localMachineId) { + // dont do this check for local transactions + if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != NULL) { + if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) { + // We've have already seen this from the server + return new Pair(false, false); } } } - Pair arbitrateOnLocalTransaction(Transaction transaction) { + if (transaction.evaluateGuard(committedKeyValueTable, NULL, NULL)) { + // Guard evaluated as true - // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction - if (transaction.getArbitrator() != localMachineId) { - return new Pair(false, false); + // Create the commit and increment the commit sequence number + Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1); + localArbitrationSequenceNumber++; + + // Update the local changes so we can make the commit + for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + newCommit.addKV(kv); } - if (!transaction.isComplete()) { - // Will arbitrate in incorrect order if we continue so just break - // Most likely this - return new Pair(false, false); + // create the commit parts + newCommit.createCommitParts(); + + // Append all the commit parts to the end of the pending queue waiting for sending to the server + ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet()); + pendingSendArbitrationRounds.add(arbitrationRound); + + if (compactArbitrationData()) { + ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); + for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) { + processEntry(commitPart); + } + } else { + // Insert the commit so we can process it + for (CommitPart commitPart : newCommit.getParts().values()) { + processEntry(commitPart); + } } - if (transaction.getMachineId() != localMachineId) { - // dont do this check for local transactions - if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != NULL) { - if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) { - // We've have already seen this from the server - return new Pair(false, false); - } + if (transaction.getMachineId() == localMachineId) { + TransactionStatus status = transaction.getTransactionStatus(); + if (status != NULL) { + status.setStatus(TransactionStatus.StatusCommitted); } } - if (transaction.evaluateGuard(committedKeyValueTable, NULL, NULL)) { - // Guard evaluated as true + updateLiveStateFromLocal(); + return new Pair(true, true); + } else { - // Create the commit and increment the commit sequence number - Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1); - localArbitrationSequenceNumber++; + if (transaction.getMachineId() == localMachineId) { + // For locally created messages update the status - // Update the local changes so we can make the commit - for (KeyValue kv : transaction.getKeyValueUpdateSet()) { - newCommit.addKV(kv); + // Guard evaluated was false so create abort + TransactionStatus status = transaction.getTransactionStatus(); + if (status != NULL) { + status.setStatus(TransactionStatus.StatusAborted); } + } else { + Set addAbortSet = new HashSet(); + + + // Create the abort + Abort newAbort = new Abort(NULL, + transaction.getClientLocalSequenceNumber(), + -1, + transaction.getMachineId(), + transaction.getArbitrator(), + localArbitrationSequenceNumber); + localArbitrationSequenceNumber++; + + addAbortSet.add(newAbort); - // create the commit parts - newCommit.createCommitParts(); // Append all the commit parts to the end of the pending queue waiting for sending to the server - ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet()); + ArbitrationRound arbitrationRound = new ArbitrationRound(NULL, addAbortSet); pendingSendArbitrationRounds.add(arbitrationRound); if (compactArbitrationData()) { @@ -1838,238 +1806,213 @@ search: for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) { processEntry(commitPart); } - } else { - // Insert the commit so we can process it - for (CommitPart commitPart : newCommit.getParts().values()) { - processEntry(commitPart); - } } + } + + updateLiveStateFromLocal(); + return new Pair(true, false); + } +} + +/** + * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates + */ +bool Table::compactArbitrationData() { + + if (pendingSendArbitrationRounds.size() < 2) { + // Nothing to compact so do nothing + return false; + } - if (transaction.getMachineId() == localMachineId) { - TransactionStatus status = transaction.getTransactionStatus(); - if (status != NULL) { - status.setStatus(TransactionStatus.StatusCommitted); - } - } + ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); + if (lastRound.didSendPart()) { + return false; + } - updateLiveStateFromLocal(); - return new Pair(true, true); - } else { + bool hadCommit = (lastRound.getCommit() == NULL); + bool gotNewCommit = false; - if (transaction.getMachineId() == localMachineId) { - // For locally created messages update the status + int numberToDelete = 1; + while (numberToDelete < pendingSendArbitrationRounds.size()) { + ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1); - // Guard evaluated was false so create abort - TransactionStatus status = transaction.getTransactionStatus(); - if (status != NULL) { - status.setStatus(TransactionStatus.StatusAborted); - } - } else { - Set addAbortSet = new HashSet(); + if (round.isFull() || round.didSendPart()) { + // Stop since there is a part that cannot be compacted and we need to compact in order + break; + } + if (round.getCommit() == NULL) { - // Create the abort - Abort newAbort = new Abort(NULL, - transaction.getClientLocalSequenceNumber(), - -1, - transaction.getMachineId(), - transaction.getArbitrator(), - localArbitrationSequenceNumber); - localArbitrationSequenceNumber++; + // Try compacting aborts only + int newSize = round.getCurrentSize() + lastRound.getAbortsCount(); + if (newSize > ArbitrationRound.MAX_PARTS) { + // Cant compact since it would be too large + break; + } + lastRound.addAborts(round.getAborts()); + } else { - addAbortSet.add(newAbort); + // Create a new larger commit + Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber); + localArbitrationSequenceNumber++; + // Create the commit parts so that we can count them + newCommit.createCommitParts(); - // Append all the commit parts to the end of the pending queue waiting for sending to the server - ArbitrationRound arbitrationRound = new ArbitrationRound(NULL, addAbortSet); - pendingSendArbitrationRounds.add(arbitrationRound); + // Calculate the new size of the parts + int newSize = newCommit.getNumberOfParts(); + newSize += lastRound.getAbortsCount(); + newSize += round.getAbortsCount(); - if (compactArbitrationData()) { - ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); - for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) { - processEntry(commitPart); - } - } + if (newSize > ArbitrationRound.MAX_PARTS) { + // Cant compact since it would be too large + break; } - updateLiveStateFromLocal(); - return new Pair(true, false); + // Set the new compacted part + lastRound.setCommit(newCommit); + lastRound.addAborts(round.getAborts()); + gotNewCommit = true; } - } - /** - * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates - */ - bool compactArbitrationData() { + numberToDelete++; + } - if (pendingSendArbitrationRounds.size() < 2) { - // Nothing to compact so do nothing - return false; - } + if (numberToDelete != 1) { + // If there is a compaction - ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); - if (lastRound.didSendPart()) { - return false; + // Delete the previous pieces that are now in the new compacted piece + if (numberToDelete == pendingSendArbitrationRounds.size()) { + pendingSendArbitrationRounds.clear(); + } else { + for (int i = 0; i < numberToDelete; i++) { + pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1); + } } - bool hadCommit = (lastRound.getCommit() == NULL); - bool gotNewCommit = false; + // Add the new compacted into the pending to send list + pendingSendArbitrationRounds.add(lastRound); - int numberToDelete = 1; - while (numberToDelete < pendingSendArbitrationRounds.size()) { - ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1); - - if (round.isFull() || round.didSendPart()) { - // Stop since there is a part that cannot be compacted and we need to compact in order - break; - } + // Should reinsert into the commit processor + if (hadCommit && gotNewCommit) { + return true; + } + } - if (round.getCommit() == NULL) { + return false; +} +// bool compactArbitrationData() { +// return false; +// } - // Try compacting aborts only - int newSize = round.getCurrentSize() + lastRound.getAbortsCount(); - if (newSize > ArbitrationRound.MAX_PARTS) { - // Cant compact since it would be too large - break; - } - lastRound.addAborts(round.getAborts()); - } else { +/** + * Update all the commits and the committed tables, sets dead the dead transactions + */ +bool Table::updateCommittedTable() { - // Create a new larger commit - Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber); - localArbitrationSequenceNumber++; + if (newCommitParts.size() == 0) { + // Nothing new to process + return false; + } - // Create the commit parts so that we can count them - newCommit.createCommitParts(); + // Iterate through all the machine Ids that we received new parts for + for (Long machineId : newCommitParts.keySet()) { + Hashtable, CommitPart> parts = newCommitParts.get(machineId); - // Calculate the new size of the parts - int newSize = newCommit.getNumberOfParts(); - newSize += lastRound.getAbortsCount(); - newSize += round.getAbortsCount(); + // Iterate through all the parts for that machine Id + for (Pair partId : parts.keySet()) { + CommitPart part = parts.get(partId); - if (newSize > ArbitrationRound.MAX_PARTS) { - // Cant compact since it would be too large - break; - } + // Get the transaction object for that sequence number + Hashtable commitForClientTable = liveCommitsTable.get(part.getMachineId()); - // Set the new compacted part - lastRound.setCommit(newCommit); - lastRound.addAborts(round.getAborts()); - gotNewCommit = true; + if (commitForClientTable == NULL) { + // This is the first commit from this device + commitForClientTable = new Hashtable(); + liveCommitsTable.put(part.getMachineId(), commitForClientTable); } - numberToDelete++; - } + Commit commit = commitForClientTable.get(part.getSequenceNumber()); - if (numberToDelete != 1) { - // If there is a compaction + if (commit == NULL) { + // This is a new commit that we dont have so make a new one + commit = new Commit(); - // Delete the previous pieces that are now in the new compacted piece - if (numberToDelete == pendingSendArbitrationRounds.size()) { - pendingSendArbitrationRounds.clear(); - } else { - for (int i = 0; i < numberToDelete; i++) { - pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1); - } + // Insert this new commit into the live tables + commitForClientTable.put(part.getSequenceNumber(), commit); } - // Add the new compacted into the pending to send list - pendingSendArbitrationRounds.add(lastRound); - - // Should reinsert into the commit processor - if (hadCommit && gotNewCommit) { - return true; - } + // Add that part to the commit + commit.addPartDecode(part); } - - return false; } - // bool compactArbitrationData() { - // return false; - // } - /** - * Update all the commits and the committed tables, sets dead the dead transactions - */ - bool updateCommittedTable() { + // Clear all the new commits parts in preparation for the next time the server sends slots + newCommitParts.clear(); - if (newCommitParts.size() == 0) { - // Nothing new to process - return false; - } + // If we process a new commit keep track of it for future use + bool didProcessANewCommit = false; - // Iterate through all the machine Ids that we received new parts for - for (Long machineId : newCommitParts.keySet()) { - Hashtable, CommitPart> parts = newCommitParts.get(machineId); + // Process the commits one by one + for (Long arbitratorId : liveCommitsTable.keySet()) { - // Iterate through all the parts for that machine Id - for (Pair partId : parts.keySet()) { - CommitPart part = parts.get(partId); + // Get all the commits for a specific arbitrator + Hashtable commitForClientTable = liveCommitsTable.get(arbitratorId); - // Get the transaction object for that sequence number - Hashtable commitForClientTable = liveCommitsTable.get(part.getMachineId()); - - if (commitForClientTable == NULL) { - // This is the first commit from this device - commitForClientTable = new Hashtable(); - liveCommitsTable.put(part.getMachineId(), commitForClientTable); - } + // Sort the commits in order + Vector commitSequenceNumbers = new Vector(commitForClientTable.keySet()); + Collections.sort(commitSequenceNumbers); - Commit commit = commitForClientTable.get(part.getSequenceNumber()); + // Get the last commit seen from this arbitrator + int64_t lastCommitSeenSequenceNumber = -1; + if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != NULL) { + lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId); + } - if (commit == NULL) { - // This is a new commit that we dont have so make a new one - commit = new Commit(); + // Go through each new commit one by one + for (int i = 0; i < commitSequenceNumbers.size(); i++) { + Long commitSequenceNumber = commitSequenceNumbers.get(i); + Commit commit = commitForClientTable.get(commitSequenceNumber); - // Insert this new commit into the live tables - commitForClientTable.put(part.getSequenceNumber(), commit); + // Special processing if a commit is not complete + if (!commit.isComplete()) { + if (i == (commitSequenceNumbers.size() - 1)) { + // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits + break; + } else { + // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet). + // Delete it and move on + commit.setDead(); + commitForClientTable.remove(commit.getSequenceNumber()); + continue; } - - // Add that part to the commit - commit.addPartDecode(part); } - } - - // Clear all the new commits parts in preparation for the next time the server sends slots - newCommitParts.clear(); - - // If we process a new commit keep track of it for future use - bool didProcessANewCommit = false; - // Process the commits one by one - for (Long arbitratorId : liveCommitsTable.keySet()) { + // Update the last transaction that was updated if we can + if (commit.getTransactionSequenceNumber() != -1) { + Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId()); - // Get all the commits for a specific arbitrator - Hashtable commitForClientTable = liveCommitsTable.get(arbitratorId); - - // Sort the commits in order - Vector commitSequenceNumbers = new Vector(commitForClientTable.keySet()); - Collections.sort(commitSequenceNumbers); - - // Get the last commit seen from this arbitrator - int64_t lastCommitSeenSequenceNumber = -1; - if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != NULL) { - lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId); + // Update the last transaction sequence number that the arbitrator arbitrated on + if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) { + lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber()); + } } - // Go through each new commit one by one - for (int i = 0; i < commitSequenceNumbers.size(); i++) { - Long commitSequenceNumber = commitSequenceNumbers.get(i); - Commit commit = commitForClientTable.get(commitSequenceNumber); + // Update the last arbitration data that we have seen so far + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != NULL) { - // Special processing if a commit is not complete - if (!commit.isComplete()) { - if (i == (commitSequenceNumbers.size() - 1)) { - // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits - break; - } else { - // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet). - // Delete it and move on - commit.setDead(); - commitForClientTable.remove(commit.getSequenceNumber()); - continue; - } + int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()); + if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) { + // Is larger + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber()); } + } else { + // Never seen any data from this arbitrator so record the first one + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber()); + } + + // We have already seen this commit before so need to do the full processing on this commit + if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) { // Update the last transaction that was updated if we can if (commit.getTransactionSequenceNumber() != -1) { @@ -2081,655 +2024,629 @@ search: } } - // Update the last arbitration data that we have seen so far - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != NULL) { - - int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()); - if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) { - // Is larger - lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber()); - } - } else { - // Never seen any data from this arbitrator so record the first one - lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber()); - } - - // We have already seen this commit before so need to do the full processing on this commit - if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) { - - // Update the last transaction that was updated if we can - if (commit.getTransactionSequenceNumber() != -1) { - Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) { - lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber()); - } - } - - continue; - } + continue; + } - // If we got here then this is a brand new commit and needs full processing + // If we got here then this is a brand new commit and needs full processing - // Get what commits should be edited, these are the commits that have live values for their keys - Set commitsToEdit = new HashSet(); - for (KeyValue kv : commit.getKeyValueUpdateSet()) { - commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey())); - } - commitsToEdit.remove(NULL); // remove NULL since it could be in this set + // Get what commits should be edited, these are the commits that have live values for their keys + Set commitsToEdit = new HashSet(); + for (KeyValue kv : commit.getKeyValueUpdateSet()) { + commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey())); + } + commitsToEdit.remove(NULL); // remove NULL since it could be in this set - // Update each previous commit that needs to be updated - for (Commit previousCommit : commitsToEdit) { + // Update each previous commit that needs to be updated + for (Commit previousCommit : commitsToEdit) { - // Only bother with live commits (TODO: Maybe remove this check) - if (previousCommit.isLive()) { + // Only bother with live commits (TODO: Maybe remove this check) + if (previousCommit.isLive()) { - // Update which keys in the old commits are still live - for (KeyValue kv : commit.getKeyValueUpdateSet()) { - previousCommit.invalidateKey(kv.getKey()); - } + // Update which keys in the old commits are still live + for (KeyValue kv : commit.getKeyValueUpdateSet()) { + previousCommit.invalidateKey(kv.getKey()); + } - // if the commit is now dead then remove it - if (!previousCommit.isLive()) { - commitForClientTable.remove(previousCommit); - } + // if the commit is now dead then remove it + if (!previousCommit.isLive()) { + commitForClientTable.remove(previousCommit); } } + } - // Update the last seen sequence number from this arbitrator - if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != NULL) { - if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) { - lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber()); - } - } else { + // Update the last seen sequence number from this arbitrator + if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != NULL) { + if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) { lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber()); } + } else { + lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber()); + } - // We processed a new commit that we havent seen before - didProcessANewCommit = true; + // We processed a new commit that we havent seen before + didProcessANewCommit = true; - // Update the committed table of keys and which commit is using which key - for (KeyValue kv : commit.getKeyValueUpdateSet()) { - committedKeyValueTable.put(kv.getKey(), kv); - liveCommitsByKeyTable.put(kv.getKey(), commit); - } + // Update the committed table of keys and which commit is using which key + for (KeyValue kv : commit.getKeyValueUpdateSet()) { + committedKeyValueTable.put(kv.getKey(), kv); + liveCommitsByKeyTable.put(kv.getKey(), commit); } } - - return didProcessANewCommit; } - /** - * Create the speculative table from transactions that are still live and have come from the cloud - */ - bool updateSpeculativeTable(bool didProcessNewCommits) { - if (liveTransactionBySequenceNumberTable.keySet().size() == 0) { - // There is nothing to speculate on - return false; - } - - // Create a list of the transaction sequence numbers and sort them from oldest to newest - Vector transactionSequenceNumbersSorted = new Vector(liveTransactionBySequenceNumberTable.keySet()); - Collections.sort(transactionSequenceNumbersSorted); + return didProcessANewCommit; +} - bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn; +/** + * Create the speculative table from transactions that are still live and have come from the cloud + */ +bool Table::updateSpeculativeTable(bool didProcessNewCommits) { + if (liveTransactionBySequenceNumberTable.keySet().size() == 0) { + // There is nothing to speculate on + return false; + } + // Create a list of the transaction sequence numbers and sort them from oldest to newest + Vector transactionSequenceNumbersSorted = new Vector(liveTransactionBySequenceNumberTable.keySet()); + Collections.sort(transactionSequenceNumbersSorted); - if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) { - // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction - // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch + bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn; - // Start from scratch - speculatedKeyValueTable.clear(); - lastTransactionSequenceNumberSpeculatedOn = -1; - oldestTransactionSequenceNumberSpeculatedOn = -1; - } + if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) { + // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction + // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch - // Remember the front of the transaction list - oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0); + // Start from scratch + speculatedKeyValueTable.clear(); + lastTransactionSequenceNumberSpeculatedOn = -1; + oldestTransactionSequenceNumberSpeculatedOn = -1; - // Find where to start arbitration from - int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1; + } - if (startIndex >= transactionSequenceNumbersSorted.size()) { - // Make sure we are not out of bounds - return false; // did not speculate - } + // Remember the front of the transaction list + oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0); - Set incompleteTransactionArbitrator = new HashSet(); - bool didSkip = true; + // Find where to start arbitration from + int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1; - for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) { - int64_t transactionSequenceNumber = transactionSequenceNumbersSorted.get(i); - Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber); + if (startIndex >= transactionSequenceNumbersSorted.size()) { + // Make sure we are not out of bounds + return false; // did not speculate + } - if (!transaction.isComplete()) { - // If there is an incomplete transaction then there is nothing we can do - // add this transactions arbitrator to the list of arbitrators we should ignore - incompleteTransactionArbitrator.add(transaction.getArbitrator()); - didSkip = true; - continue; - } + Set incompleteTransactionArbitrator = new HashSet(); + bool didSkip = true; - if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) { - continue; - } + for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) { + int64_t transactionSequenceNumber = transactionSequenceNumbersSorted.get(i); + Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber); - lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber; + if (!transaction.isComplete()) { + // If there is an incomplete transaction then there is nothing we can do + // add this transactions arbitrator to the list of arbitrators we should ignore + incompleteTransactionArbitrator.add(transaction.getArbitrator()); + didSkip = true; + continue; + } - if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) { - // Guard evaluated to true so update the speculative table - for (KeyValue kv : transaction.getKeyValueUpdateSet()) { - speculatedKeyValueTable.put(kv.getKey(), kv); - } - } + if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) { + continue; } - if (didSkip) { - // Since there was a skip we need to redo the speculation next time around - lastTransactionSequenceNumberSpeculatedOn = -1; - oldestTransactionSequenceNumberSpeculatedOn = -1; + lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber; + + if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) { + // Guard evaluated to true so update the speculative table + for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + speculatedKeyValueTable.put(kv.getKey(), kv); + } } + } - // We did some speculation - return true; + if (didSkip) { + // Since there was a skip we need to redo the speculation next time around + lastTransactionSequenceNumberSpeculatedOn = -1; + oldestTransactionSequenceNumberSpeculatedOn = -1; } - /** - * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer - */ - void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) { - if (pendingTransactionQueue.size() == 0) { - // There is nothing to speculate on - return; - } + // We did some speculation + return true; +} + +/** + * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer + */ +void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) { + if (pendingTransactionQueue.size() == 0) { + // There is nothing to speculate on + return; + } - if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) { - // need to reset on the pending speculation - lastPendingTransactionSpeculatedOn = NULL; - firstPendingTransaction = pendingTransactionQueue.get(0); - pendingTransactionSpeculatedKeyValueTable.clear(); - } + if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) { + // need to reset on the pending speculation + lastPendingTransactionSpeculatedOn = NULL; + firstPendingTransaction = pendingTransactionQueue.get(0); + pendingTransactionSpeculatedKeyValueTable.clear(); + } - // Find where to start arbitration from - int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1; + // Find where to start arbitration from + int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1; - if (startIndex >= pendingTransactionQueue.size()) { - // Make sure we are not out of bounds - return; - } + if (startIndex >= pendingTransactionQueue.size()) { + // Make sure we are not out of bounds + return; + } - for (int i = startIndex; i < pendingTransactionQueue.size(); i++) { - Transaction transaction = pendingTransactionQueue.get(i); + for (int i = startIndex; i < pendingTransactionQueue.size(); i++) { + Transaction transaction = pendingTransactionQueue.get(i); - lastPendingTransactionSpeculatedOn = transaction; + lastPendingTransactionSpeculatedOn = transaction; - if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) { - // Guard evaluated to true so update the speculative table - for (KeyValue kv : transaction.getKeyValueUpdateSet()) { - pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv); - } + if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) { + // Guard evaluated to true so update the speculative table + for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv); } } } +} - /** - * Set dead and remove from the live transaction tables the transactions that are dead - */ - void updateLiveTransactionsAndStatus() { +/** + * Set dead and remove from the live transaction tables the transactions that are dead + */ +void Table::updateLiveTransactionsAndStatus() { - // Go through each of the transactions - for (Iterator > iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) { - Transaction transaction = iter.next().getValue(); + // Go through each of the transactions + for (Iterator > iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) { + Transaction transaction = iter.next().getValue(); - // Check if the transaction is dead - Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction.getSequenceNumber())) { + // Check if the transaction is dead + Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator()); + if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction.getSequenceNumber())) { - // Set dead the transaction - transaction.setDead(); + // Set dead the transaction + transaction.setDead(); - // Remove the transaction from the live table - iter.remove(); - liveTransactionByTransactionIdTable.remove(transaction.getId()); - } + // Remove the transaction from the live table + iter.remove(); + liveTransactionByTransactionIdTable.remove(transaction.getId()); } + } - // Go through each of the transactions - for (Iterator > iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) { - TransactionStatus status = iter.next().getValue(); + // Go through each of the transactions + for (Iterator > iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) { + TransactionStatus status = iter.next().getValue(); - // Check if the transaction is dead - Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) { + // Check if the transaction is dead + Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator()); + if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) { - // Set committed - status.setStatus(TransactionStatus.StatusCommitted); + // Set committed + status.setStatus(TransactionStatus.StatusCommitted); - // Remove - iter.remove(); - } + // Remove + iter.remove(); } } +} - /** - * Process this slot, entry by entry. Also update the latest message sent by slot - */ - void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet machineSet) { +/** + * Process this slot, entry by entry. Also update the latest message sent by slot + */ +void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet machineSet) { - // Update the last message seen - updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet); + // Update the last message seen + updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet); - // Process each entry in the slot - for (Entry entry : slot.getEntries()) { - switch (entry.getType()) { + // Process each entry in the slot + for (Entry entry : slot.getEntries()) { + switch (entry.getType()) { - case Entry.TypeCommitPart: - processEntry((CommitPart)entry); - break; + case Entry.TypeCommitPart: + processEntry((CommitPart)entry); + break; - case Entry.TypeAbort: - processEntry((Abort)entry); - break; + case Entry.TypeAbort: + processEntry((Abort)entry); + break; - case Entry.TypeTransactionPart: - processEntry((TransactionPart)entry); - break; + case Entry.TypeTransactionPart: + processEntry((TransactionPart)entry); + break; - case Entry.TypeNewKey: - processEntry((NewKey)entry); - break; + case Entry.TypeNewKey: + processEntry((NewKey)entry); + break; - case Entry.TypeLastMessage: - processEntry((LastMessage)entry, machineSet); - break; + case Entry.TypeLastMessage: + processEntry((LastMessage)entry, machineSet); + break; - case Entry.TypeRejectedMessage: - processEntry((RejectedMessage)entry, indexer); - break; + case Entry.TypeRejectedMessage: + processEntry((RejectedMessage)entry, indexer); + break; - case Entry.TypeTableStatus: - processEntry((TableStatus)entry, slot.getSequenceNumber()); - break; + case Entry.TypeTableStatus: + processEntry((TableStatus)entry, slot.getSequenceNumber()); + break; - default: - throw new Error("Unrecognized type: " + entry.getType()); - } + default: + throw new Error("Unrecognized type: " + entry.getType()); } } +} - /** - * Update the last message that was sent for a machine Id - */ - void processEntry(LastMessage entry, HashSet machineSet) { - // Update what the last message received by a machine was - updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet); - } +/** + * Update the last message that was sent for a machine Id + */ +void Table::processEntry(LastMessage entry, HashSet machineSet) { + // Update what the last message received by a machine was + updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet); +} - /** - * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message) - */ - void processEntry(NewKey entry) { +/** + * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message) + */ +void Table::processEntry(NewKey entry) { - // Update the arbitrator table with the new key information - arbitratorTable.put(entry.getKey(), entry.getMachineID()); + // Update the arbitrator table with the new key information + arbitratorTable.put(entry.getKey(), entry.getMachineID()); - // Update what the latest live new key is - NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry); - if (oldNewKey != NULL) { - // Delete the old new key messages - oldNewKey.setDead(); - } + // Update what the latest live new key is + NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry); + if (oldNewKey != NULL) { + // Delete the old new key messages + oldNewKey.setDead(); } +} - /** - * Process new table status entries and set dead the old ones as new ones come in. - * keeps track of the largest and smallest table status seen in this current round - * of updating the local copy of the block chain - */ - void processEntry(TableStatus entry, int64_t seq) { - int newNumSlots = entry.getMaxSlots(); - updateCurrMaxSize(newNumSlots); - - initExpectedSize(seq, newNumSlots); +/** + * Process new table status entries and set dead the old ones as new ones come in. + * keeps track of the largest and smallest table status seen in this current round + * of updating the local copy of the block chain + */ +void Table::processEntry(TableStatus entry, int64_t seq) { + int newNumSlots = entry.getMaxSlots(); + updateCurrMaxSize(newNumSlots); - if (liveTableStatus != NULL) { - // We have a larger table status so the old table status is no int64_ter alive - liveTableStatus.setDead(); - } + initExpectedSize(seq, newNumSlots); - // Make this new table status the latest alive table status - liveTableStatus = entry; + if (liveTableStatus != NULL) { + // We have a larger table status so the old table status is no int64_ter alive + liveTableStatus.setDead(); } - /** - * Check old messages to see if there is a block chain violation. Also - */ - void processEntry(RejectedMessage entry, SlotIndexer indexer) { - int64_t oldSeqNum = entry.getOldSeqNum(); - int64_t newSeqNum = entry.getNewSeqNum(); - bool isequal = entry.getEqual(); - int64_t machineId = entry.getMachineID(); - int64_t seq = entry.getSequenceNumber(); + // Make this new table status the latest alive table status + liveTableStatus = entry; +} +/** + * Check old messages to see if there is a block chain violation. Also + */ +void Table::processEntry(RejectedMessage entry, SlotIndexer indexer) { + int64_t oldSeqNum = entry.getOldSeqNum(); + int64_t newSeqNum = entry.getNewSeqNum(); + bool isequal = entry.getEqual(); + int64_t machineId = entry.getMachineID(); + int64_t seq = entry.getSequenceNumber(); - // Check if we have messages that were supposed to be rejected in our local block chain - for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) { - // Get the slot - Slot slot = indexer.getSlot(seqNum); + // Check if we have messages that were supposed to be rejected in our local block chain + for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) { - if (slot != NULL) { - // If we have this slot make sure that it was not supposed to be a rejected slot + // Get the slot + Slot slot = indexer.getSlot(seqNum); - int64_t slotMachineId = slot.getMachineID(); - if (isequal != (slotMachineId == machineId)) { - throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum); - } + if (slot != NULL) { + // If we have this slot make sure that it was not supposed to be a rejected slot + + int64_t slotMachineId = slot.getMachineID(); + if (isequal != (slotMachineId == machineId)) { + throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum); } } + } - // Create a list of clients to watch until they see this rejected message entry. - HashSet deviceWatchSet = new HashSet(); - for (Map.Entry > lastMessageEntry : lastMessageTable.entrySet()) { - - // Machine ID for the last message entry - int64_t lastMessageEntryMachineId = lastMessageEntry.getKey(); + // Create a list of clients to watch until they see this rejected message entry. + HashSet deviceWatchSet = new HashSet(); + for (Map.Entry > lastMessageEntry : lastMessageTable.entrySet()) { - // We've seen it, don't need to continue to watch. Our next - // message will implicitly acknowledge it. - if (lastMessageEntryMachineId == localMachineId) { - continue; - } + // Machine ID for the last message entry + int64_t lastMessageEntryMachineId = lastMessageEntry.getKey(); - Pair lastMessageValue = lastMessageEntry.getValue(); - int64_t entrySequenceNumber = lastMessageValue.getFirst(); + // We've seen it, don't need to continue to watch. Our next + // message will implicitly acknowledge it. + if (lastMessageEntryMachineId == localMachineId) { + continue; + } - if (entrySequenceNumber < seq) { + Pair lastMessageValue = lastMessageEntry.getValue(); + int64_t entrySequenceNumber = lastMessageValue.getFirst(); - // Add this rejected message to the set of messages that this machine ID did not see yet - addWatchVector(lastMessageEntryMachineId, entry); + if (entrySequenceNumber < seq) { - // This client did not see this rejected message yet so add it to the watch set to monitor - deviceWatchSet.add(lastMessageEntryMachineId); - } - } + // Add this rejected message to the set of messages that this machine ID did not see yet + addWatchVector(lastMessageEntryMachineId, entry); - if (deviceWatchSet.isEmpty()) { - // This rejected message has been seen by all the clients so - entry.setDead(); - } else { - // We need to watch this rejected message - entry.setWatchSet(deviceWatchSet); + // This client did not see this rejected message yet so add it to the watch set to monitor + deviceWatchSet.add(lastMessageEntryMachineId); } } - /** - * Check if this abort is live, if not then save it so we can kill it later. - * update the last transaction number that was arbitrated on. - */ - void processEntry(Abort entry) { + if (deviceWatchSet.isEmpty()) { + // This rejected message has been seen by all the clients so + entry.setDead(); + } else { + // We need to watch this rejected message + entry.setWatchSet(deviceWatchSet); + } +} +/** + * Check if this abort is live, if not then save it so we can kill it later. + * update the last transaction number that was arbitrated on. + */ +void Table::processEntry(Abort entry) { - if (entry.getTransactionSequenceNumber() != -1) { - // update the transaction status if it was sent to the server - TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber()); - if (status != NULL) { - status.setStatus(TransactionStatus.StatusAborted); - } - } - // Abort has not been seen by the client it is for yet so we need to keep track of it - Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry); - if (previouslySeenAbort != NULL) { - previouslySeenAbort.setDead();// Delete old version of the abort since we got a rescued newer version + if (entry.getTransactionSequenceNumber() != -1) { + // update the transaction status if it was sent to the server + TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber()); + if (status != NULL) { + status.setStatus(TransactionStatus.StatusAborted); } + } - if (entry.getTransactionArbitrator() == localMachineId) { - liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry); - } + // Abort has not been seen by the client it is for yet so we need to keep track of it + Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry); + if (previouslySeenAbort != NULL) { + previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version + } - if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) { + if (entry.getTransactionArbitrator() == localMachineId) { + liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry); + } - // The machine already saw this so it is dead - entry.setDead(); - liveAbortTable.remove(entry.getAbortId()); + if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) { - if (entry.getTransactionArbitrator() == localMachineId) { - liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber()); - } + // The machine already saw this so it is dead + entry.setDead(); + liveAbortTable.remove(entry.getAbortId()); - return; + if (entry.getTransactionArbitrator() == localMachineId) { + liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber()); } + return; + } - // Update the last arbitration data that we have seen so far - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != NULL) { - int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()); - if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) { - // Is larger - lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber()); - } - } else { - // Never seen any data from this arbitrator so record the first one + // Update the last arbitration data that we have seen so far + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != NULL) { + + int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()); + if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) { + // Is larger lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber()); } + } else { + // Never seen any data from this arbitrator so record the first one + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber()); + } - // Set dead a transaction if we can - Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber())); - if (transactionToSetDead != NULL) { - liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber()); - } + // Set dead a transaction if we can + Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber())); + if (transactionToSetDead != NULL) { + liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber()); + } - // Update the last transaction sequence number that the arbitrator arbitrated on - Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator()); - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) { + // Update the last transaction sequence number that the arbitrator arbitrated on + Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator()); + if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) { - // Is a valid one - if (entry.getTransactionSequenceNumber() != -1) { - lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber()); - } + // Is a valid one + if (entry.getTransactionSequenceNumber() != -1) { + lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber()); } } +} - /** - * Set dead the transaction part if that transaction is dead and keep track of all new parts - */ - void processEntry(TransactionPart entry) { - // Check if we have already seen this transaction and set it dead OR if it is not alive - Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry.getSequenceNumber())) { - // This transaction is dead, it was already committed or aborted - entry.setDead(); - return; - } +/** + * Set dead the transaction part if that transaction is dead and keep track of all new parts + */ +void Table::processEntry(TransactionPart entry) { + // Check if we have already seen this transaction and set it dead OR if it is not alive + Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId()); + if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry.getSequenceNumber())) { + // This transaction is dead, it was already committed or aborted + entry.setDead(); + return; + } - // This part is still alive - Hashtable, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId()); + // This part is still alive + Hashtable, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId()); - if (transactionPart == NULL) { - // Dont have a table for this machine Id yet so make one - transactionPart = new Hashtable, TransactionPart>(); - newTransactionParts.put(entry.getMachineId(), transactionPart); - } + if (transactionPart == NULL) { + // Dont have a table for this machine Id yet so make one + transactionPart = new Hashtable, TransactionPart>(); + newTransactionParts.put(entry.getMachineId(), transactionPart); + } - // Update the part and set dead ones we have already seen (got a rescued version) - TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry); - if (previouslySeenPart != NULL) { - previouslySeenPart.setDead(); - } + // Update the part and set dead ones we have already seen (got a rescued version) + TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry); + if (previouslySeenPart != NULL) { + previouslySeenPart.setDead(); } +} - /** - * Process new commit entries and save them for future use. Delete duplicates - */ - void processEntry(CommitPart entry) { +/** + * Process new commit entries and save them for future use. Delete duplicates + */ +void Table::processEntry(CommitPart entry) { - // Update the last transaction that was updated if we can - if (entry.getTransactionSequenceNumber() != -1) { - Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId()); + // Update the last transaction that was updated if we can + if (entry.getTransactionSequenceNumber() != -1) { + Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId()); - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) { - lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber()); - } + // Update the last transaction sequence number that the arbitrator arbitrated on + if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) { + lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber()); } + } - Hashtable, CommitPart> commitPart = newCommitParts.get(entry.getMachineId()); + Hashtable, CommitPart> commitPart = newCommitParts.get(entry.getMachineId()); - if (commitPart == NULL) { - // Don't have a table for this machine Id yet so make one - commitPart = new Hashtable, CommitPart>(); - newCommitParts.put(entry.getMachineId(), commitPart); - } + if (commitPart == NULL) { + // Don't have a table for this machine Id yet so make one + commitPart = new Hashtable, CommitPart>(); + newCommitParts.put(entry.getMachineId(), commitPart); + } - // Update the part and set dead ones we have already seen (got a rescued version) - CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry); - if (previouslySeenPart != NULL) { - previouslySeenPart.setDead(); - } + // Update the part and set dead ones we have already seen (got a rescued version) + CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry); + if (previouslySeenPart != NULL) { + previouslySeenPart.setDead(); } +} - /** - * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them. - * Updates the live aborts, removes those that are dead and sets them dead. - * Check that the last message seen is correct and that there is no mismatch of our own last message or that - * other clients have not had a rollback on the last message. - */ - void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet machineSet) { +/** + * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them. + * Updates the live aborts, removes those that are dead and sets them dead. + * Check that the last message seen is correct and that there is no mismatch of our own last message or that + * other clients have not had a rollback on the last message. + */ +void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet machineSet) { - // We have seen this machine ID - machineSet.remove(machineId); + // We have seen this machine ID + machineSet.remove(machineId); - // Get the set of rejected messages that this machine Id is has not seen yet - HashSet watchset = rejectedMessageWatchVectorTable.get(machineId); + // Get the set of rejected messages that this machine Id is has not seen yet + HashSet watchset = rejectedMessageWatchVectorTable.get(machineId); - // If there is a rejected message that this machine Id has not seen yet - if (watchset != NULL) { + // If there is a rejected message that this machine Id has not seen yet + if (watchset != NULL) { - // Go through each rejected message that this machine Id has not seen yet - for (Iterator rmit = watchset.iterator(); rmit.hasNext(); ) { + // Go through each rejected message that this machine Id has not seen yet + for (Iterator rmit = watchset.iterator(); rmit.hasNext(); ) { - RejectedMessage rm = rmit.next(); + RejectedMessage rm = rmit.next(); - // If this machine Id has seen this rejected message... - if (rm.getSequenceNumber() <= seqNum) { + // If this machine Id has seen this rejected message... + if (rm.getSequenceNumber() <= seqNum) { - // Remove it from our watchlist - rmit.remove(); + // Remove it from our watchlist + rmit.remove(); - // Decrement machines that need to see this notification - rm.removeWatcher(machineId); - } + // Decrement machines that need to see this notification + rm.removeWatcher(machineId); } } + } - // Set dead the abort - for (Iterator, Abort> > i = liveAbortTable.entrySet().iterator(); i.hasNext();) { - Abort abort = i.next().getValue(); + // Set dead the abort + for (Iterator, Abort> > i = liveAbortTable.entrySet().iterator(); i.hasNext();) { + Abort abort = i.next().getValue(); - if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) { - abort.setDead(); - i.remove(); + if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) { + abort.setDead(); + i.remove(); - if (abort.getTransactionArbitrator() == localMachineId) { - liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber()); - } + if (abort.getTransactionArbitrator() == localMachineId) { + liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber()); } } + } - if (machineId == localMachineId) { - // Our own messages are immediately dead. - if (liveness instanceof LastMessage) { - ((LastMessage)liveness).setDead(); - } else if (liveness instanceof Slot) { - ((Slot)liveness).setDead(); - } else { - throw new Error("Unrecognized type"); - } + if (machineId == localMachineId) { + // Our own messages are immediately dead. + if (liveness instanceof LastMessage) { + ((LastMessage)liveness).setDead(); + } else if (liveness instanceof Slot) { + ((Slot)liveness).setDead(); + } else { + throw new Error("Unrecognized type"); } + } - // Get the old last message for this device - Pair lastMessageEntry = lastMessageTable.put(machineId, new Pair(seqNum, liveness)); - if (lastMessageEntry == NULL) { - // If no last message then there is nothing else to process - return; - } + // Get the old last message for this device + Pair lastMessageEntry = lastMessageTable.put(machineId, new Pair(seqNum, liveness)); + if (lastMessageEntry == NULL) { + // If no last message then there is nothing else to process + return; + } - int64_t lastMessageSeqNum = lastMessageEntry.getFirst(); - Liveness lastEntry = lastMessageEntry.getSecond(); + int64_t lastMessageSeqNum = lastMessageEntry.getFirst(); + Liveness lastEntry = lastMessageEntry.getSecond(); - // If it is not our machine Id since we already set ours to dead - if (machineId != localMachineId) { - if (lastEntry instanceof LastMessage) { - ((LastMessage)lastEntry).setDead(); - } else if (lastEntry instanceof Slot) { - ((Slot)lastEntry).setDead(); - } else { - throw new Error("Unrecognized type"); - } + // If it is not our machine Id since we already set ours to dead + if (machineId != localMachineId) { + if (lastEntry instanceof LastMessage) { + ((LastMessage)lastEntry).setDead(); + } else if (lastEntry instanceof Slot) { + ((Slot)lastEntry).setDead(); + } else { + throw new Error("Unrecognized type"); } + } - // Make sure the server is not playing any games - if (machineId == localMachineId) { - - if (hadPartialSendToServer) { - // We were not making any updates and we had a machine mismatch - if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) { - throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum); - } + // Make sure the server is not playing any games + if (machineId == localMachineId) { - } else { - // We were not making any updates and we had a machine mismatch - if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) { - throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum); - } + if (hadPartialSendToServer) { + // We were not making any updates and we had a machine mismatch + if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) { + throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum); } + } else { - if (lastMessageSeqNum > seqNum) { - throw new Error("Server Error: Rollback on remote machine sequence number"); + // We were not making any updates and we had a machine mismatch + if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) { + throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum); } } + } else { + if (lastMessageSeqNum > seqNum) { + throw new Error("Server Error: Rollback on remote machine sequence number"); + } } +} - /** - * Add a rejected message entry to the watch set to keep track of which clients have seen that - * rejected message entry and which have not. - */ - void addWatchVector(int64_t machineId, RejectedMessage entry) { - HashSet entries = rejectedMessageWatchVectorTable.get(machineId); - if (entries == NULL) { - // There is no set for this machine ID yet so create one - entries = new HashSet(); - rejectedMessageWatchVectorTable.put(machineId, entries); - } - entries.add(entry); +/** + * Add a rejected message entry to the watch set to keep track of which clients have seen that + * rejected message entry and which have not. + */ +void Table::addWatchVector(int64_t machineId, RejectedMessage entry) { + HashSet entries = rejectedMessageWatchVectorTable.get(machineId); + if (entries == NULL) { + // There is no set for this machine ID yet so create one + entries = new HashSet(); + rejectedMessageWatchVectorTable.put(machineId, entries); } + entries.add(entry); +} - /** - * Check if the HMAC chain is not violated - */ - void checkHMACChain(SlotIndexer indexer, Slot[] newSlots) { - for (int i = 0; i < newSlots.length; i++) { - Slot currSlot = newSlots[i]; - Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1); - if (prevSlot != NULL && - !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC())) - throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot); - } +/** + * Check if the HMAC chain is not violated + */ +void Table::checkHMACChain(SlotIndexer indexer, Slot[] newSlots) { + for (int i = 0; i < newSlots.length; i++) { + Slot currSlot = newSlots[i]; + Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1); + if (prevSlot != NULL && + !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC())) + throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot); } } + diff --git a/version2/src/C/Transaction.cc b/version2/src/C/Transaction.cc index 0cee839..2da4759 100644 --- a/version2/src/C/Transaction.cc +++ b/version2/src/C/Transaction.cc @@ -225,7 +225,7 @@ class Transaction { dataSize += tp.getDataSize(); } - char[] combinedData = new char[dataSize]; + Array *combinedData = new char[dataSize]; int currentPosition = 0; // Stitch all the data sections together diff --git a/version2/src/C/Transaction.h b/version2/src/C/Transaction.h index 04fbb9a..31d47cd 100644 --- a/version2/src/C/Transaction.h +++ b/version2/src/C/Transaction.h @@ -227,7 +227,7 @@ private: dataSize += tp.getDataSize(); } - char[] combinedData = new char[dataSize]; + Array *combinedData = new char[dataSize]; int currentPosition = 0; // Stitch all the data sections together diff --git a/version2/src/C/common.h b/version2/src/C/common.h index 764c2f2..f23a526 100644 --- a/version2/src/C/common.h +++ b/version2/src/C/common.h @@ -20,8 +20,21 @@ class Commit; class CommitPart; class ArbitrationRound; class KeyValue; -class IoTString; class RejectedMessage; class PendingTransaction; +class CloudComm; +class IoTString; +class LastMessage; +class LocalComm; +class NewKey; +class SlotBuffer; +class SlotIndexer; +class Table; +class TableStatus; +class ThreeTuple; +class TimingSingleton; +class Transaction; +class TransactionPart; +class TransactionStatus; #endif