Make the Java version and the two Bristlecone versions able to run in parallel
author    jzhou <jzhou>
Tue, 18 Mar 2008 21:05:21 +0000 (21:05 +0000)
committer jzhou <jzhou>
Tue, 18 Mar 2008 21:05:21 +0000 (21:05 +0000)
17 files changed:
Robust/src/Benchmarks/MapReduce/Java/MapWorker.java
Robust/src/Benchmarks/MapReduce/Java/Master.java
Robust/src/Benchmarks/MapReduce/Java/ReduceWorker.java
Robust/src/Benchmarks/MapReduce/Java/Splitter.java
Robust/src/Benchmarks/MapReduce/Java/WordCounter.java
Robust/src/Benchmarks/MapReduce/Nor/MapReduce.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Nor/MapReduceBase.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Nor/MapWorker.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Nor/Master.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Nor/OutputCollector.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Nor/ReduceWorker.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Nor/Splitter.java [new file with mode: 0644]
Robust/src/Benchmarks/MapReduce/Tag/MapReduce.java
Robust/src/Benchmarks/MapReduce/Tag/MapWorker.java
Robust/src/Benchmarks/MapReduce/Tag/Master.java
Robust/src/Benchmarks/MapReduce/Tag/ReduceWorker.java
Robust/src/Benchmarks/MapReduce/Tag/Splitter.java

index c31b575f796a4ef276ec5d336e3d8b1cdf6e38a7..f5b591179157034cfd5363b96c7fc1099cb7c2b3 100644 (file)
@@ -26,7 +26,7 @@ public class MapWorker {
 
        locations = new String[r];
        for(int i = 0; i < r; ++i) {
-           StringBuffer temp = new StringBuffer("/home/jzhou/mapreduce/output-intermediate-map-");
+           StringBuffer temp = new StringBuffer("/scratch/mapreduce_java/output-intermediate-map-");
            temp.append(String.valueOf(ID));
            temp.append("-of-");
            temp.append(String.valueOf(r));
index 92ce404d22240a0632285cf04f038904b6576daa..39ac5cc401dbe6e5bba47782d3b25c6c35f9fe51 100644 (file)
@@ -40,7 +40,7 @@ public class Master {
        }
 
        this.splitter = splitter;
-       this.outputfile = new String("/home/jzhou/mapreduce/output.dat");
+       this.outputfile = new String("/scratch/mapreduce_java/output.dat");
 
        this.partial = false;
     }
@@ -98,9 +98,9 @@ public class Master {
     }
 
     public void addInterOutput(String interoutput) {
-       int start = interoutput.indexOf('_');
+       int start = interoutput.lastindexOf('_');
        int end = interoutput.indexOf('.');
-       int index = Integer.parseInt(interoutput.substring(start + 1, end));
+       int index = Integer.parseInt(interoutput.subString(start + 1, end));
        //System.printString(interoutput.subString(start + 1, end) + "\n");
        if(interoutputs[index] == null) {
            interoutputs[index] = new Vector();
@@ -140,7 +140,7 @@ public class Master {
        //try{
            FileInputStream iStream = new FileInputStream(file);
            FileOutputStream oStream = new FileOutputStream(outputfile, true);
-           byte[] b = new byte[1024 * 100];
+           byte[] b = new byte[1024 * 10];
            int length = iStream.read(b);
            if(length < 0) {
                System./*out.println*/printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
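The addInterOutput() hunk above switches from indexOf('_') to lastindexOf('_') (this class library's spelling of lastIndexOf). That matters because the new output directory name, mapreduce_java, itself contains an underscore, so searching from the front would no longer isolate the reduce-partition index at the end of the filename. A minimal standalone sketch in plain Java (standard java.lang.String; the path, class and method names are illustrative, not code from the repository):

    // Intermediate map outputs are named:
    //   /scratch/mapreduce_java/output-intermediate-map-<mapID>-of-<r>_<reduceIndex>.dat
    // The reduce index must be read from the LAST '_' once the directory name
    // carries an underscore of its own.
    public class InterOutputIndexDemo {
        static int reduceIndexOf(String path) {
            int start = path.lastIndexOf('_'); // indexOf('_') would stop at "mapreduce_java"
            int end = path.indexOf('.');       // safe here: no '.' appears before ".dat"
            return Integer.parseInt(path.substring(start + 1, end));
        }

        public static void main(String[] args) {
            String p = "/scratch/mapreduce_java/output-intermediate-map-3-of-4_2.dat";
            System.out.println(reduceIndexOf(p)); // prints 2
        }
    }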
index b18ce2091d319405ded9786b33ca883c4a00848b..4559882b6878f9dbcf8613428fb56eba8de1fe4f 100644 (file)
@@ -32,7 +32,7 @@ public class ReduceWorker {
        //this.sorts = null;
 
        this.output = new OutputCollector();
-       this.outputfile = "/home/jzhou/mapreduce/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
+       this.outputfile = "/scratch/mapreduce_java/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
     }
 
     public MapReduceBase getMapreducer() {
@@ -57,7 +57,7 @@ public class ReduceWorker {
        //try{
            for(int i = 0; i < interoutputs.size(); ++i) {
                FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
-               byte[] b = new byte[1024 * 100];
+               byte[] b = new byte[1024 * 10];
                int length = iStream.read(b);
                if(length < 0) {
                    System./*out.println*/printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
@@ -126,14 +126,15 @@ public class ReduceWorker {
            int temp = a[1];
        }*/
        
-       if(this.interoutputs == null) {
-           return;
-       }
+       if(this.interoutputs != null) {
+           //return;
+       //}
        for(int i = 0; i < this.sorts.length; ++i) {
            String key = (String)this.keys.elementAt(this.sorts[i]);
            Vector values = (Vector)this.values.get(key);
            this.mapreducer.reduce(key, values, output);
        }
+       }
 
        //try{
            // output all the result into some local file
index 342328b6ce37a6e17e4e444d6083a8025395315e..53cae4dfef5d2d283c72bc6d3500ec58a3a12eca 100644 (file)
@@ -15,7 +15,7 @@ public class Splitter {
            //System.printString("Top of Splitter's constructor\n");
            filename = path;
            FileInputStream iStream = new FileInputStream(filename);
-           byte[] b = new byte[1024 * 10];
+           byte[] b = new byte[1024 * 1024];
            length = iStream.read(b);
            if(length < 0) {
                System./*out.println*/printString("Error! Can not read from input file: " + filename + "\n");
index b8d961c923a1bc2bbf9fa100b50443257a3a565b..c67a291b4d45107e1c9a55196ecc4b494009df16 100644 (file)
@@ -52,15 +52,22 @@ import mapreduce.ToolRunner;*/
        }
 
        boolean isspace(char c) {
-           if((c == ' ') || 
-                   (c == '.') ||
-                   (c == '!') ||
-                   (c == '?') ||
-                   (c == '"') ||
-                   (c == '\n')) {
-               return true;
-           }
-           return false;
+       if((c == ' ') || 
+               (c == ',') ||
+               (c == '.') ||
+               (c == '!') ||
+               (c == '?') ||
+               (c == '"') ||
+               (c == '(') ||
+               (c == ')') ||
+               (c == '[') ||
+               (c == ']') ||
+               (c == '{') ||
+               (c == '}') ||
+               (c == '\n')) {
+           return true;
+       }
+       return false;
        }
     }
 
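The isspace() change above widens the set of word delimiters: commas, parentheses, square brackets and braces now end a word, in addition to whitespace and sentence punctuation. A self-contained plain-Java sketch of the same scan that the map function performs (class name and sample text are made up for illustration):

    public class TokenizeDemo {
        static boolean isSeparator(char c) {
            return c == ' ' || c == ',' || c == '.' || c == '!' || c == '?' || c == '"'
                || c == '(' || c == ')' || c == '[' || c == ']' || c == '{' || c == '}'
                || c == '\n';
        }

        public static void main(String[] args) {
            String value = "hello, (map) [reduce] world!\n";
            int n = value.length();
            for (int i = 0; i < n; ) {
                while (i < n && isSeparator(value.charAt(i))) ++i;  // skip separators
                int start = i;
                while (i < n && !isSeparator(value.charAt(i))) ++i; // scan the word
                if (start < i) System.out.println(value.substring(start, i) + " -> 1");
            }
        }
    }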
diff --git a/Robust/src/Benchmarks/MapReduce/Nor/MapReduce.java b/Robust/src/Benchmarks/MapReduce/Nor/MapReduce.java
new file mode 100644 (file)
index 0000000..cfef936
--- /dev/null
@@ -0,0 +1,140 @@
+task startup(StartupObject s{initialstate}) {
+    // read in configuration parameters
+    // System.printString("Top of task startup\n");
+    String path = new String("/scratch/mapreduce_nor/conf.txt");
+    FileInputStream iStream = new FileInputStream(path);
+    byte[] b = new byte[1024];
+    int length = iStream.read(b);
+    if(length < 0 ) {
+       System.printString("Error! Can not read from configure file: " + path + "\n");
+       System.exit(-1);
+    }
+    String content = new String(b, 0, length);
+    //System.printString(content + "\n");
+    int index = content.indexOf('\n');
+    String inputfile = content.subString(0, index);
+    content = content.subString(index + 1);
+    index = content.indexOf('\n');
+    int m = Integer.parseInt(content.subString(0, index));
+    content = content.subString(index + 1);
+    index = content.indexOf('\n');
+    int r = Integer.parseInt(content.subString(0, index));
+    content = content.subString(index + 1);
+    index = content.indexOf('\n');
+    String temp = content.subString(0, index);
+    char seperator = temp.charAt(0);
+    //System.printString(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r) + "\n");
+    Splitter splitter = new Splitter(inputfile, m, seperator);
+    Master master = new Master(m, r, splitter){split};
+
+    taskexit(s{!initialstate});
+}
+
+//Split the input file into M pieces
+task split(Master master{split}) {
+    System.printString("Top of task split\n");
+    master.split();
+
+    taskexit(master{!split, assignMap});
+}
+
+//Select a map worker to handle one of the pieces of input file
+task assignMap(Master master{assignMap}) {
+    System.printString("Top of task assignMap\n");
+    master.assignMap();
+
+    taskexit(master{!assignMap, mapoutput});
+}
+
+//MapWorker do 'map' function on a input file piece
+task map(MapWorker mworker{map}) {
+    System.printString("Top of task map\n");
+    mworker.map();
+
+    taskexit(mworker{!map, partition});
+}
+
+//Partition the intermediate key/value pair generated
+//into R intermediate local files
+task partition(MapWorker mworker{partition}) {
+    System.printString("Top of task partition\n");
+    mworker.partition();
+
+    taskexit(mworker{!partition, mapoutput});
+}
+
+//Register the intermediate ouput from map worker to master
+task mapOutput(Master master{mapoutput}, /*optional*/ MapWorker mworker{mapoutput}) {
+    System.printString("Top of task mapOutput\n");
+    //if(isavailable(mworker)) {
+       int total = master.getR();
+       for(int i = 0; i < total; ++i) {
+           String temp = mworker.outputFile(i);
+           if(temp != null) {
+               master.addInterOutput(temp); 
+           }
+       }
+       master.setMapFinish(mworker.getID());
+    /*} else {
+       master.setMapFail(mworker.getID());
+       master.setPartial(true);
+    }*/
+    if(master.isMapFinish()) {
+       taskexit(master{!mapoutput, mapfinished, assignReduce}, mworker{!mapoutput});
+    }
+
+    taskexit(mworker{!mapoutput});
+}
+
+//Assign the list of intermediate output associated to one key to
+//a reduce worker 
+task assignReduce(Master master{assignReduce}) {
+    System.printString("Top of task assignReduce\n");
+    master.assignReduce();
+
+    taskexit(master{!assignReduce, reduceoutput});
+}
+
+//First do sort and group on the intermediate key/value pairs assigned
+//to reduce worker
+task sortgroup(ReduceWorker rworker{sortgroup}) {
+    System.printString("Top of task sortgroup\n");
+    rworker.sortgroup();
+
+    taskexit(rworker{!sortgroup, reduce});
+}
+
+//Do 'reduce' function
+task reduce(ReduceWorker rworker{reduce}) {
+    System.printString("Top of task reduce\n");
+    rworker.reduce();
+
+    taskexit(rworker{!reduce, reduceoutput});
+}
+
+//Collect the output into master
+task reduceOutput(Master master{reduceoutput}, /*optional*/ ReduceWorker rworker{reduceoutput}) {
+    System.printString("Top of task reduceOutput\n");
+    //if(isavailable(rworker)) {
+       master.collectROutput(rworker.getOutputFile());
+       master.setReduceFinish(rworker.getID());
+   /* } else {
+       master.setReduceFail(rworker.getID());
+       master.setPartial(true);
+    }*/
+    if(master.isReduceFinish()) {
+       //System.printString("reduce finish\n");
+       taskexit(master{!reduceoutput, reducefinished, output}, rworker{!reduceoutput});
+    }
+
+    taskexit(rworker{!reduceoutput});
+}
+
+task output(Master master{output}) {
+    System.printString("Top of task output\n");
+    if(master.isPartial()) {
+       System.printString("Partial! The result may not be right due to some failure!\n");
+    }
+    System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");
+    taskexit(master{!output});
+}
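The new Nor task file drives the benchmark through Bristlecone flags: a task fires when its parameter object carries the named flag, and taskexit clears that flag and sets the next one, chaining split -> assignMap -> map/partition -> mapOutput -> assignReduce -> sortgroup/reduce -> reduceOutput -> output. As a rough mental model only, the Master's flag transitions resemble the plain-Java state machine below; the enum and class name are invented, and in the real benchmark the Bristlecone runtime schedules the tasks rather than a loop:

    public class MasterFlowSketch {
        enum Stage { SPLIT, ASSIGN_MAP, MAP_OUTPUT, ASSIGN_REDUCE, REDUCE_OUTPUT, OUTPUT, DONE }

        public static void main(String[] args) {
            Stage stage = Stage.SPLIT;
            while (stage != Stage.DONE) {
                System.out.println("task fires for flag: " + stage);
                switch (stage) {
                    case SPLIT:         stage = Stage.ASSIGN_MAP;    break; // taskexit {!split, assignMap}
                    case ASSIGN_MAP:    stage = Stage.MAP_OUTPUT;    break; // map workers run in between
                    case MAP_OUTPUT:    stage = Stage.ASSIGN_REDUCE; break; // once every map worker reported
                    case ASSIGN_REDUCE: stage = Stage.REDUCE_OUTPUT; break; // reduce workers run in between
                    case REDUCE_OUTPUT: stage = Stage.OUTPUT;        break; // once every reduce worker reported
                    case OUTPUT:        stage = Stage.DONE;          break; // final results written
                }
            }
        }
    }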
diff --git a/Robust/src/Benchmarks/MapReduce/Nor/MapReduceBase.java b/Robust/src/Benchmarks/MapReduce/Nor/MapReduceBase.java
new file mode 100644 (file)
index 0000000..3ff539d
--- /dev/null
@@ -0,0 +1,55 @@
+public class MapReduceBase {
+
+    public static void map(String key, String value, OutputCollector output) {
+       int n = value.length();
+       for (int i = 0; i < n; ) {
+           // Skip past leading whitespace
+           while ((i < n) && isspace(value.charAt(i))) {
+               ++i;
+           }
+
+           // Find word end
+           int start = i;
+           while ((i < n) && !isspace(value.charAt(i))) {
+               i++;
+           }
+
+           if (start < i) {
+               output.emit(value.subString(start, i), "1");
+               //System.printString(value.subString(start,i) + "\n");
+           }
+       }
+    }
+
+    public static void reduce(String key, Vector values, OutputCollector output) {
+       // Iterate over all entries with the
+       // // same key and add the values
+       int value = 0;
+       for(int i = 0; i < values.size(); ++i) {
+           value += Integer.parseInt((String)values.elementAt(i));
+       }
+
+       // Emit sum for input->key()
+       output.emit(key, String.valueOf(value));
+    }
+
+    static boolean isspace(char c) {
+       if((c == ' ') || 
+               (c == ',') ||
+               (c == '.') ||
+               (c == '!') ||
+               (c == '?') ||
+               (c == '"') ||
+               (c == '(') ||
+               (c == ')') ||
+               (c == '[') ||
+               (c == ']') ||
+               (c == '{') ||
+               (c == '}') ||
+               (c == '\n')) {
+           return true;
+       }
+       return false;
+    }
+}
+
diff --git a/Robust/src/Benchmarks/MapReduce/Nor/MapWorker.java b/Robust/src/Benchmarks/MapReduce/Nor/MapWorker.java
new file mode 100644 (file)
index 0000000..0ab23fa
--- /dev/null
@@ -0,0 +1,102 @@
+public class MapWorker {
+    flag map;
+    flag partition;
+    flag mapoutput;
+
+    int ID;
+
+    int r;
+    String key;
+    String value;
+    OutputCollector output;
+
+    String[] locations;
+    FileOutputStream[] outputs;
+
+    public MapWorker(String key, String value, int r, int id) {
+       this.ID = id;
+       this.r = r;
+
+       this.key = key;
+       this.value = value;
+       this.output = new OutputCollector();
+
+       locations = new String[r];
+       for(int i = 0; i < r; ++i) {
+           StringBuffer temp = new StringBuffer("/scratch/mapreduce_nor/output-intermediate-map-");
+           temp.append(String.valueOf(ID));
+           temp.append("-of-");
+           temp.append(String.valueOf(r));
+           temp.append("_");
+           temp.append(String.valueOf(i));
+           temp.append(".dat");
+           locations[i] = new String(temp);
+       }
+
+       outputs = new FileOutputStream[r];
+       for(int i = 0; i < r; ++i) {
+           outputs[i] = null;
+       }
+    }
+
+    public void map() {
+       /*if(ID % 2 == 1) {
+               String temp = locations[locations.length];
+       }*/
+
+       MapReduceBase.map(key, value, output);
+    }
+
+    public void partition() {
+       /*if(ID % 2 == 1) {
+               String temp = locations[locations.length];
+       }*/
+
+       int size = this.output.size();
+       for(int i = 0; i < size; ++i) {
+           String key = this.output.getKey(i);
+           String value = this.output.getValue(i);
+           // use the hashcode of key to decide which intermediate output
+           // this pair should be in
+           int index = (int)Math.abs(key.hashCode()) % this.r;
+           FileOutputStream oStream = outputs[index];
+           if(oStream == null) {
+               // open the file
+               String filepath = locations[index];
+               oStream = new FileOutputStream(filepath, true); // append
+               outputs[index] = oStream;
+           }
+           // format: key value\n
+           oStream.write(key.getBytes());
+           oStream.write(' ');
+           oStream.write(value.getBytes());
+           oStream.write('\n');
+           oStream.flush();
+       }
+
+       // close the output files
+       for(int i = 0; i < this.outputs.length; ++i) {
+           FileOutputStream temp = this.outputs[i];
+           if(temp != null) {
+               temp.close();
+           }
+       }
+    }
+
+    public String outputFile(int i) {
+       if(outputs[i] != null) {
+           return locations[i];
+       } else {
+           return null;
+       }
+    }
+
+    public int getID() {
+       return this.ID;
+    }
+
+    public int getR() {
+       return this.r;
+    }
+
+}
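MapWorker.partition() above routes each emitted key to one of the r intermediate files by abs(hashCode) % r and appends "key value\n" lines. A standalone plain-Java sketch of that routing rule (java.util collections, class name and sample keys are illustrative, not the repository's class library):

    import java.util.HashMap;
    import java.util.Map;

    public class PartitionDemo {
        public static void main(String[] args) {
            int r = 4;                                    // number of reduce workers
            String[] keys = { "map", "reduce", "hello", "world" };
            Map<Integer, StringBuilder> buckets = new HashMap<>();
            for (String key : keys) {
                int index = Math.abs(key.hashCode()) % r; // same rule as the benchmark
                buckets.computeIfAbsent(index, k -> new StringBuilder())
                       .append(key).append(" 1\n");       // format: "key value\n"
            }
            buckets.forEach((i, b) -> System.out.println("bucket " + i + ":\n" + b));
        }
    }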
diff --git a/Robust/src/Benchmarks/MapReduce/Nor/Master.java b/Robust/src/Benchmarks/MapReduce/Nor/Master.java
new file mode 100644 (file)
index 0000000..4e21b9c
--- /dev/null
@@ -0,0 +1,149 @@
+public class Master {
+    flag split;
+    flag assignMap;
+    flag mapoutput;
+    flag mapfinished;
+    flag assignReduce;
+    flag reduceoutput;
+    flag reducefinished;
+    flag output;
+
+    int m;
+    int r;
+    int[] mworkerStates; // array of map worker's state
+    // 0: idle  1: process  2: finished 3: fail
+    int[] rworkerStates; // array of reduce worker's state
+    Vector[] interoutputs; // array of string vector containing
+    // paths of intermediate outputs from
+    // map worker
+
+    Splitter splitter;
+
+    String outputfile;  // path of final output file
+
+    boolean partial;
+
+    public Master(int m, int r, Splitter splitter) {
+       this.m = m;
+       this.r = r;
+
+       mworkerStates = new int[m];
+       rworkerStates = new int[r];
+       for(int i = 0; i < m; ++i) {
+           mworkerStates[i] = 0;
+       }
+       for(int i = 0; i < r; ++i) {
+           rworkerStates[i] = 0;
+       }
+
+       interoutputs = new Vector[r];
+       for(int i = 0; i < r; ++i) {
+           interoutputs[i] = null;
+       }
+
+       this.splitter = splitter;
+       this.outputfile = new String("/scratch/mapreduce_nor/output.dat");
+
+       this.partial = false;
+    }
+
+    public int getR() {
+       return this.r;
+    }
+
+    public String getOutputFile() {
+       return this.outputfile;
+    }
+
+    public boolean isPartial() {
+       return this.partial;
+    }
+
+    public void setPartial(boolean partial) {
+       this.partial = partial || this.partial;
+    }
+
+    public void split() {
+       splitter.split();
+    }
+
+    public void assignMap() {
+       String[] contentsplits = splitter.getSlices();
+       for(int i = 0; i < contentsplits.length; ++i) {
+           //System.printString("*************************\n");
+           //System.printString(contentsplits[i] + "\n");
+           //System.printString("*************************\n");
+           MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i){map};
+           mworkerStates[i] = 1;
+       }
+    }
+
+    public void setMapFinish(int i) {
+       mworkerStates[i] = 2;
+    }
+
+    public void setMapFail(int i) {
+       mworkerStates[i] = 3;
+    }
+
+    public boolean isMapFinish() {
+       for(int i = 0; i < mworkerStates.length; ++i) {
+           if(mworkerStates[i] == 1) {
+               return false;
+           }
+       }
+
+       return true;
+    }
+
+    public void addInterOutput(String interoutput) {
+       int start = interoutput.lastindexOf('_');
+       int end = interoutput.indexOf('.');
+       int index = Integer.parseInt(interoutput.subString(start + 1, end));
+       //System.printString(interoutput.subString(start + 1, end) + "\n");
+       if(interoutputs[index] == null) {
+           interoutputs[index] = new Vector();
+       }
+       interoutputs[index].addElement(interoutput);
+    }
+
+    public void assignReduce() {
+       for(int i = 0; i < interoutputs.length; ++i) {
+           ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup};
+           rworkerStates[i] = 1;
+       }
+    }
+
+    public void setReduceFinish(int i) {
+       rworkerStates[i] = 2;
+    }
+
+    public void setReduceFail(int i) {
+       rworkerStates[i] = 3;
+    }
+
+    public boolean isReduceFinish() {
+       for(int i = 0; i < rworkerStates.length; ++i) {
+           if(rworkerStates[i] == 1) {
+               return false;
+           }
+       }
+
+       return true;
+    }
+
+    public void collectROutput(String file) {
+       FileInputStream iStream = new FileInputStream(file);
+       FileOutputStream oStream = new FileOutputStream(outputfile, true);
+       byte[] b = new byte[1024 * 10];
+       int length = iStream.read(b);
+       if(length < 0) {
+           System.printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
+           System.exit(-1);
+       }
+       //System.printString(new String(b, 0, length) + "\n");
+       oStream.write(b, 0, length);
+       iStream.close();
+       oStream.close();
+    }
+}
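Master above tracks every worker in an int array, where 0 means idle, 1 processing, 2 finished and 3 failed; isMapFinish()/isReduceFinish() declare a phase done once no slot is still 1. A small plain-Java sketch of that check (array contents and names are made up):

    public class WorkerStateDemo {
        static boolean isPhaseFinished(int[] states) {
            for (int s : states) {
                if (s == 1) return false;   // someone is still working
            }
            return true;                    // everyone finished or failed
        }

        public static void main(String[] args) {
            int[] mworkerStates = { 2, 2, 1, 2 };
            System.out.println(isPhaseFinished(mworkerStates)); // false: worker 2 still running
            mworkerStates[2] = 2;
            System.out.println(isPhaseFinished(mworkerStates)); // true
        }
    }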
diff --git a/Robust/src/Benchmarks/MapReduce/Nor/OutputCollector.java b/Robust/src/Benchmarks/MapReduce/Nor/OutputCollector.java
new file mode 100644 (file)
index 0000000..3c6d852
--- /dev/null
@@ -0,0 +1,27 @@
+public class OutputCollector {
+
+    Vector keys;
+    Vector values;
+
+    public OutputCollector() {
+       this.keys = new Vector();
+       this.values = new Vector();
+    }
+
+    public void emit(String key, String value) {
+       this.keys.addElement(key);
+       this.values.addElement(value);
+    }
+
+    public int size() {
+       return this.keys.size();
+    }
+
+    public String getKey(int i) {
+       return (String)this.keys.elementAt(i);
+    }
+
+    public String getValue(int i) {
+       return (String)this.values.elementAt(i);
+    }
+}
diff --git a/Robust/src/Benchmarks/MapReduce/Nor/ReduceWorker.java b/Robust/src/Benchmarks/MapReduce/Nor/ReduceWorker.java
new file mode 100644 (file)
index 0000000..a0e39ff
--- /dev/null
@@ -0,0 +1,138 @@
+public class ReduceWorker {
+    flag sortgroup;
+    flag reduce;
+    flag reduceoutput;
+
+    int ID;
+    Vector interoutputs;  // string vector containing paths
+    // of intermediate outputs from map worker
+    Vector keys;
+    HashMap values; // hashmap map key to vector of string vector
+    int[] sorts; // array record the sort of keys
+    OutputCollector output;
+    String outputfile;  // path of the intermediate output file
+
+    public ReduceWorker(Vector interoutputs, int id) {
+       this.ID = id;
+       this.interoutputs = interoutputs;
+
+       this.keys = new Vector();
+       this.values = new HashMap();
+       //this.sorts = null;
+
+       this.output = new OutputCollector();
+       this.outputfile = "/scratch/mapreduce_nor/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
+    }
+
+    public void sortgroup() {
+       /*if(ID % 2 == 1) {
+               int a[] = new int[1];
+               int temp = a[1];
+       }*/
+
+       // group values associated to the same key
+       //System.printString("================================\n");
+       if(interoutputs == null) {
+           return;
+       }
+       for(int i = 0; i < interoutputs.size(); ++i) {
+           FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
+           byte[] b = new byte[1024 * 10];
+           int length = iStream.read(b);
+           if(length < 0) {
+               System.printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
+               System.exit(-1);
+           }
+           String content = new String(b, 0, length);
+           //System.printString(content + "\n");
+           int index = content.indexOf('\n');
+           while(index != -1) {
+               String line = content.subString(0, index);
+               content = content.subString(index + 1);
+               //System.printString(line + "\n");
+               int tmpindex = line.indexOf(' ');
+               String key = line.subString(0, tmpindex);
+               String value = line.subString(tmpindex + 1);
+               //System.printString(key + "; " + value + "\n");
+               if(!this.values.containsKey(key)) {
+                   this.values.put(key, new Vector());
+                   this.keys.addElement(key);
+               }
+               ((Vector)this.values.get(key)).addElement(value);
+               index = content.indexOf('\n');
+           }
+           iStream.close();
+       }
+       //System.printString("================================\n");
+
+       /*for(int i = 0; i < this.keys.size(); ++i) {
+                       System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
+               }
+               System.printString("\n");*/
+
+       // sort all the keys inside interoutputs
+       this.sorts = new int[this.keys.size()];
+       // insert sorting
+       this.sorts[0] = 0;
+       int tosort = 1;
+       for(; tosort < this.keys.size(); ++tosort) {
+           int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode();
+           int index = tosort;
+           for(int i = tosort; i > 0; --i) {
+               if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) {
+                   this.sorts[i] = this.sorts[i-1];
+                   index = i - 1;
+               } else {
+                   //System.printString(i + "; " + tosort + "\n");
+                   index = i;
+                   i = 0;
+               }
+           }
+           this.sorts[index] = tosort;
+       }
+       /*for(int i = 0; i < this.sorts.length; ++i) {
+                       System.printString(this.sorts[i] + "; ");
+               }
+               System.printString("\n");*/
+    }
+
+    public void reduce() {
+       /*if(ID % 2 == 1) {
+               int a[] = new int[1];
+               int temp = a[1];
+       }*/
+
+       if(this.interoutputs != null) {
+          // return;
+       //}
+       for(int i = 0; i < this.sorts.length; ++i) {
+           String key = (String)this.keys.elementAt(this.sorts[i]);
+           Vector values = (Vector)this.values.get(key);
+           MapReduceBase.reduce(key, values, output);
+       }
+       }
+
+       // output all the result into some local file
+       int size = this.output.size();
+       FileOutputStream oStream = new FileOutputStream(outputfile, true); // append
+       for(int i = 0; i < size; ++i) {
+           String key = this.output.getKey(i);
+           String value = this.output.getValue(i);
+           // format: key value\n
+           oStream.write(key.getBytes());
+           oStream.write(' ');
+           oStream.write(value.getBytes());
+           oStream.write('\n');
+           oStream.flush();
+       }
+       oStream.close();
+    }
+
+    public String getOutputFile() {
+       return this.outputfile;
+    }
+
+    public int getID() {
+       return this.ID;
+    }
+}
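ReduceWorker.sortgroup() above orders the collected keys with an insertion sort over an index array, comparing String.hashCode() values, so reduce() can walk the keys in ascending hash order. A standalone plain-Java sketch of that sort (sample keys and class name are illustrative):

    import java.util.Vector;

    public class SortGroupDemo {
        public static void main(String[] args) {
            Vector<String> keys = new Vector<>();
            keys.add("world"); keys.add("hello"); keys.add("map"); keys.add("reduce");

            int[] sorts = new int[keys.size()];   // sorts[i] = position of the i-th smallest key
            sorts[0] = 0;
            for (int tosort = 1; tosort < keys.size(); ++tosort) {
                int tosortKey = keys.elementAt(tosort).hashCode();
                int index = tosort;
                for (int i = tosort; i > 0; --i) {
                    if (keys.elementAt(sorts[i - 1]).hashCode() > tosortKey) {
                        sorts[i] = sorts[i - 1]; // shift the larger entry right
                        index = i - 1;
                    } else {
                        index = i;               // found the slot
                        break;
                    }
                }
                sorts[index] = tosort;
            }

            for (int s : sorts) {
                System.out.println(keys.elementAt(s) + " (" + keys.elementAt(s).hashCode() + ")");
            }
        }
    }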
diff --git a/Robust/src/Benchmarks/MapReduce/Nor/Splitter.java b/Robust/src/Benchmarks/MapReduce/Nor/Splitter.java
new file mode 100644 (file)
index 0000000..d61bf02
--- /dev/null
@@ -0,0 +1,73 @@
+public class Splitter {
+    String filename;
+    String content;
+    int length;
+    int[] splits;
+    String[] slices;
+
+    public Splitter(String path, int splitNum, char seperator) {
+       //System.printString("Top of Splitter's constructor\n");
+       filename = path;
+       FileInputStream iStream = new FileInputStream(filename);
+       byte[] b = new byte[1024 * 1024];
+       length = iStream.read(b);
+       if(length < 0) {
+           System.printString("Error! Can not read from input file: " + filename + "\n");
+           System.exit(-1);
+       }
+       content = new String(b, 0, length);
+       //System.printString(content + "\n");
+       iStream.close();
+
+       if(splitNum == 1) {
+           slices = new String[1];
+           slices[0] = content;
+       } else {
+           splits = new int[splitNum - 1];
+           int index = 0;
+           int span = length / splitNum;
+           int temp = 0;
+           for(int i = 0; i < splitNum - 1; ++i) {
+               temp += span;
+               if(temp > index) {
+                   index = temp;
+                   while((content.charAt(index) != seperator) && (index != length - 1)) {
+                       ++index;
+                   }
+               }
+               splits[i] = index;
+           }
+
+           this.slices = new String[splits.length + 1];
+           for(int i = 0; i < this.slices.length; ++i) {
+               this.slices[i] = null;
+           }
+       }
+    }
+
+    public void split() {
+       if(slices.length == 1) {
+           return;
+       }
+       int start = 0;
+       int end = 0;
+       for(int i = 0; i < splits.length; ++i) {
+           end = splits[i];
+           if(end < start) {
+               slices[i] = null;
+           } else {
+               slices[i] = content.subString(start, end);
+           }
+           start = end + 1;
+       }
+       slices[slices.length - 1] = content.subString(start);
+    }
+
+    public String getFilename() {
+       return filename;
+    }
+
+    public String[] getSlices() {
+       return this.slices;
+    }
+}
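Splitter above cuts the input into roughly length/splitNum spans and then advances each cut point to the next separator character, so a word is never split across two slices. A self-contained plain-Java sketch of that search (input text and class name are made up):

    public class SplitDemo {
        public static void main(String[] args) {
            String content = "alpha beta gamma delta epsilon zeta";
            int splitNum = 3;
            char separator = ' ';
            int length = content.length();

            int[] splits = new int[splitNum - 1];
            int index = 0;
            int temp = 0;
            int span = length / splitNum;
            for (int i = 0; i < splitNum - 1; ++i) {
                temp += span;                       // nominal cut at a multiple of the span
                if (temp > index) {
                    index = temp;
                    while (content.charAt(index) != separator && index != length - 1) {
                        ++index;                    // slide right to the next separator
                    }
                }
                splits[i] = index;
            }

            int start = 0;
            for (int s : splits) {
                System.out.println("[" + content.substring(start, s) + "]");
                start = s + 1;
            }
            System.out.println("[" + content.substring(start) + "]");
        }
    }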
index b62e76bde3bae62ce3163b5f8d64697db815dc51..7556a8fc1c1f78f6a2bdddf5a65bf08e5a04db48 100644 (file)
@@ -1,7 +1,7 @@
 task startup(StartupObject s{initialstate}) {
     // read in configuration parameters
     // System.printString("Top of task startup\n");
-    String path = new String("/home/jzhou/mapreduce/conf.txt");
+    String path = new String("/scratch/mapreduce_opt/conf.txt");
     FileInputStream iStream = new FileInputStream(path);
     byte[] b = new byte[1024];
     int length = iStream.read(b);
index 0563556b0fabfa60108d8c13299cdf6424078272..60407fb358c474a2d2de5ed078a11c67fad52a98 100644 (file)
@@ -23,7 +23,7 @@ public class MapWorker {
 
        locations = new String[r];
        for(int i = 0; i < r; ++i) {
-           StringBuffer temp = new StringBuffer("/home/jzhou/mapreduce/output-intermediate-map-");
+           StringBuffer temp = new StringBuffer("/scratch/mapreduce_opt/output-intermediate-map-");
            temp.append(String.valueOf(ID));
            temp.append("-of-");
            temp.append(String.valueOf(r));
index 49bb68fa54088b52daa7a4af5139985cbc5085f0..c547fd839180f97b50c62b0bec0f446fa99b5753 100644 (file)
@@ -42,7 +42,7 @@ public class Master {
        }
 
        this.splitter = splitter;
-       this.outputfile = new String("/home/jzhou/mapreduce/output.dat");
+       this.outputfile = new String("/scratch/mapreduce_opt/output.dat");
 
        this.partial = false;
     }
@@ -97,7 +97,7 @@ public class Master {
     }
 
     public void addInterOutput(String interoutput) {
-       int start = interoutput.indexOf('_');
+       int start = interoutput.lastindexOf('_');
        int end = interoutput.indexOf('.');
        int index = Integer.parseInt(interoutput.subString(start + 1, end));
        //System.printString(interoutput.subString(start + 1, end) + "\n");
@@ -135,7 +135,7 @@ public class Master {
     public void collectROutput(String file) {
        FileInputStream iStream = new FileInputStream(file);
        FileOutputStream oStream = new FileOutputStream(outputfile, true);
-       byte[] b = new byte[1024 * 100];
+       byte[] b = new byte[1024 * 10];
        int length = iStream.read(b);
        if(length < 0) {
            System.printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
index d4787a4eb890c434dd5ed4f945a1b411ad1f30c5..7f2cbdb4a86038eaa264e64dcac470d6e9649321 100644 (file)
@@ -21,7 +21,7 @@ public class ReduceWorker {
        //this.sorts = null;
 
        this.output = new OutputCollector();
-       this.outputfile = "/home/jzhou/mapreduce/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
+       this.outputfile = "/scratch/mapreduce_opt/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
     }
 
     public void sortgroup() {
@@ -37,7 +37,7 @@ public class ReduceWorker {
        }
        for(int i = 0; i < interoutputs.size(); ++i) {
            FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
-           byte[] b = new byte[1024 * 100];
+           byte[] b = new byte[1024 * 10];
            int length = iStream.read(b);
            if(length < 0) {
                System.printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
index f79c8d850d55b6921d1de91d2035eb85b99217b7..d61bf023bbc0c7e00fad3f22d1fe69104eac3945 100644 (file)
@@ -9,7 +9,7 @@ public class Splitter {
        //System.printString("Top of Splitter's constructor\n");
        filename = path;
        FileInputStream iStream = new FileInputStream(filename);
-       byte[] b = new byte[1024 * 10];
+       byte[] b = new byte[1024 * 1024];
        length = iStream.read(b);
        if(length < 0) {
            System.printString("Error! Can not read from input file: " + filename + "\n");