1 task startup(StartupObject s{initialstate}) {
2 // read in configuration parameters
3 // System.printString("Top of task startup\n");
4 String path = new String("/scratch/mapreduce_opt/conf.txt");
5 FileInputStream iStream = new FileInputStream(path);
6 byte[] b = new byte[1024];
7 int length = iStream.read(b);
9 System.printString("Error! Can not read from configure file: " + path + "\n");
12 String content = new String(b, 0, length);
13 //System.printString(content + "\n");
14 int index = content.indexOf('\n');
15 String inputfile = content.subString(0, index);
16 content = content.subString(index + 1);
17 index = content.indexOf('\n');
18 int m = Integer.parseInt(content.subString(0, index));
19 content = content.subString(index + 1);
20 index = content.indexOf('\n');
21 int r = Integer.parseInt(content.subString(0, index));
22 content = content.subString(index + 1);
23 index = content.indexOf('\n');
24 String temp = content.subString(0, index);
25 char seperator = temp.charAt(0);
26 //System.printString(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r) + "\n");
27 Splitter splitter = new Splitter(inputfile, m, seperator);
28 Master master = new Master(m, r, splitter){split};
30 taskexit(s{!initialstate});
33 //Split the input file into M pieces
34 task split(Master master{split}) {
35 //System.printString("Top of task split\n");
38 taskexit(master{!split, assignMap});
41 //Select a map worker to handle one of the pieces of input file
42 task assignMap(Master master{assignMap}) {
43 //System.printString("Top of task assignMap\n");
46 taskexit(master{!assignMap, mapoutput});
49 //MapWorker do 'map' function on a input file piece
50 task map(MapWorker mworker{map}) {
51 //System.printString("Top of task map\n");
54 taskexit(mworker{!map, partition});
57 //Partition the intermediate key/value pair generated
58 //into R intermediate local files
59 task partition(MapWorker mworker{partition}) {
60 //System.printString("Top of task partition\n");
63 taskexit(mworker{!partition, mapoutput});
66 //Register the intermediate ouput from map worker to master
67 task mapOutput(Master master{mapoutput}, optional MapWorker mworker{mapoutput}) {
68 //System.printString("Top of task mapOutput\n");
69 if(isavailable(mworker)) {
70 int total = master.getR();
71 for(int i = 0; i < total; ++i) {
72 String temp = mworker.outputFile(i);
74 master.addInterOutput(temp);
77 master.setMapFinish(mworker.getID());
79 master.setMapFail(mworker.getID());
80 master.setPartial(true);
82 if(master.isMapFinish()) {
83 taskexit(master{!mapoutput, mapfinished, assignReduce}, mworker{!mapoutput});
86 taskexit(mworker{!mapoutput});
89 //Assign the list of intermediate output associated to one key to
91 task assignReduce(Master master{assignReduce}) {
92 //System.printString("Top of task assignReduce\n");
93 master.assignReduce();
95 taskexit(master{!assignReduce, reduceoutput});
98 //First do sort and group on the intermediate key/value pairs assigned
100 task sortgroup(ReduceWorker rworker{sortgroup}) {
101 //System.printString("Top of task sortgroup\n");
104 taskexit(rworker{!sortgroup, reduce});
107 //Do 'reduce' function
108 task reduce(ReduceWorker rworker{reduce}) {
109 //System.printString("Top of task reduce\n");
112 taskexit(rworker{!reduce, reduceoutput});
115 //Collect the output into master
116 task reduceOutput(Master master{reduceoutput}, optional ReduceWorker rworker{reduceoutput}) {
117 //System.printString("Top of task reduceOutput\n");
118 if(isavailable(rworker)) {
119 master.collectROutput(rworker.getOutputFile());
120 master.setReduceFinish(rworker.getID());
122 master.setReduceFail(rworker.getID());
123 master.setPartial(true);
125 if(master.isReduceFinish()) {
126 //System.printString("reduce finish\n");
127 taskexit(master{!reduceoutput, reducefinished, output}, rworker{!reduceoutput});
130 taskexit(rworker{!reduceoutput});
133 task output(Master master{output}) {
134 //System.printString("Top of task output\n");
135 if(master.isPartial()) {
136 System.printString("Partial! The result may not be right due to some failure!\n");
138 System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");
139 taskexit(master{!output});