helpful progress reporting
[IRC.git] / Robust / src / Benchmarks / MapReduce / Tag / Master.java
1 public class Master {
2     flag split;
3     flag assignMap;
4     flag mapoutput;
5     flag mapfinished;
6     flag assignReduce;
7     flag reduceoutput;
8     flag reducefinished;
9     flag output;
10
11     int m;
12     int r;
13     int[] mworkerStates; // array of map worker's state
14                          // 0: idle  1: process  2: finished 3: fail
15     int[] rworkerStates; // array of reduce worker's state
16     Vector[] interoutputs; // array of string vector containing
17                            // paths of intermediate outputs from
18                            // map worker
19     Splitter splitter;
20     String outputfile;  // path of final output file
21     boolean partial;
22
23     public Master(int m, int r, Splitter splitter) {
24         this.m = m;
25         this.r = r;
26         this.mworkerStates = new int[m];
27         this.rworkerStates = new int[r];
28         this.interoutputs = new Vector[r];
29         this.splitter = splitter;
30         this.outputfile = new String("/scratch/mapreduce_opt/output.dat");
31         this.partial = false;
32     }
33
34     public int getR() {
35         return this.r;
36     }
37
38     public String getOutputFile() {
39         return this.outputfile;
40     }
41
42     public boolean isPartial() {
43         return this.partial;
44     }
45
46     public void setPartial(boolean partial) {
47         this.partial = partial || this.partial;
48     }
49
50     /*public void split() {
51         splitter.split();
52     }*/
53
54     public void assignMap() {
55         String[] contentsplits = splitter.split();//splitter.getSlices();
56         for(int i = 0; i < contentsplits.length; ++i) {
57             //System.printString("*************************\n");
58             //System.printString(contentsplits[i] + "\n");
59             //System.printString("*************************\n");
60             MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i){map};
61             mworkerStates[i] = 1;
62         }
63         
64         this.splitter = null;
65     }
66
67     public void setMapFinish(int i) {
68         mworkerStates[i] = 2;
69     }
70
71     public void setMapFail(int i) {
72         mworkerStates[i] = 3;
73     }
74
75     public boolean isMapFinish() {
76         for(int i = 0; i < mworkerStates.length; ++i) {
77             if(mworkerStates[i] == 1) {
78                 return false;
79             }
80         }
81
82         return true;
83     }
84
85     public void addInterOutput(String interoutput) {
86         int start = interoutput.lastindexOf('_');
87         int end = interoutput.indexOf('.');
88         int index = Integer.parseInt(interoutput.subString(start + 1, end));
89         //System.printString(interoutput.subString(start + 1, end) + "\n");
90         if(interoutputs[index] == null) {
91             interoutputs[index] = new Vector();
92         }
93         interoutputs[index].addElement(interoutput);
94     }
95
96     public void assignReduce() {
97         for(int i = 0; i < interoutputs.length; ++i) {
98             ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup};
99             rworkerStates[i] = 1;
100             this.interoutputs[i] = null;
101         }
102         this.interoutputs.clear();
103     }
104
105     public void setReduceFinish(int i) {
106         rworkerStates[i] = 2;
107     }
108
109     public void setReduceFail(int i) {
110         rworkerStates[i] = 3;
111     }
112
113     public boolean isReduceFinish() {
114         for(int i = 0; i < rworkerStates.length; ++i) {
115             if(rworkerStates[i] == 1) {
116                 return false;
117             }
118         }
119
120         return true;
121     }
122
123     public void collectROutput(String file) {
124         FileInputStream iStream = new FileInputStream(file);
125         FileOutputStream oStream = new FileOutputStream(outputfile, true);
126         byte[] b = new byte[1024 * 10];
127         int length = iStream.read(b);
128         if(length < 0) {
129             System.printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
130             System.exit(-1);
131         }
132         //System.printString(new String(b, 0, length) + "\n");
133         oStream.write(b, 0, length);
134         iStream.close();
135         oStream.close();
136     }
137 }