package iotcloud;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Arrays;
-import javax.crypto.spec.*;
-import javax.crypto.*;
-
-public class Table {
- int numslots;
- HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
- HashMap<Long, Pair<Long, Liveness>> lastmessagetable=new HashMap<Long, Pair<Long, Liveness>>();
- SlotBuffer buffer;
- CloudComm cloud;
- private Mac hmac;
- long sequencenumber;
- long localmachineid;
-
+import java.util.Vector;
+
+/**
+ * IoTTable data structure. Provides client inferface.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+
+final public class Table {
+ private int numslots;
+ private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
+ private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
+ private SlotBuffer buffer;
+ private CloudComm cloud;
+ private long sequencenumber;
+ private long localmachineid;
+ private TableStatus lastTableStatus;
+ static final int FREE_SLOTS = 10;
+ static final int FORCED_RESIZE_INCREMENT = 20;
+
public Table(String baseurl, String password, long _localmachineid) {
localmachineid=_localmachineid;
buffer = new SlotBuffer();
- sequencenumber = 1;
- initCloud(baseurl, password);
+ numslots = buffer.capacity();
+ sequencenumber = 0;
+ cloud=new CloudComm(baseurl, password);
}
- private void initCloud(String baseurl, String password) {
- try {
- SecretKeySpec secret=getKey(password);
- Cipher encryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
- encryptCipher.init(Cipher.ENCRYPT_MODE, secret);
- Cipher decryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
- decryptCipher.init(Cipher.DECRYPT_MODE, secret);
- hmac = Mac.getInstance("HmacSHA256");
- hmac.init(secret);
- cloud=new CloudComm(baseurl, encryptCipher, decryptCipher, hmac);
- } catch (Exception e) {
- throw new Error("Failed To Initialize Ciphers");
- }
+ public Table(CloudComm _cloud, long _localmachineid) {
+ localmachineid=_localmachineid;
+ buffer = new SlotBuffer();
+ numslots = buffer.capacity();
+ sequencenumber = 0;
+ cloud=_cloud;
}
- private SecretKeySpec getKey(String password) {
- try {
- PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray());
- SecretKey key = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
- SecretKeySpec secret = new SecretKeySpec(key.getEncoded(), "AES");
- return secret;
- } catch (Exception e) {
- throw new Error("Failed generating key.");
- }
+ public void rebuild() {
+ Slot[] newslots=cloud.getSlots(sequencenumber+1);
+ validateandupdate(newslots, true);
}
public void update() {
- Slot[] newslots=cloud.getSlots(sequencenumber);
- validateandupdate(newslots);
+ Slot[] newslots=cloud.getSlots(sequencenumber+1);
+
+ validateandupdate(newslots, false);
}
public IoTString get(IoTString key) {
return null;
}
+ public void initTable() {
+ Slot s=new Slot(1, localmachineid);
+ TableStatus status=new TableStatus(s, numslots);
+ s.addEntry(status);
+ Slot[] array=cloud.putSlot(s, numslots);
+ if (array == null) {
+ array = new Slot[] {s};
+ /* update data structure */
+ validateandupdate(array, true);
+ } else {
+ throw new Error("Error on initialization");
+ }
+ }
+
public IoTString put(IoTString key, IoTString value) {
- return null;
+ while(true) {
+ KeyValue oldvalue=table.get(key);
+ if (tryput(key, value, false)) {
+ if (oldvalue==null)
+ return null;
+ else
+ return oldvalue.getValue();
+ }
+ }
+ }
+
+ private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
+ Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+ long seqn = buffer.getOldestSeqNum();
+
+ if (forcedresize) {
+ TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
+ s.addEntry(status);
+ }
+
+ if ((numslots - buffer.size()) < FREE_SLOTS) {
+ /* have to check whether we have enough free slots */
+ long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
+ seqn = fullfirstseqn < 1?1:fullfirstseqn;
+ for(int i=0; i < FREE_SLOTS; i++, seqn++) {
+ Slot prevslot=buffer.getSlot(seqn);
+ if (!prevslot.isLive())
+ continue;
+ Vector<Entry> liveentries = prevslot.getLiveEntries();
+ for(Entry liveentry:liveentries) {
+ if (redundant(liveentry))
+ continue;
+ if (s.hasSpace(liveentry))
+ s.addEntry(liveentry);
+ else if (i==0) {
+ if (s.canFit(liveentry))
+ s.addEntry(liveentry);
+ else if (!forcedresize) {
+ return tryput(key, value, true);
+ }
+ }
+ }
+ }
+ }
+ KeyValue kv=new KeyValue(s, key, value);
+ boolean insertedkv=false;
+ if (s.hasSpace(kv)) {
+ s.addEntry(kv);
+ insertedkv=true;
+ }
+
+ long newestseqnum=buffer.getNewestSeqNum();
+search:
+ for(; seqn<=newestseqnum; seqn++) {
+ Slot prevslot=buffer.getSlot(seqn);
+ if (!prevslot.isLive())
+ continue;
+ Vector<Entry> liveentries = prevslot.getLiveEntries();
+ for(Entry liveentry:liveentries) {
+ if (redundant(liveentry))
+ continue;
+ if (s.hasSpace(liveentry))
+ s.addEntry(liveentry);
+ else
+ break search;
+ }
+ }
+
+ int max=0;
+ if (forcedresize)
+ max = numslots + FORCED_RESIZE_INCREMENT;
+ Slot[] array=cloud.putSlot(s, max);
+ if (array == null)
+ array = new Slot[] {s};
+ else
+ insertedkv=false;
+
+ /* update data structure */
+ validateandupdate(array, true);
+
+ return insertedkv;
+ }
+
+ boolean redundant(Entry liveentry) {
+ if (liveentry.getType()==Entry.TypeLastMessage) {
+ LastMessage lastmsg=(LastMessage) liveentry;
+ return lastmsg.getMachineID() == localmachineid;
+ }
+ return false;
}
-
- void validateandupdate(Slot[] newslots) {
- //The cloud communication layer has checked slot HMACs already
- //before decoding
+
+
+ private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
+ /* The cloud communication layer has checked slot HMACs already
+ before decoding */
if (newslots.length==0)
return;
long firstseqnum=newslots[0].getSequenceNumber();
- if (firstseqnum < sequencenumber)
+ if (firstseqnum <= sequencenumber)
throw new Error("Server Error: Sent older slots!");
SlotIndexer indexer = new SlotIndexer(newslots, buffer);
checkHMACChain(indexer, newslots);
+
+ HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
+
+ initExpectedSize();
for(Slot slot: newslots) {
- processSlot(indexer, slot);
+ updateExpectedSize();
+ processSlot(indexer, slot, acceptupdatestolocal, machineSet);
}
-
+
+ /* If there is a gap, check to see if the server sent us everything. */
+ if (firstseqnum != (sequencenumber+1)) {
+ checkNumSlots(newslots.length);
+ if (!machineSet.isEmpty())
+ throw new Error("Missing record for machines: "+machineSet);
+ }
+
+ commitNewMaxSize();
+
+ /* Commit new to slots. */
+ for(Slot slot:newslots) {
+ buffer.putSlot(slot);
+ }
+ sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
+ }
+
+ private int expectedsize, currmaxsize;
+
+ private void checkNumSlots(int numslots) {
+ if (numslots != expectedsize)
+ throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots);
+ }
+
+ private void initExpectedSize() {
+ long prevslots = sequencenumber;
+ expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
+ currmaxsize = numslots;
}
- void processEntry(KeyValue entry, SlotIndexer indexer) {
+ private void updateExpectedSize() {
+ expectedsize++;
+ if (expectedsize > currmaxsize)
+ expectedsize = currmaxsize;
+ }
+
+ private void updateCurrMaxSize(int newmaxsize) {
+ currmaxsize=newmaxsize;
+ }
+
+ private void commitNewMaxSize() {
+ if (numslots != currmaxsize)
+ buffer.resize(currmaxsize);
+
+ numslots=currmaxsize;
+ }
+
+ private void processEntry(KeyValue entry, SlotIndexer indexer) {
IoTString key=entry.getKey();
KeyValue oldvalue=table.get(key);
if (oldvalue != null) {
- oldvalue.decrementLiveCount();
+ oldvalue.setDead();
}
table.put(key, entry);
}
- void processEntry(LastMessage entry, SlotIndexer indexer) {
- updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry);
+ private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
+ updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
}
- void processEntry(RejectedMessage entry, SlotIndexer indexer) {
+ private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
long oldseqnum=entry.getOldSeqNum();
long newseqnum=entry.getNewSeqNum();
boolean isequal=entry.getEqual();
long machineid=entry.getMachineID();
- for(long seqnum=oldseqnum;seqnum<=newseqnum;seqnum++) {
+ for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
Slot slot=indexer.getSlot(seqnum);
if (slot != null) {
long slotmachineid=slot.getMachineID();
}
}
}
-
- void processEntry(TableStatus entry, SlotIndexer indexer, Slot slot) {
-
+
+ private void processEntry(TableStatus entry, SlotIndexer indexer) {
+ int newnumslots=entry.getMaxSlots();
+ updateCurrMaxSize(newnumslots);
+ if (lastTableStatus != null)
+ lastTableStatus.setDead();
+ lastTableStatus = entry;
}
- void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
+ private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+ machineSet.remove(machineid);
Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
if (lastmsgentry == null)
return;
long lastmsgseqnum = lastmsgentry.getFirst();
Liveness lastentry = lastmsgentry.getSecond();
if (lastentry instanceof LastMessage) {
- ((LastMessage)lastentry).decrementLiveCount();
+ ((LastMessage)lastentry).setDead();
} else if (lastentry instanceof Slot) {
- ((Slot)lastentry).decrementLiveCount();
+ ((Slot)lastentry).setDead();
} else {
throw new Error("Unrecognized type");
}
-
- //Check that nothing funny happened
+
if (machineid == localmachineid) {
- if (lastmsgseqnum != seqnum)
+ if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
throw new Error("Server Error: Mismatch on local machine sequence number");
} else {
if (lastmsgseqnum > seqnum)
- throw new Error("Server Error: Rolback on remote machine sequence number");
+ throw new Error("Server Error: Rollback on remote machine sequence number");
}
}
-
- void processSlot(SlotIndexer indexer, Slot slot) {
- updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot);
+
+ private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+ updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
for(Entry entry : slot.getEntries()) {
switch(entry.getType()) {
case Entry.TypeKeyValue:
processEntry((KeyValue)entry, indexer);
break;
+
case Entry.TypeLastMessage:
- processEntry((LastMessage)entry, indexer);
+ processEntry((LastMessage)entry, indexer, machineSet);
break;
+
case Entry.TypeRejectedMessage:
processEntry((RejectedMessage)entry, indexer);
break;
+
case Entry.TypeTableStatus:
- processEntry((TableStatus)entry, indexer, slot);
+ processEntry((TableStatus)entry, indexer);
break;
+
default:
throw new Error("Unrecognized type: "+entry.getType());
}
}
}
-
- void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
+
+ private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
for(int i=0; i < newslots.length; i++) {
Slot currslot=newslots[i];
Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
if (prevslot != null &&
!Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
- throw new Error("Server Error: Invalid HMAC Chain");
+ throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);
}
}
}