1 #include "Transaction.h"
3 Transaction::Transaction() {
4 parts = new Hashtable<int32_t, TransactionPart>();
5 keyValueGuardSet = new HashSet<KeyValue>();
6 keyValueUpdateSet = new HashSet<KeyValue>();
7 partsPendingSend = new Vector<int32_t>();
10 void Transaction::addPartEncode(TransactionPart *newPart) {
11 parts.put(newPart.getPartNumber(), newPart);
12 partsPendingSend.add(newPart.getPartNumber());
14 sequenceNumber = newPart.getSequenceNumber();
15 arbitratorId = newPart.getArbitratorId();
16 transactionId = newPart.getTransactionId();
17 clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
18 machineId = newPart.getMachineId();
23 void Transaction::addPartDecode(TransactionPart *newPart) {
25 // If dead then just kill this part and move on
30 sequenceNumber = newPart.getSequenceNumber();
31 arbitratorId = newPart.getArbitratorId();
32 transactionId = newPart.getTransactionId();
33 clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
34 machineId = newPart.getMachineId();
36 TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
38 if (previoslySeenPart != NULL) {
39 // Set dead the old one since the new one is a rescued version of this part
40 previoslySeenPart.setDead();
41 } else if (newPart.isLastPart()) {
42 missingParts = new HashSet<int32_t>();
45 for (int i = 0; i < newPart.getPartNumber(); i++) {
46 if (parts.get(i) == NULL) {
52 if (!isComplete && hasLastPart) {
54 // We have seen this part so remove it from the set of missing parts
55 missingParts.remove(newPart.getPartNumber());
57 // Check if all the parts have been seen
58 if (missingParts.size() == 0) {
60 // We have all the parts
63 // Decode all the parts and create the key value guard and update sets
64 decodeTransactionData();
69 void Transaction::addUpdateKV(KeyValue *kv) {
70 keyValueUpdateSet.add(kv);
73 void Transaction::addGuardKV(KeyValue *kv) {
74 keyValueGuardSet.add(kv);
78 int64_t Transaction::getSequenceNumber() {
79 return sequenceNumber;
82 void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
83 sequenceNumber = _sequenceNumber;
85 for (int32_t i : parts.keySet()) {
86 parts.get(i).setSequenceNumber(sequenceNumber);
90 int64_t Transaction::getClientLocalSequenceNumber() {
91 return clientLocalSequenceNumber;
94 Hashtable<int32_t, TransactionPart *> *Transaction::getParts() {
98 bool Transaction::didSendAPartToServer() {
99 return didSendAPartToServer;
102 void Transaction::resetNextPartToSend() {
106 TransactionPart *Transaction::getNextPartToSend() {
107 if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
110 TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
116 void Transaction::setServerFailure() {
117 hadServerFailure = true;
120 bool Transaction::getServerFailure() {
121 return hadServerFailure;
125 void Transaction::resetServerFailure() {
126 hadServerFailure = false;
130 void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) {
131 transactionStatus = _transactionStatus;
134 TransactionStatus *Transaction::getTransactionStatus() {
135 return transactionStatus;
138 void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
140 if (partsPendingSend.removeAll(sentParts))
142 didSendAPartToServer = true;
143 transactionStatus.setTransactionSequenceNumber(sequenceNumber);
147 bool Transaction::didSendAllParts() {
148 return partsPendingSend.isEmpty();
151 Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
152 return keyValueUpdateSet;
155 int Transaction::getNumberOfParts() {
159 int64_t Transaction::getMachineId() {
163 int64_t Transaction::getArbitrator() {
167 bool Transaction::isComplete() {
171 Pair<int64_t, int64_t> *Transaction::getId() {
172 return transactionId;
175 void Transaction::setDead() {
184 // Make all the parts of this transaction dead
185 for (int32_t partNumber : parts.keySet()) {
186 TransactionPart part = parts.get(partNumber);
191 TransactionPart *Transaction::getPart(int index) {
192 return parts.get(index);
195 void Transaction::decodeTransactionData() {
197 // Calculate the size of the data section
199 for (int i = 0; i < parts.keySet().size(); i++) {
200 TransactionPart tp = parts.get(i);
201 dataSize += tp.getDataSize();
204 Array<char> *combinedData = new char[dataSize];
205 int currentPosition = 0;
207 // Stitch all the data sections together
208 for (int i = 0; i < parts.keySet().size(); i++) {
209 TransactionPart tp = parts.get(i);
210 System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
211 currentPosition += tp.getDataSize();
215 ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
217 // Decode how many key value pairs need to be decoded
218 int numberOfKVGuards = bbDecode.getInt();
219 int numberOfKVUpdates = bbDecode.getInt();
221 // Decode all the guard key values
222 for (int i = 0; i < numberOfKVGuards; i++) {
223 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
224 keyValueGuardSet.add(kv);
227 // Decode all the updates key values
228 for (int i = 0; i < numberOfKVUpdates; i++) {
229 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
230 keyValueUpdateSet.add(kv);
234 bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable) {
235 for (KeyValue *kvGuard : keyValueGuardSet) {
237 // First check if the key is in the speculative table, this is the value of the latest assumption
240 // If we have a speculation table then use it first
241 if (pendingTransactionSpeculatedKeyValueTable != NULL) {
242 kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
245 // If we have a speculation table then use it first
246 if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
247 kv = speculatedKeyValueTable.get(kvGuard.getKey());
251 // if it is not in the speculative table then check the committed table and use that
252 // value as our latest assumption
253 kv = committedKeyValueTable.get(kvGuard.getKey());
256 if (kvGuard.getValue() != NULL) {
257 if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
261 System.out.println(kvGuard.getValue() + " " + kv.getValue());
263 System.out.println(kvGuard.getValue() + " " + kv);