edits
authorbdemsky <bdemsky@uci.edu>
Sat, 20 Jan 2018 00:04:11 +0000 (16:04 -0800)
committerbdemsky <bdemsky@uci.edu>
Sat, 20 Jan 2018 00:04:11 +0000 (16:04 -0800)
version2/src/C/ByteBuffer.h
version2/src/C/CloudComm.h
version2/src/C/Slot.cc
version2/src/C/Slot.h
version2/src/C/Table.cc
version2/src/C/Table.h
version2/src/C/array.h
version2/src/C/common.h
version2/src/C/vector.h

index 086571b..a5b6dcf 100644 (file)
@@ -12,8 +12,10 @@ public:
        int32_t getInt();
        char get();
        void get(Array<char> * array);
+       void position(int32_t newPosition);
        Array<char> * array();
  private:
 };
 ByteBuffer * ByteBuffer_wrap(Array<char> * array);
+ByteBuffer * ByteBuffer_allocate(uint size);
 #endif
index bb319c2..3e0a56e 100644 (file)
@@ -26,10 +26,10 @@ private:
        SecureRandom *random;
        Array<char> *salt;
        Table *table;
-       int32_t listeningPort = -1;
-       Thread *localServerThread = NULL;
-       bool doEnd = false;
-       TimingSingleton *timer = NULL;
+       int32_t listeningPort;
+       Thread *localServerThread;
+       bool doEnd;
+       TimingSingleton *timer;
 
        /**
         * Generates Key from password.
@@ -89,6 +89,6 @@ public:
         */
 
        Array<char> *sendLocalData(Array<char> *sendData, int64_t localSequenceNumber, IoTString *host, int port);
-       public void close();
+       void close();
 };
 #endif
index 2ee6981..542705e 100644 (file)
@@ -1,25 +1,49 @@
 #include "Slot.h"
-
-Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, char *_hmac, int64_t _localSequenceNumber) {
-       seqnum = _seqnum;
-       machineid = _machineid;
-       prevhmac = _prevhmac;
-       hmac = _hmac;
-       entries = new Vector<Entry *>();
-       livecount = 1;
-       seqnumlive = true;
-       freespace = SLOT_SIZE - getBaseSize();
-       table = _table;
-       localSequenceNumber = _localSequenceNumber;
+#include "ByteBuffer.h"
+#include "Entry.h"
+#include "Error.h"
+#include "CloudComm.h"
+#include "Table.h"
+#include "LastMessage.h"
+
+Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array<char> *_prevhmac, Array<char> *_hmac, int64_t _localSequenceNumber) :
+       seqnum(_seqnum),
+       prevhmac(_prevhmac),
+       hmac(_hmac),
+       machineid(_machineid),
+       entries(new Vector<Entry *>()),
+       livecount(1),
+       seqnumlive(true),
+       freespace(SLOT_SIZE - getBaseSize()),
+       table(_table),
+       localSequenceNumber(_localSequenceNumber) {
 }
 
-Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, int64_t _localSequenceNumber) {
-       this(_table, _seqnum, _machineid, _prevhmac, NULL, _localSequenceNumber);
-}
+Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array<char> *_prevhmac, int64_t _localSequenceNumber) :
+       seqnum(_seqnum),
+       prevhmac(_prevhmac),
+       hmac(NULL),
+       machineid(_machineid),
+       entries(new Vector<Entry *>()),
+       livecount(1),
+       seqnumlive(true),
+       freespace(SLOT_SIZE - getBaseSize()),
+       table(_table),
+       localSequenceNumber(_localSequenceNumber) {
+       }
 
-Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber) {
-       this(_table, _seqnum, _machineid, new char[HMAC_SIZE], NULL, _localSequenceNumber);
-}
+Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber) :
+       seqnum(_seqnum),
+       prevhmac(new Array<char>(HMAC_SIZE)),
+       hmac(NULL),
+       machineid(_machineid),
+       entries(new Vector<Entry *>()),
+       livecount(1),
+       seqnumlive(true),
+       freespace(SLOT_SIZE - getBaseSize()),
+       table(_table),
+       localSequenceNumber(_localSequenceNumber) {
+       }
 
 Entry *Slot::addEntry(Entry *e) {
        e = e->getCopy(this);
@@ -54,32 +78,32 @@ Vector<Entry *> *Slot::getEntries() {
        return entries;
 }
 
-Slot *Slotdecode(Table *table, char *array, Mac *mac) {
+Slot *Slotdecode(Table *table, Array<char> *array, Mac *mac) {
        mac->update(array, HMAC_SIZE, array.length - HMAC_SIZE);
-       char *realmac = mac->doFinal();
+       Array<char> *realmac = mac->doFinal();
 
        ByteBuffer *bb = ByteBuffer_wrap(array);
-       char *hmac = new char[HMAC_SIZE];
-       char *prevhmac = new char[HMAC_SIZE];
+       Array<char> *hmac = new Array<char>(HMAC_SIZE);
+       Array<char> *prevhmac = new Array<char>(HMAC_SIZE);
        bb->get(hmac);
        bb->get(prevhmac);
-       if (!Arrays.equals(realmac, hmac))
+       if (!realmac->equals(hmac))
                throw new Error("Server Error: Invalid HMAC!  Potential Attack!");
 
        int64_t seqnum = bb->getLong();
        int64_t machineid = bb->getLong();
        int numentries = bb->getInt();
-       Slot slot = new Slot(table, seqnum, machineid, prevhmac, hmac, -1);
+       Slot *slot = new Slot(table, seqnum, machineid, prevhmac, hmac, -1);
 
        for (int i = 0; i < numentries; i++) {
-               slot->addShallowEntry(Entry->decode(slot, bb));
+               slot->addShallowEntry(Entry_decode(slot, bb));
        }
 
        return slot;
 }
 
-char *Slot::encode(Mac * mac) {
-       char *array = new char[SLOT_SIZE];
+Array<char> *Slot::encode(Mac * mac) {
+       Array<char> *array = new Array<char>(SLOT_SIZE);
        ByteBuffer *bb = ByteBuffer_wrap(array);
        /* Leave space for the slot HMAC.  */
        bb->position(HMAC_SIZE);
@@ -87,12 +111,12 @@ char *Slot::encode(Mac * mac) {
        bb->putLong(seqnum);
        bb->putLong(machineid);
        bb->putInt(entries->size());
-       for (Entry entry : entries) {
+       for (Entry *entry : entries) {
                entry->encode(bb);
        }
        /* Compute our HMAC */
        mac->update(array, HMAC_SIZE, array.length - HMAC_SIZE);
-       char *realmac = mac->doFinal();
+       Array<char> *realmac = mac->doFinal();
        hmac = realmac;
        bb->position(0);
        bb->put(realmac);
@@ -110,7 +134,7 @@ Vector<Entry *> *Slot::getLiveEntries(bool resize) {
        Vector<Entry *> *liveEntries = new Vector<Entry *>();
        for (Entry *entry : entries) {
                if (entry->isLive()) {
-                       if (!resize || entry->getType() != Entry->TypeTableStatus)
+                       if (!resize || entry->getType() != TypeTableStatus)
                                liveEntries->add(entry);
                }
        }
@@ -143,8 +167,8 @@ void Slot::decrementLiveCount() {
        }
 }
 
-char *Slot::getSlotCryptIV() {
-       ByteBuffer *buffer = ByteBuffer_allocate(CloudComm.IV_SIZE);
+Array<char> *Slot::getSlotCryptIV() {
+       ByteBuffer *buffer = ByteBuffer_allocate(CloudComm_IV_SIZE);
        buffer->putLong(machineid);
        int64_t localSequenceNumberShift = localSequenceNumber << 16;
        buffer->putLong(localSequenceNumberShift);
index 76e0fb4..4ffe205 100644 (file)
@@ -9,13 +9,13 @@
 #define HMAC_SIZE 32
 
 class Slot : public Liveness {
-private:
+ private:
        /** Sequence number of the slot. */
        int64_t seqnum;
        /** HMAC of previous slot. */
-       char *prevhmac;
+       Array<char> *prevhmac;
        /** HMAC of this slot. */
-       char *hmac;
+       Array<char> *hmac;
        /** Machine that sent this slot. */
        int64_t machineid;
        /** Vector of entries in this slot. */
@@ -34,17 +34,17 @@ private:
        void addShallowEntry(Entry *e);
 
 public:
-       Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, char *_hmac, int64_t _localSequenceNumber);
-       Slot(Table _table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, int64_t _localSequenceNumber);
-       Slot(Table _table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber);
+       Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array<char> *_prevhmac, Array<char> *_hmac, int64_t _localSequenceNumber);
+       Slot(Table * _table, int64_t _seqnum, int64_t _machineid, Array<char> *_prevhmac, int64_t _localSequenceNumber);
+       Slot(Table _table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber);
 
-       char *getHMAC() { return hmac; }
-       char *getPrevHMAC() { return prevhmac; }
+       Array<char> *getHMAC() { return hmac; }
+       Array<char> *getPrevHMAC() { return prevhmac; }
        Entry *addEntry(Entry *e);
        void removeEntry(Entry *e);
        bool hasSpace(Entry *e);
        Vector<Entry *> *getEntries();
-       char *encode(Mac *mac);
+       Array<char> *encode(Mac *mac);
        int getBaseSize() { return 2 * HMAC_SIZE + 2 * sizeof(int64_t) + sizeof(int); }
        Vector<Entry *> *getLiveEntries(bool resize);
        int64_t getSequenceNumber() { return seqnum; }
@@ -52,8 +52,9 @@ public:
        void setDead();
        void decrementLiveCount();
        bool isLive() { return livecount > 0; }
-       char *getSlotCryptIV();
+       Array<char> *getSlotCryptIV();
+       friend Slot *Slotdecode(Table *table, Array<char> *array, Mac *mac);
 };
 
-Slot *Slotdecode(Table *table, char *array, Mac *mac);
+Slot *Slotdecode(Table *table, Array<char> *array, Mac *mac);
 #endif
index 1148e93..562e164 100644 (file)
 #include "Table.h"
 
-Table::Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) {
-       localMachineId = _localMachineId;
-       cloud = new CloudComm(this, baseurl, password, listeningPort);
-
+Table::Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) :
+       buffer(NULL),
+       cloud(new CloudComm(this, baseurl, password, listeningPort)),
+       random(NULL),
+       liveTableStatus(NULL),
+       pendingTransactionBuilder(NULL),
+       lastPendingTransactionSpeculatedOn(NULL),
+       firstPendingTransaction(NULL),
+       numberOfSlots(0),
+       bufferResizeThreshold(0),
+       liveSlotCount(0),
+       oldestLiveSlotSequenceNumber(0),
+       localMachineId(_localMachineId),
+       sequenceNumber(0),
+       localTransactionSequenceNumber(0),
+       lastTransactionSequenceNumberSpeculatedOn(0),
+       oldestTransactionSequenceNumberSpeculatedOn(0),
+       localArbitrationSequenceNumber(0),
+       hadPartialSendToServer(false),
+       attemptedToSendToServer(false),
+       expectedSize(0),
+       didFindTableStatus(false),
+       currMaxSize(0),
+       lastSlotAttemptedToSend(NULL),
+       lastIsNewKey(false),
+       lastNewSize(0),
+       lastTransactionPartsSent(NULL),
+       lastPendingSendArbitrationEntriesToDelete(NULL),
+       lastNewKey(NULL),
+       committedKeyValueTable(NULL),
+       speculatedKeyValueTable(NULL),
+       pendingTransactionSpeculatedKeyValueTable(NULL),
+  liveNewKeyTable(NULL),
+       lastMessageTable(NULL),
+       rejectedMessageWatchVectorTable(NULL),
+       arbitratorTable(NULL),
+       liveAbortTable(NULL),
+       newTransactionParts(NULL),
+       newCommitParts(NULL),
+       lastArbitratedTransactionNumberByArbitratorTable(NULL),
+       liveTransactionBySequenceNumberTable(NULL),
+       liveTransactionByTransactionIdTable(NULL),
+       liveCommitsTable(NULL),
+       liveCommitsByKeyTable(NULL),
+       lastCommitSeenSequenceNumberByArbitratorTable(NULL),
+       rejectedSlotVector(NULL),
+       pendingTransactionQueue(NULL),
+       pendingSendArbitrationRounds(NULL),
+       pendingSendArbitrationEntriesToDelete(NULL),
+       transactionPartsSent(NULL),
+       outstandingTransactionStatus(NULL),
+       liveAbortsGeneratedByLocal(NULL),
+       offlineTransactionsCommittedAndAtServer(NULL),
+       localCommunicationTable(NULL),
+       lastTransactionSeenFromMachineFromServer(NULL),
+       lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
+       lastInsertedNewKey(false),
+       lastSeqNumArbOn(0)
+{
        init();
 }
 
-Table::Table(CloudComm _cloud, int64_t _localMachineId) {
-       localMachineId = _localMachineId;
-       cloud = _cloud;
-
+Table::Table(CloudComm _cloud, int64_t _localMachineId) :
+       buffer(NULL),
+       cloud(_cloud),
+       random(NULL),
+       liveTableStatus(NULL),
+       pendingTransactionBuilder(NULL),
+       lastPendingTransactionSpeculatedOn(NULL),
+       firstPendingTransaction(NULL),
+       numberOfSlots(0),
+       bufferResizeThreshold(0),
+       liveSlotCount(0),
+       oldestLiveSlotSequenceNumber(0),
+       localMachineId(_localMachineId),
+       sequenceNumber(0),
+       localTransactionSequenceNumber(0),
+       lastTransactionSequenceNumberSpeculatedOn(0),
+       oldestTransactionSequenceNumberSpeculatedOn(0),
+       localArbitrationSequenceNumber(0),
+       hadPartialSendToServer(false),
+       attemptedToSendToServer(false),
+       expectedSize(0),
+       didFindTableStatus(false),
+       currMaxSize(0),
+       lastSlotAttemptedToSend(NULL),
+       lastIsNewKey(false),
+       lastNewSize(0),
+       lastTransactionPartsSent(NULL),
+       lastPendingSendArbitrationEntriesToDelete(NULL),
+       lastNewKey(NULL),
+       committedKeyValueTable(NULL),
+       speculatedKeyValueTable(NULL),
+       pendingTransactionSpeculatedKeyValueTable(NULL),
+  liveNewKeyTable(NULL),
+       lastMessageTable(NULL),
+       rejectedMessageWatchVectorTable(NULL),
+       arbitratorTable(NULL),
+       liveAbortTable(NULL),
+       newTransactionParts(NULL),
+       newCommitParts(NULL),
+       lastArbitratedTransactionNumberByArbitratorTable(NULL),
+       liveTransactionBySequenceNumberTable(NULL),
+       liveTransactionByTransactionIdTable(NULL),
+       liveCommitsTable(NULL),
+       liveCommitsByKeyTable(NULL),
+       lastCommitSeenSequenceNumberByArbitratorTable(NULL),
+       rejectedSlotVector(NULL),
+       pendingTransactionQueue(NULL),
+       pendingSendArbitrationRounds(NULL),
+       pendingSendArbitrationEntriesToDelete(NULL),
+       transactionPartsSent(NULL),
+       outstandingTransactionStatus(NULL),
+       liveAbortsGeneratedByLocal(NULL),
+       offlineTransactionsCommittedAndAtServer(NULL),
+       localCommunicationTable(NULL),
+       lastTransactionSeenFromMachineFromServer(NULL),
+       lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
+       lastInsertedNewKey(false),
+       lastSeqNumArbOn(0)
+{
        init();
 }
 
@@ -32,7 +142,7 @@ void Table::init() {
        pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
        liveNewKeyTable = new Hashtable<IoTString, NewKey>();
        lastMessageTable = new Hashtable<int64_t Pair<int64_t Liveness> >();
-       rejectedMessageWatchVectorTable = new Hashtable<int64_t HashSet<RejectedMessage> >();
+       rejectedMessageWatchVectorTable = new Hashtable<int64_t Hashset<RejectedMessage> >();
        arbitratorTable = new Hashtable<IoTString, Long>();
        liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort>();
        newTransactionParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart> >();
@@ -49,7 +159,7 @@ void Table::init() {
        transactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >();
        outstandingTransactionStatus = new Hashtable<int64_t TransactionStatus>();
        liveAbortsGeneratedByLocal = new Hashtable<int64_t Abort>();
-       offlineTransactionsCommittedAndAtServer = new HashSet<Pair<int64_t, int64_t> >();
+       offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> >();
        localCommunicationTable = new Hashtable<int64_t Pair<String, int32_t> >();
        lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
        pendingSendArbitrationRounds = new Vector<ArbitrationRound>();
@@ -66,7 +176,7 @@ synchronized void Table::printSlots() {
        int64_t o = buffer.getOldestSeqNum();
        int64_t n = buffer.getNewestSeqNum();
 
-       int[] types = new int[10];
+       Array<int> * types = new Array<int>(10);
 
        int num = 0;
 
@@ -152,10 +262,11 @@ synchronized void Table::initTable() {
        localSequenceNumber++;
        TableStatus status = new TableStatus(s, numberOfSlots);
        s.addEntry(status);
-       Slot[] array = cloud.putSlot(s, numberOfSlots);
+       Array<Slot> * array = cloud.putSlot(s, numberOfSlots);
 
        if (array == NULL) {
-               array = new Slot[] {s};
+               array = new Array<Slot>(1);
+               array->set(0, s);
                // update local block chain
                validateAndUpdate(array, true);
        } else if (array.length == 1) {
@@ -171,7 +282,7 @@ synchronized void Table::initTable() {
  */
 synchronized void Table::rebuild() {
        // Just pull the latest slots from the server
-       Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
+       Array<Slot> * newslots = cloud.getSlots(sequenceNumber + 1);
        validateAndUpdate(newslots, true);
        sendToServer(NULL);
        updateLiveTransactionsAndStatus();
@@ -286,7 +397,7 @@ synchronized IoTString Table::getSpeculativeAtomic(IoTString key) {
 
 synchronized bool Table::update()  {
        try {
-               Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
+               Array<Slot> * newSlots = cloud.getSlots(sequenceNumber + 1);
                validateAndUpdate(newSlots, false);
                sendToServer(NULL);
 
@@ -376,7 +487,7 @@ synchronized TransactionStatus Table::commitTransaction() {
                sendToServer(NULL);
        } catch (ServerException e) {
 
-               Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
+               Set<Long> arbitratorTriedAndFailed = new Hashset<Long>();
                for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
                        Transaction transaction = iter.next();
 
@@ -442,10 +553,10 @@ bool Table::sendToServer(NewKey newKey) {
 
        try {
                if (hadPartialSendToServer) {
-                       Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
+                       Array<Slot> * newSlots = cloud.getSlots(sequenceNumber + 1);
                        if (newSlots.length == 0) {
                                fromRetry = true;
-                               ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
+                               ThreeTuple<bool, bool, Array<Slot>*> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
 
                                if (sendSlotsReturn.getFirst()) {
                                        if (newKey != NULL) {
@@ -682,7 +793,7 @@ bool Table::sendToServer(NewKey newKey) {
                        lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
 
 
-                       ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
+                       ThreeTuple<bool, bool, Array<Slot>*> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
 
                        if (sendSlotsReturn.getFirst()) {
 
@@ -1061,16 +1172,17 @@ synchronized Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
        return returnData;
 }
 
-ThreeTuple<bool, bool, Slot[]> Table::sendSlotsToServer(Slot slot, int newSize, bool isNewKey) {
+ThreeTuple<bool, bool, Array<Slot>*> Table::sendSlotsToServer(Slot slot, int newSize, bool isNewKey) {
        bool attemptedToSendToServerTmp = attemptedToSendToServer;
        attemptedToSendToServer = true;
 
        bool inserted = false;
        bool lastTryInserted = false;
 
-       Slot[] array = cloud.putSlot(slot, newSize);
+       Array<Slot>* array = cloud.putSlot(slot, newSize);
        if (array == NULL) {
-               array = new Slot[] {slot};
+               array = new Array<Slot>();
+               array->set(0, slot);
                rejectedSlotVector.clear();
                inserted = true;
        } else {
@@ -1120,7 +1232,7 @@ ThreeTuple<bool, bool, Slot[]> Table::sendSlotsToServer(Slot slot, int newSize,
                }
        }
 
-       return new ThreeTuple<bool, bool, Slot[]>(inserted, lastTryInserted, array);
+       return new ThreeTuple<bool, bool, Array<Slot>*>(inserted, lastTryInserted, array);
 }
 
 /**
@@ -1364,7 +1476,7 @@ search:
 /**
  * Checks for malicious activity and updates the local copy of the block chain.
  */
-void Table::validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal) {
+void Table::validateAndUpdate(Array<Slot>* newSlots, bool acceptUpdatesToLocal) {
 
        // The cloud communication layer has checked slot HMACs already before decoding
        if (newSlots.length == 0) {
@@ -1385,7 +1497,7 @@ void Table::validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal) {
        checkHMACChain(indexer, newSlots);
 
        // Set to keep track of messages from clients
-       HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
+       Hashset<Long> machineSet = new Hashset<int64_t>(lastMessageTable.keySet());
 
        // Process each slots data
        for (Slot slot : newSlots) {
@@ -1595,7 +1707,7 @@ void Table::arbitrateFromServer() {
 
        // The last transaction arbitrated on
        int64_t lastTransactionCommitted = -1;
-       Set<Abort> generatedAborts = new HashSet<Abort>();
+       Set<Abort> generatedAborts = new Hashset<Abort>();
 
        for (Long transactionSequenceNumber : transactionSequenceNumbers) {
                Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
@@ -1746,7 +1858,7 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction transaction) {
                newCommit.createCommitParts();
 
                // Append all the commit parts to the end of the pending queue waiting for sending to the server
-               ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
+               ArbitrationRound * arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
                pendingSendArbitrationRounds.add(arbitrationRound);
 
                if (compactArbitrationData()) {
@@ -1781,7 +1893,7 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction transaction) {
                                status.setStatus(TransactionStatus.StatusAborted);
                        }
                } else {
-                       Set addAbortSet = new HashSet<Abort>();
+                       Hashset<Abort*> addAbortSet = new Hashset<Abort* >();
 
 
                        // Create the abort
@@ -2029,7 +2141,7 @@ bool Table::updateCommittedTable() {
                        // If we got here then this is a brand new commit and needs full processing
 
                        // Get what commits should be edited, these are the commits that have live values for their keys
-                       Set<Commit> commitsToEdit = new HashSet<Commit>();
+                       Hashset<Commit*> * commitsToEdit = new Hashset<Commit*>();
                        for (KeyValue kv : commit.getKeyValueUpdateSet()) {
                                commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
                        }
@@ -2114,7 +2226,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
                return false;           // did not speculate
        }
 
-       Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
+       Hashset<int64_t> * incompleteTransactionArbitrator = new Hashset<int64_t>();
        bool didSkip = true;
 
        for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
@@ -2234,7 +2346,7 @@ void Table::updateLiveTransactionsAndStatus() {
 /**
  * Process this slot, entry by entry.  Also update the latest message sent by slot
  */
-void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<Long> machineSet) {
+void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, Hashset<int64_t> * machineSet) {
 
        // Update the last message seen
        updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
@@ -2280,7 +2392,7 @@ void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLoca
 /**
  * Update the last message that was sent for a machine Id
  */
-void Table::processEntry(LastMessage entry, HashSet<Long> machineSet) {
+void Table::processEntry(LastMessage entry, Hashset<int64_t>* machineSet) {
        // Update what the last message received by a machine was
        updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
 }
@@ -2350,7 +2462,7 @@ void Table::processEntry(RejectedMessage entry, SlotIndexer indexer) {
 
 
        // Create a list of clients to watch until they see this rejected message entry.
-       HashSet<Long> deviceWatchSet = new HashSet<Long>();
+       Hashset<int64_t>* deviceWatchSet = new Hashset<int64_t>();
        for (Map.Entry<int64_t Pair<int64_t Liveness> > lastMessageEntry : lastMessageTable.entrySet()) {
 
                // Machine ID for the last message entry
@@ -2524,13 +2636,13 @@ void Table::processEntry(CommitPart entry) {
  * Check that the last message seen is correct and that there is no mismatch of our own last message or that
  * other clients have not had a rollback on the last message.
  */
-void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<Long> machineSet) {
+void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> * machineSet) {
 
        // We have seen this machine ID
        machineSet.remove(machineId);
 
        // Get the set of rejected messages that this machine Id is has not seen yet
-       HashSet<RejectedMessage> watchset = rejectedMessageWatchVectorTable.get(machineId);
+       Hashset<RejectedMessage*>* watchset = rejectedMessageWatchVectorTable.get(machineId);
 
        // If there is a rejected message that this machine Id has not seen yet
        if (watchset != NULL) {
@@ -2627,10 +2739,10 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness livene
  * rejected message entry and which have not.
  */
 void Table::addWatchVector(int64_t machineId, RejectedMessage entry) {
-       HashSet<RejectedMessage> entries = rejectedMessageWatchVectorTable.get(machineId);
+       Hashset<RejectedMessage*>* entries = rejectedMessageWatchVectorTable.get(machineId);
        if (entries == NULL) {
                // There is no set for this machine ID yet so create one
-               entries = new HashSet<RejectedMessage>();
+               entries = new Hashset<RejectedMessage*>();
                rejectedMessageWatchVectorTable.put(machineId, entries);
        }
        entries.add(entry);
@@ -2639,7 +2751,7 @@ void Table::addWatchVector(int64_t machineId, RejectedMessage entry) {
 /**
  * Check if the HMAC chain is not violated
  */
-void Table::checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
+void Table::checkHMACChain(SlotIndexer indexer, Array<Slot> * newSlots) {
        for (int i = 0; i < newSlots.length; i++) {
                Slot currSlot = newSlots[i];
                Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
index 3bc9cb9..7944347 100644 (file)
@@ -1,6 +1,8 @@
 #ifndef Table_H
 #define Table_H
-
+#include "common.h"
+#include "Pair.h"
+#include "ThreeTuple.h"
 /**
  * IoTTable data structure.  Provides client interface.
  * @author Brian Demsky
@@ -19,77 +21,81 @@ class Table {
 private:
        /* Helper Objects */
        SlotBuffer *buffer;
-       CloudComm *cloud = NULL;
-       Random *random = NULL;
-       TableStatus *liveTableStatus = NULL;
-       PendingTransaction *pendingTransactionBuilder = NULL;   // Pending Transaction used in building a Pending Transaction
-       Transaction *lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction
-       Transaction *firstPendingTransaction = NULL;    // first transaction in the pending transaction list
+       CloudComm *cloud;
+       Random *random;
+       TableStatus *liveTableStatus;
+       PendingTransaction *pendingTransactionBuilder;  // Pending Transaction used in building a Pending Transaction
+       Transaction *lastPendingTransactionSpeculatedOn;        // Last transaction that was speculated on from the pending transaction
+       Transaction *firstPendingTransaction;   // first transaction in the pending transaction list
 
        /* Variables */
-       int numberOfSlots = 0;  // Number of slots stored in buffer
-       int bufferResizeThreshold = 0;// Threshold on the number of live slots before a resize is needed
-       int64_t liveSlotCount = 0;// Number of currently live slots
-       int64_t oldestLiveSlotSequenceNumver = 0;       // Smallest sequence number of the slot with a live entry
-       int64_t localMachineId = 0;     // Machine ID of this client device
-       int64_t sequenceNumber = 0;     // Largest sequence number a client has received
-       int64_t localSequenceNumber = 0;
+       int numberOfSlots;      // Number of slots stored in buffer
+       int bufferResizeThreshold;// Threshold on the number of live slots before a resize is needed
+       int64_t liveSlotCount;// Number of currently live slots
+       int64_t oldestLiveSlotSequenceNumver;   // Smallest sequence number of the slot with a live entry
+       int64_t localMachineId; // Machine ID of this client device
+       int64_t sequenceNumber; // Largest sequence number a client has received
+       int64_t localSequenceNumber;
 
        //  int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
        //  int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
-       int64_t localTransactionSequenceNumber = 0;     // Local sequence number counter for transactions
-       int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
-       int64_t oldestTransactionSequenceNumberSpeculatedOn = -1;       // the oldest transaction that was speculated on
-       int64_t localArbitrationSequenceNumber = 0;
-       bool hadPartialSendToServer = false;
-       bool attemptedToSendToServer = false;
+       int64_t localTransactionSequenceNumber; // Local sequence number counter for transactions
+       int64_t lastTransactionSequenceNumberSpeculatedOn;      // the last transaction that was speculated on
+       int64_t oldestTransactionSequenceNumberSpeculatedOn;    // the oldest transaction that was speculated on
+       int64_t localArbitrationSequenceNumber;
+       bool hadPartialSendToServer;
+       bool attemptedToSendToServer;
        int64_t expectedsize;
-       bool didFindTableStatus = false;
-       int64_t currMaxSize = 0;
+       bool didFindTableStatus;
+       int64_t currMaxSize;
 
-       Slot *lastSlotAttemptedToSend = NULL;
-       bool lastIsNewKey = false;
-       int lastNewSize = 0;
-       Hashtable<Transaction *, Vector<int32_t> *> lastTransactionPartsSent = NULL;
-       Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete = NULL;
-       NewKey *lastNewKey = NULL;
+       Slot *lastSlotAttemptedToSend;
+       bool lastIsNewKey;
+       int lastNewSize;
+       Hashtable<Transaction *, Vector<int32_t> *> lastTransactionPartsSent;
+       Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete;
+       NewKey *lastNewKey;
 
 
        /* Data Structures  */
-       Hashtable<IoTString *, KeyValue *> *committedKeyValueTable = NULL;// Table of committed key value pairs
-       Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable = NULL;     // Table of speculated key value pairs, if there is a speculative value
-       Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable = NULL;   // Table of speculated key value pairs, if there is a speculative value from the pending transactions
-       Hashtable<IoTString *, NewKey *> *liveNewKeyTable = NULL;       // Table of live new keys
-       Hashtable<int64_t, Pair<int64_t, Liveness *> *> *lastMessageTable = NULL;       // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
-       Hashtable<int64_t, HashSet<RejectedMessage *> *> *rejectedMessageWatchVectorTable = NULL;       // Table of machine Ids and the set of rejected messages they have not seen yet
-       Hashtable<IoTString *, int64_t> *arbitratorTable = NULL;// Table of keys and their arbitrators
-       Hashtable<Pair<int64_t, int64_t> *, Abort *> *liveAbortTable = NULL;// Table live abort messages
-       Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *> *> *newTransactionParts = NULL;       // transaction parts that are seen in this latest round of slots from the server
-       Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *> *newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
-       Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable = NULL;   // Last transaction sequence number that an arbitrator arbitrated on
-       Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
-       Hashtable<Pair<int64_t, int64_t> *, Transaction *> *liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID
-       Hashtable<int64_t, Hashtable<int64_t, Commit *> > *liveCommitsTable = NULL;
-       Hashtable<IoTString *, Commit *> *liveCommitsByKeyTable = NULL;
-       Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable = NULL;
-       Vector<int64_t> *rejectedSlotVector = NULL;     // Vector of rejected slots that have yet to be sent to the server
-       Vector<Transaction *> *pendingTransactionQueue = NULL;
-       Vector<ArbitrationRound *> *pendingSendArbitrationRounds = NULL;
-       Vector<Entry *> *pendingSendArbitrationEntriesToDelete = NULL;
-       Hashtable<Transaction *, Vector<int32_t *> *> *transactionPartsSent = NULL;
-       Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus = NULL;
-       Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal = NULL;
-       Hashset<Pair<int64_t, int64_t> *> *offlineTransactionsCommittedAndAtServer = NULL;
-       Hashtable<int64_t, Pair<String *, int32_t> > *localCommunicationTable = NULL;
-       Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer = NULL;
-       Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;
+       Hashtable<IoTString *, KeyValue *> *committedKeyValueTable;// Table of committed key value pairs
+       Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable;    // Table of speculated key value pairs, if there is a speculative value
+       Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable;  // Table of speculated key value pairs, if there is a speculative value from the pending transactions
+       Hashtable<IoTString *, NewKey *> *liveNewKeyTable;      // Table of live new keys
+       Hashtable<int64_t, Pair<int64_t, Liveness *> *> *lastMessageTable;      // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
+       Hashtable<int64_t, Hashset<RejectedMessage *> *> *rejectedMessageWatchVectorTable;      // Table of machine Ids and the set of rejected messages they have not seen yet
+       Hashtable<IoTString *, int64_t> *arbitratorTable;// Table of keys and their arbitrators
+       Hashtable<Pair<int64_t, int64_t> *, Abort *> *liveAbortTable;// Table live abort messages
+       Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *> *> *newTransactionParts;      // transaction parts that are seen in this latest round of slots from the server
+       Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *> *newCommitParts;        // commit parts that are seen in this latest round of slots from the server
+       Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable;  // Last transaction sequence number that an arbitrator arbitrated on
+       Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable;        // live transaction grouped by the sequence number
+       Hashtable<Pair<int64_t, int64_t> *, Transaction *> *liveTransactionByTransactionIdTable;        // live transaction grouped by the transaction ID
+       Hashtable<int64_t, Hashtable<int64_t, Commit *> > *liveCommitsTable;
+       Hashtable<IoTString *, Commit *> *liveCommitsByKeyTable;
+       Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable;
+       Vector<int64_t> *rejectedSlotVector;    // Vector of rejected slots that have yet to be sent to the server
+       Vector<Transaction *> *pendingTransactionQueue;
+       Vector<ArbitrationRound *> *pendingSendArbitrationRounds;
+       Vector<Entry *> *pendingSendArbitrationEntriesToDelete;
+       Hashtable<Transaction *, Vector<int32_t *> *> *transactionPartsSent;
+       Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus;
+       Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal;
+       Hashset<Pair<int64_t, int64_t> *> *offlineTransactionsCommittedAndAtServer;
+       Hashtable<int64_t, Pair<IoTString *, int32_t> > *localCommunicationTable;
+       Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer;
+       Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
+       bool lastInsertedNewKey;
+       int64_t lastSeqNumArbOn;
+
+
        void init();
        /**
         * Recalculate the new resize threshold
         */
        void setResizeThreshold();
        bool sendToServer(NewKey *newKey);
-       synchronized bool updateFromLocal(int64_t machineId);
+       bool updateFromLocal(int64_t machineId);
        Pair<bool, bool> sendTransactionToLocal(Transaction *transaction);
        ThreeTuple<bool, bool, Array<Slot *> *> *sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
        /**
@@ -104,7 +110,7 @@ private:
        /**
         * Checks for malicious activity and updates the local copy of the block chain.
         */
-       void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal);
+       void validateAndUpdate(Array<Slot> * newSlots, bool acceptUpdatesToLocal);
 
        void updateLiveStateFromServer();
 
@@ -136,7 +142,7 @@ private:
         */
        void processNewTransactionParts();
 
-       int64_t lastSeqNumArbOn = 0;
+
 
        void arbitrateFromServer();
 
@@ -170,12 +176,12 @@ private:
        /**
         * Process this slot, entry by entry.  Also update the latest message sent by slot
         */
-       void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
+       void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, Hashset<int64_t> machineSet);
 
        /**
         * Update the last message that was sent for a machine Id
         */
-       void processEntry(LastMessage entry, HashSet<int64_t> machineSet);
+       void processEntry(LastMessage entry, Hashset<int64_t> machineSet);
 
        /**
         * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
@@ -216,7 +222,7 @@ private:
         * Check that the last message seen is correct and that there is no mismatch of our own last message or that
         * other clients have not had a rollback on the last message.
         */
-       void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
+       void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset<int64_t> machineSet);
 
        /**
         * Add a rejected message entry to the watch set to keep track of which clients have seen that
@@ -227,11 +233,11 @@ private:
        /**
         * Check if the HMAC chain is not violated
         */
-       void checkHMACChain(SlotIndexer indexer, Slot[] newSlots);
-       bool lastInsertedNewKey = false;
+       void checkHMACChain(SlotIndexer indexer, Array<Slot> * newSlots);
+
 
 public:
-       Table(String baseurl, String password, int64_t _localMachineId, int listeningPort);
+       Table(IoTString * baseurl, IoTString * password, int64_t _localMachineId, int listeningPort);
        Table(CloudComm _cloud, int64_t _localMachineId);
 
        /**
@@ -244,7 +250,7 @@ public:
         * Rebuild the table from scratch by pulling the latest block chain from the server.
         */
        void rebuild();
-       void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber);
+       void addLocalCommunication(int64_t arbitrator, IoTString* hostName, int portNumber);
        uint64_t getArbitrator(IoTString *key);
        void close();
        IoTString *getCommitted(IoTString *key);
index b9d7196..3d77da9 100644 (file)
@@ -53,6 +53,13 @@ public:
                        ourfree(array);
        }
 
+       bool equals(Array<type> * _array) {
+               if (_array->size != size)
+                       return false;
+               int cmp=memcmp(array, _array->array, size * sizeof(type));
+               return cmp == 0;
+       }
+       
        type get(uint index) const {
                return array[index];
        }
index 7973b6b..bc23a7c 100644 (file)
@@ -39,14 +39,18 @@ class SlotBuffer;
 class SlotIndexer;
 class Table;
 class TableStatus;
-class ThreeTuple;
 class TimingSingleton;
 class Transaction;
 class TransactionPart;
 class TransactionStatus;
-class Mac;
 class Error;
 
-
-
+//Code to write
+class SecretKeySpec;
+class Mac;
+class SecureRandom;
+class Thread;
+class DataInputStream;
+class URL;
+class Random;
 #endif
index 5a7770d..00ff962 100644 (file)
@@ -7,58 +7,58 @@ template<typename type>
 class Vector {
 public:
        Vector(uint _capacity = VECTOR_DEFCAP) :
-               size(0),
+               fldsize(0),
                capacity(_capacity),
                array((type *) ourmalloc(sizeof(type) * _capacity)) {
        }
 
        Vector(uint _capacity, type *_array)  :
-               size(_capacity),
+               fldsize(_capacity),
                capacity(_capacity),
                array((type *) ourmalloc(sizeof(type) * _capacity)) {
                memcpy(array, _array, capacity * sizeof(type));
        }
 
        Vector(Vector<type> *v) :
-               size(v->size),
+               fldsize(v->fldsize),
                capacity(v->capacity),
                array((type *) ourmalloc(sizeof(type) * v->capacity)) {
                memcpy(array, v->array, capacity * sizeof(type));
        }
 
        void pop() {
-               size--;
+               fldsize--;
        }
 
        type last() const {
-               return array[size - 1];
+               return array[fldsize - 1];
        }
 
        void setSize(uint _size) {
-               if (_size <= size) {
-                       size = _size;
+               if (_size <= fldsize) {
+                       fldsize = _size;
                        return;
                } else if (_size > capacity) {
                        array = (type *)ourrealloc(array, _size * sizeof(type));
                        capacity = _size;
                }
-               bzero(&array[size], (_size - size) * sizeof(type));
-               size = _size;
+               bzero(&array[fldsize], (_size - fldsize) * sizeof(type));
+               fldsize = _size;
        }
 
        void addAll(Vector<type> *v) {
-               int oldsize = size;
-               setSize(size + v->size);
-               memcpy(&array[size], v->array, v->size * sizeof(type));
+               int oldsize = fldsize;
+               setSize(fldsize + v->fldsize);
+               memcpy(&array[fldsize], v->array, v->fldsize * sizeof(type));
        }
 
        void add(type item) {
-               if (size >= capacity) {
+               if (fldsize >= capacity) {
                        uint newcap = capacity << 1;
                        array = (type *)ourrealloc(array, newcap * sizeof(type));
                        capacity = newcap;
                }
-               array[size++] = item;
+               array[fldsize++] = item;
        }
 
        type get(uint index) const {
@@ -66,7 +66,7 @@ public:
        }
 
        void setExpand(uint index, type item) {
-               if (index >= size)
+               if (index >= fldsize)
                        setSize(index + 1);
                set(index, item);
        }
@@ -75,12 +75,12 @@ public:
                array[index] = item;
        }
 
-       uint getSize() const {
-               return size;
+       uint size() const {
+               return fldsize;
        }
 
        bool isEmpty() const {
-               return size == 0;
+               return fldsize == 0;
        }
 
        ~Vector() {
@@ -88,7 +88,7 @@ public:
        }
 
        void clear() {
-               size = 0;
+               fldsize = 0;
        }
 
        type *expose() {
@@ -96,7 +96,7 @@ public:
        }
        CMEMALLOC;
 private:
-       uint size;
+       uint fldsize;
        uint capacity;
        type *array;
 };