This commit was manufactured by cvs2svn to create tag 'buildscript'.
[IRC.git] /
1 /*******************************************************************************
2 *                                 dht.c
3 *
4 *  High-performance Distributed Hash Table for finding the location of objects
5 * in a Distributed Shared Transactional Memory system.
6 *
7 * Creator: Erik Rubow
8 *
9 * TODO:
10 * 1) Instead of having dhtInsertMult, dhtSearchMult, etc. call their single-key
11 * counterparts repeatedly, define some new messages to handle it more
12 * efficiently.
13 * 2) Improve the efficiency of functions that work with hostArray, hostReplied,
14 * and blockOwnerArray.
15 * 3) Currently a join or leave causes a rebuild of the entire hash table.
16 * Implement more graceful join and leave procedures.
17 * 4) Fine tune timeout values for performance, possibly implement a backoff
18 * algorithm to prevent overloading the network.
19 * 5) Whatever else I'm forgetting
20 *
21 *******************************************************************************/
22 /*******************************************************************************
23 *                              Includes
24 *******************************************************************************/
25
26 #include <netinet/in.h>
27 #include <arpa/inet.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <sys/ioctl.h>
31 #include <stdio.h>
32 #include <stdarg.h>
33 #include <string.h>
34 #include <stdlib.h>
35 #include <unistd.h>
36 #include <pthread.h>
37 #include <sys/time.h>
38 #include <sys/poll.h>
39 #include <netdb.h>
40 #include <net/if.h>
41 #include <linux/sockios.h>
42 #include <sys/time.h>
43 #include <sys/queue.h>
44 #include "dht.h"
45 #include "clookup.h" //this works for now, do we need anything better?
46 #include "mlookup.h"
47
48 /*******************************************************************************
49 *                           Local Defines, Structs
50 *******************************************************************************/
51
//--- wire / socket parameters ---
#define MAX_MSG_SIZE       1500    //presumably the largest datagram handled; use not visible here
#define UDP_PORT           2157    //port the DHT socket binds to and sends to

//--- initial table sizes (see dhtInit) ---
#define INIT_HOST_ALLOC    3       //initial capacity of hostArray / hostReplied
#define INIT_NUM_BLOCKS    16      //initial number of entries in blockOwnerArray

//--- local interface queried for this host's IP address ---
#define DEFAULT_INTERFACE  "eth0"

//--- timing ---
#define TIMEOUT_PERIOD     100     //NOTE(review): units and user not visible in this part of the file

//per-operation poll() timeout (milliseconds) and number of send attempts
#define INSERT_TIMEOUT_MS  500
#define INSERT_RETRIES     50
#define REMOVE_TIMEOUT_MS  500
#define REMOVE_RETRIES     50
#define SEARCH_TIMEOUT_MS  500
#define SEARCH_RETRIES     50
64
/*
 * Message type codes. The code is the first byte of every datagram.
 * The order here must stay in sync with the msg_types[] name table.
 */
enum
{
        //key/value operations
        INSERT_CMD = 0,
        INSERT_RES,
        REMOVE_CMD,
        REMOVE_RES,
        SEARCH_CMD,
        SEARCH_RES,

        //membership
        WHO_IS_LEADER_CMD,
        WHO_IS_LEADER_RES,
        JOIN_REQ,
        JOIN_RES,
        LEAVE_REQ,
        LEAVE_RES,
        DHT_UPDATE_CMD,
        DHT_UPDATE_RES,

        //leader election
        ELECT_LEADER_CMD,
        ELECT_LEADER_RES,
        CONGRATS_CMD,

        //table rebuild
        REBUILD_REQ,
        REBUILD_CMD,
        FILL_DHT_CMD,
        FILL_DHT_RES,
        RESUME_NORMAL_CMD,
        RESUME_NORMAL_RES,

        //number of message types; keep last
        NUM_MSG_TYPES
};
94
/*
 * States of the DHT state machine.
 * The order here must stay in sync with the state_names[], timeout_vals[]
 * and retry_vals[] tables, which are indexed by these values.
 */
enum
{
        //start-up
        INIT1_STATE = 0,
        INIT2_STATE,

        //steady state
        NORMAL_STATE,
        LEAD_NORMAL1_STATE,
        LEAD_NORMAL2_STATE,

        //leader election
        ELECT1_STATE,
        ELECT2_STATE,

        //rebuild
        REBUILD0_STATE,
        REBUILD1_STATE,
        REBUILD2_STATE,
        REBUILD3_STATE,
        REBUILD4_STATE,
        REBUILD5_STATE,

        //rebuild, leader side
        LEAD_REBUILD1_STATE,
        LEAD_REBUILD2_STATE,
        LEAD_REBUILD3_STATE,
        LEAD_REBUILD4_STATE,

        //shutdown
        EXIT1_STATE,
        EXIT2_STATE,

        //number of states; keep last
        NUM_STATES
};
120
/*
 * Status codes. The insert/remove/search functions read one of these from
 * byte 1 of the reply message and return it to their caller.
 */
