Changes
authorAli Younis <ayounis@uci.edu>
Thu, 1 Dec 2016 23:17:45 +0000 (15:17 -0800)
committerAli Younis <ayounis@uci.edu>
Thu, 1 Dec 2016 23:17:45 +0000 (15:17 -0800)
src2/java/iotcloud/Table.java
src2/java/iotcloud/Test.java

index 1cdb3d5..00d48b0 100644 (file)
@@ -178,7 +178,7 @@ final public class Table {
        public void addKV(IoTString key, IoTString value) {
 
                // Make sure new key value pair matches the current arbitrator
-               if (!pendingTransBuild.checkArbitrator( arbitratorTable.get(key))) {
+               if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
                        // TODO: Maybe not throw and error
                        throw new Error("Not all Key Values match");
                }
@@ -189,17 +189,211 @@ final public class Table {
                pendingTransBuild.addKV(kv);
        }
 
-
-       // TODo: FIx Guard
-       public void addGuard(IoTString key, IoTString value) {
-               KeyValue kv = new KeyValue(key, value);
-               pendingTransBuild.addKV(kv);
+       public void addGuard(Guard guard) {
+               pendingTransBuild.addGuard(guard);
        }
 
        public void update() {
                Slot[] newslots = cloud.getSlots(sequencenumber + 1);
 
                validateandupdate(newslots, false);
+
+               if (uncommittedTransactionsList.size() > 0) {
+                       List<Transaction> uncommittedTransArb = new LinkedList<Transaction>();
+
+                       for (Transaction ut : uncommittedTransactionsList) {
+                               KeyValue kv = (KeyValue)(ut.getkeyValueUpdateSet().toArray()[0]);
+                               long arb = arbitratorTable.get(kv.getKey());
+
+                               if (arb == localmachineid) {
+                                       uncommittedTransArb.add(ut);
+                               }
+                       }
+
+
+                       boolean needResize = false;
+                       while (uncommittedTransArb.size() > 0) {
+                               boolean resize = needResize;
+                               needResize = false;
+
+                               Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+                               int newsize = 0;
+                               if (liveslotcount > resizethreshold) {
+                                       resize = true; //Resize is forced
+                               }
+
+                               if (resize) {
+                                       newsize = (int) (numslots * RESIZE_MULTIPLE);
+                                       TableStatus status = new TableStatus(s, newsize);
+                                       s.addEntry(status);
+                               }
+
+                               if (! rejectedmessagelist.isEmpty()) {
+                                       /* TODO: We should avoid generating a rejected message entry if
+                                        * there is already a sufficient entry in the queue (e.g.,
+                                        * equalsto value of true and same sequence number).  */
+
+                                       long old_seqn = rejectedmessagelist.firstElement();
+                                       if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
+                                               long new_seqn = rejectedmessagelist.lastElement();
+                                               RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
+                                               s.addEntry(rm);
+                                       } else {
+                                               long prev_seqn = -1;
+                                               int i = 0;
+                                               /* Go through list of missing messages */
+                                               for (; i < rejectedmessagelist.size(); i++) {
+                                                       long curr_seqn = rejectedmessagelist.get(i);
+                                                       Slot s_msg = buffer.getSlot(curr_seqn);
+                                                       if (s_msg != null)
+                                                               break;
+                                                       prev_seqn = curr_seqn;
+                                               }
+                                               /* Generate rejected message entry for missing messages */
+                                               if (prev_seqn != -1) {
+                                                       RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+                                                       s.addEntry(rm);
+                                               }
+                                               /* Generate rejected message entries for present messages */
+                                               for (; i < rejectedmessagelist.size(); i++) {
+                                                       long curr_seqn = rejectedmessagelist.get(i);
+                                                       Slot s_msg = buffer.getSlot(curr_seqn);
+                                                       long machineid = s_msg.getMachineID();
+                                                       RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+                                                       s.addEntry(rm);
+                                               }
+                                       }
+                               }
+
+                               long newestseqnum = buffer.getNewestSeqNum();
+                               long oldestseqnum = buffer.getOldestSeqNum();
+                               if (lastliveslotseqn < oldestseqnum) {
+                                       lastliveslotseqn = oldestseqnum;
+                               }
+
+                               long seqn = lastliveslotseqn;
+                               boolean seenliveslot = false;
+                               long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
+                               long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
+
+                               boolean tryAgain = false;
+
+                               // Mandatory Rescue
+                               for (; seqn < threshold; seqn++) {
+                                       Slot prevslot = buffer.getSlot(seqn);
+                                       //Push slot number forward
+                                       if (! seenliveslot)
+                                               lastliveslotseqn = seqn;
+
+                                       if (! prevslot.isLive())
+                                               continue;
+                                       seenliveslot = true;
+                                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+                                       for (Entry liveentry : liveentries) {
+                                               if (s.hasSpace(liveentry)) {
+                                                       s.addEntry(liveentry);
+                                               } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
+                                                       if (!resize) {
+                                                               System.out.print("B"); //?
+                                                               tryAgain = true;
+                                                               needResize = true;
+                                                       }
+                                               }
+                                       }
+                               }
+
+                               if (tryAgain) {
+                                       continue;
+                               }
+
+                               // Arbitrate
+                               Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+                               for (Iterator<Transaction> i = uncommittedTransArb.iterator(); i.hasNext();) {
+                                       Transaction ut = i.next();
+
+                                       KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
+                                       // Check if this machine arbitrates for this transaction
+                                       if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
+                                               continue;
+                                       }
+
+                                       Entry newEntry = null;
+
+                                       try {
+                                               if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
+                                                       // Guard evaluated as true
+
+                                                       // update the local tmp current key set
+                                                       for (KeyValue kv : ut.getkeyValueUpdateSet()) {
+                                                               speculativeTableTmp.put(kv.getKey(), kv);
+                                                       }
+
+                                                       // create the commit
+                                                       newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
+                                               } else {
+                                                       // Guard was false
+
+                                                       // create the abort
+                                                       newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
+                                               }
+                                       } catch (Exception e) {
+                                               e.printStackTrace();
+                                       }
+
+                                       if ((newEntry != null) && s.hasSpace(newEntry)) {
+                                               s.addEntry(newEntry);
+                                               i.remove();
+
+                                       } else {
+                                               break;
+                                       }
+                               }
+
+                               /* now go through live entries from least to greatest sequence number until
+                                * either all live slots added, or the slot doesn't have enough room
+                                * for SKIP_THRESHOLD consecutive entries*/
+                               int skipcount = 0;
+                               search:
+                               for (; seqn <= newestseqnum; seqn++) {
+                                       Slot prevslot = buffer.getSlot(seqn);
+                                       //Push slot number forward
+                                       if (!seenliveslot)
+                                               lastliveslotseqn = seqn;
+
+                                       if (!prevslot.isLive())
+                                               continue;
+                                       seenliveslot = true;
+                                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+                                       for (Entry liveentry : liveentries) {
+                                               if (s.hasSpace(liveentry))
+                                                       s.addEntry(liveentry);
+                                               else {
+                                                       skipcount++;
+                                                       if (skipcount > SKIP_THRESHOLD)
+                                                               break search;
+                                               }
+                                       }
+                               }
+
+                               int max = 0;
+                               if (resize)
+                                       max = newsize;
+                               Slot[] array = cloud.putSlot(s, max);
+                               if (array == null) {
+                                       array = new Slot[] {s};
+                                       rejectedmessagelist.clear();
+                               }       else {
+                                       if (array.length == 0)
+                                               throw new Error("Server Error: Did not send any slots");
+                                       rejectedmessagelist.add(s.getSequenceNumber());
+                               }
+
+                               /* update data structure */
+                               validateandupdate(array, true);
+                       }
+
+
+               }
        }
 
        public boolean createNewKey(IoTString keyName, long machineId) {
index e70d88e..6ac5b06 100644 (file)
@@ -8,88 +8,48 @@ package iotcloud;
 
 public class Test {
        public static void main(String[] args) {
-               // if(args[0].equals("2"))
-               //      test2();
-               // else if(args[0].equals("3"))
-               //      test3();
-               // else if(args[0].equals("4"))
-               //      test4();
-               // else if(args[0].equals("5"))
-               //      test5();
-
+               if (args[0].equals("2")) {
+                       test2();
+               }
        }
 
-       
-       
-       // static Thread buildThread(String prefix, Table t) {
-       //      return new Thread() {
-       //              public void run() {
-       //                      for(int i=0; i<10000; i++) {
-       //                              String a=prefix+i;
-       //                              IoTString ia=new IoTString(a);
-       //                              t.put(ia, ia);
-       //                              System.out.println(ia+"->"+t.get(ia));
-       //                      }
-       //              }
-       //      };
-       // }
-       
-       // static void test5() {
-       //      Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
-       //      t1.rebuild();
-       //      System.out.println(t1);
-       // }
-       
-       // static void test4() {
-       //      Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
-       //      Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
-       //      t1.rebuild();
-       //      t2.rebuild();
-       //      Thread thr1=buildThread("p1", t1);
-       //      Thread thr2=buildThread("p2", t2);
-       //      thr1.start();
-       //      thr2.start();
-       //      try {
-       //              thr1.join();
-       //              thr2.join();
-       //      } catch (Exception e) {
-       //              e.printStackTrace();
-       //      }
-       // }
+       static void test2() {
+               Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+               t1.initTable();
+               Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+               t2.update();
+
+               for (int i = 0; i < 600; i++) {
+                       String a = "STR" + i;
+                       String b = "ABR" + i;
+                       IoTString ia = new IoTString(a);
+                       IoTString ib = new IoTString(b);
+
+
+                       t1.createNewKey(ia, 321);
+                       t2.createNewKey(ib, 351);
+
+                       t1.startTransaction();
+                       t1.addKV(ia, ia);
+                       t1.commitTransaction();
 
-       // static void test3() {
-       //      Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
-       //      Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
-       //      t1.rebuild();
-       //      t2.rebuild();
-       //      for(int i=0; i<600; i++) {
-       //              String a="STR"+i;
-       //              String b="ABR"+i;
-       //              IoTString ia=new IoTString(a);
-       //              IoTString ib=new IoTString(b);
-       //              t1.put(ia, ia);
-       //              t2.put(ib, ib);
-       //              t1.update();
-       //              System.out.println(ib+"->"+t1.get(ib));
-       //              System.out.println(ia+"->"+t2.get(ia));
-       //      }
-       // }
+                       t2.startTransaction();
+                       t2.addKV(ib, ib);
+                       t2.commitTransaction();
+               }
 
-       // static void test2() {
-       //      Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
-       //      t1.initTable();
-       //      Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
-       //      t2.update();
-       //      for(int i=0; i<600; i++) {
-       //              String a="STR"+i;
-       //              String b="ABR"+i;
-       //              IoTString ia=new IoTString(a);
-       //              IoTString ib=new IoTString(b);
-       //              t1.put(ia, ia);
-       //              t2.put(ib, ib);
-       //              t1.update();
-       //              System.out.println(ib+"->"+t1.get(ib));
-       //              System.out.println(ia+"->"+t2.get(ia));
-       //      }
-       // }
+               t1.update();
+               t2.update();
+
+
+               for (int i = 0; i < 600; i++) {
+                       String a = "STR" + i;
+                       String b = "ABR" + i;
+                       IoTString ia = new IoTString(a);
+                       IoTString ib = new IoTString(b);
+
+                       System.out.println(ib + "->" + t1.getCommitted(ib));
+                       System.out.println(ia + "->" + t2.getCommitted(ia));
+               }
+       }
 }