X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTransaction.cc;h=1f862df5c42a46e392f0e4663737427baa7c60bd;hp=00d0882c5da034996ffedb134410e1525500e6f1;hb=d28d6cb0b30fcb629eb66feb8506c7e76a3652f8;hpb=c3666021ffae881f13e3c293ab427c292ccbfd02 diff --git a/version2/src/C/Transaction.cc b/version2/src/C/Transaction.cc index 00d0882..1f862df 100644 --- a/version2/src/C/Transaction.cc +++ b/version2/src/C/Transaction.cc @@ -1,300 +1,312 @@ +#include "Transaction.h" +#include "TransactionPart.h" +#include "KeyValue.h" +#include "ByteBuffer.h" +#include "IoTString.h" +#include "TransactionStatus.h" + +Transaction::Transaction() : + parts(new Vector()), + partCount(0), + missingParts(NULL), + partsPendingSend(new Vector()), + fldisComplete(false), + hasLastPart(false), + keyValueGuardSet(new Hashset()), + keyValueUpdateSet(new Hashset()), + isDead(false), + sequenceNumber(-1), + clientLocalSequenceNumber(-1), + arbitratorId(-1), + machineId(-1), + transactionId(Pair(0,0)), + hadServerFailure(false) { +} + +void Transaction::addPartEncode(TransactionPart *newPart) { + TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart); + if (old == NULL) + partCount++; + partsPendingSend->add(newPart->getPartNumber()); + + sequenceNumber = newPart->getSequenceNumber(); + arbitratorId = newPart->getArbitratorId(); + transactionId = newPart->getTransactionId(); + clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber(); + machineId = newPart->getMachineId(); + + fldisComplete = true; +} + +void Transaction::addPartDecode(TransactionPart *newPart) { + if (isDead) { + // If dead then just kill this part and move on + newPart->setDead(); + return; + } + + sequenceNumber = newPart->getSequenceNumber(); + arbitratorId = newPart->getArbitratorId(); + transactionId = newPart->getTransactionId(); + clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber(); + machineId = newPart->getMachineId(); + + TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart); + if (previouslySeenPart == NULL) + partCount++; + + if (previouslySeenPart != NULL) { + // Set dead the old one since the new one is a rescued version of this part + previouslySeenPart->setDead(); + } else if (newPart->isLastPart()) { + missingParts = new Hashset(); + hasLastPart = true; + + for (int i = 0; i < newPart->getPartNumber(); i++) { + if (parts->get(i) == NULL) { + missingParts->add(i); + } + } + } + + if (!fldisComplete && hasLastPart) { + + // We have seen this part so remove it from the set of missing parts + missingParts->remove(newPart->getPartNumber()); + + // Check if all the parts have been seen + if (missingParts->size() == 0) { + + // We have all the parts + fldisComplete = true; + + // Decode all the parts and create the key value guard and update sets + decodeTransactionData(); + } + } +} + +void Transaction::addUpdateKV(KeyValue *kv) { + keyValueUpdateSet->add(kv); +} + +void Transaction::addGuardKV(KeyValue *kv) { + keyValueGuardSet->add(kv); +} + + +int64_t Transaction::getSequenceNumber() { + return sequenceNumber; +} + +void Transaction::setSequenceNumber(int64_t _sequenceNumber) { + sequenceNumber = _sequenceNumber; + + for (uint32_t i = 0; i < parts->size(); i++) { + TransactionPart *tp = parts->get(i); + if (tp != NULL) + tp->setSequenceNumber(sequenceNumber); + } +} + +int64_t Transaction::getClientLocalSequenceNumber() { + return clientLocalSequenceNumber; +} + +Vector *Transaction::getParts() { + return parts; +} + +bool Transaction::didSendAPartToServer() { + return flddidSendAPartToServer; +} + +void Transaction::resetNextPartToSend() { + nextPartToSend = 0; +} +TransactionPart *Transaction::getNextPartToSend() { + if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) { + return NULL; + } + TransactionPart *part = parts->get(partsPendingSend->get(nextPartToSend)); + nextPartToSend++; + return part; +} -class Transaction { - - Map parts = NULL; - Set missingParts = NULL; - List partsPendingSend = NULL; - bool isComplete = false; - bool hasLastPart = false; - Set keyValueGuardSet = NULL; - Set keyValueUpdateSet = NULL; - bool isDead = false; - int64_t sequenceNumber = -1; - int64_t clientLocalSequenceNumber = -1; - int64_t arbitratorId = -1; - int64_t machineId = -1; - Pair transactionId = NULL; - int nextPartToSend = 0; - bool didSendAPartToServer = false; +void Transaction::setServerFailure() { + hadServerFailure = true; +} - TransactionStatus transactionStatus = NULL; +bool Transaction::getServerFailure() { + return hadServerFailure; +} - bool hadServerFailure = false; - Transaction() { - parts = new HashMap(); - keyValueGuardSet = new HashSet(); - keyValueUpdateSet = new HashSet(); - partsPendingSend = new ArrayList(); - } - - void addPartEncode(TransactionPart newPart) { - parts.put(newPart.getPartNumber(), newPart); - partsPendingSend.add(newPart.getPartNumber()); - - sequenceNumber = newPart.getSequenceNumber(); - arbitratorId = newPart.getArbitratorId(); - transactionId = newPart.getTransactionId(); - clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); - machineId = newPart.getMachineId(); - - isComplete = true; - } - - void addPartDecode(TransactionPart newPart) { +void Transaction::resetServerFailure() { + hadServerFailure = false; +} - if (isDead) { - // If dead then just kill this part and move on - newPart.setDead(); - return; - } - sequenceNumber = newPart.getSequenceNumber(); - arbitratorId = newPart.getArbitratorId(); - transactionId = newPart.getTransactionId(); - clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); - machineId = newPart.getMachineId(); +void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) { + transactionStatus = _transactionStatus; +} - TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart); +TransactionStatus *Transaction::getTransactionStatus() { + return transactionStatus; +} - if (previoslySeenPart != NULL) { - // Set dead the old one since the new one is a rescued version of this part - previoslySeenPart.setDead(); - } else if (newPart.isLastPart()) { - missingParts = new HashSet(); - hasLastPart = true; - - for (int i = 0; i < newPart.getPartNumber(); i++) { - if (parts.get(i) == NULL) { - missingParts.add(i); - } - } - } +void Transaction::removeSentParts(Vector *sentParts) { + nextPartToSend = 0; + bool changed = false; + uint lastusedindex = 0; + for (uint i = 0; i < partsPendingSend->size(); i++) { + int32_t parti = partsPendingSend->get(i); + for (uint j = 0; j < sentParts->size(); j++) { + int32_t partj = sentParts->get(j); + if (parti == partj) { + changed = true; + goto NextElement; + } + } + partsPendingSend->set(lastusedindex++, parti); +NextElement: + ; + } + if (changed) { + partsPendingSend->setSize(lastusedindex); + flddidSendAPartToServer = true; + transactionStatus->setTransactionSequenceNumber(sequenceNumber); + } +} - if (!isComplete && hasLastPart) { - - // We have seen this part so remove it from the set of missing parts - missingParts.remove(newPart.getPartNumber()); - - // Check if all the parts have been seen - if (missingParts.size() == 0) { +bool Transaction::didSendAllParts() { + return partsPendingSend->isEmpty(); +} + +Hashset *Transaction::getKeyValueUpdateSet() { + return keyValueUpdateSet; +} - // We have all the parts - isComplete = true; +int Transaction::getNumberOfParts() { + return partCount; +} - // Decode all the parts and create the key value guard and update sets - decodeTransactionData(); - } - } - } - - void addUpdateKV(KeyValue kv) { - keyValueUpdateSet.add(kv); - } - - void addGuardKV(KeyValue kv) { - keyValueGuardSet.add(kv); - } - - - int64_t getSequenceNumber() { - return sequenceNumber; - } - - void setSequenceNumber(int64_t _sequenceNumber) { - sequenceNumber = _sequenceNumber; - - for (Integer i : parts.keySet()) { - parts.get(i).setSequenceNumber(sequenceNumber); - } - } - - int64_t getClientLocalSequenceNumber() { - return clientLocalSequenceNumber; - } - - Map getParts() { - return parts; - } +int64_t Transaction::getMachineId() { + return machineId; +} - bool didSendAPartToServer() { - return didSendAPartToServer; - } - - void resetNextPartToSend() { - nextPartToSend = 0; - } - - TransactionPart getNextPartToSend() { - if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) { - return NULL; - } - TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend)); - nextPartToSend++; - return part; - } - - - void setServerFailure() { - hadServerFailure = true; - } - - bool getServerFailure() { - return hadServerFailure; - } - - - void resetServerFailure() { - hadServerFailure = false; - } - - - void setTransactionStatus(TransactionStatus _transactionStatus) { - transactionStatus = _transactionStatus; - } - - TransactionStatus getTransactionStatus() { - return transactionStatus; - } - - void removeSentParts(List sentParts) { - nextPartToSend = 0; - if(partsPendingSend.removeAll(sentParts)) - { - didSendAPartToServer = true; - transactionStatus.setTransactionSequenceNumber(sequenceNumber); - } - } - - bool didSendAllParts() { - return partsPendingSend.isEmpty(); - } - - Set getKeyValueUpdateSet() { - return keyValueUpdateSet; - } - - int getNumberOfParts() { - return parts.size(); - } - - int64_t getMachineId() { - return machineId; - } - - int64_t getArbitrator() { - return arbitratorId; - } - - bool isComplete() { - return isComplete; - } - - Pair getId() { - return transactionId; - } - - void setDead() { - if (isDead) { - // Already dead - return; - } - - // Set dead - isDead = true; - - // Make all the parts of this transaction dead - for (Integer partNumber : parts.keySet()) { - TransactionPart part = parts.get(partNumber); - part.setDead(); - } - } - - TransactionPart getPart(int index) { - return parts.get(index); - } - - void decodeTransactionData() { - - // Calculate the size of the data section - int dataSize = 0; - for (int i = 0; i < parts.keySet().size(); i++) { - TransactionPart tp = parts.get(i); - dataSize += tp.getDataSize(); - } - - char[] combinedData = new char[dataSize]; - int currentPosition = 0; - - // Stitch all the data sections together - for (int i = 0; i < parts.keySet().size(); i++) { - TransactionPart tp = parts.get(i); - System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize()); - currentPosition += tp.getDataSize(); - } - - // Decoder Object - ByteBuffer bbDecode = ByteBuffer.wrap(combinedData); - - // Decode how many key value pairs need to be decoded - int numberOfKVGuards = bbDecode.getInt(); - int numberOfKVUpdates = bbDecode.getInt(); - - // Decode all the guard key values - for (int i = 0; i < numberOfKVGuards; i++) { - KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); - keyValueGuardSet.add(kv); - } - - // Decode all the updates key values - for (int i = 0; i < numberOfKVUpdates; i++) { - KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); - keyValueUpdateSet.add(kv); - } - } - - bool evaluateGuard(Map committedKeyValueTable, Map speculatedKeyValueTable, Map pendingTransactionSpeculatedKeyValueTable) { - for (KeyValue kvGuard : keyValueGuardSet) { - - // First check if the key is in the speculative table, this is the value of the latest assumption - KeyValue kv = NULL; - - // If we have a speculation table then use it first - if (pendingTransactionSpeculatedKeyValueTable != NULL) { - kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey()); - } - - // If we have a speculation table then use it first - if ((kv == NULL) && (speculatedKeyValueTable != NULL)) { - kv = speculatedKeyValueTable.get(kvGuard.getKey()); - } - - if (kv == NULL) { - // if it is not in the speculative table then check the committed table and use that - // value as our latest assumption - kv = committedKeyValueTable.get(kvGuard.getKey()); - } - - if (kvGuard.getValue() != NULL) { - if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) { - - - if (kv != NULL) { - System.out.println(kvGuard.getValue() + " " + kv.getValue()); - } else { - System.out.println(kvGuard.getValue() + " " + kv); - } - - return false; - } - } else { - if (kv != NULL) { - return false; - } - } - } - return true; - } +int64_t Transaction::getArbitrator() { + return arbitratorId; } + +bool Transaction::isComplete() { + return fldisComplete; +} + +Pair *Transaction::getId() { + return &transactionId; +} + +void Transaction::setDead() { + if (!isDead) { + // Set dead + isDead = true; + // Make all the parts of this transaction dead + for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) { + TransactionPart *part = parts->get(partNumber); + if (part != NULL) + part->setDead(); + } + } +} + +TransactionPart *Transaction::getPart(int index) { + return parts->get(index); +} + +void Transaction::decodeTransactionData() { + // Calculate the size of the data section + int dataSize = 0; + for (uint i = 0; i < parts->size(); i++) { + TransactionPart *tp = parts->get(i); + dataSize += tp->getDataSize(); + } + + Array *combinedData = new Array(dataSize); + int currentPosition = 0; + + // Stitch all the data sections together + for (uint i = 0; i < parts->size(); i++) { + TransactionPart *tp = parts->get(i); + System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize()); + currentPosition += tp->getDataSize(); + } + + // Decoder Object + ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData); + + // Decode how many key value pairs need to be decoded + int numberOfKVGuards = bbDecode->getInt(); + int numberOfKVUpdates = bbDecode->getInt(); + + // Decode all the guard key values + for (int i = 0; i < numberOfKVGuards; i++) { + KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode); + keyValueGuardSet->add(kv); + } + + // Decode all the updates key values + for (int i = 0; i < numberOfKVUpdates; i++) { + KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode); + keyValueUpdateSet->add(kv); + } +} + +bool Transaction::evaluateGuard(Hashtable *committedKeyValueTable, Hashtable *speculatedKeyValueTable, Hashtable *pendingTransactionSpeculatedKeyValueTable) { + SetIterator *kvit = keyValueGuardSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kvGuard = kvit->next(); + // First check if the key is in the speculative table, this is the value of the latest assumption + KeyValue *kv = NULL; + + // If we have a speculation table then use it first + if (pendingTransactionSpeculatedKeyValueTable != NULL) { + kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey()); + } + + // If we have a speculation table then use it first + if ((kv == NULL) && (speculatedKeyValueTable != NULL)) { + kv = speculatedKeyValueTable->get(kvGuard->getKey()); + } + + if (kv == NULL) { + // if it is not in the speculative table then check the committed table and use that + // value as our latest assumption + kv = committedKeyValueTable->get(kvGuard->getKey()); + } + + if (kvGuard->getValue() != NULL) { + if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) { + + + if (kv != NULL) { + printf("%s %s\n", kvGuard->getKey()->internalBytes()->internalArray(), kv->getValue()->internalBytes()->internalArray()); + } else { + printf("%s null\n", kvGuard->getValue()->internalBytes()->internalArray()); + } + delete kvit; + return false; + } + } else { + if (kv != NULL) { + delete kvit; + return false; + } + } + } + delete kvit; + return true; +} +