Changes
[iotcloud.git] / src2 / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.List;
12 import java.util.Set;
13 import java.util.Collection;
14
15 /**
16  * IoTTable data structure.  Provides client interface.
17  * @author Brian Demsky
18  * @version 1.0
19  */
20
21 final public class Table {
22         private int numslots;   //number of slots stored in buffer
23
24         //table of key-value pairs
25         //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
26
27         // machine id -> (sequence number, Slot or LastMessage); records last message by each client
28         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
29         // machine id -> ...
30         private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
31         private Vector<Long> rejectedmessagelist = new Vector<Long>();
32         private SlotBuffer buffer;
33         private CloudComm cloud;
34         private long sequencenumber; //Largest sequence number a client has received
35         private long localmachineid;
36         private TableStatus lastTableStatus;
37         static final int FREE_SLOTS = 10; //number of slots that should be kept free
38         static final int SKIP_THRESHOLD = 10;
39         private long liveslotcount = 0;
40         private int chance;
41         static final double RESIZE_MULTIPLE = 1.2;
42         static final double RESIZE_THRESHOLD = 0.75;
43         static final int REJECTED_THRESHOLD = 5;
44         private int resizethreshold;
45         private long lastliveslotseqn;  //smallest sequence number with a live entry
46         private Random random = new Random();
47
48         private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
49         private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
50         private List<Commit> commitList = null; // List of all the most recent live commits
51         private Set<Abort> abortSet = null; // Set of the live aborts
52         private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
53         private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
54         private List<Transaction> uncommittedTransactionsList = null; //
55         private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
56         // private Set<Abort> arbitratorTable = null; // Table of arbitrators
57
58
/**
 * Builds a table that creates its own {@code CloudComm} from the given
 * URL and password.
 *
 * @param baseurl          forwarded unchanged to the {@code CloudComm} constructor
 * @param password         forwarded unchanged to the {@code CloudComm} constructor
 * @param _localmachineid  stored as this client's machine id
 */
public Table(String baseurl, String password, long _localmachineid) {
    this.localmachineid = _localmachineid;

    // The slot buffer dictates the initial slot count, which in turn
    // feeds the randomized resize threshold.
    this.buffer = new SlotBuffer();
    this.numslots = this.buffer.capacity();
    setResizeThreshold();

    this.sequencenumber = 0;
    this.cloud = new CloudComm(this, baseurl, password);
    this.lastliveslotseqn = 1;

    // Transaction bookkeeping starts out empty.
    this.arbitratorTable = new HashMap<IoTString, Long>();
    this.uncommittedTransactionsList = new LinkedList<Transaction>();
    this.speculativeTable = new HashMap<IoTString, KeyValue>();
    this.commitedTable = new HashMap<IoTString, KeyValue>();
    this.abortSet = new HashSet<Abort>();
    this.commitList = new LinkedList<Commit>();
    this.pendingTransQueue = new LinkedList<PendingTransaction>();
}
76
/**
 * Builds a table on top of an already constructed {@code CloudComm}.
 *
 * <p>NOTE(review): unlike the (baseurl, password) constructor this one
 * never assigns {@code lastliveslotseqn}, so it keeps its field default
 * of 0 - confirm that this difference is intended.
 *
 * @param _cloud           communication object used for all cloud calls
 * @param _localmachineid  stored as this client's machine id
 */
public Table(CloudComm _cloud, long _localmachineid) {
    this.localmachineid = _localmachineid;

    // The slot buffer dictates the initial slot count, which in turn
    // feeds the randomized resize threshold.
    this.buffer = new SlotBuffer();
    this.numslots = this.buffer.capacity();
    setResizeThreshold();

    this.sequencenumber = 0;
    this.cloud = _cloud;

    // Transaction bookkeeping starts out empty.
    this.arbitratorTable = new HashMap<IoTString, Long>();
    this.uncommittedTransactionsList = new LinkedList<Transaction>();
    this.speculativeTable = new HashMap<IoTString, KeyValue>();
    this.commitedTable = new HashMap<IoTString, KeyValue>();
    this.abortSet = new HashSet<Abort>();
    this.commitList = new LinkedList<Commit>();
    this.pendingTransQueue = new LinkedList<PendingTransaction>();
}
93
94         public void rebuild() {
95                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
96                 validateandupdate(newslots, true);
97         }
98
99
100
101         public IoTString getCommitted(IoTString key) {
102                 KeyValue kv = commitedTable.get(key);
103                 if (kv != null) {
104                         return kv.getValue();
105                 } else {
106                         return null;
107                 }
108         }
109
110         public IoTString getSpeculative(IoTString key) {
111                 KeyValue kv = speculativeTable.get(key);
112                 if (kv != null) {
113                         return kv.getValue();
114                 } else {
115                         return null;
116                 }
117         }
118
119
120         public void initTable() {
121                 cloud.setSalt();//Set the salt
122                 Slot s = new Slot(this, 1, localmachineid);
123                 TableStatus status = new TableStatus(s, numslots);
124                 s.addEntry(status);
125                 Slot[] array = cloud.putSlot(s, numslots);
126                 if (array == null) {
127                         array = new Slot[] {s};
128                         /* update data structure */
129                         validateandupdate(array, true);
130                 } else {
131                         throw new Error("Error on initialization");
132                 }
133         }
134
135         public String toString() {
136
137
138                 String retString = " Committed Table: \n";
139                 retString += "---------------------------\n";
140                 retString += commitedTable.toString();
141
142                 retString += "\n\n";
143
144                 retString += " Speculative Table: \n";
145                 retString += "---------------------------\n";
146                 retString += speculativeTable.toString();
147
148                 return retString;
149         }
150
151
152
153
154
155
156         public void startTransaction() {
157                 // Create a new transaction, invalidates any old pending transactions.
158                 pendingTransBuild = new PendingTransaction();
159         }
160
161         public void commitTransaction() {
162
163                 if (pendingTransBuild.getKVUpdates().size() == 0) {
164                         // If no updates are made then there is no point inserting into the chain
165                         return;
166                 }
167
168                 // Add the pending transaction to the queue
169                 pendingTransQueue.add(pendingTransBuild);
170
171                 while (!pendingTransQueue.isEmpty()) {
172                         if (tryput( pendingTransQueue.peek(), false)) {
173                                 pendingTransQueue.poll();
174                         }
175                 }
176         }
177
178         public void addKV(IoTString key, IoTString value) {
179
180                 // Make sure new key value pair matches the current arbitrator
181                 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
182                         // TODO: Maybe not throw and error
183                         throw new Error("Not all Key Values match");
184                 }
185
186
187
188                 KeyValue kv = new KeyValue(key, value);
189                 pendingTransBuild.addKV(kv);
190         }
191
192         public void addGuard(Guard guard) {
193                 pendingTransBuild.addGuard(guard);
194         }
195
196         public void update() {
197                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
198
199                 validateandupdate(newslots, false);
200
201                 if (uncommittedTransactionsList.size() > 0) {
202                         List<Transaction> uncommittedTransArb = new LinkedList<Transaction>();
203
204                         for (Transaction ut : uncommittedTransactionsList) {
205                                 KeyValue kv = (KeyValue)(ut.getkeyValueUpdateSet().toArray()[0]);
206                                 long arb = arbitratorTable.get(kv.getKey());
207
208                                 if (arb == localmachineid) {
209                                         uncommittedTransArb.add(ut);
210                                 }
211                         }
212
213
214                         boolean needResize = false;
215                         while (uncommittedTransArb.size() > 0) {
216                                 boolean resize = needResize;
217                                 needResize = false;
218
219                                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
220                                 int newsize = 0;
221                                 if (liveslotcount > resizethreshold) {
222                                         resize = true; //Resize is forced
223                                 }
224
225                                 if (resize) {
226                                         newsize = (int) (numslots * RESIZE_MULTIPLE);
227                                         TableStatus status = new TableStatus(s, newsize);
228                                         s.addEntry(status);
229                                 }
230
231                                 if (! rejectedmessagelist.isEmpty()) {
232                                         /* TODO: We should avoid generating a rejected message entry if
233                                          * there is already a sufficient entry in the queue (e.g.,
234                                          * equalsto value of true and same sequence number).  */
235
236                                         long old_seqn = rejectedmessagelist.firstElement();
237                                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
238                                                 long new_seqn = rejectedmessagelist.lastElement();
239                                                 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
240                                                 s.addEntry(rm);
241                                         } else {
242                                                 long prev_seqn = -1;
243                                                 int i = 0;
244                                                 /* Go through list of missing messages */
245                                                 for (; i < rejectedmessagelist.size(); i++) {
246                                                         long curr_seqn = rejectedmessagelist.get(i);
247                                                         Slot s_msg = buffer.getSlot(curr_seqn);
248                                                         if (s_msg != null)
249                                                                 break;
250                                                         prev_seqn = curr_seqn;
251                                                 }
252                                                 /* Generate rejected message entry for missing messages */
253                                                 if (prev_seqn != -1) {
254                                                         RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
255                                                         s.addEntry(rm);
256                                                 }
257                                                 /* Generate rejected message entries for present messages */
258                                                 for (; i < rejectedmessagelist.size(); i++) {
259                                                         long curr_seqn = rejectedmessagelist.get(i);
260                                                         Slot s_msg = buffer.getSlot(curr_seqn);
261                                                         long machineid = s_msg.getMachineID();
262                                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
263                                                         s.addEntry(rm);
264                                                 }
265                                         }
266                                 }
267
268                                 long newestseqnum = buffer.getNewestSeqNum();
269                                 long oldestseqnum = buffer.getOldestSeqNum();
270                                 if (lastliveslotseqn < oldestseqnum) {
271                                         lastliveslotseqn = oldestseqnum;
272                                 }
273
274                                 long seqn = lastliveslotseqn;
275                                 boolean seenliveslot = false;
276                                 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
277                                 long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
278
279                                 boolean tryAgain = false;
280
281                                 // Mandatory Rescue
282                                 for (; seqn < threshold; seqn++) {
283                                         Slot prevslot = buffer.getSlot(seqn);
284                                         //Push slot number forward
285                                         if (! seenliveslot)
286                                                 lastliveslotseqn = seqn;
287
288                                         if (! prevslot.isLive())
289                                                 continue;
290                                         seenliveslot = true;
291                                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
292                                         for (Entry liveentry : liveentries) {
293                                                 if (s.hasSpace(liveentry)) {
294                                                         s.addEntry(liveentry);
295                                                 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
296                                                         if (!resize) {
297                                                                 System.out.print("B"); //?
298                                                                 tryAgain = true;
299                                                                 needResize = true;
300                                                         }
301                                                 }
302                                         }
303                                 }
304
305                                 if (tryAgain) {
306                                         continue;
307                                 }
308
309                                 // Arbitrate
310                                 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
311                                 for (Iterator<Transaction> i = uncommittedTransArb.iterator(); i.hasNext();) {
312                                         Transaction ut = i.next();
313
314                                         KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
315                                         // Check if this machine arbitrates for this transaction
316                                         if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
317                                                 continue;
318                                         }
319
320                                         Entry newEntry = null;
321
322                                         try {
323                                                 if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
324                                                         // Guard evaluated as true
325
326                                                         // update the local tmp current key set
327                                                         for (KeyValue kv : ut.getkeyValueUpdateSet()) {
328                                                                 speculativeTableTmp.put(kv.getKey(), kv);
329                                                         }
330
331                                                         // create the commit
332                                                         newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
333                                                 } else {
334                                                         // Guard was false
335
336                                                         // create the abort
337                                                         newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
338                                                 }
339                                         } catch (Exception e) {
340                                                 e.printStackTrace();
341                                         }
342
343                                         if ((newEntry != null) && s.hasSpace(newEntry)) {
344                                                 s.addEntry(newEntry);
345                                                 i.remove();
346
347                                         } else {
348                                                 break;
349                                         }
350                                 }
351
352                                 /* now go through live entries from least to greatest sequence number until
353                                  * either all live slots added, or the slot doesn't have enough room
354                                  * for SKIP_THRESHOLD consecutive entries*/
355                                 int skipcount = 0;
356                                 search:
357                                 for (; seqn <= newestseqnum; seqn++) {
358                                         Slot prevslot = buffer.getSlot(seqn);
359                                         //Push slot number forward
360                                         if (!seenliveslot)
361                                                 lastliveslotseqn = seqn;
362
363                                         if (!prevslot.isLive())
364                                                 continue;
365                                         seenliveslot = true;
366                                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
367                                         for (Entry liveentry : liveentries) {
368                                                 if (s.hasSpace(liveentry))
369                                                         s.addEntry(liveentry);
370                                                 else {
371                                                         skipcount++;
372                                                         if (skipcount > SKIP_THRESHOLD)
373                                                                 break search;
374                                                 }
375                                         }
376                                 }
377
378                                 int max = 0;
379                                 if (resize)
380                                         max = newsize;
381                                 Slot[] array = cloud.putSlot(s, max);
382                                 if (array == null) {
383                                         array = new Slot[] {s};
384                                         rejectedmessagelist.clear();
385                                 }       else {
386                                         if (array.length == 0)
387                                                 throw new Error("Server Error: Did not send any slots");
388                                         rejectedmessagelist.add(s.getSequenceNumber());
389                                 }
390
391                                 /* update data structure */
392                                 validateandupdate(array, true);
393                         }
394
395
396                 }
397         }
398
399         public boolean createNewKey(IoTString keyName, long machineId) {
400
401                 while (true) {
402                         if (arbitratorTable.get(keyName) != null) {
403                                 // There is already an arbitrator
404                                 return false;
405                         }
406
407                         if (tryput(keyName, machineId, false)) {
408
409                                 // If successfully inserted
410                                 return true;
411                         }
412                 }
413         }
414
415         void decrementLiveCount() {
416                 liveslotcount--;
417         }
418
419
420         private void setResizeThreshold() {
421                 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
422                 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
423         }
424
425         private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
426                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
427                 int newsize = 0;
428                 if (liveslotcount > resizethreshold) {
429                         resize = true; //Resize is forced
430                 }
431
432                 if (resize) {
433                         newsize = (int) (numslots * RESIZE_MULTIPLE);
434                         TableStatus status = new TableStatus(s, newsize);
435                         s.addEntry(status);
436                 }
437
438                 if (! rejectedmessagelist.isEmpty()) {
439                         /* TODO: We should avoid generating a rejected message entry if
440                          * there is already a sufficient entry in the queue (e.g.,
441                          * equalsto value of true and same sequence number).  */
442
443                         long old_seqn = rejectedmessagelist.firstElement();
444                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
445                                 long new_seqn = rejectedmessagelist.lastElement();
446                                 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
447                                 s.addEntry(rm);
448                         } else {
449                                 long prev_seqn = -1;
450                                 int i = 0;
451                                 /* Go through list of missing messages */
452                                 for (; i < rejectedmessagelist.size(); i++) {
453                                         long curr_seqn = rejectedmessagelist.get(i);
454                                         Slot s_msg = buffer.getSlot(curr_seqn);
455                                         if (s_msg != null)
456                                                 break;
457                                         prev_seqn = curr_seqn;
458                                 }
459                                 /* Generate rejected message entry for missing messages */
460                                 if (prev_seqn != -1) {
461                                         RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
462                                         s.addEntry(rm);
463                                 }
464                                 /* Generate rejected message entries for present messages */
465                                 for (; i < rejectedmessagelist.size(); i++) {
466                                         long curr_seqn = rejectedmessagelist.get(i);
467                                         Slot s_msg = buffer.getSlot(curr_seqn);
468                                         long machineid = s_msg.getMachineID();
469                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
470                                         s.addEntry(rm);
471                                 }
472                         }
473                 }
474
475                 long newestseqnum = buffer.getNewestSeqNum();
476                 long oldestseqnum = buffer.getOldestSeqNum();
477                 if (lastliveslotseqn < oldestseqnum)
478                         lastliveslotseqn = oldestseqnum;
479
480                 long seqn = lastliveslotseqn;
481                 boolean seenliveslot = false;
482                 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
483                 long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
484
485
486                 // Mandatory Rescue
487                 for (; seqn < threshold; seqn++) {
488                         Slot prevslot = buffer.getSlot(seqn);
489                         //Push slot number forward
490                         if (! seenliveslot)
491                                 lastliveslotseqn = seqn;
492
493                         if (! prevslot.isLive())
494                                 continue;
495                         seenliveslot = true;
496                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
497                         for (Entry liveentry : liveentries) {
498                                 if (s.hasSpace(liveentry)) {
499                                         s.addEntry(liveentry);
500                                 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
501                                         if (!resize) {
502                                                 System.out.print("B"); //?
503                                                 return tryput(pendingTrans, true);
504                                         }
505                                 }
506                         }
507                 }
508
509
510                 // Arbitrate
511                 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
512                 for (Transaction ut : uncommittedTransactionsList) {
513
514                         KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
515                         // Check if this machine arbitrates for this transaction
516                         if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
517                                 continue;
518                         }
519
520                         Entry newEntry = null;
521
522                         try {
523                                 if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
524                                         // Guard evaluated as true
525
526                                         // update the local tmp current key set
527                                         for (KeyValue kv : ut.getkeyValueUpdateSet()) {
528                                                 speculativeTableTmp.put(kv.getKey(), kv);
529                                         }
530
531                                         // create the commit
532                                         newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
533                                 } else {
534                                         // Guard was false
535
536                                         // create the abort
537                                         newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
538                                 }
539                         } catch (Exception e) {
540                                 e.printStackTrace();
541                         }
542
543                         if ((newEntry != null) && s.hasSpace(newEntry)) {
544                                 s.addEntry(newEntry);
545                         } else {
546                                 break;
547                         }
548                 }
549
550                 Transaction trans = new Transaction(s,
551                                                     s.getSequenceNumber(),
552                                                     localmachineid,
553                                                     pendingTrans.getKVUpdates(),
554                                                     pendingTrans.getGuard());
555                 boolean insertedTrans = false;
556                 if (s.hasSpace(trans)) {
557                         s.addEntry(trans);
558                         insertedTrans = true;
559                 }
560
561                 /* now go through live entries from least to greatest sequence number until
562                  * either all live slots added, or the slot doesn't have enough room
563                  * for SKIP_THRESHOLD consecutive entries*/
564                 int skipcount = 0;
565                 search:
566                 for (; seqn <= newestseqnum; seqn++) {
567                         Slot prevslot = buffer.getSlot(seqn);
568                         //Push slot number forward
569                         if (!seenliveslot)
570                                 lastliveslotseqn = seqn;
571
572                         if (!prevslot.isLive())
573                                 continue;
574                         seenliveslot = true;
575                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
576                         for (Entry liveentry : liveentries) {
577                                 if (s.hasSpace(liveentry))
578                                         s.addEntry(liveentry);
579                                 else {
580                                         skipcount++;
581                                         if (skipcount > SKIP_THRESHOLD)
582                                                 break search;
583                                 }
584                         }
585                 }
586
587                 int max = 0;
588                 if (resize)
589                         max = newsize;
590                 Slot[] array = cloud.putSlot(s, max);
591                 if (array == null) {
592                         array = new Slot[] {s};
593                         rejectedmessagelist.clear();
594                 }       else {
595                         if (array.length == 0)
596                                 throw new Error("Server Error: Did not send any slots");
597                         rejectedmessagelist.add(s.getSequenceNumber());
598                         insertedTrans = false;
599                 }
600
601                 /* update data structure */
602                 validateandupdate(array, true);
603
604                 return insertedTrans;
605         }
606
        /**
         * Builds the slot with sequence number {@code sequencenumber + 1}, fills it
         * with bookkeeping entries, arbitration results and a NewKey entry for
         * {@code keyName}, and submits it through {@code cloud.putSlot}.
         *
         * The slot is filled in this order: optional TableStatus (when resizing),
         * RejectedMessage entries, live entries from slots below the rescue
         * threshold, Commit/Abort entries for transactions this machine arbitrates,
         * the NewKey entry, and finally further live entries as space allows.
         *
         * @param keyName      key for which a NewKey entry is created
         * @param arbMachineid machine id passed to the NewKey entry
         * @param resize       whether to request a resize; forced to true when
         *                     liveslotcount exceeds resizethreshold
         * @return true only if the NewKey entry fit into the slot and
         *         cloud.putSlot returned null; otherwise false (or the result of
         *         the recursive retry with resize forced)
         */
        private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
                // The new slot is constructed from the HMAC of the slot currently at
                // sequencenumber (presumably to link the HMAC chain -- confirm in Slot).
                Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
                int newsize = 0;
                if (liveslotcount > resizethreshold) {
                        resize = true; //Resize is forced
                }

                if (resize) {
                        // Grow by RESIZE_MULTIPLE (truncated to int) and announce the new size
                        newsize = (int) (numslots * RESIZE_MULTIPLE);
                        TableStatus status = new TableStatus(s, newsize);
                        s.addEntry(status);
                }

                if (! rejectedmessagelist.isEmpty()) {
                        /* TODO: We should avoid generating a rejected message entry if
                         * there is already a sufficient entry in the queue (e.g.,
                         * equalsto value of true and same sequence number).  */

                        long old_seqn = rejectedmessagelist.firstElement();
                        if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
                                /* Too many rejections: emit a single range entry covering
                                 * the first through the last rejected sequence number. */
                                long new_seqn = rejectedmessagelist.lastElement();
                                RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
                                s.addEntry(rm);
                        } else {
                                long prev_seqn = -1;
                                int i = 0;
                                /* Go through the leading run of rejected sequence numbers
                                 * for which the buffer holds no slot */
                                for (; i < rejectedmessagelist.size(); i++) {
                                        long curr_seqn = rejectedmessagelist.get(i);
                                        Slot s_msg = buffer.getSlot(curr_seqn);
                                        if (s_msg != null)
                                                break;
                                        prev_seqn = curr_seqn;
                                }
                                /* Generate one range entry for that run of missing messages */
                                if (prev_seqn != -1) {
                                        RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
                                        s.addEntry(rm);
                                }
                                /* Generate one entry per remaining message, using the machine id
                                 * of the slot found in the buffer.
                                 * NOTE(review): s_msg is dereferenced without a null check, so
                                 * this assumes every remaining sequence number is in the buffer. */
                                for (; i < rejectedmessagelist.size(); i++) {
                                        long curr_seqn = rejectedmessagelist.get(i);
                                        Slot s_msg = buffer.getSlot(curr_seqn);
                                        long machineid = s_msg.getMachineID();
                                        RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
                                        s.addEntry(rm);
                                }
                        }
                }

                long newestseqnum = buffer.getNewestSeqNum();
                long oldestseqnum = buffer.getOldestSeqNum();
                // Never start scanning below the oldest slot still in the buffer
                if (lastliveslotseqn < oldestseqnum)
                        lastliveslotseqn = oldestseqnum;

                long seqn = lastliveslotseqn;
                boolean seenliveslot = false;
                long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
                long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point


                // Mandatory Rescue: copy live entries out of every slot below threshold
                for (; seqn < threshold; seqn++) {
                        Slot prevslot = buffer.getSlot(seqn);
                        //Push slot number forward (only until the first live slot is seen)
                        if (! seenliveslot)
                                lastliveslotseqn = seqn;

                        if (! prevslot.isLive())
                                continue;
                        seenliveslot = true;
                        Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
                        for (Entry liveentry : liveentries) {
                                if (s.hasSpace(liveentry)) {
                                        s.addEntry(liveentry);
                                } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
                                        // Retry the whole put with a forced resize.
                                        // NOTE(review): when resize is already true the entry
                                        // is skipped without any further handling.
                                        if (!resize) {
                                                System.out.print("B"); // debug marker printed before the retry
                                                return tryput(keyName, arbMachineid, true);
                                        }
                                }
                        }
                }


                // Arbitrate: decide uncommitted transactions against a scratch copy of
                // the committed table so earlier decisions are visible to later guards
                Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
                for (Transaction ut : uncommittedTransactionsList) {

                        // The first element of the update set (as returned by toArray)
                        // is used to look up the arbitrator
                        KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
                        // Check if this machine arbitrates for this transaction
                        if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
                                continue;
                        }

                        Entry newEntry = null;

                        try {
                                if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
                                        // Guard evaluated as true

                                        // update the local tmp current key set
                                        for (KeyValue kv : ut.getkeyValueUpdateSet()) {
                                                speculativeTableTmp.put(kv.getKey(), kv);
                                        }

                                        // create the commit
                                        newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
                                } else {
                                        // Guard was false

                                        // create the abort
                                        newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
                                }
                        } catch (Exception e) {
                                // newEntry stays null, which ends arbitration below
                                e.printStackTrace();
                        }

                        // Stop arbitrating as soon as an entry could not be built or does not fit
                        if ((newEntry != null) && s.hasSpace(newEntry)) {
                                s.addEntry(newEntry);
                        } else {
                                break;
                        }
                }


                NewKey newKey = new NewKey(s, keyName, arbMachineid);

                // The NewKey entry is only added if it still fits; otherwise the slot
                // is sent without it and false is returned
                boolean insertedNewKey = false;
                if (s.hasSpace(newKey)) {
                        s.addEntry(newKey);
                        insertedNewKey = true;
                }

                /* now go through live entries from least to greatest sequence number until
                 * either all live slots added, or more than SKIP_THRESHOLD entries did
                 * not fit (skipcount is never reset, so it counts all skipped entries,
                 * not only consecutive ones) */
                int skipcount = 0;
                search:
                for (; seqn <= newestseqnum; seqn++) {
                        Slot prevslot = buffer.getSlot(seqn);
                        //Push slot number forward
                        if (!seenliveslot)
                                lastliveslotseqn = seqn;

                        if (!prevslot.isLive())
                                continue;
                        seenliveslot = true;
                        Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
                        for (Entry liveentry : liveentries) {
                                if (s.hasSpace(liveentry))
                                        s.addEntry(liveentry);
                                else {
                                        skipcount++;
                                        if (skipcount > SKIP_THRESHOLD)
                                                break search;
                                }
                        }
                }

                // max is the requested new size when resizing, 0 otherwise
                int max = 0;
                if (resize)
                        max = newsize;
                Slot[] array = cloud.putSlot(s, max);
                if (array == null) {
                        // A null reply is treated as acceptance: our slot is the only new one
                        array = new Slot[] {s};
                        rejectedmessagelist.clear();
                }       else {
                        // Otherwise the returned slots are processed instead and our slot
                        // is recorded as rejected
                        if (array.length == 0)
                                throw new Error("Server Error: Did not send any slots");
                        rejectedmessagelist.add(s.getSequenceNumber());
                        insertedNewKey = false;
                }

                /* update data structure */
                validateandupdate(array, true);

                return insertedNewKey;
        }
