77db3cd74454ecadec227f12851bc4214f1c48e7
[IRC.git] / Robust / src / Benchmarks / Spider / dsm / QueryThread.java
1 public class QueryThread extends Thread {
2   int MY_MID;
3   int NUM_THREADS;
4   Queue todoList;
5   DistributedHashMap doneList;
6   GlobalQuery myWork;
7   GlobalQuery[] currentWorkList;
8
9   DistributedHashMap results;
10   Queue toprocess;
11   GlobalString gTitle;
12   GlobalString workingURL;
13         int maxDepth;
14
15   public QueryThread(Queue todoList, DistributedHashMap doneList, DistributedHashMap results,int maxDepth,int mid,int NUM_THREADS,GlobalQuery[] currentWorkList) {    
16     this.todoList = todoList;
17                 this.doneList = doneList;
18     this.results = results;
19                 this.maxDepth = maxDepth;
20     this.currentWorkList = currentWorkList;
21     this.MY_MID = mid;
22     this.NUM_THREADS = NUM_THREADS;
23   }
24
25   public void run()
26   {
27     int workMID;
28
29     atomic {
30       workMID = MY_MID;
31     }
32
33     System.out.println("Thread " + workMID + " has started");
34
35     int chk;
36
37     while(true) {
38       atomic {
39         myWork = (GlobalQuery)todoList.pop();
40         
41         if(null == myWork)  // no work in todolist
42         {
43           chk = checkCurrentWorkList(this);        
44         }
45         else {
46           currentWorkList[workMID] = myWork;
47           chk = 1;
48         }
49       }
50
51       if(chk == 1) { // it has query
52         QueryThread.execute(this);
53
54         atomic {
55           done(myWork);
56           currentWorkList[workMID] = null;
57         }
58       }
59       else if(chk == -1) { // finished all work
60         break;
61       }
62       else {  // wait for other thread
63         sleep(5000000);
64       }
65
66     }
67
68    atomic {
69       System.out.println("\n\nDoneSize = " + doneList.size());
70     }
71
72     System.out.println("\n\n\n I'm done");
73   }
74
75         public static int checkCurrentWorkList(QueryThread qt) {                
76     int i;
77     int myID;
78                 int num_threads; 
79     boolean chk = false;
80     Object s;
81
82                 atomic {
83             myID = qt.MY_MID;
84                         num_threads = qt.NUM_THREADS;
85
86       for(i = 0 ; (i < num_threads); i++) {
87         if(myID == i) {
88           continue;
89         }  
90
91         s = qt.currentWorkList[i];
92
93         if(null != s) {
94           chk = true;
95           break;
96         }
97       }
98                         
99     }
100
101     if(chk == false)  // wait for other machine's work
102       return -1;
103     else
104       return 0; // others are still working wait until they finish work
105   }
106
107   public static void execute(QueryThread qt) {
108                 int depth;
109                 int max;
110                 
111                 atomic {
112                         depth = qt.myWork.getDepth();
113       max = qt.maxDepth;
114                 }
115
116                 if (depth < max) {
117                         /* global variables */
118                         GlobalQuery gq;
119
120                         /* local variables */
121                         LocalQuery lq;
122                         String hostname;
123                         String path;
124                         String title;
125
126                         atomic {
127                                 gq = qt.myWork;
128                                 hostname = new String(GlobalString.toLocalCharArray(gq.getHostName()));
129                                 path = new String(GlobalString.toLocalCharArray(gq.getPath()));
130
131                                 GlobalStringBuffer gsb = global new GlobalStringBuffer(hostname);
132                                 gsb.append("/");
133                                 gsb.append(path);
134                                 qt.workingURL = global new GlobalString(gsb.toGlobalString());
135                                 qt.gTitle = null;
136                         }
137                         lq = new LocalQuery(hostname, path, depth);
138
139                         System.printString("["+lq.getDepth()+"] ");
140                         System.printString("Processing - Hostname : ");
141                         System.printString(hostname);
142                         System.printString(", Path : ");
143                         System.printString(path);
144                         System.printString("\n");
145
146                         Socket s = new Socket(hostname, 80);
147     
148                         requestQuery(hostname, path, s);
149                         readResponse(lq, s);
150
151                         if ((title = grabTitle(lq)) != null) {
152                                 atomic {
153                                         qt.gTitle = global new GlobalString(title);
154                                 }
155                         }
156
157                         atomic {
158                                 qt.toprocess = processPage(lq);
159                         }
160
161                         s.close();
162                 }
163   }
164
165         public void done(Object obj) {
166                 if (gTitle != null) 
167                         processList();
168
169                 GlobalString str = global new GlobalString("true");
170
171                 doneList.put(workingURL, str);
172
173                 while(!toprocess.isEmpty()) {
174                         GlobalQuery q = (GlobalQuery)toprocess.pop();
175
176                         GlobalString hostname = global new GlobalString(q.getHostName());
177                         GlobalString path = global new GlobalString(q.getPath());
178
179                         GlobalStringBuffer gsb = global new GlobalStringBuffer(hostname);
180                         gsb.append("/");
181                         gsb.append(path);
182
183                         if (!doneList.containsKey(gsb.toGlobalString())) {
184                                 todoList.push(q);
185                         }
186                 }
187         }
188
189         public static String grabTitle(LocalQuery lq) {
190                 String sTitle = new String("<title>");  
191                 String eTitle = new String("</title>");
192         String searchstr = lq.response.toString();
193                 String title = null;
194                 char ch;
195
196                 int mindex = searchstr.indexOf(sTitle);
197                 if (mindex != -1) {
198                         int endquote = searchstr.indexOf(eTitle, mindex+sTitle.length());
199
200                         title = new String(searchstr.subString(mindex+sTitle.length(), endquote));
201                         
202                         if (Character.isWhitespace(title.charAt(0))){
203                                 mindex=0;
204                                 while (Character.isWhitespace(title.charAt(mindex++)));
205                                 mindex--;
206                                 title = new String(title.subString(mindex));
207                         }
208
209                         if (Character.isWhitespace(title.charAt(title.length()-1))) {
210                                 endquote=title.length()-1;
211                                 while (Character.isWhitespace(title.charAt(endquote--)));
212                                 endquote += 2;
213                                 title = new String(title.subString(0, endquote));
214                         }
215
216                         if (errorPage(title)) 
217                                 title = null;
218                 }
219
220                 return title;
221         }
222
223         public static boolean errorPage(String str) {
224                 if (str.equals("301 Moved Permanently"))     
225                         return true;                               
226                 else if (str.equals("302 Found"))            
227                         return true;                               
228                 else if (str.equals("404 Not Found"))        
229                         return true;                               
230                 else                                         
231                         return false;                              
232         }                                              
233   
234   public static void requestQuery(String hostname, String path, Socket sock) {
235     StringBuffer req = new StringBuffer("GET "); 
236     req.append("/");
237                 req.append(path);
238     req.append(" HTTP/1.1\r\nHost:");
239     req.append(hostname);
240     req.append("\r\n\r\n");
241     sock.write(req.toString().getBytes());
242   }
243
244         public static void readResponse(LocalQuery lq, Socket sock) {
245         //    state 0 - nothing
246         //    state 1 - \r
247         //    state 2 - \r\n
248         //    state 3 - \r\n\r
249         //    state 4 - \r\n\r\n
250     int state=0;
251     while(true) {
252       if (state<4) {
253         if (state==0) {
254           byte[] b=new byte[1];
255           int numchars=sock.read(b);
256           if ((numchars==1)) {
257             if (b[0]=='\r') {
258               state++;
259             }
260           } else
261                                                 return;
262         } else if (state==1) {
263           byte[] b=new byte[1];
264           int numchars=sock.read(b);
265           if (numchars==1) {
266             if (b[0]=='\n')
267               state++;
268             else
269               state=0;
270           } else return;
271         } else if (state==2) {
272           byte[] b=new byte[1];
273           int numchars=sock.read(b);
274           if (numchars==1) {
275             if (b[0]=='\r')
276               state++;
277             else
278               state=0;
279           } else return;
280         } else if (state==3) {
281           byte[] b=new byte[1];
282           int numchars=sock.read(b);
283           if (numchars==1) {
284             if (b[0]=='\n')
285               state++;
286             else
287               state=0;
288           } else return;
289         }
290       } else {
291                                 byte[] buffer=new byte[1024];
292         int numchars=sock.read(buffer);
293         if (numchars==0)
294           return;
295         else {
296           String curr=(new String(buffer)).subString(0,numchars);
297                                         lq.response.append(curr);
298         }
299       }
300     }
301   }
302         
303         public void processList() {
304                 LinkedList ll;
305                 GlobalString token = null;
306                 int mindex = 0;
307                 int endquote = 0;
308
309                 while (endquote != -1) {
310                         endquote = gTitle.indexOf(' ', mindex);
311
312                         if (endquote != -1) {
313                                 token = gTitle.subString(mindex, endquote);
314                                 mindex = endquote + 1;
315                                 if (filter(token)) {
316                                         continue;
317                                 }
318                                 token = refine(token);
319                         }
320                         else {
321                                 token = gTitle.subString(mindex);
322                                 token = refine(token);
323                         }
324
325                         Queue q = (Queue)results.get(token);
326                         if (q == null) {
327                                 q = global new Queue();
328                         }
329                         q.push(workingURL);     
330                         results.put(token, q);
331                         System.out.println("Key : ["+token.toLocalString()+"],["+q.size()+"]");
332                 }
333         }
334
335         public boolean filter(GlobalString str) {
336                 if (str.equals("of"))   return true;
337                 else if (str.equals("for")) return true;
338                 else if (str.equals("a")) return true;
339                 else if (str.equals("an")) return true;
340                 else if (str.equals("the")) return true;
341                 else if (str.equals("at")) return true;
342                 else if (str.equals("and")) return true;
343                 else if (str.equals("or")) return true;
344                 else if (str.equals("but")) return true;
345                 else if (str.equals("to")) return true;
346                 else if (str.equals(".")) return true;
347                 else if (str.equals("=")) return true;
348                 else if (str.equals("-")) return true;
349                 else if (str.equals(":")) return true;
350                 else if (str.equals(";")) return true;
351                 else if (str.equals("\'")) return true;
352                 else if (str.equals("\"")) return true;
353                 else if (str.equals("|")) return true;
354                 else if (str.equals("@")) return true;
355                 else if (str.equals("&")) return true;
356                 else return false;
357         }
358
359         public GlobalString refine(GlobalString str) {
360                 str = refinePrefix(str);
361                 str = refinePostfix(str);
362                 return str;
363         }
364
365         public GlobalString refinePrefix(GlobalString str) {
366                 if (str.charAt(0) == '&') {             // &
367                         return str.subString(1);
368                 }
369                 return str;
370         }
371
372         public GlobalString refinePostfix(GlobalString str) {
373                 if (str.charAt(str.length()-1) == ',') {                        // ,
374                         return str.subString(0, str.length()-1);
375                 }
376                 else if (str.charAt(str.length()-1) == ':') {           // :
377                         return str.subString(0, str.length()-1);
378                 }
379                 else if (str.charAt(str.length()-1) == ';') {           // ;
380                         return str.subString(0, str.length()-1);
381                 }
382                 else if (str.charAt(str.length()-1) == '!') {           // !
383                         return str.subString(0, str.length()-1);
384                 }
385                 else if (str.charAt(str.length()-1) == 's') {                   // 's
386                         if (str.charAt(str.length()-2) == '\'')
387                                 return str.subString(0, str.length()-2);        
388                 }
389                 return str;
390         }
391   
392   public static Queue processPage(LocalQuery lq) {
393     int index = 0;
394         String href = new String("href=\"");
395         String searchstr = lq.response.toString();
396                 int depth;
397         boolean cont = true;
398                 Queue toprocess;
399
400                 depth = lq.getDepth() + 1;
401
402                 toprocess = global new Queue();
403                 while(cont) {
404                         int mindex = searchstr.indexOf(href,index);
405                         if (mindex != -1) {     
406                                 int endquote = searchstr.indexOf('"', mindex+href.length());
407                 if (endquote != -1) {
408                       String match = searchstr.subString(mindex+href.length(), endquote);
409                                         String match2 = lq.makewebcanonical(match);
410         
411                                         GlobalString ghostname;
412                                         GlobalString gpath;
413
414                                         ghostname = global new GlobalString(lq.getHostName(match));
415                                         gpath = global new GlobalString(lq.getPathName(match));
416
417                       if (match2 != null) {
418                                                         GlobalQuery gq = global new GlobalQuery(ghostname, gpath, depth);
419                                                         toprocess.push(gq);
420                                         }
421                                         index = endquote;
422         } else cont = false;
423       } else cont = false;
424     }                                                                          
425                 return toprocess;
426   }
427 }