projects
/
iotcloud.git
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
a740c23
)
--amend
author
bdemsky
<bdemsky@uci.edu>
Sun, 24 Jul 2016 05:01:17 +0000
(22:01 -0700)
committer
bdemsky
<bdemsky@uci.edu>
Sun, 24 Jul 2016 05:01:17 +0000
(22:01 -0700)
src/java/iotcloud/Makefile
patch
|
blob
|
history
src/java/iotcloud/Table.java
patch
|
blob
|
history
src/java/iotcloud/Test.java
patch
|
blob
|
history
src/java/iotcloud/TestCloudComm.java
patch
|
blob
|
history
diff --git
a/src/java/iotcloud/Makefile
b/src/java/iotcloud/Makefile
index df6ceebf0e5668c91f27c007a52921a925c5da97..2d45b63d9a1fdc6055c72bdf25f819938903d606 100644
(file)
--- a/
src/java/iotcloud/Makefile
+++ b/
src/java/iotcloud/Makefile
@@
-9,7
+9,7
@@
server:
$(JAVAC) -d $(BIN_DIR) *.java
doc: server
$(JAVAC) -d $(BIN_DIR) *.java
doc: server
- $(JAVADOC) -d $(DOCS_DIR) *.java
+ $(JAVADOC) -
private -
d $(DOCS_DIR) *.java
clean:
rm -r bin/*
clean:
rm -r bin/*
diff --git
a/src/java/iotcloud/Table.java
b/src/java/iotcloud/Table.java
index c31aed86851d4e273eb950b0656f3223a54a7698..d3bac3dd40a913a6f71c47495c53e8174991d13a 100644
(file)
--- a/
src/java/iotcloud/Table.java
+++ b/
src/java/iotcloud/Table.java
@@
-22,14
+22,15
@@
final public class Table {
localmachineid=_localmachineid;
buffer = new SlotBuffer();
numslots = buffer.capacity();
localmachineid=_localmachineid;
buffer = new SlotBuffer();
numslots = buffer.capacity();
- sequencenumber =
1
;
+ sequencenumber =
0
;
initCloud(baseurl, password);
}
public Table(CloudComm _cloud, long _localmachineid) {
localmachineid=_localmachineid;
buffer = new SlotBuffer();
initCloud(baseurl, password);
}
public Table(CloudComm _cloud, long _localmachineid) {
localmachineid=_localmachineid;
buffer = new SlotBuffer();
- sequencenumber = 1;
+ numslots = buffer.capacity();
+ sequencenumber = 0;
cloud=_cloud;
}
cloud=_cloud;
}
@@
-60,8
+61,8
@@
final public class Table {
}
public void update() {
}
public void update() {
- Slot[] newslots=cloud.getSlots(sequencenumber);
- validateandupdate(newslots);
+ Slot[] newslots=cloud.getSlots(sequencenumber
+1
);
+ validateandupdate(newslots
, false
);
}
public IoTString get(IoTString key) {
}
public IoTString get(IoTString key) {
@@
-79,7
+80,7
@@
final public class Table {
Slot[] array=cloud.putSlot(s, numslots);
if (array == null) {
array = new Slot[] {s};
Slot[] array=cloud.putSlot(s, numslots);
if (array == null) {
array = new Slot[] {s};
- validateandupdate(array); // update data structure
+ validateandupdate(array
, true
); // update data structure
} else {
throw new Error("Error on initialization");
}
} else {
throw new Error("Error on initialization");
}
@@
-89,7
+90,10
@@
final public class Table {
while(true) {
KeyValue oldvalue=table.get(key);
if (tryput(key, value, false)) {
while(true) {
KeyValue oldvalue=table.get(key);
if (tryput(key, value, false)) {
- return oldvalue.getValue();
+ if (oldvalue==null)
+ return null;
+ else
+ return oldvalue.getValue();
}
}
}
}
}
}
@@
-112,6
+116,8
@@
final public class Table {
continue;
Vector<Entry> liveentries = prevslot.getLiveEntries();
for(Entry liveentry:liveentries) {
continue;
Vector<Entry> liveentries = prevslot.getLiveEntries();
for(Entry liveentry:liveentries) {
+ if (redundant(liveentry))
+ continue;
if (s.hasSpace(liveentry))
s.addEntry(liveentry);
else if (i==0) {
if (s.hasSpace(liveentry))
s.addEntry(liveentry);
else if (i==0) {
@@
-139,6
+145,8
@@
final public class Table {
continue;
Vector<Entry> liveentries = prevslot.getLiveEntries();
for(Entry liveentry:liveentries) {
continue;
Vector<Entry> liveentries = prevslot.getLiveEntries();
for(Entry liveentry:liveentries) {
+ if (redundant(liveentry))
+ continue;
if (s.hasSpace(liveentry))
s.addEntry(liveentry);
else
if (s.hasSpace(liveentry))
s.addEntry(liveentry);
else
@@
-155,19
+163,28
@@
final public class Table {
else
insertedkv=false;
else
insertedkv=false;
- validateandupdate(array); // update data structure
+ validateandupdate(array
, true
); // update data structure
return insertedkv;
}
return insertedkv;
}
- private void validateandupdate(Slot[] newslots) {
+ boolean redundant(Entry liveentry) {
+ if (liveentry.getType()==Entry.TypeLastMessage) {
+ LastMessage lastmsg=(LastMessage) liveentry;
+ return lastmsg.getMachineID() == localmachineid;
+ }
+ return false;
+ }
+
+
+ private void validateandupdate(Slot[] newslots, boolean isput) {
//The cloud communication layer has checked slot HMACs already
//before decoding
if (newslots.length==0)
return;
long firstseqnum=newslots[0].getSequenceNumber();
//The cloud communication layer has checked slot HMACs already
//before decoding
if (newslots.length==0)
return;
long firstseqnum=newslots[0].getSequenceNumber();
- if (firstseqnum < sequencenumber)
+ if (firstseqnum <
=
sequencenumber)
throw new Error("Server Error: Sent older slots!");
SlotIndexer indexer = new SlotIndexer(newslots, buffer);
throw new Error("Server Error: Sent older slots!");
SlotIndexer indexer = new SlotIndexer(newslots, buffer);
@@
-176,26
+193,32
@@
final public class Table {
initExpectedSize();
for(Slot slot: newslots) {
updateExpectedSize();
initExpectedSize();
for(Slot slot: newslots) {
updateExpectedSize();
- processSlot(indexer, slot);
+ processSlot(indexer, slot
, isput
);
}
}
- checkNumSlots(newslots.length);
+
+ //If there is a gap, check to see if the server sent us everything
+ if (firstseqnum != (sequencenumber+1))
+ checkNumSlots(newslots.length);
+
commitNewMaxSize();
//commit new to slots
for(Slot slot:newslots) {
buffer.putSlot(slot);
}
commitNewMaxSize();
//commit new to slots
for(Slot slot:newslots) {
buffer.putSlot(slot);
}
+ sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
}
private int expectedsize, currmaxsize;
private void checkNumSlots(int numslots) {
if (numslots != expectedsize)
}
private int expectedsize, currmaxsize;
private void checkNumSlots(int numslots) {
if (numslots != expectedsize)
- throw new Error("Server Error: Server did not send all slots
"
);
+ throw new Error("Server Error: Server did not send all slots
. Expected: "+expectedsize+" Received:"+numslots
);
}
private void initExpectedSize() {
}
private void initExpectedSize() {
- expectedsize = (sequencenumber < ((long) numslots)) ? (int) sequencenumber : numslots;
+ long prevslots = sequencenumber;
+ expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
currmaxsize = numslots;
}
currmaxsize = numslots;
}
@@
-226,7
+249,7
@@
final public class Table {
}
private void processEntry(LastMessage entry, SlotIndexer indexer) {
}
private void processEntry(LastMessage entry, SlotIndexer indexer) {
- updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry);
+ updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry
, false
);
}
private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
}
private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
@@
-253,7
+276,7
@@
final public class Table {
lastTableStatus = entry;
}
lastTableStatus = entry;
}
- private void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
+ private void updateLastMessage(long machineid, long seqnum, Liveness liveness
, boolean isput
) {
Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
if (lastmsgentry == null)
return;
Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
if (lastmsgentry == null)
return;
@@
-269,7
+292,7
@@
final public class Table {
}
if (machineid == localmachineid) {
}
if (machineid == localmachineid) {
- if (lastmsgseqnum != seqnum)
+ if (lastmsgseqnum != seqnum
&& !isput
)
throw new Error("Server Error: Mismatch on local machine sequence number");
} else {
if (lastmsgseqnum > seqnum)
throw new Error("Server Error: Mismatch on local machine sequence number");
} else {
if (lastmsgseqnum > seqnum)
@@
-277,8
+300,8
@@
final public class Table {
}
}
}
}
- private void processSlot(SlotIndexer indexer, Slot slot) {
- updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot);
+ private void processSlot(SlotIndexer indexer, Slot slot
, boolean isput
) {
+ updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot
, isput
);
for(Entry entry : slot.getEntries()) {
switch(entry.getType()) {
for(Entry entry : slot.getEntries()) {
switch(entry.getType()) {
diff --git
a/src/java/iotcloud/Test.java
b/src/java/iotcloud/Test.java
index 5ecd9d21844aee425f472bb1053dd286dd55723f..97057dd11cc802f0b267c51e8ee76fe6069e0abc 100644
(file)
--- a/
src/java/iotcloud/Test.java
+++ b/
src/java/iotcloud/Test.java
@@
-14,6
+14,7
@@
public class Test {
IoTString ib=new IoTString(b);
t1.put(ia, ia);
t2.put(ib, ib);
IoTString ib=new IoTString(b);
t1.put(ia, ia);
t2.put(ib, ib);
+ t1.update();
System.out.println(ib+"->"+t1.get(ib));
System.out.println(ia+"->"+t2.get(ia));
}
System.out.println(ib+"->"+t1.get(ib));
System.out.println(ia+"->"+t2.get(ia));
}
diff --git
a/src/java/iotcloud/TestCloudComm.java
b/src/java/iotcloud/TestCloudComm.java
index ce5b9ba980f9a501ea015726b6073e81f53c84dc..6c1831491166b883c98fbe678e880b08f3135b49 100644
(file)
--- a/
src/java/iotcloud/TestCloudComm.java
+++ b/
src/java/iotcloud/TestCloudComm.java
@@
-12,7
+12,8
@@
class TestCloudComm extends CloudComm {
}
public synchronized Slot[] putSlot(Slot slot, int max) {
}
public synchronized Slot[] putSlot(Slot slot, int max) {
- if (buffer.getNewestSeqNum()+1 == slot.getSequenceNumber()) {
+ if ((buffer.size()==0 && 1 == slot.getSequenceNumber()) ||
+ buffer.getNewestSeqNum()+1 == slot.getSequenceNumber()) {
if (max!=0)
buffer.resize(max);
buffer.putSlot(slot);
if (max!=0)
buffer.resize(max);
buffer.putSlot(slot);