start of new file
[IRC.git] / Robust / src / Benchmarks / Scheduling / MapReduce / MapReduce.java
1 task t1(StartupObject s{initialstate}) {
2     System.printString("task t1\n");
3     
4     String inputfile = "Manila International Airport Authority spokesman Octavio Lina said there were no injuries, but some of the 345 passengers vomited after disembarking, AP reported. Video of the incident shows passengers applauding as the plane landed safely.";
5     int m = 6;
6     int r = 3;
7     char seperator = '\n';
8     Splitter splitter = new Splitter(inputfile, m, seperator);
9     Master master = new Master(m, r, splitter){split};
10
11     taskexit(s{!initialstate});
12 }
13
14 //Split the input file into M pieces
15 task t2(Master master{split}) {
16     System.printString("task t2\n");
17     
18     master.split();
19
20     taskexit(master{!split, assignMap});
21 }
22
23 //Select a map worker to handle one of the pieces of input file
24 task t3(Master master{assignMap}) {
25     System.printString("task t3\n");
26     
27     //master.assignMap();
28     Splitter splitter = master.getSplitter();
29     String[] contentsplits = splitter.getSlices();
30     for(int i = 0; i < contentsplits.length; ++i) {
31         MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], master.getR(), i){map};
32         master.setMapInit(i);
33     }
34
35     taskexit(master{!assignMap, mapoutput});
36 }
37
38 //MapWorker do 'map' function on a input file piece
39 task t4(MapWorker mworker{map}) {
40     System.printString("task t4\n");
41     
42     mworker.map();
43
44     taskexit(mworker{!map, partition});
45 }
46
47 //Partition the intermediate key/value pair generated
48 //into R intermediate local files
49 task t5(MapWorker mworker{partition}) {
50     System.printString("task t5\n");
51     
52     mworker.partition();
53
54     taskexit(mworker{!partition, mapoutput});
55 }
56
57 //Register the intermediate ouput from map worker to master
58 task t6(Master master{mapoutput}, MapWorker mworker{mapoutput}) {
59     System.printString("task t6\n");
60     
61     int total = master.getR();
62     for(int i = 0; i < total; ++i) {
63         OutputCollector temp = mworker.outputFile(i);
64         if(temp != null) {
65             master.addInterOutput(temp, i); 
66         }
67     }
68     master.setMapFinish(mworker.getID());
69
70     if(master.isMapFinish()) {
71         taskexit(master{!mapoutput, mapfinished, assignReduce}, mworker{!mapoutput});
72     }
73
74     taskexit(mworker{!mapoutput});
75 }
76
77 //Assign the list of intermediate output associated to one key to
78 //a reduce worker 
79 task t7(Master master{assignReduce}) {
80     System.printString("task t7\n");
81     
82     //master.assignReduce();
83     Vector[] interoutputs = master.getInteroutputs();
84     for(int i = 0; i < interoutputs.length; ++i) {
85         ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup};
86         master.setReduceInit(i);
87     }
88
89     taskexit(master{!assignReduce, reduceoutput});
90 }
91
92 //First do sort and group on the intermediate key/value pairs assigned
93 //to reduce worker
94 task t8(ReduceWorker rworker{sortgroup}) {
95     System.printString("task t8\n");
96     
97     rworker.sortgroup();
98
99     taskexit(rworker{!sortgroup, reduce});
100 }
101
102 //Do 'reduce' function
103 task t9(ReduceWorker rworker{reduce}) {
104     System.printString("task t9\n");
105     
106     rworker.reduce();
107
108     taskexit(rworker{!reduce, reduceoutput});
109 }
110
111 //Collect the output into master
112 task t10(Master master{reduceoutput}, ReduceWorker rworker{reduceoutput}) {
113     System.printString("task t10\n");
114     
115     master.collectROutput(rworker.getOutput());
116     master.setReduceFinish(rworker.getID());
117
118     if(master.isReduceFinish()) {
119         taskexit(master{!reduceoutput, reducefinished, output}, rworker{!reduceoutput});
120     }
121
122     taskexit(rworker{!reduceoutput});
123 }
124
125 task t11(Master master{output}) {
126     System.printString("task t11\n");
127     
128     /*if(master.isPartial()) {
129         System.printString("Partial! The result may not be right due to some failure!\n");
130     }*/
131     System.printString("Finish!\n");// Results are in the output file: " + master.getOutputFile() + "\n");
132     System.printI(0xdddd);
133     taskexit(master{!output});
134 }