edits
[iotcloud.git] / version2 / src / C / Transaction.cc
index 526c0ba8dba74862cdf0ba608863eabc8818ebe0..1f862df5c42a46e392f0e4663737427baa7c60bd 100644 (file)
@@ -1,64 +1,84 @@
 #include "Transaction.h"
-
-Transaction::Transaction() {
-       parts = new Hashtable<int32_t, TransactionPart>();
-       keyValueGuardSet = new HashSet<KeyValue>();
-       keyValueUpdateSet = new HashSet<KeyValue>();
-       partsPendingSend = new Vector<int32_t>();
+#include "TransactionPart.h"
+#include "KeyValue.h"
+#include "ByteBuffer.h"
+#include "IoTString.h"
+#include "TransactionStatus.h"
+
+Transaction::Transaction() :
+       parts(new Vector<TransactionPart *>()),
+       partCount(0),
+       missingParts(NULL),
+       partsPendingSend(new Vector<int32_t>()),
+       fldisComplete(false),
+       hasLastPart(false),
+       keyValueGuardSet(new Hashset<KeyValue *>()),
+       keyValueUpdateSet(new Hashset<KeyValue *>()),
+       isDead(false),
+       sequenceNumber(-1),
+       clientLocalSequenceNumber(-1),
+       arbitratorId(-1),
+       machineId(-1),
+       transactionId(Pair<int64_t, int64_t>(0,0)),
+       hadServerFailure(false) {
 }
 
 void Transaction::addPartEncode(TransactionPart *newPart) {
-       parts.put(newPart.getPartNumber(), newPart);
-       partsPendingSend.add(newPart.getPartNumber());
-
-       sequenceNumber = newPart.getSequenceNumber();
-       arbitratorId = newPart.getArbitratorId();
-       transactionId = newPart.getTransactionId();
-       clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
-       machineId = newPart.getMachineId();
-
-       isComplete = true;
+       TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart);
+       if (old == NULL)
+               partCount++;
+       partsPendingSend->add(newPart->getPartNumber());
+
+       sequenceNumber = newPart->getSequenceNumber();
+       arbitratorId = newPart->getArbitratorId();
+       transactionId = newPart->getTransactionId();
+       clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
+       machineId = newPart->getMachineId();
+
+       fldisComplete = true;
 }
 
 void Transaction::addPartDecode(TransactionPart *newPart) {
        if (isDead) {
                // If dead then just kill this part and move on
-               newPart.setDead();
+               newPart->setDead();
                return;
        }
 
-       sequenceNumber = newPart.getSequenceNumber();
-       arbitratorId = newPart.getArbitratorId();
-       transactionId = newPart.getTransactionId();
-       clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
-       machineId = newPart.getMachineId();
+       sequenceNumber = newPart->getSequenceNumber();
+       arbitratorId = newPart->getArbitratorId();
+       transactionId = newPart->getTransactionId();
+       clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
+       machineId = newPart->getMachineId();
 
-       TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
+       TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
+       if (previouslySeenPart == NULL)
+               partCount++;
 
-       if (previoslySeenPart != NULL) {
+       if (previouslySeenPart != NULL) {
                // Set dead the old one since the new one is a rescued version of this part
-               previoslySeenPart.setDead();
-       } else if (newPart.isLastPart()) {
-               missingParts = new HashSet<int32_t>();
+               previouslySeenPart->setDead();
+       } else if (newPart->isLastPart()) {
+               missingParts = new Hashset<int32_t>();
                hasLastPart = true;
 
-               for (int i = 0; i < newPart.getPartNumber(); i++) {
-                       if (parts.get(i) == NULL) {
-                               missingParts.add(i);
+               for (int i = 0; i < newPart->getPartNumber(); i++) {
+                       if (parts->get(i) == NULL) {
+                               missingParts->add(i);
                        }
                }
        }
 
-       if (!isComplete && hasLastPart) {
+       if (!fldisComplete && hasLastPart) {
 
                // We have seen this part so remove it from the set of missing parts
-               missingParts.remove(newPart.getPartNumber());
+               missingParts->remove(newPart->getPartNumber());
 
                // Check if all the parts have been seen
-               if (missingParts.size() == 0) {
+               if (missingParts->size() == 0) {
 
                        // We have all the parts
-                       isComplete = true;
+                       fldisComplete = true;
 
                        // Decode all the parts and create the key value guard and update sets
                        decodeTransactionData();
@@ -67,11 +87,11 @@ void Transaction::addPartDecode(TransactionPart *newPart) {
 }
 
 void Transaction::addUpdateKV(KeyValue *kv) {
-       keyValueUpdateSet.add(kv);
+       keyValueUpdateSet->add(kv);
 }
 
 void Transaction::addGuardKV(KeyValue *kv) {
-       keyValueGuardSet.add(kv);
+       keyValueGuardSet->add(kv);
 }
 
 
@@ -82,8 +102,10 @@ int64_t Transaction::getSequenceNumber() {
 void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
        sequenceNumber = _sequenceNumber;
 
-       for (int32_t i : parts.keySet()) {
-               parts.get(i).setSequenceNumber(sequenceNumber);
+       for (uint32_t i = 0; i < parts->size(); i++) {
+               TransactionPart *tp = parts->get(i);
+               if (tp != NULL)
+                       tp->setSequenceNumber(sequenceNumber);
        }
 }
 
@@ -91,12 +113,12 @@ int64_t Transaction::getClientLocalSequenceNumber() {
        return clientLocalSequenceNumber;
 }
 
-Hashtable<int32_t, TransactionPart *> *Transaction::getParts() {
+Vector<TransactionPart *> *Transaction::getParts() {
        return parts;
 }
 
 bool Transaction::didSendAPartToServer() {
-       return didSendAPartToServer;
+       return flddidSendAPartToServer;
 }
 
 void Transaction::resetNextPartToSend() {
@@ -104,10 +126,10 @@ void Transaction::resetNextPartToSend() {
 }
 
 TransactionPart *Transaction::getNextPartToSend() {
-       if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
+       if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
                return NULL;
        }
-       TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
+       TransactionPart *part = parts->get(partsPendingSend->get(nextPartToSend));
        nextPartToSend++;
        return part;
 }
@@ -137,15 +159,30 @@ TransactionStatus *Transaction::getTransactionStatus() {
 
 void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
        nextPartToSend = 0;
-       if (partsPendingSend.removeAll(sentParts))
-       {
-               didSendAPartToServer = true;
-               transactionStatus.setTransactionSequenceNumber(sequenceNumber);
+       bool changed = false;
+       uint lastusedindex = 0;
+       for (uint i = 0; i < partsPendingSend->size(); i++) {
+               int32_t parti = partsPendingSend->get(i);
+               for (uint j = 0; j < sentParts->size(); j++) {
+                       int32_t partj = sentParts->get(j);
+                       if (parti == partj) {
+                               changed = true;
+                               goto NextElement;
+                       }
+               }
+               partsPendingSend->set(lastusedindex++, parti);
+NextElement:
+               ;
+       }
+       if (changed) {
+               partsPendingSend->setSize(lastusedindex);
+               flddidSendAPartToServer = true;
+               transactionStatus->setTransactionSequenceNumber(sequenceNumber);
        }
 }
 
 bool Transaction::didSendAllParts() {
-       return partsPendingSend.isEmpty();
+       return partsPendingSend->isEmpty();
 }
 
 Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
@@ -153,7 +190,7 @@ Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
 }
 
 int Transaction::getNumberOfParts() {
-       return parts.size();
+       return partCount;
 }
 
 int64_t Transaction::getMachineId() {
@@ -165,112 +202,111 @@ int64_t Transaction::getArbitrator() {
 }
 
 bool Transaction::isComplete() {
-       return isComplete;
+       return fldisComplete;
 }
 
 Pair<int64_t, int64_t> *Transaction::getId() {
-       return transactionId;
+       return &transactionId;
 }
 
 void Transaction::setDead() {
-       if (isDead) {
-               // Already dead
-               return;
-       }
-
-       // Set dead
-       isDead = true;
-
-       // Make all the parts of this transaction dead
-       for (int32_t partNumber : parts.keySet()) {
-               TransactionPart part = parts.get(partNumber);
-               part.setDead();
+       if (!isDead) {
+               // Set dead
+               isDead = true;
+               // Make all the parts of this transaction dead
+               for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
+                       TransactionPart *part = parts->get(partNumber);
+                       if (part != NULL)
+                               part->setDead();
+               }
        }
 }
 
 TransactionPart *Transaction::getPart(int index) {
-       return parts.get(index);
+       return parts->get(index);
 }
 
 void Transaction::decodeTransactionData() {
-
        // Calculate the size of the data section
        int dataSize = 0;
-       for (int i = 0; i < parts.keySet().size(); i++) {
-               TransactionPart tp = parts.get(i);
-               dataSize += tp.getDataSize();
+       for (uint i = 0; i < parts->size(); i++) {
+               TransactionPart *tp = parts->get(i);
+               dataSize += tp->getDataSize();
        }
 
-       Array<char> *combinedData = new char[dataSize];
+       Array<char> *combinedData = new Array<char>(dataSize);
        int currentPosition = 0;
 
        // Stitch all the data sections together
-       for (int i = 0; i < parts.keySet().size(); i++) {
-               TransactionPart tp = parts.get(i);
-               System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
-               currentPosition += tp.getDataSize();
+       for (uint i = 0; i < parts->size(); i++) {
+               TransactionPart *tp = parts->get(i);
+               System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
+               currentPosition += tp->getDataSize();
        }
 
        // Decoder Object
-       ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
+       ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
 
        // Decode how many key value pairs need to be decoded
-       int numberOfKVGuards = bbDecode.getInt();
-       int numberOfKVUpdates = bbDecode.getInt();
+       int numberOfKVGuards = bbDecode->getInt();
+       int numberOfKVUpdates = bbDecode->getInt();
 
        // Decode all the guard key values
        for (int i = 0; i < numberOfKVGuards; i++) {
-               KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
-               keyValueGuardSet.add(kv);
+               KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
+               keyValueGuardSet->add(kv);
        }
 
        // Decode all the updates key values
        for (int i = 0; i < numberOfKVUpdates; i++) {
-               KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
-               keyValueUpdateSet.add(kv);
+               KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
+               keyValueUpdateSet->add(kv);
        }
 }
 
-bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable) {
-       for (KeyValue *kvGuard : keyValueGuardSet) {
-
+bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *pendingTransactionSpeculatedKeyValueTable) {
+       SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kvGuard = kvit->next();
                // First check if the key is in the speculative table, this is the value of the latest assumption
-               KeyValue kv = NULL;
+               KeyValue *kv = NULL;
 
                // If we have a speculation table then use it first
                if (pendingTransactionSpeculatedKeyValueTable != NULL) {
-                       kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
+                       kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey());
                }
 
                // If we have a speculation table then use it first
                if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
-                       kv = speculatedKeyValueTable.get(kvGuard.getKey());
+                       kv = speculatedKeyValueTable->get(kvGuard->getKey());
                }
 
                if (kv == NULL) {
                        // if it is not in the speculative table then check the committed table and use that
                        // value as our latest assumption
-                       kv = committedKeyValueTable.get(kvGuard.getKey());
+                       kv = committedKeyValueTable->get(kvGuard->getKey());
                }
 
-               if (kvGuard.getValue() != NULL) {
-                       if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
+               if (kvGuard->getValue() != NULL) {
+                       if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
 
 
                                if (kv != NULL) {
-                                       System.out.println(kvGuard.getValue() + "       " + kv.getValue());
+                                       printf("%s      %s\n", kvGuard->getKey()->internalBytes()->internalArray(), kv->getValue()->internalBytes()->internalArray());
                                } else {
-                                       System.out.println(kvGuard.getValue() + "       " + kv);
+                                       printf("%s      null\n", kvGuard->getValue()->internalBytes()->internalArray());
                                }
-
+                               delete kvit;
                                return false;
                        }
                } else {
                        if (kv != NULL) {
+                               delete kvit;
                                return false;
                        }
                }
        }
+       delete kvit;
        return true;
 }