time stamp
[IRC.git] / Robust / src / ClassLibrary / JavaDSM / Work.java
1 public class Work extends Thread {
2   Task tasks;
3         Object[] currentWorkList;
4         int MY_MID;
5         int NUM_THREADS;
6
7         Work (Task tasks, int num_threads, int mid, Object[] currentWorkList) {
8                 this.tasks = tasks;
9                 this.currentWorkList = currentWorkList;
10                 NUM_THREADS = num_threads;
11                 MY_MID = mid;
12         }
13
14         public void run() {
15     int workMID;
16     long fi,st;
17
18     atomic {
19       workMID = MY_MID;
20     }
21
22     System.out.println("Thread " + workMID + " has started");
23     st = System.currentTimeMillis();
24
25     Task localTask;
26     int chk; 
27     int result;
28     int i,j;
29                 boolean isEmpty;
30
31     while(true) {
32       atomic {
33                                         isEmpty = tasks.isTodoListEmpty();              // flag > !keep assigning 
34                         
35           if (!isEmpty) {
36                         currentWorkList[workMID] = tasks.grabTask();    /* grab the work from work pool */
37             chk = 1;
38           }
39                         else {
40             chk = Work.checkCurrentWorkList(this);
41         }
42       }
43
44       if(chk == 1) {    // still have work
45         atomic {
46           tasks.setWork(currentWorkList[workMID]);
47           localTask = tasks;
48         }
49         /* compute */
50         localTask.execution();
51
52         atomic {
53           /* push into done list */
54           tasks.done(currentWorkList[workMID]);
55                                         currentWorkList[workMID] = null;
56         }
57       }
58       else if(chk  == -1) {    // finished all work
59         break;
60       }
61       else {    // wait for other thread
62                                 sleep(5000000); 
63       }
64
65     }
66
67     fi = System.currentTimeMillis();
68
69     /* for debugging purpose */
70     atomic {
71       tasks.output();
72     }
73     System.out.println("\n\n I'm done\n\n\n");
74     System.out.println("Time Elapse = " + (double)((fi-st)/1000));
75         
76     RecoveryStat.printRecoveryStat();
77
78     while(true) {
79       sleep(100000);
80     }
81   }
82
83         public static int checkCurrentWorkList(Work mywork) {           
84     int i;
85     int index = -1;
86     int myID;
87                 int num_threads; 
88     int status;
89     boolean chk = false;
90     Object s;
91
92                 atomic {
93             myID = mywork.MY_MID;
94                         num_threads = mywork.NUM_THREADS;
95                 }
96
97     for(i = 0 ; (i < num_threads) && (index < 0); i++) {
98       if(myID == i) {
99         continue;
100       }
101                         status = Thread.getStatus(i);
102
103       atomic {
104
105         s = mywork.currentWorkList[i];
106
107         if(status == -1 && null != s) {
108           mywork.currentWorkList[myID] = mywork.currentWorkList[i];
109           mywork.currentWorkList[i] = null;
110           index = 0;
111         }
112         else if(null != s) {
113           chk = true;
114         }
115       }
116                         
117     }
118
119     if(index == 0)  // grabbed dead machine's work
120       return 1;
121     else if(i == num_threads && index < 0 && chk != true)  // wait for other machine's work
122       return -1;
123     else
124       return 0; // others are still working wait until they finish work
125   }
126
127   public static native void printRecoveryStat();
128 }
129