enum
{
        OPERATION_OK = 0,
        KEY_NOT_FOUND,
        NOT_KEY_OWNER,
        NOT_LEADER,
        INTERNAL_ERROR
};
130
/* One entry of hostArray: describes a single participating host. */
struct hostData
{
        //IPv4 address in host byte order (converted with htonl() before sending)
        unsigned int ipAddr;
        //capacity value supplied to dhtInit(); exact meaning not shown here
        unsigned int maxKeyCapacity;
};
136
137 /*******************************************************************************
138 *                         Local Function Prototypes
139 *******************************************************************************/
140
//message validation and little-endian (de)serialization helpers
int msgSizeOk(unsigned char *msg, unsigned int size);
unsigned short read2(unsigned char *msg);
unsigned int read4(unsigned char *msg);
void write2(unsigned char *ptr, unsigned short tmp);
void write4(unsigned char *ptr, unsigned int tmp);
//networking
unsigned int getMyIpAddr(const char *interfaceStr);
int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp);
int udpSendAll(unsigned char *msg, unsigned int size);
//key -> owner mapping
unsigned int hash(unsigned int x);
unsigned int getKeyOwner(unsigned int key);
//state machine
void setState(unsigned int newState);
void makeAssignments();
//host list maintenance
int addHost(struct hostData newHost);
//findHost() is called by checkReplied() and removeHost() ahead of its
//definition; without this prototype it was implicitly declared
int findHost(unsigned int ipAddr);
int removeHost(unsigned int ipAddr);
void removeUnresponsiveHosts();
int checkReplied(unsigned int ipAddr);
int allReplied();
void writeHostList();
//logging and threads
void dhtLog(const char *format, ...);
void *fillTask();
void *udpListen();
162
163 /*******************************************************************************
164 *                           Global Variables
165 *******************************************************************************/
166
167 //make sure this matches enumeration above
168 const char *msg_types[NUM_MSG_TYPES] =
169 {
170         "INSERT_CMD",
171         "INSERT_RES",
172         "REMOVE_CMD",
173         "REMOVE_RES",
174         "SEARCH_CMD",
175         "SEARCH_RES",
176         "WHO_IS_LEADER_CMD",
177         "WHO_IS_LEADER_RES",
178         "JOIN_REQ",
179         "JOIN_RES",
180         "LEAVE_REQ",
181         "LEAVE_RES",
182         "DHT_UPDATE_CMD",
183         "DHT_UPDATE_RES",
184         "ELECT_LEADER_CMD",
185         "ELECT_LEADER_RES",
186         "CONGRATS_CMD",
187         "REBUILD_REQ",
188         "REBUILD_CMD",
189         "FILL_DHT_CMD",
190         "FILL_DHT_RES",
191         "RESUME_NORMAL_CMD",
192         "RESUME_NORMAL_RES"
193 };
194
195 const char *state_names[NUM_STATES] =
196 {
197         "INIT1_STATE",
198         "INIT2_STATE",
199         "NORMAL_STATE",
200         "LEAD_NORMAL1_STATE",
201         "LEAD_NORMAL2_STATE",
202         "ELECT1_STATE",
203         "ELECT2_STATE",
204         "REBUILD0_STATE",
205         "REBUILD1_STATE",
206         "REBUILD2_STATE",
207         "REBUILD3_STATE",
208         "REBUILD4_STATE",
209         "REBUILD5_STATE",
210         "LEAD_REBUILD1_STATE",
211         "LEAD_REBUILD2_STATE",
212         "LEAD_REBUILD3_STATE",
213         "LEAD_REBUILD4_STATE",
214         "EXIT1_STATE",
215         "EXIT2_STATE",
216 };
217
218 //note: { 0, 0 } means no timeout
219 struct timeval timeout_vals[NUM_STATES] =
220 {
221         { 0, 500000 }, //INIT1_STATE
222         { 0, 500000 }, //INIT2_STATE
223         { 0, 0 }, //NORMAL_STATE
224         { 0, 0 }, //LEAD_NORMAL1_STATE
225         { 3, 0 }, //LEAD_NORMAL2_STATE
226         { 1, 0 }, //ELECT1_STATE
227         { 1, 0 }, //ELECT2_STATE
228         { 0, 500000 }, //REBUILD0_STATE
229         { 0, 500000 }, //REBUILD1_STATE
230         { 10, 0 }, //REBUILD2_STATE
231         { 10, 0 }, //REBUILD3_STATE
232         { 10, 0 }, //REBUILD4_STATE
233         { 1, 0 }, //REBUILD5_STATE
234         { 1, 0 }, //LEAD_REBUILD1_STATE
235         { 1, 0 }, //LEAD_REBUILD2_STATE
236         { 10, 0 }, //LEAD_REBUILD3_STATE
237         { 10, 0 }, //LEAD_REBUILD4_STATE
238         { 0, 500000 }, //EXIT1_STATE
239         { 0, 0 } //EXIT2_STATE
240 };
241
242 int retry_vals[NUM_STATES] =
243 {
244         100, //INIT1_STATE
245         10, //INIT2_STATE
246         0, //NORMAL_STATE
247         0, //LEAD_NORMAL1_STATE
248         0, //LEAD_NORMAL2_STATE
249         10, //ELECT1_STATE
250         10, //ELECT2_STATE
251         10, //REBUILD0_STATE
252         10, //REBUILD1_STATE
253         0, //REBUILD2_STATE
254         0, //REBUILD3_STATE
255         0, //REBUILD4_STATE
256         10, //REBUILD5_STATE
257         10, //LEAD_REBUILD1_STATE
258         10, //LEAD_REBUILD2_STATE
259         10, //LEAD_REBUILD3_STATE
260         10, //LEAD_REBUILD4_STATE
261         10, //EXIT1_STATE
262         0 //EXIT2_STATE
263 };
264
265 FILE *logfile;
266 struct hostData myHostData;
267 pthread_t threadUdpListen;
268 pthread_t threadFillTask;
269 //status of fillTask: 0 = ready to run, 1 = running, 2 = completed, 3 = error
270 int fillStatus;
271 struct pollfd udpPollSock;
272 unsigned int state;
273 unsigned int seed;
274 unsigned int leader;
275 unsigned int electionOriginator;
276 unsigned int electionParent;
277 unsigned int hostArraySize = 0;
278 struct hostData *hostArray = NULL;
279 unsigned int numBlocks = 0;
280 unsigned short *blockOwnerArray = NULL;
281 unsigned char *hostReplied = NULL;
282 pthread_mutex_t stateMutex;
283 pthread_cond_t stateCond;
284 chashtable_t *myHashTable;
285 unsigned int numHosts;
286 struct timeval timer;
287 int timerSet;
288 int timeoutCntr;
289
290 /*******************************************************************************
291 *                      Interface Function Definitions
292 *******************************************************************************/
293
294 void dhtInit(unsigned int seedIpAddr, unsigned int maxKeyCapacity)
295 {
296         struct in_addr tmpAddr;
297         char filename[23] = "dht-";
298         struct sockaddr_in myAddr;
299         struct sockaddr_in seedAddr;
300         socklen_t socklen = sizeof(struct sockaddr_in);
301         char initMsg;
302
303         tmpAddr.s_addr = htonl(getMyIpAddr(DEFAULT_INTERFACE));
304         strcat(filename, inet_ntoa(tmpAddr));
305         strcat(filename, ".log");
306         printf("log file: %s\n", filename);
307
308         logfile = fopen(filename, "w");
309         dhtLog("dhtInit(): inializing...\n");
310
311         myHostData.ipAddr = getMyIpAddr(DEFAULT_INTERFACE);
312         myHostData.maxKeyCapacity = maxKeyCapacity;
313
314         seed = seedIpAddr;
315         leader = 0;
316         electionOriginator = 0;
317         electionParent = 0;
318         hostArraySize = INIT_HOST_ALLOC;
319         hostArray = calloc(hostArraySize, sizeof(struct hostData));
320         hostReplied = calloc(hostArraySize, sizeof(unsigned char));
321         hostArray[0] = myHostData;
322         numHosts = 1;
323         numBlocks = INIT_NUM_BLOCKS;
324         blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
325         pthread_mutex_init(&stateMutex, NULL);
326         pthread_cond_init(&stateCond, NULL);
327         myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
328
329         udpPollSock.fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
330         if (udpPollSock.fd < 0)
331                 perror("dhtInit():socket()");
332
333         udpPollSock.events = POLLIN;
334         
335         bzero(&myAddr, socklen);
336         myAddr.sin_family = AF_INET;
337         myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
338         myAddr.sin_port = htons(UDP_PORT);
339
340         if (bind(udpPollSock.fd, (struct sockaddr *)&myAddr, socklen) < 0)
341                 perror("dhtInit():bind()");
342
343         if (seed == 0)
344         {
345                 dhtLog("I am the leader\n");
346                 leader = myHostData.ipAddr;
347                 setState(LEAD_NORMAL1_STATE);
348         }
349         else
350         {
351                 initMsg = WHO_IS_LEADER_CMD;
352                 udpSend(&initMsg, 1, seed);
353                 setState(INIT1_STATE);
354         }
355
356         if (pthread_create(&threadUdpListen, NULL, udpListen, NULL) != 0)
357                 dhtLog("dhtInit() - ERROR creating threadUdpListen\n");
358
359         return;
360 }
361
362 void dhtExit()
363 { //TODO: do this gracefully, wait for response from leader, etc.
364         char msg;
365
366         msg = LEAVE_REQ;
367         udpSend(&msg, 1, leader);
368         dhtLog("dhtExit(): cleaning up...\n");
369         pthread_cancel(threadUdpListen);
370         close(udpPollSock.fd);
371         free(hostArray);
372         free(hostReplied);
373         free(blockOwnerArray);
374         fclose(logfile);
375
376         return;
377 }
378
379 int dhtInsert(unsigned int key, unsigned int val)
380 {
381         struct sockaddr_in toAddr;
382         struct sockaddr_in fromAddr;
383         socklen_t socklen = sizeof(struct sockaddr_in);
384         struct pollfd pollsock;
385         char inBuffer[2];
386         char outBuffer[9];
387         ssize_t bytesRcvd;
388         int i;
389         int retval;
390         int status = -1;
391
392         bzero((char *)&toAddr, socklen);
393         toAddr.sin_family = AF_INET;
394         toAddr.sin_port = htons(UDP_PORT);
395
396         while (status != OPERATION_OK)
397         {
398                 pthread_mutex_lock(&stateMutex);
399                 while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
400                                 || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
401                                 || state == LEAD_REBUILD3_STATE))
402                         pthread_cond_wait(&stateCond, &stateMutex);
403                 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
404                 pthread_mutex_unlock(&stateMutex);
405
406                 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
407                 {
408                         perror("dhtInsert():socket()");
409                         return -1;
410                 }
411                 pollsock.events = POLLIN;
412
413                 outBuffer[0] = INSERT_CMD;
414                 write4(&outBuffer[1], key);
415                 write4(&outBuffer[5], val);
416
417                 for (i = 0; i < INSERT_RETRIES; i++)
418                 {
419                         if (sendto(pollsock.fd, outBuffer, 9, 0, (struct sockaddr *)&toAddr,
420                                 socklen) < 0)
421                         {
422                                 perror("dhtInsert():sendto()");
423                                 break;
424                         }
425                         retval = poll(&pollsock, 1, INSERT_TIMEOUT_MS);
426                         if (retval < 0)
427                         {
428                                 perror("dhtInsert():poll()");
429                                 break;
430                         }
431                         if (retval > 0)
432                         {
433                                 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
434                                         (struct sockaddr *)&fromAddr, &socklen);
435                                 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
436                                         && fromAddr.sin_port == toAddr.sin_port
437                                         && bytesRcvd == 2 && inBuffer[0] == INSERT_RES)
438                                 {
439                                         status = inBuffer[1]; //status from remote host
440                                         break;
441                                 }
442                         }
443                 }
444                 if (status != OPERATION_OK)
445                 {
446                         pthread_mutex_lock(&stateMutex);
447                         setState(REBUILD0_STATE);
448                         outBuffer[0] = REBUILD_REQ;
449                         udpSend(outBuffer, 1, leader);
450                         pthread_mutex_unlock(&stateMutex);
451                 }
452         }
453
454         close(pollsock.fd);
455
456         return status;
457 }
458
/*
 * Insert numKeys (keys[i], vals[i]) pairs, one dhtInsert() call per pair.
 * All pairs are attempted. Returns 0 if every insert returned 0, else -1.
 */
