edits
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2
/**
 * Build a Table that creates its own CloudComm.
 *
 * The initializer list only puts every member into a known empty state
 * (NULL / 0 / false); the helper objects and containers are allocated
 * afterwards by init().
 *
 * baseurl, password and listeningPort are forwarded unchanged to the
 * CloudComm constructor (together with this table); _localMachineId is
 * stored in localMachineId.
 *
 * NOTE(review): localSequenceNumber, which initTable() and
 * getLocalSequenceNumber() use, does not appear in this list -- confirm it is
 * initialized somewhere else (e.g. in Table.h).
 */
3 Table::Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) :
4         buffer(NULL),
5         cloud(new CloudComm(this, baseurl, password, listeningPort)),
6         random(NULL),
7         liveTableStatus(NULL),
8         pendingTransactionBuilder(NULL),
9         lastPendingTransactionSpeculatedOn(NULL),
10         firstPendingTransaction(NULL),
11         numberOfSlots(0),
12         bufferResizeThreshold(0),
13         liveSlotCount(0),
14         oldestLiveSlotSequenceNumber(0),
15         localMachineId(_localMachineId),
16         sequenceNumber(0),
17         localTransactionSequenceNumber(0),
18         lastTransactionSequenceNumberSpeculatedOn(0),
19         oldestTransactionSequenceNumberSpeculatedOn(0),
20         localArbitrationSequenceNumber(0),
21         hadPartialSendToServer(false),
22         attemptedToSendToServer(false),
23         expectedSize(0),
24         didFindTableStatus(false),
25         currMaxSize(0),
26         lastSlotAttemptedToSend(NULL),
27         lastIsNewKey(false),
28         lastNewSize(0),
29         lastTransactionPartsSent(NULL),
30         lastPendingSendArbitrationEntriesToDelete(NULL),
31         lastNewKey(NULL),
32         committedKeyValueTable(NULL),
33         speculatedKeyValueTable(NULL),
34         pendingTransactionSpeculatedKeyValueTable(NULL),
35         liveNewKeyTable(NULL),
36         lastMessageTable(NULL),
37         rejectedMessageWatchVectorTable(NULL),
38         arbitratorTable(NULL),
39         liveAbortTable(NULL),
40         newTransactionParts(NULL),
41         newCommitParts(NULL),
42         lastArbitratedTransactionNumberByArbitratorTable(NULL),
43         liveTransactionBySequenceNumberTable(NULL),
44         liveTransactionByTransactionIdTable(NULL),
45         liveCommitsTable(NULL),
46         liveCommitsByKeyTable(NULL),
47         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
48         rejectedSlotVector(NULL),
49         pendingTransactionQueue(NULL),
50         pendingSendArbitrationRounds(NULL),
51         pendingSendArbitrationEntriesToDelete(NULL),
52         transactionPartsSent(NULL),
53         outstandingTransactionStatus(NULL),
54         liveAbortsGeneratedByLocal(NULL),
55         offlineTransactionsCommittedAndAtServer(NULL),
56         localCommunicationTable(NULL),
57         lastTransactionSeenFromMachineFromServer(NULL),
58         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
59         lastInsertedNewKey(false),
60         lastSeqNumArbOn(0)
61 {
// Allocate helper objects and containers now that every member is nulled.
62         init();
63 }
64
/**
 * Build a Table around a CloudComm supplied by the caller.
 *
 * Apart from how cloud is initialized, the initializer list matches the
 * other constructor: every member is put into a known empty state
 * (NULL / 0 / false) and init() then allocates the helper objects and
 * containers. _localMachineId is stored in localMachineId.
 *
 * NOTE(review): _cloud is received by value here, while the other
 * constructor initializes cloud from a `new CloudComm(...)` pointer -- the
 * two cannot both match the member's type; confirm against Table.h.
 * NOTE(review): localSequenceNumber does not appear in this list -- confirm
 * it is initialized somewhere else.
 */
65 Table::Table(CloudComm _cloud, int64_t _localMachineId) :
66         buffer(NULL),
67         cloud(_cloud),
68         random(NULL),
69         liveTableStatus(NULL),
70         pendingTransactionBuilder(NULL),
71         lastPendingTransactionSpeculatedOn(NULL),
72         firstPendingTransaction(NULL),
73         numberOfSlots(0),
74         bufferResizeThreshold(0),
75         liveSlotCount(0),
76         oldestLiveSlotSequenceNumber(0),
77         localMachineId(_localMachineId),
78         sequenceNumber(0),
79         localTransactionSequenceNumber(0),
80         lastTransactionSequenceNumberSpeculatedOn(0),
81         oldestTransactionSequenceNumberSpeculatedOn(0),
82         localArbitrationSequenceNumber(0),
83         hadPartialSendToServer(false),
84         attemptedToSendToServer(false),
85         expectedSize(0),
86         didFindTableStatus(false),
87         currMaxSize(0),
88         lastSlotAttemptedToSend(NULL),
89         lastIsNewKey(false),
90         lastNewSize(0),
91         lastTransactionPartsSent(NULL),
92         lastPendingSendArbitrationEntriesToDelete(NULL),
93         lastNewKey(NULL),
94         committedKeyValueTable(NULL),
95         speculatedKeyValueTable(NULL),
96         pendingTransactionSpeculatedKeyValueTable(NULL),
97         liveNewKeyTable(NULL),
98         lastMessageTable(NULL),
99         rejectedMessageWatchVectorTable(NULL),
100         arbitratorTable(NULL),
101         liveAbortTable(NULL),
102         newTransactionParts(NULL),
103         newCommitParts(NULL),
104         lastArbitratedTransactionNumberByArbitratorTable(NULL),
105         liveTransactionBySequenceNumberTable(NULL),
106         liveTransactionByTransactionIdTable(NULL),
107         liveCommitsTable(NULL),
108         liveCommitsByKeyTable(NULL),
109         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
110         rejectedSlotVector(NULL),
111         pendingTransactionQueue(NULL),
112         pendingSendArbitrationRounds(NULL),
113         pendingSendArbitrationEntriesToDelete(NULL),
114         transactionPartsSent(NULL),
115         outstandingTransactionStatus(NULL),
116         liveAbortsGeneratedByLocal(NULL),
117         offlineTransactionsCommittedAndAtServer(NULL),
118         localCommunicationTable(NULL),
119         lastTransactionSeenFromMachineFromServer(NULL),
120         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
121         lastInsertedNewKey(false),
122         lastSeqNumArbOn(0)
123 {
// Allocate helper objects and containers now that every member is nulled.
124         init();
125 }
126
127 /**
128  * Init all the stuff needed for for table usage
129  */
130 void Table::init() {
131
132         // Init helper objects
133         random = new Random();
134         buffer = new SlotBuffer();
135
136         // Set Variables
137         oldestLiveSlotSequenceNumver = 1;
138
139         // init data structs
140         committedKeyValueTable = new Hashtable<IoTString, KeyValue>();
141         speculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
142         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
143         liveNewKeyTable = new Hashtable<IoTString, NewKey>();
144         lastMessageTable = new Hashtable<int64_t Pair<int64_t Liveness> >();
145         rejectedMessageWatchVectorTable = new Hashtable<int64_t Hashset<RejectedMessage> >();
146         arbitratorTable = new Hashtable<IoTString, Long>();
147         liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort>();
148         newTransactionParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart> >();
149         newCommitParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, CommitPart> >();
150         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
151         liveTransactionBySequenceNumberTable = new Hashtable<int64_t Transaction>();
152         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction>();
153         liveCommitsTable = new Hashtable<int64_t Hashtable<int64_t Commit> >();
154         liveCommitsByKeyTable = new Hashtable<IoTString, Commit>();
155         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
156         rejectedSlotVector = new Vector<Long>();
157         pendingTransactionQueue = new Vector<Transaction>();
158         pendingSendArbitrationEntriesToDelete = new Vector<Entry>();
159         transactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >();
160         outstandingTransactionStatus = new Hashtable<int64_t TransactionStatus>();
161         liveAbortsGeneratedByLocal = new Hashtable<int64_t Abort>();
162         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> >();
163         localCommunicationTable = new Hashtable<int64_t Pair<String, int32_t> >();
164         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
165         pendingSendArbitrationRounds = new Vector<ArbitrationRound>();
166         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
167
168
169         // Other init stuff
170         numberOfSlots = buffer.capacity();
171         setResizeThreshold();
172 }
173
/**
 * Debug helper: dump statistics about the slot buffer to standard output.
 *
 * Walks every slot from the oldest to the newest sequence number held by the
 * buffer, counts live slots and live/dead entries, tallies live entries per
 * entry type (types 0..9), and then prints those counters together with
 * buffer, pending-transaction and arbitration bookkeeping.
 *
 * NOTE(review): this body is still largely Java (System.out.println,
 * keySet() loops, `synchronized`); `types` is allocated with new and never
 * released here, and `types[type]` indexes the pointer rather than the
 * Array -- confirm before relying on this method.
 */
