Initial Working version of IoTCloudv2, needs more testing
[iotcloud.git] / src2 / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.List;
12 import java.util.Set;
13 import java.util.Collection;
14
15 /**
16  * IoTTable data structure.  Provides client interface.
17  * @author Brian Demsky
18  * @version 1.0
19  */
20
21 final public class Table {
        // Number of slots stored in buffer; initialised from buffer.capacity().
        private int numslots;   //number of slots stored in buffer

        //table of key-value pairs
        //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();

        // machine id -> (sequence number, Slot or LastMessage); records last message by each client
        private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
        // machine id -> set of RejectedMessage entries.
        // NOTE(review): not referenced in the methods visible here; confirm purpose against the rest of the class.
        private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
        // Sequence numbers of slots this client sent that putSlot did not accept;
        // cleared again as soon as a put succeeds.
        private Vector<Long> rejectedmessagelist = new Vector<Long>();
        // Local buffer of slots, looked up by sequence number.
        private SlotBuffer buffer;
        // Connection used to fetch slots from, and put slots to, the server.
        private CloudComm cloud;
        private long sequencenumber; //Largest sequence number a client has received
        // Machine id of this client; stamped on every slot it creates.
        private long localmachineid;
        // NOTE(review): not referenced in the methods visible here.
        private TableStatus lastTableStatus;
        static final int FREE_SLOTS = 10; //number of slots that should be kept free
        // Number of live entries that may fail to fit into a new slot before the
        // optional rescue pass gives up.
        static final int SKIP_THRESHOLD = 10;
        // Compared against resizethreshold to force a resize; lowered by decrementLiveCount().
        private long liveslotcount = 0;
        // NOTE(review): not referenced in the methods visible here.
        private int chance;
        // Factor applied to numslots to obtain the new size when resizing.
        static final double RESIZE_MULTIPLE = 1.2;
        // Fraction of numslots used as the lower bound when picking resizethreshold.
        static final double RESIZE_THRESHOLD = 0.75;
        // Above this many rejected messages a single range entry is emitted
        // instead of one entry per message.
        static final int REJECTED_THRESHOLD = 5;
        // Randomised live-slot count above which a resize is forced (see setResizeThreshold()).
        private int resizethreshold;
        private long lastliveslotseqn;  //smallest sequence number with a live entry
        // Randomness source for setResizeThreshold().
        private Random random = new Random();

        private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
        private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
        private List<Commit> commitList = null; // List of all the most recent live commits
        private Set<Abort> abortSet = null; // Set of the live aborts
        private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
        private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
        private List<Transaction> uncommittedTransactionsList = null; // Transactions seen but not yet committed or aborted
        private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators (key -> arbitrating machine id)
        // private Set<Abort> arbitratorTable = null; // Table of arbitrators
57
58
59         public Table(String baseurl, String password, long _localmachineid) {
60                 localmachineid = _localmachineid;
61                 buffer = new SlotBuffer();
62                 numslots = buffer.capacity();
63                 setResizeThreshold();
64                 sequencenumber = 0;
65                 cloud = new CloudComm(this, baseurl, password);
66                 lastliveslotseqn = 1;
67
68                 pendingTransQueue = new LinkedList<PendingTransaction>();
69                 commitList = new LinkedList<Commit>();
70                 abortSet = new HashSet<Abort>();
71                 commitedTable = new HashMap<IoTString, KeyValue>();
72                 speculativeTable = new HashMap<IoTString, KeyValue>();
73                 uncommittedTransactionsList = new LinkedList<Transaction>();
74                 arbitratorTable = new HashMap<IoTString, Long>();
75         }
76
77         public Table(CloudComm _cloud, long _localmachineid) {
78                 localmachineid = _localmachineid;
79                 buffer = new SlotBuffer();
80                 numslots = buffer.capacity();
81                 setResizeThreshold();
82                 sequencenumber = 0;
83                 cloud = _cloud;
84
85                 pendingTransQueue = new LinkedList<PendingTransaction>();
86                 commitList = new LinkedList<Commit>();
87                 abortSet = new HashSet<Abort>();
88                 commitedTable = new HashMap<IoTString, KeyValue>();
89                 speculativeTable = new HashMap<IoTString, KeyValue>();
90                 uncommittedTransactionsList = new LinkedList<Transaction>();
91                 arbitratorTable = new HashMap<IoTString, Long>();
92         }
93
94         public void rebuild() {
95                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
96                 validateandupdate(newslots, true);
97         }
98
99
100
101         public IoTString getCommitted(IoTString key) {
102                 KeyValue kv = commitedTable.get(key);
103                 if (kv != null) {
104                         return kv.getValue();
105                 } else {
106                         return null;
107                 }
108         }
109
110         public IoTString getSpeculative(IoTString key) {
111                 KeyValue kv = speculativeTable.get(key);
112                 if (kv != null) {
113                         return kv.getValue();
114                 } else {
115                         return null;
116                 }
117         }
118
119
120         public void initTable() {
121                 cloud.setSalt();//Set the salt
122                 Slot s = new Slot(this, 1, localmachineid);
123                 TableStatus status = new TableStatus(s, numslots);
124                 s.addEntry(status);
125                 Slot[] array = cloud.putSlot(s, numslots);
126                 if (array == null) {
127                         array = new Slot[] {s};
128                         /* update data structure */
129                         validateandupdate(array, true);
130                 } else {
131                         throw new Error("Error on initialization");
132                 }
133         }
134
135         public String toString() {
136
137
138                 String retString = " Committed Table: \n";
139                 retString += "---------------------------\n";
140                 retString += commitedTable.toString();
141
142                 retString += "\n\n";
143
144                 retString += " Speculative Table: \n";
145                 retString += "---------------------------\n";
146                 retString += speculativeTable.toString();
147
148                 return retString;
149         }
150
151
152
153
154
155
156         public void startTransaction() {
157                 // Create a new transaction, invalidates any old pending transactions.
158                 pendingTransBuild = new PendingTransaction();
159         }
160
161         public void commitTransaction() {
162
163                 if (pendingTransBuild.getKVUpdates().size() == 0) {
164                         // If no updates are made then there is no point inserting into the chain
165                         return;
166                 }
167
168                 // Add the pending transaction to the queue
169                 pendingTransQueue.add(pendingTransBuild);
170
171                 while (!pendingTransQueue.isEmpty()) {
172                         if (tryput( pendingTransQueue.peek(), false)) {
173                                 pendingTransQueue.poll();
174                         }
175                 }
176         }
177
178         public void addKV(IoTString key, IoTString value) {
179
180                 // Make sure new key value pair matches the current arbitrator
181                 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
182                         // TODO: Maybe not throw and error
183                         throw new Error("Not all Key Values match");
184                 }
185
186
187
188                 KeyValue kv = new KeyValue(key, value);
189                 pendingTransBuild.addKV(kv);
190         }
191
192         public void addGuard(Guard guard) {
193                 pendingTransBuild.addGuard(guard);
194         }
195
196         public void update() {
197                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
198
199                 validateandupdate(newslots, false);
200
201                 if (uncommittedTransactionsList.size() > 0) {
202                         List<Transaction> uncommittedTransArb = new LinkedList<Transaction>();
203
204                         for (Transaction ut : uncommittedTransactionsList) {
205                                 KeyValue kv = (KeyValue)(ut.getkeyValueUpdateSet().toArray()[0]);
206                                 long arb = arbitratorTable.get(kv.getKey());
207
208                                 if (arb == localmachineid) {
209                                         uncommittedTransArb.add(ut);
210                                 }
211                         }
212
213
214                         boolean needResize = false;
215                         while (uncommittedTransArb.size() > 0) {
216                                 boolean resize = needResize;
217                                 needResize = false;
218
219                                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
220                                 int newsize = 0;
221                                 if (liveslotcount > resizethreshold) {
222                                         resize = true; //Resize is forced
223                                 }
224
225                                 if (resize) {
226                                         newsize = (int) (numslots * RESIZE_MULTIPLE);
227                                         TableStatus status = new TableStatus(s, newsize);
228                                         s.addEntry(status);
229                                 }
230
231                                 if (! rejectedmessagelist.isEmpty()) {
232                                         /* TODO: We should avoid generating a rejected message entry if
233                                          * there is already a sufficient entry in the queue (e.g.,
234                                          * equalsto value of true and same sequence number).  */
235
236                                         long old_seqn = rejectedmessagelist.firstElement();
237                                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
238                                                 long new_seqn = rejectedmessagelist.lastElement();
239                                                 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
240                                                 s.addEntry(rm);
241                                         } else {
242                                                 long prev_seqn = -1;
243                                                 int i = 0;
244                                                 /* Go through list of missing messages */
245                                                 for (; i < rejectedmessagelist.size(); i++) {
246                                                         long curr_seqn = rejectedmessagelist.get(i);
247                                                         Slot s_msg = buffer.getSlot(curr_seqn);
248                                                         if (s_msg != null)
249                                                                 break;
250                                                         prev_seqn = curr_seqn;
251                                                 }
252                                                 /* Generate rejected message entry for missing messages */
253                                                 if (prev_seqn != -1) {
254                                                         RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
255                                                         s.addEntry(rm);
256                                                 }
257                                                 /* Generate rejected message entries for present messages */
258                                                 for (; i < rejectedmessagelist.size(); i++) {
259                                                         long curr_seqn = rejectedmessagelist.get(i);
260                                                         Slot s_msg = buffer.getSlot(curr_seqn);
261                                                         long machineid = s_msg.getMachineID();
262                                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
263                                                         s.addEntry(rm);
264                                                 }
265                                         }
266                                 }
267
268                                 long newestseqnum = buffer.getNewestSeqNum();
269                                 long oldestseqnum = buffer.getOldestSeqNum();
270                                 if (lastliveslotseqn < oldestseqnum) {
271                                         lastliveslotseqn = oldestseqnum;
272                                 }
273
274                                 long seqn = lastliveslotseqn;
275                                 boolean seenliveslot = false;
276                                 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
277                                 long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
278
279                                 boolean tryAgain = false;
280
281                                 // Mandatory Rescue
282                                 for (; seqn < threshold; seqn++) {
283                                         Slot prevslot = buffer.getSlot(seqn);
284                                         //Push slot number forward
285                                         if (! seenliveslot)
286                                                 lastliveslotseqn = seqn;
287
288                                         if (! prevslot.isLive())
289                                                 continue;
290                                         seenliveslot = true;
291                                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
292                                         for (Entry liveentry : liveentries) {
293                                                 if (s.hasSpace(liveentry)) {
294                                                         s.addEntry(liveentry);
295                                                 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
296                                                         if (!resize) {
297                                                                 System.out.print("B"); //?
298                                                                 tryAgain = true;
299                                                                 needResize = true;
300                                                         }
301                                                 }
302                                         }
303                                 }
304
305                                 if (tryAgain) {
306                                         continue;
307                                 }
308
309                                 // Arbitrate
310                                 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
311                                 for (Iterator<Transaction> i = uncommittedTransArb.iterator(); i.hasNext();) {
312                                         Transaction ut = i.next();
313
314                                         KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
315                                         // Check if this machine arbitrates for this transaction
316                                         if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
317                                                 continue;
318                                         }
319
320                                         Entry newEntry = null;
321
322                                         try {
323                                                 if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
324                                                         // Guard evaluated as true
325
326                                                         // update the local tmp current key set
327                                                         for (KeyValue kv : ut.getkeyValueUpdateSet()) {
328                                                                 speculativeTableTmp.put(kv.getKey(), kv);
329                                                         }
330
331                                                         // create the commit
332                                                         newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
333                                                 } else {
334                                                         // Guard was false
335
336                                                         // create the abort
337                                                         newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
338                                                 }
339                                         } catch (Exception e) {
340                                                 e.printStackTrace();
341                                         }
342
343                                         if ((newEntry != null) && s.hasSpace(newEntry)) {
344                                                 s.addEntry(newEntry);
345                                                 i.remove();
346
347                                         } else {
348                                                 break;
349                                         }
350                                 }
351
352                                 /* now go through live entries from least to greatest sequence number until
353                                  * either all live slots added, or the slot doesn't have enough room
354                                  * for SKIP_THRESHOLD consecutive entries*/
355                                 int skipcount = 0;
356                                 search:
357                                 for (; seqn <= newestseqnum; seqn++) {
358                                         Slot prevslot = buffer.getSlot(seqn);
359                                         //Push slot number forward
360                                         if (!seenliveslot)
361                                                 lastliveslotseqn = seqn;
362
363                                         if (!prevslot.isLive())
364                                                 continue;
365                                         seenliveslot = true;
366                                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
367                                         for (Entry liveentry : liveentries) {
368                                                 if (s.hasSpace(liveentry))
369                                                         s.addEntry(liveentry);
370                                                 else {
371                                                         skipcount++;
372                                                         if (skipcount > SKIP_THRESHOLD)
373                                                                 break search;
374                                                 }
375                                         }
376                                 }
377
378                                 int max = 0;
379                                 if (resize)
380                                         max = newsize;
381                                 Slot[] array = cloud.putSlot(s, max);
382                                 if (array == null) {
383                                         array = new Slot[] {s};
384                                         rejectedmessagelist.clear();
385                                 }       else {
386                                         if (array.length == 0)
387                                                 throw new Error("Server Error: Did not send any slots");
388                                         rejectedmessagelist.add(s.getSequenceNumber());
389                                 }
390
391                                 /* update data structure */
392                                 validateandupdate(array, true);
393                         }
394
395
396                 }
397         }
398
399         public boolean createNewKey(IoTString keyName, long machineId) {
400
401                 while (true) {
402                         if (arbitratorTable.get(keyName) != null) {
403                                 // There is already an arbitrator
404                                 return false;
405                         }
406
407                         if (tryput(keyName, machineId, false)) {
408
409                                 // If successfully inserted
410                                 return true;
411                         }
412                 }
413         }
414
415         void decrementLiveCount() {
416                 liveslotcount--;
417         }
418
419
420         private void setResizeThreshold() {
421                 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
422                 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
423         }
424
425         private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
426                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
427                 int newsize = 0;
428                 if (liveslotcount > resizethreshold) {
429                         resize = true; //Resize is forced
430                 }
431
432                 if (resize) {
433                         newsize = (int) (numslots * RESIZE_MULTIPLE);
434                         TableStatus status = new TableStatus(s, newsize);
435                         s.addEntry(status);
436                 }
437
438                 if (! rejectedmessagelist.isEmpty()) {
439                         /* TODO: We should avoid generating a rejected message entry if
440                          * there is already a sufficient entry in the queue (e.g.,
441                          * equalsto value of true and same sequence number).  */
442
443                         long old_seqn = rejectedmessagelist.firstElement();
444                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
445                                 long new_seqn = rejectedmessagelist.lastElement();
446                                 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
447                                 s.addEntry(rm);
448                         } else {
449                                 long prev_seqn = -1;
450                                 int i = 0;
451                                 /* Go through list of missing messages */
452                                 for (; i < rejectedmessagelist.size(); i++) {
453                                         long curr_seqn = rejectedmessagelist.get(i);
454                                         Slot s_msg = buffer.getSlot(curr_seqn);
455                                         if (s_msg != null)
456                                                 break;
457                                         prev_seqn = curr_seqn;
458                                 }
459                                 /* Generate rejected message entry for missing messages */
460                                 if (prev_seqn != -1) {
461                                         RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
462                                         s.addEntry(rm);
463                                 }
464                                 /* Generate rejected message entries for present messages */
465                                 for (; i < rejectedmessagelist.size(); i++) {
466                                         long curr_seqn = rejectedmessagelist.get(i);
467                                         Slot s_msg = buffer.getSlot(curr_seqn);
468                                         long machineid = s_msg.getMachineID();
469                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
470                                         s.addEntry(rm);
471                                 }
472                         }
473                 }
474
475                 long newestseqnum = buffer.getNewestSeqNum();
476                 long oldestseqnum = buffer.getOldestSeqNum();
477                 if (lastliveslotseqn < oldestseqnum)
478                         lastliveslotseqn = oldestseqnum;
479
480                 long seqn = lastliveslotseqn;
481                 boolean seenliveslot = false;
482                 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
483                 long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
484
485
486                 // Mandatory Rescue
487                 for (; seqn < threshold; seqn++) {
488                         Slot prevslot = buffer.getSlot(seqn);
489                         //Push slot number forward
490                         if (! seenliveslot)
491                                 lastliveslotseqn = seqn;
492
493                         if (! prevslot.isLive())
494                                 continue;
495                         seenliveslot = true;
496                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
497                         for (Entry liveentry : liveentries) {
498                                 if (s.hasSpace(liveentry)) {
499                                         s.addEntry(liveentry);
500                                 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
501                                         if (!resize) {
502                                                 System.out.print("B"); //?
503                                                 return tryput(pendingTrans, true);
504                                         }
505                                 }
506                         }
507                 }
508
509
510                 // Arbitrate
511                 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
512                 for (Transaction ut : uncommittedTransactionsList) {
513
514                         KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
515                         // Check if this machine arbitrates for this transaction
516                         if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
517                                 continue;
518                         }
519
520                         Entry newEntry = null;
521
522                         try {
523                                 if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
524                                         // Guard evaluated as true
525
526                                         // update the local tmp current key set
527                                         for (KeyValue kv : ut.getkeyValueUpdateSet()) {
528                                                 speculativeTableTmp.put(kv.getKey(), kv);
529                                         }
530
531                                         // create the commit
532                                         newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
533                                 } else {
534                                         // Guard was false
535
536                                         // create the abort
537                                         newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
538                                 }
539                         } catch (Exception e) {
540                                 e.printStackTrace();
541                         }
542
543                         if ((newEntry != null) && s.hasSpace(newEntry)) {
544                                 s.addEntry(newEntry);
545                         } else {
546                                 break;
547                         }
548                 }
549
550                 Transaction trans = new Transaction(s,
551                                                     s.getSequenceNumber(),
552                                                     localmachineid,
553                                                     pendingTrans.getKVUpdates(),
554                                                     pendingTrans.getGuard());
555                 boolean insertedTrans = false;
556                 if (s.hasSpace(trans)) {
557                         s.addEntry(trans);
558                         insertedTrans = true;
559                 }
560
561                 /* now go through live entries from least to greatest sequence number until
562                  * either all live slots added, or the slot doesn't have enough room
563                  * for SKIP_THRESHOLD consecutive entries*/
564                 int skipcount = 0;
565                 search:
566                 for (; seqn <= newestseqnum; seqn++) {
567                         Slot prevslot = buffer.getSlot(seqn);
568                         //Push slot number forward
569                         if (!seenliveslot)
570                                 lastliveslotseqn = seqn;
571
572                         if (!prevslot.isLive())
573                                 continue;
574                         seenliveslot = true;
575                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
576                         for (Entry liveentry : liveentries) {
577                                 if (s.hasSpace(liveentry))
578                                         s.addEntry(liveentry);
579                                 else {
580                                         skipcount++;
581                                         if (skipcount > SKIP_THRESHOLD)
582                                                 break search;
583                                 }
584                         }
585                 }
586
587                 int max = 0;
588                 if (resize)
589                         max = newsize;
590                 Slot[] array = cloud.putSlot(s, max);
591                 if (array == null) {
592                         array = new Slot[] {s};
593                         rejectedmessagelist.clear();
594                 }       else {
595                         if (array.length == 0)
596                                 throw new Error("Server Error: Did not send any slots");
597                         rejectedmessagelist.add(s.getSequenceNumber());
598                         insertedTrans = false;
599                 }
600
601                 /* update data structure */
602                 validateandupdate(array, true);
603
604                 return insertedTrans;
605         }
606
607         private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
608                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
609                 int newsize = 0;
610                 if (liveslotcount > resizethreshold) {
611                         resize = true; //Resize is forced
612                 }
613
614                 if (resize) {
615                         newsize = (int) (numslots * RESIZE_MULTIPLE);
616                         TableStatus status = new TableStatus(s, newsize);
617                         s.addEntry(status);
618                 }
619
620                 if (! rejectedmessagelist.isEmpty()) {
621                         /* TODO: We should avoid generating a rejected message entry if
622                          * there is already a sufficient entry in the queue (e.g.,
623                          * equalsto value of true and same sequence number).  */
624
625                         long old_seqn = rejectedmessagelist.firstElement();
626                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
627                                 long new_seqn = rejectedmessagelist.lastElement();
628                                 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
629                                 s.addEntry(rm);
630                         } else {
631                                 long prev_seqn = -1;
632                                 int i = 0;
633                                 /* Go through list of missing messages */
634                                 for (; i < rejectedmessagelist.size(); i++) {
635                                         long curr_seqn = rejectedmessagelist.get(i);
636                                         Slot s_msg = buffer.getSlot(curr_seqn);
637                                         if (s_msg != null)
638                                                 break;
639                                         prev_seqn = curr_seqn;
640                                 }
641                                 /* Generate rejected message entry for missing messages */
642                                 if (prev_seqn != -1) {
643                                         RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
644                                         s.addEntry(rm);
645                                 }
646                                 /* Generate rejected message entries for present messages */
647                                 for (; i < rejectedmessagelist.size(); i++) {
648                                         long curr_seqn = rejectedmessagelist.get(i);
649                                         Slot s_msg = buffer.getSlot(curr_seqn);
650                                         long machineid = s_msg.getMachineID();
651                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
652                                         s.addEntry(rm);
653                                 }
654                         }
655                 }
656
657                 long newestseqnum = buffer.getNewestSeqNum();
658                 long oldestseqnum = buffer.getOldestSeqNum();
659                 if (lastliveslotseqn < oldestseqnum)
660                         lastliveslotseqn = oldestseqnum;
661
662                 long seqn = lastliveslotseqn;
663                 boolean seenliveslot = false;
664                 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
665                 long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
666
667
668                 // Mandatory Rescue
669                 for (; seqn < threshold; seqn++) {
670                         Slot prevslot = buffer.getSlot(seqn);
671                         //Push slot number forward
672                         if (! seenliveslot)
673                                 lastliveslotseqn = seqn;
674
675                         if (! prevslot.isLive())
676                                 continue;
677                         seenliveslot = true;
678                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
679                         for (Entry liveentry : liveentries) {
680                                 if (s.hasSpace(liveentry)) {
681                                         s.addEntry(liveentry);
682                                 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
683                                         if (!resize) {
684                                                 System.out.print("B"); //?
685                                                 return tryput(keyName, arbMachineid, true);
686                                         }
687                                 }
688                         }
689                 }
690
691
692                 // // Arbitrate
693                 // Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
694                 // for (Transaction ut : uncommittedTransactionsList) {
695
696                 //      KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
697                 //      // Check if this machine arbitrates for this transaction
698                 //      if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
699                 //              continue;
700                 //      }
701
702                 //      Entry newEntry = null;
703
704                 //      try {
705                 //              if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
706                 //                      // Guard evaluated as true
707
708                 //                      // update the local tmp current key set
709                 //                      for (KeyValue kv : ut.getkeyValueUpdateSet()) {
710                 //                              speculativeTableTmp.put(kv.getKey(), kv);
711                 //                      }
712
713                 //                      // create the commit
714                 //                      newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
715                 //              } else {
716                 //                      // Guard was false
717
718                 //                      // create the abort
719                 //                      newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
720                 //              }
721                 //      } catch (Exception e) {
722                 //              e.printStackTrace();
723                 //      }
724
725                 //      if ((newEntry != null) && s.hasSpace(newEntry)) {
726
727                 //              // TODO: Remove print
728                 //              System.out.println("Arbitrating...");
729                 //              s.addEntry(newEntry);
730                 //      } else {
731                 //              break;
732                 //      }
733                 // }
734
735
736                 NewKey newKey = new NewKey(s, keyName, arbMachineid);
737
738                 boolean insertedNewKey = false;
739                 if (s.hasSpace(newKey)) {
740                         s.addEntry(newKey);
741                         insertedNewKey = true;
742                 }
743
744                 /* now go through live entries from least to greatest sequence number until
745                  * either all live slots added, or the slot doesn't have enough room
746                  * for SKIP_THRESHOLD consecutive entries*/
747                 int skipcount = 0;
748                 search:
749                 for (; seqn <= newestseqnum; seqn++) {
750                         Slot prevslot = buffer.getSlot(seqn);
751                         //Push slot number forward
752                         if (!seenliveslot)
753                                 lastliveslotseqn = seqn;
754
755                         if (!prevslot.isLive())
756                                 continue;
757                         seenliveslot = true;
758                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
759                         for (Entry liveentry : liveentries) {
760                                 if (s.hasSpace(liveentry))
761                                         s.addEntry(liveentry);
762                                 else {
763                                         skipcount++;
764                                         if (skipcount > SKIP_THRESHOLD)
765                                                 break search;
766                                 }
767                         }
768                 }
769
770                 int max = 0;
771                 if (resize)
772                         max = newsize;
773                 Slot[] array = cloud.putSlot(s, max);
774                 if (array == null) {
775                         array = new Slot[] {s};
776                         rejectedmessagelist.clear();
777                 }       else {
778                         if (array.length == 0)
779                                 throw new Error("Server Error: Did not send any slots");
780                         rejectedmessagelist.add(s.getSequenceNumber());
781                         insertedNewKey = false;
782                 }
783
784                 /* update data structure */
785                 validateandupdate(array, true);
786
787                 return insertedNewKey;
788         }
789
790         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
791                 /* The cloud communication layer has checked slot HMACs already
792                          before decoding */
793                 if (newslots.length == 0) return;
794
795                 long firstseqnum = newslots[0].getSequenceNumber();
796                 if (firstseqnum <= sequencenumber) {
797                         throw new Error("Server Error: Sent older slots!");
798                 }
799
800                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
801                 checkHMACChain(indexer, newslots);
802
803                 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
804
805                 initExpectedSize(firstseqnum);
806                 for (Slot slot : newslots) {
807                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
808                         updateExpectedSize();
809                 }
810
811                 /* If there is a gap, check to see if the server sent us everything. */
812                 if (firstseqnum != (sequencenumber + 1)) {
813
814                         // TODO: Check size
815                         checkNumSlots(newslots.length);
816                         if (!machineSet.isEmpty()) {
817                                 throw new Error("Missing record for machines: " + machineSet);
818                         }
819                 }
820
821                 commitNewMaxSize();
822
823                 /* Commit new to slots. */
824                 for (Slot slot : newslots) {
825                         buffer.putSlot(slot);
826                         liveslotcount++;
827                 }
828                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
829
830                 // Speculate on key value pairs
831                 createSpeculativeTable();
832         }
833
834         private void createSpeculativeTable() {
835                 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
836
837                 for (Transaction trans : uncommittedTransactionsList) {
838
839                         try {
840                                 if (trans.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
841                                         for (KeyValue kv : trans.getkeyValueUpdateSet()) {
842                                                 speculativeTableTmp.put(kv.getKey(), kv);
843                                         }
844                                 }
845
846                         } catch (Exception e) {
847                                 e.printStackTrace();
848                         }
849                 }
850
851                 speculativeTable = speculativeTableTmp;
852         }
853
854         private int expectedsize, currmaxsize;
855
856         private void checkNumSlots(int numslots) {
857                 if (numslots != expectedsize) {
858                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
859                 }
860         }
861
862         private void initExpectedSize(long firstsequencenumber) {
863                 long prevslots = firstsequencenumber;
864                 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
865                 currmaxsize = numslots;
866         }
867
868         private void updateExpectedSize() {
869                 expectedsize++;
870                 if (expectedsize > currmaxsize) {
871                         expectedsize = currmaxsize;
872                 }
873         }
874
875         private void updateCurrMaxSize(int newmaxsize) {
876                 currmaxsize = newmaxsize;
877         }
878
879         private void commitNewMaxSize() {
880                 if (numslots != currmaxsize)
881                         buffer.resize(currmaxsize);
882
883                 numslots = currmaxsize;
884                 setResizeThreshold();
885         }
886
887
888
889
890
891         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
892                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
893         }
894
895         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
896                 long oldseqnum = entry.getOldSeqNum();
897                 long newseqnum = entry.getNewSeqNum();
898                 boolean isequal = entry.getEqual();
899                 long machineid = entry.getMachineID();
900                 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
901                         Slot slot = indexer.getSlot(seqnum);
902                         if (slot != null) {
903                                 long slotmachineid = slot.getMachineID();
904                                 if (isequal != (slotmachineid == machineid)) {
905                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
906                                 }
907                         }
908                 }
909
910                 HashSet<Long> watchset = new HashSet<Long>();
911                 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
912                         long entry_mid = lastmsg_entry.getKey();
913                         /* We've seen it, don't need to continue to watch.  Our next
914                          * message will implicitly acknowledge it. */
915                         if (entry_mid == localmachineid)
916                                 continue;
917                         Pair<Long, Liveness> v = lastmsg_entry.getValue();
918                         long entry_seqn = v.getFirst();
919                         if (entry_seqn < newseqnum) {
920                                 addWatchList(entry_mid, entry);
921                                 watchset.add(entry_mid);
922                         }
923                 }
924                 if (watchset.isEmpty())
925                         entry.setDead();
926                 else
927                         entry.setWatchSet(watchset);
928         }
929
930         private void processEntry(NewKey entry) {
931                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
932         }
933
934         private void processEntry(Transaction entry) {
935                 uncommittedTransactionsList.add(entry);
936         }
937
938         private void processEntry(Abort entry) {
939
940
941                 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
942                         // Abort has not been seen yet so we need to keep track of it
943                         abortSet.add(entry);
944                 } else {
945                         // The machine already saw this so it is dead
946                         entry.setDead();
947                 }
948
949                 for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
950                         Transaction prevtrans = i.next();
951                         if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) {
952                                 uncommittedTransactionsList.remove(prevtrans);
953                                 prevtrans.setDead();
954                                 return;
955                         }
956                 }
957         }
958
959         private void processEntry(Commit entry) {
960
961                 for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
962                         Commit prevcommit = i.next();
963                         prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
964
965                         if (!prevcommit.isLive()) {
966                                 //commitList.remove(prevcommit);
967                                 i.remove();
968                         }
969                 }
970
971                 commitList.add(entry);
972
973                 // Update the committed table list
974                 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
975                         IoTString key = kv.getKey();
976                         commitedTable.put(key, kv);
977                 }
978
979                 long committedTransSeq = entry.getTransSequenceNumber();
980
981                 // Make dead the transactions
982                 for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
983                         Transaction prevtrans = i.next();
984
985                         if (prevtrans.getSequenceNumber() <= committedTransSeq) {
986                                 // uncommittedTransactionsList.remove(prevtrans);
987                                 i.remove();
988                                 prevtrans.setDead();
989                         }
990                 }
991         }
992
993         private void processEntry(TableStatus entry) {
994                 int newnumslots = entry.getMaxSlots();
995                 updateCurrMaxSize(newnumslots);
996                 if (lastTableStatus != null)
997                         lastTableStatus.setDead();
998                 lastTableStatus = entry;
999         }
1000
1001
1002         private void addWatchList(long machineid, RejectedMessage entry) {
1003                 HashSet<RejectedMessage> entries = watchlist.get(machineid);
1004                 if (entries == null)
1005                         watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
1006                 entries.add(entry);
1007         }
1008
1009         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1010                 machineSet.remove(machineid);
1011
1012                 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
1013                 if (watchset != null) {
1014                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
1015                                 RejectedMessage rm = rmit.next();
1016                                 if (rm.getNewSeqNum() <= seqnum) {
1017                                         /* Remove it from our watchlist */
1018                                         rmit.remove();
1019                                         /* Decrement machines that need to see this notification */
1020                                         rm.removeWatcher(machineid);
1021                                 }
1022                         }
1023                 }
1024
1025                 if (machineid == localmachineid) {
1026                         /* Our own messages are immediately dead. */
1027                         if (liveness instanceof LastMessage) {
1028                                 ((LastMessage)liveness).setDead();
1029                         } else if (liveness instanceof Slot) {
1030                                 ((Slot)liveness).setDead();
1031                         } else {
1032                                 throw new Error("Unrecognized type");
1033                         }
1034                 }
1035
1036                 // Set dead the abort
1037                 for (Iterator<Abort> ait = abortSet.iterator(); ait.hasNext(); ) {
1038                         Abort abort = ait.next();
1039
1040                         if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
1041                                 abort.setDead();
1042                                 ait.remove();
1043                         }
1044                 }
1045
1046
1047                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
1048                 if (lastmsgentry == null)
1049                         return;
1050
1051                 long lastmsgseqnum = lastmsgentry.getFirst();
1052                 Liveness lastentry = lastmsgentry.getSecond();
1053                 if (machineid != localmachineid) {
1054                         if (lastentry instanceof LastMessage) {
1055                                 ((LastMessage)lastentry).setDead();
1056                         } else if (lastentry instanceof Slot) {
1057                                 ((Slot)lastentry).setDead();
1058                         } else {
1059                                 throw new Error("Unrecognized type");
1060                         }
1061                 }
1062
1063                 if (machineid == localmachineid) {
1064                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
1065                                 throw new Error("Server Error: Mismatch on local machine sequence number");
1066                 } else {
1067                         if (lastmsgseqnum > seqnum)
1068                                 throw new Error("Server Error: Rollback on remote machine sequence number");
1069                 }
1070         }
1071
1072         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1073                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
1074                 for (Entry entry : slot.getEntries()) {
1075                         switch (entry.getType()) {
1076
1077                         case Entry.TypeNewKey:
1078                                 processEntry((NewKey)entry);
1079                                 break;
1080
1081                         case Entry.TypeCommit:
1082                                 processEntry((Commit)entry);
1083                                 break;
1084
1085                         case Entry.TypeAbort:
1086                                 processEntry((Abort)entry);
1087                                 break;
1088
1089                         case Entry.TypeTransaction:
1090                                 processEntry((Transaction)entry);
1091                                 break;
1092
1093                         case Entry.TypeLastMessage:
1094                                 processEntry((LastMessage)entry, machineSet);
1095                                 break;
1096
1097                         case Entry.TypeRejectedMessage:
1098                                 processEntry((RejectedMessage)entry, indexer);
1099                                 break;
1100
1101                         case Entry.TypeTableStatus:
1102                                 processEntry((TableStatus)entry);
1103                                 break;
1104
1105                         default:
1106                                 throw new Error("Unrecognized type: " + entry.getType());
1107                         }
1108                 }
1109         }
1110
1111         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
1112                 for (int i = 0; i < newslots.length; i++) {
1113                         Slot currslot = newslots[i];
1114                         Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
1115                         if (prevslot != null &&
1116                                 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1117                                 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);
1118                 }
1119         }
1120 }