int dhtInsertMult(unsigned int numKeys, unsigned int *keys,     unsigned int *vals)
{
        int status = 0;
        unsigned int i; //unsigned to match numKeys (no signed/unsigned comparison)

        for (i = 0; i < numKeys; i++)
        {
                if (dhtInsert(keys[i], vals[i]) != 0)
                        status = -1;
        }
        return status;
}
472
473 int dhtRemove(unsigned int key)
474 {
475         struct sockaddr_in toAddr;
476         struct sockaddr_in fromAddr;
477         socklen_t socklen = sizeof(struct sockaddr_in);
478         struct pollfd pollsock;
479         char inBuffer[2];
480         char outBuffer[5];
481         ssize_t bytesRcvd;
482         int i;
483         int retval;
484         int status = -1;
485
486         bzero((char *)&toAddr, socklen);
487         toAddr.sin_family = AF_INET;
488         toAddr.sin_port = htons(UDP_PORT);
489
490         while (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
491         {
492                 pthread_mutex_lock(&stateMutex);
493                 while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
494                                 || state == LEAD_NORMAL2_STATE))
495                         pthread_cond_wait(&stateCond, &stateMutex);
496                 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
497                 pthread_mutex_unlock(&stateMutex);
498
499                 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
500                 {
501                         perror("dhtRemove():socket()");
502                         return -1;
503                 }
504                 pollsock.events = POLLIN;
505
506                 outBuffer[0] = REMOVE_CMD;
507                 write4(&outBuffer[1], key);
508
509                 for (i = 0; i < REMOVE_RETRIES; i++)
510                 {
511                         if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
512                                 socklen) < 0)
513                         {
514                                 perror("dhtRemove():sendto()");
515                                 break;
516                         }
517                         retval = poll(&pollsock, 1, REMOVE_TIMEOUT_MS);
518                         if (retval < 0)
519                         {
520                                 perror("dhtRemove():poll()");
521                                 break;
522                         }
523                         if (retval > 0)
524                         {
525                                 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
526                                         (struct sockaddr *)&fromAddr, &socklen);
527                                 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
528                                         && fromAddr.sin_port == toAddr.sin_port
529                                         && bytesRcvd == 2 && inBuffer[0] == REMOVE_RES)
530                                 {
531                                         status = inBuffer[1]; //status from remote host
532                                         break;
533                                 }
534                         }
535                 }
536                 if (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
537                 {
538                         pthread_mutex_lock(&stateMutex);
539                         setState(REBUILD0_STATE);
540                         outBuffer[0] = REBUILD_REQ;
541                         udpSend(outBuffer, 1, leader);
542                         pthread_mutex_unlock(&stateMutex);
543                 }
544         }
545
546         close(pollsock.fd);
547
548         return status;
549 }
550
/*
 * Remove numKeys keys, one dhtRemove() call per key. All keys are attempted.
 * Returns 0 if every call returned 0, else -1 (a KEY_NOT_FOUND result from
 * dhtRemove() is non-zero and therefore also yields -1).
 */
int dhtRemoveMult(unsigned int numKeys, unsigned int *keys)
{
        int status = 0;
        unsigned int i; //unsigned to match numKeys (no signed/unsigned comparison)

        for (i = 0; i < numKeys; i++)
        {
                if (dhtRemove(keys[i]) != 0)
                        status = -1;
        }
        return status;
}
564
565 int dhtSearch(unsigned int key, unsigned int *val)
566 {
567         struct sockaddr_in toAddr;
568         struct sockaddr_in fromAddr;
569         socklen_t socklen = sizeof(struct sockaddr_in);
570         struct pollfd pollsock;
571         char inBuffer[6];
572         char outBuffer[5];
573         ssize_t bytesRcvd;
574         int i;
575         int retval;
576         int status = -1;
577
578         bzero((char *)&toAddr, socklen);
579         toAddr.sin_family = AF_INET;
580         toAddr.sin_port = htons(UDP_PORT);
581
582         while (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
583         {
584                 pthread_mutex_lock(&stateMutex);
585                 while (numBlocks == 0)
586                         pthread_cond_wait(&stateCond, &stateMutex);
587                 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
588                 pthread_mutex_unlock(&stateMutex);
589
590                 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
591                 {
592                         perror("dhtSearch():socket()");
593                         return -1;
594                 }
595                 pollsock.events = POLLIN;
596
597                 outBuffer[0] = SEARCH_CMD;
598                 write4(&outBuffer[1], key);
599
600                 for (i = 0; i < SEARCH_RETRIES; i++)
601                 {
602                         if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
603                                 socklen) < 0)
604                         {
605                                 perror("dhtSearch():sendto()");
606                                 break;
607                         }
608                         retval = poll(&pollsock, 1, SEARCH_TIMEOUT_MS);
609                         if (retval < 0)
610                         {
611                                 perror("dhtSearch():poll()");
612                                 break;
613                         }
614                         if (retval > 0)
615                         {
616                                 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 6, 0,
617                                         (struct sockaddr *)&fromAddr, &socklen);
618                                 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
619                                         && fromAddr.sin_port == toAddr.sin_port
620                                         && bytesRcvd == 6 && inBuffer[0] == SEARCH_RES)
621                                 {
622                                         status = inBuffer[1]; //status from remote host
623                                         *val = read4(&inBuffer[2]);
624                                         break;
625                                 }
626                         }
627                 }
628                 if (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
629                 {
630                         pthread_mutex_lock(&stateMutex);
631                         setState(REBUILD0_STATE);
632                         outBuffer[0] = REBUILD_REQ;
633                         udpSend(outBuffer, 1, leader);
634                         pthread_mutex_unlock(&stateMutex);
635                 }
636         }
637
638         close(pollsock.fd);
639
640         return status;
641 }
642
/*
 * Look up numKeys keys, one dhtSearch() call per key, storing the result for
 * keys[i] in vals[i]. All keys are attempted. Returns 0 if every call
 * returned 0, else -1 (a KEY_NOT_FOUND result is non-zero and yields -1).
 */
int dhtSearchMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals)
{
        unsigned int i; //unsigned to match numKeys (no signed/unsigned comparison)
        int status = 0;

        for (i = 0; i < numKeys; i++)
        {
                if (dhtSearch(keys[i], &vals[i]) != 0)
                        status = -1;
        }
        return status;
}
654
655 /*******************************************************************************
656 *                      Local Function Definitions
657 *******************************************************************************/
658
659 int msgSizeOk(unsigned char *msg, unsigned int size)
660 {
661         unsigned short tmpNumHosts;
662         unsigned short tmpNumBlocks;
663
664         if (size < 1)
665                 return 1;
666
667         switch (msg[0])
668         {
669                 case WHO_IS_LEADER_CMD:
670                 case LEAVE_REQ:
671                 case LEAVE_RES:
672                 case DHT_UPDATE_RES:
673                 case REBUILD_REQ:
674                 case REBUILD_CMD:
675                 case FILL_DHT_CMD:
676                 case FILL_DHT_RES:
677                 case RESUME_NORMAL_CMD:
678                 case RESUME_NORMAL_RES:
679                         return (size == 1);
680                 case INSERT_RES:
681                 case REMOVE_RES:
682                 case JOIN_RES:
683                         return (size == 2);
684                 case REMOVE_CMD:
685                 case SEARCH_CMD:
686                 case WHO_IS_LEADER_RES:
687                 case JOIN_REQ:
688                 case ELECT_LEADER_CMD:
689                         return (size == 5);
690                 case SEARCH_RES:
691                         return (size == 6);
692                 case INSERT_CMD:
693                         return (size == 9);
694                 case DHT_UPDATE_CMD:
695                         if (size < 5)
696                                 return 1;
697                         tmpNumHosts = read2(&msg[1]);
698                         tmpNumBlocks = read2(&msg[3]);
699                         return (size == (5+sizeof(struct hostData)*tmpNumHosts+2*tmpNumBlocks));
700                 case ELECT_LEADER_RES:
701                         if (size < 2)
702                                 return 1;
703                         if (msg[1] == 0xFF)
704                                 return (size == 2);
705                         if (size < 4)
706                                 return 1;
707                         tmpNumHosts = read2(&msg[2]);
708                         return (size == (4 + sizeof(struct hostData) * tmpNumHosts));
709                 case CONGRATS_CMD:
710                         if (size < 3)
711                                 return 1;
712                         tmpNumHosts = read2(&msg[1]);
713                         return (size == (3 + sizeof(struct hostData) * tmpNumHosts));
714                 default:
715                         return 1;
716         }
717 }
718
/* Decode a 16-bit little-endian value from ptr[0..1]. */
unsigned short read2(unsigned char *ptr)
{
        unsigned short lowByte = ptr[0];
        unsigned short highByte = ptr[1];

        return (unsigned short)((highByte << 8) | lowByte);
}
724
/*
 * Decode a 32-bit little-endian value from ptr[0..3].
 * Each byte is widened to unsigned int before shifting: shifting the
 * int-promoted byte left by 24 overflows a signed int (undefined behavior)
 * whenever ptr[3] >= 0x80.
 */
unsigned int read4(unsigned char *ptr)
{
        return ((unsigned int)ptr[3] << 24)
                | ((unsigned int)ptr[2] << 16)
                | ((unsigned int)ptr[1] << 8)
                | (unsigned int)ptr[0];
}
730
/* Encode tmp as a 16-bit little-endian value into ptr[0..1]. */
void write2(unsigned char *ptr, unsigned short tmp)
{
        int byteIdx;

        for (byteIdx = 0; byteIdx < 2; byteIdx++)
                ptr[byteIdx] = (unsigned char)((tmp >> (8 * byteIdx)) & 0xFF);
}
737
/* Encode tmp as a 32-bit little-endian value into ptr[0..3]. */
void write4(unsigned char *ptr, unsigned int tmp)
{
        int byteIdx;

        for (byteIdx = 0; byteIdx < 4; byteIdx++)
                ptr[byteIdx] = (unsigned char)((tmp >> (8 * byteIdx)) & 0xFF);
}
746
747 unsigned int getMyIpAddr(const char *interfaceStr)
748 {       
749         int sock;
750         struct ifreq interfaceInfo;
751         struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
752
753         memset(&interfaceInfo, 0, sizeof(struct ifreq));
754
755         if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
756         {
757                 perror("getMyIpAddr():socket()");
758                 return 1;
759         }
760
761         strcpy(interfaceInfo.ifr_name, interfaceStr);
762         myAddr->sin_family = AF_INET;
763         
764         if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
765         {
766                 perror("getMyIpAddr():ioctl()");
767                 return 1;
768         }
769
770         return ntohl(myAddr->sin_addr.s_addr);
771 }
772
773 int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp)
774 {
775         struct sockaddr_in peerAddr;
776         socklen_t socklen = sizeof(struct sockaddr_in);
777
778         bzero(&peerAddr, socklen);
779         peerAddr.sin_family = AF_INET;
780         peerAddr.sin_addr.s_addr = htonl(destIp);
781         peerAddr.sin_port = htons(UDP_PORT);
782
783         if (size >= 1)
784         {
785                 if (msg[0] < NUM_MSG_TYPES)
786                         dhtLog("udpSend(): sending %s to %s, %d bytes\n", msg_types[msg[0]],
787                                 inet_ntoa(peerAddr.sin_addr), size);
788                 else
789                         dhtLog("udpSend(): sending unknown message to %s, %d bytes\n",
790                                 inet_ntoa(peerAddr.sin_addr), size);
791         }
792
793         if (sendto(udpPollSock.fd, (void *)msg, size, 0, (struct sockaddr *)&peerAddr,
794                         socklen) < 0)
795         {
796                 perror("udpSend():sendto()");
797                 return -1;
798         }
799         
800         return 0;
801 }
802
803 int udpSendAll(unsigned char *msg, unsigned int size)
804 {
805         int i;
806         int status = 0;
807         for (i = 0; i < numHosts; i++)
808         {
809                 if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr))
810                 {
811                         if (udpSend(msg, size, hostArray[i].ipAddr) != 0)
812                                 status = -1;
813                 }
814         }
815         return status;
816 }
817
818 //note: make sure this is only executed in a valid state, where numBlocks != 0
819 unsigned int hash(unsigned int x)
820 {
821         return (x % numBlocks);
822 }
823
824 //note: make sure this is only executed in a valid state, where these arrays
825 // are allocated and the index mappings are consistent
826 unsigned int getKeyOwner(unsigned int key)
827 {
828         return hostArray[blockOwnerArray[hash(key)]].ipAddr;
829 }
830
831 //sets state and timer, if applicable
832 void setState(unsigned int newState)
833 {
834         struct timeval now;
835         int i;
836         
837         gettimeofday(&now, NULL);
838         
839         if (newState >= NUM_STATES)
840         {
841                 dhtLog("setState(): ERROR: invalid state %d\n", newState);
842         }
843         else
844         {
845                 if (timeout_vals[newState].tv_sec == 0
846                         && timeout_vals[newState].tv_usec == 0)
847                 { //no timer
848                         timerSet = 0;
849                 }
850                 else
851                 {
852                         timeradd(&now, &timeout_vals[newState], &timer);
853                         timerSet = 1;
854                 }
855                 timeoutCntr = 0;
856                 state = newState;
857                 //TODO: only do this for states that require it
858                 for (i = 0; i < numHosts; i++)
859                         hostReplied[i] = 0;
860
861                 dhtLog("setState(): state set to %s\n", state_names[state]);
862         }
863
864         return;
865 }
866
867 //TODO: improve these simple and inefficient functions
868 int checkReplied(unsigned int ipAddr)
869 {
870         int i;
871
872         i = findHost(ipAddr);
873
874         if (i == -1)
875                 return -1;
876
877         hostReplied[i] = 1;
878
879         return 0;
880 }
881
882 int allReplied()
883 {
884         int i;
885
886         for (i = 0; i < numHosts; i++)
887                 if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr))
888                         return 0;
889         
890         return 1;
891 }
892
893 int findHost(unsigned int ipAddr)
894 {
895         int i;
896
897         for (i = 0; i < numHosts; i++)
898                 if (hostArray[i].ipAddr == ipAddr)
899                         return i; //found, return index
900         
901         return -1; //not found
902 }
903
904 int removeHost(unsigned int ipAddr)
905 {
906         int i, j;
907
908         i = findHost(ipAddr);
909
910         if (i == -1)
911                 return -1;
912
913         for (j = 0; j < numBlocks; j++)
914         {
915                 if (blockOwnerArray[j] == i)
916                         blockOwnerArray[j] = 0; //TODO: is this what I want to have happen?
917                 else if (blockOwnerArray[j] > i)
918                         blockOwnerArray[j]--;
919         }
920
921         for (; i < numHosts - 1; i++)
922         {
923                 hostArray[i] = hostArray[i+1];
924                 hostReplied[i] = hostReplied[i+1];
925         }
926         numHosts--;
927
928         return 0;
929 }
930
931 void removeUnresponsiveHosts()
932 {
933         int i;
934
935         for (i = 0; i < numHosts; i++)
936         {
937                 if (!hostReplied[i] && hostArray[i].ipAddr != myHostData.ipAddr)
938                         removeHost(hostArray[i].ipAddr);
939         }
940 }
941
942 int addHost(struct hostData newHost)
943 {
944         struct hostData *newHostArray;
945         unsigned char *newHostReplied;
946         int i;
947         int j;
948
949         for (i = 0; i < numHosts; i++)
950         {
951                 if (hostArray[i].ipAddr == newHost.ipAddr)
952                 {
953                         hostArray[i] = newHost;
954                         hostReplied[i] = 0;
955                         return 0;
956                 }
957                 else if (hostArray[i].ipAddr > newHost.ipAddr)
958                 {
959                         if (numHosts == hostArraySize)
960                         {
961                                 newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
962                                 newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
963                                 memcpy(newHostArray, hostArray, (i * sizeof(struct hostData)));
964                                 memcpy(newHostReplied, hostReplied, (i * sizeof(unsigned char)));
965                                 newHostArray[i] = newHost;
966                                 newHostReplied[i] = 0;
967                                 memcpy(&newHostArray[i+1], &hostArray[i], ((numHosts - i) *
968                                         sizeof(struct hostData)));
969                                 memcpy(&newHostReplied[i+1], &hostReplied[i], ((numHosts - i) *
970                                         sizeof(unsigned char)));
971                                 free(hostArray);
972                                 free(hostReplied);
973                                 hostArray = newHostArray;
974                                 hostReplied = newHostReplied;
975                                 hostArraySize = 2 * hostArraySize;
976                         }
977                         else
978                         {
979                                 for (j = numHosts; j > i; j--)
980                                 {
981                                         hostArray[j] = hostArray[j-1];
982                                         hostReplied[j] = hostReplied[j-1];
983                                 }
984                                 hostArray[i] = newHost;
985                                 hostReplied[i] = 0;
986                         }
987                         for(j = 0; j < numBlocks; j++)
988                         {
989                                 if (blockOwnerArray[j] >= i)
990                                         blockOwnerArray[j]++;
991                         }
992                         numHosts++;
993                         return 1;
994                 }
995         }
996
997         //nothing greater, add to end
998         if (numHosts == hostArraySize)
999         {
1000                 newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
1001                 newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
1002                 memcpy(newHostArray, hostArray, (numHosts * sizeof(struct hostData)));
1003                 memcpy(newHostReplied, hostReplied, (numHosts * sizeof(unsigned char)));
1004                 free(hostArray);
1005                 free(hostReplied);
1006                 hostArray = newHostArray;
1007                 hostReplied = newHostReplied;
1008                 hostArraySize = 2 * hostArraySize;
1009         }
1010
1011         hostArray[numHosts] = newHost;
1012         hostReplied[numHosts] = 0;
1013         numHosts++;
1014         return 1;
1015 }
1016
1017 void makeAssignments()
1018 {
1019         int i;
1020
1021         if (numBlocks < numHosts)
1022         {
1023                 free(blockOwnerArray);
1024                 while (numBlocks < numHosts)
1025                         numBlocks *= 2;
1026                 blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
1027         }
1028
1029         for (i = 0; i < numBlocks; i++)
1030                 blockOwnerArray[i]  = i % numHosts;
1031
1032         return;
1033 }
1034
1035 void writeHostList()
1036 {
1037         int i;
1038         struct in_addr tmpAddr;
1039
1040         fprintf(logfile, "numHosts = %d\n", numHosts);
1041         for (i = 0; i < numHosts; i++)
1042         {
1043                 tmpAddr.s_addr = htonl(hostArray[i].ipAddr);
1044                 fprintf(logfile, "%d) %s, %d\n", i, inet_ntoa(tmpAddr),
1045                         hostArray[i].maxKeyCapacity);
1046         }
1047         return;
1048 }
1049
1050 void dhtLog(const char *format, ...)
1051 {
1052         va_list args;
1053 //      struct timeval now;
1054
1055 //      if (gettimeofday(&now, NULL) < 0)
1056 //      {       perror("dhtLog():gettimeofday()"); }
1057         va_start(args, format);
1058 //      if (fprintf(logfile, "%d.%06d:", now.tv_sec, now.tv_usec) < 0)
1059 //      {       perror("dhtLog():fprintf()"); }
1060         if (vfprintf(logfile, format, args) < 0)
1061         {       perror("dhtLog():vfprintf()"); }
1062         if (fflush(logfile) == EOF)
1063         {       perror("dhtLog():fflush()"); }
1064         va_end(args);
1065
1066         return;
1067 }
1068
1069 void *fillTask()
1070 {
1071         unsigned int *vals;
1072         unsigned int *keys;
1073         unsigned int numKeys;
1074         int i;
1075         
1076         vals = mhashGetKeys(&numKeys); //note: key of mhash is val of dht
1077         keys = calloc(numKeys, sizeof(unsigned int));
1078
1079         for (i = 0; i < numKeys; i++)
1080                 keys[i] = myHostData.ipAddr;
1081
1082         if (dhtInsertMult(numKeys, keys, vals) == 0)
1083                 fillStatus = 2;
1084         else
1085                 fillStatus = 3;
1086         
1087         pthread_exit(NULL);
1088 }
1089
1090 void *udpListen()
1091 {
1092         ssize_t bytesRcvd;
1093         struct sockaddr_in peerAddr;
1094         unsigned int peerIp;
1095         socklen_t socklen = sizeof(struct sockaddr_in);
1096         unsigned char inBuffer[MAX_MSG_SIZE];
1097         unsigned char outBuffer[MAX_MSG_SIZE];
1098         int pollret;
1099         struct timeval now;
1100         struct in_addr tmpAddr;
1101         struct hostData tmpHost;
1102         unsigned int tmpKey;
1103         unsigned int tmpVal;
1104         struct hostData *hostDataPtr;
1105         unsigned short *uShortPtr;
1106         unsigned int tmpUInt;
1107         unsigned int tmpUShort;
1108         int i;
1109         unsigned int oldState;
1110
1111         dhtLog("udpListen(): linstening on port %d...\n", UDP_PORT);
1112
1113         while (1)
1114         {
1115                 pollret = poll(&udpPollSock, 1, TIMEOUT_PERIOD);
1116                 pthread_mutex_lock(&stateMutex);
1117                 oldState = state;
1118                 if (pollret < 0)
1119                 {
1120                         perror("udpListen():poll()");
1121                 }
1122                 else if (pollret > 0)
1123                 {
1124                         bytesRcvd = recvfrom(udpPollSock.fd, inBuffer, MAX_MSG_SIZE, 0,
1125                                 (struct sockaddr *)&peerAddr, &socklen);
1126                         if (bytesRcvd < 1)
1127                         {
1128                                 dhtLog("udpListen(): ERROR: bytesRcvd = %d\n", bytesRcvd);
1129                         }
1130                         else if (inBuffer[0] >= NUM_MSG_TYPES)
1131                         {
1132                                 dhtLog("udpListen(): ERROR: unknown msg type = %d\n", inBuffer[0]);
1133                         }
1134                         else if (!msgSizeOk(inBuffer, bytesRcvd))
1135                         {
1136                                 dhtLog("udpListen(): ERROR: msg size not ok: type = %s\n, size = %d\n",
1137                                         msg_types[inBuffer[0]], bytesRcvd);
1138                         }
1139                         else if (state == EXIT2_STATE)
1140                         {
1141                                 //do nothing
1142                         }
1143                         else if (state == INIT1_STATE)
1144                         { //after initialization with seed, do not proceed until seed replies
1145                                 dhtLog("udpListen(): received %s from %s, %d bytes\n",
1146                                         msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
1147                                 for (i = 0; i < bytesRcvd; i++)
1148                                         dhtLog(" %x", inBuffer[i]);
1149                                 dhtLog("\n");
1150                                 peerIp = ntohl(peerAddr.sin_addr.s_addr);
1151                                 if (peerIp == seed && inBuffer[0] == WHO_IS_LEADER_RES)
1152                                 {
1153                                         tmpHost.ipAddr = peerIp;
1154                                         tmpHost.maxKeyCapacity = 0;
1155                                         addHost(tmpHost);
1156                                         writeHostList();
1157                                         leader = read4(&inBuffer[1]);
1158                                         tmpAddr.s_addr = htonl(leader);
1159                                         dhtLog("leader = %s\n", inet_ntoa(tmpAddr));
1160                                         if (leader != 0)
1161                                         {
1162                                                 setState(INIT2_STATE);
1163                                                 outBuffer[0] = JOIN_REQ;
1164                                                 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1165                                                 udpSend(outBuffer, 5, leader);
1166                                         }
1167                                         else
1168                                         {
1169                                                 electionOriginator = myHostData.ipAddr;
1170                                                 setState(ELECT1_STATE);
1171                                                 outBuffer[0] = ELECT_LEADER_CMD;
1172                                                 write4(&outBuffer[1], myHostData.ipAddr); //originator = me
1173                                                 udpSendAll(outBuffer, 5);
1174                                         }
1175                                 }
1176                         }
1177                         else
1178                         {
1179                                 dhtLog("udpListen(): received %s from %s, %d bytes\n",
1180                                         msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
1181                                 for (i = 0; i < bytesRcvd; i++)
1182                                         dhtLog(" %x", inBuffer[i]);
1183                                 dhtLog("\n");
1184                                 peerIp = ntohl(peerAddr.sin_addr.s_addr);
1185                                 switch (inBuffer[0])
1186                                 {
1187                                         case INSERT_CMD:
1188                                                 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1189                                                         || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
1190                                                         || state == REBUILD5_STATE || state == LEAD_REBUILD3_STATE)
1191                                                 {
1192                                                         tmpKey = read4(&inBuffer[1]);
1193                                                         tmpVal = read4(&inBuffer[5]);
1194                                                         outBuffer[0] = INSERT_RES;
1195                                                         if (getKeyOwner(tmpKey) == myHostData.ipAddr)
1196                                                         {
1197                                                                 if (chashInsert(myHashTable, tmpKey, (void *)tmpVal) == 0)
1198                                                                         outBuffer[1] = OPERATION_OK;
1199                                                                 else
1200                                                                         outBuffer[1] = INTERNAL_ERROR;
1201                                                         }
1202                                                         else
1203                                                         {
1204                                                                 outBuffer[1] = NOT_KEY_OWNER;
1205                                                         }
1206                                                         //reply to client socket
1207                                                         sendto(udpPollSock.fd, outBuffer, 2, 0,
1208                                                                 (struct sockaddr *)&peerAddr, socklen);
1209                                                 }
1210                                                 break;
1211                                         case REMOVE_CMD:
1212                                                 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1213                                                         || state == LEAD_NORMAL2_STATE)
1214                                                 {
1215                                                         tmpKey = read4(&inBuffer[1]);
1216                                                         outBuffer[0] = REMOVE_RES;
1217                                                         if (getKeyOwner(tmpKey) == myHostData.ipAddr)
1218                                                         {
1219                                                                 if (chashRemove(myHashTable, tmpKey) == 0)
1220                                                                         outBuffer[1] = OPERATION_OK;
1221                                                                 else
1222                                                                         outBuffer[1] = KEY_NOT_FOUND;
1223                                                         }
1224                                                         else
1225                                                         {
1226                                                                 outBuffer[1] = NOT_KEY_OWNER;
1227                                                         }
1228                                                         //reply to client socket
1229                                                         sendto(udpPollSock.fd, outBuffer, 2, 0,
1230                                                                 (struct sockaddr *)&peerAddr, socklen);
1231                                                 }
1232                                                 break;
1233                                         case SEARCH_CMD:
1234                                                 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1235                                                         || state == LEAD_NORMAL2_STATE)
1236                                                 {
1237                                                         tmpKey = read4(&inBuffer[1]);
1238                                                         outBuffer[0] = SEARCH_RES;
1239                                                         if (getKeyOwner(tmpKey) == myHostData.ipAddr)
1240                                                         {
1241                                                                 if ((tmpVal = (unsigned int)chashSearch(myHashTable, tmpKey)) != 0)
1242                                                                 {
1243                                                                         outBuffer[1] = OPERATION_OK;
1244                                                                         write4(&outBuffer[2], tmpVal);
1245                                                                 }
1246                                                                 else
1247                                                                 {
1248                                                                         outBuffer[1] = KEY_NOT_FOUND;
1249                                                                         write4(&outBuffer[2], 0);
1250                                                                 }
1251                                                         }
1252                                                         else
1253                                                         {
1254                                                                 outBuffer[1] = NOT_KEY_OWNER;
1255                                                                 write4(&outBuffer[2], 0);
1256                                                         }
1257                                                         //reply to client socket
1258                                                         sendto(udpPollSock.fd, outBuffer, 6, 0,
1259                                                                 (struct sockaddr *)&peerAddr, socklen);
1260                                                 }
1261                                                 break;
1262                                         case WHO_IS_LEADER_CMD:
1263                                                 tmpHost.ipAddr = peerIp;
1264                                                 tmpHost.maxKeyCapacity = 0;
1265                                                 addHost(tmpHost);
1266                                                 writeHostList();
1267                                                 outBuffer[0] = WHO_IS_LEADER_RES;
1268                                                 //leader == 0 means I don't know who it is
1269                                                 write4(&outBuffer[1], leader);
1270                                                 udpSend(outBuffer, 5, peerIp);
1271                                                 break;
1272                                         case JOIN_REQ:
1273                                                 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
1274                                                 {
1275                                                         tmpHost.ipAddr = peerIp;
1276                                                         tmpHost.maxKeyCapacity = read4(&inBuffer[1]);
1277                                                         addHost(tmpHost);
1278                                                         writeHostList();
1279                                                         if (state == LEAD_NORMAL1_STATE)
1280                                                                 setState(LEAD_NORMAL2_STATE);
1281                                                         outBuffer[0] = JOIN_RES;
1282                                                         outBuffer[1] = 0; //status, success
1283                                                         udpSend(outBuffer, 2, peerIp);
1284                                                 }
1285                                                 else if (state == LEAD_REBUILD1_STATE)
1286                                                 {
1287                                                         //note: I don't need to addHost().
1288                                                         checkReplied(peerIp);
1289                                                         outBuffer[0] = JOIN_RES;
1290                                                         outBuffer[1] = 0; //status, success
1291                                                         udpSend(outBuffer, 2, peerIp);
1292                                                         if (allReplied())
1293                                                         {
1294                                                                 makeAssignments();
1295                                                                 setState(LEAD_REBUILD2_STATE);
1296                                                                 outBuffer[0] = DHT_UPDATE_CMD;
1297                                                                 write2(&outBuffer[1], numHosts);
1298                                                                 write2(&outBuffer[3], numBlocks);
1299                                                                 memcpy(&outBuffer[5], hostArray, numHosts*sizeof(struct hostData));
1300                                                                 memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
1301                                                                         blockOwnerArray, numBlocks*2);
1302                                                                 udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
1303                                                                         + 2 * numBlocks);
1304                                                         }
1305                                                 }
1306                                                 break;
1307                                         case JOIN_RES:
1308                                                 if (state == REBUILD1_STATE)
1309                                                 {
1310                                                         setState(REBUILD2_STATE);
1311                                                 }
1312                                                 else if (state == INIT2_STATE)
1313                                                 {
1314                                                         setState(NORMAL_STATE);
1315                                                 }
1316                                                 break;
1317                                         case LEAVE_REQ:
1318                                                 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
1319                                                 { //TODO: make this graceful, instead of just rebuilding
1320                                                         removeHost(peerIp);
1321                                                         if (state != LEAD_NORMAL2_STATE)
1322                                                                 setState(LEAD_NORMAL2_STATE);
1323                                                 }
1324                                                 break;
1325                                         case DHT_UPDATE_CMD:
1326                                                 if (state == REBUILD2_STATE && peerIp == leader)
1327                                                 {
1328                                                         free(hostArray);
1329                                                         free(blockOwnerArray);
1330                                                         numHosts = read2(&inBuffer[1]);
1331                                                         numBlocks = read2(&inBuffer[3]);
1332                                                         while (hostArraySize < numHosts)
1333                                                                 hostArraySize *= 2;
1334                                                         hostArray = calloc(hostArraySize, sizeof(struct hostData));
1335                                                         blockOwnerArray = calloc(numBlocks, 2);
1336                                                         memcpy(hostArray, &inBuffer[5], numHosts*sizeof(struct hostData));
1337                                                         memcpy(blockOwnerArray, &inBuffer[5+numHosts*sizeof(struct hostData)], numBlocks*2);
1338                                                         writeHostList();
1339                                                         setState(REBUILD3_STATE);
1340                                                         outBuffer[0] = DHT_UPDATE_RES;
1341                                                         udpSend(outBuffer, 1, peerIp);
1342                                                 }
1343                                                 break;
1344                                         case DHT_UPDATE_RES:
1345                                                 if (state == LEAD_REBUILD2_STATE)
1346                                                 {
1347                                                         checkReplied(peerIp);
1348                                                         if (allReplied())
1349                                                         {
1350                                                                 setState(LEAD_REBUILD3_STATE);
1351                                                                 outBuffer[0] = FILL_DHT_CMD;
1352                                                                 udpSendAll(outBuffer, 1);
1353                                                                 if (fillStatus != 0)
1354                                                                         dhtLog("udpListen(): ERROR: fillTask already running\n");
1355                                                                 fillStatus = 1;
1356                                                                 if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
1357                                                                         dhtLog("udpListen(): ERROR creating threadFillTask\n");
1358                                                         }
1359                                                 }
1360                                                 break;
1361                                         case ELECT_LEADER_CMD:
1362                                                 tmpUInt = read4(&inBuffer[1]);
1363                                                 if ((state == ELECT1_STATE || state == ELECT2_STATE)
1364                                                         && tmpUInt >= electionOriginator)
1365                                                 { //already participating in a higher-priority election
1366                                                         outBuffer[0] = ELECT_LEADER_RES;
1367                                                         outBuffer[1] = 0xFF;
1368                                                         udpSend(outBuffer, 2, peerIp);
1369                                                 }
1370                                                 else
1371                                                 { //join election
1372                                                         electionOriginator = tmpUInt;
1373                                                         electionParent = peerIp;
1374                                                         setState(ELECT1_STATE);
1375                                                         outBuffer[0] = ELECT_LEADER_CMD;
1376                                                         write4(&outBuffer[1], electionOriginator);
1377                                                         //don't bother forwarding the message to originator or parent
1378                                                         checkReplied(electionOriginator);
1379                                                         checkReplied(electionParent);
1380                                                         if (allReplied())
1381                                                         { //in case that is everybody I know of
1382                                                                 setState(ELECT2_STATE);
1383                                                                 outBuffer[0] = ELECT_LEADER_RES;
1384                                                                 outBuffer[1] = 0;
1385                                                                 write2(&outBuffer[2], numHosts);
1386                                                                 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1387                                                                         * numHosts);
1388                                                                 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1389                                                                         electionParent);
1390                                                         }
1391                                                         else
1392                                                         {
1393                                                                 udpSendAll(outBuffer, 5);
1394                                                         }
1395                                                 }
1396                                                 break;
1397                                         case ELECT_LEADER_RES:
1398                                                 if (state == ELECT1_STATE)
1399                                                 {
1400                                                         checkReplied(peerIp);
1401                                                         if (inBuffer[1] != 0xFF)
1402                                                         {
1403                                                                 tmpUShort = read2(&inBuffer[2]);
1404                                                                 hostDataPtr = (struct hostData *)&inBuffer[4];
1405                                                                 for (i = 0; i < tmpUShort; i++)
1406                                                                         addHost(hostDataPtr[i]);
1407                                                                 writeHostList();
1408                                                         }
1409                                                         if (allReplied())
1410                                                         {
1411                                                                 setState(ELECT2_STATE);
1412                                                                 if (electionOriginator == myHostData.ipAddr)
1413                                                                 {
1414                                                                         leader = hostArray[0].ipAddr;
1415                                                                         if (leader == myHostData.ipAddr)
1416                                                                         { //I am the leader
1417                                                                                 dhtLog("I am the leader!\n");
1418                                                                                 setState(LEAD_REBUILD1_STATE);
1419                                                                                 outBuffer[0] = REBUILD_CMD;
1420                                                                                 udpSendAll(outBuffer, 1);
1421                                                                         }
1422                                                                         else
1423                                                                         { //notify leader
1424                                                                                 outBuffer[0] = CONGRATS_CMD;
1425                                                                                 write2(&outBuffer[1], numHosts);
1426                                                                                 hostDataPtr = (struct hostData *)&outBuffer[3];
1427                                                                                 for (i = 0; i < numHosts; i++)
1428                                                                                         hostDataPtr[i] = hostArray[i];
1429                                                                                 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1430                                                                                         leader);
1431                                                                         }
1432                                                                 }
1433                                                                 else
1434                                                                 {
1435                                                                         outBuffer[0] = ELECT_LEADER_RES;
1436                                                                         outBuffer[1] = 0;
1437                                                                         write2(&outBuffer[2], numHosts);
1438                                                                         hostDataPtr = (struct hostData *)&outBuffer[4];
1439                                                                         for (i = 0; i < numHosts; i++)
1440                                                                                 hostDataPtr[i] = hostArray[i];
1441                                                                         udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1442                                                                                 electionParent);
1443                                                                 }
1444                                                         }
1445                                                 }
1446                                                 break;
1447                                         case CONGRATS_CMD:
1448                                                 if (state == ELECT2_STATE)
1449                                                 { //I am the leader
1450                                                         leader = myHostData.ipAddr;
1451                                                         dhtLog("I am the leader!\n");
1452                                                         tmpUShort = read2(&inBuffer[1]);
1453                                                         hostDataPtr = (struct hostData *)&inBuffer[3];
1454                                                         for (i = 0; i < tmpUShort; i++)
1455                                                                 addHost(hostDataPtr[i]);
1456                                                         writeHostList();
1457                                                         setState(LEAD_REBUILD1_STATE);
1458                                                         outBuffer[0] = REBUILD_CMD;
1459                                                         udpSendAll(outBuffer, 1);
1460                                                 }
1461                                                 break;
1462                                         case REBUILD_REQ:
1463                                                 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
1464                                                 {
1465                                                         setState(LEAD_REBUILD1_STATE);
1466                                                         outBuffer[0] = REBUILD_CMD;
1467                                                         udpSendAll(outBuffer, 1);
1468                                                 }
1469                                                 break;
1470                                         case REBUILD_CMD:
1471                                                 leader = peerIp; //consider this a declaration of authority
1472                                                 setState(REBUILD1_STATE);
1473                                                 outBuffer[0] = JOIN_REQ;
1474                                                 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1475                                                 udpSend(outBuffer, 5, leader);
1476                                                 break;
1477                                         case FILL_DHT_CMD:
1478                                                 if (state == REBUILD3_STATE && peerIp == leader)
1479                                                 {
1480                                                         setState(REBUILD4_STATE);
1481                                                         if (fillStatus != 0)
1482                                                                 dhtLog("udpListen(): ERROR: fillTask already running\n");
1483                                                         fillStatus = 1;
1484                                                         if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
1485                                                                 dhtLog("udpListen(): ERROR creating threadFillTask\n");
1486                                                 }
1487                                                 break;
1488                                         case FILL_DHT_RES:
1489                                                 if (state == LEAD_REBUILD3_STATE)
1490                                                 {
1491                                                         checkReplied(peerIp);
1492                                                         if (allReplied() && fillStatus == 2)
1493                                                         {
1494                                                                 fillStatus = 0;
1495                                                                 setState(LEAD_REBUILD4_STATE);
1496                                                                 outBuffer[0] = RESUME_NORMAL_CMD;
1497                                                                 udpSendAll(outBuffer, 1);
1498                                                         }
1499                                                 }
1500                                                 break;
1501                                         case RESUME_NORMAL_CMD:
1502                                                 if (state == REBUILD5_STATE && peerIp == leader)
1503                                                 {
1504                                                         setState(NORMAL_STATE);
1505                                                         outBuffer[0] = RESUME_NORMAL_RES;
1506                                                         udpSend(outBuffer, 1, leader);
1507                                                 }
1508                                                 break;
1509                                         case RESUME_NORMAL_RES:
1510                                                 if (state == LEAD_REBUILD4_STATE)
1511                                                 {
1512                                                         checkReplied(peerIp);
1513                                                         if (allReplied())
1514                                                         {
1515                                                                 setState(LEAD_NORMAL1_STATE);
1516                                                         }
1517                                                 }
1518                                                 break;
1519                                 }
1520                         }
1521                 }
1522                 if (state == REBUILD4_STATE)
1523                 {
1524                         switch (fillStatus)
1525                         {
1526                                 case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in REBUILD4_STATE\n");
1527                                         break;
1528                                 case 1: //do nothing
1529                                         break;
1530                                 case 2: //done filling the dht, notify leader
1531                                         fillStatus = 0;
1532                                         setState(REBUILD5_STATE);
1533                                         outBuffer[0] = FILL_DHT_RES;
1534                                         udpSend(outBuffer, 1, leader);
1535                                         break;
1536                                 case 3: //error encountered -> restart rebuild
1537                                         fillStatus = 0;
1538                                         setState(REBUILD0_STATE);
1539                                         outBuffer[0] = REBUILD_REQ;
1540                                         udpSend(outBuffer, 1, leader);
1541                                         break;
1542                         }
1543                 }
1544                 if (state == LEAD_REBUILD3_STATE)
1545                 {
1546                         switch (fillStatus)
1547                         {
1548                                 case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in LEAD_REBUILD3_STATE\n");
1549                                         break;
1550                                 case 1: //do nothing
1551                                         break;
1552                                 case 2: //I'm done, now is everybody else also done?
1553                                         if (allReplied())
1554                                         {
1555                                                 fillStatus = 0;
1556                                                 setState(LEAD_REBUILD4_STATE);
1557                                                 outBuffer[0] = RESUME_NORMAL_CMD;
1558                                                 udpSendAll(outBuffer, 1);
1559                                         }
1560                                         break;
1561                                 case 3: //error encountered -> restart rebuild
1562                                         fillStatus = 0;
1563                                         setState(LEAD_REBUILD1_STATE);
1564                                         outBuffer[0] = REBUILD_CMD;
1565                                         udpSendAll(outBuffer, 1);
1566                                         break;
1567                         }
1568                 }
1569                 if (timerSet)
1570                 {
1571                         gettimeofday(&now, NULL);
1572                         if (timercmp(&now, &timer, >))
1573                         {
1574                                 if (timeoutCntr < retry_vals[state])
1575                                 {
1576                                         timeoutCntr++;
1577                                         timeradd(&now, &timeout_vals[state], &timer);
1578                                         dhtLog("udpListen(): retry: %d\n", timeoutCntr);
1579                                         switch (state)
1580                                         {
1581                                                 case INIT1_STATE:
1582                                                         outBuffer[0] = WHO_IS_LEADER_CMD;
1583                                                         udpSend(outBuffer, 1, seed);
1584                                                         break;
1585                                                 case INIT2_STATE:
1586                                                         outBuffer[0] = JOIN_REQ;
1587                                                         write4(&outBuffer[1], myHostData.maxKeyCapacity);
1588                                                         udpSend(outBuffer, 5, leader);
1589                                                         break;
1590                                                 case ELECT1_STATE:
1591                                                         outBuffer[0] = ELECT_LEADER_CMD;
1592                                                         write4(&outBuffer[1], electionOriginator);
1593                                                         udpSendAll(outBuffer, 5);
1594                                                         break;
1595                                                 case ELECT2_STATE:
1596                                                         if (electionOriginator == myHostData.ipAddr)
1597                                                         { //retry notify leader
1598                                                                 outBuffer[0] = CONGRATS_CMD;
1599                                                                 write2(&outBuffer[1], numHosts);
1600                                                                 memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
1601                                                                         * numHosts);
1602                                                                 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1603                                                                         leader);
1604                                                         }
1605                                                         else
1606                                                         {
1607                                                                 outBuffer[0] = ELECT_LEADER_RES;
1608                                                                 outBuffer[1] = 0;
1609                                                                 write2(&outBuffer[2], numHosts);
1610                                                                 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1611                                                                         * numHosts);
1612                                                                 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1613                                                                         electionParent);
1614                                                         }
1615                                                         break;
1616                                                 case REBUILD0_STATE:
1617                                                         outBuffer[0] = REBUILD_REQ;
1618                                                         udpSend(outBuffer, 1, leader);
1619                                                         break;
1620                                                 case REBUILD1_STATE:
1621                                                         outBuffer[0] = JOIN_REQ;
1622                                                         write4(&outBuffer[1], myHostData.maxKeyCapacity);
1623                                                         udpSend(outBuffer, 5, leader);
1624                                                         break;
1625                                                 case REBUILD5_STATE:
1626                                                         outBuffer[0] = FILL_DHT_RES;
1627                                                         udpSend(outBuffer, 1, leader);
1628                                                         break;
1629                                                 case LEAD_REBUILD1_STATE:
1630                                                         outBuffer[0] = REBUILD_CMD;
1631                                                         udpSendAll(outBuffer, 1);
1632                                                         break;
1633                                                 case LEAD_REBUILD2_STATE:
1634                                                         outBuffer[0] = DHT_UPDATE_CMD;
1635                                                         write2(&outBuffer[1], numHosts);
1636                                                         write2(&outBuffer[3], numBlocks);
1637                                                         memcpy(&outBuffer[5], hostArray, numHosts
1638                                                                 * sizeof(struct hostData));
1639                                                         memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
1640                                                                 blockOwnerArray, numBlocks*2);
1641                                                         udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
1642                                                                 + 2 * numBlocks);
1643                                                         break;
1644                                                 case LEAD_REBUILD3_STATE:
1645                                                         outBuffer[0] = FILL_DHT_CMD;
1646                                                         udpSendAll(outBuffer, 1);
1647                                                         break;
1648                                                 case LEAD_REBUILD4_STATE:
1649                                                         outBuffer[0] = RESUME_NORMAL_CMD;
1650                                                         udpSendAll(outBuffer, 1);
1651                                                         break;
1652                                                 case EXIT1_STATE: //TODO...
1653                                                         break;
1654                                                 case NORMAL_STATE:
1655                                                 case LEAD_NORMAL1_STATE:
1656                                                 case LEAD_NORMAL2_STATE:
1657                                                 case REBUILD2_STATE:
1658                                                 case REBUILD3_STATE:
1659                                                 case REBUILD4_STATE:
1660                                                 case EXIT2_STATE: //we shouldn't get here
1661                                                         break;
1662                                         }
1663                                 }
1664                                 else
1665                                 {
1666                                         dhtLog("udpListen(): timed out in state %s after %d retries\n",
1667                                                 state_names[state], timeoutCntr);
1668                                         switch (state)
1669                                         {
1670                                                 case INIT1_STATE:
1671                                                         setState(EXIT2_STATE);
1672                                                         break;
1673                                                 case LEAD_NORMAL2_STATE:
1674                                                         setState(LEAD_REBUILD1_STATE);
1675                                                         outBuffer[0] = REBUILD_CMD;
1676                                                         udpSendAll(outBuffer, 1);
1677                                                         break;
1678                                                 case ELECT1_STATE:
1679                                                         dhtLog("removing unresponsive hosts, before:\n");
1680                                                         writeHostList();
1681                                                         removeUnresponsiveHosts();
1682                                                         dhtLog("after\n");
1683                                                         writeHostList();
1684                                                         setState(ELECT2_STATE);
1685                                                         if (electionOriginator == myHostData.ipAddr)
1686                                                         {
1687                                                                 leader = hostArray[0].ipAddr;
1688                                                                 if (leader == myHostData.ipAddr)
1689                                                                 { //I am the leader
1690                                                                         dhtLog("I am the leader!\n");
1691                                                                         setState(LEAD_REBUILD1_STATE);
1692                                                                         outBuffer[0] = REBUILD_CMD;
1693                                                                         udpSendAll(outBuffer, 1);
1694                                                                 }
1695                                                                 else
1696                                                                 { //notify leader
1697                                                                         outBuffer[0] = CONGRATS_CMD;
1698                                                                         write2(&outBuffer[1], numHosts);
1699                                                                         memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
1700                                                                                 * numHosts);
1701                                                                         udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1702                                                                                 leader);
1703                                                                 }
1704                                                         }
1705                                                         else
1706                                                         {
1707                                                                 outBuffer[0] = ELECT_LEADER_RES;
1708                                                                 outBuffer[1] = 0;
1709                                                                 write2(&outBuffer[2], numHosts);
1710                                                                 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1711                                                                         * numHosts);
1712                                                                 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1713                                                                         electionParent);
1714                                                         }
1715                                                         break;
1716                                                 case INIT2_STATE:
1717                                                 case ELECT2_STATE:
1718                                                 case REBUILD0_STATE:
1719                                                 case REBUILD1_STATE:
1720                                                 case REBUILD2_STATE:
1721                                                 case REBUILD3_STATE:
1722                                                 case REBUILD4_STATE:
1723                                                 case REBUILD5_STATE:
1724                                                 case LEAD_REBUILD1_STATE:
1725                                                 case LEAD_REBUILD2_STATE:
1726                                                 case LEAD_REBUILD3_STATE:
1727                                                 case LEAD_REBUILD4_STATE:
1728                                                         //start election
1729                                                         electionOriginator = myHostData.ipAddr;
1730                                                         setState(ELECT1_STATE);
1731                                                         outBuffer[0] = ELECT_LEADER_CMD;
1732                                                         write4(&outBuffer[1], myHostData.ipAddr); //originator = me
1733                                                         udpSendAll(outBuffer, 5);
1734                                                         break;
1735                                                 case EXIT1_STATE:
1736                                                         setState(EXIT2_STATE);
1737                                                         break;
1738                                                 case NORMAL_STATE:
1739                                                 case LEAD_NORMAL1_STATE:
1740                                                 case EXIT2_STATE: //we shouldn't get here
1741                                                         break;
1742                                         }
1743                                 }
1744                         }
1745                 }
1746                 if (state != oldState)
1747                         pthread_cond_broadcast(&stateCond);
1748                 pthread_mutex_unlock(&stateMutex);
1749         }
1750 }
1751