174 // TODO: delete method
175 synchronized void Table::printSlots() {
176         int64_t o = buffer.getOldestSeqNum();
177         int64_t n = buffer.getNewestSeqNum();
178
// Per-entry-type tally of live entries; sized for type ids 0..9.
179         Array<int> *types = new Array<int>(10);
180
// NOTE(review): num is incremented below but never read in this function.
181         int num = 0;
182
183         int livec = 0;
184         int deadc = 0;
185
// NOTE(review): casdasd counts type-6 entries but is never read in this function.
186         int casdasd = 0;
187
188         int liveslo = 0;
189
// Inclusive range: visits every sequence number from o through n.
190         for (int64_t i = o; i < (n + 1); i++) {
191                 Slot s = buffer.getSlot(i);
192
193
194                 if (s.isLive()) {
195                         liveslo++;
196                 }
197
198                 Vector<Entry> entries = s.getEntries();
199
200                 for (Entry e : entries) {
201                         if (e.isLive()) {
202                                 int type = e.getType();
203
204
// Type 6 entries are cast to RejectedMessage and their machine id is printed.
205                                 if (type == 6) {
206                                         RejectedMessage rej = (RejectedMessage)e;
207                                         casdasd++;
208
209                                         System.out.println(rej.getMachineID());
210                                 }
211
212
213                                 types[type] = types[type] + 1;
214                                 num++;
215                                 livec++;
216                         } else {
217                                 deadc++;
218                         }
219                 }
220         }
221
// Print the per-type tallies, then the aggregate counters.
222         for (int i = 0; i < 10; i++) {
223                 System.out.println(i + "    " + types[i]);
224         }
225         System.out.println("Live count:   " + livec);
226         System.out.println("Live Slot count:   " + liveslo);
227
228         System.out.println("Dead count:   " + deadc);
229         System.out.println("Old:   " + o);
230         System.out.println("New:   " + n);
231         System.out.println("Size:   " + buffer.size());
232         // System.out.println("Commits:   " + liveCommitsTable.size());
233         System.out.println("pendingTrans:   " + pendingTransactionQueue.size());
234         System.out.println("Trans Status Out:   " + outstandingTransactionStatus.size());
235
// One line per key of lastArbitratedTransactionNumberByArbitratorTable: "key: value".
236         for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
237                 System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
238         }
239
240
// One output line per outer key of liveCommitsTable; each inner commit's
// key-value updates are printed and terminated by "|| ".
241         for (Long a : liveCommitsTable.keySet()) {
242                 for (Long b : liveCommitsTable.get(a).keySet()) {
243                         for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) {
244                                 System.out.print(kv + " ");
245                         }
246                         System.out.print("|| ");
247                 }
248                 System.out.println();
249         }
250
251 }
252
253 /**
254  * Initialize the table by inserting a table status as the first entry of the block chain;
255  * also initialize the crypto stuff.
 *
 * Builds slot number 1 for this machine, attaches a TableStatus carrying
 * numberOfSlots, and pushes it with cloud.putSlot(). A NULL result means the
 * slot was accepted, so the local chain is updated with that slot alone; a
 * single returned slot is validated as-is; any other result is reported as
 * an initialization error.
 *
 * NOTE(review): the body is only partly ported from Java -- `s` and `status`
 * receive the result of `new` without being pointers, and `array` is used
 * with both `->set` and `.length`; confirm the intended types in Table.h.
256  */
257 synchronized void Table::initTable() {
258         cloud.initSecurity();
259
260         // Create the first insertion into the block chain which is the table status
261         Slot s = new Slot(this, 1, localMachineId, localSequenceNumber);
// The slot consumed the current local sequence number; advance it for the next slot.
262         localSequenceNumber++;
263         TableStatus status = new TableStatus(s, numberOfSlots);
264         s.addEntry(status);
265         Array<Slot> *array = cloud.putSlot(s, numberOfSlots);
266
267         if (array == NULL) {
268                 array = new Array<Slot>(1);
269                 array->set(0, s);
270                 // update local block chain
271                 validateAndUpdate(array, true);
272         } else if (array.length == 1) {
273                 // in case we did push the slot BUT we failed to init it
274                 validateAndUpdate(array, true);
275         } else {
276                 throw new Error("Error on initialization");
277         }
278 }
279
280 /**
281  * Rebuild the table from scratch by pulling the latest block chain from the server.
282  */
283 synchronized void Table::rebuild() {
284         // Just pull the latest slots from the server
285         Array<Slot> *newslots = cloud.getSlots(sequenceNumber + 1);
286         validateAndUpdate(newslots, true);
287         sendToServer(NULL);
288         updateLiveTransactionsAndStatus();
289
290 }
291
292 // String toString() {
293 //  String retString = " Committed Table: \n";
294 //  retString += "---------------------------\n";
295 //  retString += commitedTable.toString();
296
297 //  retString += "\n\n";
298
299 //  retString += " Speculative Table: \n";
300 //  retString += "---------------------------\n";
301 //  retString += speculativeTable.toString();
302
303 //  return retString;
304 // }
305
306 synchronized void Table::addLocalCommunication(int64_t arbitrator, String hostName, int portNumber) {
307         localCommunicationTable.put(arbitrator, new Pair<String, int32_t>(hostName, portNumber));
308 }
309
310 synchronized Long Table::getArbitrator(IoTString key) {
311         return arbitratorTable.get(key);
312 }
313
314 synchronized void Table::close() {
315         cloud.close();
316 }
317
318 synchronized IoTString Table::getCommitted(IoTString key)  {
319         KeyValue kv = committedKeyValueTable.get(key);
320
321         if (kv != NULL) {
322                 return kv.getValue();
323         } else {
324                 return NULL;
325         }
326 }
327
328 synchronized IoTString Table::getSpeculative(IoTString key) {
329         KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
330
331         if (kv == NULL) {
332                 kv = speculatedKeyValueTable.get(key);
333         }
334
335         if (kv == NULL) {
336                 kv = committedKeyValueTable.get(key);
337         }
338
339         if (kv != NULL) {
340                 return kv.getValue();
341         } else {
342                 return NULL;
343         }
344 }
345
346 synchronized IoTString Table::getCommittedAtomic(IoTString key) {
347         KeyValue kv = committedKeyValueTable.get(key);
348
349         if (arbitratorTable.get(key) == NULL) {
350                 throw new Error("Key not Found.");
351         }
352
353         // Make sure new key value pair matches the current arbitrator
354         if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
355                 // TODO: Maybe not throw en error
356                 throw new Error("Not all Key Values Match Arbitrator.");
357         }
358
359         if (kv != NULL) {
360                 pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
361                 return kv.getValue();
362         } else {
363                 pendingTransactionBuilder.addKVGuard(new KeyValue(key, NULL));
364                 return NULL;
365         }
366 }
367
368 synchronized IoTString Table::getSpeculativeAtomic(IoTString key) {
369         if (arbitratorTable.get(key) == NULL) {
370                 throw new Error("Key not Found.");
371         }
372
373         // Make sure new key value pair matches the current arbitrator
374         if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
375                 // TODO: Maybe not throw en error
376                 throw new Error("Not all Key Values Match Arbitrator.");
377         }
378
379         KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
380
381         if (kv == NULL) {
382                 kv = speculatedKeyValueTable.get(key);
383         }
384
385         if (kv == NULL) {
386                 kv = committedKeyValueTable.get(key);
387         }
388
389         if (kv != NULL) {
390                 pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
391                 return kv.getValue();
392         } else {
393                 pendingTransactionBuilder.addKVGuard(new KeyValue(key, NULL));
394                 return NULL;
395         }
396 }
397
398 synchronized bool Table::update()  {
399         try {
400                 Array<Slot> *newSlots = cloud.getSlots(sequenceNumber + 1);
401                 validateAndUpdate(newSlots, false);
402                 sendToServer(NULL);
403
404
405                 updateLiveTransactionsAndStatus();
406
407                 return true;
408         } catch (Exception e) {
409                 // e.printStackTrace();
410
411                 for (Long m : localCommunicationTable.keySet()) {
412                         updateFromLocal(m);
413                 }
414         }
415
416         return false;
417 }
418
419 synchronized bool Table::createNewKey(IoTString keyName, int64_t machineId) {
420         while (true) {
421                 if (arbitratorTable.get(keyName) != NULL) {
422                         // There is already an arbitrator
423                         return false;
424                 }
425
426                 NewKey newKey = new NewKey(NULL, keyName, machineId);
427
428                 if (sendToServer(newKey)) {
429                         // If successfully inserted
430                         return true;
431                 }
432         }
433 }
434
435 synchronized void Table::startTransaction() {
436         // Create a new transaction, invalidates any old pending transactions.
437         pendingTransactionBuilder = new PendingTransaction(localMachineId);
438 }
439
440 synchronized void Table::addKV(IoTString key, IoTString value) {
441
442         // Make sure it is a valid key
443         if (arbitratorTable.get(key) == NULL) {
444                 throw new Error("Key not Found.");
445         }
446
447         // Make sure new key value pair matches the current arbitrator
448         if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
449                 // TODO: Maybe not throw en error
450                 throw new Error("Not all Key Values Match Arbitrator.");
451         }
452
453         // Add the key value to this transaction
454         KeyValue kv = new KeyValue(key, value);
455         pendingTransactionBuilder.addKV(kv);
456 }
457
/**
 * Commit the transaction currently being built.
 *
 * A transaction without key-value updates is not sent; a StatusNoEffect
 * status is returned instead. Otherwise the transaction gets the next local
 * transaction sequence number and a StatusPending status, and is either
 * queued (arbitrator is another machine) or arbitrated locally right away.
 * The builder is then replaced by a fresh one and sendToServer(NULL) is
 * attempted; on a ServerException the queued transactions are offered to
 * their arbitrators through sendTransactionToLocal().
 *
 * @return the TransactionStatus attached to the new transaction.
 *
 * NOTE(review): the body still uses Java constructs (Set/Iterator with
 * iter.remove(), `TransactionStatus.StatusPending`, value-typed results of
 * `new`) -- confirm the intended C++ types before relying on it.
 */
