X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTransaction.cc;h=526c0ba8dba74862cdf0ba608863eabc8818ebe0;hp=2da475908be3c61a7af6f0be18e76a296e0ab113;hb=0a73089b732948a43f9d29fef2eee4754ecad54c;hpb=bd73414f3ef76dbb9d060e67b6d31f32278b2ffb diff --git a/version2/src/C/Transaction.cc b/version2/src/C/Transaction.cc index 2da4759..526c0ba 100644 --- a/version2/src/C/Transaction.cc +++ b/version2/src/C/Transaction.cc @@ -1,300 +1,276 @@ +#include "Transaction.h" +Transaction::Transaction() { + parts = new Hashtable(); + keyValueGuardSet = new HashSet(); + keyValueUpdateSet = new HashSet(); + partsPendingSend = new Vector(); +} -class Transaction { - - Hashtable parts = NULL; - Set missingParts = NULL; - Vector partsPendingSend = NULL; - bool isComplete = false; - bool hasLastPart = false; - Set keyValueGuardSet = NULL; - Set keyValueUpdateSet = NULL; - bool isDead = false; - int64_t sequenceNumber = -1; - int64_t clientLocalSequenceNumber = -1; - int64_t arbitratorId = -1; - int64_t machineId = -1; - Pair transactionId = NULL; - - int nextPartToSend = 0; - bool didSendAPartToServer = false; +void Transaction::addPartEncode(TransactionPart *newPart) { + parts.put(newPart.getPartNumber(), newPart); + partsPendingSend.add(newPart.getPartNumber()); - TransactionStatus transactionStatus = NULL; + sequenceNumber = newPart.getSequenceNumber(); + arbitratorId = newPart.getArbitratorId(); + transactionId = newPart.getTransactionId(); + clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); + machineId = newPart.getMachineId(); - bool hadServerFailure = false; + isComplete = true; +} - Transaction() { - parts = new Hashtable(); - keyValueGuardSet = new HashSet(); - keyValueUpdateSet = new HashSet(); - partsPendingSend = new Vector(); +void Transaction::addPartDecode(TransactionPart *newPart) { + if (isDead) { + // If dead then just kill this part and move on + newPart.setDead(); + return; } - void addPartEncode(TransactionPart newPart) { - parts.put(newPart.getPartNumber(), newPart); - partsPendingSend.add(newPart.getPartNumber()); - - sequenceNumber = newPart.getSequenceNumber(); - arbitratorId = newPart.getArbitratorId(); - transactionId = newPart.getTransactionId(); - clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); - machineId = newPart.getMachineId(); + sequenceNumber = newPart.getSequenceNumber(); + arbitratorId = newPart.getArbitratorId(); + transactionId = newPart.getTransactionId(); + clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); + machineId = newPart.getMachineId(); - isComplete = true; - } + TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart); - void addPartDecode(TransactionPart newPart) { + if (previoslySeenPart != NULL) { + // Set dead the old one since the new one is a rescued version of this part + previoslySeenPart.setDead(); + } else if (newPart.isLastPart()) { + missingParts = new HashSet(); + hasLastPart = true; - if (isDead) { - // If dead then just kill this part and move on - newPart.setDead(); - return; + for (int i = 0; i < newPart.getPartNumber(); i++) { + if (parts.get(i) == NULL) { + missingParts.add(i); + } } + } - sequenceNumber = newPart.getSequenceNumber(); - arbitratorId = newPart.getArbitratorId(); - transactionId = newPart.getTransactionId(); - clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); - machineId = newPart.getMachineId(); + if (!isComplete && hasLastPart) { - TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart); + // We have seen this part so remove it from the set of missing parts + missingParts.remove(newPart.getPartNumber()); - if (previoslySeenPart != NULL) { - // Set dead the old one since the new one is a rescued version of this part - previoslySeenPart.setDead(); - } else if (newPart.isLastPart()) { - missingParts = new HashSet(); - hasLastPart = true; + // Check if all the parts have been seen + if (missingParts.size() == 0) { - for (int i = 0; i < newPart.getPartNumber(); i++) { - if (parts.get(i) == NULL) { - missingParts.add(i); - } - } + // We have all the parts + isComplete = true; + + // Decode all the parts and create the key value guard and update sets + decodeTransactionData(); } + } +} - if (!isComplete && hasLastPart) { +void Transaction::addUpdateKV(KeyValue *kv) { + keyValueUpdateSet.add(kv); +} - // We have seen this part so remove it from the set of missing parts - missingParts.remove(newPart.getPartNumber()); +void Transaction::addGuardKV(KeyValue *kv) { + keyValueGuardSet.add(kv); +} - // Check if all the parts have been seen - if (missingParts.size() == 0) { - // We have all the parts - isComplete = true; +int64_t Transaction::getSequenceNumber() { + return sequenceNumber; +} - // Decode all the parts and create the key value guard and update sets - decodeTransactionData(); - } - } - } +void Transaction::setSequenceNumber(int64_t _sequenceNumber) { + sequenceNumber = _sequenceNumber; - void addUpdateKV(KeyValue kv) { - keyValueUpdateSet.add(kv); + for (int32_t i : parts.keySet()) { + parts.get(i).setSequenceNumber(sequenceNumber); } +} - void addGuardKV(KeyValue kv) { - keyValueGuardSet.add(kv); - } +int64_t Transaction::getClientLocalSequenceNumber() { + return clientLocalSequenceNumber; +} +Hashtable *Transaction::getParts() { + return parts; +} - int64_t getSequenceNumber() { - return sequenceNumber; - } +bool Transaction::didSendAPartToServer() { + return didSendAPartToServer; +} - void setSequenceNumber(int64_t _sequenceNumber) { - sequenceNumber = _sequenceNumber; +void Transaction::resetNextPartToSend() { + nextPartToSend = 0; +} - for (int32_t i : parts.keySet()) { - parts.get(i).setSequenceNumber(sequenceNumber); - } +TransactionPart *Transaction::getNextPartToSend() { + if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) { + return NULL; } + TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend)); + nextPartToSend++; + return part; +} - int64_t getClientLocalSequenceNumber() { - return clientLocalSequenceNumber; - } - Hashtable getParts() { - return parts; - } +void Transaction::setServerFailure() { + hadServerFailure = true; +} - bool didSendAPartToServer() { - return didSendAPartToServer; - } +bool Transaction::getServerFailure() { + return hadServerFailure; +} - void resetNextPartToSend() { - nextPartToSend = 0; - } - TransactionPart getNextPartToSend() { - if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) { - return NULL; - } - TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend)); - nextPartToSend++; - return part; - } +void Transaction::resetServerFailure() { + hadServerFailure = false; +} - void setServerFailure() { - hadServerFailure = true; - } +void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) { + transactionStatus = _transactionStatus; +} + +TransactionStatus *Transaction::getTransactionStatus() { + return transactionStatus; +} - bool getServerFailure() { - return hadServerFailure; +void Transaction::removeSentParts(Vector *sentParts) { + nextPartToSend = 0; + if (partsPendingSend.removeAll(sentParts)) + { + didSendAPartToServer = true; + transactionStatus.setTransactionSequenceNumber(sequenceNumber); } +} +bool Transaction::didSendAllParts() { + return partsPendingSend.isEmpty(); +} - void resetServerFailure() { - hadServerFailure = false; - } +Hashset *Transaction::getKeyValueUpdateSet() { + return keyValueUpdateSet; +} +int Transaction::getNumberOfParts() { + return parts.size(); +} - void setTransactionStatus(TransactionStatus _transactionStatus) { - transactionStatus = _transactionStatus; - } +int64_t Transaction::getMachineId() { + return machineId; +} - TransactionStatus getTransactionStatus() { - return transactionStatus; - } +int64_t Transaction::getArbitrator() { + return arbitratorId; +} - void removeSentParts(Vector sentParts) { - nextPartToSend = 0; - if (partsPendingSend.removeAll(sentParts)) - { - didSendAPartToServer = true; - transactionStatus.setTransactionSequenceNumber(sequenceNumber); - } - } +bool Transaction::isComplete() { + return isComplete; +} - bool didSendAllParts() { - return partsPendingSend.isEmpty(); - } +Pair *Transaction::getId() { + return transactionId; +} - Set getKeyValueUpdateSet() { - return keyValueUpdateSet; +void Transaction::setDead() { + if (isDead) { + // Already dead + return; } - int getNumberOfParts() { - return parts.size(); - } + // Set dead + isDead = true; - int64_t getMachineId() { - return machineId; + // Make all the parts of this transaction dead + for (int32_t partNumber : parts.keySet()) { + TransactionPart part = parts.get(partNumber); + part.setDead(); } +} - int64_t getArbitrator() { - return arbitratorId; - } +TransactionPart *Transaction::getPart(int index) { + return parts.get(index); +} + +void Transaction::decodeTransactionData() { - bool isComplete() { - return isComplete; + // Calculate the size of the data section + int dataSize = 0; + for (int i = 0; i < parts.keySet().size(); i++) { + TransactionPart tp = parts.get(i); + dataSize += tp.getDataSize(); } - Pair getId() { - return transactionId; + Array *combinedData = new char[dataSize]; + int currentPosition = 0; + + // Stitch all the data sections together + for (int i = 0; i < parts.keySet().size(); i++) { + TransactionPart tp = parts.get(i); + System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize()); + currentPosition += tp.getDataSize(); } - void setDead() { - if (isDead) { - // Already dead - return; - } + // Decoder Object + ByteBuffer bbDecode = ByteBuffer.wrap(combinedData); - // Set dead - isDead = true; + // Decode how many key value pairs need to be decoded + int numberOfKVGuards = bbDecode.getInt(); + int numberOfKVUpdates = bbDecode.getInt(); - // Make all the parts of this transaction dead - for (int32_t partNumber : parts.keySet()) { - TransactionPart part = parts.get(partNumber); - part.setDead(); - } + // Decode all the guard key values + for (int i = 0; i < numberOfKVGuards; i++) { + KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); + keyValueGuardSet.add(kv); } - TransactionPart getPart(int index) { - return parts.get(index); + // Decode all the updates key values + for (int i = 0; i < numberOfKVUpdates; i++) { + KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); + keyValueUpdateSet.add(kv); } +} - void decodeTransactionData() { - - // Calculate the size of the data section - int dataSize = 0; - for (int i = 0; i < parts.keySet().size(); i++) { - TransactionPart tp = parts.get(i); - dataSize += tp.getDataSize(); - } +bool Transaction::evaluateGuard(Hashtable *committedKeyValueTable, Hashtable *speculatedKeyValueTable, Hashtable *pendingTransactionSpeculatedKeyValueTable) { + for (KeyValue *kvGuard : keyValueGuardSet) { - Array *combinedData = new char[dataSize]; - int currentPosition = 0; + // First check if the key is in the speculative table, this is the value of the latest assumption + KeyValue kv = NULL; - // Stitch all the data sections together - for (int i = 0; i < parts.keySet().size(); i++) { - TransactionPart tp = parts.get(i); - System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize()); - currentPosition += tp.getDataSize(); + // If we have a speculation table then use it first + if (pendingTransactionSpeculatedKeyValueTable != NULL) { + kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey()); } - // Decoder Object - ByteBuffer bbDecode = ByteBuffer.wrap(combinedData); - - // Decode how many key value pairs need to be decoded - int numberOfKVGuards = bbDecode.getInt(); - int numberOfKVUpdates = bbDecode.getInt(); - - // Decode all the guard key values - for (int i = 0; i < numberOfKVGuards; i++) { - KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); - keyValueGuardSet.add(kv); + // If we have a speculation table then use it first + if ((kv == NULL) && (speculatedKeyValueTable != NULL)) { + kv = speculatedKeyValueTable.get(kvGuard.getKey()); } - // Decode all the updates key values - for (int i = 0; i < numberOfKVUpdates; i++) { - KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); - keyValueUpdateSet.add(kv); + if (kv == NULL) { + // if it is not in the speculative table then check the committed table and use that + // value as our latest assumption + kv = committedKeyValueTable.get(kvGuard.getKey()); } - } - bool evaluateGuard(Hashtable committedKeyValueTable, Hashtable speculatedKeyValueTable, Hashtable pendingTransactionSpeculatedKeyValueTable) { - for (KeyValue kvGuard : keyValueGuardSet) { + if (kvGuard.getValue() != NULL) { + if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) { - // First check if the key is in the speculative table, this is the value of the latest assumption - KeyValue kv = NULL; - // If we have a speculation table then use it first - if (pendingTransactionSpeculatedKeyValueTable != NULL) { - kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey()); - } - - // If we have a speculation table then use it first - if ((kv == NULL) && (speculatedKeyValueTable != NULL)) { - kv = speculatedKeyValueTable.get(kvGuard.getKey()); - } - - if (kv == NULL) { - // if it is not in the speculative table then check the committed table and use that - // value as our latest assumption - kv = committedKeyValueTable.get(kvGuard.getKey()); - } - - if (kvGuard.getValue() != NULL) { - if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) { - - - if (kv != NULL) { - System.out.println(kvGuard.getValue() + " " + kv.getValue()); - } else { - System.out.println(kvGuard.getValue() + " " + kv); - } - - return false; - } - } else { if (kv != NULL) { - return false; + System.out.println(kvGuard.getValue() + " " + kv.getValue()); + } else { + System.out.println(kvGuard.getValue() + " " + kv); } + + return false; + } + } else { + if (kv != NULL) { + return false; } } - return true; } + return true; } +