Add two benchmarks: MapReduce & PERT
authorjzhou <jzhou>
Tue, 11 Mar 2008 21:37:51 +0000 (21:37 +0000)
committerjzhou <jzhou>
Tue, 11 Mar 2008 21:37:51 +0000 (21:37 +0000)
25 files changed:
Robust/src/Benchmarks/MapReduce/Java/Configuration.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Java/Configured.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Java/JobClient.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Java/MapReduceBase.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Java/MapWorker.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Java/Master.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Java/OutputCollector.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Java/ReduceWorker.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Java/Splitter.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Java/Tool.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Java/ToolRunner.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Java/WordCounter.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Tag/MapReduce.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Tag/MapReduceBase.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Tag/MapWorker.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Tag/Master.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Tag/OutputCollector.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Tag/ReduceWorker.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Tag/Splitter.java [new file with mode: 0644]
Robust/src/Benchmarks/PERT/Java/Estimator.java [new file with mode: 0644]
Robust/src/Benchmarks/PERT/Java/PERT.java [new file with mode: 0644]
Robust/src/Benchmarks/PERT/Java/Stage.java [new file with mode: 0644]
Robust/src/Benchmarks/PERT/Tag/Estimator.java [new file with mode: 0644]
Robust/src/Benchmarks/PERT/Tag/PERT.java [new file with mode: 0644]
Robust/src/Benchmarks/PERT/Tag/Stage.java [new file with mode: 0644]

