13 int[] mworkerStates; // state of each map worker, one entry per worker
14 // state codes: 0 = idle, 1 = processing, 2 = finished, 3 = failed
15 int[] rworkerStates; // state of each reduce worker (set to 1/2/3 by assignReduce/setReduceFinish/setReduceFail)
16 Vector[] interoutputs; // one Vector of String paths per reduce index, holding
17 // the intermediate output files filed under that index by addInterOutput()
22 String outputfile; // path of the final output file
// Creates a master that coordinates m map workers and r reduce workers.
//   m        - number of map workers; sizes mworkerStates
//   r        - number of reduce workers; sizes rworkerStates and interoutputs
//   splitter - stored in this.splitter; assignMap() reads the file name and slices from it
26 public Master(int m, int r, Splitter splitter) {
30 mworkerStates = new int[m];
31 rworkerStates = new int[r];
32 for(int i = 0; i < m; ++i) { // loop body not shown in this excerpt; presumably resets each map worker state - confirm
35 for(int i = 0; i < r; ++i) { // loop body not shown in this excerpt; presumably resets each reduce worker state - confirm
39 interoutputs = new Vector[r];
40 for(int i = 0; i < r; ++i) {
41 interoutputs[i] = null; // empty slot; addInterOutput() allocates the Vector on first use
44 this.splitter = splitter;
45 this.outputfile = new String("/scratch/mapreduce_nor/output.dat"); // final output path is hard-coded here
// Returns the path of the final output file (the value stored in outputfile).
54 public String getOutputFile() {
55 return this.outputfile;
// Query for the partial flag. NOTE(review): the body is not visible in this
// excerpt; presumably it returns this.partial - confirm.
58 public boolean isPartial() {
// Raises the partial flag. The argument is OR-ed into the current value, so
// once the flag is true it stays true; passing false never clears it.
62 public void setPartial(boolean partial) {
63 this.partial = partial || this.partial;
// Creates one MapWorker per slice returned by the splitter.
// Each worker is given the splitter's file name, its slice, r, and the slice index i.
// NOTE(review): the remainder of the loop body is not visible in this excerpt.
70 public void assignMap() {
71 String[] contentsplits = splitter.getSlices();
72 for(int i = 0; i < contentsplits.length; ++i) {
73 //System.printString("*************************\n");
74 //System.printString(contentsplits[i] + "\n");
75 //System.printString("*************************\n");
76 MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i){map}; // {map} is not standard Java; presumably the dialect's initial-flag annotation - confirm
// Called with a map worker index i. NOTE(review): the body is not visible in
// this excerpt; presumably it sets mworkerStates[i] to 2 (finished), mirroring
// setReduceFinish - confirm.
81 public void setMapFinish(int i) {
// Called with a map worker index i. NOTE(review): the body is not visible in
// this excerpt; presumably it sets mworkerStates[i] to 3 (failed), mirroring
// setReduceFail - confirm.
85 public void setMapFail(int i) {
// Scans all map worker states looking for any worker still in state 1 (processing).
// NOTE(review): the return statements are not visible in this excerpt; presumably
// a worker in state 1 yields false and a clean scan yields true - confirm.
// The visible test only looks for state 1; states 0 (idle) and 3 (failed) are not checked here.
89 public boolean isMapFinish() {
90 for(int i = 0; i < mworkerStates.length; ++i) {
91 if(mworkerStates[i] == 1) {
// Records the path of one intermediate output file under the index encoded in its name.
//   interoutput - path whose name carries an integer between its last '_' and its first '.'
// The parsed integer selects the slot in interoutputs; the slot's Vector is created on first use.
99 public void addInterOutput(String interoutput) {
100 int start = interoutput.lastindexOf('_'); // position of the last '_'
101 int end = interoutput.indexOf('.'); // position of the FIRST '.'; assumes no '.' occurs earlier in the path - TODO confirm
102 int index = Integer.parseInt(interoutput.subString(start + 1, end)); // digits between '_' and '.'
103 //System.printString(interoutput.subString(start + 1, end) + "\n");
104 if(interoutputs[index] == null) {
105 interoutputs[index] = new Vector();
107 interoutputs[index].addElement(interoutput);
// Creates one ReduceWorker per slot of interoutputs and marks it as processing.
// Worker i receives interoutputs[i] and the index i. A slot that never received
// a path through addInterOutput() is still null when it is passed on.
110 public void assignReduce() {
111 for(int i = 0; i < interoutputs.length; ++i) {
112 ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup}; // {sortgroup} is not standard Java; presumably the dialect's initial-flag annotation - confirm
113 rworkerStates[i] = 1; // 1 = processing
// Marks reduce worker i as finished (state 2).
117 public void setReduceFinish(int i) {
118 rworkerStates[i] = 2;
// Marks reduce worker i as failed (state 3).
121 public void setReduceFail(int i) {
122 rworkerStates[i] = 3;
// Scans all reduce worker states looking for any worker still in state 1 (processing).
// NOTE(review): the return statements are not visible in this excerpt; presumably
// a worker in state 1 yields false and a clean scan yields true - confirm.
// The visible test only looks for state 1; state 3 (failed) is not checked here.
125 public boolean isReduceFinish() {
126 for(int i = 0; i < rworkerStates.length; ++i) {
127 if(rworkerStates[i] == 1) {
// Copies data from a reduce worker's output file into the final output file.
//   file - path of the reduce worker's output file to read
// NOTE(review): only a single read() into a 10 KB buffer is visible in this excerpt.
// Unless the missing lines loop, input beyond 10240 bytes would not be copied - confirm.
// NOTE(review): no close() on either stream is visible in this excerpt - confirm.
135 public void collectROutput(String file) {
136 FileInputStream iStream = new FileInputStream(file);
137 FileOutputStream oStream = new FileOutputStream(outputfile, true); // second argument presumably selects append mode - confirm for this dialect
138 byte[] b = new byte[1024 * 10]; // 10 KB buffer
139 int length = iStream.read(b); // number of bytes actually read
141 System.printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n"); // the guarding condition is not visible in this excerpt
144 //System.printString(new String(b, 0, length) + "\n");
145 oStream.write(b, 0, length); // write only the bytes that were read