71dc42527975414ba318512278c9f6f9d9b1e4fc
[IRC.git] / Robust / src / Benchmarks / Spider / dsm / QueryThread.java
1 public class QueryThread extends Thread {
2         int maxDepth;
3         int maxSearchDepth;
4   int MY_MID;
5   int NUM_THREADS;
6   Queue todoList;
7   Queue doneList;
8   Query myWork;
9   Query[] currentWorkList;
10
11   public QueryThread(Queue todoList, Queue doneList, int maxDepth, int maxSearchDepth,int mid,int NUM_THREADS,Query[] currentWorkList) {    
12     this.todoList = todoList;
13                 this.doneList = doneList;
14                 this.maxDepth = maxDepth;
15                 this.maxSearchDepth = maxSearchDepth;
16     this.currentWorkList = currentWorkList;
17     this.MY_MID = mid;
18     this.NUM_THREADS = NUM_THREADS;
19   }
20
21   public void run()
22   {
23     int workMID;
24
25     atomic {
26       workMID = MY_MID;
27     }
28
29     System.out.println("Thread " + workMID + " has started");
30
31     int chk;
32
33     while(true) {
34       atomic {
35         myWork = (Query)todoList.pop();
36         
37         if(null == myWork)  // no work in todolist
38         {
39           chk = checkCurrentWorkList(this);        
40         }
41         else {
42           currentWorkList[workMID] = myWork;
43           chk = 1;
44         }
45       }
46
47       if(chk == 1) { // it has query
48         execute(this);
49
50         atomic {
51           doneWork(myWork);
52           currentWorkList[workMID] = null;
53         }
54       }
55       else if(chk == -1) { // finished all work
56         break;
57       }
58       else {  // wait for other thread
59         sleep(5000000);
60       }
61
62     }
63
64    atomic {
65       System.out.println("\n\nDoneSize = " + doneList.size());
66     }
67
68     System.out.println("\n\n\n I'm done");
69   }
70
71         public static int checkCurrentWorkList(QueryThread qt) {                
72     int i;
73     int myID;
74                 int num_threads; 
75     boolean chk = false;
76     Object s;
77
78                 atomic {
79             myID = qt.MY_MID;
80                         num_threads = qt.NUM_THREADS;
81
82       for(i = 0 ; (i < num_threads); i++) {
83         if(myID == i) {
84           continue;
85         }  
86
87         s = qt.currentWorkList[i];
88
89         if(null != s) {
90           chk = true;
91           break;
92         }
93       }
94                         
95     }
96
97     if(chk == false)  // wait for other machine's work
98       return -1;
99     else
100       return 0; // others are still working wait until they finish work
101   }
102
103   public static void execute(QueryThread qt) {
104                 int depth;
105     int max;
106     int maxSearch;
107
108     atomic {
109       if(qt.myWork == null) {
110         System.out.println("What!!!!!!!!!!!!!!!");
111         System.exit(0);
112       }
113                         depth = ((Query)qt.myWork).getDepth();
114       max = qt.maxDepth;
115       maxSearch = qt.maxSearchDepth;
116                 }
117
118                 if (depth < max) {
119                         /* global variables */
120                         Query q;
121                         GlobalString ghostname;
122                         GlobalString gpath;
123
124                         /* local variables */
125                         QueryQueue toprocess;
126                         LocalQuery lq;
127                         String hostname;
128                         String path;
129
130                         atomic {
131                                 q = (Query)(qt.myWork);
132                                 ghostname = q.getHostName();
133                                 gpath = q.getPath();
134                                 hostname = new String(GlobalString.toLocalCharArray(ghostname));
135                                 path = new String(GlobalString.toLocalCharArray(gpath));
136                         }
137                         lq = new LocalQuery(hostname, path, depth);
138
139                         System.printString("Processing - Hostname : ");
140                         System.printString(hostname);
141                         System.printString(", Path : ");
142                         System.printString(path);
143                         System.printString("\n");
144
145                         Socket s = new Socket(hostname, 80);
146     
147                         requestQuery(hostname, path, s);
148                         readResponse(lq, s);
149                         toprocess = processPage(lq,maxSearch);
150                         s.close();
151
152                         atomic {
153                                 while(!toprocess.isEmpty()) {
154                                         lq = toprocess.pop();
155                                         ghostname = global new GlobalString(lq.getHostName());
156                                         gpath = global new GlobalString(lq.getPath());
157
158                                         q = global new Query(ghostname, gpath, lq.getDepth());
159                                         qt.todoList.push(q);
160                                 }
161                         }
162                 }
163   }
164         
165         public static void requestQuery(String hostname, String path, Socket sock) {
166     StringBuffer req = new StringBuffer("GET "); 
167     req.append("/");
168                 req.append(path);
169     req.append(" HTTP/1.1\r\nHost:");
170     req.append(hostname);
171     req.append("\r\n\r\n");
172     sock.write(req.toString().getBytes());
173   }
174
175         public static void readResponse(LocalQuery lq, Socket sock) {
176         //    state 0 - nothing
177         //    state 1 - \r
178         //    state 2 - \r\n
179         //    state 3 - \r\n\r
180         //    state 4 - \r\n\r\n
181     int state=0;
182     while(true) {
183       if (state<4) {
184         if (state==0) {
185           byte[] b=new byte[1];
186           int numchars=sock.read(b);
187           if ((numchars==1)) {
188             if (b[0]=='\r') {
189               state++;
190             }
191           } else
192                                                 return;
193         } else if (state==1) {
194           byte[] b=new byte[1];
195           int numchars=sock.read(b);
196           if (numchars==1) {
197             if (b[0]=='\n')
198               state++;
199             else
200               state=0;
201           } else return;
202         } else if (state==2) {
203           byte[] b=new byte[1];
204           int numchars=sock.read(b);
205           if (numchars==1) {
206             if (b[0]=='\r')
207               state++;
208             else
209               state=0;
210           } else return;
211         } else if (state==3) {
212           byte[] b=new byte[1];
213           int numchars=sock.read(b);
214           if (numchars==1) {
215             if (b[0]=='\n')
216               state++;
217             else
218               state=0;
219           } else return;
220         }
221       } else {
222                                 byte[] buffer=new byte[1024];
223         int numchars=sock.read(buffer);
224         if (numchars==0)
225           return;
226         else {
227           String curr=(new String(buffer)).subString(0,numchars);
228                                         lq.response.append(curr);
229         }
230       }
231     }
232   }
233         
234         public void doneWork(Object obj) {
235                 doneList.push(obj);
236         }
237
238   public static QueryQueue processPage(LocalQuery lq,int maxSearchDepth) {
239     int index = 0;
240         String href = new String("href=\"");
241         String searchstr = lq.response.toString();
242                 int depth;
243         boolean cont = true;
244
245                 QueryQueue toprocess = new QueryQueue();
246                 depth = lq.getDepth() + 1;
247
248                 int searchDepthCnt = 0;
249                 while(cont && (searchDepthCnt < maxSearchDepth)) {
250                         int mindex = searchstr.indexOf(href,index);
251                         if (mindex != -1) {     
252                                 int endquote = searchstr.indexOf('"', mindex+href.length());
253                 if (endquote != -1) {
254                       String match = searchstr.subString(mindex+href.length(), endquote);
255                                         String match2 = lq.makewebcanonical(match);
256         
257                       if (match2 != null) {
258                                                 LocalQuery newlq = new LocalQuery(lq.getHostName(match), lq.getPathName(match), depth);
259
260                                                 toprocess.push(newlq);
261                                                 searchDepthCnt++;
262                                         }
263                                         index = endquote;
264         } else cont = false;
265       } else cont = false;
266     }
267
268                 return toprocess;
269   }
270 }