/**
 * Reduce-side worker.
 *
 * sortgroup() reads the "key value" lines written by map workers and groups
 * the values by key. It also orders the keys by String.hashCode().
 * reduce() runs MapReduceBase.reduce on each key group, in that order, and
 * appends the collected results to a per-worker output file.
 * getOutputFile() returns the path of that output file.
 */
1 public class ReduceWorker {
7 Vector interoutputs; // paths (Strings) of the intermediate output files
8 // written by map workers; may be null (both sortgroup() and reduce() check)
10 HashMap values; // key String -> Vector of the value Strings seen for that key
11 int[] sorts; // indices into keys, ordered by ascending key hashCode()
// NOTE(review): `keys` is not declared among these fields, but the class uses it.
// It is assigned in the constructor and filled with distinct key Strings in
// first-seen order by sortgroup(). Confirm where it is declared.
12 OutputCollector output; // collects the (key, value) results emitted by MapReduceBase.reduce
13 String outputfile; // path of this worker's output file (appended to by reduce())
/**
 * Creates a reduce worker with empty key/value groups and an empty collector.
 *
 * @param interoutputs Vector of paths (Strings) to the map workers'
 *                     intermediate output files; may be null
 * @param id           worker id, used only to build this worker's output file name
 */
15 public ReduceWorker(Vector interoutputs, int id) {
17 this.interoutputs = interoutputs;
19 this.keys = new Vector();
20 this.values = new HashMap();
// NOTE(review): `sorts` is not assigned here. It is only set by sortgroup(),
// so it is presumably null until that method runs. Confirm.
23 this.output = new OutputCollector();
// NOTE(review): the output directory is a hard-coded absolute path.
// Consider passing it in or making it configurable.
24 this.outputfile = "/scratch/mapreduce_opt/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
/**
 * Reads every file listed in interoutputs and groups the values by key.
 *
 * Each line of a file is split at its first space: the text before the space
 * is the key and the rest is the value. Values are appended to
 * values.get(key), and each new key is appended to keys in first-seen order.
 * Afterwards sorts is filled with the indices of keys, ordered by ascending
 * String.hashCode(), which is not a lexicographic order.
 *
 * NOTE(review): the loop condition for the line parsing is not visible here.
 * Presumably any text after the last '\n' of a file is ignored. TODO confirm.
 */
27 public void sortgroup() {
33 // group values associated to the same key
34 //System.printString("================================\n");
// no intermediate outputs were supplied, so there is nothing to group
35 if(interoutputs == null) {
38 for(int i = 0; i < interoutputs.size(); ++i) {
// NOTE(review): confirm iStream is closed after reading (ideally in a finally block).
39 FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
40 byte[] b = new byte[1024 * 10];
// NOTE(review): only a single read() into a 10 KB buffer is done per file.
// A larger file, or a short read, would be silently truncated. Consider
// reading until EOF.
41 int length = iStream.read(b);
// NOTE(review): "ouput" in this message is a typo for "output".
43 System.printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
46 String content = new String(b, 0, length);
47 //System.printString(content + "\n");
48 int index = content.indexOf('\n');
// peel off one line and keep the remainder in content
50 String line = content.subString(0, index);
51 content = content.subString(index + 1);
52 //System.printString(line + "\n");
// NOTE(review): a line with no space gives tmpindex == -1, and
// subString(0, -1) would likely fail. Confirm that inputs always contain a space.
53 int tmpindex = line.indexOf(' ');
54 String key = line.subString(0, tmpindex);
55 String value = line.subString(tmpindex + 1);
56 //System.printString(key + "; " + value + "\n");
// first occurrence of this key: create its value list and record the key
// in first-seen order
57 if(!this.values.containsKey(key)) {
58 this.values.put(key, new Vector());
59 this.keys.addElement(key);
61 ((Vector)this.values.get(key)).addElement(value);
62 index = content.indexOf('\n');
66 //System.printString("================================\n");
68 /*for(int i = 0; i < this.keys.size(); ++i) {
69 System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
71 System.printString("\n");*/
73 // sort the indices of keys by ascending String.hashCode() (insertion sort)
// Entries whose key hash is larger than the one being inserted are shifted
// one slot to the right, and tosort is then stored at the resulting slot.
// Because the comparison is a strict '>', keys with equal hashes appear to
// keep their first-seen order. The worst case is quadratic in the number of
// distinct keys.
// NOTE(review): `tosort` and `index` are presumably declared and updated
// elsewhere in this method. Confirm.
74 this.sorts = new int[this.keys.size()];
78 for(; tosort < this.keys.size(); ++tosort) {
79 int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode();
81 for(int i = tosort; i > 0; --i) {
82 if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) {
83 this.sorts[i] = this.sorts[i-1];
86 //System.printString(i + "; " + tosort + "\n");
91 this.sorts[index] = tosort;
93 /*for(int i = 0; i < this.sorts.length; ++i) {
94 System.printString(this.sorts[i] + "; ");
96 System.printString("\n");*/
/**
 * Runs MapReduceBase.reduce on every key group, in the order stored in sorts.
 *
 * When interoutputs is non-null, each group's results go into output. Every
 * (key, value) pair held by output is then appended to outputfile.
 *
 * NOTE(review): this relies on sortgroup() having run first to fill sorts.
 * Otherwise this.sorts is presumably null at sorts.length. Confirm that
 * callers always call sortgroup() first.
 */
99 public void reduce() {
// NOTE(review): `a` is not used in this method. Confirm it is dead and remove it.
101 int a[] = new int[1];
105 if(this.interoutputs != null) {
108 for(int i = 0; i < this.sorts.length; ++i) {
109 String key = (String)this.keys.elementAt(this.sorts[i]);
// NOTE(review): this local shadows the field `values`; consider renaming it.
110 Vector values = (Vector)this.values.get(key);
111 MapReduceBase.reduce(key, values, output);
115 // append every collected (key, value) pair to outputfile
116 int size = this.output.size();
// NOTE(review): confirm oStream is closed after the loop (ideally in a finally block).
117 FileOutputStream oStream = new FileOutputStream(outputfile, true); // append mode: existing contents are kept, not truncated
118 for(int i = 0; i < size; ++i) {
119 String key = this.output.getKey(i);
120 String value = this.output.getValue(i);
121 // format: key value\n
// NOTE(review): the space and the trailing '\n' are not written in these
// statements. Confirm they are written. getBytes() encodes with the
// platform default charset.
122 oStream.write(key.getBytes());
124 oStream.write(value.getBytes());
/**
 * Returns the path of the file that reduce() appends its results to.
 *
 * @return the output file path, fixed in the constructor from the worker id
 */
131 public String getOutputFile() {
132 return this.outputfile;