-#include"ArbitrationRound.h"
+#include "ArbitrationRound.h"
+#include "Commit.h"
-ArbitrationRound::ArbitrationRound(Commit * _commit, Set<Abort *> * _abortsBefore) {
- parts = new ArrayList<Entry>();
- commit = _commit;
- abortsBefore = _abortsBefore;
+ArbitrationRound::ArbitrationRound(Commit * _commit, Hashset<Abort *> * _abortsBefore) :
+ abortsBefore(_abortsBefore),
+ parts(new Vector<Entry *>()),
+ commit(_commit),
+ currentSize(0),
+ didSendPart(false),
+ didGenerateParts(false) {
if (commit != NULL) {
- commit.createCommitParts();
- currentSize += commit.getNumberOfParts();
- }
+ commit->createCommitParts();
+ currentSize += commit->getNumberOfParts();
+ }
- currentSize += abortsBefore.size();
+ currentSize += abortsBefore->size();
}
void ArbitrationRound::generateParts() {
if (didGenerateParts) {
return;
}
- parts = new ArrayList<Entry>(abortsBefore);
+ parts = new Vector<Entry>(abortsBefore);
if (commit != NULL) {
- parts.addAll(commit.getParts().values());
+ parts->addAll(commit->getParts()->values());
}
}
}
void ArbitrationRound::removeParts(List<Entry *> * removeParts) {
- parts.removeAll(removeParts);
+ parts->removeAll(removeParts);
didSendPart = true;
}
bool ArbitrationRound::isDoneSending() {
- if ((commit == NULL) && abortsBefore.isEmpty()) {
+ if ((commit == NULL) && abortsBefore->isEmpty()) {
return true;
}
- return parts.isEmpty();
+ return parts->isEmpty();
}
Commit * ArbitrationRound::getCommit() {
void ArbitrationRound::setCommit(Commit * _commit) {
if (commit != NULL) {
- currentSize -= commit.getNumberOfParts();
+ currentSize -= commit->getNumberOfParts();
}
commit = _commit;
if (commit != NULL) {
- currentSize += commit.getNumberOfParts();
+ currentSize += commit->getNumberOfParts();
}
}
void ArbitrationRound::addAbort(Abort * abort) {
- abortsBefore.add(abort);
+ abortsBefore->add(abort);
currentSize++;
}
-void ArbitrationRound::addAborts(Set<Abort *> * aborts) {
- abortsBefore.addAll(aborts);
- currentSize += aborts.size();
+void ArbitrationRound::addAborts(Hashset<Abort *> * aborts) {
+ abortsBefore->addAll(aborts);
+ currentSize += aborts->size();
}
-Set<Abort> ArbitrationRound::getAborts() {
+Hashset<Abort *> * ArbitrationRound::getAborts() {
return abortsBefore;
}
int ArbitrationRound::getAbortsCount() {
- return abortsBefore.size();
+ return abortsBefore->size();
}
int ArbitrationRound::getCurrentSize() {
return currentSize >= MAX_PARTS;
}
-bool ArbitrationRound::didSendPart() {
+bool ArbitrationRound::getDidSendPart() {
return didSendPart;
}
#define ARBITRATIONROUND_H
#define MAX_PARTS 10
+#include "common.h"
class ArbitrationRound {
private:
- Set<Abort *> * abortsBefore = NULL;
- List<Entry *> * parts = NULL;
- Commit commit = NULL;
- int currentSize = 0;
- bool didSendPart = false;
- bool didGenerateParts = false;
+ Hashset<Abort *> * abortsBefore;
+ Vector<Entry *> * parts;
+ Commit * commit;
+ int currentSize;
+ bool didSendPart;
+ bool didGenerateParts;
public:
- ArbitrationRound(Commit * _commit, Set<Abort *> * _abortsBefore);
+ ArbitrationRound(Commit * _commit, Hashset<Abort *> * _abortsBefore);
+ ~ArbitrationRound();
void generateParts();
- List<Entry> * getParts();
- void removeParts(List<Entry> * removeParts);
+ Vector<Entry *> * getParts();
+ void removeParts(Vector<Entry *> * removeParts);
bool isDoneSending();
void setCommit(Commit * _commit);
void addAbort(Abort * abort);
- void addAborts(Set<Abort *> * aborts);
- Set<Abort *> * getAborts();
+ void addAborts(Hashset<Abort *> * aborts);
+ Hashset<Abort *> * getAborts();
int getAbortsCount();
int getCurrentSize();
bool isFull();
- bool didSendPart();
+ bool getDidSendPart();
};
#endif
+#include "commit.h"
+
+Commit::Commit() :
+ parts(new HashMap<int32_t, CommitPart *>()),
+ missingParts(NULL),
+ fldisComplete(false),
+ hasLastPart(false),
+ keyValueUpdateSet(new HashSet<KeyValue *>()),
+ isDead(false),
+ sequenceNumber(-1),
+ machineId(-1),
+ transactionSequenceNumber(-1),
+ liveKeys(new Hashset<IoTString *>) {
+}
-class Commit {
-
- Map<Integer, CommitPart> parts = NULL;
- Set<Integer> missingParts = NULL;
- bool isComplete = false;
- bool hasLastPart = false;
- Set<KeyValue> keyValueUpdateSet = NULL;
- bool isDead = false;
- int64_t sequenceNumber = -1;
- int64_t machineId = -1;
- int64_t transactionSequenceNumber = -1;
-
- Set<IoTString> liveKeys = NULL;
-
- Commit() {
- parts = new HashMap<Integer, CommitPart>();
- keyValueUpdateSet = new HashSet<KeyValue>();
-
- liveKeys = new HashSet<IoTString>();
- }
-
- Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) {
- parts = new HashMap<Integer, CommitPart>();
- keyValueUpdateSet = new HashSet<KeyValue>();
-
- liveKeys = new HashSet<IoTString>();
-
- sequenceNumber = _sequenceNumber;
- machineId = _machineId;
- transactionSequenceNumber = _transactionSequenceNumber;
- isComplete = true;
- }
-
-
- void addPartDecode(CommitPart newPart) {
-
- if (isDead) {
- // If dead then just kill this part and move on
- newPart.setDead();
- return;
- }
-
- CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
-
- if (previoslySeenPart != NULL) {
- // Set dead the old one since the new one is a rescued version of this part
- previoslySeenPart.setDead();
- } else if (newPart.isLastPart()) {
- missingParts = new HashSet<Integer>();
- hasLastPart = true;
-
- for (int i = 0; i < newPart.getPartNumber(); i++) {
- if (parts.get(i) == NULL) {
- missingParts.add(i);
- }
- }
- }
-
- if (!isComplete && hasLastPart) {
-
- // We have seen this part so remove it from the set of missing parts
- missingParts.remove(newPart.getPartNumber());
-
- // Check if all the parts have been seen
- if (missingParts.size() == 0) {
-
- // We have all the parts
- isComplete = true;
-
- // Decode all the parts and create the key value guard and update sets
- decodeCommitData();
+Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
+ parts(new HashMap<int32_t, CommitPart *>()),
+ missingParts(NULL),
+ fldisComplete(true),
+ hasLastPart(false),
+ keyValueUpdateSet(new HashSet<KeyValue *>()),
+ isDead(false),
+ sequenceNumber(_sequenceNumber),
+ machineId(_machineId),
+ transactionSequenceNumber(_transactionSequenceNumber),
+ liveKeys(new Hashset<IoTString *>) {
+}
- // Get the sequence number and arbitrator of this transaction
- sequenceNumber = parts.get(0).getSequenceNumber();
- machineId = parts.get(0).getMachineId();
- transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
- }
- }
- }
+void Commit::addPartDecode(CommitPart newPart) {
+
+ if (isDead) {
+ // If dead then just kill this part and move on
+ newPart.setDead();
+ return;
+ }
+
+ CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
+
+ if (previoslySeenPart != NULL) {
+ // Set dead the old one since the new one is a rescued version of this part
+ previoslySeenPart.setDead();
+ } else if (newPart.isLastPart()) {
+ missingParts = new HashSet<Integer>();
+ hasLastPart = true;
+
+ for (int i = 0; i < newPart.getPartNumber(); i++) {
+ if (parts.get(i) == NULL) {
+ missingParts.add(i);
+ }
+ }
+ }
+
+ if (!fldisComplete && hasLastPart) {
+
+ // We have seen this part so remove it from the set of missing parts
+ missingParts.remove(newPart.getPartNumber());
+
+ // Check if all the parts have been seen
+ if (missingParts.size() == 0) {
+
+ // We have all the parts
+ fldisComplete = true;
+
+ // Decode all the parts and create the key value guard and update sets
+ decodeCommitData();
+
+ // Get the sequence number and arbitrator of this transaction
+ sequenceNumber = parts.get(0).getSequenceNumber();
+ machineId = parts.get(0).getMachineId();
+ transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
+ }
+ }
+}
int64_t getSequenceNumber() {
return sequenceNumber;
return keyValueUpdateSet;
}
- int getNumberOfParts() {
- return parts.size();
- }
-
- int64_t getMachineId() {
- return machineId;
- }
-
- bool isComplete() {
- return isComplete;
- }
-
- bool isLive() {
- return !isDead;
- }
+int32_t getNumberOfParts() {
+ return parts.size();
+}
void setDead() {
if (isDead) {
-
+#ifndef COMMIT_H
+#define COMMIT_H
+#include "common.h"
class Commit {
-
- private Map<Integer, CommitPart> parts = NULL;
- private Set<Integer> missingParts = NULL;
- private bool isComplete = false;
- private bool hasLastPart = false;
- private Set<KeyValue> keyValueUpdateSet = NULL;
- private bool isDead = false;
- private int64_t sequenceNumber = -1;
- private int64_t machineId = -1;
- private int64_t transactionSequenceNumber = -1;
-
- private Set<IoTString> liveKeys = NULL;
-
- public Commit() {
- parts = new HashMap<Integer, CommitPart>();
- keyValueUpdateSet = new HashSet<KeyValue>();
-
- liveKeys = new HashSet<IoTString>();
- }
-
- public Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) {
- parts = new HashMap<Integer, CommitPart>();
- keyValueUpdateSet = new HashSet<KeyValue>();
-
- liveKeys = new HashSet<IoTString>();
-
- sequenceNumber = _sequenceNumber;
- machineId = _machineId;
- transactionSequenceNumber = _transactionSequenceNumber;
- isComplete = true;
- }
-
-
- public void addPartDecode(CommitPart newPart) {
-
- if (isDead) {
- // If dead then just kill this part and move on
- newPart.setDead();
- return;
- }
-
- CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
-
- if (previoslySeenPart != NULL) {
- // Set dead the old one since the new one is a rescued version of this part
- previoslySeenPart.setDead();
- } else if (newPart.isLastPart()) {
- missingParts = new HashSet<Integer>();
- hasLastPart = true;
-
- for (int i = 0; i < newPart.getPartNumber(); i++) {
- if (parts.get(i) == NULL) {
- missingParts.add(i);
- }
- }
- }
-
- if (!isComplete && hasLastPart) {
-
- // We have seen this part so remove it from the set of missing parts
- missingParts.remove(newPart.getPartNumber());
-
- // Check if all the parts have been seen
- if (missingParts.size() == 0) {
-
- // We have all the parts
- isComplete = true;
-
- // Decode all the parts and create the key value guard and update sets
- decodeCommitData();
-
- // Get the sequence number and arbitrator of this transaction
- sequenceNumber = parts.get(0).getSequenceNumber();
- machineId = parts.get(0).getMachineId();
- transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
- }
- }
- }
-
- public int64_t getSequenceNumber() {
- return sequenceNumber;
- }
-
- public int64_t getTransactionSequenceNumber() {
- return transactionSequenceNumber;
- }
-
- public Map<Integer, CommitPart> getParts() {
- return parts;
- }
-
- public void addKV(KeyValue kv) {
- keyValueUpdateSet.add(kv);
- liveKeys.add(kv.getKey());
- }
-
- public void invalidateKey(IoTString key) {
- liveKeys.remove(key);
-
- if (liveKeys.size() == 0) {
- setDead();
- }
- }
-
- public Set<KeyValue> getKeyValueUpdateSet() {
- return keyValueUpdateSet;
- }
-
- public int getNumberOfParts() {
- return parts.size();
- }
-
- public int64_t getMachineId() {
- return machineId;
- }
-
- public bool isComplete() {
- return isComplete;
- }
-
- public bool isLive() {
- return !isDead;
- }
-
- public void setDead() {
- if (isDead) {
- // Already dead
- return;
- }
-
- // Set dead
- isDead = true;
-
- // Make all the parts of this transaction dead
- for (Integer partNumber : parts.keySet()) {
- CommitPart part = parts.get(partNumber);
- part.setDead();
- }
- }
-
- public CommitPart getPart(int index) {
- return parts.get(index);
- }
-
- public void createCommitParts() {
-
- parts.clear();
-
- // Convert to chars
- char[] charData = convertDataToBytes();
-
-
- int commitPartCount = 0;
- int currentPosition = 0;
- int remaining = charData.length;
-
- while (remaining > 0) {
-
- Boolean isLastPart = false;
- // determine how much to copy
- int copySize = CommitPart.MAX_NON_HEADER_SIZE;
- if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
- copySize = remaining;
- isLastPart = true; // last bit of data so last part
- }
-
- // Copy to a smaller version
- char[] partData = new char[copySize];
- System.arraycopy(charData, currentPosition, partData, 0, copySize);
-
- CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
- parts.put(part.getPartNumber(), part);
-
- // Update position, count and remaining
- currentPosition += copySize;
- commitPartCount++;
- remaining -= copySize;
- }
- }
-
- private void decodeCommitData() {
-
- // Calculate the size of the data section
- int dataSize = 0;
- for (int i = 0; i < parts.keySet().size(); i++) {
- CommitPart tp = parts.get(i);
- dataSize += tp.getDataSize();
- }
-
- char[] combinedData = new char[dataSize];
- int currentPosition = 0;
-
- // Stitch all the data sections together
- for (int i = 0; i < parts.keySet().size(); i++) {
- CommitPart tp = parts.get(i);
- System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
- currentPosition += tp.getDataSize();
- }
-
- // Decoder Object
- ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
-
- // Decode how many key value pairs need to be decoded
- int numberOfKVUpdates = bbDecode.getInt();
-
- // Decode all the updates key values
- for (int i = 0; i < numberOfKVUpdates; i++) {
- KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
- keyValueUpdateSet.add(kv);
- liveKeys.add(kv.getKey());
- }
- }
-
- private char[] convertDataToBytes() {
-
- // Calculate the size of the data
- int sizeOfData = sizeof(int32_t); // Number of Update KV's
- for (KeyValue kv : keyValueUpdateSet) {
- sizeOfData += kv.getSize();
- }
-
- // Data handlers and storage
- char[] dataArray = new char[sizeOfData];
- ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
-
- // Encode the size of the updates and guard sets
- bbEncode.putInt(keyValueUpdateSet.size());
-
- // Encode all the updates
- for (KeyValue kv : keyValueUpdateSet) {
- kv.encode(bbEncode);
- }
-
- return bbEncode.array();
- }
-
- private void setKVsMap(Map<IoTString, KeyValue> newKVs) {
- keyValueUpdateSet.clear();
- liveKeys.clear();
-
- keyValueUpdateSet.addAll(newKVs.values());
- liveKeys.addAll(newKVs.keySet());
-
- }
-
-
- public static Commit merge(Commit newer, Commit older, int64_t newSequenceNumber) {
-
- if (older == NULL) {
- return newer;
- } else if (newer == NULL) {
- return older;
- }
-
- Map<IoTString, KeyValue> kvSet = new HashMap<IoTString, KeyValue>();
- for (KeyValue kv : older.getKeyValueUpdateSet()) {
- kvSet.put(kv.getKey(), kv);
- }
-
- for (KeyValue kv : newer.getKeyValueUpdateSet()) {
- kvSet.put(kv.getKey(), kv);
- }
-
- int64_t transactionSequenceNumber = newer.getTransactionSequenceNumber();
-
- if (transactionSequenceNumber == -1) {
- transactionSequenceNumber = older.getTransactionSequenceNumber();
- }
-
- Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber);
-
- newCommit.setKVsMap(kvSet);
-
- return newCommit;
- }
-}
+ private:
+ Hashtable<int32_t, CommitPart *> * parts;
+ Hashset<int32_t> *missingParts;
+ bool fldisComplete;
+ bool hasLastPart;
+ Hashset<KeyValue *> *keyValueUpdateSet;
+ bool isDead;
+ int64_t sequenceNumber;
+ int64_t machineId;
+ int64_t transactionSequenceNumber;
+ Hashset<IoTString*> * liveKeys;
+ Array<char> * convertDataToBytes();
+ void setKVsMap(Hashtable<IoTString *, KeyValue *> * newKVs);
+
+ public:
+ Commit();
+ Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber);
+
+ void addPartDecode(CommitPart * newPart);
+ int64_t getSequenceNumber();
+ int64_t getTransactionSequenceNumber();
+ Hashtable<int32_t, CommitPart *> *getParts();
+ void addKV(KeyValue * kv);
+ void invalidateKey(IoTString * key);
+ Hashset<KeyValue *> * getKeyValueUpdateSet();
+ int32_t getNumberOfParts();
+ int64_t getMachineId() { return machineId; }
+ bool isComplete() { return fldisComplete; }
+ bool isLive() { return !isDead; }
+ void setDead();
+ CommitPart * getPart(int32_t index);
+ void createCommitParts();
+ void decodeCommitData();
+};
+
+Commit * Commit_merge(Commit * newer, Commit * older, int64_t newSequenceNumber);
+#endif
+#include "TableStatus.h"
+#include "ByteBuffer.h"
-/**
- * TableStatus entries record the current size of the data structure
- * in slots. Used to remember the size and to perform resizes.
- * @author Brian Demsky
- * @version 1.0
- */
-
-
-class TableStatus extends Entry {
- int maxslots;
-
- TableStatus(Slot slot, int _maxslots) {
- super(slot);
- maxslots=_maxslots;
- }
-
- int getMaxSlots() {
- return maxslots;
- }
-
- static Entry decode(Slot slot, ByteBuffer bb) {
- int maxslots=bb.getInt();
- return new TableStatus(slot, maxslots);
- }
-
- void encode(ByteBuffer bb) {
- bb.put(Entry.TypeTableStatus);
- bb.putInt(maxslots);
- }
-
- int getSize() {
- return sizeof(int32_t)+sizeof(char);
- }
-
- char getType() {
- return Entry.TypeTableStatus;
- }
+Entry * TableStatus_decode(Slot * slot, ByteBuffer * bb) {
+ int maxslots=bb.getInt();
+ return new TableStatus(slot, maxslots);
+}
- Entry getCopy(Slot s) {
- return new TableStatus(s, maxslots);
- }
+void TableStatus::encode(ByteBuffer * bb) {
+ bb->put(TypeTableStatus);
+ bb->putInt(maxslots);
}
+#ifndef TABLESTATUS_H
+#define TABLESTATUS_H
+#include "common.h"
+#include "Entry.h"
/**
* TableStatus entries record the current size of the data structure
* @version 1.0
*/
-
-class TableStatus extends Entry {
- private int maxslots;
-
- TableStatus(Slot slot, int _maxslots) {
- super(slot);
- maxslots=_maxslots;
- }
-
- int getMaxSlots() {
- return maxslots;
- }
-
- static Entry decode(Slot slot, ByteBuffer bb) {
- int maxslots=bb.getInt();
- return new TableStatus(slot, maxslots);
- }
-
- void encode(ByteBuffer bb) {
- bb.put(Entry.TypeTableStatus);
- bb.putInt(maxslots);
- }
-
- int getSize() {
- return sizeof(int32_t)+sizeof(char);
- }
-
- char getType() {
- return Entry.TypeTableStatus;
- }
-
- Entry getCopy(Slot s) {
- return new TableStatus(s, maxslots);
- }
-}
+class TableStatus : public Entry {
+ private:
+ int maxslots;
+
+ public:
+ TableStatus(Slot * slot, int _maxslots) : Entry(slot),
+ maxslots(_maxslots) {
+ }
+ int getMaxSlots() { return maxslots; }
+ void encode(ByteBuffer *bb);
+ int getSize() { return sizeof(int32_t)+sizeof(char); }
+
+ char getType() { return TypeTableStatus; }
+
+ Entry * getCopy(Slot * s) { return new TableStatus(s, maxslots); }
+};
+
+Entry * TableStatus_decode(Slot * slot, ByteBuffer * bb);
+#endif
#ifndef ARRAY_H
#define ARRAY_H
+#include <inttypes.h>
+
+typedef uint32_t uint;
template<typename type>
class Array {
size(0) {
}
- Array(uint _size) :
+ Array(uint32_t _size) :
array((type *) ourcalloc(1, sizeof(type) * _size)),
size(_size)
{
#ifndef COMMON_H
#define COMMON_H
#include <inttypes.h>
+typedef uint32_t uint;
+#define CMEMALLOC ;
+#define model_print printf
+
+#include "hashset.h"
+#include "vector.h"
+#include "array.h"
+
+
class Abort;
class Entry;
class Slot;
class ByteBuffer;
class Liveness;
-
+class Commit;
+class CommitPart;
+class ArbitrationRound;
+class KeyValue;
+class IoTString;
#endif
Hashset <_Key, _KeyInt, _Shift, hash_function, equals> *set;
};
-template<typename _Key, typename _KeyInt, int _Shift = 0, unsigned int (*hash_function) (_Key) = defaultHashFunction<_Key, _Shift, _KeyInt>, bool (*equals) (_Key, _Key) = defaultEquals<_Key> >
+template<typename _Key, typename _KeyInt = uintptr_t, int _Shift = 0, unsigned int (*hash_function) (_Key) = defaultHashFunction<_Key, _Shift, _KeyInt>, bool (*equals) (_Key, _Key) = defaultEquals<_Key> >
class Hashset {
public:
Hashset(unsigned int initialcapacity = 16, double factor = 0.5) :
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
-#include "mymemory.h"
#include "common.h"
+#include "mymemory.h"
/**
* @brief Hashtable node
#ifndef CPPVECTOR_H
#define CPPVECTOR_H
#include <string.h>
-
#define VECTOR_DEFCAP 8
template<typename type>