Add two benchmarks: MapReduce & PERT
[IRC.git] / Robust / src / Benchmarks / MapReduce / Tag / MapReduce.java
1 task startup(StartupObject s{initialstate}) {
2     // read in configuration parameters
3     // System.printString("Top of task startup\n");
4     String path = new String("/home/jzhou/mapreduce/conf.txt");
5     FileInputStream iStream = new FileInputStream(path);
6     byte[] b = new byte[1024];
7     int length = iStream.read(b);
8     if(length < 0 ) {
9         System.printString("Error! Can not read from configure file: " + path + "\n");
10         System.exit(-1);
11     }
12     String content = new String(b, 0, length);
13     //System.printString(content + "\n");
14     int index = content.indexOf('\n');
15     String inputfile = content.subString(0, index);
16     content = content.subString(index + 1);
17     index = content.indexOf('\n');
18     int m = Integer.parseInt(content.subString(0, index));
19     content = content.subString(index + 1);
20     index = content.indexOf('\n');
21     int r = Integer.parseInt(content.subString(0, index));
22     content = content.subString(index + 1);
23     index = content.indexOf('\n');
24     String temp = content.subString(0, index);
25     char seperator = temp.charAt(0);
26     //System.printString(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r) + "\n");
27     Splitter splitter = new Splitter(inputfile, m, seperator);
28     Master master = new Master(m, r, splitter){split};
29
30     taskexit(s{!initialstate});
31 }
32
33 //Split the input file into M pieces
34 task split(Master master{split}) {
35     System.printString("Top of task split\n");
36     master.split();
37
38     taskexit(master{!split, assignMap});
39 }
40
41 //Select a map worker to handle one of the pieces of input file
42 task assignMap(Master master{assignMap}) {
43     System.printString("Top of task assignMap\n");
44     master.assignMap();
45
46     taskexit(master{!assignMap, mapoutput});
47 }
48
49 //MapWorker do 'map' function on a input file piece
50 task map(MapWorker mworker{map}) {
51     System.printString("Top of task map\n");
52     mworker.map();
53
54     taskexit(mworker{!map, partition});
55 }
56
57 //Partition the intermediate key/value pair generated
58 //into R intermediate local files
59 task partition(MapWorker mworker{partition}) {
60     System.printString("Top of task partition\n");
61     mworker.partition();
62
63     taskexit(mworker{!partition, mapoutput});
64 }
65
66 //Register the intermediate ouput from map worker to master
67 task mapOutput(Master master{mapoutput}, optional MapWorker mworker{mapoutput}) {
68     System.printString("Top of task mapOutput\n");
69     if(isavailable(mworker)) {
70         int total = master.getR();
71         for(int i = 0; i < total; ++i) {
72             String temp = mworker.outputFile(i);
73             if(temp != null) {
74                 master.addInterOutput(temp); 
75             }
76         }
77         master.setMapFinish(mworker.getID());
78     } else {
79         master.setMapFail(mworker.getID());
80         master.setPartial(true);
81     }
82     if(master.isMapFinish()) {
83         taskexit(master{!mapoutput, mapfinished, assignReduce}, mworker{!mapoutput});
84     }
85
86     taskexit(mworker{!mapoutput});
87 }
88
89 //Assign the list of intermediate output associated to one key to
90 //a reduce worker 
91 task assignReduce(Master master{assignReduce}) {
92     System.printString("Top of task assignReduce\n");
93     master.assignReduce();
94
95     taskexit(master{!assignReduce, reduceoutput});
96 }
97
98 //First do sort and group on the intermediate key/value pairs assigned
99 //to reduce worker
100 task sortgroup(ReduceWorker rworker{sortgroup}) {
101     System.printString("Top of task sortgroup\n");
102     rworker.sortgroup();
103
104     taskexit(rworker{!sortgroup, reduce});
105 }
106
107 //Do 'reduce' function
108 task reduce(ReduceWorker rworker{reduce}) {
109     System.printString("Top of task reduce\n");
110     rworker.reduce();
111
112     taskexit(rworker{!reduce, reduceoutput});
113 }
114
115 //Collect the output into master
116 task reduceOutput(Master master{reduceoutput}, optional ReduceWorker rworker{reduceoutput}) {
117     System.printString("Top of task reduceOutput\n");
118     if(isavailable(rworker)) {
119         master.collectROutput(rworker.getOutputFile());
120         master.setReduceFinish(rworker.getID());
121     } else {
122         master.setReduceFail(rworker.getID());
123         master.setPartial(true);
124     }
125     if(master.isReduceFinish()) {
126         //System.printString("reduce finish\n");
127         taskexit(master{!reduceoutput, reducefinished, output}, rworker{!reduceoutput});
128     }
129
130     taskexit(rworker{!reduceoutput});
131 }
132
133 task output(Master master{output}) {
134     System.printString("Top of task output\n");
135     if(master.isPartial()) {
136         System.printString("Partial! The result may not be right due to some failure!\n");
137     }
138     System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");
139     taskexit(master{!output});
140 }