class RejectedMessage extends Entry {
+ /* Sequence number */
+ private long sequencenum;
+
+
/* Machine identifier */
private long machineid;
/* Oldest sequence number in range */
/* Set of machines that have not received notification. */
private HashSet<Long> watchset;
- RejectedMessage(Slot slot, long _machineid, long _oldseqnum, long _newseqnum, boolean _equalto) {
+ RejectedMessage(Slot slot, long _sequencenum, long _machineid, long _oldseqnum, long _newseqnum, boolean _equalto) {
super(slot);
+ sequencenum = _sequencenum;
machineid=_machineid;
oldseqnum=_oldseqnum;
newseqnum=_newseqnum;
return machineid;
}
+
+ long getSequenceNumber() {
+ return sequencenum;
+ }
+
static Entry decode(Slot slot, ByteBuffer bb) {
+ long sequencenum=bb.getLong();
long machineid=bb.getLong();
long oldseqnum=bb.getLong();
long newseqnum=bb.getLong();
byte equalto=bb.get();
- return new RejectedMessage(slot, machineid, oldseqnum, newseqnum, equalto==1);
+ return new RejectedMessage(slot,sequencenum, machineid, oldseqnum, newseqnum, equalto==1);
}
void setWatchSet(HashSet<Long> _watchset) {
void encode(ByteBuffer bb) {
bb.put(Entry.TypeRejectedMessage);
+ bb.putLong(sequencenum);
bb.putLong(machineid);
bb.putLong(oldseqnum);
bb.putLong(newseqnum);
}
int getSize() {
- return 3*Long.BYTES + 2*Byte.BYTES;
+ return 4*Long.BYTES + 2*Byte.BYTES;
}
byte getType() {
}
Entry getCopy(Slot s) {
- return new RejectedMessage(s, machineid, oldseqnum, newseqnum, equalto);
+ return new RejectedMessage(s,sequencenum, machineid, oldseqnum, newseqnum, equalto);
}
}
*/
final public class Table {
-
-
+
/* Constants */
static final int FREE_SLOTS = 10; // Number of slots that should be kept free
static final int SKIP_THRESHOLD = 10;
private long oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry
private long localMachineId = 0; // Machine ID of this client device
private long sequenceNumber = 0; // Largest sequence number a client has received
- private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
- private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
+ // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
+ // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
private long localArbitrationSequenceNumber = 0;
private boolean hadPartialSendToServer = false;
private boolean attemptedToSendToServer = false;
+ private long expectedsize;
+ private boolean didFindTableStatus = false;
+ private long currMaxSize = 0;
/* Data Structures */
private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
long old_seqn = rejectedSlotList.firstElement();
if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
long new_seqn = rejectedSlotList.lastElement();
- RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, new_seqn, false);
+ RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
s.addEntry(rm);
} else {
long prev_seqn = -1;
}
/* Generate rejected message entry for missing messages */
if (prev_seqn != -1) {
- RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, prev_seqn, false);
+ RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
s.addEntry(rm);
}
/* Generate rejected message entries for present messages */
long curr_seqn = rejectedSlotList.get(i);
Slot s_msg = buffer.getSlot(curr_seqn);
long machineid = s_msg.getMachineID();
- RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+ RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
s.addEntry(rm);
}
}
return;
}
- // Reset the table status declared sizes
- smallestTableStatusSeen = -1;
- largestTableStatusSeen = -1;
-
-
// Make sure all slots are newer than the last largest slot this client has seen
long firstSeqNum = newSlots[0].getSequenceNumber();
if (firstSeqNum <= sequenceNumber) {
// Process each slots data
for (Slot slot : newSlots) {
processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
+ updateExpectedSize();
}
// If there is a gap, check to see if the server sent us everything.
updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
}
+
+
+ private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) {
+ if (didFindTableStatus) {
+ return;
+ }
+ long prevslots = firstSequenceNumber;
+ expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots;
+ currMaxSize = numberOfSlots;
+ }
+
+ private void updateExpectedSize() {
+ expectedsize++;
+ if (expectedsize > currMaxSize)
+ {
+ expectedsize = currMaxSize;
+ }
+ }
+
+
/**
* Check the size of the block chain to make sure there are enough slots sent back by the server.
* This is only called when we have a gap between the slots that we have locally and the slots
* status message
*/
private void checkNumSlots(int numberOfSlots) {
-
- // We only have 1 size so we must have this many slots
- if (largestTableStatusSeen == smallestTableStatusSeen) {
- if (numberOfSlots != smallestTableStatusSeen) {
- throw new Error("Server Error: Server did not send all slots. Expected: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
- }
- } else {
- // We have more than 1
- if (numberOfSlots < smallestTableStatusSeen) {
- throw new Error("Server Error: Server did not send all slots. Expected at least: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
- }
+ if (numberOfSlots != expectedsize) {
+ throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numberOfSlots);
}
}
+ private void updateCurrMaxSize(int newmaxsize) {
+ currMaxSize=newmaxsize;
+ }
+
+
/**
* Update the size of of the local buffer if it is needed.
*/
private void commitNewMaxSize() {
-
- int currMaxSize = 0;
-
- if (largestTableStatusSeen == -1) {
- // No table status seen so the current max size does not change
- currMaxSize = numberOfSlots;
- } else {
- currMaxSize = largestTableStatusSeen;
- }
+ didFindTableStatus = false;
// Resize the local slot buffer
if (numberOfSlots != currMaxSize) {
- buffer.resize(currMaxSize);
+ buffer.resize((int)currMaxSize);
}
// Change the number of local slots to the new size
- numberOfSlots = currMaxSize;
+ numberOfSlots = (int)currMaxSize;
// Recalculate the resize threshold since the size of the local buffer has changed
setResizeThreshold();
break;
case Entry.TypeTableStatus:
- processEntry((TableStatus)entry);
+ processEntry((TableStatus)entry, slot.getSequenceNumber());
break;
default:
* keeps track of the largest and smallest table status seen in this current round
* of updating the local copy of the block chain
*/
- private void processEntry(TableStatus entry) {
+ private void processEntry(TableStatus entry, long seq) {
int newNumSlots = entry.getMaxSlots();
+ updateCurrMaxSize(newNumSlots);
+
+ initExpectedSize(seq, newNumSlots);
if (liveTableStatus != null) {
// We have a larger table status so the old table status is no longer alive
// Make this new table status the latest alive table status
liveTableStatus = entry;
-
- if ((smallestTableStatusSeen == -1) || (newNumSlots < smallestTableStatusSeen)) {
- smallestTableStatusSeen = newNumSlots;
- }
-
- if ((largestTableStatusSeen == -1) || (newNumSlots > largestTableStatusSeen)) {
- largestTableStatusSeen = newNumSlots;
- }
}
/**
long newSeqNum = entry.getNewSeqNum();
boolean isequal = entry.getEqual();
long machineId = entry.getMachineID();
+ long seq = entry.getSequenceNumber();
// Check if we have messages that were supposed to be rejected in our local block chain
Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
long entrySequenceNumber = lastMessageValue.getFirst();
- if (entrySequenceNumber < newSeqNum) {
+ if (entrySequenceNumber < seq) {
// Add this rejected message to the set of messages that this machine ID did not see yet
addWatchList(lastMessageEntryMachineId, entry);
RejectedMessage rm = rmit.next();
// If this machine Id has seen this rejected message...
- if (rm.getNewSeqNum() <= seqNum) {
+ if (rm.getSequenceNumber() <= seqNum) {
// Remove it from our watchlist
rmit.remove();