final class Table {
/* Constants */
- static final int FREE_SLOTS = 2; // Number of slots that should be kept free // 10
+ static final int FREE_SLOTS = 2;// Number of slots that should be kept free // 10
static final int SKIP_THRESHOLD = 10;
static final double RESIZE_MULTIPLE = 1.2;
static final double RESIZE_THRESHOLD = 0.75;
CloudComm cloud = NULL;
Random random = NULL;
TableStatus liveTableStatus = NULL;
- PendingTransaction pendingTransactionBuilder = NULL; // Pending Transaction used in building a Pending Transaction
- Transaction lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction
- Transaction firstPendingTransaction = NULL; // first transaction in the pending transaction list
+ PendingTransaction pendingTransactionBuilder = NULL;// Pending Transaction used in building a Pending Transaction
+ Transaction lastPendingTransactionSpeculatedOn = NULL;// Last transaction that was speculated on from the pending transaction
+ Transaction firstPendingTransaction = NULL; // first transaction in the pending transaction list
/* Variables */
- int numberOfSlots = 0; // Number of slots stored in buffer
- int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
- int64_t liveSlotCount = 0; // Number of currently live slots
+ int numberOfSlots = 0; // Number of slots stored in buffer
+ int bufferResizeThreshold = 0;// Threshold on the number of live slots before a resize is needed
+ int64_t liveSlotCount = 0;// Number of currently live slots
int64_t oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry
- int64_t localMachineId = 0; // Machine ID of this client device
- int64_t sequenceNumber = 0; // Largest sequence number a client has received
+ int64_t localMachineId = 0; // Machine ID of this client device
+ int64_t sequenceNumber = 0; // Largest sequence number a client has received
int64_t localSequenceNumber = 0;
// int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
// int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
- int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
- int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
- int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
+ int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
+ int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
+ int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
int64_t localArbitrationSequenceNumber = 0;
bool hadPartialSendToServer = false;
bool attemptedToSendToServer = false;
Slot lastSlotAttemptedToSend = NULL;
bool lastIsNewKey = false;
int lastNewSize = 0;
- Map<Transaction, List<Integer>> lastTransactionPartsSent = NULL;
- List<Entry> lastPendingSendArbitrationEntriesToDelete = NULL;
+ Hashtable<Transaction, Vector<int32_t> > lastTransactionPartsSent = NULL;
+ Vector<Entry> lastPendingSendArbitrationEntriesToDelete = NULL;
NewKey lastNewKey = NULL;
/* Data Structures */
- Map<IoTString, KeyValue> committedKeyValueTable = NULL; // Table of committed key value pairs
- Map<IoTString, KeyValue> speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value
- Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
- Map<IoTString, NewKey> liveNewKeyTable = NULL; // Table of live new keys
- HashMap<Long, Pair<Long, Liveness>> lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
- HashMap<Long, HashSet<RejectedMessage>> rejectedMessageWatchListTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet
- Map<IoTString, Long> arbitratorTable = NULL; // Table of keys and their arbitrators
- Map<Pair<int64_t, int64_t>, Abort> liveAbortTable = NULL; // Table live abort messages
- Map<Long, Map<Pair<Long, Integer>, TransactionPart>> newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
- Map<Long, Map<Pair<Long, Integer>, CommitPart>> newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
- Map<int64_t, int64_t> lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on
- Map<Long, Transaction> liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
- Map<Pair<int64_t, int64_t>, Transaction> liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID
- Map<Long, Map<Long, Commit>> liveCommitsTable = NULL;
- Map<IoTString, Commit> liveCommitsByKeyTable = NULL;
- Map<int64_t, int64_t> lastCommitSeenSequenceNumberByArbitratorTable = NULL;
- Vector<Long> rejectedSlotList = NULL; // List of rejected slots that have yet to be sent to the server
- List<Transaction> pendingTransactionQueue = NULL;
- List<ArbitrationRound> pendingSendArbitrationRounds = NULL;
- List<Entry> pendingSendArbitrationEntriesToDelete = NULL;
- Map<Transaction, List<Integer>> transactionPartsSent = NULL;
- Map<Long, TransactionStatus> outstandingTransactionStatus = NULL;
- Map<Long, Abort> liveAbortsGeneratedByLocal = NULL;
- Set<Pair<int64_t, int64_t>> offlineTransactionsCommittedAndAtServer = NULL;
- Map<Long, Pair<String, Integer>> localCommunicationTable = NULL;
- Map<int64_t, int64_t> lastTransactionSeenFromMachineFromServer = NULL;
- Map<int64_t, int64_t> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;
+ Hashtable<IoTString, KeyValue> committedKeyValueTable = NULL; // Table of committed key value pairs
+ Hashtable<IoTString, KeyValue> speculatedKeyValueTable = NULL;// Table of speculated key value pairs, if there is a speculative value
+ Hashtable<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = NULL;// Table of speculated key value pairs, if there is a speculative value from the pending transactions
+ Hashtable<IoTString, NewKey> liveNewKeyTable = NULL;// Table of live new keys
+ Hashtable<int64_t Pair<int64_t Liveness> > lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
+ Hashtable<int64_t HashSet<RejectedMessage> > rejectedMessageWatchVectorTable = NULL;// Table of machine Ids and the set of rejected messages they have not seen yet
+ Hashtable<IoTString, Long> arbitratorTable = NULL;// Table of keys and their arbitrators
+ Hashtable<Pair<int64_t, int64_t>, Abort> liveAbortTable = NULL; // Table live abort messages
+ Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart> > newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
+ Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, CommitPart> > newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
+ Hashtable<int64_t, int64_t> lastArbitratedTransactionNumberByArbitratorTable = NULL;// Last transaction sequence number that an arbitrator arbitrated on
+ Hashtable<int64_t Transaction> liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
+ Hashtable<Pair<int64_t, int64_t>, Transaction> liveTransactionByTransactionIdTable = NULL;// live transaction grouped by the transaction ID
+ Hashtable<int64_t Hashtable<int64_t Commit> > liveCommitsTable = NULL;
+ Hashtable<IoTString, Commit> liveCommitsByKeyTable = NULL;
+ Hashtable<int64_t, int64_t> lastCommitSeenSequenceNumberByArbitratorTable = NULL;
+ Vector<Long> rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server
+ Vector<Transaction> pendingTransactionQueue = NULL;
+ Vector<ArbitrationRound> pendingSendArbitrationRounds = NULL;
+ Vector<Entry> pendingSendArbitrationEntriesToDelete = NULL;
+ Hashtable<Transaction, Vector<int32_t> > transactionPartsSent = NULL;
+ Hashtable<int64_t TransactionStatus> outstandingTransactionStatus = NULL;
+ Hashtable<int64_t Abort> liveAbortsGeneratedByLocal = NULL;
+ Set<Pair<int64_t, int64_t> > offlineTransactionsCommittedAndAtServer = NULL;
+ Hashtable<int64_t Pair<String, int32_t> > localCommunicationTable = NULL;
+ Hashtable<int64_t, int64_t> lastTransactionSeenFromMachineFromServer = NULL;
+ Hashtable<int64_t, int64_t> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;
Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) {
oldestLiveSlotSequenceNumver = 1;
// init data structs
- committedKeyValueTable = new HashMap<IoTString, KeyValue>();
- speculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
- pendingTransactionSpeculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
- liveNewKeyTable = new HashMap<IoTString, NewKey>();
- lastMessageTable = new HashMap<Long, Pair<Long, Liveness>>();
- rejectedMessageWatchListTable = new HashMap<Long, HashSet<RejectedMessage>>();
- arbitratorTable = new HashMap<IoTString, Long>();
- liveAbortTable = new HashMap<Pair<int64_t, int64_t>, Abort>();
- newTransactionParts = new HashMap<Long, Map<Pair<Long, Integer>, TransactionPart>>();
- newCommitParts = new HashMap<Long, Map<Pair<Long, Integer>, CommitPart>>();
- lastArbitratedTransactionNumberByArbitratorTable = new HashMap<int64_t, int64_t>();
- liveTransactionBySequenceNumberTable = new HashMap<Long, Transaction>();
- liveTransactionByTransactionIdTable = new HashMap<Pair<int64_t, int64_t>, Transaction>();
- liveCommitsTable = new HashMap<Long, Map<Long, Commit>>();
- liveCommitsByKeyTable = new HashMap<IoTString, Commit>();
- lastCommitSeenSequenceNumberByArbitratorTable = new HashMap<int64_t, int64_t>();
- rejectedSlotList = new Vector<Long>();
- pendingTransactionQueue = new ArrayList<Transaction>();
- pendingSendArbitrationEntriesToDelete = new ArrayList<Entry>();
- transactionPartsSent = new HashMap<Transaction, List<Integer>>();
- outstandingTransactionStatus = new HashMap<Long, TransactionStatus>();
- liveAbortsGeneratedByLocal = new HashMap<Long, Abort>();
- offlineTransactionsCommittedAndAtServer = new HashSet<Pair<int64_t, int64_t>>();
- localCommunicationTable = new HashMap<Long, Pair<String, Integer>>();
- lastTransactionSeenFromMachineFromServer = new HashMap<int64_t, int64_t>();
- pendingSendArbitrationRounds = new ArrayList<ArbitrationRound>();
- lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new HashMap<int64_t, int64_t>();
+ committedKeyValueTable = new Hashtable<IoTString, KeyValue>();
+ speculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
+ pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
+ liveNewKeyTable = new Hashtable<IoTString, NewKey>();
+ lastMessageTable = new Hashtable<int64_t Pair<int64_t Liveness> >();
+ rejectedMessageWatchVectorTable = new Hashtable<int64_t HashSet<RejectedMessage> >();
+ arbitratorTable = new Hashtable<IoTString, Long>();
+ liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort>();
+ newTransactionParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart> >();
+ newCommitParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, CommitPart> >();
+ lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
+ liveTransactionBySequenceNumberTable = new Hashtable<int64_t Transaction>();
+ liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction>();
+ liveCommitsTable = new Hashtable<int64_t Hashtable<int64_t Commit> >();
+ liveCommitsByKeyTable = new Hashtable<IoTString, Commit>();
+ lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
+ rejectedSlotVector = new Vector<Long>();
+ pendingTransactionQueue = new Vector<Transaction>();
+ pendingSendArbitrationEntriesToDelete = new Vector<Entry>();
+ transactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >();
+ outstandingTransactionStatus = new Hashtable<int64_t TransactionStatus>();
+ liveAbortsGeneratedByLocal = new Hashtable<int64_t Abort>();
+ offlineTransactionsCommittedAndAtServer = new HashSet<Pair<int64_t, int64_t> >();
+ localCommunicationTable = new Hashtable<int64_t Pair<String, int32_t> >();
+ lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
+ pendingSendArbitrationRounds = new Vector<ArbitrationRound>();
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
// Other init stuff
}
// String toString() {
- // String retString = " Committed Table: \n";
- // retString += "---------------------------\n";
- // retString += commitedTable.toString();
+ // String retString = " Committed Table: \n";
+ // retString += "---------------------------\n";
+ // retString += commitedTable.toString();
- // retString += "\n\n";
+ // retString += "\n\n";
- // retString += " Speculative Table: \n";
- // retString += "---------------------------\n";
- // retString += speculativeTable.toString();
+ // retString += " Speculative Table: \n";
+ // retString += "---------------------------\n";
+ // retString += speculativeTable.toString();
- // return retString;
+ // return retString;
// }
synchronized void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber) {
- localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
+ localCommunicationTable.put(arbitrator, new Pair<String, int32_t>(hostName, portNumber));
}
synchronized Long getArbitrator(IoTString key) {
continue;
}
- Pair<Boolean, Boolean> sendReturn = sendTransactionToLocal(transaction);
+ Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
if (sendReturn.getFirst()) {
// Failed to contact over local
Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
if (newSlots.length == 0) {
fromRetry = true;
- ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
+ ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
if (sendSlotsReturn.getFirst()) {
if (newKey != NULL) {
localSequenceNumber++;
// Try to fill the slot with data
- ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
+ ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
bool needsResize = fillSlotsReturn.getFirst();
int newSize = fillSlotsReturn.getSecond();
- Boolean insertedNewKey = fillSlotsReturn.getThird();
+ bool insertedNewKey = fillSlotsReturn.getThird();
if (needsResize) {
// Reset which transaction to send
lastInsertedNewKey = insertedNewKey;
lastNewSize = newSize;
lastNewKey = newKey;
- lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
- lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
+ lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >(transactionPartsSent);
+ lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
- ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
+ ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
if (sendSlotsReturn.getFirst()) {
} else {
// if (!sendSlotsReturn.getSecond()) {
- // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
- // transaction.resetServerFailure();
- // }
+ // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ // transaction.resetServerFailure();
+ // }
// } else {
- // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
- // transaction.resetServerFailure();
+ // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ // transaction.resetServerFailure();
- // // Update which transactions parts still need to be sent
- // transaction.removeSentParts(transactionPartsSent.get(transaction));
+ // // Update which transactions parts still need to be sent
+ // transaction.removeSentParts(transactionPartsSent.get(transaction));
- // // Add the transaction status to the outstanding list
- // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+ // // Add the transaction status to the outstanding list
+ // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
- // // Update the transaction status
- // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+ // // Update the transaction status
+ // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
- // // Check if all the transaction parts were successfully sent and if so then remove it from pending
- // if (transaction.didSendAllParts()) {
- // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
- // pendingTransactionQueue.remove(transaction);
+ // // Check if all the transaction parts were successfully sent and if so then remove it from pending
+ // if (transaction.didSendAllParts()) {
+ // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+ // pendingTransactionQueue.remove(transaction);
- // for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
- // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber());
- // }
- // }
- // }
+ // for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+ // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber());
+ // }
+ // }
+ // }
// }
// Reset which transaction to send
// if (!fromRetry) {
- // lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
- // lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
+ // lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t>>(transactionPartsSent);
+ // lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
// }
// Nothing was able to be sent to the server so just clear these data structures
}
synchronized bool updateFromLocal(int64_t machineId) {
- Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
+ Pair<String, int32_t> localCommunicationInformation = localCommunicationTable.get(machineId);
if (localCommunicationInformation == NULL) {
// Cant talk to that device locally so do nothing
return false;
// Get the size of the send data
int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
- Long lastArbitrationDataLocalSequenceNumber = (int64_t) - 1;
+ Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != NULL) {
lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
}
return true;
}
- Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
+ Pair<bool, bool> sendTransactionToLocal(Transaction transaction) {
// Get the devices local communications
- Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
+ Pair<String, int32_t> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
if (localCommunicationInformation == NULL) {
// Cant talk to that device locally so do nothing
- return new Pair<Boolean, Boolean>(true, false);
+ return new Pair<bool, bool>(true, false);
}
// Get the size of the send data
sendDataSize += part.getSize();
}
- Long lastArbitrationDataLocalSequenceNumber = (int64_t) - 1;
+ Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != NULL) {
lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
}
if (returnData == NULL) {
// Could not contact server
- return new Pair<Boolean, Boolean>(true, false);
+ return new Pair<bool, bool>(true, false);
}
// Decode the data
}
}
- return new Pair<Boolean, Boolean>(false, true);
+ return new Pair<bool, bool>(false, true);
}
synchronized char[] acceptDataFromLocal(char[] data) {
}
// Arbitrate on transaction and pull relevant return data
- Pair<Boolean, Boolean> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
+ Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
couldArbitrate = localArbitrateReturn.getFirst();
didCommit = localArbitrateReturn.getSecond();
// The data to send back
int returnDataSize = 0;
- List<Entry> unseenArbitrations = new ArrayList<Entry>();
+ Vector<Entry> unseenArbitrations = new Vector<Entry>();
// Get the aborts to send back
- List<Long> abortLocalSequenceNumbers = new ArrayList<Long >(liveAbortsGeneratedByLocal.keySet());
+ Vector<Long> abortLocalSequenceNumbers = new Vector<Long >(liveAbortsGeneratedByLocal.keySet());
Collections.sort(abortLocalSequenceNumbers);
for (Long localSequenceNumber : abortLocalSequenceNumbers) {
if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
}
// Get the commits to send back
- Map<Long, Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
+ Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
if (commitForClientTable != NULL) {
- List<Long> commitLocalSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
+ Vector<Long> commitLocalSequenceNumbers = new Vector<Long>(commitForClientTable.keySet());
Collections.sort(commitLocalSequenceNumbers);
for (Long localSequenceNumber : commitLocalSequenceNumbers) {
// Number of arbitration entries to decode
returnDataSize += 2 * sizeof(int32_t);
- // Boolean of did commit or not
+ // bool of did commit or not
if (numberOfParts != 0) {
returnDataSize += sizeof(char);
}
return returnData;
}
- ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize, bool isNewKey) throws ServerException {
+ ThreeTuple<bool, bool, Slot[]> sendSlotsToServer(Slot slot, int newSize, bool isNewKey) throws ServerException {
bool attemptedToSendToServerTmp = attemptedToSendToServer;
attemptedToSendToServer = true;
Slot[] array = cloud.putSlot(slot, newSize);
if (array == NULL) {
array = new Slot[] {slot};
- rejectedSlotList.clear();
+ rejectedSlotVector.clear();
inserted = true;
- } else {
+ } else {
if (array.length == 0) {
throw new Error("Server Error: Did not send any slots");
}
}
if (!isInserted) {
- rejectedSlotList.add(slot.getSequenceNumber());
+ rejectedSlotVector.add(slot.getSequenceNumber());
lastTryInserted = false;
} else {
lastTryInserted = true;
}
} else {
- rejectedSlotList.add(slot.getSequenceNumber());
+ rejectedSlotVector.add(slot.getSequenceNumber());
lastTryInserted = false;
}
}
- return new ThreeTuple<Boolean, Boolean, Slot[]>(inserted, lastTryInserted, array);
+ return new ThreeTuple<bool, bool, Slot[]>(inserted, lastTryInserted, array);
}
/**
* Returns false if a resize was needed
*/
- ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, bool resize, NewKey newKeyEntry) {
+ ThreeTuple<bool, int32_t, bool> fillSlot(Slot slot, bool resize, NewKey newKeyEntry) {
int newSize = 0;
if (liveSlotCount > bufferResizeThreshold) {
- resize = true; //Resize is forced
+ resize = true;//Resize is forced
}
doRejectedMessages(slot);
// Do mandatory rescue of entries
- ThreeTuple<Boolean, Boolean, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
+ ThreeTuple<bool, bool, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
// Extract working variables
bool needsResize = mandatoryRescueReturn.getFirst();
if (needsResize && !resize) {
// We need to resize but we are not resizing so return false
- return new ThreeTuple<Boolean, Integer, Boolean>(true, NULL, NULL);
+ return new ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
}
bool inserted = false;
for (ArbitrationRound round : pendingSendArbitrationRounds) {
bool isFull = false;
round.generateParts();
- List<Entry> parts = round.getParts();
+ Vector<Entry> parts = round.getParts();
// Insert pending arbitration data
for (Entry arbitrationData : parts) {
// Set the transaction sequence number if it has yet to be inserted into the block chain
// if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
- // transaction.setSequenceNumber(slot.getSequenceNumber());
+ // transaction.setSequenceNumber(slot.getSequenceNumber());
// }
if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
if (slot.hasSpace(part)) {
slot.addEntry(part);
- List<Integer> partsSent = transactionPartsSent.get(transaction);
+ Vector<int32_t> partsSent = transactionPartsSent.get(transaction);
if (partsSent == NULL) {
- partsSent = new ArrayList<Integer>();
+ partsSent = new Vector<int32_t>();
transactionPartsSent.put(transaction, partsSent);
}
partsSent.add(part.getPartNumber());
// Fill the remainder of the slot with rescue data
doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
- return new ThreeTuple<Boolean, Integer, Boolean>(false, newSize, inserted);
+ return new ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
}
void doRejectedMessages(Slot s) {
- if (! rejectedSlotList.isEmpty()) {
+ if (!rejectedSlotVector.isEmpty()) {
/* TODO: We should avoid generating a rejected message entry if
* there is already a sufficient entry in the queue (e.g.,
* equalsto value of true and same sequence number). */
- int64_t old_seqn = rejectedSlotList.firstElement();
- if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
- int64_t new_seqn = rejectedSlotList.lastElement();
+ int64_t old_seqn = rejectedSlotVector.firstElement();
+ if (rejectedSlotVector.size() > REJECTED_THRESHOLD) {
+ int64_t new_seqn = rejectedSlotVector.lastElement();
RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
s.addEntry(rm);
} else {
int64_t prev_seqn = -1;
int i = 0;
/* Go through list of missing messages */
- for (; i < rejectedSlotList.size(); i++) {
- int64_t curr_seqn = rejectedSlotList.get(i);
+ for (; i < rejectedSlotVector.size(); i++) {
+ int64_t curr_seqn = rejectedSlotVector.get(i);
Slot s_msg = buffer.getSlot(curr_seqn);
if (s_msg != NULL)
break;
s.addEntry(rm);
}
/* Generate rejected message entries for present messages */
- for (; i < rejectedSlotList.size(); i++) {
- int64_t curr_seqn = rejectedSlotList.get(i);
+ for (; i < rejectedSlotVector.size(); i++) {
+ int64_t curr_seqn = rejectedSlotVector.get(i);
Slot s_msg = buffer.getSlot(curr_seqn);
int64_t machineid = s_msg.getMachineID();
RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
}
}
- ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot slot, bool resize) {
+ ThreeTuple<bool, bool, Long> doMandatoryResuce(Slot slot, bool resize) {
int64_t newestSequenceNumber = buffer.getNewestSeqNum();
int64_t oldestSequenceNumber = buffer.getOldestSeqNum();
if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
Slot previousSlot = buffer.getSlot(currentSequenceNumber);
// Push slot number forward
- if (! seenLiveSlot) {
+ if (!seenLiveSlot) {
oldestLiveSlotSequenceNumver = currentSequenceNumber;
}
slot.addEntry(liveEntry);
} else if (currentSequenceNumber == firstIfFull) {
//if there's no space but the entry is about to fall off the queue
- System.out.println("B"); //?
- return new ThreeTuple<Boolean, Boolean, Long>(true, seenLiveSlot, currentSequenceNumber);
+ System.out.println("B");//?
+ return new ThreeTuple<bool, bool, Long>(true, seenLiveSlot, currentSequenceNumber);
}
}
}
// Did not resize
- return new ThreeTuple<Boolean, Boolean, Long>(false, seenLiveSlot, currentSequenceNumber);
+ return new ThreeTuple<bool, bool, Long>(false, seenLiveSlot, currentSequenceNumber);
}
void doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize) {
* for SKIP_THRESHOLD consecutive entries*/
int skipcount = 0;
int64_t newestseqnum = buffer.getNewestSeqNum();
- search:
+search:
for (; seqn <= newestseqnum; seqn++) {
Slot prevslot = buffer.getSlot(seqn);
//Push slot number forward
// Iterate through all the machine Ids that we received new parts for
for (Long machineId : newTransactionParts.keySet()) {
- Map<Pair<Long, Integer>, TransactionPart> parts = newTransactionParts.get(machineId);
+ Hashtable<Pair<int64_t int32_t>, TransactionPart> parts = newTransactionParts.get(machineId);
// Iterate through all the parts for that machine Id
- for (Pair<Long, Integer> partId : parts.keySet()) {
+ for (Pair<int64_t int32_t> partId : parts.keySet()) {
TransactionPart part = parts.get(partId);
Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
}
// Get the transaction sequence numbers and sort from oldest to newest
- List<Long> transactionSequenceNumbers = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
+ Vector<Long> transactionSequenceNumbers = new Vector<Long>(liveTransactionBySequenceNumberTable.keySet());
Collections.sort(transactionSequenceNumbers);
// Collection of key value pairs that are
- Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
+ Hashtable<IoTString, KeyValue> speculativeTableTmp = new Hashtable<IoTString, KeyValue>();
// The last transaction arbitrated on
int64_t lastTransactionCommitted = -1;
// Create the abort
Abort newAbort = new Abort(NULL,
- transaction.getClientLocalSequenceNumber(),
- transaction.getSequenceNumber(),
- transaction.getMachineId(),
- transaction.getArbitrator(),
- localArbitrationSequenceNumber);
+ transaction.getClientLocalSequenceNumber(),
+ transaction.getSequenceNumber(),
+ transaction.getMachineId(),
+ transaction.getArbitrator(),
+ localArbitrationSequenceNumber);
localArbitrationSequenceNumber++;
generatedAborts.add(newAbort);
}
}
- Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction) {
+ Pair<bool, bool> arbitrateOnLocalTransaction(Transaction transaction) {
// Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
if (transaction.getArbitrator() != localMachineId) {
- return new Pair<Boolean, Boolean>(false, false);
+ return new Pair<bool, bool>(false, false);
}
if (!transaction.isComplete()) {
// Will arbitrate in incorrect order if we continue so just break
// Most likely this
- return new Pair<Boolean, Boolean>(false, false);
+ return new Pair<bool, bool>(false, false);
}
if (transaction.getMachineId() != localMachineId) {
if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != NULL) {
if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
// We've have already seen this from the server
- return new Pair<Boolean, Boolean>(false, false);
+ return new Pair<bool, bool>(false, false);
}
}
}
}
updateLiveStateFromLocal();
- return new Pair<Boolean, Boolean>(true, true);
+ return new Pair<bool, bool>(true, true);
} else {
if (transaction.getMachineId() == localMachineId) {
// Create the abort
Abort newAbort = new Abort(NULL,
- transaction.getClientLocalSequenceNumber(),
- -1,
- transaction.getMachineId(),
- transaction.getArbitrator(),
- localArbitrationSequenceNumber);
+ transaction.getClientLocalSequenceNumber(),
+ -1,
+ transaction.getMachineId(),
+ transaction.getArbitrator(),
+ localArbitrationSequenceNumber);
localArbitrationSequenceNumber++;
addAbortSet.add(newAbort);
}
updateLiveStateFromLocal();
- return new Pair<Boolean, Boolean>(true, false);
+ return new Pair<bool, bool>(true, false);
}
}
return false;
}
// bool compactArbitrationData() {
- // return false;
+ // return false;
// }
/**
// Iterate through all the machine Ids that we received new parts for
for (Long machineId : newCommitParts.keySet()) {
- Map<Pair<Long, Integer>, CommitPart> parts = newCommitParts.get(machineId);
+ Hashtable<Pair<int64_t int32_t>, CommitPart> parts = newCommitParts.get(machineId);
// Iterate through all the parts for that machine Id
- for (Pair<Long, Integer> partId : parts.keySet()) {
+ for (Pair<int64_t int32_t> partId : parts.keySet()) {
CommitPart part = parts.get(partId);
// Get the transaction object for that sequence number
- Map<Long, Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
+ Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
if (commitForClientTable == NULL) {
// This is the first commit from this device
- commitForClientTable = new HashMap<Long, Commit>();
+ commitForClientTable = new Hashtable<int64_t Commit>();
liveCommitsTable.put(part.getMachineId(), commitForClientTable);
}
for (Long arbitratorId : liveCommitsTable.keySet()) {
// Get all the commits for a specific arbitrator
- Map<Long, Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
+ Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
// Sort the commits in order
- List<Long> commitSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
+ Vector<Long> commitSequenceNumbers = new Vector<Long>(commitForClientTable.keySet());
Collections.sort(commitSequenceNumbers);
// Get the last commit seen from this arbitrator
for (KeyValue kv : commit.getKeyValueUpdateSet()) {
commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
}
- commitsToEdit.remove(NULL); // remove NULL since it could be in this set
+ commitsToEdit.remove(NULL); // remove NULL since it could be in this set
// Update each previous commit that needs to be updated
for (Commit previousCommit : commitsToEdit) {
}
// Create a list of the transaction sequence numbers and sort them from oldest to newest
- List<Long> transactionSequenceNumbersSorted = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
+ Vector<Long> transactionSequenceNumbersSorted = new Vector<Long>(liveTransactionBySequenceNumberTable.keySet());
Collections.sort(transactionSequenceNumbersSorted);
bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
if (startIndex >= transactionSequenceNumbersSorted.size()) {
// Make sure we are not out of bounds
- return false; // did not speculate
+ return false; // did not speculate
}
Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
void updateLiveTransactionsAndStatus() {
// Go through each of the transactions
- for (Iterator<Map.Entry<Long, Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
+ for (Iterator<Map.Entry<int64_t Transaction> > iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
Transaction transaction = iter.next().getValue();
// Check if the transaction is dead
}
// Go through each of the transactions
- for (Iterator<Map.Entry<Long, TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
+ for (Iterator<Map.Entry<int64_t TransactionStatus> > iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
TransactionStatus status = iter.next().getValue();
// Check if the transaction is dead
// Create a list of clients to watch until they see this rejected message entry.
HashSet<Long> deviceWatchSet = new HashSet<Long>();
- for (Map.Entry<Long, Pair<Long, Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
+ for (Map.Entry<int64_t Pair<int64_t Liveness> > lastMessageEntry : lastMessageTable.entrySet()) {
// Machine ID for the last message entry
int64_t lastMessageEntryMachineId = lastMessageEntry.getKey();
continue;
}
- Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
+ Pair<int64_t Liveness> lastMessageValue = lastMessageEntry.getValue();
int64_t entrySequenceNumber = lastMessageValue.getFirst();
if (entrySequenceNumber < seq) {
// Add this rejected message to the set of messages that this machine ID did not see yet
- addWatchList(lastMessageEntryMachineId, entry);
+ addWatchVector(lastMessageEntryMachineId, entry);
// This client did not see this rejected message yet so add it to the watch set to monitor
deviceWatchSet.add(lastMessageEntryMachineId);
// Abort has not been seen by the client it is for yet so we need to keep track of it
Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
if (previouslySeenAbort != NULL) {
- previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
+ previouslySeenAbort.setDead();// Delete old version of the abort since we got a rescued newer version
}
if (entry.getTransactionArbitrator() == localMachineId) {
}
// This part is still alive
- Map<Pair<Long, Integer>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
+ Hashtable<Pair<int64_t int32_t>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
if (transactionPart == NULL) {
// Dont have a table for this machine Id yet so make one
- transactionPart = new HashMap<Pair<Long, Integer>, TransactionPart>();
+ transactionPart = new Hashtable<Pair<int64_t int32_t>, TransactionPart>();
newTransactionParts.put(entry.getMachineId(), transactionPart);
}
- Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
+ Hashtable<Pair<int64_t int32_t>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
if (commitPart == NULL) {
// Don't have a table for this machine Id yet so make one
- commitPart = new HashMap<Pair<Long, Integer>, CommitPart>();
+ commitPart = new Hashtable<Pair<int64_t int32_t>, CommitPart>();
newCommitParts.put(entry.getMachineId(), commitPart);
}
machineSet.remove(machineId);
// Get the set of rejected messages that this machine Id is has not seen yet
- HashSet<RejectedMessage> watchset = rejectedMessageWatchListTable.get(machineId);
+ HashSet<RejectedMessage> watchset = rejectedMessageWatchVectorTable.get(machineId);
// If there is a rejected message that this machine Id has not seen yet
if (watchset != NULL) {
}
// Set dead the abort
- for (Iterator<Map.Entry<Pair<int64_t, int64_t>, Abort>> i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
+ for (Iterator<Map.Entry<Pair<int64_t, int64_t>, Abort> > i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
Abort abort = i.next().getValue();
if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
}
// Get the old last message for this device
- Pair<Long, Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<Long, Liveness>(seqNum, liveness));
+ Pair<int64_t Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<int64_t Liveness>(seqNum, liveness));
if (lastMessageEntry == NULL) {
// If no last message then there is nothing else to process
return;
* Add a rejected message entry to the watch set to keep track of which clients have seen that
* rejected message entry and which have not.
*/
- void addWatchList(int64_t machineId, RejectedMessage entry) {
- HashSet<RejectedMessage> entries = rejectedMessageWatchListTable.get(machineId);
+ void addWatchVector(int64_t machineId, RejectedMessage entry) {
+ HashSet<RejectedMessage> entries = rejectedMessageWatchVectorTable.get(machineId);
if (entries == NULL) {
// There is no set for this machine ID yet so create one
entries = new HashSet<RejectedMessage>();
- rejectedMessageWatchListTable.put(machineId, entries);
+ rejectedMessageWatchVectorTable.put(machineId, entries);
}
entries.add(entry);
}
Slot currSlot = newSlots[i];
Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
if (prevSlot != NULL &&
- !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
+ !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
}
}