edits
[iotcloud.git] / version2 / src / C / Transaction.cc
index 691a78f9609118aa8d00c3467606bb8380dbf825..1f862df5c42a46e392f0e4663737427baa7c60bd 100644 (file)
@@ -1,7 +1,13 @@
 #include "Transaction.h"
+#include "TransactionPart.h"
+#include "KeyValue.h"
+#include "ByteBuffer.h"
+#include "IoTString.h"
+#include "TransactionStatus.h"
 
 Transaction::Transaction() :
-       parts(new Hashtable<int32_t, TransactionPart *>()),
+       parts(new Vector<TransactionPart *>()),
+       partCount(0),
        missingParts(NULL),
        partsPendingSend(new Vector<int32_t>()),
        fldisComplete(false),
@@ -13,12 +19,14 @@ Transaction::Transaction() :
        clientLocalSequenceNumber(-1),
        arbitratorId(-1),
        machineId(-1),
-       transactionId(NULL),
+       transactionId(Pair<int64_t, int64_t>(0,0)),
        hadServerFailure(false) {
 }
 
 void Transaction::addPartEncode(TransactionPart *newPart) {
-       parts->put(newPart->getPartNumber(), newPart);
+       TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart);
+       if (old == NULL)
+               partCount++;
        partsPendingSend->add(newPart->getPartNumber());
 
        sequenceNumber = newPart->getSequenceNumber();