458 synchronized TransactionStatus Table::commitTransaction() {
459
460         if (pendingTransactionBuilder.getKVUpdates().size() == 0) {
461                 // transaction with no updates will have no effect on the system
462                 return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
463         }
464
465         // Set the local transaction sequence number and increment
466         pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber);
467         localTransactionSequenceNumber++;
468
469         // Create the transaction status
470         TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator());
471
472         // Create the new transaction
473         Transaction newTransaction = pendingTransactionBuilder.createTransaction();
474         newTransaction.setTransactionStatus(transactionStatus);
475
476         if (pendingTransactionBuilder.getArbitrator() != localMachineId) {
477                 // Add it to the queue and invalidate the builder for safety
478                 pendingTransactionQueue.add(newTransaction);
479         } else {
// This machine is the arbitrator: decide locally and refresh live state.
480                 arbitrateOnLocalTransaction(newTransaction);
481                 updateLiveStateFromLocal();
482         }
483
// Start over with an empty builder so the committed transaction cannot be modified.
484         pendingTransactionBuilder = new PendingTransaction(localMachineId);
485
486         try {
487                 sendToServer(NULL);
488         } catch (ServerException e) {
489
// Server send failed: try to hand each queued transaction to its arbitrator locally.
490                 Set<Long> arbitratorTriedAndFailed = new Hashset<Long>();
491                 for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
492                         Transaction transaction = iter.next();
493
494                         if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) {
495                                 // Already contacted this client so ignore all attempts to contact this client
496                                 // to preserve ordering for arbitrator
497                                 continue;
498                         }
499
// First element true = contact failed; second element true = the arbitrator did arbitrate.
500                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
501
502                         if (sendReturn.getFirst()) {
503                                 // Failed to contact over local
504                                 arbitratorTriedAndFailed.add(transaction.getArbitrator());
505                         } else {
506                                 // Successful contact or should not contact
507
508                                 if (sendReturn.getSecond()) {
509                                         // did arbitrate
510                                         iter.remove();
511                                 }
512                         }
513                 }
514         }
515
516         updateLiveStateFromLocal();
517
518         return transactionStatus;
519 }
520
521 /**
522  * Get the machine ID for this client
523  */
524 int64_t Table::getMachineId() {
525         return localMachineId;
526 }
527
528 /**
529  * Decrement the number of live slots that we currently have
530  */
531 void Table::decrementLiveCount() {
532         liveSlotCount--;
533 }
534
535 /**
536  * Recalculate the new resize threshold
537  */
538 void Table::setResizeThreshold() {
539         int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots);
540         bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
541 }
542
543 int64_t Table::getLocalSequenceNumber() {
544         return localSequenceNumber;
545 }
546
547
// File-scope flag named lastInsertedNewKey.
// NOTE(review): both Table constructors also initialize a member called
// lastInsertedNewKey; inside Table member functions that member would hide
// this global. This looks like a leftover field declaration from the Java
// original -- confirm whether it can be removed.
548 bool lastInsertedNewKey = false;
549
550 bool Table::sendToServer(NewKey newKey) {
551
552         bool fromRetry = false;
553
554         try {
555                 if (hadPartialSendToServer) {
556                         Array<Slot> *newSlots = cloud.getSlots(sequenceNumber + 1);
557                         if (newSlots.length == 0) {
558                                 fromRetry = true;
559                                 ThreeTuple<bool, bool, Array<Slot> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
560
561                                 if (sendSlotsReturn.getFirst()) {
562                                         if (newKey != NULL) {
563                                                 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
564                                                         newKey = NULL;
565                                                 }
566                                         }
567
568                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
569                                                 transaction.resetServerFailure();
570
571                                                 // Update which transactions parts still need to be sent
572                                                 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
573
574                                                 // Add the transaction status to the outstanding list
575                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
576
577                                                 // Update the transaction status
578                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
579
580                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
581                                                 if (transaction.didSendAllParts()) {
582                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
583                                                         pendingTransactionQueue.remove(transaction);
584                                                 }
585                                         }
586                                 } else {
587
588                                         newSlots = sendSlotsReturn.getThird();
589
590                                         bool isInserted = false;
591                                         for (Slot s : newSlots) {
592                                                 if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
593                                                         isInserted = true;
594                                                         break;
595                                                 }
596                                         }
597
598                                         for (Slot s : newSlots) {
599                                                 if (isInserted) {
600                                                         break;
601                                                 }
602
603                                                 // Process each entry in the slot
604                                                 for (Entry entry : s.getEntries()) {
605
606                                                         if (entry.getType() == Entry.TypeLastMessage) {
607                                                                 LastMessage lastMessage = (LastMessage)entry;
608                                                                 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
609                                                                         isInserted = true;
610                                                                         break;
611                                                                 }
612                                                         }
613                                                 }
614                                         }
615
616                                         if (isInserted) {
617                                                 if (newKey != NULL) {
618                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
619                                                                 newKey = NULL;
620                                                         }
621                                                 }
622
623                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
624                                                         transaction.resetServerFailure();
625
626                                                         // Update which transactions parts still need to be sent
627                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
628
629                                                         // Add the transaction status to the outstanding list
630                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
631
632                                                         // Update the transaction status
633                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
634
635                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
636                                                         if (transaction.didSendAllParts()) {
637                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
638                                                                 pendingTransactionQueue.remove(transaction);
639                                                         } else {
640                                                                 transaction.resetServerFailure();
641                                                                 // Set the transaction sequence number back to nothing
642                                                                 if (!transaction.didSendAPartToServer()) {
643                                                                         transaction.setSequenceNumber(-1);
644                                                                 }
645                                                         }
646                                                 }
647                                         }
648                                 }
649
650                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
651                                         transaction.resetServerFailure();
652                                         // Set the transaction sequence number back to nothing
653                                         if (!transaction.didSendAPartToServer()) {
654                                                 transaction.setSequenceNumber(-1);
655                                         }
656                                 }
657
658                                 if (sendSlotsReturn.getThird().length != 0) {
659                                         // insert into the local block chain
660                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
661                                 }
662                                 // continue;
663                         } else {
664                                 bool isInserted = false;
665                                 for (Slot s : newSlots) {
666                                         if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
667                                                 isInserted = true;
668                                                 break;
669                                         }
670                                 }
671
672                                 for (Slot s : newSlots) {
673                                         if (isInserted) {
674                                                 break;
675                                         }
676
677                                         // Process each entry in the slot
678                                         for (Entry entry : s.getEntries()) {
679
680                                                 if (entry.getType() == Entry.TypeLastMessage) {
681                                                         LastMessage lastMessage = (LastMessage)entry;
682                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
683                                                                 isInserted = true;
684                                                                 break;
685                                                         }
686                                                 }
687                                         }
688                                 }
689
690                                 if (isInserted) {
691                                         if (newKey != NULL) {
692                                                 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
693                                                         newKey = NULL;
694                                                 }
695                                         }
696
697                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
698                                                 transaction.resetServerFailure();
699
700                                                 // Update which transactions parts still need to be sent
701                                                 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
702
703                                                 // Add the transaction status to the outstanding list
704                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
705
706                                                 // Update the transaction status
707                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
708
709                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
710                                                 if (transaction.didSendAllParts()) {
711                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
712                                                         pendingTransactionQueue.remove(transaction);
713                                                 } else {
714                                                         transaction.resetServerFailure();
715                                                         // Set the transaction sequence number back to nothing
716                                                         if (!transaction.didSendAPartToServer()) {
717                                                                 transaction.setSequenceNumber(-1);
718                                                         }
719                                                 }
720                                         }
721                                 } else {
722                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
723                                                 transaction.resetServerFailure();
724                                                 // Set the transaction sequence number back to nothing
725                                                 if (!transaction.didSendAPartToServer()) {
726                                                         transaction.setSequenceNumber(-1);
727                                                 }
728                                         }
729                                 }
730
731                                 // insert into the local block chain
732                                 validateAndUpdate(newSlots, true);
733                         }
734                 }
735         } catch (ServerException e) {
736                 throw e;
737         }
738
739
740
741         try {
742                 // While we have stuff that needs inserting into the block chain
743                 while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != NULL)) {
744
745                         fromRetry = false;
746
747                         if (hadPartialSendToServer) {
748                                 throw new Error("Should Be error free");
749                         }
750
751
752
753                         // If there is a new key with same name then end
754                         if ((newKey != NULL) && (arbitratorTable.get(newKey.getKey()) != NULL)) {
755                                 return false;
756                         }
757
758                         // Create the slot
759                         Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber);
760                         localSequenceNumber++;
761
762                         // Try to fill the slot with data
763                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
764                         bool needsResize = fillSlotsReturn.getFirst();
765                         int newSize = fillSlotsReturn.getSecond();
766                         bool insertedNewKey = fillSlotsReturn.getThird();
767
768                         if (needsResize) {
769                                 // Reset which transaction to send
770                                 for (Transaction transaction : transactionPartsSent.keySet()) {
771                                         transaction.resetNextPartToSend();
772
773                                         // Set the transaction sequence number back to nothing
774                                         if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
775                                                 transaction.setSequenceNumber(-1);
776                                         }
777                                 }
778
779                                 // Clear the sent data since we are trying again
780                                 pendingSendArbitrationEntriesToDelete.clear();
781                                 transactionPartsSent.clear();
782
783                                 // We needed a resize so try again
784                                 fillSlot(slot, true, newKey);
785                         }
786
787                         lastSlotAttemptedToSend = slot;
788                         lastIsNewKey = (newKey != NULL);
789                         lastInsertedNewKey = insertedNewKey;
790                         lastNewSize = newSize;
791                         lastNewKey = newKey;
792                         lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >(transactionPartsSent);
793                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
794
795
796                         ThreeTuple<bool, bool, Array<Slot> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
797
798                         if (sendSlotsReturn.getFirst()) {
799
800                                 // Did insert into the block chain
801
802                                 if (insertedNewKey) {
803                                         // This slot was what was inserted not a previous slot
804
805                                         // New Key was successfully inserted into the block chain so dont want to insert it again
806                                         newKey = NULL;
807                                 }
808
809                                 // Remove the aborts and commit parts that were sent from the pending to send queue
810                                 for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
811                                         ArbitrationRound round = iter.next();
812                                         round.removeParts(pendingSendArbitrationEntriesToDelete);
813
814                                         if (round.isDoneSending()) {
815                                                 // Sent all the parts
816                                                 iter.remove();
817                                         }
818                                 }
819
820                                 for (Transaction transaction : transactionPartsSent.keySet()) {
821                                         transaction.resetServerFailure();
822
823                                         // Update which transactions parts still need to be sent
824                                         transaction.removeSentParts(transactionPartsSent.get(transaction));
825
826                                         // Add the transaction status to the outstanding list
827                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
828
829                                         // Update the transaction status
830                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
831
832                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
833                                         if (transaction.didSendAllParts()) {
834                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
835                                                 pendingTransactionQueue.remove(transaction);
836                                         }
837                                 }
838                         } else {
839
840                                 // if (!sendSlotsReturn.getSecond()) {
841                                 //  for (Transaction transaction : lastTransactionPartsSent.keySet()) {
842                                 //    transaction.resetServerFailure();
843                                 //  }
844                                 // } else {
845                                 //  for (Transaction transaction : lastTransactionPartsSent.keySet()) {
846                                 //    transaction.resetServerFailure();
847
848                                 //    // Update which transactions parts still need to be sent
849                                 //    transaction.removeSentParts(transactionPartsSent.get(transaction));
850
851                                 //    // Add the transaction status to the outstanding list
852                                 //    outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
853
854                                 //    // Update the transaction status
855                                 //    transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
856
857                                 //    // Check if all the transaction parts were successfully sent and if so then remove it from pending
858                                 //    if (transaction.didSendAllParts()) {
859                                 //      transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
860                                 //      pendingTransactionQueue.remove(transaction);
861
862                                 //      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
863                                 //        System.out.println("Sent: " + kv + "  from: " + localMachineId + "   Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + "  Claimed:" + transaction.getSequenceNumber());
864                                 //      }
865                                 //    }
866                                 //  }
867                                 // }
868
869                                 // Reset which transaction to send
870                                 for (Transaction transaction : transactionPartsSent.keySet()) {
871                                         transaction.resetNextPartToSend();
872                                         // transaction.resetNextPartToSend();
873
874                                         // Set the transaction sequence number back to nothing
875                                         if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
876                                                 transaction.setSequenceNumber(-1);
877                                         }
878                                 }
879                         }
880
881                         // Clear the sent data in preparation for next send
882                         pendingSendArbitrationEntriesToDelete.clear();
883                         transactionPartsSent.clear();
884
885                         if (sendSlotsReturn.getThird().length != 0) {
886                                 // insert into the local block chain
887                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
888                         }
889                 }
890
891         } catch (ServerException e) {
892
893                 if (e.getType() != ServerException.TypeInputTimeout) {
894                         // e.printStackTrace();
895
896                         // Nothing was able to be sent to the server so just clear these data structures
897                         for (Transaction transaction : transactionPartsSent.keySet()) {
898                                 transaction.resetNextPartToSend();
899
900                                 // Set the transaction sequence number back to nothing
901                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
902                                         transaction.setSequenceNumber(-1);
903                                 }
904                         }
905                 } else {
906                         // There was a partial send to the server
907                         hadPartialSendToServer = true;
908
909
910                         // if (!fromRetry) {
911                         //  lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t>>(transactionPartsSent);
912                         //  lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
913                         // }
914
915                         // Nothing was able to be sent to the server so just clear these data structures
916                         for (Transaction transaction : transactionPartsSent.keySet()) {
917                                 transaction.resetNextPartToSend();
918                                 transaction.setServerFailure();
919                         }
920                 }
921
922                 pendingSendArbitrationEntriesToDelete.clear();
923                 transactionPartsSent.clear();
924
925                 throw e;
926         }
927
928         return newKey == NULL;
929 }
930
931 synchronized bool Table::updateFromLocal(int64_t machineId) {
932         Pair<String, int32_t> localCommunicationInformation = localCommunicationTable.get(machineId);
933         if (localCommunicationInformation == NULL) {
934                 // Cant talk to that device locally so do nothing
935                 return false;
936         }
937
938         // Get the size of the send data
939         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
940
941         Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
942         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != NULL) {
943                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
944         }
945
946         Array<char> *sendData = new char[sendDataSize];
947         ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
948
949         // Encode the data
950         bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
951         bbEncode.putInt(0);
952
953         // Send by local
954         Array<char> *returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
955         localSequenceNumber++;
956
957         if (returnData == NULL) {
958                 // Could not contact server
959                 return false;
960         }
961
962         // Decode the data
963         ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
964         int numberOfEntries = bbDecode.getInt();
965
966         for (int i = 0; i < numberOfEntries; i++) {
967                 char type = bbDecode.get();
968                 if (type == Entry.TypeAbort) {
969                         Abort abort = (Abort)Abort.decode(NULL, bbDecode);
970                         processEntry(abort);
971                 } else if (type == Entry.TypeCommitPart) {
972                         CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode);
973                         processEntry(commitPart);
974                 }
975         }
976
977         updateLiveStateFromLocal();
978
979         return true;
980 }
981
982 Pair<bool, bool> Table::sendTransactionToLocal(Transaction transaction) {
983
984         // Get the devices local communications
985         Pair<String, int32_t> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
986
987         if (localCommunicationInformation == NULL) {
988                 // Cant talk to that device locally so do nothing
989                 return new Pair<bool, bool>(true, false);
990         }
991
992         // Get the size of the send data
993         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
994         for (TransactionPart part : transaction.getParts().values()) {
995                 sendDataSize += part.getSize();
996         }
997
998         Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
999         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != NULL) {
1000                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
1001         }
1002
1003         // Make the send data size
1004         Array<char> *sendData = new char[sendDataSize];
1005         ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
1006
1007         // Encode the data
1008         bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
1009         bbEncode.putInt(transaction.getParts().size());
1010         for (TransactionPart part : transaction.getParts().values()) {
1011                 part.encode(bbEncode);
1012         }
1013
1014
1015         // Send by local
1016         Array<char> *returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
1017         localSequenceNumber++;
1018
1019         if (returnData == NULL) {
1020                 // Could not contact server
1021                 return new Pair<bool, bool>(true, false);
1022         }
1023
1024         // Decode the data
1025         ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
1026         bool didCommit = bbDecode.get() == 1;
1027         bool couldArbitrate = bbDecode.get() == 1;
1028         int numberOfEntries = bbDecode.getInt();
1029         bool foundAbort = false;
1030
1031         for (int i = 0; i < numberOfEntries; i++) {
1032                 char type = bbDecode.get();
1033                 if (type == Entry.TypeAbort) {
1034                         Abort abort = (Abort)Abort.decode(NULL, bbDecode);
1035
1036                         if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
1037                                 foundAbort = true;
1038                         }
1039
1040                         processEntry(abort);
1041                 } else if (type == Entry.TypeCommitPart) {
1042                         CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode);
1043                         processEntry(commitPart);
1044                 }
1045         }
1046
1047         updateLiveStateFromLocal();
1048
1049         if (couldArbitrate) {
1050                 TransactionStatus status =  transaction.getTransactionStatus();
1051                 if (didCommit) {
1052                         status.setStatus(TransactionStatus.StatusCommitted);
1053                 } else {
1054                         status.setStatus(TransactionStatus.StatusAborted);
1055                 }
1056         } else {
1057                 TransactionStatus status =  transaction.getTransactionStatus();
1058                 if (foundAbort) {
1059                         status.setStatus(TransactionStatus.StatusAborted);
1060                 } else {
1061                         status.setStatus(TransactionStatus.StatusCommitted);
1062                 }
1063         }
1064
1065         return new Pair<bool, bool>(false, true);
1066 }
1067
1068 synchronized Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1069
1070         // Decode the data
1071         ByteBuffer bbDecode = ByteBuffer.wrap(data);
1072         int64_t lastArbitratedSequenceNumberSeen = bbDecode.getLong();
1073         int numberOfParts = bbDecode.getInt();
1074
1075         // If we did commit a transaction or not
1076         bool didCommit = false;
1077         bool couldArbitrate = false;
1078
1079         if (numberOfParts != 0) {
1080
1081                 // decode the transaction
1082                 Transaction transaction = new Transaction();
1083                 for (int i = 0; i < numberOfParts; i++) {
1084                         bbDecode.get();
1085                         TransactionPart newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
1086                         transaction.addPartDecode(newPart);
1087                 }
1088
1089                 // Arbitrate on transaction and pull relevant return data
1090                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1091                 couldArbitrate = localArbitrateReturn.getFirst();
1092                 didCommit = localArbitrateReturn.getSecond();
1093
1094                 updateLiveStateFromLocal();
1095
1096                 // Transaction was sent to the server so keep track of it to prevent double commit
1097                 if (transaction.getSequenceNumber() != -1) {
1098                         offlineTransactionsCommittedAndAtServer.add(transaction.getId());
1099                 }
1100         }
1101
1102         // The data to send back
1103         int returnDataSize = 0;
1104         Vector<Entry> unseenArbitrations = new Vector<Entry>();
1105
1106         // Get the aborts to send back
1107         Vector<Long> abortLocalSequenceNumbers = new Vector<Long >(liveAbortsGeneratedByLocal.keySet());
1108         Collections.sort(abortLocalSequenceNumbers);
1109         for (Long localSequenceNumber : abortLocalSequenceNumbers) {
1110                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1111                         continue;
1112                 }
1113
1114                 Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
1115                 unseenArbitrations.add(abort);
1116                 returnDataSize += abort.getSize();
1117         }
1118
1119         // Get the commits to send back
1120         Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
1121         if (commitForClientTable != NULL) {
1122                 Vector<Long> commitLocalSequenceNumbers = new Vector<Long>(commitForClientTable.keySet());
1123                 Collections.sort(commitLocalSequenceNumbers);
1124
1125                 for (Long localSequenceNumber : commitLocalSequenceNumbers) {
1126                         Commit commit = commitForClientTable.get(localSequenceNumber);
1127
1128                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1129                                 continue;
1130                         }
1131
1132                         unseenArbitrations.addAll(commit.getParts().values());
1133
1134                         for (CommitPart commitPart : commit.getParts().values()) {
1135                                 returnDataSize += commitPart.getSize();
1136                         }
1137                 }
1138         }
1139
1140         // Number of arbitration entries to decode
1141         returnDataSize += 2 * sizeof(int32_t);
1142
1143         // bool of did commit or not
1144         if (numberOfParts != 0) {
1145                 returnDataSize += sizeof(char);
1146         }
1147
1148         // Data to send Back
1149         Array<char> *returnData = new char[returnDataSize];
1150         ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
1151
1152         if (numberOfParts != 0) {
1153                 if (didCommit) {
1154                         bbEncode.put((char)1);
1155                 } else {
1156                         bbEncode.put((char)0);
1157                 }
1158                 if (couldArbitrate) {
1159                         bbEncode.put((char)1);
1160                 } else {
1161                         bbEncode.put((char)0);
1162                 }
1163         }
1164
1165         bbEncode.putInt(unseenArbitrations.size());
1166         for (Entry entry : unseenArbitrations) {
1167                 entry.encode(bbEncode);
1168         }
1169
1170
1171         localSequenceNumber++;
1172         return returnData;
1173 }
1174
1175 ThreeTuple<bool, bool, Array<Slot> *> Table::sendSlotsToServer(Slot slot, int newSize, bool isNewKey) {
1176         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1177         attemptedToSendToServer = true;
1178
1179         bool inserted = false;
1180         bool lastTryInserted = false;
1181
1182         Array<Slot> *array = cloud.putSlot(slot, newSize);
1183         if (array == NULL) {
1184                 array = new Array<Slot>();
1185                 array->set(0, slot);
1186                 rejectedSlotVector.clear();
1187                 inserted = true;
1188         } else {
1189                 if (array.length == 0) {
1190                         throw new Error("Server Error: Did not send any slots");
1191                 }
1192
1193                 // if (attemptedToSendToServerTmp) {
1194                 if (hadPartialSendToServer) {
1195
1196                         bool isInserted = false;
1197                         for (Slot s : array) {
1198                                 if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
1199                                         isInserted = true;
1200                                         break;
1201                                 }
1202                         }
1203
1204                         for (Slot s : array) {
1205                                 if (isInserted) {
1206                                         break;
1207                                 }
1208
1209                                 // Process each entry in the slot
1210                                 for (Entry entry : s.getEntries()) {
1211
1212                                         if (entry.getType() == Entry.TypeLastMessage) {
1213                                                 LastMessage lastMessage = (LastMessage)entry;
1214
1215                                                 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) {
1216                                                         isInserted = true;
1217                                                         break;
1218                                                 }
1219                                         }
1220                                 }
1221                         }
1222
1223                         if (!isInserted) {
1224                                 rejectedSlotVector.add(slot.getSequenceNumber());
1225                                 lastTryInserted = false;
1226                         } else {
1227                                 lastTryInserted = true;
1228                         }
1229                 } else {
1230                         rejectedSlotVector.add(slot.getSequenceNumber());
1231                         lastTryInserted = false;
1232                 }
1233         }
1234
1235         return new ThreeTuple<bool, bool, Array<Slot> *>(inserted, lastTryInserted, array);
1236 }
1237
1238 /**
1239  * Returns false if a resize was needed
1240  */
1241 ThreeTuple<bool, int32_t, bool> *Table::fillSlot(Slot slot, bool resize, NewKey newKeyEntry) {
1242
1243
1244         int newSize = 0;
1245         if (liveSlotCount > bufferResizeThreshold) {
1246                 resize = true;  //Resize is forced
1247
1248         }
1249
1250         if (resize) {
1251                 newSize = (int) (numberOfSlots * RESIZE_MULTIPLE);
1252                 TableStatus status = new TableStatus(slot, newSize);
1253                 slot.addEntry(status);
1254         }
1255
1256         // Fill with rejected slots first before doing anything else
1257         doRejectedMessages(slot);
1258
1259         // Do mandatory rescue of entries
1260         ThreeTuple<bool, bool, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1261
1262         // Extract working variables
1263         bool needsResize = mandatoryRescueReturn.getFirst();
1264         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1265         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1266
1267         if (needsResize && !resize) {
1268                 // We need to resize but we are not resizing so return false
1269                 return new ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1270         }
1271
1272         bool inserted = false;
1273         if (newKeyEntry != NULL) {
1274                 newKeyEntry.setSlot(slot);
1275                 if (slot.hasSpace(newKeyEntry)) {
1276
1277                         slot.addEntry(newKeyEntry);
1278                         inserted = true;
1279                 }
1280         }
1281
1282         // Clear the transactions, aborts and commits that were sent previously
1283         transactionPartsSent.clear();
1284         pendingSendArbitrationEntriesToDelete.clear();
1285
1286         for (ArbitrationRound round : pendingSendArbitrationRounds) {
1287                 bool isFull = false;
1288                 round.generateParts();
1289                 Vector<Entry> parts = round.getParts();
1290
1291                 // Insert pending arbitration data
1292                 for (Entry arbitrationData : parts) {
1293
1294                         // If it is an abort then we need to set some information
1295                         if (arbitrationData instanceof Abort) {
1296                                 ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
1297                         }
1298
1299                         if (!slot.hasSpace(arbitrationData)) {
1300                                 // No space so cant do anything else with these data entries
1301                                 isFull = true;
1302                                 break;
1303                         }
1304
1305                         // Add to this current slot and add it to entries to delete
1306                         slot.addEntry(arbitrationData);
1307                         pendingSendArbitrationEntriesToDelete.add(arbitrationData);
1308                 }
1309
1310                 if (isFull) {
1311                         break;
1312                 }
1313         }
1314
1315         if (pendingTransactionQueue.size() > 0) {
1316
1317                 Transaction transaction = pendingTransactionQueue.get(0);
1318
1319                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1320                 // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
1321                 //  transaction.setSequenceNumber(slot.getSequenceNumber());
1322                 // }
1323
1324                 if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
1325                         transaction.setSequenceNumber(slot.getSequenceNumber());
1326                 }
1327
1328
1329                 while (true) {
1330                         TransactionPart part = transaction.getNextPartToSend();
1331
1332                         if (part == NULL) {
1333                                 // Ran out of parts to send for this transaction so move on
1334                                 break;
1335                         }
1336
1337                         if (slot.hasSpace(part)) {
1338                                 slot.addEntry(part);
1339                                 Vector<int32_t> partsSent = transactionPartsSent.get(transaction);
1340                                 if (partsSent == NULL) {
1341                                         partsSent = new Vector<int32_t>();
1342                                         transactionPartsSent.put(transaction, partsSent);
1343                                 }
1344                                 partsSent.add(part.getPartNumber());
1345                                 transactionPartsSent.put(transaction, partsSent);
1346                         } else {
1347                                 break;
1348                         }
1349                 }
1350         }
1351
1352         // Fill the remainder of the slot with rescue data
1353         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1354
1355         return new ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1356 }
1357
1358 void Table::doRejectedMessages(Slot s) {
1359         if (!rejectedSlotVector.isEmpty()) {
1360                 /* TODO: We should avoid generating a rejected message entry if
1361                  * there is already a sufficient entry in the queue (e.g.,
1362                  * equalsto value of true and same sequence number).  */
1363
1364                 int64_t old_seqn = rejectedSlotVector.firstElement();
1365                 if (rejectedSlotVector.size() > REJECTED_THRESHOLD) {
1366                         int64_t new_seqn = rejectedSlotVector.lastElement();
1367                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1368                         s.addEntry(rm);
1369                 } else {
1370                         int64_t prev_seqn = -1;
1371                         int i = 0;
1372                         /* Go through list of missing messages */
1373                         for (; i < rejectedSlotVector.size(); i++) {
1374                                 int64_t curr_seqn = rejectedSlotVector.get(i);
1375                                 Slot s_msg = buffer.getSlot(curr_seqn);
1376                                 if (s_msg != NULL)
1377                                         break;
1378                                 prev_seqn = curr_seqn;
1379                         }
1380                         /* Generate rejected message entry for missing messages */
1381                         if (prev_seqn != -1) {
1382                                 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1383                                 s.addEntry(rm);
1384                         }
1385                         /* Generate rejected message entries for present messages */
1386                         for (; i < rejectedSlotVector.size(); i++) {
1387                                 int64_t curr_seqn = rejectedSlotVector.get(i);
1388                                 Slot s_msg = buffer.getSlot(curr_seqn);
1389                                 int64_t machineid = s_msg.getMachineID();
1390                                 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1391                                 s.addEntry(rm);
1392                         }
1393                 }
1394         }
1395 }
1396
1397 ThreeTuple<bool, bool, Long> Table::doMandatoryResuce(Slot slot, bool resize) {
1398         int64_t newestSequenceNumber = buffer.getNewestSeqNum();
1399         int64_t oldestSequenceNumber = buffer.getOldestSeqNum();
1400         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1401                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1402         }
1403
1404         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1405         bool seenLiveSlot = false;
1406         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1407         int64_t threshold = firstIfFull + FREE_SLOTS;           // we want the buffer to be clear of live entries up to this point
1408
1409
1410         // Mandatory Rescue
1411         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1412                 Slot previousSlot = buffer.getSlot(currentSequenceNumber);
1413                 // Push slot number forward
1414                 if (!seenLiveSlot) {
1415                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1416                 }
1417
1418                 if (!previousSlot.isLive()) {
1419                         continue;
1420                 }
1421
1422                 // We have seen a live slot
1423                 seenLiveSlot = true;
1424
1425                 // Get all the live entries for a slot
1426                 Vector<Entry> liveEntries = previousSlot.getLiveEntries(resize);
1427
1428                 // Iterate over all the live entries and try to rescue them
1429                 for (Entry liveEntry : liveEntries) {
1430                         if (slot.hasSpace(liveEntry)) {
1431
1432                                 // Enough space to rescue the entry
1433                                 slot.addEntry(liveEntry);
1434                         } else if (currentSequenceNumber == firstIfFull) {
1435                                 //if there's no space but the entry is about to fall off the queue
1436                                 System.out.println("B");        //?
1437                                 return new ThreeTuple<bool, bool, Long>(true, seenLiveSlot, currentSequenceNumber);
1438
1439                         }
1440                 }
1441         }
1442
1443         // Did not resize
1444         return new ThreeTuple<bool, bool, Long>(false, seenLiveSlot, currentSequenceNumber);
1445 }
1446
1447 void Table::doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize) {
1448         /* now go through live entries from least to greatest sequence number until
1449          * either all live slots added, or the slot doesn't have enough room
1450          * for SKIP_THRESHOLD consecutive entries*/
1451         int skipcount = 0;
1452         int64_t newestseqnum = buffer.getNewestSeqNum();
1453 search:
1454         for (; seqn <= newestseqnum; seqn++) {
1455                 Slot prevslot = buffer.getSlot(seqn);
1456                 //Push slot number forward
1457                 if (!seenliveslot)
1458                         oldestLiveSlotSequenceNumver = seqn;
1459
1460                 if (!prevslot.isLive())
1461                         continue;
1462                 seenliveslot = true;
1463                 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1464                 for (Entry liveentry : liveentries) {
1465                         if (s.hasSpace(liveentry))
1466                                 s.addEntry(liveentry);
1467                         else {
1468                                 skipcount++;
1469                                 if (skipcount > SKIP_THRESHOLD)
1470                                         break search;
1471                         }
1472                 }
1473         }
1474 }
1475
1476 /**
1477  * Checks for malicious activity and updates the local copy of the block chain.
1478  */
1479 void Table::validateAndUpdate(Array<Slot> *newSlots, bool acceptUpdatesToLocal) {
1480
1481         // The cloud communication layer has checked slot HMACs already before decoding
1482         if (newSlots.length == 0) {
1483                 return;
1484         }
1485
1486         // Make sure all slots are newer than the last largest slot this client has seen
1487         int64_t firstSeqNum = newSlots[0].getSequenceNumber();
1488         if (firstSeqNum <= sequenceNumber) {
1489                 throw new Error("Server Error: Sent older slots!");
1490         }
1491
1492         // Create an object that can access both new slots and slots in our local chain
1493         // without committing slots to our local chain
1494         SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
1495
1496         // Check that the HMAC chain is not broken
1497         checkHMACChain(indexer, newSlots);
1498
1499         // Set to keep track of messages from clients
1500         Hashset<Long> machineSet = new Hashset<int64_t>(lastMessageTable.keySet());
1501
1502         // Process each slots data
1503         for (Slot slot : newSlots) {
1504                 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1505
1506                 updateExpectedSize();
1507         }
1508
1509         // If there is a gap, check to see if the server sent us everything.
1510         if (firstSeqNum != (sequenceNumber + 1)) {
1511
1512                 // Check the size of the slots that were sent down by the server.
1513                 // Can only check the size if there was a gap
1514                 checkNumSlots(newSlots.length);
1515
1516                 // Since there was a gap every machine must have pushed a slot or must have
1517                 // a last message message.  If not then the server is hiding slots
1518                 if (!machineSet.isEmpty()) {
1519                         throw new Error("Missing record for machines: " + machineSet);
1520                 }
1521         }
1522
1523         // Update the size of our local block chain.
1524         commitNewMaxSize();
1525
1526         // Commit new to slots to the local block chain.
1527         for (Slot slot : newSlots) {
1528
1529                 // Insert this slot into our local block chain copy.
1530                 buffer.putSlot(slot);
1531
1532                 // Keep track of how many slots are currently live (have live data in them).
1533                 liveSlotCount++;
1534         }
1535
1536         // Get the sequence number of the latest slot in the system
1537         sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
1538
1539         updateLiveStateFromServer();
1540
1541         // No Need to remember after we pulled from the server
1542         offlineTransactionsCommittedAndAtServer.clear();
1543
1544         // This is invalidated now
1545         hadPartialSendToServer = false;
1546 }
1547
1548 void Table::updateLiveStateFromServer() {
1549         // Process the new transaction parts
1550         processNewTransactionParts();
1551
1552         // Do arbitration on new transactions that were received
1553         arbitrateFromServer();
1554
1555         // Update all the committed keys
1556         bool didCommitOrSpeculate = updateCommittedTable();
1557
1558         // Delete the transactions that are now dead
1559         updateLiveTransactionsAndStatus();
1560
1561         // Do speculations
1562         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1563         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1564 }
1565
1566 void Table::updateLiveStateFromLocal() {
1567         // Update all the committed keys
1568         bool didCommitOrSpeculate = updateCommittedTable();
1569
1570         // Delete the transactions that are now dead
1571         updateLiveTransactionsAndStatus();
1572
1573         // Do speculations
1574         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1575         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1576 }
1577
1578 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1579         // if (didFindTableStatus) {
1580         // return;
1581         // }
1582         int64_t prevslots = firstSequenceNumber;
1583
1584
1585         if (didFindTableStatus) {
1586                 // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize;
1587                 // System.out.println("Here2: " + expectedsize + "    " + numberOfSlots + "   " + prevslots);
1588
1589         } else {
1590                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1591                 // System.out.println("Here: " + expectedsize);
1592         }
1593
1594         // System.out.println(numberOfSlots);
1595
1596         didFindTableStatus = true;
1597         currMaxSize = numberOfSlots;
1598 }
1599
1600 void Table::updateExpectedSize() {
1601         expectedsize++;
1602
1603         if (expectedsize > currMaxSize) {
1604                 expectedsize = currMaxSize;
1605         }
1606 }
1607
1608
1609 /**
1610  * Check the size of the block chain to make sure there are enough slots sent back by the server.
1611  * This is only called when we have a gap between the slots that we have locally and the slots
1612  * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1613  * status message
1614  */
1615 void Table::checkNumSlots(int numberOfSlots) {
1616         if (numberOfSlots != expectedsize) {
1617                 throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numberOfSlots);
1618         }
1619 }
1620
1621 void Table::updateCurrMaxSize(int newmaxsize) {
1622         currMaxSize = newmaxsize;
1623 }
1624
1625
1626 /**
1627  * Update the size of of the local buffer if it is needed.
1628  */
1629 void Table::commitNewMaxSize() {
1630         didFindTableStatus = false;
1631
1632         // Resize the local slot buffer
1633         if (numberOfSlots != currMaxSize) {
1634                 buffer.resize((int)currMaxSize);
1635         }
1636
1637         // Change the number of local slots to the new size
1638         numberOfSlots = (int)currMaxSize;
1639
1640
1641         // Recalculate the resize threshold since the size of the local buffer has changed
1642         setResizeThreshold();
1643 }
1644
1645 /**
1646  * Process the new transaction parts from this latest round of slots received from the server
1647  */
1648 void Table::processNewTransactionParts() {
1649
1650         if (newTransactionParts.size() == 0) {
1651                 // Nothing new to process
1652                 return;
1653         }
1654
1655         // Iterate through all the machine Ids that we received new parts for
1656         for (Long machineId : newTransactionParts.keySet()) {
1657                 Hashtable<Pair<int64_t int32_t>, TransactionPart> parts = newTransactionParts.get(machineId);
1658
1659                 // Iterate through all the parts for that machine Id
1660                 for (Pair<int64_t int32_t> partId : parts.keySet()) {
1661                         TransactionPart part = parts.get(partId);
1662
1663                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
1664                         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part.getSequenceNumber())) {
1665                                 // Set dead the transaction part
1666                                 part.setDead();
1667                                 continue;
1668                         }
1669
1670                         // Get the transaction object for that sequence number
1671                         Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber());
1672
1673                         if (transaction == NULL) {
1674                                 // This is a new transaction that we dont have so make a new one
1675                                 transaction = new Transaction();
1676
1677                                 // Insert this new transaction into the live tables
1678                                 liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction);
1679                                 liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction);
1680                         }
1681
1682                         // Add that part to the transaction
1683                         transaction.addPartDecode(part);
1684                 }
1685         }
1686
1687         // Clear all the new transaction parts in preparation for the next time the server sends slots
1688         newTransactionParts.clear();
1689 }
1690
1691
1692 int64_t lastSeqNumArbOn = 0;
1693
1694 void Table::arbitrateFromServer() {
1695
1696         if (liveTransactionBySequenceNumberTable.size() == 0) {
1697                 // Nothing to arbitrate on so move on
1698                 return;
1699         }
1700
1701         // Get the transaction sequence numbers and sort from oldest to newest
1702         Vector<Long> transactionSequenceNumbers = new Vector<Long>(liveTransactionBySequenceNumberTable.keySet());
1703         Collections.sort(transactionSequenceNumbers);
1704
1705         // Collection of key value pairs that are
1706         Hashtable<IoTString, KeyValue> speculativeTableTmp = new Hashtable<IoTString, KeyValue>();
1707
1708         // The last transaction arbitrated on
1709         int64_t lastTransactionCommitted = -1;
1710         Set<Abort> generatedAborts = new Hashset<Abort>();
1711
1712         for (Long transactionSequenceNumber : transactionSequenceNumbers) {
1713                 Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1714
1715
1716
1717                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1718                 if (transaction.getArbitrator() != localMachineId) {
1719                         continue;
1720                 }
1721
1722                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1723                         continue;
1724                 }
1725
1726                 if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
1727                         // We have seen this already locally so dont commit again
1728                         continue;
1729                 }
1730
1731
1732                 if (!transaction.isComplete()) {
1733                         // Will arbitrate in incorrect order if we continue so just break
1734                         // Most likely this
1735                         break;
1736                 }
1737
1738
1739                 // update the largest transaction seen by arbitrator from server
1740                 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == NULL) {
1741                         lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1742                 } else {
1743                         Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId());
1744                         if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1745                                 lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1746                         }
1747                 }
1748
1749                 if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1750                         // Guard evaluated as true
1751
1752                         // Update the local changes so we can make the commit
1753                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1754                                 speculativeTableTmp.put(kv.getKey(), kv);
1755                         }
1756
1757                         // Update what the last transaction committed was for use in batch commit
1758                         lastTransactionCommitted = transactionSequenceNumber;
1759                 } else {
1760                         // Guard evaluated was false so create abort
1761
1762                         // Create the abort
1763                         Abort newAbort = new Abort(NULL,
1764                                                                                                                                  transaction.getClientLocalSequenceNumber(),
1765                                                                                                                                  transaction.getSequenceNumber(),
1766                                                                                                                                  transaction.getMachineId(),
1767                                                                                                                                  transaction.getArbitrator(),
1768                                                                                                                                  localArbitrationSequenceNumber);
1769                         localArbitrationSequenceNumber++;
1770
1771                         generatedAborts.add(newAbort);
1772
1773                         // Insert the abort so we can process
1774                         processEntry(newAbort);
1775                 }
1776
1777                 lastSeqNumArbOn = transactionSequenceNumber;
1778
1779                 // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
1780         }
1781
1782         Commit newCommit = NULL;
1783
1784         // If there is something to commit
1785         if (speculativeTableTmp.size() != 0) {
1786
1787                 // Create the commit and increment the commit sequence number
1788                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1789                 localArbitrationSequenceNumber++;
1790
1791                 // Add all the new keys to the commit
1792                 for (KeyValue kv : speculativeTableTmp.values()) {
1793                         newCommit.addKV(kv);
1794                 }
1795
1796                 // create the commit parts
1797                 newCommit.createCommitParts();
1798
1799                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1800
1801                 // Insert the commit so we can process it
1802                 for (CommitPart commitPart : newCommit.getParts().values()) {
1803                         processEntry(commitPart);
1804                 }
1805         }
1806
1807         if ((newCommit != NULL) || (generatedAborts.size() > 0)) {
1808                 ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1809                 pendingSendArbitrationRounds.add(arbitrationRound);
1810
1811                 if (compactArbitrationData()) {
1812                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1813                         if (newArbitrationRound.getCommit() != NULL) {
1814                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1815                                         processEntry(commitPart);
1816                                 }
1817                         }
1818                 }
1819         }
1820 }
1821
1822 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction transaction) {
1823
1824         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1825         if (transaction.getArbitrator() != localMachineId) {
1826                 return new Pair<bool, bool>(false, false);
1827         }
1828
1829         if (!transaction.isComplete()) {
1830                 // Will arbitrate in incorrect order if we continue so just break
1831                 // Most likely this
1832                 return new Pair<bool, bool>(false, false);
1833         }
1834
1835         if (transaction.getMachineId() != localMachineId) {
1836                 // dont do this check for local transactions
1837                 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != NULL) {
1838                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
1839                                 // We've have already seen this from the server
1840                                 return new Pair<bool, bool>(false, false);
1841                         }
1842                 }
1843         }
1844
1845         if (transaction.evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1846                 // Guard evaluated as true
1847
1848                 // Create the commit and increment the commit sequence number
1849                 Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1850                 localArbitrationSequenceNumber++;
1851
1852                 // Update the local changes so we can make the commit
1853                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1854                         newCommit.addKV(kv);
1855                 }
1856
1857                 // create the commit parts
1858                 newCommit.createCommitParts();
1859
1860                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1861                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1862                 pendingSendArbitrationRounds.add(arbitrationRound);
1863
1864                 if (compactArbitrationData()) {
1865                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1866                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1867                                 processEntry(commitPart);
1868                         }
1869                 } else {
1870                         // Insert the commit so we can process it
1871                         for (CommitPart commitPart : newCommit.getParts().values()) {
1872                                 processEntry(commitPart);
1873                         }
1874                 }
1875
1876                 if (transaction.getMachineId() == localMachineId) {
1877                         TransactionStatus status = transaction.getTransactionStatus();
1878                         if (status != NULL) {
1879                                 status.setStatus(TransactionStatus.StatusCommitted);
1880                         }
1881                 }
1882
1883                 updateLiveStateFromLocal();
1884                 return new Pair<bool, bool>(true, true);
1885         } else {
1886
1887                 if (transaction.getMachineId() == localMachineId) {
1888                         // For locally created messages update the status
1889
1890                         // Guard evaluated was false so create abort
1891                         TransactionStatus status = transaction.getTransactionStatus();
1892                         if (status != NULL) {
1893                                 status.setStatus(TransactionStatus.StatusAborted);
1894                         }
1895                 } else {
1896                         Hashset<Abort *> addAbortSet = new Hashset<Abort * >();
1897
1898
1899                         // Create the abort
1900                         Abort newAbort = new Abort(NULL,
1901                                                                                                                                  transaction.getClientLocalSequenceNumber(),
1902                                                                                                                                  -1,
1903                                                                                                                                  transaction.getMachineId(),
1904                                                                                                                                  transaction.getArbitrator(),
1905                                                                                                                                  localArbitrationSequenceNumber);
1906                         localArbitrationSequenceNumber++;
1907
1908                         addAbortSet.add(newAbort);
1909
1910
1911                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1912                         ArbitrationRound arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1913                         pendingSendArbitrationRounds.add(arbitrationRound);
1914
1915                         if (compactArbitrationData()) {
1916                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1917                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1918                                         processEntry(commitPart);
1919                                 }
1920                         }
1921                 }
1922
1923                 updateLiveStateFromLocal();
1924                 return new Pair<bool, bool>(true, false);
1925         }
1926 }
1927
1928 /**
1929  * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1930  */
1931 bool Table::compactArbitrationData() {
1932
1933         if (pendingSendArbitrationRounds.size() < 2) {
1934                 // Nothing to compact so do nothing
1935                 return false;
1936         }
1937
1938         ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1939         if (lastRound.didSendPart()) {
1940                 return false;
1941         }
1942
1943         bool hadCommit = (lastRound.getCommit() == NULL);
1944         bool gotNewCommit = false;
1945
1946         int numberToDelete = 1;
1947         while (numberToDelete < pendingSendArbitrationRounds.size()) {
1948                 ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1);
1949
1950                 if (round.isFull() || round.didSendPart()) {
1951                         // Stop since there is a part that cannot be compacted and we need to compact in order
1952                         break;
1953                 }
1954
1955                 if (round.getCommit() == NULL) {
1956
1957                         // Try compacting aborts only
1958                         int newSize = round.getCurrentSize() + lastRound.getAbortsCount();
1959                         if (newSize > ArbitrationRound.MAX_PARTS) {
1960                                 // Cant compact since it would be too large
1961                                 break;
1962                         }
1963                         lastRound.addAborts(round.getAborts());
1964                 } else {
1965
1966                         // Create a new larger commit
1967                         Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber);
1968                         localArbitrationSequenceNumber++;
1969
1970                         // Create the commit parts so that we can count them
1971                         newCommit.createCommitParts();
1972
1973                         // Calculate the new size of the parts
1974                         int newSize = newCommit.getNumberOfParts();
1975                         newSize += lastRound.getAbortsCount();
1976                         newSize += round.getAbortsCount();
1977
1978                         if (newSize > ArbitrationRound.MAX_PARTS) {
1979                                 // Cant compact since it would be too large
1980                                 break;
1981                         }
1982
1983                         // Set the new compacted part
1984                         lastRound.setCommit(newCommit);
1985                         lastRound.addAborts(round.getAborts());
1986                         gotNewCommit = true;
1987                 }
1988
1989                 numberToDelete++;
1990         }
1991
1992         if (numberToDelete != 1) {
1993                 // If there is a compaction
1994
1995                 // Delete the previous pieces that are now in the new compacted piece
1996                 if (numberToDelete == pendingSendArbitrationRounds.size()) {
1997                         pendingSendArbitrationRounds.clear();
1998                 } else {
1999                         for (int i = 0; i < numberToDelete; i++) {
2000                                 pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1);
2001                         }
2002                 }
2003
2004                 // Add the new compacted into the pending to send list
2005                 pendingSendArbitrationRounds.add(lastRound);
2006
2007                 // Should reinsert into the commit processor
2008                 if (hadCommit && gotNewCommit) {
2009                         return true;
2010                 }
2011         }
2012
2013         return false;
2014 }
2015 // bool compactArbitrationData() {
2016 //  return false;
2017 // }
2018
2019 /**
2020  * Update all the commits and the committed tables, sets dead the dead transactions
2021  */
2022 bool Table::updateCommittedTable() {
2023
2024         if (newCommitParts.size() == 0) {
2025                 // Nothing new to process
2026                 return false;
2027         }
2028
2029         // Iterate through all the machine Ids that we received new parts for
2030         for (Long machineId : newCommitParts.keySet()) {
2031                 Hashtable<Pair<int64_t int32_t>, CommitPart> parts = newCommitParts.get(machineId);
2032
2033                 // Iterate through all the parts for that machine Id
2034                 for (Pair<int64_t int32_t> partId : parts.keySet()) {
2035                         CommitPart part = parts.get(partId);
2036
2037                         // Get the transaction object for that sequence number
2038                         Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
2039
2040                         if (commitForClientTable == NULL) {
2041                                 // This is the first commit from this device
2042                                 commitForClientTable = new Hashtable<int64_t Commit>();
2043                                 liveCommitsTable.put(part.getMachineId(), commitForClientTable);
2044                         }
2045
2046                         Commit commit = commitForClientTable.get(part.getSequenceNumber());
2047
2048                         if (commit == NULL) {
2049                                 // This is a new commit that we dont have so make a new one
2050                                 commit = new Commit();
2051
2052                                 // Insert this new commit into the live tables
2053                                 commitForClientTable.put(part.getSequenceNumber(), commit);
2054                         }
2055
2056                         // Add that part to the commit
2057                         commit.addPartDecode(part);
2058                 }
2059         }
2060
2061         // Clear all the new commits parts in preparation for the next time the server sends slots
2062         newCommitParts.clear();
2063
2064         // If we process a new commit keep track of it for future use
2065         bool didProcessANewCommit = false;
2066
2067         // Process the commits one by one
2068         for (Long arbitratorId : liveCommitsTable.keySet()) {
2069
2070                 // Get all the commits for a specific arbitrator
2071                 Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
2072
2073                 // Sort the commits in order
2074                 Vector<Long> commitSequenceNumbers = new Vector<Long>(commitForClientTable.keySet());
2075                 Collections.sort(commitSequenceNumbers);
2076
2077                 // Get the last commit seen from this arbitrator
2078                 int64_t lastCommitSeenSequenceNumber = -1;
2079                 if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != NULL) {
2080                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId);
2081                 }
2082
2083                 // Go through each new commit one by one
2084                 for (int i = 0; i < commitSequenceNumbers.size(); i++) {
2085                         Long commitSequenceNumber = commitSequenceNumbers.get(i);
2086                         Commit commit = commitForClientTable.get(commitSequenceNumber);
2087
2088                         // Special processing if a commit is not complete
2089                         if (!commit.isComplete()) {
2090                                 if (i == (commitSequenceNumbers.size() - 1)) {
2091                                         // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
2092                                         break;
2093                                 } else {
2094                                         // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
2095                                         // Delete it and move on
2096                                         commit.setDead();
2097                                         commitForClientTable.remove(commit.getSequenceNumber());
2098                                         continue;
2099                                 }
2100                         }
2101
2102                         // Update the last transaction that was updated if we can
2103                         if (commit.getTransactionSequenceNumber() != -1) {
2104                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2105
2106                                 // Update the last transaction sequence number that the arbitrator arbitrated on
2107                                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2108                                         lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2109                                 }
2110                         }
2111
2112                         // Update the last arbitration data that we have seen so far
2113                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != NULL) {
2114
2115                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId());
2116                                 if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) {
2117                                         // Is larger
2118                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2119                                 }
2120                         } else {
2121                                 // Never seen any data from this arbitrator so record the first one
2122                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2123                         }
2124
2125                         // We have already seen this commit before so need to do the full processing on this commit
2126                         if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2127
2128                                 // Update the last transaction that was updated if we can
2129                                 if (commit.getTransactionSequenceNumber() != -1) {
2130                                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2131
2132                                         // Update the last transaction sequence number that the arbitrator arbitrated on
2133                                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2134                                                 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2135                                         }
2136                                 }
2137
2138                                 continue;
2139                         }
2140
2141                         // If we got here then this is a brand new commit and needs full processing
2142
2143                         // Get what commits should be edited, these are the commits that have live values for their keys
2144                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2145                         for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2146                                 commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
2147                         }
2148                         commitsToEdit.remove(NULL);             // remove NULL since it could be in this set
2149
2150                         // Update each previous commit that needs to be updated
2151                         for (Commit previousCommit : commitsToEdit) {
2152
2153                                 // Only bother with live commits (TODO: Maybe remove this check)
2154                                 if (previousCommit.isLive()) {
2155
2156                                         // Update which keys in the old commits are still live
2157                                         for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2158                                                 previousCommit.invalidateKey(kv.getKey());
2159                                         }
2160
2161                                         // if the commit is now dead then remove it
2162                                         if (!previousCommit.isLive()) {
2163                                                 commitForClientTable.remove(previousCommit);
2164                                         }
2165                                 }
2166                         }
2167
2168                         // Update the last seen sequence number from this arbitrator
2169                         if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != NULL) {
2170                                 if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
2171                                         lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2172                                 }
2173                         } else {
2174                                 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2175                         }
2176
2177                         // We processed a new commit that we havent seen before
2178                         didProcessANewCommit = true;
2179
2180                         // Update the committed table of keys and which commit is using which key
2181                         for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2182                                 committedKeyValueTable.put(kv.getKey(), kv);
2183                                 liveCommitsByKeyTable.put(kv.getKey(), commit);
2184                         }
2185                 }
2186         }
2187
2188         return didProcessANewCommit;
2189 }
2190
2191 /**
2192  * Create the speculative table from transactions that are still live and have come from the cloud
2193  */
2194 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2195         if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
2196                 // There is nothing to speculate on
2197                 return false;
2198         }
2199
2200         // Create a list of the transaction sequence numbers and sort them from oldest to newest
2201         Vector<Long> transactionSequenceNumbersSorted = new Vector<Long>(liveTransactionBySequenceNumberTable.keySet());
2202         Collections.sort(transactionSequenceNumbersSorted);
2203
2204         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2205
2206
2207         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2208                 // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2209                 // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
2210
2211                 // Start from scratch
2212                 speculatedKeyValueTable.clear();
2213                 lastTransactionSequenceNumberSpeculatedOn = -1;
2214                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2215
2216         }
2217
2218         // Remember the front of the transaction list
2219         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);
2220
2221         // Find where to start arbitration from
2222         int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2223
2224         if (startIndex >= transactionSequenceNumbersSorted.size()) {
2225                 // Make sure we are not out of bounds
2226                 return false;           // did not speculate
2227         }
2228
2229         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2230         bool didSkip = true;
2231
2232         for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
2233                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
2234                 Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
2235
2236                 if (!transaction.isComplete()) {
2237                         // If there is an incomplete transaction then there is nothing we can do
2238                         // add this transactions arbitrator to the list of arbitrators we should ignore
2239                         incompleteTransactionArbitrator.add(transaction.getArbitrator());
2240                         didSkip = true;
2241                         continue;
2242                 }
2243
2244                 if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
2245                         continue;
2246                 }
2247
2248                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2249
2250                 if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2251                         // Guard evaluated to true so update the speculative table
2252                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2253                                 speculatedKeyValueTable.put(kv.getKey(), kv);
2254                         }
2255                 }
2256         }
2257
2258         if (didSkip) {
2259                 // Since there was a skip we need to redo the speculation next time around
2260                 lastTransactionSequenceNumberSpeculatedOn = -1;
2261                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2262         }
2263
2264         // We did some speculation
2265         return true;
2266 }
2267
2268 /**
2269  * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
2270  */
2271 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2272         if (pendingTransactionQueue.size() == 0) {
2273                 // There is nothing to speculate on
2274                 return;
2275         }
2276
2277
2278         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
2279                 // need to reset on the pending speculation
2280                 lastPendingTransactionSpeculatedOn = NULL;
2281                 firstPendingTransaction = pendingTransactionQueue.get(0);
2282                 pendingTransactionSpeculatedKeyValueTable.clear();
2283         }
2284
2285         // Find where to start arbitration from
2286         int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1;
2287
2288         if (startIndex >= pendingTransactionQueue.size()) {
2289                 // Make sure we are not out of bounds
2290                 return;
2291         }
2292
2293         for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
2294                 Transaction transaction = pendingTransactionQueue.get(i);
2295
2296                 lastPendingTransactionSpeculatedOn = transaction;
2297
2298                 if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2299                         // Guard evaluated to true so update the speculative table
2300                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2301                                 pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
2302                         }
2303                 }
2304         }
2305 }
2306
2307 /**
2308  * Set dead and remove from the live transaction tables the transactions that are dead
2309  */
2310 void Table::updateLiveTransactionsAndStatus() {
2311
2312         // Go through each of the transactions
2313         for (Iterator<Map.Entry<int64_t Transaction> > iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
2314                 Transaction transaction = iter.next().getValue();
2315
2316                 // Check if the transaction is dead
2317                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
2318                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
2319
2320                         // Set dead the transaction
2321                         transaction.setDead();
2322
2323                         // Remove the transaction from the live table
2324                         iter.remove();
2325                         liveTransactionByTransactionIdTable.remove(transaction.getId());
2326                 }
2327         }
2328
2329         // Go through each of the transactions
2330         for (Iterator<Map.Entry<int64_t TransactionStatus> > iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
2331                 TransactionStatus status = iter.next().getValue();
2332
2333                 // Check if the transaction is dead
2334                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
2335                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
2336
2337                         // Set committed
2338                         status.setStatus(TransactionStatus.StatusCommitted);
2339
2340                         // Remove
2341                         iter.remove();
2342                 }
2343         }
2344 }
2345
2346 /**
2347  * Process this slot, entry by entry.  Also update the latest message sent by slot
2348  */
2349 void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2350
2351         // Update the last message seen
2352         updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2353
2354         // Process each entry in the slot
2355         for (Entry entry : slot.getEntries()) {
2356                 switch (entry.getType()) {
2357
2358                 case Entry.TypeCommitPart:
2359                         processEntry((CommitPart)entry);
2360                         break;
2361
2362                 case Entry.TypeAbort:
2363                         processEntry((Abort)entry);
2364                         break;
2365
2366                 case Entry.TypeTransactionPart:
2367                         processEntry((TransactionPart)entry);
2368                         break;
2369
2370                 case Entry.TypeNewKey:
2371                         processEntry((NewKey)entry);
2372                         break;
2373
2374                 case Entry.TypeLastMessage:
2375                         processEntry((LastMessage)entry, machineSet);
2376                         break;
2377
2378                 case Entry.TypeRejectedMessage:
2379                         processEntry((RejectedMessage)entry, indexer);
2380                         break;
2381
2382                 case Entry.TypeTableStatus:
2383                         processEntry((TableStatus)entry, slot.getSequenceNumber());
2384                         break;
2385
2386                 default:
2387                         throw new Error("Unrecognized type: " + entry.getType());
2388                 }
2389         }
2390 }
2391
2392 /**
2393  * Update the last message that was sent for a machine Id
2394  */
2395 void Table::processEntry(LastMessage entry, Hashset<int64_t> *machineSet) {
2396         // Update what the last message received by a machine was
2397         updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
2398 }
2399
2400 /**
2401  * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2402  */
2403 void Table::processEntry(NewKey entry) {
2404
2405         // Update the arbitrator table with the new key information
2406         arbitratorTable.put(entry.getKey(), entry.getMachineID());
2407
2408         // Update what the latest live new key is
2409         NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry);
2410         if (oldNewKey != NULL) {
2411                 // Delete the old new key messages
2412                 oldNewKey.setDead();
2413         }
2414 }
2415
2416 /**
2417  * Process new table status entries and set dead the old ones as new ones come in.
2418  * keeps track of the largest and smallest table status seen in this current round
2419  * of updating the local copy of the block chain
2420  */
2421 void Table::processEntry(TableStatus entry, int64_t seq) {
2422         int newNumSlots = entry.getMaxSlots();
2423         updateCurrMaxSize(newNumSlots);
2424
2425         initExpectedSize(seq, newNumSlots);
2426
2427         if (liveTableStatus != NULL) {
2428                 // We have a larger table status so the old table status is no int64_ter alive
2429                 liveTableStatus.setDead();
2430         }
2431
2432         // Make this new table status the latest alive table status
2433         liveTableStatus = entry;
2434 }
2435
2436 /**
2437  * Check old messages to see if there is a block chain violation. Also
2438  */
2439 void Table::processEntry(RejectedMessage entry, SlotIndexer indexer) {
2440         int64_t oldSeqNum = entry.getOldSeqNum();
2441         int64_t newSeqNum = entry.getNewSeqNum();
2442         bool isequal = entry.getEqual();
2443         int64_t machineId = entry.getMachineID();
2444         int64_t seq = entry.getSequenceNumber();
2445
2446
2447         // Check if we have messages that were supposed to be rejected in our local block chain
2448         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2449
2450                 // Get the slot
2451                 Slot slot = indexer.getSlot(seqNum);
2452
2453                 if (slot != NULL) {
2454                         // If we have this slot make sure that it was not supposed to be a rejected slot
2455
2456                         int64_t slotMachineId = slot.getMachineID();
2457                         if (isequal != (slotMachineId == machineId)) {
2458                                 throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2459                         }
2460                 }
2461         }
2462
2463
2464         // Create a list of clients to watch until they see this rejected message entry.
2465         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2466         for (Map.Entry<int64_t Pair<int64_t Liveness> > lastMessageEntry : lastMessageTable.entrySet()) {
2467
2468                 // Machine ID for the last message entry
2469                 int64_t lastMessageEntryMachineId = lastMessageEntry.getKey();
2470
2471                 // We've seen it, don't need to continue to watch.  Our next
2472                 // message will implicitly acknowledge it.
2473                 if (lastMessageEntryMachineId == localMachineId) {
2474                         continue;
2475                 }
2476
2477                 Pair<int64_t Liveness> lastMessageValue = lastMessageEntry.getValue();
2478                 int64_t entrySequenceNumber = lastMessageValue.getFirst();
2479
2480                 if (entrySequenceNumber < seq) {
2481
2482                         // Add this rejected message to the set of messages that this machine ID did not see yet
2483                         addWatchVector(lastMessageEntryMachineId, entry);
2484
2485                         // This client did not see this rejected message yet so add it to the watch set to monitor
2486                         deviceWatchSet.add(lastMessageEntryMachineId);
2487                 }
2488         }
2489
2490         if (deviceWatchSet.isEmpty()) {
2491                 // This rejected message has been seen by all the clients so
2492                 entry.setDead();
2493         } else {
2494                 // We need to watch this rejected message
2495                 entry.setWatchSet(deviceWatchSet);
2496         }
2497 }
2498
2499 /**
2500  * Check if this abort is live, if not then save it so we can kill it later.
2501  * update the last transaction number that was arbitrated on.
2502  */
2503 void Table::processEntry(Abort entry) {
2504
2505
2506         if (entry.getTransactionSequenceNumber() != -1) {
2507                 // update the transaction status if it was sent to the server
2508                 TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
2509                 if (status != NULL) {
2510                         status.setStatus(TransactionStatus.StatusAborted);
2511                 }
2512         }
2513
2514         // Abort has not been seen by the client it is for yet so we need to keep track of it
2515         Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
2516         if (previouslySeenAbort != NULL) {
2517                 previouslySeenAbort.setDead();  // Delete old version of the abort since we got a rescued newer version
2518         }
2519
2520         if (entry.getTransactionArbitrator() == localMachineId) {
2521                 liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry);
2522         }
2523
2524         if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
2525
2526                 // The machine already saw this so it is dead
2527                 entry.setDead();
2528                 liveAbortTable.remove(entry.getAbortId());
2529
2530                 if (entry.getTransactionArbitrator() == localMachineId) {
2531                         liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber());
2532                 }
2533
2534                 return;
2535         }
2536
2537
2538
2539
2540         // Update the last arbitration data that we have seen so far
2541         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != NULL) {
2542
2543                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator());
2544                 if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) {
2545                         // Is larger
2546                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2547                 }
2548         } else {
2549                 // Never seen any data from this arbitrator so record the first one
2550                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2551         }
2552
2553
2554         // Set dead a transaction if we can
2555         Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair<int64_t, int64_t>(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber()));
2556         if (transactionToSetDead != NULL) {
2557                 liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber());
2558         }
2559
2560         // Update the last transaction sequence number that the arbitrator arbitrated on
2561         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator());
2562         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2563
2564                 // Is a valid one
2565                 if (entry.getTransactionSequenceNumber() != -1) {
2566                         lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber());
2567                 }
2568         }
2569 }
2570
2571 /**
2572  * Set dead the transaction part if that transaction is dead and keep track of all new parts
2573  */
2574 void Table::processEntry(TransactionPart entry) {
2575         // Check if we have already seen this transaction and set it dead OR if it is not alive
2576         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId());
2577         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry.getSequenceNumber())) {
2578                 // This transaction is dead, it was already committed or aborted
2579                 entry.setDead();
2580                 return;
2581         }
2582
2583         // This part is still alive
2584         Hashtable<Pair<int64_t int32_t>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
2585
2586         if (transactionPart == NULL) {
2587                 // Dont have a table for this machine Id yet so make one
2588                 transactionPart = new Hashtable<Pair<int64_t int32_t>, TransactionPart>();
2589                 newTransactionParts.put(entry.getMachineId(), transactionPart);
2590         }
2591
2592         // Update the part and set dead ones we have already seen (got a rescued version)
2593         TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry);
2594         if (previouslySeenPart != NULL) {
2595                 previouslySeenPart.setDead();
2596         }
2597 }
2598
2599 /**
2600  * Process new commit entries and save them for future use.  Delete duplicates
2601  */
2602 void Table::processEntry(CommitPart entry) {
2603
2604
2605         // Update the last transaction that was updated if we can
2606         if (entry.getTransactionSequenceNumber() != -1) {
2607                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
2608
2609                 // Update the last transaction sequence number that the arbitrator arbitrated on
2610                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2611                         lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
2612                 }
2613         }
2614
2615
2616
2617
2618         Hashtable<Pair<int64_t int32_t>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
2619
2620         if (commitPart == NULL) {
2621                 // Don't have a table for this machine Id yet so make one
2622                 commitPart = new Hashtable<Pair<int64_t int32_t>, CommitPart>();
2623                 newCommitParts.put(entry.getMachineId(), commitPart);
2624         }
2625
2626         // Update the part and set dead ones we have already seen (got a rescued version)
2627         CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry);
2628         if (previouslySeenPart != NULL) {
2629                 previouslySeenPart.setDead();
2630         }
2631 }
2632
2633 /**
2634  * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
2635  * Updates the live aborts, removes those that are dead and sets them dead.
2636  * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2637  * other clients have not had a rollback on the last message.
2638  */
2639 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> *machineSet) {
2640
2641         // We have seen this machine ID
2642         machineSet.remove(machineId);
2643
2644         // Get the set of rejected messages that this machine Id is has not seen yet
2645         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable.get(machineId);
2646
2647         // If there is a rejected message that this machine Id has not seen yet
2648         if (watchset != NULL) {
2649
2650                 // Go through each rejected message that this machine Id has not seen yet
2651                 for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
2652
2653                         RejectedMessage rm = rmit.next();
2654
2655                         // If this machine Id has seen this rejected message...
2656                         if (rm.getSequenceNumber() <= seqNum) {
2657
2658                                 // Remove it from our watchlist
2659                                 rmit.remove();
2660
2661                                 // Decrement machines that need to see this notification
2662                                 rm.removeWatcher(machineId);
2663                         }
2664                 }
2665         }
2666
2667         // Set dead the abort
2668         for (Iterator<Map.Entry<Pair<int64_t, int64_t>, Abort> > i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
2669                 Abort abort = i.next().getValue();
2670
2671                 if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
2672                         abort.setDead();
2673                         i.remove();
2674
2675                         if (abort.getTransactionArbitrator() == localMachineId) {
2676                                 liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber());
2677                         }
2678                 }
2679         }
2680
2681
2682
2683         if (machineId == localMachineId) {
2684                 // Our own messages are immediately dead.
2685                 if (liveness instanceof LastMessage) {
2686                         ((LastMessage)liveness).setDead();
2687                 } else if (liveness instanceof Slot) {
2688                         ((Slot)liveness).setDead();
2689                 } else {
2690                         throw new Error("Unrecognized type");
2691                 }
2692         }
2693
2694         // Get the old last message for this device
2695         Pair<int64_t Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<int64_t Liveness>(seqNum, liveness));
2696         if (lastMessageEntry == NULL) {
2697                 // If no last message then there is nothing else to process
2698                 return;
2699         }
2700
2701         int64_t lastMessageSeqNum = lastMessageEntry.getFirst();
2702         Liveness lastEntry = lastMessageEntry.getSecond();
2703
2704         // If it is not our machine Id since we already set ours to dead
2705         if (machineId != localMachineId) {
2706                 if (lastEntry instanceof LastMessage) {
2707                         ((LastMessage)lastEntry).setDead();
2708                 } else if (lastEntry instanceof Slot) {
2709                         ((Slot)lastEntry).setDead();
2710                 } else {
2711                         throw new Error("Unrecognized type");
2712                 }
2713         }
2714
2715         // Make sure the server is not playing any games
2716         if (machineId == localMachineId) {
2717
2718                 if (hadPartialSendToServer) {
2719                         // We were not making any updates and we had a machine mismatch
2720                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2721                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2722                         }
2723
2724                 } else {
2725                         // We were not making any updates and we had a machine mismatch
2726                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2727                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2728                         }
2729                 }
2730         } else {
2731                 if (lastMessageSeqNum > seqNum) {
2732                         throw new Error("Server Error: Rollback on remote machine sequence number");
2733                 }
2734         }
2735 }
2736
2737 /**
2738  * Add a rejected message entry to the watch set to keep track of which clients have seen that
2739  * rejected message entry and which have not.
2740  */
2741 void Table::addWatchVector(int64_t machineId, RejectedMessage entry) {
2742         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable.get(machineId);
2743         if (entries == NULL) {
2744                 // There is no set for this machine ID yet so create one
2745                 entries = new Hashset<RejectedMessage *>();
2746                 rejectedMessageWatchVectorTable.put(machineId, entries);
2747         }
2748         entries.add(entry);
2749 }
2750
2751 /**
2752  * Check if the HMAC chain is not violated
2753  */
2754 void Table::checkHMACChain(SlotIndexer indexer, Array<Slot> *newSlots) {
2755         for (int i = 0; i < newSlots.length; i++) {
2756                 Slot currSlot = newSlots[i];
2757                 Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
2758                 if (prevSlot != NULL &&
2759                                 !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
2760                         throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
2761         }
2762 }
2763