X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=version2%2Fsrc%2FC%2FPendingTransaction.cc;h=088238e93aa4d9610a2dc44beb8742aa3b1880d0;hb=18c08ca8d6f4aed6fa24003826304cd5700f7b7d;hp=b3da754ca4fbe5035d1d5b7145d19931d0415bf4;hpb=bd73414f3ef76dbb9d060e67b6d31f32278b2ffb;p=iotcloud.git diff --git a/version2/src/C/PendingTransaction.cc b/version2/src/C/PendingTransaction.cc index b3da754..088238e 100644 --- a/version2/src/C/PendingTransaction.cc +++ b/version2/src/C/PendingTransaction.cc @@ -1,8 +1,13 @@ #include "PendingTransaction.h" +#include "KeyValue.h" +#include "IoTString.h" +#include "Transaction.h" +#include "TransactionPart.h" +#include "ByteBuffer.h" PendingTransaction::PendingTransaction(int64_t _machineId) : keyValueUpdateSet(new Hashset()), - keyValueGuardSet(new HashSet()), + keyValueGuardSet(new Hashset()), arbitrator(-1), clientLocalSequenceNumber(-1), machineId(_machineId), @@ -15,27 +20,30 @@ PendingTransaction::PendingTransaction(int64_t _machineId) : */ void PendingTransaction::addKV(KeyValue *newKV) { - KeyValue rmKV = NULL; + KeyValue *rmKV = NULL; // Make sure there are no duplicates - for (KeyValue kv : keyValueUpdateSet) { - if (kv.getKey().equals(newKV.getKey())) { + SetIterator *kvit = keyValueUpdateSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + if (kv->getKey()->equals(newKV->getKey())) { // Remove key if we are adding a newer version of the same key rmKV = kv; break; } } + delete kvit; // Remove key if we are adding a newer version of the same key if (rmKV != NULL) { - keyValueUpdateSet.remove(rmKV); - currentDataSize -= rmKV.getSize(); + keyValueUpdateSet->remove(rmKV); + currentDataSize -= rmKV->getSize(); } // Add the key to the hash set - keyValueUpdateSet.add(newKV); - currentDataSize += newKV.getSize(); + keyValueUpdateSet->add(newKV); + currentDataSize += newKV->getSize(); } /** @@ -44,8 +52,8 @@ void PendingTransaction::addKV(KeyValue *newKV) { */ void PendingTransaction::addKVGuard(KeyValue *newKV) { // Add the key to the hash set - keyValueGuardSet.add(newKV); - currentDataSize += newKV.getSize(); + keyValueGuardSet->add(newKV); + currentDataSize += newKV->getSize(); } /** @@ -60,36 +68,41 @@ bool PendingTransaction::checkArbitrator(int64_t arb) { return arb == arbitrator; } -bool PendingTransaction::evaluateGuard(Hashtable keyValTableCommitted, Hashtable keyValTableSpeculative, Hashtable keyValTablePendingTransSpeculative) { - for (KeyValue kvGuard : keyValueGuardSet) { +bool PendingTransaction::evaluateGuard(Hashtable *keyValTableCommitted, Hashtable *keyValTableSpeculative, Hashtable *keyValTablePendingTransSpeculative) { + SetIterator *kvit = keyValueGuardSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kvGuard = kvit->next(); // First check if the key is in the speculative table, this is the value of the latest assumption - KeyValue kv = keyValTablePendingTransSpeculative.get(kvGuard.getKey()); + KeyValue *kv = keyValTablePendingTransSpeculative->get(kvGuard->getKey()); if (kv == NULL) { // if it is not in the pending trans table then check the speculative table and use that // value as our latest assumption - kv = keyValTableSpeculative.get(kvGuard.getKey()); + kv = keyValTableSpeculative->get(kvGuard->getKey()); } if (kv == NULL) { // if it is not in the speculative table then check the committed table and use that // value as our latest assumption - kv = keyValTableCommitted.get(kvGuard.getKey()); + kv = keyValTableCommitted->get(kvGuard->getKey()); } - if (kvGuard.getValue() != NULL) { - if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) { + if (kvGuard->getValue() != NULL) { + if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) { + delete kvit; return false; } } else { if (kv != NULL) { + delete kvit; return false; } } } + delete kvit; return true; } @@ -101,24 +114,24 @@ Transaction *PendingTransaction::createTransaction() { Array *charData = convertDataToBytes(); int currentPosition = 0; - int remaining = charData.length; + int remaining = charData->length(); while (remaining > 0) { bool isLastPart = false; // determine how much to copy - int copySize = TransactionPart.MAX_NON_HEADER_SIZE; - if (remaining <= TransactionPart.MAX_NON_HEADER_SIZE) { + int copySize = TransactionPart_MAX_NON_HEADER_SIZE; + if (remaining <= TransactionPart_MAX_NON_HEADER_SIZE) { copySize = remaining; isLastPart = true;// last bit of data so last part } // Copy to a smaller version - Array *partData = new char[copySize]; - System.arraycopy(charData, currentPosition, partData, 0, copySize); + Array *partData = new Array(copySize); + System_arraycopy(charData, currentPosition, partData, 0, copySize); - TransactionPart part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart); - newTransaction.addPartEncode(part); + TransactionPart *part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart); + newTransaction->addPartEncode(part); // Update position, count and remaining currentPosition += copySize; @@ -127,19 +140,24 @@ Transaction *PendingTransaction::createTransaction() { } // Add the Guard Conditions - for (KeyValue kv : keyValueGuardSet) { - newTransaction.addGuardKV(kv); + SetIterator *kvit = keyValueGuardSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + newTransaction->addGuardKV(kv); } + delete kvit; // Add the updates - for (KeyValue kv : keyValueUpdateSet) { - newTransaction.addUpdateKV(kv); + kvit = keyValueUpdateSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + newTransaction->addUpdateKV(kv); } - + delete kvit; return newTransaction; } -Arrar *PendingTransaction::convertDataToBytes() { +Array *PendingTransaction::convertDataToBytes() { // Calculate the size of the data int sizeOfData = 2 * sizeof(int32_t); // Number of Update KV's and Guard KV's sizeOfData += currentDataSize; @@ -149,18 +167,24 @@ Arrar *PendingTransaction::convertDataToBytes() { ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray); // Encode the size of the updates and guard sets - bbEncode->putInt(keyValueGuardSet.size()); - bbEncode->putInt(keyValueUpdateSet.size()); + bbEncode->putInt(keyValueGuardSet->size()); + bbEncode->putInt(keyValueUpdateSet->size()); // Encode all the guard conditions - for (KeyValue kv : keyValueGuardSet) { + SetIterator *kvit = keyValueGuardSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); kv->encode(bbEncode); } + delete kvit; // Encode all the updates - for (KeyValue kv : keyValueUpdateSet) { + kvit = keyValueUpdateSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); kv->encode(bbEncode); } + delete kvit; return bbEncode->array(); }