import java.util.Random;
import java.util.Queue;
import java.util.LinkedList;
+import java.util.List;
+
/**
* IoTTable data structure. Provides client inferface.
* @author Brian Demsky
private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
+ private List<Commit> commitList = null; // List of all the most recent live commits
+ private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
+ private List<Transaction> uncommittedTransactionsList = null; //
+ private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
+ private Set<Abort> arbitratorTable = null; // Table of arbitrators
public Table(String baseurl, String password, long _localmachineid) {
lastliveslotseqn = 1;
pendingTransQueue = new LinkedList<PendingTransaction>();
+ commitList = new LinkedList<Commit>();
+ commitedTable = new HashMap<IoTString, KeyValue>();
+ uncommittedTransactionsList = new LinkedList<Transaction>();
+ arbitratorTable = new HashMap<IoTString, Long>();
}
public Table(CloudComm _cloud, long _localmachineid) {
cloud = _cloud;
pendingTransQueue = new LinkedList<PendingTransaction>();
+ commitList = new LinkedList<Commit>();
+ commitedTable = new HashMap<IoTString, KeyValue>();
+ uncommittedTransactionsList = new LinkedList<Transaction>();
+ arbitratorTable = new HashMap<IoTString, Long>();
}
public void rebuild() {
pendingTransBuild.addKV(kv);
}
-
-
-
-
-
void decrementLiveCount() {
liveslotcount--;
}
boolean insertedTrans = false;
if (s.hasSpace(trans)) {
s.addEntry(trans);
- insertedTrans=true;
+ insertedTrans = true;
}
/* now go through live entries from least to greatest sequence number until
setResizeThreshold();
}
- // private void processEntry(KeyValue entry, SlotIndexer indexer) {
- // IoTString key=entry.getKey();
- // KeyValue oldvalue=table.get(key);
- // if (oldvalue != null) {
- // oldvalue.setDead();
- // }
- // table.put(key, entry);
- // }
-
- private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
+ private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
}
entry.setWatchSet(watchset);
}
+ private void processEntry(NewKey entry) {
+ arbitratorTable.put(entry.getKey(), entry.getMachineID());
+ }
+
+ private void processEntry(Transaction entry) {
+ uncommittedTransactionsList.add(entry);
+ }
+
+ private void processEntry(Abort entry) {
+ for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
+ Transaction prevtrans = i.next();
+ if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) {
+ uncommittedTransactionsList.remove(prevtrans);
+ prevtrans.setDead();
+ return;
+ }
+ }
+ }
+
+ private void processEntry(Commit entry) {
+
+ for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
+ Commit prevcommit = i.next();
+ prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
+
+ if (!prevcommit.isLive()) {
+ commitList.remove(prevcommit);
+ }
+ }
+
+ commitList.add(entry);
+
+ // Update the committed table list
+ for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+ IoTString key = kv.getKey();
+ commitedTable.put(key, kv);
+ }
+
+ long committedTransSeq = entry.getTransSequenceNumber();
+
+ // Make dead the transactions
+ for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
+ Transaction prevtrans = i.next();
+
+ if (prevtrans.getSequenceNumber() <= committedTransSeq) {
+ uncommittedTransactionsList.remove(prevtrans);
+ prevtrans.setDead();
+ }
+ }
+ }
+
private void addWatchList(long machineid, RejectedMessage entry) {
HashSet<RejectedMessage> entries = watchlist.get(machineid);
if (entries == null)
entries.add(entry);
}
- private void processEntry(TableStatus entry, SlotIndexer indexer) {
+ private void processEntry(TableStatus entry) {
int newnumslots = entry.getMaxSlots();
updateCurrMaxSize(newnumslots);
if (lastTableStatus != null)
updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
for (Entry entry : slot.getEntries()) {
switch (entry.getType()) {
- // case Entry.TypeKeyValue:
- // processEntry((KeyValue)entry, indexer);
- // break;
+
+ case Entry.TypeNewKey:
+ processEntry((NewKey)entry);
+ break;
+
+ case Entry.TypeCommit:
+ processEntry((Commit)entry);
+ break;
+
+ case Entry.TypeAbort:
+ processEntry((Abort)entry);
+ break;
+
+ case Entry.TypeTransaction:
+ processEntry((Transaction)entry);
+ break;
case Entry.TypeLastMessage:
- processEntry((LastMessage)entry, indexer, machineSet);
+ processEntry((LastMessage)entry, machineSet);
break;
case Entry.TypeRejectedMessage:
break;
case Entry.TypeTableStatus:
- processEntry((TableStatus)entry, indexer);
+ processEntry((TableStatus)entry);
break;
default: