This commit was manufactured by cvs2svn to create tag 'buildscript'.
[IRC.git] /
1 /*******************************************************************************
2 *                                 dht.c
3 *
4 *  High-performance Distributed Hash Table for finding the location of objects
5 * in a Distributed Shared Transactional Memory system.
6 *
7 * Creator: Erik Rubow
8 *
9 * TODO:
10 * 1) Instead of having dhtInsertMult, dhtSearchMult, etc. call their single-key
11 * counterparts repeatedly, define some new messages to handle it more
12 * efficiently.
13 * 2) Improve the efficiency of functions that work with hostArray, hostReplied,
14 * and blockOwnerArray.
15 * 3) Currently a join or leave causes a rebuild of the entire hash table.
16 * Implement more graceful join and leave procedures.
17 * 4) Fine tune timeout values for performance, possibly implement a backoff
18 * algorithm to prevent overloading the network.
19 * 5) Whatever else I'm forgetting
20 *
21 *******************************************************************************/
22 /*******************************************************************************
23 *                              Includes
24 *******************************************************************************/
25
26 #include <netinet/in.h>
27 #include <arpa/inet.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <sys/ioctl.h>
31 #include <stdio.h>
32 #include <stdarg.h>
33 #include <string.h>
34 #include <stdlib.h>
35 #include <unistd.h>
36 #include <pthread.h>
37 #include <sys/time.h>
38 #include <sys/poll.h>
39 #include <netdb.h>
40 #include <net/if.h>
41 #include <linux/sockios.h>
42 #include <sys/queue.h>
43 #include "dht.h"
44 #include "clookup.h" //this works for now, do we need anything better?
45 #include "mlookup.h"
46
/*******************************************************************************
*                           Local Defines, Structs
*******************************************************************************/

/* Networking */
#define MAX_MSG_SIZE       1500   /* size of udpListen()'s receive buffer */
#define UDP_PORT           2157   /* port bound by dhtInit() and targeted by udpSend() */
#define DEFAULT_INTERFACE  "eth0" /* interface whose address identifies this node */

/* Initial table sizes */
#define INIT_HOST_ALLOC    3      /* initial length of hostArray / hostReplied */
#define INIT_NUM_BLOCKS    16     /* initial length of blockOwnerArray */

/* Timeouts (milliseconds, as passed to poll()) and retry counts */
#define TIMEOUT_PERIOD     100
#define INSERT_TIMEOUT_MS  500
#define INSERT_RETRIES     50
#define REMOVE_TIMEOUT_MS  500
#define REMOVE_RETRIES     50
#define SEARCH_TIMEOUT_MS  500
#define SEARCH_RETRIES     50

/* Message types: the value travels as byte 0 of every datagram.
 * The msg_types[] name table must stay in sync with this list. */
enum
{
  INSERT_CMD = 0,
  INSERT_RES,
  REMOVE_CMD,
  REMOVE_RES,
  SEARCH_CMD,
  SEARCH_RES,
  WHO_IS_LEADER_CMD,
  WHO_IS_LEADER_RES,
  JOIN_REQ,
  JOIN_RES,
  LEAVE_REQ,
  LEAVE_RES,
  DHT_UPDATE_CMD,
  DHT_UPDATE_RES,
  ELECT_LEADER_CMD,
  ELECT_LEADER_RES,
  CONGRATS_CMD,
  REBUILD_REQ,
  REBUILD_CMD,
  FILL_DHT_CMD,
  FILL_DHT_RES,
  RESUME_NORMAL_CMD,
  RESUME_NORMAL_RES,
  NUM_MSG_TYPES           /* count of message types, not a message */
};

/* Protocol states. state_names[], timeout_vals[] and retry_vals[] are all
 * indexed by these values and must stay in sync with this list. */
enum
{
  INIT1_STATE = 0,
  INIT2_STATE,
  NORMAL_STATE,
  LEAD_NORMAL1_STATE,
  LEAD_NORMAL2_STATE,
  ELECT1_STATE,
  ELECT2_STATE,
  REBUILD0_STATE,
  REBUILD1_STATE,
  REBUILD2_STATE,
  REBUILD3_STATE,
  REBUILD4_STATE,
  REBUILD5_STATE,
  LEAD_REBUILD1_STATE,
  LEAD_REBUILD2_STATE,
  LEAD_REBUILD3_STATE,
  LEAD_REBUILD4_STATE,
  EXIT1_STATE,
  EXIT2_STATE,
  NUM_STATES              /* count of states, not a state */
};

/* Status codes carried in *_RES replies (e.g. byte 1 of INSERT_RES). */
enum
{
  OPERATION_OK = 0,
  KEY_NOT_FOUND,
  NOT_KEY_OWNER,
  NOT_LEADER,
  INTERNAL_ERROR
};

/* One known DHT node. msgSizeOk() sizes several payloads as
 * sizeof(struct hostData) per host, so member order/types presumably form
 * part of the wire format — confirm before changing. */
struct hostData {
  unsigned int ipAddr;          /* IPv4 address, host byte order */
  unsigned int maxKeyCapacity;
};
134
/*******************************************************************************
*                         Local Function Prototypes
*******************************************************************************/

int msgSizeOk(unsigned char *msg, unsigned int size);
unsigned short read2(unsigned char *msg);
unsigned int read4(unsigned char *msg);
void write2(unsigned char *ptr, unsigned short tmp);
void write4(unsigned char *ptr, unsigned int tmp);
unsigned int getMyIpAddr(const char *interfaceStr);
int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp);
int udpSendAll(unsigned char *msg, unsigned int size);
unsigned int hash(unsigned int x);
unsigned int getKeyOwner(unsigned int key);
void setState(unsigned int newState);
void makeAssignments();
int addHost(struct hostData newHost);
int removeHost(unsigned int ipAddr);
void removeUnresponsiveHosts();
int checkReplied(unsigned int ipAddr);
int allReplied();
/* findHost() is called by checkReplied() before its definition; without this
 * prototype that call is an implicit declaration (invalid since C99). */
