optimize for performance
authorjzhou <jzhou>
Fri, 5 Sep 2008 22:52:03 +0000 (22:52 +0000)
committerjzhou <jzhou>
Fri, 5 Sep 2008 22:52:03 +0000 (22:52 +0000)
22 files changed:
Robust/src/Benchmarks/MapReduce/Java/Configuration.java
Robust/src/Benchmarks/MapReduce/Java/Configured.java
Robust/src/Benchmarks/MapReduce/Java/JobClient.java
Robust/src/Benchmarks/MapReduce/Java/MapReduceBase.java
Robust/src/Benchmarks/MapReduce/Java/MapWorker.java
Robust/src/Benchmarks/MapReduce/Java/Master.java
Robust/src/Benchmarks/MapReduce/Java/OutputCollector.java
Robust/src/Benchmarks/MapReduce/Java/ReduceWorker.java
Robust/src/Benchmarks/MapReduce/Java/Splitter.java
Robust/src/Benchmarks/MapReduce/Java/Tool.java
Robust/src/Benchmarks/MapReduce/Java/ToolRunner.java
Robust/src/Benchmarks/MapReduce/Java/WordCounter.java
Robust/src/Benchmarks/MapReduce/Nor/MapReduce.java
Robust/src/Benchmarks/MapReduce/Nor/MapWorker.java
Robust/src/Benchmarks/MapReduce/Nor/Master.java
Robust/src/Benchmarks/MapReduce/Nor/ReduceWorker.java
Robust/src/Benchmarks/MapReduce/Nor/Splitter.java
Robust/src/Benchmarks/MapReduce/Tag/MapReduce.java
Robust/src/Benchmarks/MapReduce/Tag/MapWorker.java
Robust/src/Benchmarks/MapReduce/Tag/Master.java
Robust/src/Benchmarks/MapReduce/Tag/ReduceWorker.java
Robust/src/Benchmarks/MapReduce/Tag/Splitter.java