786
787         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
788                 /* The cloud communication layer has checked slot HMACs already
789                          before decoding */
790                 if (newslots.length == 0) return;
791
792                 long firstseqnum = newslots[0].getSequenceNumber();
793                 if (firstseqnum <= sequencenumber) {
794                         throw new Error("Server Error: Sent older slots!");
795                 }
796
797                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
798                 checkHMACChain(indexer, newslots);
799
800                 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
801
802                 initExpectedSize(firstseqnum);
803                 for (Slot slot : newslots) {
804                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
805                         updateExpectedSize();
806                 }
807
808                 /* If there is a gap, check to see if the server sent us everything. */
809                 if (firstseqnum != (sequencenumber + 1)) {
810
811                         // TODO: Check size
812                         checkNumSlots(newslots.length);
813                         if (!machineSet.isEmpty()) {
814                                 throw new Error("Missing record for machines: " + machineSet);
815                         }
816                 }
817
818                 commitNewMaxSize();
819
820                 /* Commit new to slots. */
821                 for (Slot slot : newslots) {
822                         buffer.putSlot(slot);
823                         liveslotcount++;
824                 }
825                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
826
827                 // Speculate on key value pairs
828                 createSpeculativeTable();
829         }
830
831         private void createSpeculativeTable() {
832                 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
833
834                 for (Transaction trans : uncommittedTransactionsList) {
835
836                         try {
837                                 if (trans.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
838                                         for (KeyValue kv : trans.getkeyValueUpdateSet()) {
839                                                 speculativeTableTmp.put(kv.getKey(), kv);
840                                         }
841                                 }
842
843                         } catch (Exception e) {
844                                 e.printStackTrace();
845                         }
846                 }
847
848                 speculativeTable = speculativeTableTmp;
849         }
850
851         private int expectedsize, currmaxsize;
852
853         private void checkNumSlots(int numslots) {
854                 if (numslots != expectedsize) {
855                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
856                 }
857         }
858
859         private void initExpectedSize(long firstsequencenumber) {
860                 long prevslots = firstsequencenumber;
861                 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
862                 currmaxsize = numslots;
863         }
864
865         private void updateExpectedSize() {
866                 expectedsize++;
867                 if (expectedsize > currmaxsize) {
868                         expectedsize = currmaxsize;
869                 }
870         }
871
872         private void updateCurrMaxSize(int newmaxsize) {
873                 currmaxsize = newmaxsize;
874         }
875
876         private void commitNewMaxSize() {
877                 if (numslots != currmaxsize)
878                         buffer.resize(currmaxsize);
879
880                 numslots = currmaxsize;
881                 setResizeThreshold();
882         }
883
884
885
886
887
888         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
889                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
890         }
891
892         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
893                 long oldseqnum = entry.getOldSeqNum();
894                 long newseqnum = entry.getNewSeqNum();
895                 boolean isequal = entry.getEqual();
896                 long machineid = entry.getMachineID();
897                 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
898                         Slot slot = indexer.getSlot(seqnum);
899                         if (slot != null) {
900                                 long slotmachineid = slot.getMachineID();
901                                 if (isequal != (slotmachineid == machineid)) {
902                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
903                                 }
904                         }
905                 }
906
907                 HashSet<Long> watchset = new HashSet<Long>();
908                 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
909                         long entry_mid = lastmsg_entry.getKey();
910                         /* We've seen it, don't need to continue to watch.  Our next
911                          * message will implicitly acknowledge it. */
912                         if (entry_mid == localmachineid)
913                                 continue;
914                         Pair<Long, Liveness> v = lastmsg_entry.getValue();
915                         long entry_seqn = v.getFirst();
916                         if (entry_seqn < newseqnum) {
917                                 addWatchList(entry_mid, entry);
918                                 watchset.add(entry_mid);
919                         }
920                 }
921                 if (watchset.isEmpty())
922                         entry.setDead();
923                 else
924                         entry.setWatchSet(watchset);
925         }
926
927         private void processEntry(NewKey entry) {
928                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
929         }
930
931         private void processEntry(Transaction entry) {
932                 uncommittedTransactionsList.add(entry);
933         }
934
935         private void processEntry(Abort entry) {
936
937
938                 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
939                         // Abort has not been seen yet so we need to keep track of it
940                         abortSet.add(entry);
941                 } else {
942                         // The machine already saw this so it is dead
943                         entry.setDead();
944                 }
945
946                 for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
947                         Transaction prevtrans = i.next();
948                         if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) {
949                                 uncommittedTransactionsList.remove(prevtrans);
950                                 prevtrans.setDead();
951                                 return;
952                         }
953                 }
954         }
955
956         private void processEntry(Commit entry) {
957
958                 for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
959                         Commit prevcommit = i.next();
960                         prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
961
962                         if (!prevcommit.isLive()) {
963                                 commitList.remove(prevcommit);
964                         }
965                 }
966
967                 commitList.add(entry);
968
969                 // Update the committed table list
970                 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
971                         IoTString key = kv.getKey();
972                         commitedTable.put(key, kv);
973                 }
974
975                 long committedTransSeq = entry.getTransSequenceNumber();
976
977                 // Make dead the transactions
978                 for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
979                         Transaction prevtrans = i.next();
980
981                         if (prevtrans.getSequenceNumber() <= committedTransSeq) {
982                                 uncommittedTransactionsList.remove(prevtrans);
983                                 prevtrans.setDead();
984                         }
985                 }
986         }
987
988         private void processEntry(TableStatus entry) {
989                 int newnumslots = entry.getMaxSlots();
990                 updateCurrMaxSize(newnumslots);
991                 if (lastTableStatus != null)
992                         lastTableStatus.setDead();
993                 lastTableStatus = entry;
994         }
995
996
997         private void addWatchList(long machineid, RejectedMessage entry) {
998                 HashSet<RejectedMessage> entries = watchlist.get(machineid);
999                 if (entries == null)
1000                         watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
1001                 entries.add(entry);
1002         }
1003
1004         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1005                 machineSet.remove(machineid);
1006
1007                 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
1008                 if (watchset != null) {
1009                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
1010                                 RejectedMessage rm = rmit.next();
1011                                 if (rm.getNewSeqNum() <= seqnum) {
1012                                         /* Remove it from our watchlist */
1013                                         rmit.remove();
1014                                         /* Decrement machines that need to see this notification */
1015                                         rm.removeWatcher(machineid);
1016                                 }
1017                         }
1018                 }
1019
1020                 if (machineid == localmachineid) {
1021                         /* Our own messages are immediately dead. */
1022                         if (liveness instanceof LastMessage) {
1023                                 ((LastMessage)liveness).setDead();
1024                         } else if (liveness instanceof Slot) {
1025                                 ((Slot)liveness).setDead();
1026                         } else {
1027                                 throw new Error("Unrecognized type");
1028                         }
1029                 }
1030
1031                 // Set dead the abort
1032                 for (Iterator<Abort> ait = abortSet.iterator(); ait.hasNext(); ) {
1033                         Abort abort = ait.next();
1034
1035                         if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
1036                                 abort.setDead();
1037                                 ait.remove();
1038                         }
1039                 }
1040
1041
1042                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
1043                 if (lastmsgentry == null)
1044                         return;
1045
1046                 long lastmsgseqnum = lastmsgentry.getFirst();
1047                 Liveness lastentry = lastmsgentry.getSecond();
1048                 if (machineid != localmachineid) {
1049                         if (lastentry instanceof LastMessage) {
1050                                 ((LastMessage)lastentry).setDead();
1051                         } else if (lastentry instanceof Slot) {
1052                                 ((Slot)lastentry).setDead();
1053                         } else {
1054                                 throw new Error("Unrecognized type");
1055                         }
1056                 }
1057
1058                 if (machineid == localmachineid) {
1059                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
1060                                 throw new Error("Server Error: Mismatch on local machine sequence number");
1061                 } else {
1062                         if (lastmsgseqnum > seqnum)
1063                                 throw new Error("Server Error: Rollback on remote machine sequence number");
1064                 }
1065         }
1066
1067         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1068                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
1069                 for (Entry entry : slot.getEntries()) {
1070                         switch (entry.getType()) {
1071
1072                         case Entry.TypeNewKey:
1073                                 processEntry((NewKey)entry);
1074                                 break;
1075
1076                         case Entry.TypeCommit:
1077                                 processEntry((Commit)entry);
1078                                 break;
1079
1080                         case Entry.TypeAbort:
1081                                 processEntry((Abort)entry);
1082                                 break;
1083
1084                         case Entry.TypeTransaction:
1085                                 processEntry((Transaction)entry);
1086                                 break;
1087
1088                         case Entry.TypeLastMessage:
1089                                 processEntry((LastMessage)entry, machineSet);
1090                                 break;
1091
1092                         case Entry.TypeRejectedMessage:
1093                                 processEntry((RejectedMessage)entry, indexer);
1094                                 break;
1095
1096                         case Entry.TypeTableStatus:
1097                                 processEntry((TableStatus)entry);
1098                                 break;
1099
1100                         default:
1101                                 throw new Error("Unrecognized type: " + entry.getType());
1102                         }
1103                 }
1104         }
1105
1106         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
1107                 for (int i = 0; i < newslots.length; i++) {
1108                         Slot currslot = newslots[i];
1109                         Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
1110                         if (prevslot != null &&
1111                                 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1112                                 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);
1113                 }
1114         }
1115 }