changes to distributed benchmarks for ecoop submission
[IRC.git] / Robust / src / Benchmarks / Distributed / SpamFilter / SpamFilter2.java
1 public class SpamFilter extends Thread {
2   DistributedHashMap mydhmap;
3
4   int id; //thread id
5
6   /**
7    * Total number of iterations
8    **/
9   int numiter;
10
11   /**
12    * Total number of emails
13    **/
14   int numemail;
15
16   /**
17    * Total number of threads
18    **/
19   int nthreads;
20
21   public SpamFilter() {
22
23   }
24
25   public SpamFilter(int numiter, int numemail,int id, DistributedHashMap mydhmap, int nthreads) {
26     this.numiter=numiter;
27     this.numemail=numemail;
28     this.id = id;
29     this.mydhmap = mydhmap;
30     this.nthreads = nthreads;
31   }
32
33   public void run() {
34     int niter;
35     int nemails;
36     int thid;
37     int correct=0;
38     int wrong=0;
39     atomic {
40       niter=numiter;
41       nemails=numemail;
42       thid = id;
43     }
44
45     //if(thid == 0)
46     //  return;
47
48     Random rand = new Random(thid);
49     int i;
50
51     for(i=0; i<niter; i++) {
52       correct =0;
53       wrong = 0;
54       for(int j=0; j<nemails; j++) {
55         int pickemail = rand.nextInt(nemails);
56
57         // randomly pick emails
58         pickemail+=1;
59         //System.out.println("pickemail= " + pickemail);
60         Mail email = new Mail("emails/email"+pickemail);
61         Vector signatures = email.checkMail(thid);
62
63         //check with global data structure
64         int[] confidenceVals=null;
65         atomic {
66           confidenceVals = check(signatures,thid);
67         }
68
69         //---- create and  return results --------
70         FilterResult filterResult = new FilterResult();
71         boolean filterAnswer = filterResult.getResult(confidenceVals);
72
73         //---- get user's take on email and send feedback ------
74         boolean userAnswer = email.getIsSpam();
75
76         //System.out.println("userAnswer= " + userAnswer + " filterAnswer= " + filterAnswer);
77
78         if(filterAnswer != userAnswer) {
79           /* wrong answer from the spam filter */
80           wrong++;
81           atomic {
82             sendFeedBack(signatures, userAnswer, thid, rand);
83           }
84         }
85         else {
86           /* Correct answer from the spam filter */
87           correct++;
88         }
89       } //end num emails
90     }//end num iter
91     // Sanity check
92     System.out.println((i)+"th iteration correct = " + correct + " Wrong = " + wrong + " percentage = " + ((float)correct/(float)nemails));
93   }
94
95   public static void main(String[] args) {
96     int[] mid = new int[8];
97     mid[0] = (128<<24)|(195<<16)|(136<<8)|162; //dc-1.calit2
98     mid[1] = (128<<24)|(195<<16)|(136<<8)|163; //dc-2.calit2
99     mid[2] = (128<<24)|(195<<16)|(136<<8)|164; //dc-3.calit2
100     mid[3] = (128<<24)|(195<<16)|(136<<8)|165; //dc-4.calit2
101     mid[4] = (128<<24)|(195<<16)|(136<<8)|166; //dc-5.calit2
102     mid[5] = (128<<24)|(195<<16)|(136<<8)|167; //dc-6.calit2
103     mid[6] = (128<<24)|(195<<16)|(136<<8)|168; //dc-7.calit2
104     mid[7] = (128<<24)|(195<<16)|(136<<8)|169; //dc-8.calit2
105
106     //Read options from command prompt
107     SpamFilter sf = new SpamFilter();
108     SpamFilter.parseCmdLine(args, sf);
109     int nthreads = sf.nthreads;
110
111     //Create Global data structure 
112     DistributedHashMap dhmap;
113     SpamFilter[] spf;
114     atomic {
115       dhmap = global new DistributedHashMap(10000, 0.75f);
116     }
117     atomic {
118       spf = global new SpamFilter[nthreads];
119       for(int i=0; i<nthreads; i++) {
120         spf[i] = global new SpamFilter(sf.numiter, sf.numemail, i, dhmap, nthreads);
121       }
122     }
123
124     /* ---- Start Threads ---- */
125     SpamFilter tmp;
126     for(int i = 0; i<nthreads; i++) {
127       atomic {
128         tmp = spf[i];
129       }
130       tmp.start(mid[i]);
131     }
132
133     /* ---- Join threads----- */
134     for(int i = 0; i<nthreads; i++) {
135       atomic {
136         tmp = spf[i];
137       }
138       tmp.join();
139     }
140
141     System.out.println("Finished");
142   }
143
144   public static void parseCmdLine(String args[], SpamFilter sf) {
145     int i = 0;
146     String arg;
147     while (i < args.length && args[i].startsWith("-")) {
148       arg = args[i++];
149       //check options
150       if(arg.equals("-n")) { //num of iterations
151         if(i < args.length) {
152           sf.numiter = new Integer(args[i++]).intValue();
153         }
154       } else if(arg.equals("-e")) { //num of emails
155         if(i < args.length) {
156           sf.numemail = new Integer(args[i++]).intValue();
157         }
158       } else if(arg.equals("-t")) { //num of threads
159         if(i < args.length) {
160           sf.nthreads = new Integer(args[i++]).intValue();
161         }
162       } else if(arg.equals("-h")) {
163         sf.usage();
164       }
165     }
166     if(sf.nthreads == 0) {
167       sf.usage();
168     }
169   }
170
171   /**
172    * The usage routine describing the program
173    **/
174   public void usage() {
175     System.out.println("usage: ./spamfilter -n <num iterations> -e <num emails> -t <num threads>\n");
176     System.out.println(                   "  -n : num iterations");
177     System.out.println(                   "  -e : number of emails");
178     System.out.println(                   "  -t : number of threads");
179   }
180
181   /**
182    *  Returns result to the Spam filter
183    **/
184   /*
185   public boolean checkMail(Mail mail, int userid) {
186     //Preprocess emails
187     //Vector partsOfMailStrings = mail.createMailStringsWithURL();
188     /*
189     Vector partsOfMailStrings = mail.getCommonPart();
190     partsOfMailStrings.addElement(mail.getBodyString());
191
192     //Compute signatures
193     SignatureComputer sigComp = new SignatureComputer();
194     Vector signatures = sigComp.computeSigs(partsOfMailStrings);//vector of strings
195
196     //check with global data structure
197     int[] confidenceVals = check(signatures,userid);
198
199     //---- create and  return results --------
200     FilterResult filterResult = new FilterResult();
201     boolean spam = filterResult.getResult(confidenceVals);
202
203     return spam;
204   } 
205    */
206
207   public int[] check(Vector signatures, int userid) {
208
209     //*** Prefetch ****/
210     //prefetch(this.mydhmap.table);
211     int numparts = signatures.size();
212
213     //System.out.println("check() numparts= " + numparts);
214     int[] confidenceVals = new int[numparts];
215
216     for(int i=0; i<numparts; i++) {
217       String part = (String)(signatures.elementAt(i));
218       char tmpengine = part.charAt(0);
219       String enginestr=null;
220       if(tmpengine == '4') { //Ephemeral Signature calculator
221         enginestr = new String("4");
222       }
223       if(tmpengine == '8') { //Whiplash Signature calculator
224         enginestr = new String("8");
225       }
226       String signaturestr = new String(part.substring(2));//a:b index of a =0, index of : =1, index of b =2
227
228       //find object in distributedhashMap: if no object then add object 
229       HashEntry tmphe=null;
230       int hashCode = enginestr.hashCode()^signaturestr.hashCode();
231       
232       int index1 = mydhmap.hash1(hashCode, mydhmap.table.length);
233
234       /*** Prefetch ****/
235       //prefetch(mydhmap.table[index1].array.key.stats.userstat[userid],
236       //         mydhmap.table[index1].array.value,
237       //         mydhmap.table[index1].array.key.stats.userid,
238       //         mydhmap.table[index1].array.key.engine.value,
239       //         mydhmap.table[index1].array.key.signature.value);  
240
241       DistributedHashEntry testhe = mydhmap.table[index1];
242       boolean foundstatistics=false;
243       DHashEntry ptr=null;
244       if(testhe!=null) {
245         /*** Prefetch ****/
246         //prefetch(testhe.array.next.value,
247         //         testhe.array.next.key.engine.value,
248         //         testhe.array.next.key.stats.userid,
249         //         testhe.array.next.key.stats.userstat[userid],
250         //         testhe.array.next.key.signature,value);
251
252         ptr=testhe.array;
253
254         while(ptr !=null) {
255           boolean engineVal= inLineEquals(ptr.key.engine.value, ptr.key.engine.count, ptr.key.engine.offset,
256              enginestr.value, enginestr.count, enginestr.offset);
257           boolean SignatureVal= inLineEquals(ptr.key.signature.value, ptr.key.signature.count, ptr.key.signature.offset,
258               signaturestr.value, signaturestr.count, signaturestr.offset);
259           
260           FilterStatistic tmpfs = ptr.value;
261           int tmpuserid = ptr.key.stats.userid[userid];
262           FilterStatistic myfs = ptr.key.stats.userstat[userid];
263           
264           if(ptr.hashval==hashCode&&engineVal&&SignatureVal) {
265             //Found statics...get Checked value.
266             confidenceVals[i] = tmpfs.getChecked(); 
267             foundstatistics=true;
268             break;
269           }
270           /* Prefetch */
271           //prefetch(ptr.next.next.key.stats.userid,
272           //         ptr.next.next.key.engine.value,
273           //         ptr.next.next.key.signature.value,
274           //         ptr.next.next.key.stats.userstat[userid],
275           //         ptr.next.next.value);
276           ptr=ptr.next;
277         }
278       }
279
280       if (!foundstatistics) {
281         /* Prefetch */
282         //prefetch(testhe.array);
283         HashEntry myhe = global new HashEntry();
284         GString engine = global new GString(enginestr);
285         GString signature = global new GString(signaturestr);
286
287         myhe.setengine(engine);
288         myhe.setsig(signature);
289
290         DHashEntry he = global new DHashEntry();
291         //application specific fields
292         HashStat mystat = global new HashStat();
293         mystat.setuser(userid, 0, 0, -1);
294         myhe.setstats(mystat);
295         FilterStatistic myfs =  global new FilterStatistic(0,0,-1);
296         he.value=myfs;
297         he.key=myhe;
298         he.hashval=hashCode;
299         //link old element into chain
300         //build new element
301
302         if (testhe!=null) {
303           //splice into old list
304           he.next=testhe.array;
305           testhe.array=he;
306         } else {
307           //create new header...this will cause many aborts
308           DistributedHashEntry newhe=global new DistributedHashEntry();
309           newhe.array=he;
310           mydhmap.table[index1]=newhe;
311         }
312       }
313     }
314
315     //  --> the mail client is able to determine if it is spam or not
316     // --- According to the "any"-logic (in Core#check_logic) in original Razor ---
317     // If any answer is spam, the entire email is spam.
318     return confidenceVals;
319   }
320
321   /**
322    * This method sends feedback from the user to a distributed
323    * spam database and trains the spam database to check future
324    * emails and detect spam
325    **/
326   public void sendFeedBack(Vector signatures, boolean isSpam, int id, Random myrand) {
327
328     for(int i=0;i<signatures.size();i++) {
329       String part = (String)(signatures.elementAt(i));
330       //
331       // Signature is of form a:b
332       // where a = string representing a signature engine
333       //           either "4" or "8"
334       //       b = string representing signature
335       //
336       char tmpengine = part.charAt(0); //
337
338       GString engine=null;
339
340       if(tmpengine == '4') {
341         String tmpstr = new String("4");
342         engine = global new GString(tmpstr);
343       }
344
345       if(tmpengine == '8') {
346         String tmpstr = new String("8");
347         engine = global new GString(tmpstr);
348       }
349
350       //System.out.println("sendFeedBack(): engine= " + engine.toLocalString());
351
352       String tmpsig = new String(part.substring(2));
353       GString signature = global new GString(tmpsig);
354
355       //System.out.println("sendFeedBack(): signature= " + signature.toLocalString());
356
357       HashEntry myhe = global new HashEntry();
358       myhe.setengine(engine);
359       myhe.setsig(signature);
360
361
362       // ----- now connect to global data structure and update stats -----
363       HashEntry tmphe=null;
364       FilterStatistic fs=null;
365       int hashCode = myhe.hashCode();
366       int index1 = mydhmap.hash1(hashCode, mydhmap.table.length);
367       DistributedHashEntry testhe = mydhmap.table[index1];
368       if(testhe!=null) {
369         DHashEntry ptr=testhe.array;
370         while(ptr!=null) {
371           boolean engineVal= inLineEquals(ptr.key.engine.value, ptr.key.engine.count, ptr.key.engine.offset,
372               myhe.engine.value, myhe.engine.count, myhe.engine.offset);
373           boolean SignatureVal= inLineEquals(ptr.key.signature.value, ptr.key.signature.count, ptr.key.signature.offset,
374               myhe.signature.value, myhe.signature.count, myhe.signature.offset);
375
376           if(ptr.hashval==hashCode&&engineVal&&SignatureVal) {
377             tmphe=ptr.key;
378             fs=ptr.value;
379             break;
380           }
381           ptr=ptr.next;
382         }
383       }
384       //tmphe has the key at the end
385       //fs has the value at the end      
386
387       if(tmphe==null) 
388         return;
389
390
391       if(tmphe.stats.userid[id] != 1) {
392         tmphe.stats.setuserid(id);
393       }
394
395
396       //---- get value from distributed hash and update spam count
397
398       //Allow users to give incorrect feedback
399       int pickemail = myrand.nextInt(100);
400       /* Randomly allow user to provide incorrect feedback */
401       if(pickemail < 95) {
402         //give correct feedback 95% of times
403         //Increment spam or ham value 
404         if(isSpam) {
405           tmphe.stats.incSpamCount(id);
406           fs.increaseSpam();
407         } else {
408           tmphe.stats.incHamCount(id);
409           fs.increaseHam();
410         }
411       } else {
412         // Give incorrect feedback 5% of times
413         if(isSpam) {
414           tmphe.stats.incHamCount(id);
415           fs.increaseHam();
416         } else {
417           tmphe.stats.incSpamCount(id);
418           fs.increaseSpam();
419         }
420       } //end of pickemail
421     }//end of for
422   }//end of sendFeedback
423
424   public static boolean inLineEquals(char[] array1, int count1, int offset1, char[] array2, int count2, int offset2) {
425     if(count1 != count2)
426       return false;
427     for(int i=0; i<count1; i++) {
428       if(array1[i+offset1] != array2[i+offset2]) {
429         return false;
430       }
431     }
432     return true;
433   }
434 }