Slot lastSlotAttemptedToSend = NULL;
bool lastIsNewKey = false;
int lastNewSize = 0;
- Hashtable<Transaction, List<Integer>> lastTransactionPartsSent = NULL;
- List<Entry> lastPendingSendArbitrationEntriesToDelete = NULL;
+ Hashtable<Transaction, Vector<int32_t>> lastTransactionPartsSent = NULL;
+ Vector<Entry> lastPendingSendArbitrationEntriesToDelete = NULL;
NewKey lastNewKey = NULL;
Hashtable<IoTString, KeyValue> speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value
Hashtable<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
Hashtable<IoTString, NewKey> liveNewKeyTable = NULL; // Table of live new keys
- Hashtable<Long, Pair<Long, Liveness>> lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
- Hashtable<Long, HashSet<RejectedMessage>> rejectedMessageWatchListTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet
+ Hashtable<int64_t Pair<int64_t Liveness>> lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
+ Hashtable<int64_t HashSet<RejectedMessage>> rejectedMessageWatchVectorTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet
Hashtable<IoTString, Long> arbitratorTable = NULL; // Table of keys and their arbitrators
Hashtable<Pair<int64_t, int64_t>, Abort> liveAbortTable = NULL; // Table live abort messages
- Hashtable<Long, Hashtable<Pair<Long, Integer>, TransactionPart>> newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
- Hashtable<Long, Hashtable<Pair<Long, Integer>, CommitPart>> newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
+ Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart>> newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
+ Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, CommitPart>> newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
Hashtable<int64_t, int64_t> lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on
- Hashtable<Long, Transaction> liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
+ Hashtable<int64_t Transaction> liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
Hashtable<Pair<int64_t, int64_t>, Transaction> liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID
- Hashtable<Long, Hashtable<Long, Commit>> liveCommitsTable = NULL;
+ Hashtable<int64_t Hashtable<int64_t Commit>> liveCommitsTable = NULL;
Hashtable<IoTString, Commit> liveCommitsByKeyTable = NULL;
Hashtable<int64_t, int64_t> lastCommitSeenSequenceNumberByArbitratorTable = NULL;
- Vector<Long> rejectedSlotList = NULL; // List of rejected slots that have yet to be sent to the server
- List<Transaction> pendingTransactionQueue = NULL;
- List<ArbitrationRound> pendingSendArbitrationRounds = NULL;
- List<Entry> pendingSendArbitrationEntriesToDelete = NULL;
- Hashtable<Transaction, List<Integer>> transactionPartsSent = NULL;
- Hashtable<Long, TransactionStatus> outstandingTransactionStatus = NULL;
- Hashtable<Long, Abort> liveAbortsGeneratedByLocal = NULL;
+ Vector<Long> rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server
+ Vector<Transaction> pendingTransactionQueue = NULL;
+ Vector<ArbitrationRound> pendingSendArbitrationRounds = NULL;
+ Vector<Entry> pendingSendArbitrationEntriesToDelete = NULL;
+ Hashtable<Transaction, Vector<int32_t>> transactionPartsSent = NULL;
+ Hashtable<int64_t TransactionStatus> outstandingTransactionStatus = NULL;
+ Hashtable<int64_t Abort> liveAbortsGeneratedByLocal = NULL;
Set<Pair<int64_t, int64_t>> offlineTransactionsCommittedAndAtServer = NULL;
- Hashtable<Long, Pair<String, Integer>> localCommunicationTable = NULL;
+ Hashtable<int64_t Pair<String, int32_t>> localCommunicationTable = NULL;
Hashtable<int64_t, int64_t> lastTransactionSeenFromMachineFromServer = NULL;
Hashtable<int64_t, int64_t> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;
speculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
liveNewKeyTable = new Hashtable<IoTString, NewKey>();
- lastMessageTable = new Hashtable<Long, Pair<Long, Liveness>>();
- rejectedMessageWatchListTable = new Hashtable<Long, HashSet<RejectedMessage>>();
+ lastMessageTable = new Hashtable<int64_t Pair<int64_t Liveness>>();
+ rejectedMessageWatchVectorTable = new Hashtable<int64_t HashSet<RejectedMessage>>();
arbitratorTable = new Hashtable<IoTString, Long>();
liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort>();
- newTransactionParts = new Hashtable<Long, Hashtable<Pair<Long, Integer>, TransactionPart>>();
- newCommitParts = new Hashtable<Long, Hashtable<Pair<Long, Integer>, CommitPart>>();
+ newTransactionParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart>>();
+ newCommitParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, CommitPart>>();
lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
- liveTransactionBySequenceNumberTable = new Hashtable<Long, Transaction>();
+ liveTransactionBySequenceNumberTable = new Hashtable<int64_t Transaction>();
liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction>();
- liveCommitsTable = new Hashtable<Long, Hashtable<Long, Commit>>();
+ liveCommitsTable = new Hashtable<int64_t Hashtable<int64_t Commit>>();
liveCommitsByKeyTable = new Hashtable<IoTString, Commit>();
lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
- rejectedSlotList = new Vector<Long>();
- pendingTransactionQueue = new ArrayList<Transaction>();
- pendingSendArbitrationEntriesToDelete = new ArrayList<Entry>();
- transactionPartsSent = new Hashtable<Transaction, List<Integer>>();
- outstandingTransactionStatus = new Hashtable<Long, TransactionStatus>();
- liveAbortsGeneratedByLocal = new Hashtable<Long, Abort>();
+ rejectedSlotVector = new Vector<Long>();
+ pendingTransactionQueue = new Vector<Transaction>();
+ pendingSendArbitrationEntriesToDelete = new Vector<Entry>();
+ transactionPartsSent = new Hashtable<Transaction, Vector<int32_t>>();
+ outstandingTransactionStatus = new Hashtable<int64_t TransactionStatus>();
+ liveAbortsGeneratedByLocal = new Hashtable<int64_t Abort>();
offlineTransactionsCommittedAndAtServer = new HashSet<Pair<int64_t, int64_t>>();
- localCommunicationTable = new Hashtable<Long, Pair<String, Integer>>();
+ localCommunicationTable = new Hashtable<int64_t Pair<String, int32_t>>();
lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
- pendingSendArbitrationRounds = new ArrayList<ArbitrationRound>();
+ pendingSendArbitrationRounds = new Vector<ArbitrationRound>();
lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
// }
synchronized void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber) {
- localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
+ localCommunicationTable.put(arbitrator, new Pair<String, int32_t>(hostName, portNumber));
}
synchronized Long getArbitrator(IoTString key) {
localSequenceNumber++;
// Try to fill the slot with data
- ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
+ ThreeTuple<Boolean, int32_t, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
bool needsResize = fillSlotsReturn.getFirst();
int newSize = fillSlotsReturn.getSecond();
Boolean insertedNewKey = fillSlotsReturn.getThird();
lastInsertedNewKey = insertedNewKey;
lastNewSize = newSize;
lastNewKey = newKey;
- lastTransactionPartsSent = new Hashtable<Transaction, List<Integer>>(transactionPartsSent);
- lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
+ lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t>>(transactionPartsSent);
+ lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
// if (!fromRetry) {
- // lastTransactionPartsSent = new Hashtable<Transaction, List<Integer>>(transactionPartsSent);
- // lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
+ // lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t>>(transactionPartsSent);
+ // lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
// }
// Nothing was able to be sent to the server so just clear these data structures
}
synchronized bool updateFromLocal(int64_t machineId) {
- Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
+ Pair<String, int32_t> localCommunicationInformation = localCommunicationTable.get(machineId);
if (localCommunicationInformation == NULL) {
// Cant talk to that device locally so do nothing
return false;
Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
// Get the devices local communications
- Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
+ Pair<String, int32_t> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
if (localCommunicationInformation == NULL) {
// Cant talk to that device locally so do nothing
// The data to send back
int returnDataSize = 0;
- List<Entry> unseenArbitrations = new ArrayList<Entry>();
+ Vector<Entry> unseenArbitrations = new Vector<Entry>();
// Get the aborts to send back
- List<Long> abortLocalSequenceNumbers = new ArrayList<Long >(liveAbortsGeneratedByLocal.keySet());
+ Vector<Long> abortLocalSequenceNumbers = new Vector<Long >(liveAbortsGeneratedByLocal.keySet());
Collections.sort(abortLocalSequenceNumbers);
for (Long localSequenceNumber : abortLocalSequenceNumbers) {
if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
}
// Get the commits to send back
- Hashtable<Long, Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
+ Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
if (commitForClientTable != NULL) {
- List<Long> commitLocalSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
+ Vector<Long> commitLocalSequenceNumbers = new Vector<Long>(commitForClientTable.keySet());
Collections.sort(commitLocalSequenceNumbers);
for (Long localSequenceNumber : commitLocalSequenceNumbers) {
Slot[] array = cloud.putSlot(slot, newSize);
if (array == NULL) {
array = new Slot[] {slot};
- rejectedSlotList.clear();
+ rejectedSlotVector.clear();
inserted = true;
} else {
if (array.length == 0) {
}
if (!isInserted) {
- rejectedSlotList.add(slot.getSequenceNumber());
+ rejectedSlotVector.add(slot.getSequenceNumber());
lastTryInserted = false;
} else {
lastTryInserted = true;
}
} else {
- rejectedSlotList.add(slot.getSequenceNumber());
+ rejectedSlotVector.add(slot.getSequenceNumber());
lastTryInserted = false;
}
}
/**
* Returns false if a resize was needed
*/
- ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, bool resize, NewKey newKeyEntry) {
+ ThreeTuple<Boolean, int32_t, Boolean> fillSlot(Slot slot, bool resize, NewKey newKeyEntry) {
int newSize = 0;
if (needsResize && !resize) {
// We need to resize but we are not resizing so return false
- return new ThreeTuple<Boolean, Integer, Boolean>(true, NULL, NULL);
+ return new ThreeTuple<Boolean, int32_t, Boolean>(true, NULL, NULL);
}
bool inserted = false;
for (ArbitrationRound round : pendingSendArbitrationRounds) {
bool isFull = false;
round.generateParts();
- List<Entry> parts = round.getParts();
+ Vector<Entry> parts = round.getParts();
// Insert pending arbitration data
for (Entry arbitrationData : parts) {
if (slot.hasSpace(part)) {
slot.addEntry(part);
- List<Integer> partsSent = transactionPartsSent.get(transaction);
+ Vector<int32_t> partsSent = transactionPartsSent.get(transaction);
if (partsSent == NULL) {
- partsSent = new ArrayList<Integer>();
+ partsSent = new Vector<int32_t>();
transactionPartsSent.put(transaction, partsSent);
}
partsSent.add(part.getPartNumber());
// Fill the remainder of the slot with rescue data
doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
- return new ThreeTuple<Boolean, Integer, Boolean>(false, newSize, inserted);
+ return new ThreeTuple<Boolean, int32_t, Boolean>(false, newSize, inserted);
}
void doRejectedMessages(Slot s) {
- if (! rejectedSlotList.isEmpty()) {
+ if (! rejectedSlotVector.isEmpty()) {
/* TODO: We should avoid generating a rejected message entry if
* there is already a sufficient entry in the queue (e.g.,
* equalsto value of true and same sequence number). */
- int64_t old_seqn = rejectedSlotList.firstElement();
- if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
- int64_t new_seqn = rejectedSlotList.lastElement();
+ int64_t old_seqn = rejectedSlotVector.firstElement();
+ if (rejectedSlotVector.size() > REJECTED_THRESHOLD) {
+ int64_t new_seqn = rejectedSlotVector.lastElement();
RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
s.addEntry(rm);
} else {
int64_t prev_seqn = -1;
int i = 0;
/* Go through list of missing messages */
- for (; i < rejectedSlotList.size(); i++) {
- int64_t curr_seqn = rejectedSlotList.get(i);
+ for (; i < rejectedSlotVector.size(); i++) {
+ int64_t curr_seqn = rejectedSlotVector.get(i);
Slot s_msg = buffer.getSlot(curr_seqn);
if (s_msg != NULL)
break;
s.addEntry(rm);
}
/* Generate rejected message entries for present messages */
- for (; i < rejectedSlotList.size(); i++) {
- int64_t curr_seqn = rejectedSlotList.get(i);
+ for (; i < rejectedSlotVector.size(); i++) {
+ int64_t curr_seqn = rejectedSlotVector.get(i);
Slot s_msg = buffer.getSlot(curr_seqn);
int64_t machineid = s_msg.getMachineID();
RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
// Iterate through all the machine Ids that we received new parts for
for (Long machineId : newTransactionParts.keySet()) {
- Hashtable<Pair<Long, Integer>, TransactionPart> parts = newTransactionParts.get(machineId);
+ Hashtable<Pair<int64_t int32_t>, TransactionPart> parts = newTransactionParts.get(machineId);
// Iterate through all the parts for that machine Id
- for (Pair<Long, Integer> partId : parts.keySet()) {
+ for (Pair<int64_t int32_t> partId : parts.keySet()) {
TransactionPart part = parts.get(partId);
Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
}
// Get the transaction sequence numbers and sort from oldest to newest
- List<Long> transactionSequenceNumbers = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
+ Vector<Long> transactionSequenceNumbers = new Vector<Long>(liveTransactionBySequenceNumberTable.keySet());
Collections.sort(transactionSequenceNumbers);
// Collection of key value pairs that are
// Iterate through all the machine Ids that we received new parts for
for (Long machineId : newCommitParts.keySet()) {
- Hashtable<Pair<Long, Integer>, CommitPart> parts = newCommitParts.get(machineId);
+ Hashtable<Pair<int64_t int32_t>, CommitPart> parts = newCommitParts.get(machineId);
// Iterate through all the parts for that machine Id
- for (Pair<Long, Integer> partId : parts.keySet()) {
+ for (Pair<int64_t int32_t> partId : parts.keySet()) {
CommitPart part = parts.get(partId);
// Get the transaction object for that sequence number
- Hashtable<Long, Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
+ Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
if (commitForClientTable == NULL) {
// This is the first commit from this device
- commitForClientTable = new Hashtable<Long, Commit>();
+ commitForClientTable = new Hashtable<int64_t Commit>();
liveCommitsTable.put(part.getMachineId(), commitForClientTable);
}
for (Long arbitratorId : liveCommitsTable.keySet()) {
// Get all the commits for a specific arbitrator
- Hashtable<Long, Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
+ Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
// Sort the commits in order
- List<Long> commitSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
+ Vector<Long> commitSequenceNumbers = new Vector<Long>(commitForClientTable.keySet());
Collections.sort(commitSequenceNumbers);
// Get the last commit seen from this arbitrator
}
// Create a list of the transaction sequence numbers and sort them from oldest to newest
- List<Long> transactionSequenceNumbersSorted = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
+ Vector<Long> transactionSequenceNumbersSorted = new Vector<Long>(liveTransactionBySequenceNumberTable.keySet());
Collections.sort(transactionSequenceNumbersSorted);
bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
void updateLiveTransactionsAndStatus() {
// Go through each of the transactions
- for (Iterator<Map.Entry<Long, Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
+ for (Iterator<Map.Entry<int64_t Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
Transaction transaction = iter.next().getValue();
// Check if the transaction is dead
}
// Go through each of the transactions
- for (Iterator<Map.Entry<Long, TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
+ for (Iterator<Map.Entry<int64_t TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
TransactionStatus status = iter.next().getValue();
// Check if the transaction is dead
// Create a list of clients to watch until they see this rejected message entry.
HashSet<Long> deviceWatchSet = new HashSet<Long>();
- for (Map.Entry<Long, Pair<Long, Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
+ for (Map.Entry<int64_t Pair<int64_t Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
// Machine ID for the last message entry
int64_t lastMessageEntryMachineId = lastMessageEntry.getKey();
continue;
}
- Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
+ Pair<int64_t Liveness> lastMessageValue = lastMessageEntry.getValue();
int64_t entrySequenceNumber = lastMessageValue.getFirst();
if (entrySequenceNumber < seq) {
// Add this rejected message to the set of messages that this machine ID did not see yet
- addWatchList(lastMessageEntryMachineId, entry);
+ addWatchVector(lastMessageEntryMachineId, entry);
// This client did not see this rejected message yet so add it to the watch set to monitor
deviceWatchSet.add(lastMessageEntryMachineId);
}
// This part is still alive
- Hashtable<Pair<Long, Integer>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
+ Hashtable<Pair<int64_t int32_t>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
if (transactionPart == NULL) {
// Dont have a table for this machine Id yet so make one
- transactionPart = new Hashtable<Pair<Long, Integer>, TransactionPart>();
+ transactionPart = new Hashtable<Pair<int64_t int32_t>, TransactionPart>();
newTransactionParts.put(entry.getMachineId(), transactionPart);
}
- Hashtable<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
+ Hashtable<Pair<int64_t int32_t>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
if (commitPart == NULL) {
// Don't have a table for this machine Id yet so make one
- commitPart = new Hashtable<Pair<Long, Integer>, CommitPart>();
+ commitPart = new Hashtable<Pair<int64_t int32_t>, CommitPart>();
newCommitParts.put(entry.getMachineId(), commitPart);
}
machineSet.remove(machineId);
// Get the set of rejected messages that this machine Id is has not seen yet
- HashSet<RejectedMessage> watchset = rejectedMessageWatchListTable.get(machineId);
+ HashSet<RejectedMessage> watchset = rejectedMessageWatchVectorTable.get(machineId);
// If there is a rejected message that this machine Id has not seen yet
if (watchset != NULL) {
}
// Get the old last message for this device
- Pair<Long, Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<Long, Liveness>(seqNum, liveness));
+ Pair<int64_t Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<int64_t Liveness>(seqNum, liveness));
if (lastMessageEntry == NULL) {
// If no last message then there is nothing else to process
return;
* Add a rejected message entry to the watch set to keep track of which clients have seen that
* rejected message entry and which have not.
*/
- void addWatchList(int64_t machineId, RejectedMessage entry) {
- HashSet<RejectedMessage> entries = rejectedMessageWatchListTable.get(machineId);
+ void addWatchVector(int64_t machineId, RejectedMessage entry) {
+ HashSet<RejectedMessage> entries = rejectedMessageWatchVectorTable.get(machineId);
if (entries == NULL) {
// There is no set for this machine ID yet so create one
entries = new HashSet<RejectedMessage>();
- rejectedMessageWatchListTable.put(machineId, entries);
+ rejectedMessageWatchVectorTable.put(machineId, entries);
}
entries.add(entry);
}