X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTransaction.h;h=aa62661ecfbcb07ac11ca679d78529de63f8c0dd;hp=31d47cd07bbc758a1e09e35a21c496b27b44ec71;hb=0a73089b732948a43f9d29fef2eee4754ecad54c;hpb=bd73414f3ef76dbb9d060e67b6d31f32278b2ffb diff --git a/version2/src/C/Transaction.h b/version2/src/C/Transaction.h index 31d47cd..aa62661 100644 --- a/version2/src/C/Transaction.h +++ b/version2/src/C/Transaction.h @@ -4,9 +4,9 @@ class Transaction { private: - Hashtable parts = NULL; - Set missingParts = NULL; - Vector partsPendingSend = NULL; + Hashtable *parts = NULL; + Hashset *missingParts = NULL; + Vector *partsPendingSend = NULL; bool isComplete = false; bool hasLastPart = false; Hashset *keyValueGuardSet = NULL; @@ -16,288 +16,41 @@ private: int64_t clientLocalSequenceNumber = -1; int64_t arbitratorId = -1; int64_t machineId = -1; - Pair transactionId = NULL; - + Pair *transactionId = NULL; int nextPartToSend = 0; bool didSendAPartToServer = false; - - TransactionStatus transactionStatus = NULL; - + TransactionStatus *transactionStatus = NULL; bool hadServerFailure = false; - - public Transaction() { - parts = new Hashtable(); - keyValueGuardSet = new HashSet(); - keyValueUpdateSet = new HashSet(); - partsPendingSend = new Vector(); - } - - public void addPartEncode(TransactionPart newPart) { - parts.put(newPart.getPartNumber(), newPart); - partsPendingSend.add(newPart.getPartNumber()); - - sequenceNumber = newPart.getSequenceNumber(); - arbitratorId = newPart.getArbitratorId(); - transactionId = newPart.getTransactionId(); - clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); - machineId = newPart.getMachineId(); - - isComplete = true; - } - - public void addPartDecode(TransactionPart newPart) { - - if (isDead) { - // If dead then just kill this part and move on - newPart.setDead(); - return; - } - - sequenceNumber = newPart.getSequenceNumber(); - arbitratorId = newPart.getArbitratorId(); - transactionId = newPart.getTransactionId(); - clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); - machineId = newPart.getMachineId(); - - TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart); - - if (previoslySeenPart != NULL) { - // Set dead the old one since the new one is a rescued version of this part - previoslySeenPart.setDead(); - } else if (newPart.isLastPart()) { - missingParts = new HashSet(); - hasLastPart = true; - - for (int i = 0; i < newPart.getPartNumber(); i++) { - if (parts.get(i) == NULL) { - missingParts.add(i); - } - } - } - - if (!isComplete && hasLastPart) { - - // We have seen this part so remove it from the set of missing parts - missingParts.remove(newPart.getPartNumber()); - - // Check if all the parts have been seen - if (missingParts.size() == 0) { - - // We have all the parts - isComplete = true; - - // Decode all the parts and create the key value guard and update sets - decodeTransactionData(); - } - } - } - - public void addUpdateKV(KeyValue kv) { - keyValueUpdateSet.add(kv); - } - - public void addGuardKV(KeyValue kv) { - keyValueGuardSet.add(kv); - } - - - public int64_t getSequenceNumber() { - return sequenceNumber; - } - - public void setSequenceNumber(int64_t _sequenceNumber) { - sequenceNumber = _sequenceNumber; - - for (int32_t i : parts.keySet()) { - parts.get(i).setSequenceNumber(sequenceNumber); - } - } - - public int64_t getClientLocalSequenceNumber() { - return clientLocalSequenceNumber; - } - - public Hashtable getParts() { - return parts; - } - - public bool didSendAPartToServer() { - return didSendAPartToServer; - } - - public void resetNextPartToSend() { - nextPartToSend = 0; - } - - public TransactionPart getNextPartToSend() { - if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) { - return NULL; - } - TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend)); - nextPartToSend++; - return part; - } - - - public void setServerFailure() { - hadServerFailure = true; - } - - public bool getServerFailure() { - return hadServerFailure; - } - - - public void resetServerFailure() { - hadServerFailure = false; - } - - - public void setTransactionStatus(TransactionStatus _transactionStatus) { - transactionStatus = _transactionStatus; - } - - public TransactionStatus getTransactionStatus() { - return transactionStatus; - } - - public void removeSentParts(Vector sentParts) { - nextPartToSend = 0; - if (partsPendingSend.removeAll(sentParts)) - { - didSendAPartToServer = true; - transactionStatus.setTransactionSequenceNumber(sequenceNumber); - } - } - - public bool didSendAllParts() { - return partsPendingSend.isEmpty(); - } - - public Set getKeyValueUpdateSet() { - return keyValueUpdateSet; - } - - public int getNumberOfParts() { - return parts.size(); - } - - public int64_t getMachineId() { - return machineId; - } - - public int64_t getArbitrator() { - return arbitratorId; - } - - public bool isComplete() { - return isComplete; - } - - public Pair getId() { - return transactionId; - } - - public void setDead() { - if (isDead) { - // Already dead - return; - } - - // Set dead - isDead = true; - - // Make all the parts of this transaction dead - for (int32_t partNumber : parts.keySet()) { - TransactionPart part = parts.get(partNumber); - part.setDead(); - } - } - - public TransactionPart getPart(int index) { - return parts.get(index); - } - - private void decodeTransactionData() { - - // Calculate the size of the data section - int dataSize = 0; - for (int i = 0; i < parts.keySet().size(); i++) { - TransactionPart tp = parts.get(i); - dataSize += tp.getDataSize(); - } - - Array *combinedData = new char[dataSize]; - int currentPosition = 0; - - // Stitch all the data sections together - for (int i = 0; i < parts.keySet().size(); i++) { - TransactionPart tp = parts.get(i); - System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize()); - currentPosition += tp.getDataSize(); - } - - // Decoder Object - ByteBuffer bbDecode = ByteBuffer.wrap(combinedData); - - // Decode how many key value pairs need to be decoded - int numberOfKVGuards = bbDecode.getInt(); - int numberOfKVUpdates = bbDecode.getInt(); - - // Decode all the guard key values - for (int i = 0; i < numberOfKVGuards; i++) { - KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); - keyValueGuardSet.add(kv); - } - - // Decode all the updates key values - for (int i = 0; i < numberOfKVUpdates; i++) { - KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); - keyValueUpdateSet.add(kv); - } - } - - public bool evaluateGuard(Hashtable committedKeyValueTable, Hashtable speculatedKeyValueTable, Hashtable pendingTransactionSpeculatedKeyValueTable) { - for (KeyValue kvGuard : keyValueGuardSet) { - - // First check if the key is in the speculative table, this is the value of the latest assumption - KeyValue kv = NULL; - - // If we have a speculation table then use it first - if (pendingTransactionSpeculatedKeyValueTable != NULL) { - kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey()); - } - - // If we have a speculation table then use it first - if ((kv == NULL) && (speculatedKeyValueTable != NULL)) { - kv = speculatedKeyValueTable.get(kvGuard.getKey()); - } - - if (kv == NULL) { - // if it is not in the speculative table then check the committed table and use that - // value as our latest assumption - kv = committedKeyValueTable.get(kvGuard.getKey()); - } - - if (kvGuard.getValue() != NULL) { - if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) { - - - if (kv != NULL) { - System.out.println(kvGuard.getValue() + " " + kv.getValue()); - } else { - System.out.println(kvGuard.getValue() + " " + kv); - } - - return false; - } - } else { - if (kv != NULL) { - return false; - } - } - } - return true; - } + void decodeTransactionData(); + +public: + Transaction(); + void addPartEncode(TransactionPart *newPart); + void addPartDecode(TransactionPart *newPart); + void addUpdateKV(KeyValue *kv); + void addGuardKV(KeyValue *kv); + int64_t getSequenceNumber(); + void setSequenceNumber(int64_t _sequenceNumber); + int64_t getClientLocalSequenceNumber(); + Hashtable *getParts(); + bool didSendAPartToServer(); + void resetNextPartToSend(); + TransactionPart *getNextPartToSend(); + void setServerFailure(); + bool getServerFailure(); + void resetServerFailure(); + void setTransactionStatus(TransactionStatus *_transactionStatus); + TransactionStatus *getTransactionStatus(); + void removeSentParts(Vector *sentParts); + bool didSendAllParts(); + Hashset *getKeyValueUpdateSet(); + int getNumberOfParts(); + int64_t getMachineId(); + int64_t getArbitrator(); + bool isComplete(); + Pair *getId(); + void setDead(); + TransactionPart *getPart(int32_t index); + bool evaluateGuard(Hashtable *committedKeyValueTable, Hashtable *speculatedKeyValueTable, Hashtable *pendingTransactionSpeculatedKeyValueTable); }; #endif