@@ -43,11 +51,13 @@ void Transaction::addPartDecode(TransactionPart *newPart) {
        clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
        machineId = newPart->getMachineId();
 
-       TransactionPart previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
+       TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
+       if (previouslySeenPart == NULL)
+               partCount++;
 
-       if (previoslySeenPart != NULL) {
+       if (previouslySeenPart != NULL) {
                // Set dead the old one since the new one is a rescued version of this part
-               previoslySeenPart->setDead();
+               previouslySeenPart->setDead();
        } else if (newPart->isLastPart()) {
                missingParts = new Hashset<int32_t>();
                hasLastPart = true;
@@ -92,8 +102,10 @@ int64_t Transaction::getSequenceNumber() {
 void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
        sequenceNumber = _sequenceNumber;
 
-       for (int32_t i : parts->keySet()) {
-               parts->get(i)->setSequenceNumber(sequenceNumber);
+       for (uint32_t i = 0; i < parts->size(); i++) {
+               TransactionPart *tp = parts->get(i);
+               if (tp != NULL)
+                       tp->setSequenceNumber(sequenceNumber);
        }
 }
 
@@ -101,7 +113,7 @@ int64_t Transaction::getClientLocalSequenceNumber() {
        return clientLocalSequenceNumber;
 }
 
-Hashtable<int32_t, TransactionPart *> *Transaction::getParts() {
+Vector<TransactionPart *> *Transaction::getParts() {
        return parts;
 }
 
@@ -117,7 +129,7 @@ TransactionPart *Transaction::getNextPartToSend() {
        if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
                return NULL;
        }
-       TransactionPart part = parts->get(partsPendingSend->get(nextPartToSend));
+       TransactionPart *part = parts->get(partsPendingSend->get(nextPartToSend));
        nextPartToSend++;
        return part;
 }
@@ -147,8 +159,23 @@ TransactionStatus *Transaction::getTransactionStatus() {
 
 void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
        nextPartToSend = 0;
-       if (partsPendingSend->removeAll(sentParts))
-       {
+       bool changed = false;
+       uint lastusedindex = 0;
+       for (uint i = 0; i < partsPendingSend->size(); i++) {
+               int32_t parti = partsPendingSend->get(i);
+               for (uint j = 0; j < sentParts->size(); j++) {
+                       int32_t partj = sentParts->get(j);
+                       if (parti == partj) {
+                               changed = true;
+                               goto NextElement;
+                       }
+               }
+               partsPendingSend->set(lastusedindex++, parti);
+NextElement:
+               ;
+       }
+       if (changed) {
+               partsPendingSend->setSize(lastusedindex);
                flddidSendAPartToServer = true;
                transactionStatus->setTransactionSequenceNumber(sequenceNumber);
        }
@@ -163,7 +190,7 @@ Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
 }
 
 int Transaction::getNumberOfParts() {
-       return parts->size();
+       return partCount;
 }
 
 int64_t Transaction::getMachineId() {
@@ -179,22 +206,19 @@ bool Transaction::isComplete() {
 }
 
 Pair<int64_t, int64_t> *Transaction::getId() {
-       return transactionId;
+       return &transactionId;
 }
 
 void Transaction::setDead() {
-       if (isDead) {
-               // Already dead
-               return;
-       }
-
-       // Set dead
-       isDead = true;
-
-       // Make all the parts of this transaction dead
-       for (int32_t partNumber : parts->keySet()) {
-               TransactionPart part = parts->get(partNumber);
-               part->setDead();
+       if (!isDead) {
+               // Set dead
+               isDead = true;
+               // Make all the parts of this transaction dead
+               for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
+                       TransactionPart *part = parts->get(partNumber);
+                       if (part != NULL)
+                               part->setDead();
+               }
        }
 }
 
@@ -203,26 +227,25 @@ TransactionPart *Transaction::getPart(int index) {
 }
 
 void Transaction::decodeTransactionData() {
-
        // Calculate the size of the data section
        int dataSize = 0;
-       for (int i = 0; i < parts->keySet()->size(); i++) {
-               TransactionPart tp = parts->get(i);
+       for (uint i = 0; i < parts->size(); i++) {
+               TransactionPart *tp = parts->get(i);
                dataSize += tp->getDataSize();
        }
 
-       Array<char> *combinedData = new char[dataSize];
+       Array<char> *combinedData = new Array<char>(dataSize);
        int currentPosition = 0;
 
        // Stitch all the data sections together
-       for (int i = 0; i < parts->keySet()->size(); i++) {
-               TransactionPart tp = parts->get(i);
+       for (uint i = 0; i < parts->size(); i++) {
+               TransactionPart *tp = parts->get(i);
                System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
                currentPosition += tp->getDataSize();
        }
 
        // Decoder Object
-       ByteBuffer bbDecode = ByteBuffer_wrap(combinedData);
+       ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
 
        // Decode how many key value pairs need to be decoded
        int numberOfKVGuards = bbDecode->getInt();
@@ -230,22 +253,23 @@ void Transaction::decodeTransactionData() {
 
        // Decode all the guard key values
        for (int i = 0; i < numberOfKVGuards; i++) {
-               KeyValue * kv = (KeyValue *)KeyValue_decode(bbDecode);
+               KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
                keyValueGuardSet->add(kv);
        }
 
        // Decode all the updates key values
        for (int i = 0; i < numberOfKVUpdates; i++) {
-               KeyValue * kv = (KeyValue *)KeyValue_decode(bbDecode);
+               KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
                keyValueUpdateSet->add(kv);
        }
 }
 
-bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable) {
-       for (KeyValue *kvGuard : keyValueGuardSet) {
-
+bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *pendingTransactionSpeculatedKeyValueTable) {
+       SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kvGuard = kvit->next();
                // First check if the key is in the speculative table, this is the value of the latest assumption
-               KeyValue * kv = NULL;
+               KeyValue *kv = NULL;
 
                // If we have a speculation table then use it first
                if (pendingTransactionSpeculatedKeyValueTable != NULL) {
@@ -268,19 +292,21 @@ bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *committedKey
 
 
                                if (kv != NULL) {
-                                       System.out.println(kvGuard->getValue() + "       " + kv->getValue());
+                                       printf("%s      %s\n", kvGuard->getKey()->internalBytes()->internalArray(), kv->getValue()->internalBytes()->internalArray());
                                } else {
-                                       System.out.println(kvGuard->getValue() + "       " + kv);
+                                       printf("%s      null\n", kvGuard->getValue()->internalBytes()->internalArray());
                                }
-
+                               delete kvit;
                                return false;
                        }
                } else {
                        if (kv != NULL) {
+                               delete kvit;
                                return false;
                        }
                }
        }
+       delete kvit;
        return true;
 }