final public class Table {
-
/* Constants */
- static final int FREE_SLOTS = 10; // Number of slots that should be kept free
+ static final int FREE_SLOTS = 2; // Number of slots that should be kept free // 10
static final int SKIP_THRESHOLD = 10;
static final double RESIZE_MULTIPLE = 1.2;
static final double RESIZE_THRESHOLD = 0.75;
private long oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry
private long localMachineId = 0; // Machine ID of this client device
private long sequenceNumber = 0; // Largest sequence number a client has received
- private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
- private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
+ private long localSequenceNumber = 0;
+
+ // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
+ // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
private long localArbitrationSequenceNumber = 0;
private boolean hadPartialSendToServer = false;
private boolean attemptedToSendToServer = false;
+ private long expectedsize;
+ private boolean didFindTableStatus = false;
+ private long currMaxSize = 0;
+
+ private Slot lastSlotAttemptedToSend = null;
+ private boolean lastIsNewKey = false;
+ private int lastNewSize = 0;
+ private Map<Transaction, List<Integer>> lastTransactionPartsSent = null;
+ private List<Entry> lastPendingSendArbitrationEntriesToDelete = null;
+ private NewKey lastNewKey = null;
+
/* Data Structures */
private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
private Map<Long, Long> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = null;
-
-
-
public Table(String baseurl, String password, long _localMachineId, int listeningPort) {
localMachineId = _localMachineId;
cloud = new CloudComm(this, baseurl, password, listeningPort);
int livec = 0;
int deadc = 0;
+
+ int casdasd = 0;
+
+ int liveslo = 0;
+
for (long i = o; i < (n + 1); i++) {
Slot s = buffer.getSlot(i);
+
+ if (s.isLive()) {
+ liveslo++;
+ }
+
Vector<Entry> entries = s.getEntries();
for (Entry e : entries) {
if (e.isLive()) {
int type = e.getType();
+
+
+ if (type == 6) {
+ RejectedMessage rej = (RejectedMessage)e;
+ casdasd++;
+
+ System.out.println(rej.getMachineID());
+ }
+
+
types[type] = types[type] + 1;
num++;
livec++;
System.out.println(i + " " + types[i]);
}
System.out.println("Live count: " + livec);
+ System.out.println("Live Slot count: " + liveslo);
+
System.out.println("Dead count: " + deadc);
System.out.println("Old: " + o);
System.out.println("New: " + n);
System.out.println("Size: " + buffer.size());
+ // System.out.println("Commits: " + liveCommitsTable.size());
+ System.out.println("pendingTrans: " + pendingTransactionQueue.size());
+ System.out.println("Trans Status Out: " + outstandingTransactionStatus.size());
- // List<IoTString> strList = new ArrayList<IoTString>();
- // for (int i = 0; i < 100; i++) {
- // String keyA = "a" + i;
- // String keyB = "b" + i;
- // String keyC = "c" + i;
- // String keyD = "d" + i;
-
- // IoTString iKeyA = new IoTString(keyA);
- // IoTString iKeyB = new IoTString(keyB);
- // IoTString iKeyC = new IoTString(keyC);
- // IoTString iKeyD = new IoTString(keyD);
-
- // strList.add(iKeyA);
- // strList.add(iKeyB);
- // strList.add(iKeyC);
- // strList.add(iKeyD);
- // }
+ for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
+ System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
+ }
- // for (Long l : commitMap.keySet()) {
- // for (Long l2 : commitMap.get(l).keySet()) {
- // for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) {
- // strList.remove(kv.getKey());
- // System.out.print(kv.getKey() + " ");
- // }
- // }
- // }
-
- // System.out.println();
- // System.out.println();
+ for (Long a : liveCommitsTable.keySet()) {
+ for (Long b : liveCommitsTable.get(a).keySet()) {
+ for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) {
+ System.out.print(kv + " ");
+ }
+ System.out.print("|| ");
+ }
+ System.out.println();
+ }
- // for (IoTString s : strList) {
- // System.out.print(s + " ");
- // }
- // System.out.println();
- // System.out.println(strList.size());
}
/**
cloud.initSecurity();
// Create the first insertion into the block chain which is the table status
- Slot s = new Slot(this, 1, localMachineId);
+ Slot s = new Slot(this, 1, localMachineId, localSequenceNumber);
+ localSequenceNumber++;
TableStatus status = new TableStatus(s, numberOfSlots);
s.addEntry(status);
Slot[] array = cloud.putSlot(s, numberOfSlots);
// Just pull the latest slots from the server
Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
validateAndUpdate(newslots, true);
+ sendToServer(null);
+ updateLiveTransactionsAndStatus();
+
}
-// public String toString() {
-// String retString = " Committed Table: \n";
-// retString += "---------------------------\n";
-// retString += commitedTable.toString();
+ // public String toString() {
+ // String retString = " Committed Table: \n";
+ // retString += "---------------------------\n";
+ // retString += commitedTable.toString();
-// retString += "\n\n";
+ // retString += "\n\n";
-// retString += " Speculative Table: \n";
-// retString += "---------------------------\n";
-// retString += speculativeTable.toString();
+ // retString += " Speculative Table: \n";
+ // retString += "---------------------------\n";
+ // retString += speculativeTable.toString();
-// return retString;
-// }
+ // return retString;
+ // }
public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) {
localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
validateAndUpdate(newSlots, false);
sendToServer(null);
+
+ updateLiveTransactionsAndStatus();
+
return true;
} catch (Exception e) {
// e.printStackTrace();
+
+ for (Long m : localCommunicationTable.keySet()) {
+ updateFromLocal(m);
+ }
}
return false;
}
NewKey newKey = new NewKey(null, keyName, machineId);
+
if (sendToServer(newKey)) {
// If successfully inserted
return true;
bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
}
+ public long getLocalSequenceNumber() {
+ return localSequenceNumber;
+ }
+
+
+ boolean lastInsertedNewKey = false;
+
private boolean sendToServer(NewKey newKey) throws ServerException {
+ boolean fromRetry = false;
+
+ try {
+ if (hadPartialSendToServer) {
+ Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
+ if (newSlots.length == 0) {
+ fromRetry = true;
+ ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
+
+ if (sendSlotsReturn.getFirst()) {
+ if (newKey != null) {
+ if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
+ newKey = null;
+ }
+ }
+
+ for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ transaction.resetServerFailure();
+
+ // Update which transactions parts still need to be sent
+ transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
+
+ // Add the transaction status to the outstanding list
+ outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+ // Update the transaction status
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+ // Check if all the transaction parts were successfully sent and if so then remove it from pending
+ if (transaction.didSendAllParts()) {
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+ pendingTransactionQueue.remove(transaction);
+ }
+ }
+ } else {
+
+ newSlots = sendSlotsReturn.getThird();
+
+ boolean isInserted = false;
+ for (Slot s : newSlots) {
+ if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
+ isInserted = true;
+ break;
+ }
+ }
+
+ for (Slot s : newSlots) {
+ if (isInserted) {
+ break;
+ }
+
+ // Process each entry in the slot
+ for (Entry entry : s.getEntries()) {
+
+ if (entry.getType() == Entry.TypeLastMessage) {
+ LastMessage lastMessage = (LastMessage)entry;
+ if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
+ isInserted = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (isInserted) {
+ if (newKey != null) {
+ if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
+ newKey = null;
+ }
+ }
+
+ for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ transaction.resetServerFailure();
+
+ // Update which transactions parts still need to be sent
+ transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
+
+ // Add the transaction status to the outstanding list
+ outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+ // Update the transaction status
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+ // Check if all the transaction parts were successfully sent and if so then remove it from pending
+ if (transaction.didSendAllParts()) {
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+ pendingTransactionQueue.remove(transaction);
+ } else {
+ transaction.resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction.didSendAPartToServer()) {
+ transaction.setSequenceNumber(-1);
+ }
+ }
+ }
+ }
+ }
+
+ for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ transaction.resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction.didSendAPartToServer()) {
+ transaction.setSequenceNumber(-1);
+ }
+ }
+
+ if (sendSlotsReturn.getThird().length != 0) {
+ // insert into the local block chain
+ validateAndUpdate(sendSlotsReturn.getThird(), true);
+ }
+ // continue;
+ } else {
+ boolean isInserted = false;
+ for (Slot s : newSlots) {
+ if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
+ isInserted = true;
+ break;
+ }
+ }
+
+ for (Slot s : newSlots) {
+ if (isInserted) {
+ break;
+ }
+
+ // Process each entry in the slot
+ for (Entry entry : s.getEntries()) {
+
+ if (entry.getType() == Entry.TypeLastMessage) {
+ LastMessage lastMessage = (LastMessage)entry;
+ if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
+ isInserted = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (isInserted) {
+ if (newKey != null) {
+ if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
+ newKey = null;
+ }
+ }
+
+ for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ transaction.resetServerFailure();
+
+ // Update which transactions parts still need to be sent
+ transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
+
+ // Add the transaction status to the outstanding list
+ outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+ // Update the transaction status
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+ // Check if all the transaction parts were successfully sent and if so then remove it from pending
+ if (transaction.didSendAllParts()) {
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+ pendingTransactionQueue.remove(transaction);
+ } else {
+ transaction.resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction.didSendAPartToServer()) {
+ transaction.setSequenceNumber(-1);
+ }
+ }
+ }
+ } else {
+ for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ transaction.resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction.didSendAPartToServer()) {
+ transaction.setSequenceNumber(-1);
+ }
+ }
+ }
+
+ // insert into the local block chain
+ validateAndUpdate(newSlots, true);
+ }
+ }
+ } catch (ServerException e) {
+ throw e;
+ }
+
+
+
try {
// While we have stuff that needs inserting into the block chain
while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) {
+ fromRetry = false;
+
+ if (hadPartialSendToServer) {
+ throw new Error("Should Be error free");
+ }
+
+
+
+ // If there is a new key with same name then end
+ if ((newKey != null) && (arbitratorTable.get(newKey.getKey()) != null)) {
+ return false;
+ }
+
// Create the slot
- Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC());
+ Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber);
+ localSequenceNumber++;
// Try to fill the slot with data
ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
transaction.resetNextPartToSend();
// Set the transaction sequence number back to nothing
- if (!transaction.didSendAPartToServer()) {
+ if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
transaction.setSequenceNumber(-1);
}
}
fillSlot(slot, true, newKey);
}
- // Try to send to the server
+ lastSlotAttemptedToSend = slot;
+ lastIsNewKey = (newKey != null);
+ lastInsertedNewKey = insertedNewKey;
+ lastNewSize = newSize;
+ lastNewKey = newKey;
+ lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
+ lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
+
+
ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
- if (/*sendSlotsReturn.getSecond() || */sendSlotsReturn.getFirst()) {
+ if (sendSlotsReturn.getFirst()) {
+
// Did insert into the block chain
- if (sendSlotsReturn.getFirst()) {
+ if (insertedNewKey) {
// This slot was what was inserted not a previous slot
// New Key was successfully inserted into the block chain so dont want to insert it again
}
for (Transaction transaction : transactionPartsSent.keySet()) {
-
-
transaction.resetServerFailure();
-
// Update which transactions parts still need to be sent
transaction.removeSentParts(transactionPartsSent.get(transaction));
}
}
} else {
+
+ // if (!sendSlotsReturn.getSecond()) {
+ // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ // transaction.resetServerFailure();
+ // }
+ // } else {
+ // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ // transaction.resetServerFailure();
+
+ // // Update which transactions parts still need to be sent
+ // transaction.removeSentParts(transactionPartsSent.get(transaction));
+
+ // // Add the transaction status to the outstanding list
+ // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+ // // Update the transaction status
+ // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+ // // Check if all the transaction parts were successfully sent and if so then remove it from pending
+ // if (transaction.didSendAllParts()) {
+ // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+ // pendingTransactionQueue.remove(transaction);
+
+ // for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+ // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber());
+ // }
+ // }
+ // }
+ // }
+
// Reset which transaction to send
for (Transaction transaction : transactionPartsSent.keySet()) {
transaction.resetNextPartToSend();
- transaction.resetNextPartToSend();
+ // transaction.resetNextPartToSend();
// Set the transaction sequence number back to nothing
- if (!transaction.didSendAPartToServer()) {
+ if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
transaction.setSequenceNumber(-1);
}
}
validateAndUpdate(sendSlotsReturn.getThird(), true);
}
}
- } catch (ServerException e) {
-
- System.out.println("Server Failure: " + e.getType());
+ } catch (ServerException e) {
if (e.getType() != ServerException.TypeInputTimeout) {
// e.printStackTrace();
transaction.resetNextPartToSend();
// Set the transaction sequence number back to nothing
- if (!transaction.didSendAPartToServer()) {
+ if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
transaction.setSequenceNumber(-1);
}
}
// There was a partial send to the server
hadPartialSendToServer = true;
+
+ // if (!fromRetry) {
+ // lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
+ // lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
+ // }
+
// Nothing was able to be sent to the server so just clear these data structures
for (Transaction transaction : transactionPartsSent.keySet()) {
transaction.resetNextPartToSend();
return newKey == null;
}
- public synchronized boolean updateFromLocal(long machineId) {
+ private synchronized boolean updateFromLocal(long machineId) {
Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
if (localCommunicationInformation == null) {
// Cant talk to that device locally so do nothing
bbEncode.putInt(0);
// Send by local
- byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+ byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+ localSequenceNumber++;
if (returnData == null) {
// Could not contact server
if (localCommunicationInformation == null) {
// Cant talk to that device locally so do nothing
- return new Pair<Boolean, Boolean>(false, false);
+ return new Pair<Boolean, Boolean>(true, false);
}
// Get the size of the send data
part.encode(bbEncode);
}
+
// Send by local
- byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+ byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+ localSequenceNumber++;
if (returnData == null) {
// Could not contact server
}
public synchronized byte[] acceptDataFromLocal(byte[] data) {
+
// Decode the data
ByteBuffer bbDecode = ByteBuffer.wrap(data);
long lastArbitratedSequenceNumberSeen = bbDecode.getLong();
entry.encode(bbEncode);
}
+
+ localSequenceNumber++;
return returnData;
}
* Returns false if a resize was needed
*/
private ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) {
+
+
int newSize = 0;
if (liveSlotCount > bufferResizeThreshold) {
resize = true; //Resize is forced
+
}
if (resize) {
if (newKeyEntry != null) {
newKeyEntry.setSlot(slot);
if (slot.hasSpace(newKeyEntry)) {
+
slot.addEntry(newKeyEntry);
inserted = true;
}
Transaction transaction = pendingTransactionQueue.get(0);
// Set the transaction sequence number if it has yet to be inserted into the block chain
- if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
+ // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
+ // transaction.setSequenceNumber(slot.getSequenceNumber());
+ // }
+
+ if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
transaction.setSequenceNumber(slot.getSequenceNumber());
}
+
while (true) {
TransactionPart part = transaction.getNextPartToSend();
long old_seqn = rejectedSlotList.firstElement();
if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
long new_seqn = rejectedSlotList.lastElement();
- RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, new_seqn, false);
+ RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
s.addEntry(rm);
} else {
long prev_seqn = -1;
}
/* Generate rejected message entry for missing messages */
if (prev_seqn != -1) {
- RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, prev_seqn, false);
+ RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
s.addEntry(rm);
}
/* Generate rejected message entries for present messages */
long curr_seqn = rejectedSlotList.get(i);
Slot s_msg = buffer.getSlot(curr_seqn);
long machineid = s_msg.getMachineID();
- RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+ RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
s.addEntry(rm);
}
}
return;
}
- // Reset the table status declared sizes
- smallestTableStatusSeen = -1;
- largestTableStatusSeen = -1;
-
-
// Make sure all slots are newer than the last largest slot this client has seen
long firstSeqNum = newSlots[0].getSequenceNumber();
if (firstSeqNum <= sequenceNumber) {
// Process each slots data
for (Slot slot : newSlots) {
processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
+
+ updateExpectedSize();
}
// If there is a gap, check to see if the server sent us everything.
updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
}
+ private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) {
+ // if (didFindTableStatus) {
+ // return;
+ // }
+ long prevslots = firstSequenceNumber;
+
+
+ if (didFindTableStatus) {
+ // expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : expectedsize;
+ // System.out.println("Here2: " + expectedsize + " " + numberOfSlots + " " + prevslots);
+
+ } else {
+ expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots;
+ // System.out.println("Here: " + expectedsize);
+ }
+
+ // System.out.println(numberOfSlots);
+
+ didFindTableStatus = true;
+ currMaxSize = numberOfSlots;
+ }
+
+ private void updateExpectedSize() {
+ expectedsize++;
+
+ if (expectedsize > currMaxSize) {
+ expectedsize = currMaxSize;
+ }
+ }
+
+
/**
* Check the size of the block chain to make sure there are enough slots sent back by the server.
* This is only called when we have a gap between the slots that we have locally and the slots
* status message
*/
private void checkNumSlots(int numberOfSlots) {
-
- // We only have 1 size so we must have this many slots
- if (largestTableStatusSeen == smallestTableStatusSeen) {
- if (numberOfSlots != smallestTableStatusSeen) {
- throw new Error("Server Error: Server did not send all slots. Expected: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
- }
- } else {
- // We have more than 1
- if (numberOfSlots < smallestTableStatusSeen) {
- throw new Error("Server Error: Server did not send all slots. Expected at least: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
- }
+ if (numberOfSlots != expectedsize) {
+ throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numberOfSlots);
}
}
+ private void updateCurrMaxSize(int newmaxsize) {
+ currMaxSize = newmaxsize;
+ }
+
+
/**
* Update the size of of the local buffer if it is needed.
*/
private void commitNewMaxSize() {
-
- int currMaxSize = 0;
-
- if (largestTableStatusSeen == -1) {
- // No table status seen so the current max size does not change
- currMaxSize = numberOfSlots;
- } else {
- currMaxSize = largestTableStatusSeen;
- }
+ didFindTableStatus = false;
// Resize the local slot buffer
if (numberOfSlots != currMaxSize) {
- buffer.resize(currMaxSize);
+ buffer.resize((int)currMaxSize);
}
// Change the number of local slots to the new size
- numberOfSlots = currMaxSize;
+ numberOfSlots = (int)currMaxSize;
+
// Recalculate the resize threshold since the size of the local buffer has changed
setResizeThreshold();
newTransactionParts.clear();
}
+
+ private long lastSeqNumArbOn = 0;
+
private void arbitrateFromServer() {
if (liveTransactionBySequenceNumberTable.size() == 0) {
for (Long transactionSequenceNumber : transactionSequenceNumbers) {
Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
+
+
// Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
if (transaction.getArbitrator() != localMachineId) {
continue;
}
+ if (transactionSequenceNumber < lastSeqNumArbOn) {
+ continue;
+ }
+
if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
// We have seen this already locally so dont commit again
continue;
break;
}
+
// update the largest transaction seen by arbitrator from server
if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) {
lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
}
// Update what the last transaction committed was for use in batch commit
- lastTransactionCommitted = transaction.getSequenceNumber();
+ lastTransactionCommitted = transactionSequenceNumber;
} else {
// Guard evaluated was false so create abort
// Insert the abort so we can process
processEntry(newAbort);
}
+
+ lastSeqNumArbOn = transactionSequenceNumber;
+
+ // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
}
Commit newCommit = null;
status.setStatus(TransactionStatus.StatusAborted);
}
} else {
-
Set addAbortSet = new HashSet<Abort>();
}
}
+ // Update the last transaction that was updated if we can
+ if (commit.getTransactionSequenceNumber() != -1) {
+ Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
+
+ // Update the last transaction sequence number that the arbitrator arbitrated on
+ if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
+ lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
+ }
+ }
+
// Update the last arbitration data that we have seen so far
if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) {
lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
}
-
-
- // Update the last transaction that was updated if we can
- if (commit.getTransactionSequenceNumber() != -1) {
- Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
-
- // Update the last transaction sequence number that the arbitrator arbitrated on
- if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
- lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
- }
- }
-
// We processed a new commit that we havent seen before
didProcessANewCommit = true;
-
-
// Update the committed table of keys and which commit is using which key
for (KeyValue kv : commit.getKeyValueUpdateSet()) {
committedKeyValueTable.put(kv.getKey(), kv);
break;
case Entry.TypeTableStatus:
- processEntry((TableStatus)entry);
+ processEntry((TableStatus)entry, slot.getSequenceNumber());
break;
default:
* keeps track of the largest and smallest table status seen in this current round
* of updating the local copy of the block chain
*/
- private void processEntry(TableStatus entry) {
+ private void processEntry(TableStatus entry, long seq) {
int newNumSlots = entry.getMaxSlots();
+ updateCurrMaxSize(newNumSlots);
+
+ initExpectedSize(seq, newNumSlots);
if (liveTableStatus != null) {
// We have a larger table status so the old table status is no longer alive
// Make this new table status the latest alive table status
liveTableStatus = entry;
-
- if ((smallestTableStatusSeen == -1) || (newNumSlots < smallestTableStatusSeen)) {
- smallestTableStatusSeen = newNumSlots;
- }
-
- if ((largestTableStatusSeen == -1) || (newNumSlots > largestTableStatusSeen)) {
- largestTableStatusSeen = newNumSlots;
- }
}
/**
long newSeqNum = entry.getNewSeqNum();
boolean isequal = entry.getEqual();
long machineId = entry.getMachineID();
+ long seq = entry.getSequenceNumber();
// Check if we have messages that were supposed to be rejected in our local block chain
Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
long entrySequenceNumber = lastMessageValue.getFirst();
- if (entrySequenceNumber < newSeqNum) {
+ if (entrySequenceNumber < seq) {
// Add this rejected message to the set of messages that this machine ID did not see yet
addWatchList(lastMessageEntryMachineId, entry);
* Process new commit entries and save them for future use. Delete duplicates
*/
private void processEntry(CommitPart entry) {
+
+
+ // Update the last transaction that was updated if we can
+ if (entry.getTransactionSequenceNumber() != -1) {
+ Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
+
+ // Update the last transaction sequence number that the arbitrator arbitrated on
+ if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
+ lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
+ }
+ }
+
+
+
+
Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
if (commitPart == null) {
RejectedMessage rm = rmit.next();
// If this machine Id has seen this rejected message...
- if (rm.getNewSeqNum() <= seqNum) {
+ if (rm.getSequenceNumber() <= seqNum) {
// Remove it from our watchlist
rmit.remove();