index 9a245f51261d55af54257c13d076e5144ace6394..983326795eae2078f56a0d82a8db0d4cda4dd8e6 100644 (file)
@@ -1,5 +1,3 @@
-//package mapreduce;
-
 public class Configuration {
     
     MapReduceBase mapreducer;
index 5cce432b80f754a40691cafd0521a1c18a50f048..8de0f09ca1818995f8df570882ab65ab3306b75a 100644 (file)
@@ -1,5 +1,3 @@
-//package mapreduce;
-
 public class Configured {
     
     Configuration conf;
index cab79af503bc050307dbde54a8f6656a9dd23305..ce92f96bd107f4b661efbb477d894fbfa7d43f58 100644 (file)
@@ -1,5 +1,3 @@
-//package mapreduce;
-
 public class JobClient{
     
     public JobClient() {}
@@ -10,7 +8,7 @@ public class JobClient{
        
        // split input file
        //System.printString("Split\n");
-       master.split();
+       //master.split();
        
        // do 'map'
        //System.printString("Map\n");
@@ -53,7 +51,7 @@ public class JobClient{
        }
        //assert(master.isReduceFinish());
        
-       System./*out.println*/printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");
+       //System./*out.println*/printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");
     }
     
 }
index 21b806e69a51a62bdabebaed83b5d072832ce60d..c73d4a659fa96d8b47f7ec301dc171dd8f4b7cba 100644 (file)
@@ -1,13 +1,9 @@
-//package mapreduce;
-
-//import java.util.Vector;
-
-public /*abstract*/ class MapReduceBase {
+public class MapReduceBase {
     
     public MapReduceBase() {}
     
-    public /*abstract*/ void map(String key, String value, OutputCollector output);
+    public void map(String key, String value, OutputCollector output);
     
-    public /*abstract*/ void reduce(String key, Vector values, OutputCollector output);
+    public void reduce(String key, Vector values, OutputCollector output);
 }
 
index f5b591179157034cfd5363b96c7fc1099cb7c2b3..7c56eae827d741fcee226456c30d7a386077c985 100644 (file)
@@ -1,19 +1,13 @@
-//package mapreduce;
-
-//import java.io.FileOutputStream;
-
 public class MapWorker {
 
     int ID;
     MapReduceBase mapreducer;
-
     int r;
     String key;
     String value;
     OutputCollector output;
-
-    String[] locations;
-    FileOutputStream[] outputs;
+    String locationPrefix;
+    boolean[] outputsexit;
 
     public MapWorker(String key, String value, int r, int id) {
        this.ID = id;
@@ -23,23 +17,9 @@ public class MapWorker {
        this.key = key;
        this.value = value;
        this.output = new OutputCollector();
-
-       locations = new String[r];
-       for(int i = 0; i < r; ++i) {
-           StringBuffer temp = new StringBuffer("/scratch/mapreduce_java/output-intermediate-map-");
-           temp.append(String.valueOf(ID));
-           temp.append("-of-");
-           temp.append(String.valueOf(r));
-           temp.append("_");
-           temp.append(String.valueOf(i));
-           temp.append(".dat");
-           locations[i] = new String(temp);
-       }
-
-       outputs = new FileOutputStream[r];
-       for(int i = 0; i < r; ++i) {
-           outputs[i] = null;
-       }
+       this.locationPrefix = "/scratch/mapreduce_java/output-intermediate-map-";
+       
+       this.outputsexit = new boolean[r];
     }
 
     public MapReduceBase getMapreducer() {
@@ -51,58 +31,63 @@ public class MapWorker {
     }
 
     public void map() {
-       /*if(ID % 2 == 1) {
-           String temp = locations[locations.length];
-       }*/
-       
        this.mapreducer.map(key, value, output);
+       this.key = null;
+       this.value = null;
     }
 
     public void partition() {
-       /*if(ID % 2 == 1) {
-           String temp = locations[locations.length];
-       }*/
+       FileOutputStream[] outputs = new FileOutputStream[r];
+       for(int i = 0; i < r; ++i) {
+           outputs[i] = null;
+       }
        
-       //try{
-           int size = this.output.size();
-           for(int i = 0; i < size; ++i) {
-               String key = this.output.getKey(i);
-               String value = this.output.getValue(i);
-               // use the hashcode of key to decide which intermediate output
-               // this pair should be in
-               //int hash = key.hashCode();
-               int index = (int)Math.abs(key.hashCode()) % this.r;
-               FileOutputStream oStream = outputs[index];
-               if(oStream == null) {
-                   // open the file
-                   String filepath = locations[index];
-                   oStream = new FileOutputStream(filepath, true); // append
-                   outputs[index] = oStream;
-               }
-               // format: key value\n
-               oStream.write(key.getBytes());
-               oStream.write(' ');
-               oStream.write(value.getBytes());
-               oStream.write('\n');
-               oStream.flush();
+       int size = this.output.size();
+       for(int i = 0; i < size; ++i) {
+           String key = this.output.getKey(i);
+           String value = this.output.getValue(i);
+           // use the hashcode of key to decide which intermediate output
+           // this pair should be in
+           //int hash = key.hashCode();
+           int index = (int)Math.abs(key.hashCode()) % this.r;
+           FileOutputStream oStream = outputs[index];
+           if(oStream == null) {
+               // open the file
+               String filepath = this.locationPrefix + this.ID + "-of-" + this.r + "_" + index + ".dat";
+               oStream = new FileOutputStream(filepath, true); // append
+               outputs[index] = oStream;
+               this.outputsexit[index] = true;
            }
+           // format: key value\n
+           oStream.write(key.getBytes());
+           oStream.write(' ');
+           oStream.write(value.getBytes());
+           oStream.write('\n');
+           oStream.flush();
+       }
 
-           // close the output files
-           for(int i = 0; i < this.outputs.length; ++i) {
-               FileOutputStream temp = this.outputs[i];
-               if(temp != null) {
-                   temp.close();
-               }
+       // close the output files
+       for(int i = 0; i < outputs.length; ++i) {
+           FileOutputStream temp = outputs[i];
+           if(temp != null) {
+               temp.close();
+               outputs[i] = null;
            }
-       /*} catch(Exception e) {
-           e.printStackTrace();
-           System.exit(-1);
-       }*/
+       }
+       
+       this.output = null;
     }
 
     public String outputFile(int i) {
-       if(outputs[i] != null) {
-           return locations[i];
+       if(outputsexit[i]) {
+           StringBuffer temp = new StringBuffer(this.locationPrefix);
+           temp.append(String.valueOf(ID));
+           temp.append("-of-");
+           temp.append(String.valueOf(r));
+           temp.append("_");
+           temp.append(String.valueOf(i));
+           temp.append(".dat");
+           return new String(temp);
        } else {
            return null;
        }
index 39ac5cc401dbe6e5bba47782d3b25c6c35f9fe51..f1f197d923a9893860761c3f7221376bf91627de 100644 (file)
@@ -1,47 +1,25 @@
-//package mapreduce;
-
-/*import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.util.Vector;*/
-
 public class Master {
 
     int m;
     int r;
     int[] mworkerStates; // array of map worker's state
-    // 0: idle  1: process  2: finished 3: fail
+                         // 0: idle  1: process  2: finished 3: fail
     int[] rworkerStates; // array of reduce worker's state
     Vector[] interoutputs; // array of string vector containing
-    // paths of intermediate outputs from
-    // map worker
-
+                           // paths of intermediate outputs from
+                           // map worker
     Splitter splitter;
-
     String outputfile;  // path of final output file
-
     boolean partial;
 
     public Master(int m, int r, Splitter splitter) {
        this.m = m;
        this.r = r;
-
-       mworkerStates = new int[m];
-       rworkerStates = new int[r];
-       for(int i = 0; i < m; ++i) {
-           mworkerStates[i] = 0;
-       }
-       for(int i = 0; i < r; ++i) {
-           rworkerStates[i] = 0;
-       }
-
-       interoutputs = new Vector[r];
-       for(int i = 0; i < r; ++i) {
-           interoutputs[i] = null;
-       }
-
+       this.mworkerStates = new int[m];
+       this.rworkerStates = new int[r];
+       this.interoutputs = new Vector[r];
        this.splitter = splitter;
        this.outputfile = new String("/scratch/mapreduce_java/output.dat");
-
        this.partial = false;
     }
 
@@ -61,12 +39,12 @@ public class Master {
        this.partial = partial || this.partial;
     }
 
-    public void split() {
+    /*public void split() {
        splitter.split();
-    }
+    }*/
 
     public MapWorker[] assignMap() {
-       String[] contentsplits = splitter.getSlices();
+       String[] contentsplits = splitter.split();//splitter.getSlices();
        MapWorker[] mworkers = new MapWorker[contentsplits.length];
        for(int i = 0; i < contentsplits.length; ++i) {
            //System.printString("*************************\n");
@@ -76,6 +54,7 @@ public class Master {
            mworkerStates[i] = 1;
            mworkers[i] = mworker;
        }
+       this.splitter = null;
        return mworkers;
     }
 
@@ -114,7 +93,9 @@ public class Master {
            ReduceWorker rworker = new ReduceWorker(interoutputs[i], i);
            rworkerStates[i] = 1;
            rworkers[i] = rworker;
+           this.interoutputs[i] = null;
        }
+       this.interoutputs.clear();
        return rworkers;
     }
 
@@ -137,22 +118,17 @@ public class Master {
     }
 
     public void collectROutput(String file) {
-       //try{
-           FileInputStream iStream = new FileInputStream(file);
-           FileOutputStream oStream = new FileOutputStream(outputfile, true);
-           byte[] b = new byte[1024 * 10];
-           int length = iStream.read(b);
-           if(length < 0) {
-               System./*out.println*/printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
-               System.exit(-1);
-           }
-           //System.printString(new String(b, 0, length) + "\n");
-           oStream.write(b, 0, length);
-           iStream.close();
-           oStream.close();
-       /*} catch(Exception e) {
-           e.printStackTrace();
+       FileInputStream iStream = new FileInputStream(file);
+       FileOutputStream oStream = new FileOutputStream(outputfile, true);
+       byte[] b = new byte[1024 * 10];
+       int length = iStream.read(b);
+       if(length < 0) {
+           System.printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
            System.exit(-1);
-       }*/
+       }
+       //System.printString(new String(b, 0, length) + "\n");
+       oStream.write(b, 0, length);
+       iStream.close();
+       oStream.close();
     }
 }
index e66a6758863c7d399ec975d5ca18f1f806a7a0cf..3c6d852c6e733d1962bde838f06b18733e0566ad 100644 (file)
@@ -1,7 +1,3 @@
-//package mapreduce;
-
-//import java.util.Vector;
-
 public class OutputCollector {
 
     Vector keys;
index 4559882b6878f9dbcf8613428fb56eba8de1fe4f..3df1f0b93e176d2d85331a72aa93dfab674e237f 100644 (file)
@@ -1,13 +1,3 @@
-//package mapreduce;
-
-/*import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Vector;*/
-
-import mapreduce.MapReduceBase;
-
 public class ReduceWorker {
 
     int ID;
@@ -24,13 +14,9 @@ public class ReduceWorker {
     public ReduceWorker(Vector interoutputs, int id) {
        this.ID = id;
        this.mapreducer = null;
-       
        this.interoutputs = interoutputs;
-
        this.keys = new Vector();
        this.values = new HashMap();
-       //this.sorts = null;
-
        this.output = new OutputCollector();
        this.outputfile = "/scratch/mapreduce_java/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
     }
@@ -44,117 +30,97 @@ public class ReduceWorker {
     }
 
     public void sortgroup() {
-       /*if(ID % 2 == 1) {
-           int a[] = new int[1];
-           int temp = a[1];
-       }*/
-       
        // group values associated to the same key
        //System.printString("================================\n");
        if(interoutputs == null) {
            return;
        }
-       //try{
-           for(int i = 0; i < interoutputs.size(); ++i) {
-               FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
-               byte[] b = new byte[1024 * 10];
-               int length = iStream.read(b);
-               if(length < 0) {
-                   System./*out.println*/printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
-                   System.exit(-1);
-               }
-               String content = new String(b, 0, length);
-               //System.printString(content + "\n");
-               int index = content.indexOf('\n');
-               while(index != -1) {
-                   String line = content.substring(0, index);
-                   content = content.substring(index + 1);
-                   //System.printString(line + "\n");
-                   int tmpindex = line.indexOf(' ');
-                   String key = line.substring(0, tmpindex);
-                   String value = line.substring(tmpindex + 1);
-                   //System.printString(key + "; " + value + "\n");
-                   if(!this.values.containsKey(key)) {
-                       this.values.put(key, new Vector());
-                       this.keys.addElement(key);
-                   }
-                   ((Vector)this.values.get(key)).addElement(value);
-                   index = content.indexOf('\n');
-               }
-               iStream.close();
+       for(int i = 0; i < interoutputs.size(); ++i) {
+           FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
+           byte[] b = new byte[1024 * 10];
+           int length = iStream.read(b);
+           if(length < 0) {
+               System.printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
+               System.exit(-1);
            }
-           //System.printString("================================\n");
-
-           /*for(int i = 0; i < this.keys.size(); ++i) {
-                       System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
+           String content = new String(b, 0, length);
+           //System.printString(content + "\n");
+           int index = content.indexOf('\n');
+           while(index != -1) {
+               String line = content.subString(0, index);
+               content = content.subString(index + 1);
+               //System.printString(line + "\n");
+               int tmpindex = line.indexOf(' ');
+               String key = line.subString(0, tmpindex);
+               String value = line.subString(tmpindex + 1);
+               //System.printString(key + "; " + value + "\n");
+               if(!this.values.containsKey(key)) {
+                   this.values.put(key, new Vector());
+                   this.keys.addElement(key);
                }
-               System.printString("\n");*/
+               ((Vector)this.values.get(key)).addElement(value);
+               index = content.indexOf('\n');
+           }
+           iStream.close();
+       }
+       //System.printString("================================\n");
 
-           // sort all the keys inside interoutputs
-           this.sorts = new int[this.keys.size()];
-           // insert sorting
-           this.sorts[0] = 0;
-           int tosort = 1;
-           for(; tosort < this.keys.size(); ++tosort) {
-               int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode();
-               int index = tosort;
-               for(int i = tosort; i > 0; --i) {
-                   if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) {
-                       this.sorts[i] = this.sorts[i-1];
-                       index = i - 1;
-                   } else {
-                       //System.printString(i + "; " + tosort + "\n");
-                       index = i;
-                       i = 0;
-                   }
+       /*for(int i = 0; i < this.keys.size(); ++i) {
+           System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
+       }
+       System.printString("\n");*/
+
+       // sort all the keys inside interoutputs
+       this.sorts = new int[this.keys.size()];
+       // insert sorting
+       this.sorts[0] = 0;
+       int tosort = 1;
+       for(; tosort < this.keys.size(); ++tosort) {
+           int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode();
+           int index = tosort;
+           for(int i = tosort; i > 0; --i) {
+               if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) {
+                   this.sorts[i] = this.sorts[i-1];
+                   index = i - 1;
+               } else {
+                   //System.printString(i + "; " + tosort + "\n");
+                   index = i;
+                   i = 0;
                }
-               this.sorts[index] = tosort;
            }
-           /*for(int i = 0; i < this.sorts.length; ++i) {
-                       System.printString(this.sorts[i] + "; ");
-               }
-               System.printString("\n");*/
-       /*} catch(IOException e) {
-           e.printStackTrace();
-           System.exit(-1);
-       }*/
+           this.sorts[index] = tosort;
+       }
+       /*for(int i = 0; i < this.sorts.length; ++i) {
+           System.printString(this.sorts[i] + "; ");
+       }
+       System.printString("\n");*/
     }
 
     public void reduce() {
-       /*if(ID % 2 == 1) {
-           int a[] = new int[1];
-           int temp = a[1];
-       }*/
-       
        if(this.interoutputs != null) {
-           //return;
-       //}
-       for(int i = 0; i < this.sorts.length; ++i) {
-           String key = (String)this.keys.elementAt(this.sorts[i]);
-           Vector values = (Vector)this.values.get(key);
-           this.mapreducer.reduce(key, values, output);
-       }
+           for(int i = 0; i < this.sorts.length; ++i) {
+               String key = (String)this.keys.elementAt(this.sorts[i]);
+               Vector values = (Vector)this.values.get(key);
+               this.mapreducer.reduce(key, values, output);
+           }
        }
 
-       //try{
-           // output all the result into some local file
-           int size = this.output.size();
-           FileOutputStream oStream = new FileOutputStream(outputfile, true); // append
-           for(int i = 0; i < size; ++i) {
-               String key = this.output.getKey(i);
-               String value = this.output.getValue(i);
-               // format: key value\n
-               oStream.write(key.getBytes());
-               oStream.write(' ');
-               oStream.write(value.getBytes());
-               oStream.write('\n');
-               oStream.flush();
-           }
-           oStream.close();
-       /*} catch(Exception e) {
-           e.printStackTrace();
-           System.exit(-1);
-       }*/
+       // output all the result into some local file
+       int size = this.output.size();
+       FileOutputStream oStream = new FileOutputStream(outputfile, true); // append
+       for(int i = 0; i < size; ++i) {
+           String key = this.output.getKey(i);
+           String value = this.output.getValue(i);
+           // format: key value\n
+           oStream.write(key.getBytes());
+           oStream.write(' ');
+           oStream.write(value.getBytes());
+           oStream.write('\n');
+           oStream.flush();
+       }
+       oStream.close();
+       this.keys = null;
+       this.values = null;
     }
 
     public String getOutputFile() {
index 53cae4dfef5d2d283c72bc6d3500ec58a3a12eca..07e69e7f2724ea72dd61cb4a3a2e043bf33247a0 100644 (file)
@@ -1,83 +1,72 @@
-//package mapreduce;
-
-//import java.io.FileInputStream;
-//import java.io.IOException;
-
 public class Splitter {
     String filename;
     String content;
     int length;
-    int[] splits;
-    String[] slices;
+    int splitNum;
+    char seperator;
 
     public Splitter(String path, int splitNum, char seperator) {
-       //try{
-           //System.printString("Top of Splitter's constructor\n");
-           filename = path;
-           FileInputStream iStream = new FileInputStream(filename);
-           byte[] b = new byte[1024 * 1024];
-           length = iStream.read(b);
-           if(length < 0) {
-               System./*out.println*/printString("Error! Can not read from input file: " + filename + "\n");
-               System.exit(-1);
-           }
-           content = new String(b, 0, length);
-           //System.printString(content + "\n");
-           iStream.close();
+       //System.printString("Top of Splitter's constructor\n");
+       filename = path;
+       this.length = -1;
+       this.splitNum = splitNum;
+       this.seperator = seperator;
+    }
 
-           if(splitNum == 1) {
-               slices = new String[1];
-               slices[0] = content;
-           } else {
-               splits = new int[splitNum - 1];
-               int index = 0;
-               int span = length / splitNum;
-               int temp = 0;
-               for(int i = 0; i < splitNum - 1; ++i) {
-                   temp += span;
-                   if(temp > index) {
-                       index = temp;
-                       while((content.charAt(index) != seperator) && (index != length - 1)) {
-                           ++index;
-                       }
-                   }
-                   splits[i] = index;
-               }
+    public String[] split() {
+       int[] splits;
+       String[] slices;
+       
+       FileInputStream iStream = new FileInputStream(filename);
+       byte[] b = new byte[1024 * 1024];
+       length = iStream.read(b);
+       if(length < 0) {
+           System.printString("Error! Can not read from input file: " + filename + "\n");
+           System.exit(-1);
+       }
+       content = new String(b, 0, length);
+       //System.printString(content + "\n");
+       iStream.close();
 
-               this.slices = new String[splits.length + 1];
-               for(int i = 0; i < this.slices.length; ++i) {
-                   this.slices[i] = null;
+       if(splitNum == 1) {
+           slices = new String[1];
+           slices[0] = content;
+           this.content = null;
+       } else {
+           splits = new int[splitNum - 1];
+           int index = 0;
+           int span = length / splitNum;
+           int temp = 0;
+           for(int i = 0; i < splitNum - 1; ++i) {
+               temp += span;
+               if(temp > index) {
+                   index = temp;
+                   while((content.charAt(index) != seperator) && (index != length - 1)) {
+                       ++index;
+                   }
                }
+               splits[i] = index;
            }
-       /*} catch(IOException e) {
-           e.printStackTrace();
-           System.exit(-1);
-       }*/
-    }
 
-    public void split() {
-       if(slices.length == 1) {
-           return;
-       }
-       int start = 0;
-       int end = 0;
-       for(int i = 0; i < splits.length; ++i) {
-           end = splits[i];
-           if(end < start) {
-               slices[i] = null;
-           } else {
-               slices[i] = content.substring(start, end);
+           slices = new String[splitNum];
+           int start = 0;
+           int end = 0;
+           for(int i = 0; i < splits.length; ++i) {
+               end = splits[i];
+               if(end < start) {
+                   slices[i] = null;
+               } else {
+                   slices[i] = content.subString(start, end);
+               }
+               start = end + 1;
            }
-           start = end + 1;
+           slices[slices.length - 1] = content.subString(start);
+           this.content = null;
        }
-       slices[slices.length - 1] = content.substring(start);
+       return slices;
     }
 
     public String getFilename() {
        return filename;
     }
-
-    public String[] getSlices() {
-       return this.slices;
-    }
 }
index 5dbeb6e7f787cdbd1322e009d51cab62d4093100..ab9dd83864ff171d43d2c3c79fc4ea18d39ee147 100644 (file)
@@ -1,5 +1,3 @@
-//package mapreduce;
-
-public /*interface*/ class Tool {
+public class Tool {
     public int run(String[] args);
 }
index 82a8c5a4ff2674df1d210620a2a8181303dfc1a2..b91290c4b5e13779e3c79b3f3ddbdc78cad29fad 100644 (file)
@@ -1,5 +1,3 @@
-//package mapreduce;
-
 public class ToolRunner {
     
     public static int run(Tool tool, String[] args) {
index c67a291b4d45107e1c9a55196ecc4b494009df16..16061424abe74fbc6bcde67b641dd19b72226a15 100644 (file)
@@ -1,15 +1,3 @@
-
-//import java.io.FileInputStream;
-//import java.util.Vector;
-
-/*import mapreduce.Configuration;
-import mapreduce.Configured;
-import mapreduce.JobClient;
-import mapreduce.MapReduceBase;
-import mapreduce.OutputCollector;
-import mapreduce.Tool;
-import mapreduce.ToolRunner;*/
-
 /**
      * Counts the words in each line.
      * For each line of input, break the line into words and emit them as
@@ -71,12 +59,12 @@ import mapreduce.ToolRunner;*/
        }
     }
 
-public class WordCounter /*implements*/extends Tool {
+public class WordCounter extends Tool {
 
-       public WordCounter() {}
+    public WordCounter() {}
 
     static int printUsage() {
-       System./*out.println*/printString("<conffile>\n");
+       System.printString("<conffile>\n");
        return -1;
     }
 
@@ -87,50 +75,45 @@ public class WordCounter /*implements*/extends Tool {
      *                     job tracker.
      */
     public int run(String[] args) {
-       //try {
-           MapReduceClass mapreducer = new MapReduceClass();       
-
-           FileInputStream iStream = new FileInputStream(args[0]);
-           byte[] b = new byte[1024];
-           int length = iStream.read(b);
-           if(length < 0 ) {
-               System./*out.println*/printString("Error! Can not read from configure file: " + args[0] + "\n");
-               System.exit(-1);
-           }
-           String content = new String(b, 0, length);
-           //System.out.println(content);
-           int index = content.indexOf('\n');
-           String inputfile = content.substring(0, index);
-           content = content.substring(index + 1);
-           index = content.indexOf('\n');
-           int m = Integer.parseInt(content.substring(0, index));
-           content = content.substring(index + 1);
-           index = content.indexOf('\n');
-           int r = Integer.parseInt(content.substring(0, index));
-           content = content.substring(index + 1);
-           index = content.indexOf('\n');
-           String temp = content.substring(0, index);
-           char seperator = temp.charAt(0);
-           //System.out.println(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r));
-           
-           Configuration conf = new Configuration();
-           conf.setMapReduceClass(mapreducer);
-           conf.setInputfile(inputfile);
-           conf.setM(m);
-           conf.setR(r);
-           conf.setSeperator(seperator);
-
-           JobClient.runJob(conf);
-       /*} catch (Exception e) {
-           e.printStackTrace();
+       MapReduceClass mapreducer = new MapReduceClass();       
+
+       FileInputStream iStream = new FileInputStream(args[0]);
+       byte[] b = new byte[1024];
+       int length = iStream.read(b);
+       if(length < 0 ) {
+           System.printString("Error! Can not read from configure file: " + args[0] + "\n");
            System.exit(-1);
-       }*/
+       }
+       String content = new String(b, 0, length);
+       //System.out.println(content);
+       int index = content.indexOf('\n');
+       String inputfile = content.substring(0, index);
+       content = content.substring(index + 1);
+       index = content.indexOf('\n');
+       int m = Integer.parseInt(content.substring(0, index));
+       content = content.substring(index + 1);
+       index = content.indexOf('\n');
+       int r = Integer.parseInt(content.substring(0, index));
+       content = content.substring(index + 1);
+       index = content.indexOf('\n');
+       String temp = content.substring(0, index);
+       char seperator = temp.charAt(0);
+       //System.out.println(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r));
+
+       Configuration conf = new Configuration();
+       conf.setMapReduceClass(mapreducer);
+       conf.setInputfile(inputfile);
+       conf.setM(m);
+       conf.setR(r);
+       conf.setSeperator(seperator);
+
+       JobClient.runJob(conf);
        
        return 0;
     }
 
 
-    public static void main(String[] args) /*throws Exception*/ {
+    public static void main(String[] args) {
        int res = ToolRunner.run(new WordCounter(), args);
        System.exit(res);
     }
index 1a81bb61ebdf98e2ed9f688fd7dae06d9edf2730..8c1268003d172366a3eec34d266a2c4d11b6c59e 100644 (file)
@@ -1,6 +1,6 @@
 task startup(StartupObject s{initialstate}) {
     // read in configuration parameters
-    // System.printString("Top of task startup\n");
+    //System.printString("Top of task startup\n");
     String path = new String("/scratch/mapreduce_nor/conf.txt");
     FileInputStream iStream = new FileInputStream(path);
     byte[] b = new byte[1024];
@@ -25,13 +25,15 @@ task startup(StartupObject s{initialstate}) {
     char seperator = temp.charAt(0);
     //System.printString(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r) + "\n");
     Splitter splitter = new Splitter(inputfile, m, seperator);
-    Master master = new Master(m, r, splitter){split};
+    Master master = new Master(m, r, splitter){mapoutput};//{split};
+    
+    master.assignMap();
 
     taskexit(s{!initialstate});
 }
 
 //Split the input file into M pieces
-task split(Master master{split}) {
+/*task split(Master master{split}) {
     //System.printString("Top of task split\n");
     master.split();
 
@@ -44,23 +46,24 @@ task assignMap(Master master{assignMap}) {
     master.assignMap();
 
     taskexit(master{!assignMap, mapoutput});
-}
+}*/
 
 //MapWorker do 'map' function on a input file piece
 task map(MapWorker mworker{map}) {
     //System.printString("Top of task map\n");
     mworker.map();
 
-    taskexit(mworker{!map, partition});
+    /*taskexit(mworker{!map, partition});
 }
 
 //Partition the intermediate key/value pair generated
 //into R intermediate local files
 task partition(MapWorker mworker{partition}) {
-    //System.printString("Top of task partition\n");
+    //System.printString("Top of task partition\n");*/
     mworker.partition();
 
-    taskexit(mworker{!partition, mapoutput});
+    //taskexit(mworker{!partition, mapoutput});
+    taskexit(mworker{!map, mapoutput});
 }
 
 //Register the intermediate ouput from map worker to master
@@ -101,15 +104,16 @@ task sortgroup(ReduceWorker rworker{sortgroup}) {
     //System.printString("Top of task sortgroup\n");
     rworker.sortgroup();
 
-    taskexit(rworker{!sortgroup, reduce});
+    /*taskexit(rworker{!sortgroup, reduce});
 }
 
 //Do 'reduce' function
 task reduce(ReduceWorker rworker{reduce}) {
-    //System.printString("Top of task reduce\n");
+    //System.printString("Top of task reduce\n");*/
     rworker.reduce();
 
-    taskexit(rworker{!reduce, reduceoutput});
+    //taskexit(rworker{!reduce, reduceoutput});
+    taskexit(rworker{!sortgroup, reduceoutput});
 }
 
 //Collect the output into master
@@ -132,9 +136,9 @@ task reduceOutput(Master master{reduceoutput}, /*optional*/ ReduceWorker rworker
 
 task output(Master master{output}) {
     //System.printString("Top of task output\n");
-    if(master.isPartial()) {
+    /*if(master.isPartial()) {
        System.printString("Partial! The result may not be right due to some failure!\n");
     }
-    System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");
+    System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");*/
     taskexit(master{!output});
 }
index 0ab23fa7703c9650eb744b532a3520598eca061a..8c9fa3c14d39810e465d4e05af20552dacd48d55 100644 (file)
@@ -4,14 +4,12 @@ public class MapWorker {
     flag mapoutput;
 
     int ID;
-
     int r;
     String key;
     String value;
     OutputCollector output;
-
-    String[] locations;
-    FileOutputStream[] outputs;
+    String locationPrefix;
+    boolean[] outputsexit;
 
     public MapWorker(String key, String value, int r, int id) {
        this.ID = id;
@@ -20,38 +18,23 @@ public class MapWorker {
        this.key = key;
        this.value = value;
        this.output = new OutputCollector();
-
-       locations = new String[r];
-       for(int i = 0; i < r; ++i) {
-           StringBuffer temp = new StringBuffer("/scratch/mapreduce_nor/output-intermediate-map-");
-           temp.append(String.valueOf(ID));
-           temp.append("-of-");
-           temp.append(String.valueOf(r));
-           temp.append("_");
-           temp.append(String.valueOf(i));
-           temp.append(".dat");
-           locations[i] = new String(temp);
-       }
-
-       outputs = new FileOutputStream[r];
-       for(int i = 0; i < r; ++i) {
-           outputs[i] = null;
-       }
+       this.locationPrefix = "/scratch/mapreduce_nor/output-intermediate-map-";
+       
+       this.outputsexit = new boolean[r];
     }
 
     public void map() {
-       /*if(ID % 2 == 1) {
-               String temp = locations[locations.length];
-       }*/
-
        MapReduceBase.map(key, value, output);
+       this.key = null;
+       this.value = null;
     }
 
     public void partition() {
-       /*if(ID % 2 == 1) {
-               String temp = locations[locations.length];
-       }*/
-
+       FileOutputStream[] outputs = new FileOutputStream[r];
+       for(int i = 0; i < r; ++i) {
+           outputs[i] = null;
+       }
+       
        int size = this.output.size();
        for(int i = 0; i < size; ++i) {
            String key = this.output.getKey(i);
@@ -62,9 +45,11 @@ public class MapWorker {
            FileOutputStream oStream = outputs[index];
            if(oStream == null) {
                // open the file
-               String filepath = locations[index];
+               String filepath = this.locationPrefix + this.ID + "-of-" + this.r + "_" + index + ".dat";
+               //System.printString("partition: " + filepath + "\n");
                oStream = new FileOutputStream(filepath, true); // append
                outputs[index] = oStream;
+               this.outputsexit[index] = true;
            }
            // format: key value\n
            oStream.write(key.getBytes());
@@ -75,17 +60,27 @@ public class MapWorker {
        }
 
        // close the output files
-       for(int i = 0; i < this.outputs.length; ++i) {
-           FileOutputStream temp = this.outputs[i];
+       for(int i = 0; i < outputs.length; ++i) {
+           FileOutputStream temp = outputs[i];
            if(temp != null) {
                temp.close();
+               outputs[i] = null;
            }
        }
+       
+       this.output = null;
     }
 
     public String outputFile(int i) {
-       if(outputs[i] != null) {
-           return locations[i];
+       if(outputsexit[i]) {
+           StringBuffer temp = new StringBuffer(this.locationPrefix);
+           temp.append(String.valueOf(ID));
+           temp.append("-of-");
+           temp.append(String.valueOf(r));
+           temp.append("_");
+           temp.append(String.valueOf(i));
+           temp.append(".dat");
+           return new String(temp);
        } else {
            return null;
        }
index 4e21b9c394536a7e39c00f6067b89ace32f4ef5e..65738868cc4ae5544f6c2ffdd7aeab108eae02a5 100644 (file)
@@ -11,39 +11,23 @@ public class Master {
     int m;
     int r;
     int[] mworkerStates; // array of map worker's state
-    // 0: idle  1: process  2: finished 3: fail
+                         // 0: idle  1: process  2: finished 3: fail
     int[] rworkerStates; // array of reduce worker's state
     Vector[] interoutputs; // array of string vector containing
-    // paths of intermediate outputs from
-    // map worker
-
+                           // paths of intermediate outputs from
+                           // map worker
     Splitter splitter;
-
     String outputfile;  // path of final output file
-
     boolean partial;
 
     public Master(int m, int r, Splitter splitter) {
        this.m = m;
        this.r = r;
-
-       mworkerStates = new int[m];
-       rworkerStates = new int[r];
-       for(int i = 0; i < m; ++i) {
-           mworkerStates[i] = 0;
-       }
-       for(int i = 0; i < r; ++i) {
-           rworkerStates[i] = 0;
-       }
-
-       interoutputs = new Vector[r];
-       for(int i = 0; i < r; ++i) {
-           interoutputs[i] = null;
-       }
-
+       this.mworkerStates = new int[m];
+       this.rworkerStates = new int[r];
+       this.interoutputs = new Vector[r];
        this.splitter = splitter;
        this.outputfile = new String("/scratch/mapreduce_nor/output.dat");
-
        this.partial = false;
     }
 
@@ -63,12 +47,12 @@ public class Master {
        this.partial = partial || this.partial;
     }
 
-    public void split() {
+    /*public void split() {
        splitter.split();
-    }
+    }*/
 
     public void assignMap() {
-       String[] contentsplits = splitter.getSlices();
+       String[] contentsplits = splitter.split();//splitter.getSlices();
        for(int i = 0; i < contentsplits.length; ++i) {
            //System.printString("*************************\n");
            //System.printString(contentsplits[i] + "\n");
@@ -76,6 +60,8 @@ public class Master {
            MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i){map};
            mworkerStates[i] = 1;
        }
+       
+       this.splitter = null;
     }
 
     public void setMapFinish(int i) {
@@ -111,7 +97,9 @@ public class Master {
        for(int i = 0; i < interoutputs.length; ++i) {
            ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup};
            rworkerStates[i] = 1;
+           this.interoutputs[i] = null;
        }
+       this.interoutputs.clear();
     }
 
     public void setReduceFinish(int i) {
index a0e39ffa68544d2ada63996edd288a004c90df25..4eb2c69b3e11a5a4093bfcae1dd7fcae761b64c0 100644 (file)
@@ -5,7 +5,7 @@ public class ReduceWorker {
 
     int ID;
     Vector interoutputs;  // string vector containing paths
-    // of intermediate outputs from map worker
+                          // of intermediate outputs from map worker
     Vector keys;
     HashMap values; // hashmap map key to vector of string vector
     int[] sorts; // array record the sort of keys
@@ -15,21 +15,13 @@ public class ReduceWorker {
     public ReduceWorker(Vector interoutputs, int id) {
        this.ID = id;
        this.interoutputs = interoutputs;
-
        this.keys = new Vector();
        this.values = new HashMap();
-       //this.sorts = null;
-
        this.output = new OutputCollector();
        this.outputfile = "/scratch/mapreduce_nor/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
     }
 
     public void sortgroup() {
-       /*if(ID % 2 == 1) {
-               int a[] = new int[1];
-               int temp = a[1];
-       }*/
-
        // group values associated to the same key
        //System.printString("================================\n");
        if(interoutputs == null) {
@@ -66,9 +58,9 @@ public class ReduceWorker {
        //System.printString("================================\n");
 
        /*for(int i = 0; i < this.keys.size(); ++i) {
-                       System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
-               }
-               System.printString("\n");*/
+           System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
+       }
+       System.printString("\n");*/
 
        // sort all the keys inside interoutputs
        this.sorts = new int[this.keys.size()];
@@ -91,25 +83,18 @@ public class ReduceWorker {
            this.sorts[index] = tosort;
        }
        /*for(int i = 0; i < this.sorts.length; ++i) {
-                       System.printString(this.sorts[i] + "; ");
-               }
-               System.printString("\n");*/
+           System.printString(this.sorts[i] + "; ");
+       }
+       System.printString("\n");*/
     }
 
     public void reduce() {
-       /*if(ID % 2 == 1) {
-               int a[] = new int[1];
-               int temp = a[1];
-       }*/
-
        if(this.interoutputs != null) {
-          // return;
-       //}
-       for(int i = 0; i < this.sorts.length; ++i) {
-           String key = (String)this.keys.elementAt(this.sorts[i]);
-           Vector values = (Vector)this.values.get(key);
-           MapReduceBase.reduce(key, values, output);
-       }
+           for(int i = 0; i < this.sorts.length; ++i) {
+               String key = (String)this.keys.elementAt(this.sorts[i]);
+               Vector values = (Vector)this.values.get(key);
+               MapReduceBase.reduce(key, values, output);
+           }
        }
 
        // output all the result into some local file
@@ -126,6 +111,8 @@ public class ReduceWorker {
            oStream.flush();
        }
        oStream.close();
+       this.keys = null;
+       this.values = null;
     }
 
     public String getOutputFile() {
index d61bf023bbc0c7e00fad3f22d1fe69104eac3945..07e69e7f2724ea72dd61cb4a3a2e043bf33247a0 100644 (file)
@@ -2,12 +2,21 @@ public class Splitter {
     String filename;
     String content;
     int length;
-    int[] splits;
-    String[] slices;
+    int splitNum;
+    char seperator;
 
     public Splitter(String path, int splitNum, char seperator) {
        //System.printString("Top of Splitter's constructor\n");
        filename = path;
+       this.length = -1;
+       this.splitNum = splitNum;
+       this.seperator = seperator;
+    }
+
+    public String[] split() {
+       int[] splits;
+       String[] slices;
+       
        FileInputStream iStream = new FileInputStream(filename);
        byte[] b = new byte[1024 * 1024];
        length = iStream.read(b);
@@ -22,6 +31,7 @@ public class Splitter {
        if(splitNum == 1) {
            slices = new String[1];
            slices[0] = content;
+           this.content = null;
        } else {
            splits = new int[splitNum - 1];
            int index = 0;
@@ -38,36 +48,25 @@ public class Splitter {
                splits[i] = index;
            }
 
-           this.slices = new String[splits.length + 1];
-           for(int i = 0; i < this.slices.length; ++i) {
-               this.slices[i] = null;
-           }
-       }
-    }
-
-    public void split() {
-       if(slices.length == 1) {
-           return;
-       }
-       int start = 0;
-       int end = 0;
-       for(int i = 0; i < splits.length; ++i) {
-           end = splits[i];
-           if(end < start) {
-               slices[i] = null;
-           } else {
-               slices[i] = content.subString(start, end);
+           slices = new String[splitNum];
+           int start = 0;
+           int end = 0;
+           for(int i = 0; i < splits.length; ++i) {
+               end = splits[i];
+               if(end < start) {
+                   slices[i] = null;
+               } else {
+                   slices[i] = content.subString(start, end);
+               }
+               start = end + 1;
            }
-           start = end + 1;
+           slices[slices.length - 1] = content.subString(start);
+           this.content = null;
        }
-       slices[slices.length - 1] = content.subString(start);
+       return slices;
     }
 
     public String getFilename() {
        return filename;
     }
-
-    public String[] getSlices() {
-       return this.slices;
-    }
 }
index a411a1657bc91a2ed84a354baa47f1ba9132355b..3ac4cd4d6f8198102bcd9de5a9b3851c0ab36186 100644 (file)
@@ -25,15 +25,17 @@ task startup(StartupObject s{initialstate}) {
     char seperator = temp.charAt(0);
     //System.printString(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r) + "\n");
     Splitter splitter = new Splitter(inputfile, m, seperator);
-    Master master = new Master(m, r, splitter){split};
+    Master master = new Master(m, r, splitter){mapoutput};//{split};
+    
+    master.assignMap();
 
     taskexit(s{!initialstate});
 }
 
 //Split the input file into M pieces
-task split(Master master{split}) {
+/*task split(Master master{split}) {
     //System.printString("Top of task split\n");
-    master.split();
+    //master.split();
 
     taskexit(master{!split, assignMap});
 }
@@ -43,24 +45,26 @@ task assignMap(Master master{assignMap}) {
     //System.printString("Top of task assignMap\n");
     master.assignMap();
 
-    taskexit(master{!assignMap, mapoutput});
-}
+    //taskexit(master{!assignMap, mapoutput});
+    taskexit(master{!split, mapoutput});
+}*/
 
 //MapWorker do 'map' function on a input file piece
 task map(MapWorker mworker{map}) {
     //System.printString("Top of task map\n");
     mworker.map();
 
-    taskexit(mworker{!map, partition});
+    /*taskexit(mworker{!map, partition});
 }
 
 //Partition the intermediate key/value pair generated
 //into R intermediate local files
 task partition(MapWorker mworker{partition}) {
-    //System.printString("Top of task partition\n");
+    //System.printString("Top of task partition\n");*/
     mworker.partition();
 
-    taskexit(mworker{!partition, mapoutput});
+    taskexit(mworker{!map, mapoutput});
+    //taskexit(mworker{!partition, mapoutput});
 }
 
 //Register the intermediate ouput from map worker to master
@@ -69,6 +73,7 @@ task mapOutput(Master master{mapoutput}, optional MapWorker mworker{mapoutput})
     if(isavailable(mworker)) {
        int total = master.getR();
        for(int i = 0; i < total; ++i) {
+           //System.printString("mapOutput\n");
            String temp = mworker.outputFile(i);
            if(temp != null) {
                master.addInterOutput(temp); 
@@ -101,15 +106,16 @@ task sortgroup(ReduceWorker rworker{sortgroup}) {
     //System.printString("Top of task sortgroup\n");
     rworker.sortgroup();
 
-    taskexit(rworker{!sortgroup, reduce});
+    /*taskexit(rworker{!sortgroup, reduce});
 }
 
 //Do 'reduce' function
 task reduce(ReduceWorker rworker{reduce}) {
-    //System.printString("Top of task reduce\n");
+    //System.printString("Top of task reduce\n");*/
     rworker.reduce();
 
-    taskexit(rworker{!reduce, reduceoutput});
+    //taskexit(rworker{!reduce, reduceoutput});
+    taskexit(rworker{!sortgroup, reduceoutput});
 }
 
 //Collect the output into master
@@ -132,9 +138,9 @@ task reduceOutput(Master master{reduceoutput}, optional ReduceWorker rworker{red
 
 task output(Master master{output}) {
     //System.printString("Top of task output\n");
-    if(master.isPartial()) {
+    /*if(master.isPartial()) {
        System.printString("Partial! The result may not be right due to some failure!\n");
     }
-    System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");
+    System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");*/
     taskexit(master{!output});
 }
index 60407fb358c474a2d2de5ed078a11c67fad52a98..9c810dd66a55dba02979f0f34f6071f16ec3507f 100644 (file)
@@ -4,14 +4,12 @@ public class MapWorker {
     flag mapoutput;
 
     int ID;
-
     int r;
     String key;
     String value;
     OutputCollector output;
-
-    String[] locations;
-    FileOutputStream[] outputs;
+    String locationPrefix;
+    boolean[] outputsexit;
 
     public MapWorker(String key, String value, int r, int id) {
        this.ID = id;
@@ -20,37 +18,22 @@ public class MapWorker {
        this.key = key;
        this.value = value;
        this.output = new OutputCollector();
-
-       locations = new String[r];
-       for(int i = 0; i < r; ++i) {
-           StringBuffer temp = new StringBuffer("/scratch/mapreduce_opt/output-intermediate-map-");
-           temp.append(String.valueOf(ID));
-           temp.append("-of-");
-           temp.append(String.valueOf(r));
-           temp.append("_");
-           temp.append(String.valueOf(i));
-           temp.append(".dat");
-           locations[i] = new String(temp);
-       }
-
-       outputs = new FileOutputStream[r];
-       for(int i = 0; i < r; ++i) {
-           outputs[i] = null;
-       }
+       this.locationPrefix = "/scratch/mapreduce_opt/output-intermediate-map-";
+       
+       this.outputsexit = new boolean[r];
     }
 
     public void map() {
-       /*if(ID % 2 == 1) {
-               String temp = locations[locations.length];
-       }*/
-
        MapReduceBase.map(key, value, output);
+       this.key = null;
+       this.value = null;
     }
 
     public void partition() {
-       /*if(ID % 2 == 1) {
-               String temp = locations[locations.length];
-       }*/
+       FileOutputStream[] outputs = new FileOutputStream[r];
+       for(int i = 0; i < r; ++i) {
+           outputs[i] = null;
+       }
 
        int size = this.output.size();
        for(int i = 0; i < size; ++i) {
@@ -62,9 +45,10 @@ public class MapWorker {
            FileOutputStream oStream = outputs[index];
            if(oStream == null) {
                // open the file
-               String filepath = locations[index];
+               String filepath = this.locationPrefix + this.ID + "-of-" + this.r + "_" + index + ".dat";
                oStream = new FileOutputStream(filepath, true); // append
                outputs[index] = oStream;
+               this.outputsexit[index] = true;
            }
            // format: key value\n
            oStream.write(key.getBytes());
@@ -75,17 +59,27 @@ public class MapWorker {
        }
 
        // close the output files
-       for(int i = 0; i < this.outputs.length; ++i) {
-           FileOutputStream temp = this.outputs[i];
+       for(int i = 0; i < outputs.length; ++i) {
+           FileOutputStream temp = outputs[i];
            if(temp != null) {
                temp.close();
+               outputs[i] = null;
            }
        }
+       
+       this.output = null;
     }
 
     public String outputFile(int i) {
-       if(outputs[i] != null) {
-           return locations[i];
+       if(outputsexit[i]) {
+           StringBuffer temp = new StringBuffer(this.locationPrefix);
+           temp.append(String.valueOf(ID));
+           temp.append("-of-");
+           temp.append(String.valueOf(r));
+           temp.append("_");
+           temp.append(String.valueOf(i));
+           temp.append(".dat");
+           return new String(temp);
        } else {
            return null;
        }
index c547fd839180f97b50c62b0bec0f446fa99b5753..ba98fb938a7dfc3e7ba471170a7c05c5a129b8d0 100644 (file)
@@ -11,39 +11,23 @@ public class Master {
     int m;
     int r;
     int[] mworkerStates; // array of map worker's state
-    // 0: idle  1: process  2: finished 3: fail
+                         // 0: idle  1: process  2: finished 3: fail
     int[] rworkerStates; // array of reduce worker's state
     Vector[] interoutputs; // array of string vector containing
-    // paths of intermediate outputs from
-    // map worker
-
+                           // paths of intermediate outputs from
+                           // map worker
     Splitter splitter;
-
     String outputfile;  // path of final output file
-
     boolean partial;
 
     public Master(int m, int r, Splitter splitter) {
        this.m = m;
        this.r = r;
-
-       mworkerStates = new int[m];
-       rworkerStates = new int[r];
-       for(int i = 0; i < m; ++i) {
-           mworkerStates[i] = 0;
-       }
-       for(int i = 0; i < r; ++i) {
-           rworkerStates[i] = 0;
-       }
-
-       interoutputs = new Vector[r];
-       for(int i = 0; i < r; ++i) {
-           interoutputs[i] = null;
-       }
-
+       this.mworkerStates = new int[m];
+       this.rworkerStates = new int[r];
+       this.interoutputs = new Vector[r];
        this.splitter = splitter;
        this.outputfile = new String("/scratch/mapreduce_opt/output.dat");
-
        this.partial = false;
     }
 
@@ -63,12 +47,12 @@ public class Master {
        this.partial = partial || this.partial;
     }
 
-    public void split() {
+    /*public void split() {
        splitter.split();
-    }
+    }*/
 
     public void assignMap() {
-       String[] contentsplits = splitter.getSlices();
+       String[] contentsplits = splitter.split();//splitter.getSlices();
        for(int i = 0; i < contentsplits.length; ++i) {
            //System.printString("*************************\n");
            //System.printString(contentsplits[i] + "\n");
@@ -76,6 +60,8 @@ public class Master {
            MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i){map};
            mworkerStates[i] = 1;
        }
+       
+       this.splitter = null;
     }
 
     public void setMapFinish(int i) {
@@ -111,7 +97,9 @@ public class Master {
        for(int i = 0; i < interoutputs.length; ++i) {
            ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup};
            rworkerStates[i] = 1;
+           this.interoutputs[i] = null;
        }
+       this.interoutputs.clear();
     }
 
     public void setReduceFinish(int i) {
index 7f2cbdb4a86038eaa264e64dcac470d6e9649321..27557c9e55715bd1802399d58423581279502db0 100644 (file)
@@ -5,7 +5,7 @@ public class ReduceWorker {
 
     int ID;
     Vector interoutputs;  // string vector containing paths
-    // of intermediate outputs from map worker
+                          // of intermediate outputs from map worker
     Vector keys;
     HashMap values; // hashmap map key to vector of string vector
     int[] sorts; // array record the sort of keys
@@ -15,21 +15,13 @@ public class ReduceWorker {
     public ReduceWorker(Vector interoutputs, int id) {
        this.ID = id;
        this.interoutputs = interoutputs;
-
        this.keys = new Vector();
        this.values = new HashMap();
-       //this.sorts = null;
-
        this.output = new OutputCollector();
        this.outputfile = "/scratch/mapreduce_opt/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
     }
 
     public void sortgroup() {
-       /*if(ID % 2 == 1) {
-               int a[] = new int[1];
-               int temp = a[1];
-       }*/
-
        // group values associated to the same key
        //System.printString("================================\n");
        if(interoutputs == null) {
@@ -66,9 +58,9 @@ public class ReduceWorker {
        //System.printString("================================\n");
 
        /*for(int i = 0; i < this.keys.size(); ++i) {
-                       System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
-               }
-               System.printString("\n");*/
+           System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
+       }
+       System.printString("\n");*/
 
        // sort all the keys inside interoutputs
        this.sorts = new int[this.keys.size()];
@@ -91,25 +83,18 @@ public class ReduceWorker {
            this.sorts[index] = tosort;
        }
        /*for(int i = 0; i < this.sorts.length; ++i) {
-                       System.printString(this.sorts[i] + "; ");
-               }
-               System.printString("\n");*/
+           System.printString(this.sorts[i] + "; ");
+       }
+       System.printString("\n");*/
     }
 
     public void reduce() {
-       /*if(ID % 2 == 1) {
-               int a[] = new int[1];
-               int temp = a[1];
-       }*/
-
        if(this.interoutputs != null) {
-          // return;
-       //}
-       for(int i = 0; i < this.sorts.length; ++i) {
-           String key = (String)this.keys.elementAt(this.sorts[i]);
-           Vector values = (Vector)this.values.get(key);
-           MapReduceBase.reduce(key, values, output);
-       }
+           for(int i = 0; i < this.sorts.length; ++i) {
+               String key = (String)this.keys.elementAt(this.sorts[i]);
+               Vector values = (Vector)this.values.get(key);
+               MapReduceBase.reduce(key, values, output);
+           }
        }
 
        // output all the result into some local file
@@ -126,6 +111,8 @@ public class ReduceWorker {
            oStream.flush();
        }
        oStream.close();
+       this.keys = null;
+       this.values = null;
     }
 
     public String getOutputFile() {
index d61bf023bbc0c7e00fad3f22d1fe69104eac3945..07e69e7f2724ea72dd61cb4a3a2e043bf33247a0 100644 (file)
@@ -2,12 +2,21 @@ public class Splitter {
     String filename;
     String content;
     int length;
-    int[] splits;
-    String[] slices;
+    int splitNum;
+    char seperator;
 
     public Splitter(String path, int splitNum, char seperator) {
        //System.printString("Top of Splitter's constructor\n");
        filename = path;
+       this.length = -1;
+       this.splitNum = splitNum;
+       this.seperator = seperator;
+    }
+
+    public String[] split() {
+       int[] splits;
+       String[] slices;
+       
        FileInputStream iStream = new FileInputStream(filename);
        byte[] b = new byte[1024 * 1024];
        length = iStream.read(b);
@@ -22,6 +31,7 @@ public class Splitter {
        if(splitNum == 1) {
            slices = new String[1];
            slices[0] = content;
+           this.content = null;
        } else {
            splits = new int[splitNum - 1];
            int index = 0;
@@ -38,36 +48,25 @@ public class Splitter {
                splits[i] = index;
            }
 
-           this.slices = new String[splits.length + 1];
-           for(int i = 0; i < this.slices.length; ++i) {
-               this.slices[i] = null;
-           }
-       }
-    }
-
-    public void split() {
-       if(slices.length == 1) {
-           return;
-       }
-       int start = 0;
-       int end = 0;
-       for(int i = 0; i < splits.length; ++i) {
-           end = splits[i];
-           if(end < start) {
-               slices[i] = null;
-           } else {
-               slices[i] = content.subString(start, end);
+           slices = new String[splitNum];
+           int start = 0;
+           int end = 0;
+           for(int i = 0; i < splits.length; ++i) {
+               end = splits[i];
+               if(end < start) {
+                   slices[i] = null;
+               } else {
+                   slices[i] = content.subString(start, end);
+               }
+               start = end + 1;
            }
-           start = end + 1;
+           slices[slices.length - 1] = content.subString(start);
+           this.content = null;
        }
-       slices[slices.length - 1] = content.subString(start);
+       return slices;
     }
 
     public String getFilename() {
        return filename;
     }
-
-    public String[] getSlices() {
-       return this.slices;
-    }
 }