1 public class MapWorker {
11 OutputCollector output;
14 FileOutputStream[] outputs;
/**
 * Builds a map worker: creates the output collector, computes one
 * intermediate file path per partition, allocates one output-stream slot
 * per partition, and runs MapReduceBase.map(key, value, output).
 *
 * NOTE(review): this excerpt elides several original lines (closing braces
 * and any try/catch are not visible), so the exact nesting of the
 * statements below cannot be confirmed from here.
 *
 * @param key   passed through unchanged to MapReduceBase.map
 * @param value passed through unchanged to MapReduceBase.map
 * @param r     number of intermediate path strings / stream slots to allocate
 * @param id    presumably stored in the ID field used in the path below;
 *              the assignment is not visible here -- TODO confirm
 */
16 public MapWorker(String key, String value, int r, int id) {
22 this.output = new OutputCollector();
// One path string per partition index i in [0, r).
24 locations = new String[r];
25 for(int i = 0; i < r; ++i) {
// Path is built as <fixed prefix> + ID + r + i; any separators between the
// three numbers would be appended on lines elided from this excerpt.
26 StringBuffer temp = new StringBuffer("/scratch/mapreduce_opt/output-intermediate-map-");
27 temp.append(String.valueOf(ID));
29 temp.append(String.valueOf(r));
31 temp.append(String.valueOf(i));
33 locations[i] = new String(temp);
// Stream slots are only allocated here; the visible code in partition()
// opens a stream for a slot and stores it back.
36 outputs = new FileOutputStream[r];
37 for(int i = 0; i < r; ++i) {
// NOTE(review): locations[locations.length] is always one past the last
// valid index, so this statement throws ArrayIndexOutOfBoundsException
// whenever it executes. The surrounding context is elided -- confirm whether
// this is a deliberate crash or an indexing mistake.
44 String temp = locations[locations.length];
// Runs the map function; it receives the collector created above.
47 MapReduceBase.map(key, value, output);
/**
 * Walks every (key, value) pair held by the output collector, picks an
 * intermediate file index from the key's hash code, and writes the pair's
 * bytes to the FileOutputStream for that index. A final loop then visits
 * every stream slot.
 *
 * NOTE(review): this excerpt elides several original lines (closing braces,
 * any try/catch, the writes between key and value, and the close call), so
 * the comments below describe only the statements that are visible.
 */
50 public void partition() {
// NOTE(review): locations[locations.length] is always one past the last
// valid index, so this statement throws ArrayIndexOutOfBoundsException
// whenever it executes. The surrounding context is elided -- confirm intent.
52 String temp = locations[locations.length];
55 int size = this.output.size();
56 for(int i = 0; i < size; ++i) {
57 String key = this.output.getKey(i);
58 String value = this.output.getValue(i);
59 // use the hashcode of key to decide which intermediate output
60 // this pair should be in
// NOTE(review): Math.abs(Integer.MIN_VALUE) is still negative, so a key whose
// hashCode() is Integer.MIN_VALUE produces a negative index here.
// Math.floorMod(key.hashCode(), this.r) would always land in [0, r) for r > 0.
// The (int) cast is redundant: Math.abs(int) already returns int.
61 int index = (int)Math.abs(key.hashCode()) % this.r;
62 FileOutputStream oStream = outputs[index];
// Opens the partition file in append mode and stores the stream in its slot.
// The condition guarding these three lines (presumably a null check on
// oStream) is elided -- TODO confirm.
65 String filepath = locations[index];
66 oStream = new FileOutputStream(filepath, true); // append
67 outputs[index] = oStream;
69 // format: key value\n
// getBytes() is called without a charset, so the JVM default charset is used.
// The separator and newline writes implied by the format comment are not
// visible in this excerpt.
70 oStream.write(key.getBytes());
72 oStream.write(value.getBytes());
77 // close the output files
78 for(int i = 0; i < this.outputs.length; ++i) {
// The close call itself is on lines elided from this excerpt.
79 FileOutputStream temp = this.outputs[i];
86 public String outputFile(int i) {
87 if(outputs[i] != null) {