1 #include "PendingTransaction.h"
4 #include "Transaction.h"
5 #include "TransactionPart.h"
6 #include "ByteBuffer.h"
// Constructs a pending transaction for the machine identified by _machineId.
// The initializers visible here allocate a new Hashset for the update
// key-values and another for the guard key-values, set
// clientLocalSequenceNumber to -1 and store _machineId in machineId.
// NOTE(review): the initializer list ends on a trailing comma and the embedded
// listing numbers skip 11 and 14-17, so further initializers and the
// constructor body appear to be missing from this listing - confirm against
// the full file.
8 PendingTransaction::PendingTransaction(int64_t _machineId) :
9 keyValueUpdateSet(new Hashset<KeyValue *>()),
10 keyValueGuardSet(new Hashset<KeyValue *>()),
12 clientLocalSequenceNumber(-1),
13 machineId(_machineId),
18 * Add a new key value to the updates
// Adds newKV to the update set and adds newKV->getSize() to currentDataSize.
// Before adding, the update set is scanned for an entry whose key equals
// newKV's key; per the existing comments, an older entry with the same key is
// removed (and its size subtracted from currentDataSize) so the new one
// replaces it.
// NOTE(review): the statements that assign rmKV inside the loop, the condition
// guarding the removal, and the closing braces are not present in this listing
// (the embedded numbers skip e.g. 32-37, 39 and 42-43) - confirm against the
// full file before relying on the exact control flow.
21 void PendingTransaction::addKV(KeyValue *newKV) {
// Candidate entry to remove; stays NULL unless something assigns it below.
23 KeyValue *rmKV = NULL;
25 // Make sure there are no duplicates
26 SetIterator<KeyValue *> *kvit = keyValueUpdateSet->iterator();
27 while (kvit->hasNext()) {
28 KeyValue *kv = kvit->next();
// Keys are compared with equals(), not by pointer identity.
29 if (kv->getKey()->equals(newKV->getKey())) {
31 // Remove key if we are adding a newer version of the same key
38 // Remove key if we are adding a newer version of the same key
// NOTE(review): as listed, rmKV would still be NULL here; presumably a missing
// line guards this removal with a NULL check - verify.
40 keyValueUpdateSet->remove(rmKV);
41 currentDataSize -= rmKV->getSize();
44 // Add the key to the hash set
45 keyValueUpdateSet->add(newKV);
// Keep the running byte count in step with the set contents.
46 currentDataSize += newKV->getSize();
50 * Add a new key value to the guard set
// Adds newKV to the guard set and adds newKV->getSize() to currentDataSize.
// Unlike addKV, no scan for an existing entry with the same key is visible
// here.
// NOTE(review): the closing brace is not present in this listing (embedded
// numbering jumps from 56 to 60) - confirm against the full file.
53 void PendingTransaction::addKVGuard(KeyValue *newKV) {
54 // Add the key to the hash set
55 keyValueGuardSet->add(newKV);
56 currentDataSize += newKV->getSize();
60 * Checks if the arbitrator is the same
// Checks arb against the arbitrator stored in this pending transaction.
// When the stored arbitrator is not -1, the result is whether arb equals it.
// NOTE(review): the body of the `arbitrator == -1` branch (embedded lines
// 64-67) is missing from this listing. Presumably -1 means "no arbitrator
// chosen yet" and the branch adopts arb - confirm against the full file.
62 bool PendingTransaction::checkArbitrator(int64_t arb) {
63 if (arbitrator == -1) {
68 return arb == arbitrator;
// Evaluates the guard key-values in keyValueGuardSet against three tables.
// For each guard, a current value is looked up by the guard's key in
// keyValTablePendingTransSpeculative, keyValTableSpeculative and
// keyValTableCommitted; the existing comments state that the later tables are
// fallbacks used only when the earlier lookup found nothing.
// For a guard whose value is non-NULL, the visible condition is true when no
// entry was found in the tables or when the found value differs (via equals)
// from the guard's value.
// NOTE(review): the conditions that gate the fallback lookups, the action
// taken when the comparison fails, the handling of a guard whose value is
// NULL, and the final return are not present in this listing (embedded
// numbering skips e.g. 78-80, 84-87, 91-92 and 95-108) - confirm against the
// full file.
71 bool PendingTransaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *keyValTableCommitted, Hashtable<IoTString *, KeyValue *> *keyValTableSpeculative, Hashtable<IoTString *, KeyValue *> *keyValTablePendingTransSpeculative) {
72 SetIterator<KeyValue *> *kvit = keyValueGuardSet->iterator();
73 while (kvit->hasNext()) {
74 KeyValue *kvGuard = kvit->next();
76 // First check if the key is in the speculative table, this is the value of the latest assumption
77 KeyValue *kv = keyValTablePendingTransSpeculative->get(kvGuard->getKey());
81 // if it is not in the pending trans table then check the speculative table and use that
82 // value as our latest assumption
// NOTE(review): as listed this lookup is unconditional; the comment above
// implies a missing guard on the previous result - verify.
83 kv = keyValTableSpeculative->get(kvGuard->getKey());
88 // if it is not in the speculative table then check the committed table and use that
89 // value as our latest assumption
90 kv = keyValTableCommitted->get(kvGuard->getKey());
// Guards with a concrete value are compared with the value found in the tables.
93 if (kvGuard->getValue() != NULL) {
// True when nothing was found or the found value differs from the guard's.
94 if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
// Builds a new Transaction from the contents of this pending transaction.
// The bytes produced by convertDataToBytes() are split into consecutive chunks
// of at most TransactionPart_MAX_NON_HEADER_SIZE bytes. Each chunk is copied
// into its own Array<char>, wrapped in a TransactionPart carrying machineId,
// arbitrator, clientLocalSequenceNumber and a part index starting at 0, and
// handed to the transaction via addPartEncode(). The part that holds the final
// bytes is flagged as the last part.
// Afterwards every guard key-value is passed to addGuardKV() and every update
// key-value to addUpdateKV().
// Returns the Transaction allocated with new at the top of the function.
// NOTE(review): closing braces and possibly cleanup statements are missing from
// this listing (embedded numbering skips e.g. 127-128, 140-141, 147-150 and
// 155-156); no release of charData or of the iterators is visible here -
// confirm against the full file.
109 Transaction *PendingTransaction::createTransaction() {
110 Transaction *newTransaction = new Transaction();
// Index given to each part, incremented once per part created.
111 int transactionPartCount = 0;
113 // Convert all the data into a char array so we can start partitioning
114 Array<char> *charData = convertDataToBytes();
// currentPosition is the read offset into charData; remaining is the number of
// bytes not yet copied into a part.
116 int currentPosition = 0;
117 int remaining = charData->length();
119 while (remaining > 0) {
121 bool isLastPart = false;
122 // determine how much to copy
123 int copySize = TransactionPart_MAX_NON_HEADER_SIZE;
// Everything left fits in one part, so this part is the final one.
124 if (remaining <= TransactionPart_MAX_NON_HEADER_SIZE) {
125 copySize = remaining;
126 isLastPart = true;// last bit of data so last part
129 // Copy to a smaller version
130 Array<char> *partData = new Array<char>(copySize);
131 System_arraycopy(charData, currentPosition, partData, 0, copySize);
// NOTE(review): the meaning of the leading NULL constructor argument is not
// visible in this file - see TransactionPart's declaration.
133 TransactionPart *part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart);
134 newTransaction->addPartEncode(part);
136 // Update position, count and remaining
137 currentPosition += copySize;
138 transactionPartCount++;
139 remaining -= copySize;
142 // Add the Guard Conditions
143 SetIterator<KeyValue *> *kvit = keyValueGuardSet->iterator();
144 while (kvit->hasNext()) {
145 KeyValue *kv = kvit->next();
146 newTransaction->addGuardKV(kv);
// Same walk over the update set, attaching each entry as an update.
151 kvit = keyValueUpdateSet->iterator();
152 while (kvit->hasNext()) {
153 KeyValue *kv = kvit->next();
154 newTransaction->addUpdateKV(kv);
157 return newTransaction;
// Serializes the guard and update sets into a single byte array.
// Layout in write order: the guard-set size (putInt), the update-set size
// (putInt), every guard KeyValue via encode(), then every update KeyValue via
// encode().
// The buffer is allocated with 2 * sizeof(int32_t) + currentDataSize bytes.
// NOTE(review): this sizing presumably relies on KeyValue::getSize() matching
// the number of bytes encode() writes, and on putInt() writing
// sizeof(int32_t) bytes - confirm against KeyValue and ByteBuffer.
// Returns bbEncode->array(); presumably this is the dataArray that was
// wrapped - confirm against ByteBuffer.
// NOTE(review): closing braces and possibly cleanup statements are missing from
// this listing (embedded numbering skips e.g. 178-180 and 186-188) - confirm
// against the full file.
160 Array<char> *PendingTransaction::convertDataToBytes() {
161 // Calculate the size of the data
162 int sizeOfData = 2 * sizeof(int32_t); // Number of Update KV's and Guard KV's
163 sizeOfData += currentDataSize;
165 // Data handlers and storage
166 Array<char> *dataArray = new Array<char>(sizeOfData);
167 ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
169 // Encode the size of the updates and guard sets
// Write order is guard count first, then update count.
170 bbEncode->putInt(keyValueGuardSet->size());
171 bbEncode->putInt(keyValueUpdateSet->size());
173 // Encode all the guard conditions
174 SetIterator<KeyValue *> *kvit = keyValueGuardSet->iterator();
175 while (kvit->hasNext()) {
176 KeyValue *kv = kvit->next();
177 kv->encode(bbEncode);
181 // Encode all the updates
182 kvit = keyValueUpdateSet->iterator();
183 while (kvit->hasNext()) {
184 KeyValue *kv = kvit->next();
185 kv->encode(bbEncode);
189 return bbEncode->array();