#include "TransactionStatus.h"
Transaction::Transaction() :
- parts(new Hashtable<int32_t, TransactionPart *>()),
+ parts(new Vector<TransactionPart *>()),
+ partCount(0),
missingParts(NULL),
partsPendingSend(new Vector<int32_t>()),
fldisComplete(false),
clientLocalSequenceNumber(-1),
arbitratorId(-1),
machineId(-1),
- transactionId(NULL),
+ transactionId(Pair<int64_t, int64_t>(0,0)),
hadServerFailure(false) {
}
void Transaction::addPartEncode(TransactionPart *newPart) {
- parts->put(newPart->getPartNumber(), newPart);
+ TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart);
+ if (old == NULL)
+ partCount++;
partsPendingSend->add(newPart->getPartNumber());
sequenceNumber = newPart->getSequenceNumber();
clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
machineId = newPart->getMachineId();
- TransactionPart *previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
+ TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
+ if (previouslySeenPart == NULL)
+ partCount++;
- if (previoslySeenPart != NULL) {
+ if (previouslySeenPart != NULL) {
// Set dead the old one since the new one is a rescued version of this part
- previoslySeenPart->setDead();
+ previouslySeenPart->setDead();
} else if (newPart->isLastPart()) {
missingParts = new Hashset<int32_t>();
hasLastPart = true;
void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
sequenceNumber = _sequenceNumber;
- for (int32_t i : parts->keySet()) {
- parts->get(i)->setSequenceNumber(sequenceNumber);
+ for (int32_t i = 0; i < parts->size(); i++) {
+ TransactionPart *tp = parts->get(i);
+ if (tp != NULL)
+ tp->setSequenceNumber(sequenceNumber);
}
}
return clientLocalSequenceNumber;
}
-Hashtable<int32_t, TransactionPart *> *Transaction::getParts() {
+Vector<TransactionPart *> *Transaction::getParts() {
return parts;
}
void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
nextPartToSend = 0;
- if (partsPendingSend->removeAll(sentParts))
- {
+ bool changed = false;
+ uint lastusedindex = 0;
+ for (uint i = 0; i < partsPendingSend->size(); i++) {
+ int32_t parti = partsPendingSend->get(i);
+ for (uint j = 0; j < sentParts->size(); j++) {
+ int32_t partj = sentParts->get(j);
+ if (parti == partj) {
+ changed = true;
+ goto NextElement;
+ }
+ }
+ partsPendingSend->set(lastusedindex++, parti);
+NextElement:
+ ;
+ }
+ if (changed) {
+ partsPendingSend->setSize(lastusedindex);
flddidSendAPartToServer = true;
transactionStatus->setTransactionSequenceNumber(sequenceNumber);
}
}
int Transaction::getNumberOfParts() {
- return parts->size();
+ return partCount;
}
int64_t Transaction::getMachineId() {
return fldisComplete;
}
-Pair<int64_t, int64_t> *Transaction::getId() {
- return transactionId;
+Pair<int64_t, int64_t> * Transaction::getId() {
+ return & transactionId;
}
void Transaction::setDead() {
- if (isDead) {
- // Already dead
- return;
- }
-
- // Set dead
- isDead = true;
-
- // Make all the parts of this transaction dead
- for (int32_t partNumber : parts->keySet()) {
- TransactionPart *part = parts->get(partNumber);
- part->setDead();
+ if (!isDead) {
+ // Set dead
+ isDead = true;
+ // Make all the parts of this transaction dead
+ for (int32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
+ TransactionPart *part = parts->get(partNumber);
+ if (part != NULL)
+ part->setDead();
+ }
}
}
}
void Transaction::decodeTransactionData() {
-
// Calculate the size of the data section
int dataSize = 0;
- for (int i = 0; i < parts->keySet()->size(); i++) {
+ for (int i = 0; i < parts->size(); i++) {
TransactionPart *tp = parts->get(i);
dataSize += tp->getDataSize();
}
int currentPosition = 0;
// Stitch all the data sections together
- for (int i = 0; i < parts->keySet()->size(); i++) {
+ for (int i = 0; i < parts->size(); i++) {
TransactionPart *tp = parts->get(i);
System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
currentPosition += tp->getDataSize();
}
bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable) {
- for (KeyValue *kvGuard : keyValueGuardSet) {
-
+ SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kvGuard = kvit->next();
// First check if the key is in the speculative table, this is the value of the latest assumption
KeyValue *kv = NULL;
} else {
printf("%s null\n", kvGuard->getValue()->internalBytes()->internalArray());
}
-
+ delete kvit;
return false;
}
} else {
if (kv != NULL) {
+ delete kvit;
return false;
}
}
}
+ delete kvit;
return true;
}