fix distributed KMeans bugs and add javasingle version
[IRC.git] / Robust / src / Benchmarks / Prefetch / KMeans / KMeans.java
1 /* =============================================================================
2  *
3  * kmeans.java
4  *
5  * =============================================================================
6  *
7  * Description:
8  *
9  * Takes as input a file:
10  *   ascii  file: containing 1 data point per line
11  *   binary file: first int is the number of objects
12  *                2nd int is the no. of features of each object
13  *
14  * This example performs a fuzzy c-means clustering on the data. Fuzzy clustering
15  * is performed using min to max clusters and the clustering that gets the best
16  * score according to a compactness and separation criterion are returned.
17  *
18  *
19  * Author:
20  *
21  * Wei-keng Liao
22  * ECE Department Northwestern University
23  * email: wkliao@ece.northwestern.edu
24  *
25  *
26  * Edited by:
27  *
28  * Jay Pisharath
29  * Northwestern University
30  *
31  * Chi Cao Minh
32  * Stanford University
33  *
34  * Port to Java version
35  * Alokika Dash
36  * University of California, Irvine
37  *
38  * =============================================================================
39  *
40  * ------------------------------------------------------------------------
41  * 
42  * For the license of kmeans, please see kmeans/LICENSE.kmeans
43  * 
44  * ------------------------------------------------------------------------
45  * 
46  * Unless otherwise noted, the following license applies to STAMP files:
47  * 
48  * Copyright (c) 2007, Stanford University
49  * All rights reserved.
50  * 
51  * Redistribution and use in source and binary forms, with or without
52  * modification, are permitted provided that the following conditions are
53  * met:
54  * 
55  *     * Redistributions of source code must retain the above copyright
56  *       notice, this list of conditions and the following disclaimer.
57  * 
58  *     * Redistributions in binary form must reproduce the above copyright
59  *       notice, this list of conditions and the following disclaimer in
60  *       the documentation and/or other materials provided with the
61  *       distribution.
62  * 
63  *     * Neither the name of Stanford University nor the names of its
64  *       contributors may be used to endorse or promote products derived
65  *       from this software without specific prior written permission.
66  * 
67  * THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY
68  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
69  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
70  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE
71  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
72  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
73  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
74  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
75  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
76  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
77  * THE POSSIBILITY OF SUCH DAMAGE.
78  *
79  * =============================================================================
80  */
81
82 public class KMeans extends Thread {
83   /**
84    * User input for max clusters
85    **/
86   int max_nclusters;
87
88   /**
89    * User input for min clusters
90    **/
91   int min_nclusters;
92
93   /**
94    * Check for Binary file
95    **/
96   int isBinaryFile;
97
98   /**
99    * Using zscore transformation for cluster center 
100    * deviating from distribution's mean
101    **/
102   int use_zscore_transform;
103
104   /**
105    * Input file name used for clustering
106    **/
107   String filename;
108
109   /**
110    * Total number of threads
111    **/
112   int nthreads;
113
114   /**
115    * threshold until which kmeans cluster continues
116    **/
117   float threshold;
118
119   /**
120    * thread id
121    **/
122   int threadid;
123
124   /**
125    * Global arguments for threads 
126    **/
127   GlobalArgs g_args;
128
129   /**
130    * Output:  Number of best clusters
131    **/
132   int best_nclusters;
133
134   /**
135    * Output: Cluster centers
136    **/
137   float[][] cluster_centres;
138
139   /**
140    *
141    **/
142   float[][] attributes;
143
144   int numObjects;
145
146   int numAttributes;
147
148   public KMeans() {
149     max_nclusters = 13;
150     min_nclusters = 4;
151     isBinaryFile = 0;
152     use_zscore_transform = 1;
153     threshold = (float) 0.001;
154     best_nclusters = 0;
155   }
156
157   /*
158   public KMeans(int threadid, GlobalArgs g_args) {
159     this.threadid = threadid;
160     this.g_args = g_args;
161   }
162   */
163
164   public KMeans(int threadid, GlobalArgs g_args, int nthreads, int use_zscore_transform, 
165       int max_nclusters, int min_nclusters, float threshold,
166       float[][] attributes, int numObjects, int numAttributes) {
167     this.threadid = threadid;
168     this.g_args = g_args;
169     //this.nthreads = nthreads;
170     //this.use_zscore_transform = use_zscore_transform;
171     //this.max_nclusters = max_nclusters;
172     //this.min_nclusters = min_nclusters;
173     //this.threshold = threshold;
174     //this.attributes = attributes;
175     //this.numObjects = numObjects;
176     //this.numAttributes = numAttributes;
177   }
178
179   public void run() {
180     Barrier barr = new Barrier("128.195.136.162");
181     int id;
182     GlobalArgs tmp_g_args;
183     atomic {
184       id = threadid;
185       tmp_g_args = g_args;
186     }
187     while(true) {
188       Barrier.enterBarrier(barr);
189       Normal.work(id, tmp_g_args);
190       Barrier.enterBarrier(barr);
191     }
192   }
193
194   /* =============================================================================
195    * main
196    * =============================================================================
197    */
198   public static void main(String[] args) {
199     int nthreads;
200     int MAX_LINE_LENGTH = 1000000; /* max input is 400000 one digit input + spaces */
201     int[] mid = new int[8];
202     mid[0] = (128<<24)|(195<<16)|(136<<8)|162; //dc-1.calit2
203     mid[1] = (128<<24)|(195<<16)|(136<<8)|163; //dc-2.calit2
204     mid[2] = (128<<24)|(195<<16)|(136<<8)|164; //dc-3.calit2
205     mid[3] = (128<<24)|(195<<16)|(136<<8)|165; //dc-4.calit2
206     mid[4] = (128<<24)|(195<<16)|(136<<8)|166; //dc-5.calit2
207     mid[5] = (128<<24)|(195<<16)|(136<<8)|167; //dc-6.calit2
208     mid[6] = (128<<24)|(195<<16)|(136<<8)|168; //dc-7.calit2
209     mid[7] = (128<<24)|(195<<16)|(136<<8)|169; //dc-8.calit2
210
211
212     /**
213      * Read options fron the command prompt 
214      **/
215     KMeans kms;
216     kms = new KMeans();
217     KMeans.parseCmdLine(args, kms);
218     nthreads = kms.nthreads;
219     System.out.println("nthreads= " + kms.nthreads);
220
221     /* Initiate Barriers */
222     BarrierServer mybarr;
223
224     atomic {
225       mybarr = global new BarrierServer(nthreads);
226     }
227     mybarr.start(mid[0]);
228
229
230     if (kms.max_nclusters < kms.min_nclusters) {
231       System.out.println("Error: max_clusters must be >= min_clusters\n");
232       System.exit(0);
233     }
234     
235     float[][] buf;
236     float[][] attributes;
237     int numAttributes = 0;
238     int numObjects = 0;
239
240     /*
241      * From the input file, get the numAttributes (columns in txt file) and numObjects (rows in txt file)
242      */
243     if (kms.isBinaryFile == 1) {
244       System.out.println("TODO: Unimplemented Binary file option\n");
245       System.exit(0);
246     }
247
248     FileInputStream inputFile = new FileInputStream(kms.filename);
249     byte b[] = new byte[MAX_LINE_LENGTH];
250     int n;
251     while ((n = inputFile.read(b)) != 0) {
252       for (int i = 0; i < n; i++) {
253         if (b[i] == '\n')
254           numObjects++;
255       }
256     }
257     inputFile.close();
258     inputFile = new FileInputStream(kms.filename);
259     String line = null;
260     if((line = inputFile.readLine()) != null) {
261       int index = 0;
262       boolean prevWhiteSpace = true;
263       while(index < line.length()) {
264         char c = line.charAt(index++);
265         boolean currWhiteSpace = Character.isWhitespace(c);
266         if(prevWhiteSpace && !currWhiteSpace){
267           numAttributes++;
268         }   
269         prevWhiteSpace = currWhiteSpace;
270       }   
271     }   
272     inputFile.close();
273
274     /* Ignore the first attribute: numAttributes = 1; */
275     numAttributes = numAttributes - 1; 
276
277     /* Allocate new shared objects and read attributes of all objects */
278     float[][] tmp_buf;
279     atomic {
280       buf = global new float[numObjects][numAttributes];
281       attributes = global new float[numObjects][numAttributes];
282       tmp_buf = buf;
283     }
284     KMeans.readFromFile(inputFile, kms.filename, tmp_buf, MAX_LINE_LENGTH);
285     System.out.println("Finished Reading from file ......");
286
287     /*
288      * The core of the clustering
289      */
290
291     int len = kms.max_nclusters - kms.min_nclusters + 1;
292
293     int nloops = 1;
294
295     KMeans[] km;
296     GlobalArgs g_args;
297     atomic {
298       km = global new KMeans[nthreads];
299       g_args = global new GlobalArgs();
300       g_args.nthreads = nthreads;
301       for(int x = 0; x < numObjects; x++) {
302         for(int y = 0; y < numAttributes; y++) {
303           attributes[x][y] = buf[x][y];
304         }
305       }
306     }
307
308     atomic {
309       /* Create and Start Threads */
310       for(int i = 1; i<nthreads; i++) {
311         km[i] = global new KMeans(i, g_args, nthreads, kms.use_zscore_transform, 
312             kms.max_nclusters, kms.min_nclusters, kms.threshold, attributes, numObjects, numAttributes);
313       }
314     }
315
316     KMeans tmp;
317
318     boolean waitfordone=true;
319     while(waitfordone) {
320       atomic {
321         if (mybarr.done)
322           waitfordone=false;
323       }
324     }
325
326     for(int i = 1; i<nthreads; i++) {
327       atomic {
328         tmp = km[i];
329       }
330       tmp.start(mid[i]);
331     }
332
333     System.out.println("Finished Starting threads......");
334
335     for (int i = 0; i < nloops; i++) {
336       //
337       // Since zscore transform may perform in cluster() which modifies the
338       // contents of attributes[][], we need to re-store the originals
339       //
340       float[][] tmp_attributes;
341       GlobalArgs tmp_g_args;
342       atomic {
343         for(int x = 0; x < numObjects; x++) {
344           for(int y = 0; y < numAttributes; y++) {
345             attributes[x][y] = buf[x][y];
346           }
347         }
348         tmp_attributes = attributes;
349         tmp_g_args = g_args;
350       }
351
352
353       Cluster.cluster_exec(nthreads,
354           numObjects,
355           numAttributes,
356           tmp_attributes,             // [numObjects][numAttributes] 
357           kms,                    //main class that holds users inputs from command prompt and output arrays that need to be filled
358           tmp_g_args);                // Global arguments common to all threads
359     }
360
361     System.out.println("Printing output......");
362     System.out.println("Best_nclusters= " + kms.best_nclusters);
363
364     /* Output: the coordinates of the cluster centres */
365     
366     /*
367     {
368       for (int i = 0; i < kms.best_nclusters; i++) {
369         System.out.print(i + " ");
370         for (int j = 0; j < numAttributes; j++) {
371           System.out.print(kms.cluster_centres[i][j] + " ");
372         }
373         System.out.println("\n");
374       }
375     }
376     */
377
378     System.out.println("Finished......\n");
379     System.exit(0);
380   }
381
382   public static void parseCmdLine(String args[], KMeans km) {
383     int i = 0;
384     String arg;
385     while (i < args.length && args[i].startsWith("-")) {
386       arg = args[i++];
387       //check options
388       if(arg.equals("-m")) {
389         if(i < args.length) {
390           km.max_nclusters = new Integer(args[i++]).intValue();
391         }
392       } else if(arg.equals("-n")) {
393         if(i < args.length) {
394           km.min_nclusters = new Integer(args[i++]).intValue();
395         }
396       } else if(arg.equals("-t")) {
397         if(i < args.length) {
398           //km.threshold = new Integer(args[i++]).intValue();
399           km.threshold = (float) Double.parseDouble(args[i++]);
400         }
401       } else if(arg.equals("-i")) {
402         if(i < args.length) {
403           km.filename = args[i++];
404         }
405       } else if(arg.equals("-b")) {
406         if(i < args.length) {
407           km.isBinaryFile = new Integer(args[i++]).intValue();
408         }
409       } else if(arg.equals("-z")) {
410         km.use_zscore_transform=0;
411       } else if(arg.equals("-nthreads")) {
412         if(i < args.length) {
413           km.nthreads = new Integer(args[i++]).intValue();
414         }
415       } else if(arg.equals("-h")) {
416         km.usage();
417       }
418     }
419     if(km.nthreads == 0 || km.filename == null) {
420       km.usage();
421     }
422   }
423
424   /**
425    * The usage routine which describes the program options.
426    **/
427   public void usage() {
428     System.out.println("usage: ./kmeans -m <max_clusters> -n <min_clusters> -t <threshold> -i <filename> -nthreads <threads>\n");
429     System.out.println(                   "  -i filename:     file containing data to be clustered\n");
430     System.out.println(                   "  -b               input file is in binary format\n");
431     System.out.println(                   "  -m max_clusters: maximum number of clusters allowed\n");
432     System.out.println(                   "  -n min_clusters: minimum number of clusters allowed\n");
433     System.out.println(                   "  -z             : don't zscore transform data\n");
434     System.out.println(                   "  -t threshold   : threshold value\n");
435     System.out.println(                   "  -nthreads      : number of threads\n");
436   }
437
438   /**
439    * readFromFile()
440    * Read attributes from the input file into an array
441    **/
442   public static void readFromFile(FileInputStream inputFile, String filename, float[][] buf, int MAX_LINE_LENGTH) {
443     inputFile = new FileInputStream(filename);
444     int j;
445     int i = 0;
446
447     byte b[] = new byte[MAX_LINE_LENGTH];
448     int n;
449     byte oldbytes[]=null;
450
451
452     atomic { //FIXME: temporary fix  Note that this 
453       // transaction will never abort because it is only executed
454       // on master machine and therefore the fileread native call is
455       //allowed as a warning
456       j = -1;
457       while ((n = inputFile.read(b)) != 0) {
458         int x=0;
459
460         if (oldbytes!=null) {
461           //find space
462           boolean cr=false;
463           for (;x < n; x++) {
464             if (b[x] == ' ')
465               break;
466             if(b[x] =='\n') {
467               cr=true;
468               break;
469             }
470           }
471           byte newbytes[]= new byte[x+oldbytes.length];
472           boolean isnumber=false;
473           for(int ii=0;ii<oldbytes.length;ii++) {
474             if (oldbytes[ii]>='0'&&oldbytes[ii]<='9')
475               isnumber=true;
476             newbytes[ii]=oldbytes[ii];
477           }
478           for(int ii=0;ii<x;ii++) {
479             if (b[ii]>='0'&&b[ii]<='9')
480               isnumber=true;
481             newbytes[ii+oldbytes.length]=b[ii];
482           }
483           if(x!=n)
484             x++; //skip past space
485           if(isnumber) {
486             if (j>=0) {
487               buf[i][j]=(float)Double.parseDouble(new String(newbytes, 0, newbytes.length));
488             }
489             j++;
490           }
491           if(cr) {
492             j=-1;
493             i++;
494           }
495           oldbytes=null;
496         }
497
498         while (x < n) {
499           int y=x;
500           boolean cr=false;
501           boolean isnumber=false;
502           for(y=x;y<n;y++) {
503             if ((b[y]>='0')&&(b[y]<='9'))
504               isnumber=true;
505             if (b[y]==' ')
506               break;
507             if (b[y]=='\n') {
508               cr=true;
509               break;
510             }
511           }
512           if (y==n) {
513             //need to continue for another read
514             oldbytes=new byte[y-x];
515             for(int ii=0;ii<(y-x);ii++)
516               oldbytes[ii]=b[ii+x];
517             break;
518           }
519           //otherwise x is beginning of character string, y is end
520           if (isnumber) {
521             if (j>=0) {
522               buf[i][j]=(float)Double.parseDouble(new String(b,x,y-x));
523             }
524             j++;
525           }
526           if(cr) {
527             i++;//skip to next line
528             j = -1;//don't store line number
529             x=y;//skip to end of number
530             x++;//skip past return
531           } else {
532             x=y;//skip to end of number
533             x++;//skip past space
534           }
535         }
536       }
537     }
538     inputFile.close();
539   }
540 }
541
542 /* =============================================================================
543  *
544  * End of kmeans.java
545  *
546  * =============================================================================
547  */