Tex typo issues, Code
[iotcloud.git] / src2 / java / iotcloud / Table.java
index 7009ed9307aa4cd8a5a2f2756acb34d17a29ff9b..732867fc4cd7bc0f49286f202cd5b43f9b31f659 100644 (file)
@@ -8,6 +8,8 @@ import java.util.Vector;
 import java.util.Random;
 import java.util.Queue;
 import java.util.LinkedList;
+import java.util.List;
+
 /**
  * IoTTable data structure.  Provides client inferface.
  * @author Brian Demsky
@@ -43,6 +45,11 @@ final public class Table {
 
        private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
        private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
+       private List<Commit> commitList = null; // List of all the most recent live commits
+       private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
+       private List<Transaction> uncommittedTransactionsList = null; //
+       private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
+       private Set<Abort> arbitratorTable = null; // Table of arbitrators
 
 
        public Table(String baseurl, String password, long _localmachineid) {
@@ -55,6 +62,10 @@ final public class Table {
                lastliveslotseqn = 1;
 
                pendingTransQueue = new LinkedList<PendingTransaction>();
+               commitList = new LinkedList<Commit>();
+               commitedTable = new HashMap<IoTString, KeyValue>();
+               uncommittedTransactionsList = new LinkedList<Transaction>();
+               arbitratorTable = new HashMap<IoTString, Long>();
        }
 
        public Table(CloudComm _cloud, long _localmachineid) {
@@ -66,6 +77,10 @@ final public class Table {
                cloud = _cloud;
 
                pendingTransQueue = new LinkedList<PendingTransaction>();
+               commitList = new LinkedList<Commit>();
+               commitedTable = new HashMap<IoTString, KeyValue>();
+               uncommittedTransactionsList = new LinkedList<Transaction>();
+               arbitratorTable = new HashMap<IoTString, Long>();
        }
 
        public void rebuild() {
@@ -133,11 +148,6 @@ final public class Table {
                pendingTransBuild.addKV(kv);
        }
 
-
-
-
-
-
        void decrementLiveCount() {
                liveslotcount--;
        }
@@ -239,7 +249,7 @@ final public class Table {
                boolean insertedTrans = false;
                if (s.hasSpace(trans)) {
                        s.addEntry(trans);
-                       insertedTrans=true;
+                       insertedTrans = true;
                }
 
                /* now go through live entries from least to greatest sequence number until
@@ -356,16 +366,7 @@ final public class Table {
                setResizeThreshold();
        }
 
-       // private void processEntry(KeyValue entry, SlotIndexer indexer) {
-       //      IoTString key=entry.getKey();
-       //      KeyValue oldvalue=table.get(key);
-       //      if (oldvalue != null) {
-       //              oldvalue.setDead();
-       //      }
-       //      table.put(key, entry);
-       // }
-
-       private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
+       private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
                updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
        }
 
@@ -404,6 +405,57 @@ final public class Table {
                        entry.setWatchSet(watchset);
        }
 
+       private void processEntry(NewKey entry) {
+               arbitratorTable.put(entry.getKey(), entry.getMachineID());
+       }
+
+       private void processEntry(Transaction entry) {
+               uncommittedTransactionsList.add(entry);
+       }
+
+       private void processEntry(Abort entry) {
+               for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
+                       Transaction prevtrans = i.next();
+                       if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) {
+                               uncommittedTransactionsList.remove(prevtrans);
+                               prevtrans.setDead();
+                               return;
+                       }
+               }
+       }
+
+       private void processEntry(Commit entry) {
+
+               for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
+                       Commit prevcommit = i.next();
+                       prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
+
+                       if (!prevcommit.isLive()) {
+                               commitList.remove(prevcommit);
+                       }
+               }
+
+               commitList.add(entry);
+
+               // Update the committed table list
+               for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+                       IoTString key = kv.getKey();
+                       commitedTable.put(key, kv);
+               }
+
+               long committedTransSeq = entry.getTransSequenceNumber();
+
+               // Make dead the transactions
+               for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
+                       Transaction prevtrans = i.next();
+
+                       if (prevtrans.getSequenceNumber() <= committedTransSeq) {
+                               uncommittedTransactionsList.remove(prevtrans);
+                               prevtrans.setDead();
+                       }
+               }
+       }
+
        private void addWatchList(long machineid, RejectedMessage entry) {
                HashSet<RejectedMessage> entries = watchlist.get(machineid);
                if (entries == null)
@@ -411,7 +463,7 @@ final public class Table {
                entries.add(entry);
        }
 
-       private void processEntry(TableStatus entry, SlotIndexer indexer) {
+       private void processEntry(TableStatus entry) {
                int newnumslots = entry.getMaxSlots();
                updateCurrMaxSize(newnumslots);
                if (lastTableStatus != null)
@@ -476,12 +528,25 @@ final public class Table {
                updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
                for (Entry entry : slot.getEntries()) {
                        switch (entry.getType()) {
-                       // case Entry.TypeKeyValue:
-                       //      processEntry((KeyValue)entry, indexer);
-                       //      break;
+
+                       case Entry.TypeNewKey:
+                               processEntry((NewKey)entry);
+                               break;
+
+                       case Entry.TypeCommit:
+                               processEntry((Commit)entry);
+                               break;
+
+                       case Entry.TypeAbort:
+                               processEntry((Abort)entry);
+                               break;
+
+                       case Entry.TypeTransaction:
+                               processEntry((Transaction)entry);
+                               break;
 
                        case Entry.TypeLastMessage:
-                               processEntry((LastMessage)entry, indexer, machineSet);
+                               processEntry((LastMessage)entry, machineSet);
                                break;
 
                        case Entry.TypeRejectedMessage:
@@ -489,7 +554,7 @@ final public class Table {
                                break;
 
                        case Entry.TypeTableStatus:
-                               processEntry((TableStatus)entry, indexer);
+                               processEntry((TableStatus)entry);
                                break;
 
                        default: