X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2Fjava%2Fiotcloud%2FTable.java;h=6a05197fe3d68a6129c81601eaec02ef10a05650;hp=6348ccc273ed40bb3a5e10bbb49f871f86963841;hb=12a5123dcee5f2d19bf729a71333359ca917df62;hpb=b6da0ceefcd3d2077c929fe805394b3804e3e88e;ds=sidebyside diff --git a/version2/src/java/iotcloud/Table.java b/version2/src/java/iotcloud/Table.java index 6348ccc..6a05197 100644 --- a/version2/src/java/iotcloud/Table.java +++ b/version2/src/java/iotcloud/Table.java @@ -22,7 +22,7 @@ import java.nio.ByteBuffer; final public class Table { /* Constants */ - static final int FREE_SLOTS = 10; // Number of slots that should be kept free + static final int FREE_SLOTS = 2; // Number of slots that should be kept free // 10 static final int SKIP_THRESHOLD = 10; static final double RESIZE_MULTIPLE = 1.2; static final double RESIZE_THRESHOLD = 0.75; @@ -44,6 +44,8 @@ final public class Table { private long oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry private long localMachineId = 0; // Machine ID of this client device private long sequenceNumber = 0; // Largest sequence number a client has received + private long localSequenceNumber = 0; + // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions @@ -166,14 +168,34 @@ final public class Table { int livec = 0; int deadc = 0; + + int casdasd = 0; + + int liveslo = 0; + for (long i = o; i < (n + 1); i++) { Slot s = buffer.getSlot(i); + + if (s.isLive()) { + liveslo++; + } + Vector entries = s.getEntries(); for (Entry e : entries) { if (e.isLive()) { int type = e.getType(); + + + if (type == 6) { + RejectedMessage rej = (RejectedMessage)e; + casdasd++; + + System.out.println(rej.getMachineID()); + } + + types[type] = types[type] + 1; num++; livec++; @@ -187,6 +209,8 @@ final public class Table { System.out.println(i + " " + types[i]); } System.out.println("Live count: " + livec); + System.out.println("Live Slot count: " + liveslo); + System.out.println("Dead count: " + deadc); System.out.println("Old: " + o); System.out.println("New: " + n); @@ -220,7 +244,8 @@ final public class Table { cloud.initSecurity(); // Create the first insertion into the block chain which is the table status - Slot s = new Slot(this, 1, localMachineId); + Slot s = new Slot(this, 1, localMachineId, localSequenceNumber); + localSequenceNumber++; TableStatus status = new TableStatus(s, numberOfSlots); s.addEntry(status); Slot[] array = cloud.putSlot(s, numberOfSlots); @@ -244,6 +269,9 @@ final public class Table { // Just pull the latest slots from the server Slot[] newslots = cloud.getSlots(sequenceNumber + 1); validateAndUpdate(newslots, true); + sendToServer(null); + updateLiveTransactionsAndStatus(); + } // public String toString() { @@ -271,6 +299,11 @@ final public class Table { public synchronized void close() { cloud.close(); } + + // Return all keys in the table + public synchronized Set getKeys() { + return committedKeyValueTable.keySet(); + } public synchronized IoTString getCommitted(IoTString key) { KeyValue kv = committedKeyValueTable.get(key); @@ -381,6 +414,7 @@ final public class Table { } NewKey newKey = new NewKey(null, keyName, machineId); + if (sendToServer(newKey)) { // If successfully inserted return true; @@ -496,6 +530,10 @@ final public class Table { bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower); } + public long getLocalSequenceNumber() { + return localSequenceNumber; + } + boolean lastInsertedNewKey = false; @@ -689,9 +727,11 @@ final public class Table { } + try { // While we have stuff that needs inserting into the block chain while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) { + fromRetry = false; if (hadPartialSendToServer) { @@ -706,7 +746,8 @@ final public class Table { } // Create the slot - Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC()); + Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber); + localSequenceNumber++; // Try to fill the slot with data ThreeTuple fillSlotsReturn = fillSlot(slot, false, newKey); @@ -900,7 +941,8 @@ final public class Table { bbEncode.putInt(0); // Send by local - byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); + byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); + localSequenceNumber++; if (returnData == null) { // Could not contact server @@ -961,7 +1003,8 @@ final public class Table { // Send by local - byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); + byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); + localSequenceNumber++; if (returnData == null) { // Could not contact server @@ -1114,6 +1157,8 @@ final public class Table { entry.encode(bbEncode); } + + localSequenceNumber++; return returnData; } @@ -1184,9 +1229,12 @@ final public class Table { * Returns false if a resize was needed */ private ThreeTuple fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) { + + int newSize = 0; if (liveSlotCount > bufferResizeThreshold) { resize = true; //Resize is forced + } if (resize) { @@ -1215,6 +1263,7 @@ final public class Table { if (newKeyEntry != null) { newKeyEntry.setSlot(slot); if (slot.hasSpace(newKeyEntry)) { + slot.addEntry(newKeyEntry); inserted = true; } @@ -1544,9 +1593,6 @@ final public class Table { if (expectedsize > currMaxSize) { expectedsize = currMaxSize; } - - // System.out.println("" + expectedsize); - } @@ -1581,6 +1627,7 @@ final public class Table { // Change the number of local slots to the new size numberOfSlots = (int)currMaxSize; + // Recalculate the resize threshold since the size of the local buffer has changed setResizeThreshold(); } @@ -2703,4 +2750,4 @@ final public class Table { throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot); } } -} \ No newline at end of file +}