int findHost(unsigned int ipAddr);
void writeHostList();
void dhtLog(const char *format, ...);
void *fillTask();
void *udpListen();
160
161 /*******************************************************************************
162 *                           Global Variables
163 *******************************************************************************/
164
165 //make sure this matches enumeration above
166 const char *msg_types[NUM_MSG_TYPES] =
167 {
168   "INSERT_CMD",
169   "INSERT_RES",
170   "REMOVE_CMD",
171   "REMOVE_RES",
172   "SEARCH_CMD",
173   "SEARCH_RES",
174   "WHO_IS_LEADER_CMD",
175   "WHO_IS_LEADER_RES",
176   "JOIN_REQ",
177   "JOIN_RES",
178   "LEAVE_REQ",
179   "LEAVE_RES",
180   "DHT_UPDATE_CMD",
181   "DHT_UPDATE_RES",
182   "ELECT_LEADER_CMD",
183   "ELECT_LEADER_RES",
184   "CONGRATS_CMD",
185   "REBUILD_REQ",
186   "REBUILD_CMD",
187   "FILL_DHT_CMD",
188   "FILL_DHT_RES",
189   "RESUME_NORMAL_CMD",
190   "RESUME_NORMAL_RES"
191 };
192
193 const char *state_names[NUM_STATES] =
194 {
195   "INIT1_STATE",
196   "INIT2_STATE",
197   "NORMAL_STATE",
198   "LEAD_NORMAL1_STATE",
199   "LEAD_NORMAL2_STATE",
200   "ELECT1_STATE",
201   "ELECT2_STATE",
202   "REBUILD0_STATE",
203   "REBUILD1_STATE",
204   "REBUILD2_STATE",
205   "REBUILD3_STATE",
206   "REBUILD4_STATE",
207   "REBUILD5_STATE",
208   "LEAD_REBUILD1_STATE",
209   "LEAD_REBUILD2_STATE",
210   "LEAD_REBUILD3_STATE",
211   "LEAD_REBUILD4_STATE",
212   "EXIT1_STATE",
213   "EXIT2_STATE",
214 };
215
216 //note: { 0, 0 } means no timeout
217 struct timeval timeout_vals[NUM_STATES] ={
218   { 0, 500000 },       //INIT1_STATE
219   { 0, 500000 },       //INIT2_STATE
220   { 0, 0 },       //NORMAL_STATE
221   { 0, 0 },       //LEAD_NORMAL1_STATE
222   { 3, 0 },       //LEAD_NORMAL2_STATE
223   { 1, 0 },       //ELECT1_STATE
224   { 1, 0 },       //ELECT2_STATE
225   { 0, 500000 },       //REBUILD0_STATE
226   { 0, 500000 },       //REBUILD1_STATE
227   { 10, 0 },       //REBUILD2_STATE
228   { 10, 0 },       //REBUILD3_STATE
229   { 10, 0 },       //REBUILD4_STATE
230   { 1, 0 },       //REBUILD5_STATE
231   { 1, 0 },       //LEAD_REBUILD1_STATE
232   { 1, 0 },       //LEAD_REBUILD2_STATE
233   { 10, 0 },       //LEAD_REBUILD3_STATE
234   { 10, 0 },       //LEAD_REBUILD4_STATE
235   { 0, 500000 },       //EXIT1_STATE
236   { 0, 0 }       //EXIT2_STATE
237 };
238
239 int retry_vals[NUM_STATES] =
240 {
241   100,       //INIT1_STATE
242   10,       //INIT2_STATE
243   0,       //NORMAL_STATE
244   0,       //LEAD_NORMAL1_STATE
245   0,       //LEAD_NORMAL2_STATE
246   10,       //ELECT1_STATE
247   10,       //ELECT2_STATE
248   10,       //REBUILD0_STATE
249   10,       //REBUILD1_STATE
250   0,       //REBUILD2_STATE
251   0,       //REBUILD3_STATE
252   0,       //REBUILD4_STATE
253   10,       //REBUILD5_STATE
254   10,       //LEAD_REBUILD1_STATE
255   10,       //LEAD_REBUILD2_STATE
256   10,       //LEAD_REBUILD3_STATE
257   10,       //LEAD_REBUILD4_STATE
258   10,       //EXIT1_STATE
259   0       //EXIT2_STATE
260 };
261
262 FILE *logfile;
263 struct hostData myHostData;
264 pthread_t threadUdpListen;
265 pthread_t threadFillTask;
266 //status of fillTask: 0 = ready to run, 1 = running, 2 = completed, 3 = error
267 int fillStatus;
268 struct pollfd udpPollSock;
269 unsigned int state;
270 unsigned int seed;
271 unsigned int leader;
272 unsigned int electionOriginator;
273 unsigned int electionParent;
274 unsigned int hostArraySize = 0;
275 struct hostData *hostArray = NULL;
276 unsigned int numBlocks = 0;
277 unsigned short *blockOwnerArray = NULL;
278 unsigned char *hostReplied = NULL;
279 pthread_mutex_t stateMutex;
280 pthread_cond_t stateCond;
281 chashtable_t *myHashTable;
282 unsigned int numHosts;
283 struct timeval timer;
284 int timerSet;
285 int timeoutCntr;
286
287 /*******************************************************************************
288 *                      Interface Function Definitions
289 *******************************************************************************/
290
291 void dhtInit(unsigned int seedIpAddr, unsigned int maxKeyCapacity) {
292   struct in_addr tmpAddr;
293   char filename[23] = "dht-";
294   struct sockaddr_in myAddr;
295   struct sockaddr_in seedAddr;
296   socklen_t socklen = sizeof(struct sockaddr_in);
297   char initMsg;
298
299   tmpAddr.s_addr = htonl(getMyIpAddr(DEFAULT_INTERFACE));
300   strcat(filename, inet_ntoa(tmpAddr));
301   strcat(filename, ".log");
302   printf("log file: %s\n", filename);
303
304   logfile = fopen(filename, "w");
305   dhtLog("dhtInit(): inializing...\n");
306
307   myHostData.ipAddr = getMyIpAddr(DEFAULT_INTERFACE);
308   myHostData.maxKeyCapacity = maxKeyCapacity;
309
310   seed = seedIpAddr;
311   leader = 0;
312   electionOriginator = 0;
313   electionParent = 0;
314   hostArraySize = INIT_HOST_ALLOC;
315   hostArray = calloc(hostArraySize, sizeof(struct hostData));
316   hostReplied = calloc(hostArraySize, sizeof(unsigned char));
317   hostArray[0] = myHostData;
318   numHosts = 1;
319   numBlocks = INIT_NUM_BLOCKS;
320   blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
321   pthread_mutex_init(&stateMutex, NULL);
322   pthread_cond_init(&stateCond, NULL);
323   myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
324
325   udpPollSock.fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
326   if (udpPollSock.fd < 0)
327     perror("dhtInit():socket()");
328
329   udpPollSock.events = POLLIN;
330
331   bzero(&myAddr, socklen);
332   myAddr.sin_family = AF_INET;
333   myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
334   myAddr.sin_port = htons(UDP_PORT);
335
336   if (bind(udpPollSock.fd, (struct sockaddr *)&myAddr, socklen) < 0)
337     perror("dhtInit():bind()");
338
339   if (seed == 0){
340     dhtLog("I am the leader\n");
341     leader = myHostData.ipAddr;
342     setState(LEAD_NORMAL1_STATE);
343   } else
344   {
345     initMsg = WHO_IS_LEADER_CMD;
346     udpSend(&initMsg, 1, seed);
347     setState(INIT1_STATE);
348   }
349
350   if (pthread_create(&threadUdpListen, NULL, udpListen, NULL) != 0)
351     dhtLog("dhtInit() - ERROR creating threadUdpListen\n");
352
353   return;
354 }
355
356 void dhtExit() { //TODO: do this gracefully, wait for response from leader, etc.
357   char msg;
358
359   msg = LEAVE_REQ;
360   udpSend(&msg, 1, leader);
361   dhtLog("dhtExit(): cleaning up...\n");
362   pthread_cancel(threadUdpListen);
363   close(udpPollSock.fd);
364   free(hostArray);
365   free(hostReplied);
366   free(blockOwnerArray);
367   fclose(logfile);
368
369   return;
370 }
371
372 int dhtInsert(unsigned int key, unsigned int val) {
373   struct sockaddr_in toAddr;
374   struct sockaddr_in fromAddr;
375   socklen_t socklen = sizeof(struct sockaddr_in);
376   struct pollfd pollsock;
377   char inBuffer[2];
378   char outBuffer[9];
379   ssize_t bytesRcvd;
380   int i;
381   int retval;
382   int status = -1;
383
384   bzero((char *)&toAddr, socklen);
385   toAddr.sin_family = AF_INET;
386   toAddr.sin_port = htons(UDP_PORT);
387
388   while (status != OPERATION_OK){
389     pthread_mutex_lock(&stateMutex);
390     while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
391              || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
392              || state == LEAD_REBUILD3_STATE))
393       pthread_cond_wait(&stateCond, &stateMutex);
394     toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
395     pthread_mutex_unlock(&stateMutex);
396
397     if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0){
398       perror("dhtInsert():socket()");
399       return -1;
400     }
401     pollsock.events = POLLIN;
402
403     outBuffer[0] = INSERT_CMD;
404     write4(&outBuffer[1], key);
405     write4(&outBuffer[5], val);
406
407     for (i = 0; i < INSERT_RETRIES; i++)
408     {
409       if (sendto(pollsock.fd, outBuffer, 9, 0, (struct sockaddr *)&toAddr,
410                  socklen) < 0){
411         perror("dhtInsert():sendto()");
412         break;
413       }
414       retval = poll(&pollsock, 1, INSERT_TIMEOUT_MS);
415       if (retval < 0){
416         perror("dhtInsert():poll()");
417         break;
418       }
419       if (retval > 0){
420         bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
421                              (struct sockaddr *)&fromAddr, &socklen);
422         if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
423             && fromAddr.sin_port == toAddr.sin_port
424             && bytesRcvd == 2 && inBuffer[0] == INSERT_RES){
425           status = inBuffer[1];                               //status from remote host
426           break;
427         }
428       }
429     }
430     if (status != OPERATION_OK){
431       pthread_mutex_lock(&stateMutex);
432       setState(REBUILD0_STATE);
433       outBuffer[0] = REBUILD_REQ;
434       udpSend(outBuffer, 1, leader);
435       pthread_mutex_unlock(&stateMutex);
436     }
437   }
438
439   close(pollsock.fd);
440
441   return status;
442 }
443
/*
 * dhtInsertMult - insert numKeys pairs keys[i] -> vals[i], one at a time.
 * Every pair is attempted even if an earlier one fails.
 * Returns 0 if every dhtInsert() returned 0 (OPERATION_OK), -1 otherwise.
 */
int dhtInsertMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals) {
  int status = 0;
  unsigned int i;   /* unsigned, matching numKeys: no signed/unsigned compare */

  for (i = 0; i < numKeys; i++) {
    if (dhtInsert(keys[i], vals[i]) != 0)
      status = -1;
  }
  return status;
}
456
457 int dhtRemove(unsigned int key) {
458   struct sockaddr_in toAddr;
459   struct sockaddr_in fromAddr;
460   socklen_t socklen = sizeof(struct sockaddr_in);
461   struct pollfd pollsock;
462   char inBuffer[2];
463   char outBuffer[5];
464   ssize_t bytesRcvd;
465   int i;
466   int retval;
467   int status = -1;
468
469   bzero((char *)&toAddr, socklen);
470   toAddr.sin_family = AF_INET;
471   toAddr.sin_port = htons(UDP_PORT);
472
473   while (!(status == OPERATION_OK || status == KEY_NOT_FOUND)){
474     pthread_mutex_lock(&stateMutex);
475     while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
476              || state == LEAD_NORMAL2_STATE))
477       pthread_cond_wait(&stateCond, &stateMutex);
478     toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
479     pthread_mutex_unlock(&stateMutex);
480
481     if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0){
482       perror("dhtRemove():socket()");
483       return -1;
484     }
485     pollsock.events = POLLIN;
486
487     outBuffer[0] = REMOVE_CMD;
488     write4(&outBuffer[1], key);
489
490     for (i = 0; i < REMOVE_RETRIES; i++)
491     {
492       if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
493                  socklen) < 0){
494         perror("dhtRemove():sendto()");
495         break;
496       }
497       retval = poll(&pollsock, 1, REMOVE_TIMEOUT_MS);
498       if (retval < 0){
499         perror("dhtRemove():poll()");
500         break;
501       }
502       if (retval > 0){
503         bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
504                              (struct sockaddr *)&fromAddr, &socklen);
505         if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
506             && fromAddr.sin_port == toAddr.sin_port
507             && bytesRcvd == 2 && inBuffer[0] == REMOVE_RES){
508           status = inBuffer[1];                               //status from remote host
509           break;
510         }
511       }
512     }
513     if (!(status == OPERATION_OK || status == KEY_NOT_FOUND)){
514       pthread_mutex_lock(&stateMutex);
515       setState(REBUILD0_STATE);
516       outBuffer[0] = REBUILD_REQ;
517       udpSend(outBuffer, 1, leader);
518       pthread_mutex_unlock(&stateMutex);
519     }
520   }
521
522   close(pollsock.fd);
523
524   return status;
525 }
526
/*
 * dhtRemoveMult - remove each of the numKeys keys, one at a time.
 * Every key is attempted even if an earlier removal fails.
 * Returns 0 if every dhtRemove() returned 0 (OPERATION_OK), -1 otherwise
 * (note: KEY_NOT_FOUND is nonzero and therefore counts as failure here).
 */
int dhtRemoveMult(unsigned int numKeys, unsigned int *keys) {
  int status = 0;
  unsigned int i;   /* unsigned, matching numKeys: no signed/unsigned compare */

  for (i = 0; i < numKeys; i++) {
    if (dhtRemove(keys[i]) != 0)
      status = -1;
  }
  return status;
}
539
540 int dhtSearch(unsigned int key, unsigned int *val) {
541   struct sockaddr_in toAddr;
542   struct sockaddr_in fromAddr;
543   socklen_t socklen = sizeof(struct sockaddr_in);
544   struct pollfd pollsock;
545   char inBuffer[6];
546   char outBuffer[5];
547   ssize_t bytesRcvd;
548   int i;
549   int retval;
550   int status = -1;
551
552   bzero((char *)&toAddr, socklen);
553   toAddr.sin_family = AF_INET;
554   toAddr.sin_port = htons(UDP_PORT);
555
556   while (!(status == OPERATION_OK || status == KEY_NOT_FOUND)){
557     pthread_mutex_lock(&stateMutex);
558     while (numBlocks == 0)
559       pthread_cond_wait(&stateCond, &stateMutex);
560     toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
561     pthread_mutex_unlock(&stateMutex);
562
563     if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0){
564       perror("dhtSearch():socket()");
565       return -1;
566     }
567     pollsock.events = POLLIN;
568
569     outBuffer[0] = SEARCH_CMD;
570     write4(&outBuffer[1], key);
571
572     for (i = 0; i < SEARCH_RETRIES; i++)
573     {
574       if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
575                  socklen) < 0){
576         perror("dhtSearch():sendto()");
577         break;
578       }
579       retval = poll(&pollsock, 1, SEARCH_TIMEOUT_MS);
580       if (retval < 0){
581         perror("dhtSearch():poll()");
582         break;
583       }
584       if (retval > 0){
585         bytesRcvd = recvfrom(pollsock.fd, inBuffer, 6, 0,
586                              (struct sockaddr *)&fromAddr, &socklen);
587         if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
588             && fromAddr.sin_port == toAddr.sin_port
589             && bytesRcvd == 6 && inBuffer[0] == SEARCH_RES){
590           status = inBuffer[1];                               //status from remote host
591           *val = read4(&inBuffer[2]);
592           break;
593         }
594       }
595     }
596     if (!(status == OPERATION_OK || status == KEY_NOT_FOUND)){
597       pthread_mutex_lock(&stateMutex);
598       setState(REBUILD0_STATE);
599       outBuffer[0] = REBUILD_REQ;
600       udpSend(outBuffer, 1, leader);
601       pthread_mutex_unlock(&stateMutex);
602     }
603   }
604
605   close(pollsock.fd);
606
607   return status;
608 }
609
/*
 * dhtSearchMult - look up each of the numKeys keys, storing results in vals[i].
 * Every key is attempted even if an earlier lookup fails.
 * Returns 0 if every dhtSearch() returned 0 (OPERATION_OK), -1 otherwise
 * (note: KEY_NOT_FOUND is nonzero and therefore counts as failure here).
 */
int dhtSearchMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals) {
  int status = 0;
  unsigned int i;   /* unsigned, matching numKeys: no signed/unsigned compare */

  for (i = 0; i < numKeys; i++) {
    if (dhtSearch(keys[i], &vals[i]) != 0)
      status = -1;
  }
  return status;
}
620
621 /*******************************************************************************
622 *                      Local Function Definitions
623 *******************************************************************************/
624
625 int msgSizeOk(unsigned char *msg, unsigned int size) {
626   unsigned short tmpNumHosts;
627   unsigned short tmpNumBlocks;
628
629   if (size < 1)
630     return 1;
631
632   switch (msg[0]){
633   case WHO_IS_LEADER_CMD:
634   case LEAVE_REQ:
635   case LEAVE_RES:
636   case DHT_UPDATE_RES:
637   case REBUILD_REQ:
638   case REBUILD_CMD:
639   case FILL_DHT_CMD:
640   case FILL_DHT_RES:
641   case RESUME_NORMAL_CMD:
642   case RESUME_NORMAL_RES:
643     return (size == 1);
644
645   case INSERT_RES:
646   case REMOVE_RES:
647   case JOIN_RES:
648     return (size == 2);
649
650   case REMOVE_CMD:
651   case SEARCH_CMD:
652   case WHO_IS_LEADER_RES:
653   case JOIN_REQ:
654   case ELECT_LEADER_CMD:
655     return (size == 5);
656
657   case SEARCH_RES:
658     return (size == 6);
659
660   case INSERT_CMD:
661     return (size == 9);
662
663   case DHT_UPDATE_CMD:
664     if (size < 5)
665       return 1;
666     tmpNumHosts = read2(&msg[1]);
667     tmpNumBlocks = read2(&msg[3]);
668     return (size == (5+sizeof(struct hostData)*tmpNumHosts+2*tmpNumBlocks));
669
670   case ELECT_LEADER_RES:
671     if (size < 2)
672       return 1;
673     if (msg[1] == 0xFF)
674       return (size == 2);
675     if (size < 4)
676       return 1;
677     tmpNumHosts = read2(&msg[2]);
678     return (size == (4 + sizeof(struct hostData) * tmpNumHosts));
679
680   case CONGRATS_CMD:
681     if (size < 3)
682       return 1;
683     tmpNumHosts = read2(&msg[1]);
684     return (size == (3 + sizeof(struct hostData) * tmpNumHosts));
685
686   default:
687     return 1;
688   }
689 }
690
/* Decode a 16-bit little-endian value stored at ptr[0] (low) and ptr[1] (high). */
unsigned short read2(unsigned char *ptr) {
  unsigned short low = ptr[0];
  unsigned short high = ptr[1];

  return (unsigned short)(low | (high << 8));
}
695
/*
 * Decode a 32-bit little-endian value from ptr[0] (least significant byte)
 * through ptr[3] (most significant byte).
 *
 * Each byte is widened to unsigned int before shifting. A plain
 * `ptr[3] << 24` shifts a promoted signed int, which is undefined behavior
 * whenever ptr[3] >= 0x80.
 */
