Add two benchmarks: MapReduce & PERT
[IRC.git] / Robust / src / Benchmarks / MapReduce / Tag / Master.java
1 public class Master {
2     flag split;
3     flag assignMap;
4     flag mapoutput;
5     flag mapfinished;
6     flag assignReduce;
7     flag reduceoutput;
8     flag reducefinished;
9     flag output;
10
11     int m;
12     int r;
13     int[] mworkerStates; // array of map worker's state
14     // 0: idle  1: process  2: finished 3: fail
15     int[] rworkerStates; // array of reduce worker's state
16     Vector[] interoutputs; // array of string vector containing
17     // paths of intermediate outputs from
18     // map worker
19
20     Splitter splitter;
21
22     String outputfile;  // path of final output file
23
24     boolean partial;
25
26     public Master(int m, int r, Splitter splitter) {
27         this.m = m;
28         this.r = r;
29
30         mworkerStates = new int[m];
31         rworkerStates = new int[r];
32         for(int i = 0; i < m; ++i) {
33             mworkerStates[i] = 0;
34         }
35         for(int i = 0; i < r; ++i) {
36             rworkerStates[i] = 0;
37         }
38
39         interoutputs = new Vector[r];
40         for(int i = 0; i < r; ++i) {
41             interoutputs[i] = null;
42         }
43
44         this.splitter = splitter;
45         this.outputfile = new String("/home/jzhou/mapreduce/output.dat");
46
47         this.partial = false;
48     }
49
50     public int getR() {
51         return this.r;
52     }
53
54     public String getOutputFile() {
55         return this.outputfile;
56     }
57
58     public boolean isPartial() {
59         return this.partial;
60     }
61
62     public void setPartial(boolean partial) {
63         this.partial = partial || this.partial;
64     }
65
66     public void split() {
67         splitter.split();
68     }
69
70     public void assignMap() {
71         String[] contentsplits = splitter.getSlices();
72         for(int i = 0; i < contentsplits.length; ++i) {
73             //System.printString("*************************\n");
74             //System.printString(contentsplits[i] + "\n");
75             //System.printString("*************************\n");
76             MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i){map};
77             mworkerStates[i] = 1;
78         }
79     }
80
81     public void setMapFinish(int i) {
82         mworkerStates[i] = 2;
83     }
84
85     public void setMapFail(int i) {
86         mworkerStates[i] = 3;
87     }
88
89     public boolean isMapFinish() {
90         for(int i = 0; i < mworkerStates.length; ++i) {
91             if(mworkerStates[i] == 1) {
92                 return false;
93             }
94         }
95
96         return true;
97     }
98
99     public void addInterOutput(String interoutput) {
100         int start = interoutput.indexOf('_');
101         int end = interoutput.indexOf('.');
102         int index = Integer.parseInt(interoutput.subString(start + 1, end));
103         //System.printString(interoutput.subString(start + 1, end) + "\n");
104         if(interoutputs[index] == null) {
105             interoutputs[index] = new Vector();
106         }
107         interoutputs[index].addElement(interoutput);
108     }
109
110     public void assignReduce() {
111         for(int i = 0; i < interoutputs.length; ++i) {
112             ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup};
113             rworkerStates[i] = 1;
114         }
115     }
116
117     public void setReduceFinish(int i) {
118         rworkerStates[i] = 2;
119     }
120
121     public void setReduceFail(int i) {
122         rworkerStates[i] = 3;
123     }
124
125     public boolean isReduceFinish() {
126         for(int i = 0; i < rworkerStates.length; ++i) {
127             if(rworkerStates[i] == 1) {
128                 return false;
129             }
130         }
131
132         return true;
133     }
134
135     public void collectROutput(String file) {
136         FileInputStream iStream = new FileInputStream(file);
137         FileOutputStream oStream = new FileOutputStream(outputfile, true);
138         byte[] b = new byte[1024 * 100];
139         int length = iStream.read(b);
140         if(length < 0) {
141             System.printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
142             System.exit(-1);
143         }
144         //System.printString(new String(b, 0, length) + "\n");
145         oStream.write(b, 0, length);
146         iStream.close();
147         oStream.close();
148     }
149 }