diff --git a/Robust/src/Benchmarks/MapReduce/Java/Configuration.java b/Robust/src/Benchmarks/MapReduce/Java/Configuration.java
new file mode 100644 (file)
index 0000000..9a245f5
--- /dev/null
@@ -0,0 +1,55 @@
+//package mapreduce;
+
+public class Configuration {
+    
+    MapReduceBase mapreducer;
+    
+    int m;
+    int r;
+    char seperator;
+    String inputfile;
+    
+    public Configuration() {
+       this.mapreducer = null;
+    }
+
+    public MapReduceBase getMapReduce() {
+       return this.mapreducer;
+    }
+    
+    public void setMapReduceClass(MapReduceBase mapreducer) {
+       this.mapreducer = mapreducer;
+    }
+
+    public int getM() {
+        return m;
+    }
+
+    public void setM(int m) {
+        this.m = m;
+    }
+
+    public int getR() {
+        return r;
+    }
+
+    public void setR(int r) {
+        this.r = r;
+    }
+
+    public char getSeperator() {
+        return seperator;
+    }
+
+    public void setSeperator(char seperator) {
+        this.seperator = seperator;
+    }
+
+    public String getInputfile() {
+        return inputfile;
+    }
+
+    public void setInputfile(String inputfile) {
+        this.inputfile = inputfile;
+    }
+}
diff --git a/Robust/src/Benchmarks/MapReduce/Java/Configured.java b/Robust/src/Benchmarks/MapReduce/Java/Configured.java
new file mode 100644 (file)
index 0000000..5cce432
--- /dev/null
@@ -0,0 +1,22 @@
+//package mapreduce;
+
+public class Configured {
+    
+    Configuration conf;
+    
+    public Configured() {
+       this.conf = null;
+    }
+    
+    public Configured(Configuration conf) {
+       this.conf = conf;
+    }
+    
+    public Configuration getConf() {
+       return this.conf;
+    }
+    
+    public void setConf(Configuration conf) {
+       this.conf = conf;
+    }
+}
diff --git a/Robust/src/Benchmarks/MapReduce/Java/JobClient.java b/Robust/src/Benchmarks/MapReduce/Java/JobClient.java
new file mode 100644 (file)
index 0000000..b45f77e
--- /dev/null
@@ -0,0 +1,59 @@
+//package mapreduce;
+
+public class JobClient{
+    
+    public JobClient() {}
+    
+    public static void runJob(Configuration conf) {
+       Splitter splitter = new Splitter(conf.getInputfile(), conf.getM(), conf.getSeperator());
+       Master master = new Master(conf.getM(), conf.getR(), splitter);
+       
+       // split input file
+       System.printString("Split\n");
+       master.split();
+       
+       // do 'map'
+       System.printString("Map\n");
+       MapWorker[] mworkers = master.assignMap();
+       for(int i = 0; i < mworkers.length; ++i) {
+           MapWorker mworker = mworkers[i];
+           mworker.setMapreducer(conf.getMapReduce());
+           mworker.map();
+           mworker.partition();
+       }
+       
+       // register intermediate output from map workers to master
+       System.printString("Mapoutput\n");
+       for(int i = 0; i < mworkers.length; ++i) {
+           for(int j = 0; j < conf.getR(); ++j) {
+               String temp = mworkers[i].outputFile(j);
+               if(temp != null) {
+                   master.addInterOutput(temp);
+               }
+           }
+           master.setMapFinish(mworkers[i].getID());
+       }
+       //assert(master.isMapFinish());
+       
+       // do 'reduce'
+       System.printString("Reduce\n");
+       ReduceWorker[] rworkers = master.assignReduce();
+       for(int i = 0; i < rworkers.length; ++i) {
+           ReduceWorker rworker = rworkers[i];
+           rworker.setMapreducer(conf.getMapReduce());
+           rworker.sortgroup();
+           rworker.reduce();
+       }
+       
+       // merge all the intermediate output from reduce workers to master
+       System.printString("Merge\n");
+       for(int i = 0; i < rworkers.length; ++i) {
+           master.collectROutput(rworkers[i].getOutputFile());
+           master.setReduceFinish(rworkers[i].getID());
+       }
+       //assert(master.isReduceFinish());
+       
+       System./*out.println*/printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");
+    }
+    
+}
diff --git a/Robust/src/Benchmarks/MapReduce/Java/MapReduceBase.java b/Robust/src/Benchmarks/MapReduce/Java/MapReduceBase.java
new file mode 100644 (file)
index 0000000..21b806e
--- /dev/null
@@ -0,0 +1,13 @@
+//package mapreduce;
+
+//import java.util.Vector;
+
+public /*abstract*/ class MapReduceBase {
+    
+    public MapReduceBase() {}
+    
+    public /*abstract*/ void map(String key, String value, OutputCollector output);
+    
+    public /*abstract*/ void reduce(String key, Vector values, OutputCollector output);
+}
+
diff --git a/Robust/src/Benchmarks/MapReduce/Java/MapWorker.java b/Robust/src/Benchmarks/MapReduce/Java/MapWorker.java
new file mode 100644 (file)
index 0000000..c31b575
--- /dev/null
@@ -0,0 +1,119 @@
+//package mapreduce;
+
+//import java.io.FileOutputStream;
+
+public class MapWorker {
+
+    int ID;
+    MapReduceBase mapreducer;
+
+    int r;
+    String key;
+    String value;
+    OutputCollector output;
+
+    String[] locations;
+    FileOutputStream[] outputs;
+
+    public MapWorker(String key, String value, int r, int id) {
+       this.ID = id;
+       this.mapreducer = null;
+
+       this.r = r;
+       this.key = key;
+       this.value = value;
+       this.output = new OutputCollector();
+
+       locations = new String[r];
+       for(int i = 0; i < r; ++i) {
+           StringBuffer temp = new StringBuffer("/home/jzhou/mapreduce/output-intermediate-map-");
+           temp.append(String.valueOf(ID));
+           temp.append("-of-");
+           temp.append(String.valueOf(r));
+           temp.append("_");
+           temp.append(String.valueOf(i));
+           temp.append(".dat");
+           locations[i] = new String(temp);
+       }
+
+       outputs = new FileOutputStream[r];
+       for(int i = 0; i < r; ++i) {
+           outputs[i] = null;
+       }
+    }
+
+    public MapReduceBase getMapreducer() {
+       return mapreducer;
+    }
+
+    public void setMapreducer(MapReduceBase mapreducer) {
+       this.mapreducer = mapreducer;
+    }
+
+    public void map() {
+       /*if(ID % 2 == 1) {
+           String temp = locations[locations.length];
+       }*/
+       
+       this.mapreducer.map(key, value, output);
+    }
+
+    public void partition() {
+       /*if(ID % 2 == 1) {
+           String temp = locations[locations.length];
+       }*/
+       
+       //try{
+           int size = this.output.size();
+           for(int i = 0; i < size; ++i) {
+               String key = this.output.getKey(i);
+               String value = this.output.getValue(i);
+               // use the hashcode of key to decide which intermediate output
+               // this pair should be in
+               //int hash = key.hashCode();
+               int index = (int)Math.abs(key.hashCode()) % this.r;
+               FileOutputStream oStream = outputs[index];
+               if(oStream == null) {
+                   // open the file
+                   String filepath = locations[index];
+                   oStream = new FileOutputStream(filepath, true); // append
+                   outputs[index] = oStream;
+               }
+               // format: key value\n
+               oStream.write(key.getBytes());
+               oStream.write(' ');
+               oStream.write(value.getBytes());
+               oStream.write('\n');
+               oStream.flush();
+           }
+
+           // close the output files
+           for(int i = 0; i < this.outputs.length; ++i) {
+               FileOutputStream temp = this.outputs[i];
+               if(temp != null) {
+                   temp.close();
+               }
+           }
+       /*} catch(Exception e) {
+           e.printStackTrace();
+           System.exit(-1);
+       }*/
+    }
+
+    public String outputFile(int i) {
+       if(outputs[i] != null) {
+           return locations[i];
+       } else {
+           return null;
+       }
+    }
+
+    public int getID() {
+       return this.ID;
+    }
+
+    public int getR() {
+       return this.r;
+    }
+
+}
diff --git a/Robust/src/Benchmarks/MapReduce/Java/Master.java b/Robust/src/Benchmarks/MapReduce/Java/Master.java
new file mode 100644 (file)
index 0000000..92ce404
--- /dev/null
@@ -0,0 +1,158 @@
+//package mapreduce;
+
+/*import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.Vector;*/
+
+public class Master {
+
+    int m;
+    int r;
+    int[] mworkerStates; // array of map worker's state
+    // 0: idle  1: process  2: finished 3: fail
+    int[] rworkerStates; // array of reduce worker's state
+    Vector[] interoutputs; // array of string vector containing
+    // paths of intermediate outputs from
+    // map worker
+
+    Splitter splitter;
+
+    String outputfile;  // path of final output file
+
+    boolean partial;
+
+    public Master(int m, int r, Splitter splitter) {
+       this.m = m;
+       this.r = r;
+
+       mworkerStates = new int[m];
+       rworkerStates = new int[r];
+       for(int i = 0; i < m; ++i) {
+           mworkerStates[i] = 0;
+       }
+       for(int i = 0; i < r; ++i) {
+           rworkerStates[i] = 0;
+       }
+
+       interoutputs = new Vector[r];
+       for(int i = 0; i < r; ++i) {
+           interoutputs[i] = null;
+       }
+
+       this.splitter = splitter;
+       this.outputfile = new String("/home/jzhou/mapreduce/output.dat");
+
+       this.partial = false;
+    }
+
+    public int getR() {
+       return this.r;
+    }
+
+    public String getOutputFile() {
+       return this.outputfile;
+    }
+
+    public boolean isPartial() {
+       return this.partial;
+    }
+
+    public void setPartial(boolean partial) {
+       this.partial = partial || this.partial;
+    }
+
+    public void split() {
+       splitter.split();
+    }
+
+    public MapWorker[] assignMap() {
+       String[] contentsplits = splitter.getSlices();
+       MapWorker[] mworkers = new MapWorker[contentsplits.length];
+       for(int i = 0; i < contentsplits.length; ++i) {
+           //System.printString("*************************\n");
+           //System.printString(contentsplits[i] + "\n");
+           //System.printString("*************************\n");
+           MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i);
+           mworkerStates[i] = 1;
+           mworkers[i] = mworker;
+       }
+       return mworkers;
+    }
+
+    public void setMapFinish(int i) {
+       mworkerStates[i] = 2;
+    }
+
+    public void setMapFail(int i) {
+       mworkerStates[i] = 3;
+    }
+
+    public boolean isMapFinish() {
+       for(int i = 0; i < mworkerStates.length; ++i) {
+           if(mworkerStates[i] == 1) {
+               return false;
+           }
+       }
+
+       return true;
+    }
+
+    public void addInterOutput(String interoutput) {
+       int start = interoutput.indexOf('_');
+       int end = interoutput.indexOf('.');
+       int index = Integer.parseInt(interoutput.substring(start + 1, end));
+       //System.printString(interoutput.subString(start + 1, end) + "\n");
+       if(interoutputs[index] == null) {
+           interoutputs[index] = new Vector();
+       }
+       interoutputs[index].addElement(interoutput);
+    }
+
+    public ReduceWorker[] assignReduce() {
+       ReduceWorker[] rworkers = new ReduceWorker[interoutputs.length];
+       for(int i = 0; i < interoutputs.length; ++i) {
+           ReduceWorker rworker = new ReduceWorker(interoutputs[i], i);
+           rworkerStates[i] = 1;
+           rworkers[i] = rworker;
+       }
+       return rworkers;
+    }
+
+    public void setReduceFinish(int i) {
+       rworkerStates[i] = 2;
+    }
+
+    public void setReduceFail(int i) {
+       rworkerStates[i] = 3;
+    }
+
+    public boolean isReduceFinish() {
+       for(int i = 0; i < rworkerStates.length; ++i) {
+           if(rworkerStates[i] == 1) {
+               return false;
+           }
+       }
+
+       return true;
+    }
+
+    public void collectROutput(String file) {
+       //try{
+           FileInputStream iStream = new FileInputStream(file);
+           FileOutputStream oStream = new FileOutputStream(outputfile, true);
+           byte[] b = new byte[1024 * 100];
+           int length = iStream.read(b);
+           if(length < 0) {
+               System./*out.println*/printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
+               System.exit(-1);
+           }
+           //System.printString(new String(b, 0, length) + "\n");
+           oStream.write(b, 0, length);
+           iStream.close();
+           oStream.close();
+       /*} catch(Exception e) {
+           e.printStackTrace();
+           System.exit(-1);
+       }*/
+    }
+}
diff --git a/Robust/src/Benchmarks/MapReduce/Java/OutputCollector.java b/Robust/src/Benchmarks/MapReduce/Java/OutputCollector.java
new file mode 100644 (file)
index 0000000..e66a675
--- /dev/null
@@ -0,0 +1,31 @@
+//package mapreduce;
+
+//import java.util.Vector;
+
+public class OutputCollector {
+
+    Vector keys;
+    Vector values;
+
+    public OutputCollector() {
+       this.keys = new Vector();
+       this.values = new Vector();
+    }
+
+    public void emit(String key, String value) {
+       this.keys.addElement(key);
+       this.values.addElement(value);
+    }
+
+    public int size() {
+       return this.keys.size();
+    }
+
+    public String getKey(int i) {
+       return (String)this.keys.elementAt(i);
+    }
+
+    public String getValue(int i) {
+       return (String)this.values.elementAt(i);
+    }
+}
diff --git a/Robust/src/Benchmarks/MapReduce/Java/ReduceWorker.java b/Robust/src/Benchmarks/MapReduce/Java/ReduceWorker.java
new file mode 100644 (file)
index 0000000..b18ce20
--- /dev/null
@@ -0,0 +1,166 @@
+//package mapreduce;
+
+/*import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Vector;*/
+
+import mapreduce.MapReduceBase;
+
+public class ReduceWorker {
+
+    int ID;
+    MapReduceBase mapreducer;
+    Vector interoutputs;  // string vector containing paths
+                          // of intermediate outputs from map worker
+    Vector keys;
+    HashMap values; // hashmap map key to vector of string vector
+    int[] sorts; // array record the sort of keys
+    OutputCollector output;
+    String outputfile;  // path of the intermediate output file
+
+    public ReduceWorker(Vector interoutputs, int id) {
+       this.ID = id;
+       this.mapreducer = null;
+       
+       this.interoutputs = interoutputs;
+
+       this.keys = new Vector();
+       this.values = new HashMap();
+       //this.sorts = null;
+
+       this.output = new OutputCollector();
+       this.outputfile = "/home/jzhou/mapreduce/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
+    }
+
+    public MapReduceBase getMapreducer() {
+        return mapreducer;
+    }
+
+    public void setMapreducer(MapReduceBase mapreducer) {
+        this.mapreducer = mapreducer;
+    }
+
+    public void sortgroup() {
+       /*if(ID % 2 == 1) {
+           int a[] = new int[1];
+           int temp = a[1];
+       }*/
+       
+       // group values associated to the same key
+       //System.printString("================================\n");
+       if(interoutputs == null) {
+           return;
+       }
+       //try{
+           for(int i = 0; i < interoutputs.size(); ++i) {
+               FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
+               byte[] b = new byte[1024 * 100];
+               int length = iStream.read(b);
+               if(length < 0) {
+                   System./*out.println*/printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
+                   System.exit(-1);
+               }
+               String content = new String(b, 0, length);
+               //System.printString(content + "\n");
+               int index = content.indexOf('\n');
+               while(index != -1) {
+                   String line = content.substring(0, index);
+                   content = content.substring(index + 1);
+                   //System.printString(line + "\n");
+                   int tmpindex = line.indexOf(' ');
+                   String key = line.substring(0, tmpindex);
+                   String value = line.substring(tmpindex + 1);
+                   //System.printString(key + "; " + value + "\n");
+                   if(!this.values.containsKey(key)) {
+                       this.values.put(key, new Vector());
+                       this.keys.addElement(key);
+                   }
+                   ((Vector)this.values.get(key)).addElement(value);
+                   index = content.indexOf('\n');
+               }
+               iStream.close();
+           }
+           //System.printString("================================\n");
+
+           /*for(int i = 0; i < this.keys.size(); ++i) {
+                       System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
+               }
+               System.printString("\n");*/
+
+           // sort all the keys inside interoutputs
+           this.sorts = new int[this.keys.size()];
+           // insert sorting
+           this.sorts[0] = 0;
+           int tosort = 1;
+           for(; tosort < this.keys.size(); ++tosort) {
+               int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode();
+               int index = tosort;
+               for(int i = tosort; i > 0; --i) {
+                   if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) {
+                       this.sorts[i] = this.sorts[i-1];
+                       index = i - 1;
+                   } else {
+                       //System.printString(i + "; " + tosort + "\n");
+                       index = i;
+                       i = 0;
+                   }
+               }
+               this.sorts[index] = tosort;
+           }
+           /*for(int i = 0; i < this.sorts.length; ++i) {
+                       System.printString(this.sorts[i] + "; ");
+               }
+               System.printString("\n");*/
+       /*} catch(IOException e) {
+           e.printStackTrace();
+           System.exit(-1);
+       }*/
+    }
+
+    public void reduce() {
+       /*if(ID % 2 == 1) {
+           int a[] = new int[1];
+           int temp = a[1];
+       }*/
+       
+       if(this.interoutputs == null) {
+           return;
+       }
+       for(int i = 0; i < this.sorts.length; ++i) {
+           String key = (String)this.keys.elementAt(this.sorts[i]);
+           Vector values = (Vector)this.values.get(key);
+           this.mapreducer.reduce(key, values, output);
+       }
+
+       //try{
+           // output all the result into some local file
+           int size = this.output.size();
+           FileOutputStream oStream = new FileOutputStream(outputfile, true); // append
+           for(int i = 0; i < size; ++i) {
+               String key = this.output.getKey(i);
+               String value = this.output.getValue(i);
+               // format: key value\n
+               oStream.write(key.getBytes());
+               oStream.write(' ');
+               oStream.write(value.getBytes());
+               oStream.write('\n');
+               oStream.flush();
+           }
+           oStream.close();
+       /*} catch(Exception e) {
+           e.printStackTrace();
+           System.exit(-1);
+       }*/
+    }
+
+    public String getOutputFile() {
+       return this.outputfile;
+    }
+
+    public int getID() {
+       return this.ID;
+    }
+}
diff --git a/Robust/src/Benchmarks/MapReduce/Java/Splitter.java b/Robust/src/Benchmarks/MapReduce/Java/Splitter.java
new file mode 100644 (file)
index 0000000..342328b
--- /dev/null
@@ -0,0 +1,83 @@
+//package mapreduce;
+
+//import java.io.FileInputStream;
+//import java.io.IOException;
+
+public class Splitter {
+    String filename;
+    String content;
+    int length;
+    int[] splits;
+    String[] slices;
+
+    public Splitter(String path, int splitNum, char seperator) {
+       //try{
+           //System.printString("Top of Splitter's constructor\n");
+           filename = path;
+           FileInputStream iStream = new FileInputStream(filename);
+           byte[] b = new byte[1024 * 10];
+           length = iStream.read(b);
+           if(length < 0) {
+               System./*out.println*/printString("Error! Can not read from input file: " + filename + "\n");
+               System.exit(-1);
+           }
+           content = new String(b, 0, length);
+           //System.printString(content + "\n");
+           iStream.close();
+
+           if(splitNum == 1) {
+               slices = new String[1];
+               slices[0] = content;
+           } else {
+               splits = new int[splitNum - 1];
+               int index = 0;
+               int span = length / splitNum;
+               int temp = 0;
+               for(int i = 0; i < splitNum - 1; ++i) {
+                   temp += span;
+                   if(temp > index) {
+                       index = temp;
+                       while((content.charAt(index) != seperator) && (index != length - 1)) {
+                           ++index;
+                       }
+                   }
+                   splits[i] = index;
+               }
+
+               this.slices = new String[splits.length + 1];
+               for(int i = 0; i < this.slices.length; ++i) {
+                   this.slices[i] = null;
+               }
+           }
+       /*} catch(IOException e) {
+           e.printStackTrace();
+           System.exit(-1);
+       }*/
+    }
+
+    public void split() {
+       if(slices.length == 1) {
+           return;
+       }
+       int start = 0;
+       int end = 0;
+       for(int i = 0; i < splits.length; ++i) {
+           end = splits[i];
+           if(end < start) {
+               slices[i] = null;
+           } else {
+               slices[i] = content.substring(start, end);
+           }
+           start = end + 1;
+       }
+       slices[slices.length - 1] = content.substring(start);
+    }
+
+    public String getFilename() {
+       return filename;
+    }
+
+    public String[] getSlices() {
+       return this.slices;
+    }
+}
diff --git a/Robust/src/Benchmarks/MapReduce/Java/Tool.java b/Robust/src/Benchmarks/MapReduce/Java/Tool.java
new file mode 100644 (file)
index 0000000..5dbeb6e
--- /dev/null
@@ -0,0 +1,5 @@
+//package mapreduce;
+
+public /*interface*/ class Tool {
+    public int run(String[] args);
+}
diff --git a/Robust/src/Benchmarks/MapReduce/Java/ToolRunner.java b/Robust/src/Benchmarks/MapReduce/Java/ToolRunner.java
new file mode 100644 (file)
index 0000000..82a8c5a
--- /dev/null
@@ -0,0 +1,9 @@
+//package mapreduce;
+
+public class ToolRunner {
+    
+    public static int run(Tool tool, String[] args) {
+       return tool.run(args);
+    }
+    
+}
diff --git a/Robust/src/Benchmarks/MapReduce/Java/WordCounter.java b/Robust/src/Benchmarks/MapReduce/Java/WordCounter.java
new file mode 100644 (file)
index 0000000..b8d961c
--- /dev/null
@@ -0,0 +1,131 @@
+
+//import java.io.FileInputStream;
+//import java.util.Vector;
+
+/*import mapreduce.Configuration;
+import mapreduce.Configured;
+import mapreduce.JobClient;
+import mapreduce.MapReduceBase;
+import mapreduce.OutputCollector;
+import mapreduce.Tool;
+import mapreduce.ToolRunner;*/
+
+/**
+     * Counts the words in each line.
+     * For each line of input, break the line into words and emit them as
+     * (<b>word</b>, <b>1</b>).
+     */
+    public class MapReduceClass extends MapReduceBase {
+       
+       public MapReduceClass() {}
+
+       public void map(String key, String value, OutputCollector output) {
+           int n = value.length();
+           for (int i = 0; i < n; ) {
+               // Skip past leading whitespace
+               while ((i < n) && isspace(value.charAt(i))) {
+                   ++i;
+               }
+
+               // Find word end
+               int start = i;
+               while ((i < n) && !isspace(value.charAt(i))) {
+                   i++;
+               }
+
+               if (start < i) {
+                   output.emit(value.substring(start, i), "1");
+               }
+           }
+       }
+
+       public void reduce(String key, Vector values, OutputCollector output) {
+           // Iterate over all entries with the
+           // // same key and add the values
+           int value = 0;
+           for(int i = 0; i < values.size(); ++i) {
+               value += Integer.parseInt((String)values.elementAt(i));
+           }
+
+           // Emit sum for input->key()
+           output.emit(key, String.valueOf(value));
+       }
+
+       boolean isspace(char c) {
+           if((c == ' ') || 
+                   (c == '.') ||
+                   (c == '!') ||
+                   (c == '?') ||
+                   (c == '"') ||
+                   (c == '\n')) {
+               return true;
+           }
+           return false;
+       }
+    }
+
+public class WordCounter /*implements*/extends Tool {
+
+       public WordCounter() {}
+
+    static int printUsage() {
+       System./*out.println*/printString("<conffile>\n");
+       return -1;
+    }
+
+    /**
+     * The main driver for word count map/reduce program.
+     * Invoke this method to submit the map/reduce job.
+     * @throws IOException When there is communication problems with the 
+     *                     job tracker.
+     */
+    public int run(String[] args) {
+       //try {
+           MapReduceClass mapreducer = new MapReduceClass();       
+
+           FileInputStream iStream = new FileInputStream(args[0]);
+           byte[] b = new byte[1024];
+           int length = iStream.read(b);
+           if(length < 0 ) {
+               System./*out.println*/printString("Error! Can not read from configure file: " + args[0] + "\n");
+               System.exit(-1);
+           }
+           String content = new String(b, 0, length);
+           //System.out.println(content);
+           int index = content.indexOf('\n');
+           String inputfile = content.substring(0, index);
+           content = content.substring(index + 1);
+           index = content.indexOf('\n');
+           int m = Integer.parseInt(content.substring(0, index));
+           content = content.substring(index + 1);
+           index = content.indexOf('\n');
+           int r = Integer.parseInt(content.substring(0, index));
+           content = content.substring(index + 1);
+           index = content.indexOf('\n');
+           String temp = content.substring(0, index);
+           char seperator = temp.charAt(0);
+           //System.out.println(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r));
+           
+           Configuration conf = new Configuration();
+           conf.setMapReduceClass(mapreducer);
+           conf.setInputfile(inputfile);
+           conf.setM(m);
+           conf.setR(r);
+           conf.setSeperator(seperator);
+
+           JobClient.runJob(conf);
+       /*} catch (Exception e) {
+           e.printStackTrace();
+           System.exit(-1);
+       }*/
+       
+       return 0;
+    }
+
+
+    public static void main(String[] args) /*throws Exception*/ {
+       int res = ToolRunner.run(new WordCounter(), args);
+       System.exit(res);
+    }
+
+}
diff --git a/Robust/src/Benchmarks/MapReduce/Tag/MapReduce.java b/Robust/src/Benchmarks/MapReduce/Tag/MapReduce.java
new file mode 100644 (file)
index 0000000..b62e76b
--- /dev/null
@@ -0,0 +1,140 @@
+task startup(StartupObject s{initialstate}) {
+    // read in configuration parameters
+    // System.printString("Top of task startup\n");
+    String path = new String("/home/jzhou/mapreduce/conf.txt");
+    FileInputStream iStream = new FileInputStream(path);
+    byte[] b = new byte[1024];
+    int length = iStream.read(b);
+    if(length < 0 ) {
+       System.printString("Error! Can not read from configure file: " + path + "\n");
+       System.exit(-1);
+    }
+    String content = new String(b, 0, length);
+    //System.printString(content + "\n");
+    int index = content.indexOf('\n');
+    String inputfile = content.subString(0, index);
+    content = content.subString(index + 1);
+    index = content.indexOf('\n');
+    int m = Integer.parseInt(content.subString(0, index));
+    content = content.subString(index + 1);
+    index = content.indexOf('\n');
+    int r = Integer.parseInt(content.subString(0, index));
+    content = content.subString(index + 1);
+    index = content.indexOf('\n');
+    String temp = content.subString(0, index);
+    char seperator = temp.charAt(0);
+    //System.printString(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r) + "\n");
+    Splitter splitter = new Splitter(inputfile, m, seperator);
+    Master master = new Master(m, r, splitter){split};
+
+    taskexit(s{!initialstate});
+}
+
+//Split the input file into M pieces
+task split(Master master{split}) {
+    System.printString("Top of task split\n");
+    master.split();
+
+    taskexit(master{!split, assignMap});
+}
+
+//Select a map worker to handle one of the pieces of input file
+task assignMap(Master master{assignMap}) {
+    System.printString("Top of task assignMap\n");
+    master.assignMap();
+
+    taskexit(master{!assignMap, mapoutput});
+}
+
+//MapWorker do 'map' function on a input file piece
+task map(MapWorker mworker{map}) {
+    System.printString("Top of task map\n");
+    mworker.map();
+
+    taskexit(mworker{!map, partition});
+}
+
+//Partition the intermediate key/value pair generated
+//into R intermediate local files
+task partition(MapWorker mworker{partition}) {
+    System.printString("Top of task partition\n");
+    mworker.partition();
+
+    taskexit(mworker{!partition, mapoutput});
+}
+
+//Register the intermediate ouput from map worker to master
+task mapOutput(Master master{mapoutput}, optional MapWorker mworker{mapoutput}) {
+    System.printString("Top of task mapOutput\n");
+    if(isavailable(mworker)) {
+       int total = master.getR();
+       for(int i = 0; i < total; ++i) {
+           String temp = mworker.outputFile(i);
+           if(temp != null) {
+               master.addInterOutput(temp); 
+           }
+       }
+       master.setMapFinish(mworker.getID());
+    } else {
+       master.setMapFail(mworker.getID());
+       master.setPartial(true);
+    }
+    if(master.isMapFinish()) {
+       taskexit(master{!mapoutput, mapfinished, assignReduce}, mworker{!mapoutput});
+    }
+
+    taskexit(mworker{!mapoutput});
+}
+
+//Assign the list of intermediate output associated to one key to
+//a reduce worker 
+task assignReduce(Master master{assignReduce}) {
+    System.printString("Top of task assignReduce\n");
+    master.assignReduce();
+
+    taskexit(master{!assignReduce, reduceoutput});
+}
+
+//First do sort and group on the intermediate key/value pairs assigned
+//to reduce worker
+task sortgroup(ReduceWorker rworker{sortgroup}) {
+    System.printString("Top of task sortgroup\n");
+    rworker.sortgroup();
+
+    taskexit(rworker{!sortgroup, reduce});
+}
+
+//Do 'reduce' function
+task reduce(ReduceWorker rworker{reduce}) {
+    System.printString("Top of task reduce\n");
+    rworker.reduce();
+
+    taskexit(rworker{!reduce, reduceoutput});
+}
+
+//Collect the output into master
+task reduceOutput(Master master{reduceoutput}, optional ReduceWorker rworker{reduceoutput}) {
+    System.printString("Top of task reduceOutput\n");
+    if(isavailable(rworker)) {
+       master.collectROutput(rworker.getOutputFile());
+       master.setReduceFinish(rworker.getID());
+    } else {
+       master.setReduceFail(rworker.getID());
+       master.setPartial(true);
+    }
+    if(master.isReduceFinish()) {
+       //System.printString("reduce finish\n");
+       taskexit(master{!reduceoutput, reducefinished, output}, rworker{!reduceoutput});
+    }
+
+    taskexit(rworker{!reduceoutput});
+}
+
+task output(Master master{output}) {
+    System.printString("Top of task output\n");
+    if(master.isPartial()) {
+       System.printString("Partial! The result may not be right due to some failure!\n");
+    }
+    System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");
+    taskexit(master{!output});
+}
diff --git a/Robust/src/Benchmarks/MapReduce/Tag/MapReduceBase.java b/Robust/src/Benchmarks/MapReduce/Tag/MapReduceBase.java
new file mode 100644 (file)
index 0000000..3ff539d
--- /dev/null
@@ -0,0 +1,55 @@
+public class MapReduceBase {
+
+    public static void map(String key, String value, OutputCollector output) {
+       int n = value.length();
+       for (int i = 0; i < n; ) {
+           // Skip past leading whitespace
+           while ((i < n) && isspace(value.charAt(i))) {
+               ++i;
+           }
+
+           // Find word end
+           int start = i;
+           while ((i < n) && !isspace(value.charAt(i))) {
+               i++;
+           }
+
+           if (start < i) {
+               output.emit(value.subString(start, i), "1");
+               //System.printString(value.subString(start,i) + "\n");
+           }
+       }
+    }
+
+    public static void reduce(String key, Vector values, OutputCollector output) {
+       // Iterate over all entries with the
+       // // same key and add the values
+       int value = 0;
+       for(int i = 0; i < values.size(); ++i) {
+           value += Integer.parseInt((String)values.elementAt(i));
+       }
+
+       // Emit sum for input->key()
+       output.emit(key, String.valueOf(value));
+    }
+
+    static boolean isspace(char c) {
+       if((c == ' ') || 
+               (c == ',') ||
+               (c == '.') ||
+               (c == '!') ||
+               (c == '?') ||
+               (c == '"') ||
+               (c == '(') ||
+               (c == ')') ||
+               (c == '[') ||
+               (c == ']') ||
+               (c == '{') ||
+               (c == '}') ||
+               (c == '\n')) {
+           return true;
+       }
+       return false;
+    }
+}
+
diff --git a/Robust/src/Benchmarks/MapReduce/Tag/MapWorker.java b/Robust/src/Benchmarks/MapReduce/Tag/MapWorker.java
new file mode 100644 (file)
index 0000000..0563556
--- /dev/null
@@ -0,0 +1,102 @@
+public class MapWorker {
+    flag map;
+    flag partition;
+    flag mapoutput;
+
+    int ID;
+
+    int r;
+    String key;
+    String value;
+    OutputCollector output;
+
+    String[] locations;
+    FileOutputStream[] outputs;
+
+    public MapWorker(String key, String value, int r, int id) {
+       this.ID = id;
+       this.r = r;
+
+       this.key = key;
+       this.value = value;
+       this.output = new OutputCollector();
+
+       locations = new String[r];
+       for(int i = 0; i < r; ++i) {
+           StringBuffer temp = new StringBuffer("/home/jzhou/mapreduce/output-intermediate-map-");
+           temp.append(String.valueOf(ID));
+           temp.append("-of-");
+           temp.append(String.valueOf(r));
+           temp.append("_");
+           temp.append(String.valueOf(i));
+           temp.append(".dat");
+           locations[i] = new String(temp);
+       }
+
+       outputs = new FileOutputStream[r];
+       for(int i = 0; i < r; ++i) {
+           outputs[i] = null;
+       }
+    }
+
+    public void map() {
+       /*if(ID % 2 == 1) {
+               String temp = locations[locations.length];
+       }*/
+
+       MapReduceBase.map(key, value, output);
+    }
+
+    public void partition() {
+       /*if(ID % 2 == 1) {
+               String temp = locations[locations.length];
+       }*/
+
+       int size = this.output.size();
+       for(int i = 0; i < size; ++i) {
+           String key = this.output.getKey(i);
+           String value = this.output.getValue(i);
+           // use the hashcode of key to decide which intermediate output
+           // this pair should be in
+           int index = (int)Math.abs(key.hashCode()) % this.r;
+           FileOutputStream oStream = outputs[index];
+           if(oStream == null) {
+               // open the file
+               String filepath = locations[index];
+               oStream = new FileOutputStream(filepath, true); // append
+               outputs[index] = oStream;
+           }
+           // format: key value\n
+           oStream.write(key.getBytes());
+           oStream.write(' ');
+           oStream.write(value.getBytes());
+           oStream.write('\n');
+           oStream.flush();
+       }
+
+       // close the output files
+       for(int i = 0; i < this.outputs.length; ++i) {
+           FileOutputStream temp = this.outputs[i];
+           if(temp != null) {
+               temp.close();
+           }
+       }
+    }
+
+    public String outputFile(int i) {
+       if(outputs[i] != null) {
+           return locations[i];
+       } else {
+           return null;
+       }
+    }
+
+    public int getID() {
+       return this.ID;
+    }
+
+    public int getR() {
+       return this.r;
+    }
+
+}
diff --git a/Robust/src/Benchmarks/MapReduce/Tag/Master.java b/Robust/src/Benchmarks/MapReduce/Tag/Master.java
new file mode 100644 (file)
index 0000000..49bb68f
--- /dev/null
@@ -0,0 +1,149 @@
+public class Master {
+    flag split;
+    flag assignMap;
+    flag mapoutput;
+    flag mapfinished;
+    flag assignReduce;
+    flag reduceoutput;
+    flag reducefinished;
+    flag output;
+
+    int m;
+    int r;
+    int[] mworkerStates; // array of map worker's state
+    // 0: idle  1: process  2: finished 3: fail
+    int[] rworkerStates; // array of reduce worker's state
+    Vector[] interoutputs; // array of string vector containing
+    // paths of intermediate outputs from
+    // map worker
+
+    Splitter splitter;
+
+    String outputfile;  // path of final output file
+
+    boolean partial;
+
+    public Master(int m, int r, Splitter splitter) {
+       this.m = m;
+       this.r = r;
+
+       mworkerStates = new int[m];
+       rworkerStates = new int[r];
+       for(int i = 0; i < m; ++i) {
+           mworkerStates[i] = 0;
+       }
+       for(int i = 0; i < r; ++i) {
+           rworkerStates[i] = 0;
+       }
+
+       interoutputs = new Vector[r];
+       for(int i = 0; i < r; ++i) {
+           interoutputs[i] = null;
+       }
+
+       this.splitter = splitter;
+       this.outputfile = new String("/home/jzhou/mapreduce/output.dat");
+
+       this.partial = false;
+    }
+
+    public int getR() {
+       return this.r;
+    }
+
+    public String getOutputFile() {
+       return this.outputfile;
+    }
+
+    public boolean isPartial() {
+       return this.partial;
+    }
+
+    public void setPartial(boolean partial) {
+       this.partial = partial || this.partial;
+    }
+
+    public void split() {
+       splitter.split();
+    }
+
+    public void assignMap() {
+       String[] contentsplits = splitter.getSlices();
+       for(int i = 0; i < contentsplits.length; ++i) {
+           //System.printString("*************************\n");
+           //System.printString(contentsplits[i] + "\n");
+           //System.printString("*************************\n");
+           MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i){map};
+           mworkerStates[i] = 1;
+       }
+    }
+
+    public void setMapFinish(int i) {
+       mworkerStates[i] = 2;
+    }
+
+    public void setMapFail(int i) {
+       mworkerStates[i] = 3;
+    }
+
+    public boolean isMapFinish() {
+       for(int i = 0; i < mworkerStates.length; ++i) {
+           if(mworkerStates[i] == 1) {
+               return false;
+           }
+       }
+
+       return true;
+    }
+
+    public void addInterOutput(String interoutput) {
+       int start = interoutput.indexOf('_');
+       int end = interoutput.indexOf('.');
+       int index = Integer.parseInt(interoutput.subString(start + 1, end));
+       //System.printString(interoutput.subString(start + 1, end) + "\n");
+       if(interoutputs[index] == null) {
+           interoutputs[index] = new Vector();
+       }
+       interoutputs[index].addElement(interoutput);
+    }
+
+    public void assignReduce() {
+       for(int i = 0; i < interoutputs.length; ++i) {
+           ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup};
+           rworkerStates[i] = 1;
+       }
+    }
+
+    public void setReduceFinish(int i) {
+       rworkerStates[i] = 2;
+    }
+
+    public void setReduceFail(int i) {
+       rworkerStates[i] = 3;
+    }
+
+    public boolean isReduceFinish() {
+       for(int i = 0; i < rworkerStates.length; ++i) {
+           if(rworkerStates[i] == 1) {
+               return false;
+           }
+       }
+
+       return true;
+    }
+
+    public void collectROutput(String file) {
+       FileInputStream iStream = new FileInputStream(file);
+       FileOutputStream oStream = new FileOutputStream(outputfile, true);
+       byte[] b = new byte[1024 * 100];
+       int length = iStream.read(b);
+       if(length < 0) {
+           System.printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
+           System.exit(-1);
+       }
+       //System.printString(new String(b, 0, length) + "\n");
+       oStream.write(b, 0, length);
+       iStream.close();
+       oStream.close();
+    }
+}
diff --git a/Robust/src/Benchmarks/MapReduce/Tag/OutputCollector.java b/Robust/src/Benchmarks/MapReduce/Tag/OutputCollector.java
new file mode 100644 (file)
index 0000000..3c6d852
--- /dev/null
@@ -0,0 +1,27 @@
+public class OutputCollector {
+
+    Vector keys;
+    Vector values;
+
+    public OutputCollector() {
+       this.keys = new Vector();
+       this.values = new Vector();
+    }
+
+    public void emit(String key, String value) {
+       this.keys.addElement(key);
+       this.values.addElement(value);
+    }
+
+    public int size() {
+       return this.keys.size();
+    }
+
+    public String getKey(int i) {
+       return (String)this.keys.elementAt(i);
+    }
+
+    public String getValue(int i) {
+       return (String)this.values.elementAt(i);
+    }
+}
diff --git a/Robust/src/Benchmarks/MapReduce/Tag/ReduceWorker.java b/Robust/src/Benchmarks/MapReduce/Tag/ReduceWorker.java
new file mode 100644 (file)
index 0000000..d4787a4
--- /dev/null
@@ -0,0 +1,138 @@
+public class ReduceWorker {
+    flag sortgroup;
+    flag reduce;
+    flag reduceoutput;
+
+    int ID;
+    Vector interoutputs;  // string vector containing paths
+    // of intermediate outputs from map worker
+    Vector keys;
+    HashMap values; // hashmap map key to vector of string vector
+    int[] sorts; // array record the sort of keys
+    OutputCollector output;
+    String outputfile;  // path of the intermediate output file
+
+    public ReduceWorker(Vector interoutputs, int id) {
+       this.ID = id;
+       this.interoutputs = interoutputs;
+
+       this.keys = new Vector();
+       this.values = new HashMap();
+       //this.sorts = null;
+
+       this.output = new OutputCollector();
+       this.outputfile = "/home/jzhou/mapreduce/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
+    }
+
+    public void sortgroup() {
+       /*if(ID % 2 == 1) {
+               int a[] = new int[1];
+               int temp = a[1];
+       }*/
+
+       // group values associated to the same key
+       //System.printString("================================\n");
+       if(interoutputs == null) {
+           return;
+       }
+       for(int i = 0; i < interoutputs.size(); ++i) {
+           FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
+           byte[] b = new byte[1024 * 100];
+           int length = iStream.read(b);
+           if(length < 0) {
+               System.printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
+               System.exit(-1);
+           }
+           String content = new String(b, 0, length);
+           //System.printString(content + "\n");
+           int index = content.indexOf('\n');
+           while(index != -1) {
+               String line = content.subString(0, index);
+               content = content.subString(index + 1);
+               //System.printString(line + "\n");
+               int tmpindex = line.indexOf(' ');
+               String key = line.subString(0, tmpindex);
+               String value = line.subString(tmpindex + 1);
+               //System.printString(key + "; " + value + "\n");
+               if(!this.values.containsKey(key)) {
+                   this.values.put(key, new Vector());
+                   this.keys.addElement(key);
+               }
+               ((Vector)this.values.get(key)).addElement(value);
+               index = content.indexOf('\n');
+           }
+           iStream.close();
+       }
+       //System.printString("================================\n");
+
+       /*for(int i = 0; i < this.keys.size(); ++i) {
+                       System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
+               }
+               System.printString("\n");*/
+
+       // sort all the keys inside interoutputs
+       this.sorts = new int[this.keys.size()];
+       // insert sorting
+       this.sorts[0] = 0;
+       int tosort = 1;
+       for(; tosort < this.keys.size(); ++tosort) {
+           int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode();
+           int index = tosort;
+           for(int i = tosort; i > 0; --i) {
+               if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) {
+                   this.sorts[i] = this.sorts[i-1];
+                   index = i - 1;
+               } else {
+                   //System.printString(i + "; " + tosort + "\n");
+                   index = i;
+                   i = 0;
+               }
+           }
+           this.sorts[index] = tosort;
+       }
+       /*for(int i = 0; i < this.sorts.length; ++i) {
+                       System.printString(this.sorts[i] + "; ");
+               }
+               System.printString("\n");*/
+    }
+
+    public void reduce() {
+       /*if(ID % 2 == 1) {
+               int a[] = new int[1];
+               int temp = a[1];
+       }*/
+
+       if(this.interoutputs != null) {
+          // return;
+       //}
+       for(int i = 0; i < this.sorts.length; ++i) {
+           String key = (String)this.keys.elementAt(this.sorts[i]);
+           Vector values = (Vector)this.values.get(key);
+           MapReduceBase.reduce(key, values, output);
+       }
+       }
+
+       // output all the result into some local file
+       int size = this.output.size();
+       FileOutputStream oStream = new FileOutputStream(outputfile, true); // append
+       for(int i = 0; i < size; ++i) {
+           String key = this.output.getKey(i);
+           String value = this.output.getValue(i);
+           // format: key value\n
+           oStream.write(key.getBytes());
+           oStream.write(' ');
+           oStream.write(value.getBytes());
+           oStream.write('\n');
+           oStream.flush();
+       }
+       oStream.close();
+    }
+
+    public String getOutputFile() {
+       return this.outputfile;
+    }
+
+    public int getID() {
+       return this.ID;
+    }
+}
diff --git a/Robust/src/Benchmarks/MapReduce/Tag/Splitter.java b/Robust/src/Benchmarks/MapReduce/Tag/Splitter.java
new file mode 100644 (file)
index 0000000..f79c8d8
--- /dev/null
@@ -0,0 +1,73 @@
+public class Splitter {
+    String filename;
+    String content;
+    int length;
+    int[] splits;
+    String[] slices;
+
+    public Splitter(String path, int splitNum, char seperator) {
+       //System.printString("Top of Splitter's constructor\n");
+       filename = path;
+       FileInputStream iStream = new FileInputStream(filename);
+       byte[] b = new byte[1024 * 10];
+       length = iStream.read(b);
+       if(length < 0) {
+           System.printString("Error! Can not read from input file: " + filename + "\n");
+           System.exit(-1);
+       }
+       content = new String(b, 0, length);
+       //System.printString(content + "\n");
+       iStream.close();
+
+       if(splitNum == 1) {
+           slices = new String[1];
+           slices[0] = content;
+       } else {
+           splits = new int[splitNum - 1];
+           int index = 0;
+           int span = length / splitNum;
+           int temp = 0;
+           for(int i = 0; i < splitNum - 1; ++i) {
+               temp += span;
+               if(temp > index) {
+                   index = temp;
+                   while((content.charAt(index) != seperator) && (index != length - 1)) {
+                       ++index;
+                   }
+               }
+               splits[i] = index;
+           }
+
+           this.slices = new String[splits.length + 1];
+           for(int i = 0; i < this.slices.length; ++i) {
+               this.slices[i] = null;
+           }
+       }
+    }
+
+    public void split() {
+       if(slices.length == 1) {
+           return;
+       }
+       int start = 0;
+       int end = 0;
+       for(int i = 0; i < splits.length; ++i) {
+           end = splits[i];
+           if(end < start) {
+               slices[i] = null;
+           } else {
+               slices[i] = content.subString(start, end);
+           }
+           start = end + 1;
+       }
+       slices[slices.length - 1] = content.subString(start);
+    }
+
+    public String getFilename() {
+       return filename;
+    }
+
+    public String[] getSlices() {
+       return this.slices;
+    }
+}
diff --git a/Robust/src/Benchmarks/PERT/Java/Estimator.java b/Robust/src/Benchmarks/PERT/Java/Estimator.java
new file mode 100644 (file)
index 0000000..38820a6
--- /dev/null
@@ -0,0 +1,108 @@
+public class Estimator {
+    int stages;
+    int time;
+    double variance;
+    double[] probtable;
+
+    boolean partial;
+
+    public Estimator(int stages) {
+       this.stages = stages;
+       this.time = 0;
+       this.variance = 0;
+
+       this.probtable = new double[31];
+       int i = 0;
+       this.probtable[i++] = 0.5000;
+       this.probtable[i++] = 0.5398;
+       this.probtable[i++] = 0.5793;
+       this.probtable[i++] = 0.6179;
+       this.probtable[i++] = 0.6554;
+       this.probtable[i++] = 0.6915;
+       this.probtable[i++] = 0.7257;
+       this.probtable[i++] = 0.7580;
+       this.probtable[i++] = 0.7881;
+       this.probtable[i++] = 0.8159;
+       this.probtable[i++] = 0.8413;
+       this.probtable[i++] = 0.8643;
+       this.probtable[i++] = 0.8849;
+       this.probtable[i++] = 0.9032;
+       this.probtable[i++] = 0.9192;
+       this.probtable[i++] = 0.9332;
+       this.probtable[i++] = 0.9452;
+       this.probtable[i++] = 0.9554;
+       this.probtable[i++] = 0.9641;
+       this.probtable[i++] = 0.9713;
+       this.probtable[i++] = 0.9772;
+       this.probtable[i++] = 0.9821;
+       this.probtable[i++] = 0.9861;
+       this.probtable[i++] = 0.9893;
+       this.probtable[i++] = 0.9918;
+       this.probtable[i++] = 0.9938;
+       this.probtable[i++] = 0.9953;
+       this.probtable[i++] = 0.9965;
+       this.probtable[i++] = 0.9974;
+       this.probtable[i++] = 0.9981;
+       this.probtable[i++] = 0.9987;
+
+       this.partial = false;
+    }
+
+    public boolean estimate(int time, double variance2, boolean fake) {
+       if(!fake) {
+           this.time += time;
+           this.variance += variance2;
+       } else {
+           this.partial = true;
+       }
+       --this.stages;
+       if(this.stages == 0) {
+           //System.out.print("variance2: " + (int)(this.variance*100) + "(/100); ");
+           this.variance = Math.sqrt(this.variance);
+           //System.out.println("variance: " + (int)(this.variance*100) + "(/100)");
+           return true;
+       }
+       return false;
+    }
+
+    public double getProbability(int x, int y) {
+       int l = x;
+       int r = y;
+       if(x > y) {
+           l = y;
+           r = x;
+       }
+
+       double prob = prob(r) - prob(l);
+       return prob;
+    }
+
+    private double prob(int s) {
+       int tmp = (int)((s - this.time) * 10 / this.variance);
+       //System.out.println(tmp);
+       int abs = (int)Math.abs(tmp);
+       double prob = 0;
+       if(abs > this.probtable.length - 1) {
+           prob = 1;
+       } else {
+           prob = this.probtable[abs];
+       }
+       if(tmp < 0) {
+           return 1.0 - prob;
+       } else {
+           return prob;
+       }
+    }
+
+    public int getTime() {
+       return this.time;
+    }
+
+    public double getVariance() {
+       return this.variance;
+    }
+
+    public boolean isPartial() {
+       return this.partial;
+    }
+}
diff --git a/Robust/src/Benchmarks/PERT/Java/PERT.java b/Robust/src/Benchmarks/PERT/Java/PERT.java
new file mode 100644 (file)
index 0000000..d002ff3
--- /dev/null
@@ -0,0 +1,104 @@
+import java.io.FileInputStream;
+
+public class PERT {
+
+    int stageNum;
+    Stage[] stages;
+    Estimator estimator;
+
+
+    public PERT() {
+       this.stageNum = -1;
+       //this.stages = null;
+       //this.estimator = null;
+    }
+
+    public Estimator getEstimator() {
+       return estimator;
+    }
+
+    public void setEstimator(Estimator estimator) {
+       this.estimator = estimator;
+    }
+
+    public void setStageNum(int stageNum) {
+        this.stageNum = stageNum;
+    }
+
+    public void createStages() {
+       this.stages = new Stage[this.stageNum];
+       for(int i = 0; i < stageNum; ++i) {
+           this.stages[i] = new Stage(i);
+       }
+    }
+
+    public void sampling() {
+       for(int i = 0; i < this.stageNum; ++i) {
+           this.stages[i].sampling();
+       }
+    }
+
+    public void estimate() {
+       for(int i = 0; i < this.stageNum; ++i) {
+           this.stages[i].estimate();
+       }
+    }
+
+    public void merge() {
+       for(int i = 0; i < this.stageNum; ++i) {
+           Stage tmp = this.stages[i];
+           this.estimator.estimate(tmp.getAntTime(), tmp.getAntVariance2(), false);
+       }
+    }
+
+    public static void main(String args[]) {
+//     try{
+           PERT pert = new PERT();
+
+           String path = new String("/home/jzhou/pert/conf.txt");
+           FileInputStream iStream = new FileInputStream(path);
+           byte[] b = new byte[1024];
+           int length = iStream.read(b);
+           if(length < 0) {
+               System./*out.println*/printString("Error! Can not read from configure file: " + path + "\n");
+               System.exit(-1);
+           }
+           iStream.close();
+           String content = new String(b, 0, length);
+           int index = content.indexOf('\n');
+           int stage = Integer.parseInt(content.substring(0, index));
+           Estimator estimator = new Estimator(stage);
+           pert.setStageNum(stage);
+           pert.setEstimator(estimator);
+           pert.createStages();
+           pert.sampling();
+           pert.estimate();
+           pert.merge();
+           path = new String("/home/jzhou/pert/prob.txt");
+           iStream = new FileInputStream(path);
+           byte c[] = new byte[1024];
+           length = iStream.read(c);
+           if(length < 0) {
+               System./*out.println*/printString("Error! Can not read from input file: " + path + "\n");
+               System.exit(-1);
+           }
+           iStream.close();
+           content = new String(c, 0, length);
+           index = content.indexOf('\n');
+           int x = Integer.parseInt(content.substring(0, index));
+           content = content.substring(index + 1);
+           index = content.indexOf('\n');
+           int y = Integer.parseInt(content.substring(0, index));
+           //System.out.println("x: " + x + "; y: " + y);
+           System./*out.println*/printString("The anticipate days need to finish this project is: " + pert.getEstimator().getTime() + "\n");
+           System./*out.println*/printString("And the anticipate variance is: " + (int)(pert.getEstimator().getVariance()*100) + "(/100)\n");
+           double prob = pert.getEstimator().getProbability(x, y);
+
+           System./*out.println*/printString("The probability of this project to be finished in " + x + " to " + y + " days is: " + (int)(prob*100) + "(/100)\n");
+/*     } catch(Exception e) {
+           e.printStackTrace();
+           System.exit(-1);
+       }*/
+    }
+
+}
diff --git a/Robust/src/Benchmarks/PERT/Java/Stage.java b/Robust/src/Benchmarks/PERT/Java/Stage.java
new file mode 100644 (file)
index 0000000..fc8ba8a
--- /dev/null
@@ -0,0 +1,80 @@
+import java.util.Random;
+
+public class Stage {
+    int ID;
+
+    int[] samplings;
+    int optime;
+    int nortime;
+    int petime;
+    int time;
+    double variance2;
+
+    public Stage(int id) {
+       this.ID = id;
+
+       this.samplings = new int[10];
+       for(int i = 0; i < this.samplings.length; ++i) {
+           this.samplings[i] = 0;
+       }
+
+       this.optime = 0;
+       this.nortime = 0;
+       this.petime = 0;
+       this.time = 0;
+       this.variance2 = 0;
+    }
+
+    public void sampling() {
+       /*if(ID % 2 == 1) {
+                       int tmp = samplings[samplings.length];
+               }*/
+
+       Random r = new Random();
+       int tint = 0;
+       for(int i = 0; i < this.samplings.length; ++i) {
+           do {
+               tint = r.nextInt()%50;
+           } while(tint <= 0);
+           this.samplings[i] = tint;
+           System./*out.print*/printString(tint + "; ");
+       }
+       System.printString("\n");//out.println();
+    }
+
+    public void estimate() {
+       /*if(ID % 2 == 1) {
+                       int tmp = samplings[samplings.length];
+               }*/
+
+       int highest = this.samplings[0];
+       int lowest = this.samplings[0];
+       int sum = this.samplings[0];
+       for(int i = 1; i < this.samplings.length; ++i) {
+           int temp = this.samplings[i];
+           if(temp > highest) {
+               highest = temp;
+           } else if(temp < lowest) {
+               lowest = temp;
+           }
+           sum += temp;
+       }
+       sum  = sum - highest - lowest;
+       int ordinary = sum / (this.samplings.length - 2);
+       this.optime = lowest;;
+       this.petime = highest;
+       this.nortime = ordinary;
+       this.time = (this.optime + 4 * this.nortime + this.petime) / 6;
+       this.variance2 = (double)(this.optime - this.petime) * (double)(this.optime - this.petime) / 36.0;
+       //System.out.println("Op time: " + this.optime + "; Nor time: " + this.nortime + "; Pe time: " + this.petime + "; variance2: " + (int)(this.variance2*100) + "(/100)");
+    }
+
+    public int getAntTime() {
+       return this.time;
+    }
+
+    public double getAntVariance2() {
+       return this.variance2;
+    }
+
+}
diff --git a/Robust/src/Benchmarks/PERT/Tag/Estimator.java b/Robust/src/Benchmarks/PERT/Tag/Estimator.java
new file mode 100644 (file)
index 0000000..7822c4e
--- /dev/null
@@ -0,0 +1,111 @@
+public class Estimator {
+    flag estimate;
+    flag prob;
+
+    int stages;
+    int time;
+    double variance;
+    double[] probtable;
+
+    boolean partial;
+
+    public Estimator(int stages) {
+       this.stages = stages;
+       this.time = 0;
+       this.variance = 0;
+
+       this.probtable = new double[31];
+       int i = 0;
+       this.probtable[i++] = 0.5000;
+       this.probtable[i++] = 0.5398;
+       this.probtable[i++] = 0.5793;
+       this.probtable[i++] = 0.6179;
+       this.probtable[i++] = 0.6554;
+       this.probtable[i++] = 0.6915;
+       this.probtable[i++] = 0.7257;
+       this.probtable[i++] = 0.7580;
+       this.probtable[i++] = 0.7881;
+       this.probtable[i++] = 0.8159;
+       this.probtable[i++] = 0.8413;
+       this.probtable[i++] = 0.8643;
+       this.probtable[i++] = 0.8849;
+       this.probtable[i++] = 0.9032;
+       this.probtable[i++] = 0.9192;
+       this.probtable[i++] = 0.9332;
+       this.probtable[i++] = 0.9452;
+       this.probtable[i++] = 0.9554;
+       this.probtable[i++] = 0.9641;
+       this.probtable[i++] = 0.9713;
+       this.probtable[i++] = 0.9772;
+       this.probtable[i++] = 0.9821;
+       this.probtable[i++] = 0.9861;
+       this.probtable[i++] = 0.9893;
+       this.probtable[i++] = 0.9918;
+       this.probtable[i++] = 0.9938;
+       this.probtable[i++] = 0.9953;
+       this.probtable[i++] = 0.9965;
+       this.probtable[i++] = 0.9974;
+       this.probtable[i++] = 0.9981;
+       this.probtable[i++] = 0.9987;
+
+       this.partial = false;
+    }
+
+    public boolean estimate(int time, double variance2, boolean fake) {
+       if(!fake) {
+           this.time += time;
+           this.variance += variance2;
+       } else {
+           this.partial = true;
+       }
+       --this.stages;
+       if(this.stages == 0) {
+           //System.printString("variance2: " + (int)(this.variance*100) + "(/100); ");
+           this.variance = Math.sqrt(this.variance);
+           //System.printString("variance: " + (int)(this.variance*100) + "(/100)\n");
+           return true;
+       }
+       return false;
+    }
+
+    public double getProbability(int x, int y) {
+       int l = x;
+       int r = y;
+       if(x > y) {
+           l = y;
+           r = x;
+       }
+
+       double prob = prob(r) - prob(l);
+       return prob;
+    }
+
+    private double prob(int s) {
+       int tmp = (int)((s - this.time) * 10 / this.variance);
+       //System.printString(tmp + "\n");
+       int abs = (int)Math.abs(tmp);
+       double prob = 0;
+       if(abs > this.probtable.length - 1) {
+           prob = 1;
+       } else {
+           prob = this.probtable[abs];
+       }
+       if(tmp < 0) {
+           return 1.0 - prob;
+       } else {
+           return prob;
+       }
+    }
+
+    public int getTime() {
+       return this.time;
+    }
+
+    public double getVariance() {
+       return this.variance;
+    }
+
+    public boolean isPartial() {
+       return this.partial;
+    }
+}
diff --git a/Robust/src/Benchmarks/PERT/Tag/PERT.java b/Robust/src/Benchmarks/PERT/Tag/PERT.java
new file mode 100644 (file)
index 0000000..951312a
--- /dev/null
@@ -0,0 +1,86 @@
+task startup(StartupObject s{initialstate}) {
+
+    // read in configuration parameters
+    //System.printString("Top of task startup\n");
+    String path = new String("/home/jzhou/pert/conf.txt");
+    FileInputStream iStream = new FileInputStream(path);
+    byte[] b = new byte[1024];
+    int length = iStream.read(b);
+    if(length < 0) {
+       System.printString("Error! Can not read from configure file: " + path + "\n");
+       System.exit(-1);
+    }
+    iStream.close();
+    String content = new String(b, 0, length);
+    int index = content.indexOf('\n');
+    int stages = Integer.parseInt(content.subString(0, index));
+    Estimator estimator = new Estimator(stages){estimate};
+    for(int i = 0; i < stages; ++i) {
+       Stage stage = new Stage(i){sampling};
+    }
+
+    taskexit(s{!initialstate});
+}
+
+task sampling(Stage s{sampling}) {
+    //System.printString("Top of task sampling\n");
+
+    s.sampling();
+
+    taskexit(s{!sampling, estimate});
+}
+
+task estimateStage(Stage s{estimate}) {
+    //System.printString("Top of task estimateStage\n");
+
+    s.estimate();
+
+    taskexit(s{!estimate, merge});
+}
+
+task estimate(Estimator e{estimate}, optional Stage s{merge}) {
+    //System.printString("Top of task estimate\n");
+
+    boolean fake = false;
+    if(!isavailable(s)) {
+       fake = true;
+    }
+    boolean finish = e.estimate(s.getAntTime(), s.getAntVariance2(), fake);
+
+    if(finish) {
+       taskexit(e{!estimate, prob}, s{!merge});
+    } else {
+       taskexit(s{!merge});
+    }
+}
+
+task prob(Estimator e{prob}) {
+    //System.printString("Top of task prob\n");
+
+    if(e.isPartial()) {
+       System.printString("There are some sampling data unavailable. The anticipate probability may be greater than it should be!\n");
+    }
+
+    String path = new String("/home/jzhou/pert/prob.txt");
+    FileInputStream iStream = new FileInputStream(path);
+    byte b[] = new byte[1024];
+    int length = iStream.read(b);
+    if(length < 0) {
+       System.printString("Error! Can not read from input file: " + path + "\n");
+       System.exit(-1);
+    }
+    iStream.close();
+    String content = new String(b, 0, length);
+    int index = content.indexOf('\n');
+    int x = Integer.parseInt(content.subString(0, index));
+    content = content.subString(index + 1);
+    index = content.indexOf('\n');
+    int y = Integer.parseInt(content.subString(0, index));
+    //System.printString("x: " + x + "; y: " + y + "\n");
+    System.printString("The anticipate days need to finish this project is: " + e.getTime() + "\n");
+    System.printString("And the anticipate variance is: " + (int)(e.getVariance()*100) + "(/100)\n");
+    double prob = e.getProbability(x, y);
+
+    System.printString("The probability of this project to be finished in " + x + " to " + y + " days is: " + (int)(prob*100) + "%\n");
+    taskexit(e{!prob});
+}
diff --git a/Robust/src/Benchmarks/PERT/Tag/Stage.java b/Robust/src/Benchmarks/PERT/Tag/Stage.java
new file mode 100644 (file)
index 0000000..a2f9b78
--- /dev/null
@@ -0,0 +1,82 @@
+public class Stage {
+    flag sampling;
+    flag estimate;
+    flag merge;
+
+    int ID;
+
+    int[] samplings;
+    int optime;
+    int nortime;
+    int petime;
+    int time;
+    double variance2;
+
+    public Stage(int id) {
+       this.ID = id;
+
+       this.samplings = new int[10];
+       for(int i = 0; i < this.samplings.length; ++i) {
+           this.samplings[i] = 0;
+       }
+
+       this.optime = 0;
+       this.nortime = 0;
+       this.petime = 0;
+       this.time = 0;
+       this.variance2 = 0;
+    }
+
+    public void sampling() {
+       /*if(ID % 2 == 1) {
+                       int tmp = samplings[samplings.length];
+               }*/
+
+       Random r = new Random();
+       int tint = 0;
+       for(int i = 0; i < this.samplings.length; ++i) {
+           do {
+               tint = r.nextInt()%50;
+           } while(tint <= 0);
+           this.samplings[i] = tint;
+           //System.printString(tint + "; ");
+       }
+       //System.printString("\n");
+    }
+
+    public void estimate() {
+       /*if(ID % 2 == 1) {
+                       int tmp = samplings[samplings.length];
+               }*/
+
+       int highest = this.samplings[0];
+       int lowest = this.samplings[0];
+       int sum = this.samplings[0];
+       for(int i = 1; i < this.samplings.length; ++i) {
+           int temp = this.samplings[i];
+           if(temp > highest) {
+               highest = temp;
+           } else if(temp < lowest) {
+               lowest = temp;
+           }
+           sum += temp;
+       }
+       sum  = sum - highest - lowest;
+       int ordinary = sum / (this.samplings.length - 2);
+       this.optime = lowest;;
+       this.petime = highest;
+       this.nortime = ordinary;
+       this.time = (this.optime + 4 * this.nortime + this.petime) / 6;
+       this.variance2 = (double)(this.optime - this.petime) * (double)(this.optime - this.petime) / 36.0;
+       //System.printString("Op time: " + this.optime + "; Nor time: " + this.nortime + "; Pe time: " + this.petime + "; variance2: " + (int)(this.variance2*100) + "(/100)\n");
+    }
+
+    public int getAntTime() {
+       return this.time;
+    }
+
+    public double getAntVariance2() {
+       return this.variance2;
+    }
+
+}