unsigned int read4(unsigned char *ptr) {
  return ((unsigned int)ptr[3] << 24)
       | ((unsigned int)ptr[2] << 16)
       | ((unsigned int)ptr[1] << 8)
       | (unsigned int)ptr[0];
}
700
/* Encode tmp as 16-bit little-endian: low byte to ptr[0], high byte to ptr[1]. */
void write2(unsigned char *ptr, unsigned short tmp) {
  ptr[0] = (unsigned char)(tmp & 0xFF);
  ptr[1] = (unsigned char)((tmp >> 8) & 0xFF);
}
706
/* Encode tmp as 32-bit little-endian into ptr[0..3], least significant byte first. */
void write4(unsigned char *ptr, unsigned int tmp) {
  int byteIdx;

  for (byteIdx = 0; byteIdx < 4; byteIdx++) {
    ptr[byteIdx] = (unsigned char)(tmp & 0xFF);
    tmp >>= 8;
  }
}
714
715 unsigned int getMyIpAddr(const char *interfaceStr) {
716   int sock;
717   struct ifreq interfaceInfo;
718   struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
719
720   memset(&interfaceInfo, 0, sizeof(struct ifreq));
721
722   if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
723     perror("getMyIpAddr():socket()");
724     return 1;
725   }
726
727   strcpy(interfaceInfo.ifr_name, interfaceStr);
728   myAddr->sin_family = AF_INET;
729
730   if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0){
731     perror("getMyIpAddr():ioctl()");
732     return 1;
733   }
734
735   return ntohl(myAddr->sin_addr.s_addr);
736 }
737
738 int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp) {
739   struct sockaddr_in peerAddr;
740   socklen_t socklen = sizeof(struct sockaddr_in);
741
742   bzero(&peerAddr, socklen);
743   peerAddr.sin_family = AF_INET;
744   peerAddr.sin_addr.s_addr = htonl(destIp);
745   peerAddr.sin_port = htons(UDP_PORT);
746
747   if (size >= 1){
748     if (msg[0] < NUM_MSG_TYPES)
749       dhtLog("udpSend(): sending %s to %s, %d bytes\n", msg_types[msg[0]],
750              inet_ntoa(peerAddr.sin_addr), size);
751     else
752       dhtLog("udpSend(): sending unknown message to %s, %d bytes\n",
753              inet_ntoa(peerAddr.sin_addr), size);
754   }
755
756   if (sendto(udpPollSock.fd, (void *)msg, size, 0, (struct sockaddr *)&peerAddr,
757              socklen) < 0){
758     perror("udpSend():sendto()");
759     return -1;
760   }
761
762   return 0;
763 }
764
765 int udpSendAll(unsigned char *msg, unsigned int size) {
766   int i;
767   int status = 0;
768   for (i = 0; i < numHosts; i++)
769   {
770     if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr)){
771       if (udpSend(msg, size, hostArray[i].ipAddr) != 0)
772         status = -1;
773     }
774   }
775   return status;
776 }
777
778 //note: make sure this is only executed in a valid state, where numBlocks != 0
779 unsigned int hash(unsigned int x) {
780   return (x % numBlocks);
781 }
782
783 //note: make sure this is only executed in a valid state, where these arrays
784 // are allocated and the index mappings are consistent
785 unsigned int getKeyOwner(unsigned int key) {
786   return hostArray[blockOwnerArray[hash(key)]].ipAddr;
787 }
788
789 //sets state and timer, if applicable
790 void setState(unsigned int newState) {
791   struct timeval now;
792   int i;
793
794   gettimeofday(&now, NULL);
795
796   if (newState >= NUM_STATES){
797     dhtLog("setState(): ERROR: invalid state %d\n", newState);
798   } else
799   {
800     if (timeout_vals[newState].tv_sec == 0
801         && timeout_vals[newState].tv_usec == 0){ //no timer
802       timerSet = 0;
803     } else
804     {
805       timeradd(&now, &timeout_vals[newState], &timer);
806       timerSet = 1;
807     }
808     timeoutCntr = 0;
809     state = newState;
810     //TODO: only do this for states that require it
811     for (i = 0; i < numHosts; i++)
812       hostReplied[i] = 0;
813
814     dhtLog("setState(): state set to %s\n", state_names[state]);
815   }
816
817   return;
818 }
819
820 //TODO: improve these simple and inefficient functions
821 int checkReplied(unsigned int ipAddr) {
822   int i;
823
824   i = findHost(ipAddr);
825
826   if (i == -1)
827     return -1;
828
829   hostReplied[i] = 1;
830
831   return 0;
832 }
833
834 int allReplied() {
835   int i;
836
837   for (i = 0; i < numHosts; i++)
838     if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr))
839       return 0;
840
841   return 1;
842 }
843
844 int findHost(unsigned int ipAddr) {
845   int i;
846
847   for (i = 0; i < numHosts; i++)
848     if (hostArray[i].ipAddr == ipAddr)
849       return i;                   //found, return index
850
851   return -1;       //not found
852 }
853
854 int removeHost(unsigned int ipAddr) {
855   int i, j;
856
857   i = findHost(ipAddr);
858
859   if (i == -1)
860     return -1;
861
862   for (j = 0; j < numBlocks; j++)
863   {
864     if (blockOwnerArray[j] == i)
865       blockOwnerArray[j] = 0;                   //TODO: is this what I want to have happen?
866     else if (blockOwnerArray[j] > i)
867       blockOwnerArray[j]--;
868   }
869
870   for (; i < numHosts - 1; i++)
871   {
872     hostArray[i] = hostArray[i+1];
873     hostReplied[i] = hostReplied[i+1];
874   }
875   numHosts--;
876
877   return 0;
878 }
879
880 void removeUnresponsiveHosts() {
881   int i;
882
883   for (i = 0; i < numHosts; i++)
884   {
885     if (!hostReplied[i] && hostArray[i].ipAddr != myHostData.ipAddr)
886       removeHost(hostArray[i].ipAddr);
887   }
888 }
889
890 int addHost(struct hostData newHost) {
891   struct hostData *newHostArray;
892   unsigned char *newHostReplied;
893   int i;
894   int j;
895
896   for (i = 0; i < numHosts; i++)
897   {
898     if (hostArray[i].ipAddr == newHost.ipAddr){
899       hostArray[i] = newHost;
900       hostReplied[i] = 0;
901       return 0;
902     } else if (hostArray[i].ipAddr > newHost.ipAddr)  {
903       if (numHosts == hostArraySize){
904         newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
905         newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
906         memcpy(newHostArray, hostArray, (i * sizeof(struct hostData)));
907         memcpy(newHostReplied, hostReplied, (i * sizeof(unsigned char)));
908         newHostArray[i] = newHost;
909         newHostReplied[i] = 0;
910         memcpy(&newHostArray[i+1], &hostArray[i], ((numHosts - i) *
911                                                    sizeof(struct hostData)));
912         memcpy(&newHostReplied[i+1], &hostReplied[i], ((numHosts - i) *
913                                                        sizeof(unsigned char)));
914         free(hostArray);
915         free(hostReplied);
916         hostArray = newHostArray;
917         hostReplied = newHostReplied;
918         hostArraySize = 2 * hostArraySize;
919       } else
920       {
921         for (j = numHosts; j > i; j--)
922         {
923           hostArray[j] = hostArray[j-1];
924           hostReplied[j] = hostReplied[j-1];
925         }
926         hostArray[i] = newHost;
927         hostReplied[i] = 0;
928       }
929       for(j = 0; j < numBlocks; j++)
930       {
931         if (blockOwnerArray[j] >= i)
932           blockOwnerArray[j]++;
933       }
934       numHosts++;
935       return 1;
936     }
937   }
938
939   //nothing greater, add to end
940   if (numHosts == hostArraySize){
941     newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
942     newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
943     memcpy(newHostArray, hostArray, (numHosts * sizeof(struct hostData)));
944     memcpy(newHostReplied, hostReplied, (numHosts * sizeof(unsigned char)));
945     free(hostArray);
946     free(hostReplied);
947     hostArray = newHostArray;
948     hostReplied = newHostReplied;
949     hostArraySize = 2 * hostArraySize;
950   }
951
952   hostArray[numHosts] = newHost;
953   hostReplied[numHosts] = 0;
954   numHosts++;
955   return 1;
956 }
957
958 void makeAssignments() {
959   int i;
960
961   if (numBlocks < numHosts){
962     free(blockOwnerArray);
963     while (numBlocks < numHosts)
964       numBlocks *= 2;
965     blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
966   }
967
968   for (i = 0; i < numBlocks; i++)
969     blockOwnerArray[i]  = i % numHosts;
970
971   return;
972 }
973
974 void writeHostList() {
975   int i;
976   struct in_addr tmpAddr;
977
978   fprintf(logfile, "numHosts = %d\n", numHosts);
979   for (i = 0; i < numHosts; i++)
980   {
981     tmpAddr.s_addr = htonl(hostArray[i].ipAddr);
982     fprintf(logfile, "%d) %s, %d\n", i, inet_ntoa(tmpAddr),
983             hostArray[i].maxKeyCapacity);
984   }
985   return;
986 }
987
988 void dhtLog(const char *format, ...) {
989   va_list args;
990 //      struct timeval now;
991
992 //      if (gettimeofday(&now, NULL) < 0)
993 //      {       perror("dhtLog():gettimeofday()"); }
994   va_start(args, format);
995 //      if (fprintf(logfile, "%d.%06d:", now.tv_sec, now.tv_usec) < 0)
996 //      {       perror("dhtLog():fprintf()"); }
997   if (vfprintf(logfile, format, args) < 0){
998     perror("dhtLog():vfprintf()");
999   }
1000   if (fflush(logfile) == EOF){
1001     perror("dhtLog():fflush()");
1002   }
1003   va_end(args);
1004
1005   return;
1006 }
1007
1008 void *fillTask() {
1009   unsigned int *vals;
1010   unsigned int *keys;
1011   unsigned int numKeys;
1012   int i;
1013
1014   vals = mhashGetKeys(&numKeys);       //note: key of mhash is val of dht
1015   keys = calloc(numKeys, sizeof(unsigned int));
1016
1017   for (i = 0; i < numKeys; i++)
1018     keys[i] = myHostData.ipAddr;
1019
1020   if (dhtInsertMult(numKeys, keys, vals) == 0)
1021     fillStatus = 2;
1022   else
1023     fillStatus = 3;
1024
1025   pthread_exit(NULL);
1026 }
1027
1028 void *udpListen() {
1029   ssize_t bytesRcvd;
1030   struct sockaddr_in peerAddr;
1031   unsigned int peerIp;
1032   socklen_t socklen = sizeof(struct sockaddr_in);
1033   unsigned char inBuffer[MAX_MSG_SIZE];
1034   unsigned char outBuffer[MAX_MSG_SIZE];
1035   int pollret;
1036   struct timeval now;
1037   struct in_addr tmpAddr;
1038   struct hostData tmpHost;
1039   unsigned int tmpKey;
1040   unsigned int tmpVal;
1041   struct hostData *hostDataPtr;
1042   unsigned short *uShortPtr;
1043   unsigned int tmpUInt;
1044   unsigned int tmpUShort;
1045   int i;
1046   unsigned int oldState;
1047
1048   dhtLog("udpListen(): linstening on port %d...\n", UDP_PORT);
1049
1050   while (1){
1051     pollret = poll(&udpPollSock, 1, TIMEOUT_PERIOD);
1052     pthread_mutex_lock(&stateMutex);
1053     oldState = state;
1054     if (pollret < 0){
1055       perror("udpListen():poll()");
1056     } else if (pollret > 0)  {
1057       bytesRcvd = recvfrom(udpPollSock.fd, inBuffer, MAX_MSG_SIZE, 0,
1058                            (struct sockaddr *)&peerAddr, &socklen);
1059       if (bytesRcvd < 1){
1060         dhtLog("udpListen(): ERROR: bytesRcvd = %d\n", bytesRcvd);
1061       } else if (inBuffer[0] >= NUM_MSG_TYPES)  {
1062         dhtLog("udpListen(): ERROR: unknown msg type = %d\n", inBuffer[0]);
1063       } else if (!msgSizeOk(inBuffer, bytesRcvd))  {
1064         dhtLog("udpListen(): ERROR: msg size not ok: type = %s\n, size = %d\n",
1065                msg_types[inBuffer[0]], bytesRcvd);
1066       } else if (state == EXIT2_STATE)  {
1067         //do nothing
1068       } else if (state == INIT1_STATE)  { //after initialization with seed, do not proceed until seed replies
1069         dhtLog("udpListen(): received %s from %s, %d bytes\n",
1070                msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
1071         for (i = 0; i < bytesRcvd; i++)
1072           dhtLog(" %x", inBuffer[i]);
1073         dhtLog("\n");
1074         peerIp = ntohl(peerAddr.sin_addr.s_addr);
1075         if (peerIp == seed && inBuffer[0] == WHO_IS_LEADER_RES){
1076           tmpHost.ipAddr = peerIp;
1077           tmpHost.maxKeyCapacity = 0;
1078           addHost(tmpHost);
1079           writeHostList();
1080           leader = read4(&inBuffer[1]);
1081           tmpAddr.s_addr = htonl(leader);
1082           dhtLog("leader = %s\n", inet_ntoa(tmpAddr));
1083           if (leader != 0){
1084             setState(INIT2_STATE);
1085             outBuffer[0] = JOIN_REQ;
1086             write4(&outBuffer[1], myHostData.maxKeyCapacity);
1087             udpSend(outBuffer, 5, leader);
1088           } else
1089           {
1090             electionOriginator = myHostData.ipAddr;
1091             setState(ELECT1_STATE);
1092             outBuffer[0] = ELECT_LEADER_CMD;
1093             write4(&outBuffer[1], myHostData.ipAddr);                                     //originator = me
1094             udpSendAll(outBuffer, 5);
1095           }
1096         }
1097       } else
1098       {
1099         dhtLog("udpListen(): received %s from %s, %d bytes\n",
1100                msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
1101         for (i = 0; i < bytesRcvd; i++)
1102           dhtLog(" %x", inBuffer[i]);
1103         dhtLog("\n");
1104         peerIp = ntohl(peerAddr.sin_addr.s_addr);
1105         switch (inBuffer[0]){
1106         case INSERT_CMD:
1107           if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1108               || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
1109               || state == REBUILD5_STATE || state == LEAD_REBUILD3_STATE){
1110             tmpKey = read4(&inBuffer[1]);
1111             tmpVal = read4(&inBuffer[5]);
1112             outBuffer[0] = INSERT_RES;
1113             if (getKeyOwner(tmpKey) == myHostData.ipAddr){
1114               if (chashInsert(myHashTable, tmpKey, (void *)tmpVal) == 0)
1115                 outBuffer[1] = OPERATION_OK;
1116               else
1117                 outBuffer[1] = INTERNAL_ERROR;
1118             } else
1119             {
1120               outBuffer[1] = NOT_KEY_OWNER;
1121             }
1122             //reply to client socket
1123             sendto(udpPollSock.fd, outBuffer, 2, 0,
1124                    (struct sockaddr *)&peerAddr, socklen);
1125           }
1126           break;
1127
1128         case REMOVE_CMD:
1129           if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1130               || state == LEAD_NORMAL2_STATE){
1131             tmpKey = read4(&inBuffer[1]);
1132             outBuffer[0] = REMOVE_RES;
1133             if (getKeyOwner(tmpKey) == myHostData.ipAddr){
1134               if (chashRemove(myHashTable, tmpKey) == 0)
1135                 outBuffer[1] = OPERATION_OK;
1136               else
1137                 outBuffer[1] = KEY_NOT_FOUND;
1138             } else
1139             {
1140               outBuffer[1] = NOT_KEY_OWNER;
1141             }
1142             //reply to client socket
1143             sendto(udpPollSock.fd, outBuffer, 2, 0,
1144                    (struct sockaddr *)&peerAddr, socklen);
1145           }
1146           break;
1147
1148         case SEARCH_CMD:
1149           if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1150               || state == LEAD_NORMAL2_STATE){
1151             tmpKey = read4(&inBuffer[1]);
1152             outBuffer[0] = SEARCH_RES;
1153             if (getKeyOwner(tmpKey) == myHostData.ipAddr){
1154               if ((tmpVal = (unsigned int)chashSearch(myHashTable, tmpKey)) != 0){
1155                 outBuffer[1] = OPERATION_OK;
1156                 write4(&outBuffer[2], tmpVal);
1157               } else
1158               {
1159                 outBuffer[1] = KEY_NOT_FOUND;
1160                 write4(&outBuffer[2], 0);
1161               }
1162             } else
1163             {
1164               outBuffer[1] = NOT_KEY_OWNER;
1165               write4(&outBuffer[2], 0);
1166             }
1167             //reply to client socket
1168             sendto(udpPollSock.fd, outBuffer, 6, 0,
1169                    (struct sockaddr *)&peerAddr, socklen);
1170           }
1171           break;
1172
1173         case WHO_IS_LEADER_CMD:
1174           tmpHost.ipAddr = peerIp;
1175           tmpHost.maxKeyCapacity = 0;
1176           addHost(tmpHost);
1177           writeHostList();
1178           outBuffer[0] = WHO_IS_LEADER_RES;
1179           //leader == 0 means I don't know who it is
1180           write4(&outBuffer[1], leader);
1181           udpSend(outBuffer, 5, peerIp);
1182           break;
1183
1184         case JOIN_REQ:
1185           if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE){
1186             tmpHost.ipAddr = peerIp;
1187             tmpHost.maxKeyCapacity = read4(&inBuffer[1]);
1188             addHost(tmpHost);
1189             writeHostList();
1190             if (state == LEAD_NORMAL1_STATE)
1191               setState(LEAD_NORMAL2_STATE);
1192             outBuffer[0] = JOIN_RES;
1193             outBuffer[1] = 0;                                             //status, success
1194             udpSend(outBuffer, 2, peerIp);
1195           } else if (state == LEAD_REBUILD1_STATE)  {
1196             //note: I don't need to addHost().
1197             checkReplied(peerIp);
1198             outBuffer[0] = JOIN_RES;
1199             outBuffer[1] = 0;                                             //status, success
1200             udpSend(outBuffer, 2, peerIp);
1201             if (allReplied()){
1202               makeAssignments();
1203               setState(LEAD_REBUILD2_STATE);
1204               outBuffer[0] = DHT_UPDATE_CMD;
1205               write2(&outBuffer[1], numHosts);
1206               write2(&outBuffer[3], numBlocks);
1207               memcpy(&outBuffer[5], hostArray, numHosts*sizeof(struct hostData));
1208               memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
1209                      blockOwnerArray, numBlocks*2);
1210               udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
1211                          + 2 * numBlocks);
1212             }
1213           }
1214           break;
1215
1216         case JOIN_RES:
1217           if (state == REBUILD1_STATE){
1218             setState(REBUILD2_STATE);
1219           } else if (state == INIT2_STATE)  {
1220             setState(NORMAL_STATE);
1221           }
1222           break;
1223
1224         case LEAVE_REQ:
1225           if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE){ //TODO: make this graceful, instead of just rebuilding
1226             removeHost(peerIp);
1227             if (state != LEAD_NORMAL2_STATE)
1228               setState(LEAD_NORMAL2_STATE);
1229           }
1230           break;
1231
1232         case DHT_UPDATE_CMD:
1233           if (state == REBUILD2_STATE && peerIp == leader){
1234             free(hostArray);
1235             free(blockOwnerArray);
1236             numHosts = read2(&inBuffer[1]);
1237             numBlocks = read2(&inBuffer[3]);
1238             while (hostArraySize < numHosts)
1239               hostArraySize *= 2;
1240             hostArray = calloc(hostArraySize, sizeof(struct hostData));
1241             blockOwnerArray = calloc(numBlocks, 2);
1242             memcpy(hostArray, &inBuffer[5], numHosts*sizeof(struct hostData));
1243             memcpy(blockOwnerArray, &inBuffer[5+numHosts*sizeof(struct hostData)], numBlocks*2);
1244             writeHostList();
1245             setState(REBUILD3_STATE);
1246             outBuffer[0] = DHT_UPDATE_RES;
1247             udpSend(outBuffer, 1, peerIp);
1248           }
1249           break;
1250
1251         case DHT_UPDATE_RES:
1252           if (state == LEAD_REBUILD2_STATE){
1253             checkReplied(peerIp);
1254             if (allReplied()){
1255               setState(LEAD_REBUILD3_STATE);
1256               outBuffer[0] = FILL_DHT_CMD;
1257               udpSendAll(outBuffer, 1);
1258               if (fillStatus != 0)
1259                 dhtLog("udpListen(): ERROR: fillTask already running\n");
1260               fillStatus = 1;
1261               if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
1262                 dhtLog("udpListen(): ERROR creating threadFillTask\n");
1263             }
1264           }
1265           break;
1266
1267         case ELECT_LEADER_CMD:
1268           tmpUInt = read4(&inBuffer[1]);
1269           if ((state == ELECT1_STATE || state == ELECT2_STATE)
1270               && tmpUInt >= electionOriginator){  //already participating in a higher-priority election
1271             outBuffer[0] = ELECT_LEADER_RES;
1272             outBuffer[1] = 0xFF;
1273             udpSend(outBuffer, 2, peerIp);
1274           } else
1275           {                                       //join election
1276             electionOriginator = tmpUInt;
1277             electionParent = peerIp;
1278             setState(ELECT1_STATE);
1279             outBuffer[0] = ELECT_LEADER_CMD;
1280             write4(&outBuffer[1], electionOriginator);
1281             //don't bother forwarding the message to originator or parent
1282             checkReplied(electionOriginator);
1283             checkReplied(electionParent);
1284             if (allReplied()){                            //in case that is everybody I know of
1285               setState(ELECT2_STATE);
1286               outBuffer[0] = ELECT_LEADER_RES;
1287               outBuffer[1] = 0;
1288               write2(&outBuffer[2], numHosts);
1289               memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1290                      * numHosts);
1291               udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1292                       electionParent);
1293             } else
1294             {
1295               udpSendAll(outBuffer, 5);
1296             }
1297           }
1298           break;
1299
1300         case ELECT_LEADER_RES:
1301           if (state == ELECT1_STATE){
1302             checkReplied(peerIp);
1303             if (inBuffer[1] != 0xFF){
1304               tmpUShort = read2(&inBuffer[2]);
1305               hostDataPtr = (struct hostData *)&inBuffer[4];
1306               for (i = 0; i < tmpUShort; i++)
1307                 addHost(hostDataPtr[i]);
1308               writeHostList();
1309             }
1310             if (allReplied()){
1311               setState(ELECT2_STATE);
1312               if (electionOriginator == myHostData.ipAddr){
1313                 leader = hostArray[0].ipAddr;
1314                 if (leader == myHostData.ipAddr){                         //I am the leader
1315                   dhtLog("I am the leader!\n");
1316                   setState(LEAD_REBUILD1_STATE);
1317                   outBuffer[0] = REBUILD_CMD;
1318                   udpSendAll(outBuffer, 1);
1319                 } else
1320                 {                                                         //notify leader
1321                   outBuffer[0] = CONGRATS_CMD;
1322                   write2(&outBuffer[1], numHosts);
1323                   hostDataPtr = (struct hostData *)&outBuffer[3];
1324                   for (i = 0; i < numHosts; i++)
1325                     hostDataPtr[i] = hostArray[i];
1326                   udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1327                           leader);
1328                 }
1329               } else
1330               {
1331                 outBuffer[0] = ELECT_LEADER_RES;
1332                 outBuffer[1] = 0;
1333                 write2(&outBuffer[2], numHosts);
1334                 hostDataPtr = (struct hostData *)&outBuffer[4];
1335                 for (i = 0; i < numHosts; i++)
1336                   hostDataPtr[i] = hostArray[i];
1337                 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1338                         electionParent);
1339               }
1340             }
1341           }
1342           break;
1343
1344         case CONGRATS_CMD:
1345           if (state == ELECT2_STATE){             //I am the leader
1346             leader = myHostData.ipAddr;
1347             dhtLog("I am the leader!\n");
1348             tmpUShort = read2(&inBuffer[1]);
1349             hostDataPtr = (struct hostData *)&inBuffer[3];
1350             for (i = 0; i < tmpUShort; i++)
1351               addHost(hostDataPtr[i]);
1352             writeHostList();
1353             setState(LEAD_REBUILD1_STATE);
1354             outBuffer[0] = REBUILD_CMD;
1355             udpSendAll(outBuffer, 1);
1356           }
1357           break;
1358
1359         case REBUILD_REQ:
1360           if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE){
1361             setState(LEAD_REBUILD1_STATE);
1362             outBuffer[0] = REBUILD_CMD;
1363             udpSendAll(outBuffer, 1);
1364           }
1365           break;
1366
1367         case REBUILD_CMD:
1368           leader = peerIp;                                       //consider this a declaration of authority
1369           setState(REBUILD1_STATE);
1370           outBuffer[0] = JOIN_REQ;
1371           write4(&outBuffer[1], myHostData.maxKeyCapacity);
1372           udpSend(outBuffer, 5, leader);
1373           break;
1374
1375         case FILL_DHT_CMD:
1376           if (state == REBUILD3_STATE && peerIp == leader){
1377             setState(REBUILD4_STATE);
1378             if (fillStatus != 0)
1379               dhtLog("udpListen(): ERROR: fillTask already running\n");
1380             fillStatus = 1;
1381             if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
1382               dhtLog("udpListen(): ERROR creating threadFillTask\n");
1383           }
1384           break;
1385
1386         case FILL_DHT_RES:
1387           if (state == LEAD_REBUILD3_STATE){
1388             checkReplied(peerIp);
1389             if (allReplied() && fillStatus == 2){
1390               fillStatus = 0;
1391               setState(LEAD_REBUILD4_STATE);
1392               outBuffer[0] = RESUME_NORMAL_CMD;
1393               udpSendAll(outBuffer, 1);
1394             }
1395           }
1396           break;
1397
1398         case RESUME_NORMAL_CMD:
1399           if (state == REBUILD5_STATE && peerIp == leader){
1400             setState(NORMAL_STATE);
1401             outBuffer[0] = RESUME_NORMAL_RES;
1402             udpSend(outBuffer, 1, leader);
1403           }
1404           break;
1405
1406         case RESUME_NORMAL_RES:
1407           if (state == LEAD_REBUILD4_STATE){
1408             checkReplied(peerIp);
1409             if (allReplied()){
1410               setState(LEAD_NORMAL1_STATE);
1411             }
1412           }
1413           break;
1414         }
1415       }
1416     }
1417     if (state == REBUILD4_STATE){
1418       switch (fillStatus){
1419       case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in REBUILD4_STATE\n");
1420         break;
1421
1422       case 1:                           //do nothing
1423         break;
1424
1425       case 2:                           //done filling the dht, notify leader
1426         fillStatus = 0;
1427         setState(REBUILD5_STATE);
1428         outBuffer[0] = FILL_DHT_RES;
1429         udpSend(outBuffer, 1, leader);
1430         break;
1431
1432       case 3:                           //error encountered -> restart rebuild
1433         fillStatus = 0;
1434         setState(REBUILD0_STATE);
1435         outBuffer[0] = REBUILD_REQ;
1436         udpSend(outBuffer, 1, leader);
1437         break;
1438       }
1439     }
1440     if (state == LEAD_REBUILD3_STATE){
1441       switch (fillStatus){
1442       case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in LEAD_REBUILD3_STATE\n");
1443         break;
1444
1445       case 1:                           //do nothing
1446         break;
1447
1448       case 2:                           //I'm done, now is everybody else also done?
1449         if (allReplied()){
1450           fillStatus = 0;
1451           setState(LEAD_REBUILD4_STATE);
1452           outBuffer[0] = RESUME_NORMAL_CMD;
1453           udpSendAll(outBuffer, 1);
1454         }
1455         break;
1456
1457       case 3:                           //error encountered -> restart rebuild
1458         fillStatus = 0;
1459         setState(LEAD_REBUILD1_STATE);
1460         outBuffer[0] = REBUILD_CMD;
1461         udpSendAll(outBuffer, 1);
1462         break;
1463       }
1464     }
1465     if (timerSet){
1466       gettimeofday(&now, NULL);
1467       if (timercmp(&now, &timer, >)){
1468         if (timeoutCntr < retry_vals[state]){
1469           timeoutCntr++;
1470           timeradd(&now, &timeout_vals[state], &timer);
1471           dhtLog("udpListen(): retry: %d\n", timeoutCntr);
1472           switch (state){
1473           case INIT1_STATE:
1474             outBuffer[0] = WHO_IS_LEADER_CMD;
1475             udpSend(outBuffer, 1, seed);
1476             break;
1477
1478           case INIT2_STATE:
1479             outBuffer[0] = JOIN_REQ;
1480             write4(&outBuffer[1], myHostData.maxKeyCapacity);
1481             udpSend(outBuffer, 5, leader);
1482             break;
1483
1484           case ELECT1_STATE:
1485             outBuffer[0] = ELECT_LEADER_CMD;
1486             write4(&outBuffer[1], electionOriginator);
1487             udpSendAll(outBuffer, 5);
1488             break;
1489
1490           case ELECT2_STATE:
1491             if (electionOriginator == myHostData.ipAddr){ //retry notify leader
1492               outBuffer[0] = CONGRATS_CMD;
1493               write2(&outBuffer[1], numHosts);
1494               memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
1495                      * numHosts);
1496               udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1497                       leader);
1498             } else
1499             {
1500               outBuffer[0] = ELECT_LEADER_RES;
1501               outBuffer[1] = 0;
1502               write2(&outBuffer[2], numHosts);
1503               memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1504                      * numHosts);
1505               udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1506                       electionParent);
1507             }
1508             break;
1509
1510           case REBUILD0_STATE:
1511             outBuffer[0] = REBUILD_REQ;
1512             udpSend(outBuffer, 1, leader);
1513             break;
1514
1515           case REBUILD1_STATE:
1516             outBuffer[0] = JOIN_REQ;
1517             write4(&outBuffer[1], myHostData.maxKeyCapacity);
1518             udpSend(outBuffer, 5, leader);
1519             break;
1520
1521           case REBUILD5_STATE:
1522             outBuffer[0] = FILL_DHT_RES;
1523             udpSend(outBuffer, 1, leader);
1524             break;
1525
1526           case LEAD_REBUILD1_STATE:
1527             outBuffer[0] = REBUILD_CMD;
1528             udpSendAll(outBuffer, 1);
1529             break;
1530
1531           case LEAD_REBUILD2_STATE:
1532             outBuffer[0] = DHT_UPDATE_CMD;
1533             write2(&outBuffer[1], numHosts);
1534             write2(&outBuffer[3], numBlocks);
1535             memcpy(&outBuffer[5], hostArray, numHosts
1536                    * sizeof(struct hostData));
1537             memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
1538                    blockOwnerArray, numBlocks*2);
1539             udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
1540                        + 2 * numBlocks);
1541             break;
1542
1543           case LEAD_REBUILD3_STATE:
1544             outBuffer[0] = FILL_DHT_CMD;
1545             udpSendAll(outBuffer, 1);
1546             break;
1547
1548           case LEAD_REBUILD4_STATE:
1549             outBuffer[0] = RESUME_NORMAL_CMD;
1550             udpSendAll(outBuffer, 1);
1551             break;
1552
1553           case EXIT1_STATE:                                       //TODO...
1554             break;
1555
1556           case NORMAL_STATE:
1557           case LEAD_NORMAL1_STATE:
1558           case LEAD_NORMAL2_STATE:
1559           case REBUILD2_STATE:
1560           case REBUILD3_STATE:
1561           case REBUILD4_STATE:
1562           case EXIT2_STATE:                                       //we shouldn't get here
1563             break;
1564           }
1565         } else
1566         {
1567           dhtLog("udpListen(): timed out in state %s after %d retries\n",
1568                  state_names[state], timeoutCntr);
1569           switch (state){
1570           case INIT1_STATE:
1571             setState(EXIT2_STATE);
1572             break;
1573
1574           case LEAD_NORMAL2_STATE:
1575             setState(LEAD_REBUILD1_STATE);
1576             outBuffer[0] = REBUILD_CMD;
1577             udpSendAll(outBuffer, 1);
1578             break;
1579
1580           case ELECT1_STATE:
1581             dhtLog("removing unresponsive hosts, before:\n");
1582             writeHostList();
1583             removeUnresponsiveHosts();
1584             dhtLog("after\n");
1585             writeHostList();
1586             setState(ELECT2_STATE);
1587             if (electionOriginator == myHostData.ipAddr){
1588               leader = hostArray[0].ipAddr;
1589               if (leader == myHostData.ipAddr){                   //I am the leader
1590                 dhtLog("I am the leader!\n");
1591                 setState(LEAD_REBUILD1_STATE);
1592                 outBuffer[0] = REBUILD_CMD;
1593                 udpSendAll(outBuffer, 1);
1594               } else
1595               {                                                   //notify leader
1596                 outBuffer[0] = CONGRATS_CMD;
1597                 write2(&outBuffer[1], numHosts);
1598                 memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
1599                        * numHosts);
1600                 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1601                         leader);
1602               }
1603             } else
1604             {
1605               outBuffer[0] = ELECT_LEADER_RES;
1606               outBuffer[1] = 0;
1607               write2(&outBuffer[2], numHosts);
1608               memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1609                      * numHosts);
1610               udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1611                       electionParent);
1612             }
1613             break;
1614
1615           case INIT2_STATE:
1616           case ELECT2_STATE:
1617           case REBUILD0_STATE:
1618           case REBUILD1_STATE:
1619           case REBUILD2_STATE:
1620           case REBUILD3_STATE:
1621           case REBUILD4_STATE:
1622           case REBUILD5_STATE:
1623           case LEAD_REBUILD1_STATE:
1624           case LEAD_REBUILD2_STATE:
1625           case LEAD_REBUILD3_STATE:
1626           case LEAD_REBUILD4_STATE:
1627             //start election
1628             electionOriginator = myHostData.ipAddr;
1629             setState(ELECT1_STATE);
1630             outBuffer[0] = ELECT_LEADER_CMD;
1631             write4(&outBuffer[1], myHostData.ipAddr);                                             //originator = me
1632             udpSendAll(outBuffer, 5);
1633             break;
1634
1635           case EXIT1_STATE:
1636             setState(EXIT2_STATE);
1637             break;
1638
1639           case NORMAL_STATE:
1640           case LEAD_NORMAL1_STATE:
1641           case EXIT2_STATE:                                       //we shouldn't get here
1642             break;
1643           }
1644         }
1645       }
1646     }
1647     if (state != oldState)
1648       pthread_cond_broadcast(&stateCond);
1649     pthread_mutex_unlock(&stateMutex);
1650   }
1651 }
1652