7 Hashtable<int32_t, TransactionPart *> parts = NULL;
8 Set<int32_t> missingParts = NULL;
9 Vector<int32_t> partsPendingSend = NULL;
10 bool isComplete = false;
11 bool hasLastPart = false;
12 Hashset<KeyValue *> *keyValueGuardSet = NULL;
13 Hashset<KeyValue *> *keyValueUpdateSet = NULL;
15 int64_t sequenceNumber = -1;
16 int64_t clientLocalSequenceNumber = -1;
17 int64_t arbitratorId = -1;
18 int64_t machineId = -1;
19 Pair<uint64_t, uint64_t> transactionId = NULL;
21 int nextPartToSend = 0;
22 bool didSendAPartToServer = false;
24 TransactionStatus transactionStatus = NULL;
26 bool hadServerFailure = false;
28 public Transaction() {
29 parts = new Hashtable<int32_t, TransactionPart>();
30 keyValueGuardSet = new HashSet<KeyValue>();
31 keyValueUpdateSet = new HashSet<KeyValue>();
32 partsPendingSend = new Vector<int32_t>();
35 public void addPartEncode(TransactionPart newPart) {
36 parts.put(newPart.getPartNumber(), newPart);
37 partsPendingSend.add(newPart.getPartNumber());
39 sequenceNumber = newPart.getSequenceNumber();
40 arbitratorId = newPart.getArbitratorId();
41 transactionId = newPart.getTransactionId();
42 clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
43 machineId = newPart.getMachineId();
48 public void addPartDecode(TransactionPart newPart) {
51 // If dead then just kill this part and move on
56 sequenceNumber = newPart.getSequenceNumber();
57 arbitratorId = newPart.getArbitratorId();
58 transactionId = newPart.getTransactionId();
59 clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
60 machineId = newPart.getMachineId();
62 TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
64 if (previoslySeenPart != NULL) {
65 // Set dead the old one since the new one is a rescued version of this part
66 previoslySeenPart.setDead();
67 } else if (newPart.isLastPart()) {
68 missingParts = new HashSet<int32_t>();
71 for (int i = 0; i < newPart.getPartNumber(); i++) {
72 if (parts.get(i) == NULL) {
78 if (!isComplete && hasLastPart) {
80 // We have seen this part so remove it from the set of missing parts
81 missingParts.remove(newPart.getPartNumber());
83 // Check if all the parts have been seen
84 if (missingParts.size() == 0) {
86 // We have all the parts
89 // Decode all the parts and create the key value guard and update sets
90 decodeTransactionData();
95 public void addUpdateKV(KeyValue kv) {
96 keyValueUpdateSet.add(kv);
99 public void addGuardKV(KeyValue kv) {
100 keyValueGuardSet.add(kv);
104 public int64_t getSequenceNumber() {
105 return sequenceNumber;
108 public void setSequenceNumber(int64_t _sequenceNumber) {
109 sequenceNumber = _sequenceNumber;
111 for (int32_t i : parts.keySet()) {
112 parts.get(i).setSequenceNumber(sequenceNumber);
116 public int64_t getClientLocalSequenceNumber() {
117 return clientLocalSequenceNumber;
120 public Hashtable<int32_t, TransactionPart> getParts() {
124 public bool didSendAPartToServer() {
125 return didSendAPartToServer;
128 public void resetNextPartToSend() {
132 public TransactionPart getNextPartToSend() {
133 if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
136 TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
142 public void setServerFailure() {
143 hadServerFailure = true;
146 public bool getServerFailure() {
147 return hadServerFailure;
151 public void resetServerFailure() {
152 hadServerFailure = false;
156 public void setTransactionStatus(TransactionStatus _transactionStatus) {
157 transactionStatus = _transactionStatus;
160 public TransactionStatus getTransactionStatus() {
161 return transactionStatus;
164 public void removeSentParts(Vector<int32_t> sentParts) {
166 if (partsPendingSend.removeAll(sentParts))
168 didSendAPartToServer = true;
169 transactionStatus.setTransactionSequenceNumber(sequenceNumber);
173 public bool didSendAllParts() {
174 return partsPendingSend.isEmpty();
177 public Set<KeyValue> getKeyValueUpdateSet() {
178 return keyValueUpdateSet;
181 public int getNumberOfParts() {
185 public int64_t getMachineId() {
189 public int64_t getArbitrator() {
193 public bool isComplete() {
197 public Pair<int64_t, int64_t> getId() {
198 return transactionId;
201 public void setDead() {
210 // Make all the parts of this transaction dead
211 for (int32_t partNumber : parts.keySet()) {
212 TransactionPart part = parts.get(partNumber);
217 public TransactionPart getPart(int index) {
218 return parts.get(index);
221 private void decodeTransactionData() {
223 // Calculate the size of the data section
225 for (int i = 0; i < parts.keySet().size(); i++) {
226 TransactionPart tp = parts.get(i);
227 dataSize += tp.getDataSize();
230 Array<char> *combinedData = new char[dataSize];
231 int currentPosition = 0;
233 // Stitch all the data sections together
234 for (int i = 0; i < parts.keySet().size(); i++) {
235 TransactionPart tp = parts.get(i);
236 System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
237 currentPosition += tp.getDataSize();
241 ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
243 // Decode how many key value pairs need to be decoded
244 int numberOfKVGuards = bbDecode.getInt();
245 int numberOfKVUpdates = bbDecode.getInt();
247 // Decode all the guard key values
248 for (int i = 0; i < numberOfKVGuards; i++) {
249 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
250 keyValueGuardSet.add(kv);
253 // Decode all the updates key values
254 for (int i = 0; i < numberOfKVUpdates; i++) {
255 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
256 keyValueUpdateSet.add(kv);
260 public bool evaluateGuard(Hashtable<IoTString, KeyValue> committedKeyValueTable, Hashtable<IoTString, KeyValue> speculatedKeyValueTable, Hashtable<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
261 for (KeyValue kvGuard : keyValueGuardSet) {
263 // First check if the key is in the speculative table, this is the value of the latest assumption
266 // If we have a speculation table then use it first
267 if (pendingTransactionSpeculatedKeyValueTable != NULL) {
268 kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
271 // If we have a speculation table then use it first
272 if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
273 kv = speculatedKeyValueTable.get(kvGuard.getKey());
277 // if it is not in the speculative table then check the committed table and use that
278 // value as our latest assumption
279 kv = committedKeyValueTable.get(kvGuard.getKey());
282 if (kvGuard.getValue() != NULL) {
283 if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
287 System.out.println(kvGuard.getValue() + " " + kv.getValue());
289 System.out.println(kvGuard.getValue() + " " + kv);