3 /*import java.io.FileInputStream;
4 import java.io.FileOutputStream;
5 import java.util.Vector;*/
11 int[] mworkerStates; // state code of each map worker, indexed by worker number
12 // state codes: 0 = idle, 1 = processing, 2 = finished, 3 = failed
13 int[] rworkerStates; // state code of each reduce worker (same codes as above)
14 Vector[] interoutputs; // one Vector per reduce worker (indexed by reducer number),
15 // each holding the paths (Strings) of the intermediate output files for that reducer
20 String outputfile; // path of the final output file
// Builds a master that tracks m map workers and r reduce workers and reads
// its input through the given splitter.
// NOTE(review): this excerpt omits parts of the constructor (the bodies of the
// two state loops and all closing braces), so only the visible statements are
// described here.
24 public Master(int m, int r, Splitter splitter) {
// one state slot per map worker and per reduce worker
28 mworkerStates = new int[m];
29 rworkerStates = new int[r];
// presumably initialises every map worker state (loop body not visible) -- confirm
30 for(int i = 0; i < m; ++i) {
// presumably initialises every reduce worker state (loop body not visible) -- confirm
33 for(int i = 0; i < r; ++i) {
// one intermediate-output list per reduce worker; every slot starts as null
// and is only allocated when addInterOutput first stores a path in it
37 interoutputs = new Vector[r];
38 for(int i = 0; i < r; ++i) {
39 interoutputs[i] = null;
42 this.splitter = splitter;
// the final output path is hard-coded here rather than passed in
43 this.outputfile = new String("/home/jzhou/mapreduce/output.dat");
// Returns the path of the final output file held in the outputfile field.
52 public String getOutputFile() {
53 return this.outputfile;
// Reports the "partial" flag of this master.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns this.partial -- confirm.
56 public boolean isPartial() {
// Raises the "partial" flag. The new value is OR-ed with the current one, so
// the flag is sticky: passing false never clears a flag that is already true.
60 public void setPartial(boolean partial) {
61 this.partial = partial || this.partial;
// Builds one MapWorker per input slice supplied by the splitter and stores
// them in an array in slice order (the return statement itself is not visible
// in this excerpt). Each worker is given the splitter's file name, its slice,
// the value r, and its own index i.
// NOTE(review): r is not declared in the visible lines -- presumably a field
// holding the number of reduce workers; confirm.
// NOTE(review): the array is sized by contentsplits.length, while
// mworkerStates was sized by the constructor's m; confirm the two always agree
// if hidden lines index mworkerStates with i.
68 public MapWorker[] assignMap() {
69 String[] contentsplits = splitter.getSlices();
70 MapWorker[] mworkers = new MapWorker[contentsplits.length];
71 for(int i = 0; i < contentsplits.length; ++i) {
72 //System.printString("*************************\n");
73 //System.printString(contentsplits[i] + "\n");
74 //System.printString("*************************\n");
75 MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i);
77 mworkers[i] = mworker;
// Records the outcome "finished" for map worker i.
// NOTE(review): the body is not visible in this excerpt; by analogy with
// setReduceFinish it presumably sets mworkerStates[i] = 2 -- confirm.
82 public void setMapFinish(int i) {
// Records the outcome "failed" for map worker i.
// NOTE(review): the body is not visible in this excerpt; by analogy with
// setReduceFail it presumably sets mworkerStates[i] = 3 -- confirm.
86 public void setMapFail(int i) {
// Scans every map worker state to decide whether the map phase is over.
// The visible test looks for a worker still in state 1 (processing); the
// branch body and the return statements are not visible in this excerpt.
// NOTE(review): states 0 (idle) and 3 (failed) are not tested by the visible
// condition -- confirm that treating them as "not blocking" is intended.
90 public boolean isMapFinish() {
91 for(int i = 0; i < mworkerStates.length; ++i) {
92 if(mworkerStates[i] == 1) {
// Registers one intermediate output file under the reduce worker it belongs
// to. The reducer index is the integer found between the first '_' and the
// first '.' of the given path.
// NOTE(review): indexOf returns the FIRST occurrence, so a '_' or '.' in a
// directory component of the path would shift the parsed range, and a missing
// separator or non-numeric text would make substring/parseInt throw. Confirm
// the path format produced by the map workers.
// NOTE(review): the parsed index is not range-checked against
// interoutputs.length in the visible lines.
100 public void addInterOutput(String interoutput) {
101 int start = interoutput.indexOf('_');
102 int end = interoutput.indexOf('.');
103 int index = Integer.parseInt(interoutput.substring(start + 1, end));
104 //System.printString(interoutput.subString(start + 1, end) + "\n");
// the per-reducer list is created lazily on its first path
105 if(interoutputs[index] == null) {
106 interoutputs[index] = new Vector();
108 interoutputs[index].addElement(interoutput);
// Builds one ReduceWorker per slot of interoutputs, giving each its list of
// intermediate output paths and its own index, and marks that worker as
// processing (state 1). The return statement is not visible in this excerpt.
// NOTE(review): a slot that addInterOutput never filled is still null (the
// constructor initialises every slot to null), so that ReduceWorker is
// constructed with a null Vector -- confirm ReduceWorker tolerates this.
111 public ReduceWorker[] assignReduce() {
112 ReduceWorker[] rworkers = new ReduceWorker[interoutputs.length];
113 for(int i = 0; i < interoutputs.length; ++i) {
114 ReduceWorker rworker = new ReduceWorker(interoutputs[i], i);
115 rworkerStates[i] = 1;
116 rworkers[i] = rworker;
// Marks reduce worker i as finished (state code 2).
121 public void setReduceFinish(int i) {
122 rworkerStates[i] = 2;
// Marks reduce worker i as failed (state code 3).
125 public void setReduceFail(int i) {
126 rworkerStates[i] = 3;
// Scans every reduce worker state to decide whether the reduce phase is over.
// The visible test looks for a worker still in state 1 (processing); the
// branch body and the return statements are not visible in this excerpt.
// NOTE(review): states 0 (idle) and 3 (failed) are not tested by the visible
// condition -- confirm that is intended.
129 public boolean isReduceFinish() {
130 for(int i = 0; i < rworkerStates.length; ++i) {
131 if(rworkerStates[i] == 1) {
// Copies data from one reduce worker's output file into the final output
// file named by the outputfile field.
// NOTE(review): lines are missing from this excerpt (the condition guarding
// the error message, any close() calls, and the closing braces), so only the
// visible statements are described here.
139 public void collectROutput(String file) {
141 FileInputStream iStream = new FileInputStream(file);
// second argument true: in java.io.FileOutputStream this selects append mode
// -- confirm it means the same in the runtime this file targets
142 FileOutputStream oStream = new FileOutputStream(outputfile, true);
// 100 KB buffer filled by a single read call
143 byte[] b = new byte[1024 * 100];
144 int length = iStream.read(b);
// NOTE(review): only one read is visible, so data beyond the first 100 KB
// (or beyond what one read returns) would not be copied -- confirm.
// The guard for the message below is not visible; presumably it fires when
// the read reports failure.
146 System./*out.println*/printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
149 //System.printString(new String(b, 0, length) + "\n");
// writes exactly the bytes obtained by the read above
150 oStream.write(b, 0, length);
153 /*} catch(Exception e) {