This commit was manufactured by cvs2svn to create tag 'buildscript'.
[IRC.git] /
1 /*******************************************************************************
2 *                                 dht.c
3 *
4 *  High-performance Distributed Hash Table for finding the location of objects
5 * in a Distributed Shared Transactional Memory system.
6 *
7 * Creator: Erik Rubow
8 *
9 * TODO:
10 * 1) Instead of having dhtInsertMult, dhtSearchMult, etc. call their single-key
11 * counterparts repeatedly, define some new messages to handle it more
12 * efficiently.
13 * 2) Improve the efficiency of functions that work with hostArray, hostReplied,
14 * and blockOwnerArray.
15 * 3) Currently a join or leave causes a rebuild of the entire hash table.
16 * Implement more graceful join and leave procedures.
17 * 4) Fine tune timeout values for performance, possibly implement a backoff
18 * algorithm to prevent overloading the network.
19 * 5) Whatever else I'm forgetting
20 *
21 *******************************************************************************/
22 /*******************************************************************************
23 *                              Includes
24 *******************************************************************************/
25
26 #include <netinet/in.h>
27 #include <arpa/inet.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <sys/ioctl.h>
31 #include <stdio.h>
32 #include <stdarg.h>
33 #include <string.h>
34 #include <stdlib.h>
35 #include <unistd.h>
36 #include <pthread.h>
37 #include <sys/time.h>
38 #include <sys/poll.h>
39 #include <netdb.h>
40 #include <net/if.h>
41 #include <linux/sockios.h>
42 #include <sys/queue.h>
43 #include "dht.h"
44 #include "clookup.h" //this works for now, do we need anything better?
45 #include "mlookup.h"
46
47 /*******************************************************************************
48 *                           Local Defines, Structs
49 *******************************************************************************/
50
/* --- network ------------------------------------------------------------- */
#define MAX_MSG_SIZE 1500       //NOTE(review): presumably the largest datagram handled; usage not in this excerpt
#define UDP_PORT 2157           //port bound by dhtInit() and targeted by every send
#define DEFAULT_INTERFACE "eth0" //interface whose IPv4 address identifies this node

/* --- initial table sizes ------------------------------------------------- */
#define INIT_HOST_ALLOC 3       //initial capacity of hostArray / hostReplied
#define INIT_NUM_BLOCKS 16      //initial number of hash blocks

/* --- timing -------------------------------------------------------------- */
#define TIMEOUT_PERIOD 100      //NOTE(review): not used in this excerpt; units unknown

//per-attempt poll() timeout (ms) and attempts per round for the client calls
#define INSERT_TIMEOUT_MS 500
#define INSERT_RETRIES 50
#define REMOVE_TIMEOUT_MS 500
#define REMOVE_RETRIES 50
#define SEARCH_TIMEOUT_MS 500
#define SEARCH_RETRIES 50
63
//Message types. The value travels as the first byte of every datagram, so the
//order below is part of the wire protocol; msg_types[] must list the same names.
enum
{
        //key operations: request / response pairs
        INSERT_CMD = 0,
        INSERT_RES,
        REMOVE_CMD,
        REMOVE_RES,
        SEARCH_CMD,
        SEARCH_RES,

        //leader discovery and membership
        WHO_IS_LEADER_CMD,
        WHO_IS_LEADER_RES,
        JOIN_REQ,
        JOIN_RES,
        LEAVE_REQ,
        LEAVE_RES,
        DHT_UPDATE_CMD,
        DHT_UPDATE_RES,

        //election
        ELECT_LEADER_CMD,
        ELECT_LEADER_RES,
        CONGRATS_CMD,

        //rebuild
        REBUILD_REQ,
        REBUILD_CMD,
        FILL_DHT_CMD,
        FILL_DHT_RES,
        RESUME_NORMAL_CMD,
        RESUME_NORMAL_RES,

        NUM_MSG_TYPES //count of the entries above; sizes msg_types[]
};
93
//Protocol states. Each value indexes state_names[], timeout_vals[] and
//retry_vals[], so all three tables must follow this exact order.
enum
{
        INIT1_STATE = 0,
        INIT2_STATE,

        NORMAL_STATE,
        LEAD_NORMAL1_STATE,
        LEAD_NORMAL2_STATE,

        ELECT1_STATE,
        ELECT2_STATE,

        REBUILD0_STATE,
        REBUILD1_STATE,
        REBUILD2_STATE,
        REBUILD3_STATE,
        REBUILD4_STATE,
        REBUILD5_STATE,

        LEAD_REBUILD1_STATE,
        LEAD_REBUILD2_STATE,
        LEAD_REBUILD3_STATE,
        LEAD_REBUILD4_STATE,

        EXIT1_STATE,
        EXIT2_STATE,

        NUM_STATES //count of the states above; sizes the per-state tables
};
119
//Status codes. A remote node reports one of these in byte 1 of its
//INSERT_RES / REMOVE_RES / SEARCH_RES reply.
enum
{
        OPERATION_OK = 0,
        KEY_NOT_FOUND,
        NOT_KEY_OWNER,
        NOT_LEADER,
        INTERNAL_ERROR
};
129
//One DHT member. msgSizeOk() measures several message payloads in multiples of
//sizeof(struct hostData), so member order and types are part of the wire format.
struct hostData
{
        unsigned int ipAddr;         //IPv4 address, host byte order
        unsigned int maxKeyCapacity; //capacity advertised by that member
};
135
/*******************************************************************************
*                         Local Function Prototypes
*******************************************************************************/

int msgSizeOk(unsigned char *msg, unsigned int size);
unsigned short read2(unsigned char *msg);
unsigned int read4(unsigned char *msg);
void write2(unsigned char *ptr, unsigned short tmp);
void write4(unsigned char *ptr, unsigned int tmp);
unsigned int getMyIpAddr(const char *interfaceStr);
int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp);
int udpSendAll(unsigned char *msg, unsigned int size);
unsigned int hash(unsigned int x);
unsigned int getKeyOwner(unsigned int key);
void setState(unsigned int newState);
void makeAssignments();
int addHost(struct hostData newHost);
int removeHost(unsigned int ipAddr);
void removeUnresponsiveHosts();
int checkReplied(unsigned int ipAddr);
int allReplied();
//checkReplied() calls findHost() before its definition; without this prototype
//that call is an implicit function declaration (invalid since C99).
int findHost(unsigned int ipAddr);
void writeHostList();
void dhtLog(const char *format, ...);
void *fillTask();
void *udpListen();
161
162 /*******************************************************************************
163 *                           Global Variables
164 *******************************************************************************/
165
166 //make sure this matches enumeration above
167 const char *msg_types[NUM_MSG_TYPES] =
168 {
169         "INSERT_CMD",
170         "INSERT_RES",
171         "REMOVE_CMD",
172         "REMOVE_RES",
173         "SEARCH_CMD",
174         "SEARCH_RES",
175         "WHO_IS_LEADER_CMD",
176         "WHO_IS_LEADER_RES",
177         "JOIN_REQ",
178         "JOIN_RES",
179         "LEAVE_REQ",
180         "LEAVE_RES",
181         "DHT_UPDATE_CMD",
182         "DHT_UPDATE_RES",
183         "ELECT_LEADER_CMD",
184         "ELECT_LEADER_RES",
185         "CONGRATS_CMD",
186         "REBUILD_REQ",
187         "REBUILD_CMD",
188         "FILL_DHT_CMD",
189         "FILL_DHT_RES",
190         "RESUME_NORMAL_CMD",
191         "RESUME_NORMAL_RES"
192 };
193
194 const char *state_names[NUM_STATES] =
195 {
196         "INIT1_STATE",
197         "INIT2_STATE",
198         "NORMAL_STATE",
199         "LEAD_NORMAL1_STATE",
200         "LEAD_NORMAL2_STATE",
201         "ELECT1_STATE",
202         "ELECT2_STATE",
203         "REBUILD0_STATE",
204         "REBUILD1_STATE",
205         "REBUILD2_STATE",
206         "REBUILD3_STATE",
207         "REBUILD4_STATE",
208         "REBUILD5_STATE",
209         "LEAD_REBUILD1_STATE",
210         "LEAD_REBUILD2_STATE",
211         "LEAD_REBUILD3_STATE",
212         "LEAD_REBUILD4_STATE",
213         "EXIT1_STATE",
214         "EXIT2_STATE",
215 };
216
217 //note: { 0, 0 } means no timeout
218 struct timeval timeout_vals[NUM_STATES] =
219 {
220         { 0, 500000 }, //INIT1_STATE
221         { 0, 500000 }, //INIT2_STATE
222         { 0, 0 }, //NORMAL_STATE
223         { 0, 0 }, //LEAD_NORMAL1_STATE
224         { 3, 0 }, //LEAD_NORMAL2_STATE
225         { 1, 0 }, //ELECT1_STATE
226         { 1, 0 }, //ELECT2_STATE
227         { 0, 500000 }, //REBUILD0_STATE
228         { 0, 500000 }, //REBUILD1_STATE
229         { 10, 0 }, //REBUILD2_STATE
230         { 10, 0 }, //REBUILD3_STATE
231         { 10, 0 }, //REBUILD4_STATE
232         { 1, 0 }, //REBUILD5_STATE
233         { 1, 0 }, //LEAD_REBUILD1_STATE
234         { 1, 0 }, //LEAD_REBUILD2_STATE
235         { 10, 0 }, //LEAD_REBUILD3_STATE
236         { 10, 0 }, //LEAD_REBUILD4_STATE
237         { 0, 500000 }, //EXIT1_STATE
238         { 0, 0 } //EXIT2_STATE
239 };
240
241 int retry_vals[NUM_STATES] =
242 {
243         100, //INIT1_STATE
244         10, //INIT2_STATE
245         0, //NORMAL_STATE
246         0, //LEAD_NORMAL1_STATE
247         0, //LEAD_NORMAL2_STATE
248         10, //ELECT1_STATE
249         10, //ELECT2_STATE
250         10, //REBUILD0_STATE
251         10, //REBUILD1_STATE
252         0, //REBUILD2_STATE
253         0, //REBUILD3_STATE
254         0, //REBUILD4_STATE
255         10, //REBUILD5_STATE
256         10, //LEAD_REBUILD1_STATE
257         10, //LEAD_REBUILD2_STATE
258         10, //LEAD_REBUILD3_STATE
259         10, //LEAD_REBUILD4_STATE
260         10, //EXIT1_STATE
261         0 //EXIT2_STATE
262 };
263
264 FILE *logfile;
265 struct hostData myHostData;
266 pthread_t threadUdpListen;
267 pthread_t threadFillTask;
268 //status of fillTask: 0 = ready to run, 1 = running, 2 = completed, 3 = error
269 int fillStatus;
270 struct pollfd udpPollSock;
271 unsigned int state;
272 unsigned int seed;
273 unsigned int leader;
274 unsigned int electionOriginator;
275 unsigned int electionParent;
276 unsigned int hostArraySize = 0;
277 struct hostData *hostArray = NULL;
278 unsigned int numBlocks = 0;
279 unsigned short *blockOwnerArray = NULL;
280 unsigned char *hostReplied = NULL;
281 pthread_mutex_t stateMutex;
282 pthread_cond_t stateCond;
283 chashtable_t *myHashTable;
284 unsigned int numHosts;
285 struct timeval timer;
286 int timerSet;
287 int timeoutCntr;
288
289 /*******************************************************************************
290 *                      Interface Function Definitions
291 *******************************************************************************/
292
293 void dhtInit(unsigned int seedIpAddr, unsigned int maxKeyCapacity)
294 {
295         struct in_addr tmpAddr;
296         char filename[23] = "dht-";
297         struct sockaddr_in myAddr;
298         struct sockaddr_in seedAddr;
299         socklen_t socklen = sizeof(struct sockaddr_in);
300         char initMsg;
301
302         tmpAddr.s_addr = htonl(getMyIpAddr(DEFAULT_INTERFACE));
303         strcat(filename, inet_ntoa(tmpAddr));
304         strcat(filename, ".log");
305         printf("log file: %s\n", filename);
306
307         logfile = fopen(filename, "w");
308         dhtLog("dhtInit(): inializing...\n");
309
310         myHostData.ipAddr = getMyIpAddr(DEFAULT_INTERFACE);
311         myHostData.maxKeyCapacity = maxKeyCapacity;
312
313         seed = seedIpAddr;
314         leader = 0;
315         electionOriginator = 0;
316         electionParent = 0;
317         hostArraySize = INIT_HOST_ALLOC;
318         hostArray = calloc(hostArraySize, sizeof(struct hostData));
319         hostReplied = calloc(hostArraySize, sizeof(unsigned char));
320         hostArray[0] = myHostData;
321         numHosts = 1;
322         numBlocks = INIT_NUM_BLOCKS;
323         blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
324         pthread_mutex_init(&stateMutex, NULL);
325         pthread_cond_init(&stateCond, NULL);
326         myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
327
328         udpPollSock.fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
329         if (udpPollSock.fd < 0)
330                 perror("dhtInit():socket()");
331
332         udpPollSock.events = POLLIN;
333         
334         bzero(&myAddr, socklen);
335         myAddr.sin_family = AF_INET;
336         myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
337         myAddr.sin_port = htons(UDP_PORT);
338
339         if (bind(udpPollSock.fd, (struct sockaddr *)&myAddr, socklen) < 0)
340                 perror("dhtInit():bind()");
341
342         if (seed == 0)
343         {
344                 dhtLog("I am the leader\n");
345                 leader = myHostData.ipAddr;
346                 setState(LEAD_NORMAL1_STATE);
347         }
348         else
349         {
350                 initMsg = WHO_IS_LEADER_CMD;
351                 udpSend(&initMsg, 1, seed);
352                 setState(INIT1_STATE);
353         }
354
355         if (pthread_create(&threadUdpListen, NULL, udpListen, NULL) != 0)
356                 dhtLog("dhtInit() - ERROR creating threadUdpListen\n");
357
358         return;
359 }
360
361 void dhtExit()
362 { //TODO: do this gracefully, wait for response from leader, etc.
363         char msg;
364
365         msg = LEAVE_REQ;
366         udpSend(&msg, 1, leader);
367         dhtLog("dhtExit(): cleaning up...\n");
368         pthread_cancel(threadUdpListen);
369         close(udpPollSock.fd);
370         free(hostArray);
371         free(hostReplied);
372         free(blockOwnerArray);
373         fclose(logfile);
374
375         return;
376 }
377
378 int dhtInsert(unsigned int key, unsigned int val)
379 {
380         struct sockaddr_in toAddr;
381         struct sockaddr_in fromAddr;
382         socklen_t socklen = sizeof(struct sockaddr_in);
383         struct pollfd pollsock;
384         char inBuffer[2];
385         char outBuffer[9];
386         ssize_t bytesRcvd;
387         int i;
388         int retval;
389         int status = -1;
390
391         bzero((char *)&toAddr, socklen);
392         toAddr.sin_family = AF_INET;
393         toAddr.sin_port = htons(UDP_PORT);
394
395         while (status != OPERATION_OK)
396         {
397                 pthread_mutex_lock(&stateMutex);
398                 while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
399                                 || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
400                                 || state == LEAD_REBUILD3_STATE))
401                         pthread_cond_wait(&stateCond, &stateMutex);
402                 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
403                 pthread_mutex_unlock(&stateMutex);
404
405                 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
406                 {
407                         perror("dhtInsert():socket()");
408                         return -1;
409                 }
410                 pollsock.events = POLLIN;
411
412                 outBuffer[0] = INSERT_CMD;
413                 write4(&outBuffer[1], key);
414                 write4(&outBuffer[5], val);
415
416                 for (i = 0; i < INSERT_RETRIES; i++)
417                 {
418                         if (sendto(pollsock.fd, outBuffer, 9, 0, (struct sockaddr *)&toAddr,
419                                 socklen) < 0)
420                         {
421                                 perror("dhtInsert():sendto()");
422                                 break;
423                         }
424                         retval = poll(&pollsock, 1, INSERT_TIMEOUT_MS);
425                         if (retval < 0)
426                         {
427                                 perror("dhtInsert():poll()");
428                                 break;
429                         }
430                         if (retval > 0)
431                         {
432                                 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
433                                         (struct sockaddr *)&fromAddr, &socklen);
434                                 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
435                                         && fromAddr.sin_port == toAddr.sin_port
436                                         && bytesRcvd == 2 && inBuffer[0] == INSERT_RES)
437                                 {
438                                         status = inBuffer[1]; //status from remote host
439                                         break;
440                                 }
441                         }
442                 }
443                 if (status != OPERATION_OK)
444                 {
445                         pthread_mutex_lock(&stateMutex);
446                         setState(REBUILD0_STATE);
447                         outBuffer[0] = REBUILD_REQ;
448                         udpSend(outBuffer, 1, leader);
449                         pthread_mutex_unlock(&stateMutex);
450                 }
451         }
452
453         close(pollsock.fd);
454
455         return status;
456 }
457
/*
 * Inserts numKeys (keys[i], vals[i]) pairs, one dhtInsert() call per pair.
 * Every pair is attempted even after a failure.
 * Returns 0 if every dhtInsert() returned 0, otherwise -1.
 */
int dhtInsertMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals)
{
        int status;
        unsigned int i; //same type as numKeys: no signed/unsigned comparison or overflow

        status = 0;
        for (i = 0; i < numKeys; i++)
        {
                if (dhtInsert(keys[i], vals[i]) != 0)
                        status = -1;
        }
        return status;
}
471
472 int dhtRemove(unsigned int key)
473 {
474         struct sockaddr_in toAddr;
475         struct sockaddr_in fromAddr;
476         socklen_t socklen = sizeof(struct sockaddr_in);
477         struct pollfd pollsock;
478         char inBuffer[2];
479         char outBuffer[5];
480         ssize_t bytesRcvd;
481         int i;
482         int retval;
483         int status = -1;
484
485         bzero((char *)&toAddr, socklen);
486         toAddr.sin_family = AF_INET;
487         toAddr.sin_port = htons(UDP_PORT);
488
489         while (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
490         {
491                 pthread_mutex_lock(&stateMutex);
492                 while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
493                                 || state == LEAD_NORMAL2_STATE))
494                         pthread_cond_wait(&stateCond, &stateMutex);
495                 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
496                 pthread_mutex_unlock(&stateMutex);
497
498                 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
499                 {
500                         perror("dhtRemove():socket()");
501                         return -1;
502                 }
503                 pollsock.events = POLLIN;
504
505                 outBuffer[0] = REMOVE_CMD;
506                 write4(&outBuffer[1], key);
507
508                 for (i = 0; i < REMOVE_RETRIES; i++)
509                 {
510                         if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
511                                 socklen) < 0)
512                         {
513                                 perror("dhtRemove():sendto()");
514                                 break;
515                         }
516                         retval = poll(&pollsock, 1, REMOVE_TIMEOUT_MS);
517                         if (retval < 0)
518                         {
519                                 perror("dhtRemove():poll()");
520                                 break;
521                         }
522                         if (retval > 0)
523                         {
524                                 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
525                                         (struct sockaddr *)&fromAddr, &socklen);
526                                 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
527                                         && fromAddr.sin_port == toAddr.sin_port
528                                         && bytesRcvd == 2 && inBuffer[0] == REMOVE_RES)
529                                 {
530                                         status = inBuffer[1]; //status from remote host
531                                         break;
532                                 }
533                         }
534                 }
535                 if (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
536                 {
537                         pthread_mutex_lock(&stateMutex);
538                         setState(REBUILD0_STATE);
539                         outBuffer[0] = REBUILD_REQ;
540                         udpSend(outBuffer, 1, leader);
541                         pthread_mutex_unlock(&stateMutex);
542                 }
543         }
544
545         close(pollsock.fd);
546
547         return status;
548 }
549
/*
 * Removes numKeys keys, one dhtRemove() call per key; every key is attempted.
 * Returns 0 if every dhtRemove() returned 0 (OPERATION_OK), otherwise -1.
 * A key that was not present (KEY_NOT_FOUND) therefore also yields -1.
 */
int dhtRemoveMult(unsigned int numKeys, unsigned int *keys)
{
        int status;
        unsigned int i; //same type as numKeys: no signed/unsigned comparison or overflow

        status = 0;
        for (i = 0; i < numKeys; i++)
        {
                if (dhtRemove(keys[i]) != 0)
                        status = -1;
        }
        return status;
}
563
564 int dhtSearch(unsigned int key, unsigned int *val)
565 {
566         struct sockaddr_in toAddr;
567         struct sockaddr_in fromAddr;
568         socklen_t socklen = sizeof(struct sockaddr_in);
569         struct pollfd pollsock;
570         char inBuffer[6];
571         char outBuffer[5];
572         ssize_t bytesRcvd;
573         int i;
574         int retval;
575         int status = -1;
576
577         bzero((char *)&toAddr, socklen);
578         toAddr.sin_family = AF_INET;
579         toAddr.sin_port = htons(UDP_PORT);
580
581         while (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
582         {
583                 pthread_mutex_lock(&stateMutex);
584                 while (numBlocks == 0)
585                         pthread_cond_wait(&stateCond, &stateMutex);
586                 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
587                 pthread_mutex_unlock(&stateMutex);
588
589                 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
590                 {
591                         perror("dhtSearch():socket()");
592                         return -1;
593                 }
594                 pollsock.events = POLLIN;
595
596                 outBuffer[0] = SEARCH_CMD;
597                 write4(&outBuffer[1], key);
598
599                 for (i = 0; i < SEARCH_RETRIES; i++)
600                 {
601                         if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
602                                 socklen) < 0)
603                         {
604                                 perror("dhtSearch():sendto()");
605                                 break;
606                         }
607                         retval = poll(&pollsock, 1, SEARCH_TIMEOUT_MS);
608                         if (retval < 0)
609                         {
610                                 perror("dhtSearch():poll()");
611                                 break;
612                         }
613                         if (retval > 0)
614                         {
615                                 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 6, 0,
616                                         (struct sockaddr *)&fromAddr, &socklen);
617                                 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
618                                         && fromAddr.sin_port == toAddr.sin_port
619                                         && bytesRcvd == 6 && inBuffer[0] == SEARCH_RES)
620                                 {
621                                         status = inBuffer[1]; //status from remote host
622                                         *val = read4(&inBuffer[2]);
623                                         break;
624                                 }
625                         }
626                 }
627                 if (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
628                 {
629                         pthread_mutex_lock(&stateMutex);
630                         setState(REBUILD0_STATE);
631                         outBuffer[0] = REBUILD_REQ;
632                         udpSend(outBuffer, 1, leader);
633                         pthread_mutex_unlock(&stateMutex);
634                 }
635         }
636
637         close(pollsock.fd);
638
639         return status;
640 }
641
/*
 * Looks up numKeys keys, one dhtSearch() call per key, storing each result in
 * vals[i]. Every key is attempted.
 * Returns 0 if every dhtSearch() returned 0 (OPERATION_OK), otherwise -1.
 */
int dhtSearchMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals)
{
        unsigned int i; //same type as numKeys: no signed/unsigned comparison or overflow
        int status = 0;
        for (i = 0; i < numKeys; i++)
        {
                if (dhtSearch(keys[i], &vals[i]) != 0)
                        status = -1;
        }
        return status;
}
653
654 /*******************************************************************************
655 *                      Local Function Definitions
656 *******************************************************************************/
657
658 int msgSizeOk(unsigned char *msg, unsigned int size)
659 {
660         unsigned short tmpNumHosts;
661         unsigned short tmpNumBlocks;
662
663         if (size < 1)
664                 return 1;
665
666         switch (msg[0])
667         {
668                 case WHO_IS_LEADER_CMD:
669                 case LEAVE_REQ:
670                 case LEAVE_RES:
671                 case DHT_UPDATE_RES:
672                 case REBUILD_REQ:
673                 case REBUILD_CMD:
674                 case FILL_DHT_CMD:
675                 case FILL_DHT_RES:
676                 case RESUME_NORMAL_CMD:
677                 case RESUME_NORMAL_RES:
678                         return (size == 1);
679                 case INSERT_RES:
680                 case REMOVE_RES:
681                 case JOIN_RES:
682                         return (size == 2);
683                 case REMOVE_CMD:
684                 case SEARCH_CMD:
685                 case WHO_IS_LEADER_RES:
686                 case JOIN_REQ:
687                 case ELECT_LEADER_CMD:
688                         return (size == 5);
689                 case SEARCH_RES:
690                         return (size == 6);
691                 case INSERT_CMD:
692                         return (size == 9);
693                 case DHT_UPDATE_CMD:
694                         if (size < 5)
695                                 return 1;
696                         tmpNumHosts = read2(&msg[1]);
697                         tmpNumBlocks = read2(&msg[3]);
698                         return (size == (5+sizeof(struct hostData)*tmpNumHosts+2*tmpNumBlocks));
699                 case ELECT_LEADER_RES:
700                         if (size < 2)
701                                 return 1;
702                         if (msg[1] == 0xFF)
703                                 return (size == 2);
704                         if (size < 4)
705                                 return 1;
706                         tmpNumHosts = read2(&msg[2]);
707                         return (size == (4 + sizeof(struct hostData) * tmpNumHosts));
708                 case CONGRATS_CMD:
709                         if (size < 3)
710                                 return 1;
711                         tmpNumHosts = read2(&msg[1]);
712                         return (size == (3 + sizeof(struct hostData) * tmpNumHosts));
713                 default:
714                         return 1;
715         }
716 }
717
//Decodes a 16-bit value stored least-significant byte first at ptr[0..1].
unsigned short read2(unsigned char *ptr)
{
        return (unsigned short)(ptr[0] | (ptr[1] << 8));
}
723
//Decodes a 32-bit value stored least-significant byte first at ptr[0..3].
//Each byte is widened to unsigned int before shifting. Otherwise it is promoted
//to signed int, and shifting a byte >= 0x80 left by 24 overflows int (undefined
//behavior).
unsigned int read4(unsigned char *ptr)
{
        return ((unsigned int)ptr[3] << 24)
                | ((unsigned int)ptr[2] << 16)
                | ((unsigned int)ptr[1] << 8)
                | (unsigned int)ptr[0];
}
729
//Stores tmp at ptr[0..1], least-significant byte first (inverse of read2).
void write2(unsigned char *ptr, unsigned short tmp)
{
        ptr[0] = (unsigned char)(tmp & 0xFF);
        ptr[1] = (unsigned char)((tmp >> 8) & 0xFF);
}
736
//Stores tmp at ptr[0..3], least-significant byte first (inverse of read4).
void write4(unsigned char *ptr, unsigned int tmp)
{
        unsigned int byteIdx;

        for (byteIdx = 0; byteIdx < 4; byteIdx++)
                ptr[byteIdx] = (unsigned char)((tmp >> (8 * byteIdx)) & 0xFF);
}
745
746 unsigned int getMyIpAddr(const char *interfaceStr)
747 {       
748         int sock;
749         struct ifreq interfaceInfo;
750         struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
751
752         memset(&interfaceInfo, 0, sizeof(struct ifreq));
753
754         if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
755         {
756                 perror("getMyIpAddr():socket()");
757                 return 1;
758         }
759
760         strcpy(interfaceInfo.ifr_name, interfaceStr);
761         myAddr->sin_family = AF_INET;
762         
763         if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
764         {
765                 perror("getMyIpAddr():ioctl()");
766                 return 1;
767         }
768
769         return ntohl(myAddr->sin_addr.s_addr);
770 }
771
772 int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp)
773 {
774         struct sockaddr_in peerAddr;
775         socklen_t socklen = sizeof(struct sockaddr_in);
776
777         bzero(&peerAddr, socklen);
778         peerAddr.sin_family = AF_INET;
779         peerAddr.sin_addr.s_addr = htonl(destIp);
780         peerAddr.sin_port = htons(UDP_PORT);
781
782         if (size >= 1)
783         {
784                 if (msg[0] < NUM_MSG_TYPES)
785                         dhtLog("udpSend(): sending %s to %s, %d bytes\n", msg_types[msg[0]],
786                                 inet_ntoa(peerAddr.sin_addr), size);
787                 else
788                         dhtLog("udpSend(): sending unknown message to %s, %d bytes\n",
789                                 inet_ntoa(peerAddr.sin_addr), size);
790         }
791
792         if (sendto(udpPollSock.fd, (void *)msg, size, 0, (struct sockaddr *)&peerAddr,
793                         socklen) < 0)
794         {
795                 perror("udpSend():sendto()");
796                 return -1;
797         }
798         
799         return 0;
800 }
801
802 int udpSendAll(unsigned char *msg, unsigned int size)
803 {
804         int i;
805         int status = 0;
806         for (i = 0; i < numHosts; i++)
807         {
808                 if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr))
809                 {
810                         if (udpSend(msg, size, hostArray[i].ipAddr) != 0)
811                                 status = -1;
812                 }
813         }
814         return status;
815 }
816
817 //note: make sure this is only executed in a valid state, where numBlocks != 0
818 unsigned int hash(unsigned int x)
819 {
820         return (x % numBlocks);
821 }
822
823 //note: make sure this is only executed in a valid state, where these arrays
824 // are allocated and the index mappings are consistent
825 unsigned int getKeyOwner(unsigned int key)
826 {
827         return hostArray[blockOwnerArray[hash(key)]].ipAddr;
828 }
829
830 //sets state and timer, if applicable
831 void setState(unsigned int newState)
832 {
833         struct timeval now;
834         int i;
835         
836         gettimeofday(&now, NULL);
837         
838         if (newState >= NUM_STATES)
839         {
840                 dhtLog("setState(): ERROR: invalid state %d\n", newState);
841         }
842         else
843         {
844                 if (timeout_vals[newState].tv_sec == 0
845                         && timeout_vals[newState].tv_usec == 0)
846                 { //no timer
847                         timerSet = 0;
848                 }
849                 else
850                 {
851                         timeradd(&now, &timeout_vals[newState], &timer);
852                         timerSet = 1;
853                 }
854                 timeoutCntr = 0;
855                 state = newState;
856                 //TODO: only do this for states that require it
857                 for (i = 0; i < numHosts; i++)
858                         hostReplied[i] = 0;
859
860                 dhtLog("setState(): state set to %s\n", state_names[state]);
861         }
862
863         return;
864 }
865
866 //TODO: improve these simple and inefficient functions
867 int checkReplied(unsigned int ipAddr)
868 {
869         int i;
870
871         i = findHost(ipAddr);
872
873         if (i == -1)
874                 return -1;
875
876         hostReplied[i] = 1;
877
878         return 0;
879 }
880
881 int allReplied()
882 {
883         int i;
884
885         for (i = 0; i < numHosts; i++)
886                 if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr))
887                         return 0;
888         
889         return 1;
890 }
891
892 int findHost(unsigned int ipAddr)
893 {
894         int i;
895
896         for (i = 0; i < numHosts; i++)
897                 if (hostArray[i].ipAddr == ipAddr)
898                         return i; //found, return index
899         
900         return -1; //not found
901 }
902
903 int removeHost(unsigned int ipAddr)
904 {
905         int i, j;
906
907         i = findHost(ipAddr);
908
909         if (i == -1)
910                 return -1;
911
912         for (j = 0; j < numBlocks; j++)
913         {
914                 if (blockOwnerArray[j] == i)
915                         blockOwnerArray[j] = 0; //TODO: is this what I want to have happen?
916                 else if (blockOwnerArray[j] > i)
917                         blockOwnerArray[j]--;
918         }
919
920         for (; i < numHosts - 1; i++)
921         {
922                 hostArray[i] = hostArray[i+1];
923                 hostReplied[i] = hostReplied[i+1];
924         }
925         numHosts--;
926
927         return 0;
928 }
929
930 void removeUnresponsiveHosts()
931 {
932         int i;
933
934         for (i = 0; i < numHosts; i++)
935         {
936                 if (!hostReplied[i] && hostArray[i].ipAddr != myHostData.ipAddr)
937                         removeHost(hostArray[i].ipAddr);
938         }
939 }
940
941 int addHost(struct hostData newHost)
942 {
943         struct hostData *newHostArray;
944         unsigned char *newHostReplied;
945         int i;
946         int j;
947
948         for (i = 0; i < numHosts; i++)
949         {
950                 if (hostArray[i].ipAddr == newHost.ipAddr)
951                 {
952                         hostArray[i] = newHost;
953                         hostReplied[i] = 0;
954                         return 0;
955                 }
956                 else if (hostArray[i].ipAddr > newHost.ipAddr)
957                 {
958                         if (numHosts == hostArraySize)
959                         {
960                                 newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
961                                 newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
962                                 memcpy(newHostArray, hostArray, (i * sizeof(struct hostData)));
963                                 memcpy(newHostReplied, hostReplied, (i * sizeof(unsigned char)));
964                                 newHostArray[i] = newHost;
965                                 newHostReplied[i] = 0;
966                                 memcpy(&newHostArray[i+1], &hostArray[i], ((numHosts - i) *
967                                         sizeof(struct hostData)));
968                                 memcpy(&newHostReplied[i+1], &hostReplied[i], ((numHosts - i) *
969                                         sizeof(unsigned char)));
970                                 free(hostArray);
971                                 free(hostReplied);
972                                 hostArray = newHostArray;
973                                 hostReplied = newHostReplied;
974                                 hostArraySize = 2 * hostArraySize;
975                         }
976                         else
977                         {
978                                 for (j = numHosts; j > i; j--)
979                                 {
980                                         hostArray[j] = hostArray[j-1];
981                                         hostReplied[j] = hostReplied[j-1];
982                                 }
983                                 hostArray[i] = newHost;
984                                 hostReplied[i] = 0;
985                         }
986                         for(j = 0; j < numBlocks; j++)
987                         {
988                                 if (blockOwnerArray[j] >= i)
989                                         blockOwnerArray[j]++;
990                         }
991                         numHosts++;
992                         return 1;
993                 }
994         }
995
996         //nothing greater, add to end
997         if (numHosts == hostArraySize)
998         {
999                 newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
1000                 newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
1001                 memcpy(newHostArray, hostArray, (numHosts * sizeof(struct hostData)));
1002                 memcpy(newHostReplied, hostReplied, (numHosts * sizeof(unsigned char)));
1003                 free(hostArray);
1004                 free(hostReplied);
1005                 hostArray = newHostArray;
1006                 hostReplied = newHostReplied;
1007                 hostArraySize = 2 * hostArraySize;
1008         }
1009
1010         hostArray[numHosts] = newHost;
1011         hostReplied[numHosts] = 0;
1012         numHosts++;
1013         return 1;
1014 }
1015
1016 void makeAssignments()
1017 {
1018         int i;
1019
1020         if (numBlocks < numHosts)
1021         {
1022                 free(blockOwnerArray);
1023                 while (numBlocks < numHosts)
1024                         numBlocks *= 2;
1025                 blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
1026         }
1027
1028         for (i = 0; i < numBlocks; i++)
1029                 blockOwnerArray[i]  = i % numHosts;
1030
1031         return;
1032 }
1033
1034 void writeHostList()
1035 {
1036         int i;
1037         struct in_addr tmpAddr;
1038
1039         fprintf(logfile, "numHosts = %d\n", numHosts);
1040         for (i = 0; i < numHosts; i++)
1041         {
1042                 tmpAddr.s_addr = htonl(hostArray[i].ipAddr);
1043                 fprintf(logfile, "%d) %s, %d\n", i, inet_ntoa(tmpAddr),
1044                         hostArray[i].maxKeyCapacity);
1045         }
1046         return;
1047 }
1048
1049 void dhtLog(const char *format, ...)
1050 {
1051         va_list args;
1052 //      struct timeval now;
1053
1054 //      if (gettimeofday(&now, NULL) < 0)
1055 //      {       perror("dhtLog():gettimeofday()"); }
1056         va_start(args, format);
1057 //      if (fprintf(logfile, "%d.%06d:", now.tv_sec, now.tv_usec) < 0)
1058 //      {       perror("dhtLog():fprintf()"); }
1059         if (vfprintf(logfile, format, args) < 0)
1060         {       perror("dhtLog():vfprintf()"); }
1061         if (fflush(logfile) == EOF)
1062         {       perror("dhtLog():fflush()"); }
1063         va_end(args);
1064
1065         return;
1066 }
1067
1068 void *fillTask()
1069 {
1070         unsigned int *vals;
1071         unsigned int *keys;
1072         unsigned int numKeys;
1073         int i;
1074         
1075         vals = mhashGetKeys(&numKeys); //note: key of mhash is val of dht
1076         keys = calloc(numKeys, sizeof(unsigned int));
1077
1078         for (i = 0; i < numKeys; i++)
1079                 keys[i] = myHostData.ipAddr;
1080
1081         if (dhtInsertMult(numKeys, keys, vals) == 0)
1082                 fillStatus = 2;
1083         else
1084                 fillStatus = 3;
1085         
1086         pthread_exit(NULL);
1087 }
1088
1089 void *udpListen()
1090 {
1091         ssize_t bytesRcvd;
1092         struct sockaddr_in peerAddr;
1093         unsigned int peerIp;
1094         socklen_t socklen = sizeof(struct sockaddr_in);
1095         unsigned char inBuffer[MAX_MSG_SIZE];
1096         unsigned char outBuffer[MAX_MSG_SIZE];
1097         int pollret;
1098         struct timeval now;
1099         struct in_addr tmpAddr;
1100         struct hostData tmpHost;
1101         unsigned int tmpKey;
1102         unsigned int tmpVal;
1103         struct hostData *hostDataPtr;
1104         unsigned short *uShortPtr;
1105         unsigned int tmpUInt;
1106         unsigned int tmpUShort;
1107         int i;
1108         unsigned int oldState;
1109
1110         dhtLog("udpListen(): linstening on port %d...\n", UDP_PORT);
1111
1112         while (1)
1113         {
1114                 pollret = poll(&udpPollSock, 1, TIMEOUT_PERIOD);
1115                 pthread_mutex_lock(&stateMutex);
1116                 oldState = state;
1117                 if (pollret < 0)
1118                 {
1119                         perror("udpListen():poll()");
1120                 }
1121                 else if (pollret > 0)
1122                 {
1123                         bytesRcvd = recvfrom(udpPollSock.fd, inBuffer, MAX_MSG_SIZE, 0,
1124                                 (struct sockaddr *)&peerAddr, &socklen);
1125                         if (bytesRcvd < 1)
1126                         {
1127                                 dhtLog("udpListen(): ERROR: bytesRcvd = %d\n", bytesRcvd);
1128                         }
1129                         else if (inBuffer[0] >= NUM_MSG_TYPES)
1130                         {
1131                                 dhtLog("udpListen(): ERROR: unknown msg type = %d\n", inBuffer[0]);
1132                         }
1133                         else if (!msgSizeOk(inBuffer, bytesRcvd))
1134                         {
1135                                 dhtLog("udpListen(): ERROR: msg size not ok: type = %s\n, size = %d\n",
1136                                         msg_types[inBuffer[0]], bytesRcvd);
1137                         }
1138                         else if (state == EXIT2_STATE)
1139                         {
1140                                 //do nothing
1141                         }
1142                         else if (state == INIT1_STATE)
1143                         { //after initialization with seed, do not proceed until seed replies
1144                                 dhtLog("udpListen(): received %s from %s, %d bytes\n",
1145                                         msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
1146                                 for (i = 0; i < bytesRcvd; i++)
1147                                         dhtLog(" %x", inBuffer[i]);
1148                                 dhtLog("\n");
1149                                 peerIp = ntohl(peerAddr.sin_addr.s_addr);
1150                                 if (peerIp == seed && inBuffer[0] == WHO_IS_LEADER_RES)
1151                                 {
1152                                         tmpHost.ipAddr = peerIp;
1153                                         tmpHost.maxKeyCapacity = 0;
1154                                         addHost(tmpHost);
1155                                         writeHostList();
1156                                         leader = read4(&inBuffer[1]);
1157                                         tmpAddr.s_addr = htonl(leader);
1158                                         dhtLog("leader = %s\n", inet_ntoa(tmpAddr));
1159                                         if (leader != 0)
1160                                         {
1161                                                 setState(INIT2_STATE);
1162                                                 outBuffer[0] = JOIN_REQ;
1163                                                 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1164                                                 udpSend(outBuffer, 5, leader);
1165                                         }
1166                                         else
1167                                         {
1168                                                 electionOriginator = myHostData.ipAddr;
1169                                                 setState(ELECT1_STATE);
1170                                                 outBuffer[0] = ELECT_LEADER_CMD;
1171                                                 write4(&outBuffer[1], myHostData.ipAddr); //originator = me
1172                                                 udpSendAll(outBuffer, 5);
1173                                         }
1174                                 }
1175                         }
1176                         else
1177                         {
1178                                 dhtLog("udpListen(): received %s from %s, %d bytes\n",
1179                                         msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
1180                                 for (i = 0; i < bytesRcvd; i++)
1181                                         dhtLog(" %x", inBuffer[i]);
1182                                 dhtLog("\n");
1183                                 peerIp = ntohl(peerAddr.sin_addr.s_addr);
1184                                 switch (inBuffer[0])
1185                                 {
1186                                         case INSERT_CMD:
1187                                                 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1188                                                         || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
1189                                                         || state == REBUILD5_STATE || state == LEAD_REBUILD3_STATE)
1190                                                 {
1191                                                         tmpKey = read4(&inBuffer[1]);
1192                                                         tmpVal = read4(&inBuffer[5]);
1193                                                         outBuffer[0] = INSERT_RES;
1194                                                         if (getKeyOwner(tmpKey) == myHostData.ipAddr)
1195                                                         {
1196                                                                 if (chashInsert(myHashTable, tmpKey, (void *)tmpVal) == 0)
1197                                                                         outBuffer[1] = OPERATION_OK;
1198                                                                 else
1199                                                                         outBuffer[1] = INTERNAL_ERROR;
1200                                                         }
1201                                                         else
1202                                                         {
1203                                                                 outBuffer[1] = NOT_KEY_OWNER;
1204                                                         }
1205                                                         //reply to client socket
1206                                                         sendto(udpPollSock.fd, outBuffer, 2, 0,
1207                                                                 (struct sockaddr *)&peerAddr, socklen);
1208                                                 }
1209                                                 break;
1210                                         case REMOVE_CMD:
1211                                                 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1212                                                         || state == LEAD_NORMAL2_STATE)
1213                                                 {
1214                                                         tmpKey = read4(&inBuffer[1]);
1215                                                         outBuffer[0] = REMOVE_RES;
1216                                                         if (getKeyOwner(tmpKey) == myHostData.ipAddr)
1217                                                         {
1218                                                                 if (chashRemove(myHashTable, tmpKey) == 0)
1219                                                                         outBuffer[1] = OPERATION_OK;
1220                                                                 else
1221                                                                         outBuffer[1] = KEY_NOT_FOUND;
1222                                                         }
1223                                                         else
1224                                                         {
1225                                                                 outBuffer[1] = NOT_KEY_OWNER;
1226                                                         }
1227                                                         //reply to client socket
1228                                                         sendto(udpPollSock.fd, outBuffer, 2, 0,
1229                                                                 (struct sockaddr *)&peerAddr, socklen);
1230                                                 }
1231                                                 break;
1232                                         case SEARCH_CMD:
1233                                                 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1234                                                         || state == LEAD_NORMAL2_STATE)
1235                                                 {
1236                                                         tmpKey = read4(&inBuffer[1]);
1237                                                         outBuffer[0] = SEARCH_RES;
1238                                                         if (getKeyOwner(tmpKey) == myHostData.ipAddr)
1239                                                         {
1240                                                                 if ((tmpVal = (unsigned int)chashSearch(myHashTable, tmpKey)) != 0)
1241                                                                 {
1242                                                                         outBuffer[1] = OPERATION_OK;
1243                                                                         write4(&outBuffer[2], tmpVal);
1244                                                                 }
1245                                                                 else
1246                                                                 {
1247                                                                         outBuffer[1] = KEY_NOT_FOUND;
1248                                                                         write4(&outBuffer[2], 0);
1249                                                                 }
1250                                                         }
1251                                                         else
1252                                                         {
1253                                                                 outBuffer[1] = NOT_KEY_OWNER;
1254                                                                 write4(&outBuffer[2], 0);
1255                                                         }
1256                                                         //reply to client socket
1257                                                         sendto(udpPollSock.fd, outBuffer, 6, 0,
1258                                                                 (struct sockaddr *)&peerAddr, socklen);
1259                                                 }
1260                                                 break;
1261                                         case WHO_IS_LEADER_CMD:
1262                                                 tmpHost.ipAddr = peerIp;
1263                                                 tmpHost.maxKeyCapacity = 0;
1264                                                 addHost(tmpHost);
1265                                                 writeHostList();
1266                                                 outBuffer[0] = WHO_IS_LEADER_RES;
1267                                                 //leader == 0 means I don't know who it is
1268                                                 write4(&outBuffer[1], leader);
1269                                                 udpSend(outBuffer, 5, peerIp);
1270                                                 break;
1271                                         case JOIN_REQ:
1272                                                 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
1273                                                 {
1274                                                         tmpHost.ipAddr = peerIp;
1275                                                         tmpHost.maxKeyCapacity = read4(&inBuffer[1]);
1276                                                         addHost(tmpHost);
1277                                                         writeHostList();
1278                                                         if (state == LEAD_NORMAL1_STATE)
1279                                                                 setState(LEAD_NORMAL2_STATE);
1280                                                         outBuffer[0] = JOIN_RES;
1281                                                         outBuffer[1] = 0; //status, success
1282                                                         udpSend(outBuffer, 2, peerIp);
1283                                                 }
1284                                                 else if (state == LEAD_REBUILD1_STATE)
1285                                                 {
1286                                                         //note: I don't need to addHost().
1287                                                         checkReplied(peerIp);
1288                                                         outBuffer[0] = JOIN_RES;
1289                                                         outBuffer[1] = 0; //status, success
1290                                                         udpSend(outBuffer, 2, peerIp);
1291                                                         if (allReplied())
1292                                                         {
1293                                                                 makeAssignments();
1294                                                                 setState(LEAD_REBUILD2_STATE);
1295                                                                 outBuffer[0] = DHT_UPDATE_CMD;
1296                                                                 write2(&outBuffer[1], numHosts);
1297                                                                 write2(&outBuffer[3], numBlocks);
1298                                                                 memcpy(&outBuffer[5], hostArray, numHosts*sizeof(struct hostData));
1299                                                                 memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
1300                                                                         blockOwnerArray, numBlocks*2);
1301                                                                 udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
1302                                                                         + 2 * numBlocks);
1303                                                         }
1304                                                 }
1305                                                 break;
1306                                         case JOIN_RES:
1307                                                 if (state == REBUILD1_STATE)
1308                                                 {
1309                                                         setState(REBUILD2_STATE);
1310                                                 }
1311                                                 else if (state == INIT2_STATE)
1312                                                 {
1313                                                         setState(NORMAL_STATE);
1314                                                 }
1315                                                 break;
1316                                         case LEAVE_REQ:
1317                                                 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
1318                                                 { //TODO: make this graceful, instead of just rebuilding
1319                                                         removeHost(peerIp);
1320                                                         if (state != LEAD_NORMAL2_STATE)
1321                                                                 setState(LEAD_NORMAL2_STATE);
1322                                                 }
1323                                                 break;
1324                                         case DHT_UPDATE_CMD:
1325                                                 if (state == REBUILD2_STATE && peerIp == leader)
1326                                                 {
1327                                                         free(hostArray);
1328                                                         free(blockOwnerArray);
1329                                                         numHosts = read2(&inBuffer[1]);
1330                                                         numBlocks = read2(&inBuffer[3]);
1331                                                         while (hostArraySize < numHosts)
1332                                                                 hostArraySize *= 2;
1333                                                         hostArray = calloc(hostArraySize, sizeof(struct hostData));
1334                                                         blockOwnerArray = calloc(numBlocks, 2);
1335                                                         memcpy(hostArray, &inBuffer[5], numHosts*sizeof(struct hostData));
1336                                                         memcpy(blockOwnerArray, &inBuffer[5+numHosts*sizeof(struct hostData)], numBlocks*2);
1337                                                         writeHostList();
1338                                                         setState(REBUILD3_STATE);
1339                                                         outBuffer[0] = DHT_UPDATE_RES;
1340                                                         udpSend(outBuffer, 1, peerIp);
1341                                                 }
1342                                                 break;
1343                                         case DHT_UPDATE_RES:
1344                                                 if (state == LEAD_REBUILD2_STATE)
1345                                                 {
1346                                                         checkReplied(peerIp);
1347                                                         if (allReplied())
1348                                                         {
1349                                                                 setState(LEAD_REBUILD3_STATE);
1350                                                                 outBuffer[0] = FILL_DHT_CMD;
1351                                                                 udpSendAll(outBuffer, 1);
1352                                                                 if (fillStatus != 0)
1353                                                                         dhtLog("udpListen(): ERROR: fillTask already running\n");
1354                                                                 fillStatus = 1;
1355                                                                 if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
1356                                                                         dhtLog("udpListen(): ERROR creating threadFillTask\n");
1357                                                         }
1358                                                 }
1359                                                 break;
1360                                         case ELECT_LEADER_CMD:
1361                                                 tmpUInt = read4(&inBuffer[1]);
1362                                                 if ((state == ELECT1_STATE || state == ELECT2_STATE)
1363                                                         && tmpUInt >= electionOriginator)
1364                                                 { //already participating in a higher-priority election
1365                                                         outBuffer[0] = ELECT_LEADER_RES;
1366                                                         outBuffer[1] = 0xFF;
1367                                                         udpSend(outBuffer, 2, peerIp);
1368                                                 }
1369                                                 else
1370                                                 { //join election
1371                                                         electionOriginator = tmpUInt;
1372                                                         electionParent = peerIp;
1373                                                         setState(ELECT1_STATE);
1374                                                         outBuffer[0] = ELECT_LEADER_CMD;
1375                                                         write4(&outBuffer[1], electionOriginator);
1376                                                         //don't bother forwarding the message to originator or parent
1377                                                         checkReplied(electionOriginator);
1378                                                         checkReplied(electionParent);
1379                                                         if (allReplied())
1380                                                         { //in case that is everybody I know of
1381                                                                 setState(ELECT2_STATE);
1382                                                                 outBuffer[0] = ELECT_LEADER_RES;
1383                                                                 outBuffer[1] = 0;
1384                                                                 write2(&outBuffer[2], numHosts);
1385                                                                 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1386                                                                         * numHosts);
1387                                                                 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1388                                                                         electionParent);
1389                                                         }
1390                                                         else
1391                                                         {
1392                                                                 udpSendAll(outBuffer, 5);
1393                                                         }
1394                                                 }
1395                                                 break;
1396                                         case ELECT_LEADER_RES:
1397                                                 if (state == ELECT1_STATE)
1398                                                 {
1399                                                         checkReplied(peerIp);
1400                                                         if (inBuffer[1] != 0xFF)
1401                                                         {
1402                                                                 tmpUShort = read2(&inBuffer[2]);
1403                                                                 hostDataPtr = (struct hostData *)&inBuffer[4];
1404                                                                 for (i = 0; i < tmpUShort; i++)
1405                                                                         addHost(hostDataPtr[i]);
1406                                                                 writeHostList();
1407                                                         }
1408                                                         if (allReplied())
1409                                                         {
1410                                                                 setState(ELECT2_STATE);
1411                                                                 if (electionOriginator == myHostData.ipAddr)
1412                                                                 {
1413                                                                         leader = hostArray[0].ipAddr;
1414                                                                         if (leader == myHostData.ipAddr)
1415                                                                         { //I am the leader
1416                                                                                 dhtLog("I am the leader!\n");
1417                                                                                 setState(LEAD_REBUILD1_STATE);
1418                                                                                 outBuffer[0] = REBUILD_CMD;
1419                                                                                 udpSendAll(outBuffer, 1);
1420                                                                         }
1421                                                                         else
1422                                                                         { //notify leader
1423                                                                                 outBuffer[0] = CONGRATS_CMD;
1424                                                                                 write2(&outBuffer[1], numHosts);
1425                                                                                 hostDataPtr = (struct hostData *)&outBuffer[3];
1426                                                                                 for (i = 0; i < numHosts; i++)
1427                                                                                         hostDataPtr[i] = hostArray[i];
1428                                                                                 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1429                                                                                         leader);
1430                                                                         }
1431                                                                 }
1432                                                                 else
1433                                                                 {
1434                                                                         outBuffer[0] = ELECT_LEADER_RES;
1435                                                                         outBuffer[1] = 0;
1436                                                                         write2(&outBuffer[2], numHosts);
1437                                                                         hostDataPtr = (struct hostData *)&outBuffer[4];
1438                                                                         for (i = 0; i < numHosts; i++)
1439                                                                                 hostDataPtr[i] = hostArray[i];
1440                                                                         udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1441                                                                                 electionParent);
1442                                                                 }
1443                                                         }
1444                                                 }
1445                                                 break;
1446                                         case CONGRATS_CMD:
1447                                                 if (state == ELECT2_STATE)
1448                                                 { //I am the leader
1449                                                         leader = myHostData.ipAddr;
1450                                                         dhtLog("I am the leader!\n");
1451                                                         tmpUShort = read2(&inBuffer[1]);
1452                                                         hostDataPtr = (struct hostData *)&inBuffer[3];
1453                                                         for (i = 0; i < tmpUShort; i++)
1454                                                                 addHost(hostDataPtr[i]);
1455                                                         writeHostList();
1456                                                         setState(LEAD_REBUILD1_STATE);
1457                                                         outBuffer[0] = REBUILD_CMD;
1458                                                         udpSendAll(outBuffer, 1);
1459                                                 }
1460                                                 break;
1461                                         case REBUILD_REQ:
1462                                                 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
1463                                                 {
1464                                                         setState(LEAD_REBUILD1_STATE);
1465                                                         outBuffer[0] = REBUILD_CMD;
1466                                                         udpSendAll(outBuffer, 1);
1467                                                 }
1468                                                 break;
1469                                         case REBUILD_CMD:
1470                                                 leader = peerIp; //consider this a declaration of authority
1471                                                 setState(REBUILD1_STATE);
1472                                                 outBuffer[0] = JOIN_REQ;
1473                                                 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1474                                                 udpSend(outBuffer, 5, leader);
1475                                                 break;
1476                                         case FILL_DHT_CMD:
1477                                                 if (state == REBUILD3_STATE && peerIp == leader)
1478                                                 {
1479                                                         setState(REBUILD4_STATE);
1480                                                         if (fillStatus != 0)
1481                                                                 dhtLog("udpListen(): ERROR: fillTask already running\n");
1482                                                         fillStatus = 1;
1483                                                         if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
1484                                                                 dhtLog("udpListen(): ERROR creating threadFillTask\n");
1485                                                 }
1486                                                 break;
1487                                         case FILL_DHT_RES:
1488                                                 if (state == LEAD_REBUILD3_STATE)
1489                                                 {
1490                                                         checkReplied(peerIp);
1491                                                         if (allReplied() && fillStatus == 2)
1492                                                         {
1493                                                                 fillStatus = 0;
1494                                                                 setState(LEAD_REBUILD4_STATE);
1495                                                                 outBuffer[0] = RESUME_NORMAL_CMD;
1496                                                                 udpSendAll(outBuffer, 1);
1497                                                         }
1498                                                 }
1499                                                 break;
1500                                         case RESUME_NORMAL_CMD:
1501                                                 if (state == REBUILD5_STATE && peerIp == leader)
1502                                                 {
1503                                                         setState(NORMAL_STATE);
1504                                                         outBuffer[0] = RESUME_NORMAL_RES;
1505                                                         udpSend(outBuffer, 1, leader);
1506                                                 }
1507                                                 break;
1508                                         case RESUME_NORMAL_RES:
1509                                                 if (state == LEAD_REBUILD4_STATE)
1510                                                 {
1511                                                         checkReplied(peerIp);
1512                                                         if (allReplied())
1513                                                         {
1514                                                                 setState(LEAD_NORMAL1_STATE);
1515                                                         }
1516                                                 }
1517                                                 break;
1518                                 }
1519                         }
1520                 }
1521                 if (state == REBUILD4_STATE)
1522                 {
1523                         switch (fillStatus)
1524                         {
1525                                 case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in REBUILD4_STATE\n");
1526                                         break;
1527                                 case 1: //do nothing
1528                                         break;
1529                                 case 2: //done filling the dht, notify leader
1530                                         fillStatus = 0;
1531                                         setState(REBUILD5_STATE);
1532                                         outBuffer[0] = FILL_DHT_RES;
1533                                         udpSend(outBuffer, 1, leader);
1534                                         break;
1535                                 case 3: //error encountered -> restart rebuild
1536                                         fillStatus = 0;
1537                                         setState(REBUILD0_STATE);
1538                                         outBuffer[0] = REBUILD_REQ;
1539                                         udpSend(outBuffer, 1, leader);
1540                                         break;
1541                         }
1542                 }
1543                 if (state == LEAD_REBUILD3_STATE)
1544                 {
1545                         switch (fillStatus)
1546                         {
1547                                 case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in LEAD_REBUILD3_STATE\n");
1548                                         break;
1549                                 case 1: //do nothing
1550                                         break;
1551                                 case 2: //I'm done, now is everybody else also done?
1552                                         if (allReplied())
1553                                         {
1554                                                 fillStatus = 0;
1555                                                 setState(LEAD_REBUILD4_STATE);
1556                                                 outBuffer[0] = RESUME_NORMAL_CMD;
1557                                                 udpSendAll(outBuffer, 1);
1558                                         }
1559                                         break;
1560                                 case 3: //error encountered -> restart rebuild
1561                                         fillStatus = 0;
1562                                         setState(LEAD_REBUILD1_STATE);
1563                                         outBuffer[0] = REBUILD_CMD;
1564                                         udpSendAll(outBuffer, 1);
1565                                         break;
1566                         }
1567                 }
1568                 if (timerSet)
1569                 {
1570                         gettimeofday(&now, NULL);
1571                         if (timercmp(&now, &timer, >))
1572                         {
1573                                 if (timeoutCntr < retry_vals[state])
1574                                 {
1575                                         timeoutCntr++;
1576                                         timeradd(&now, &timeout_vals[state], &timer);
1577                                         dhtLog("udpListen(): retry: %d\n", timeoutCntr);
1578                                         switch (state)
1579                                         {
1580                                                 case INIT1_STATE:
1581                                                         outBuffer[0] = WHO_IS_LEADER_CMD;
1582                                                         udpSend(outBuffer, 1, seed);
1583                                                         break;
1584                                                 case INIT2_STATE:
1585                                                         outBuffer[0] = JOIN_REQ;
1586                                                         write4(&outBuffer[1], myHostData.maxKeyCapacity);
1587                                                         udpSend(outBuffer, 5, leader);
1588                                                         break;
1589                                                 case ELECT1_STATE:
1590                                                         outBuffer[0] = ELECT_LEADER_CMD;
1591                                                         write4(&outBuffer[1], electionOriginator);
1592                                                         udpSendAll(outBuffer, 5);
1593                                                         break;
1594                                                 case ELECT2_STATE:
1595                                                         if (electionOriginator == myHostData.ipAddr)
1596                                                         { //retry notify leader
1597                                                                 outBuffer[0] = CONGRATS_CMD;
1598                                                                 write2(&outBuffer[1], numHosts);
1599                                                                 memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
1600                                                                         * numHosts);
1601                                                                 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1602                                                                         leader);
1603                                                         }
1604                                                         else
1605                                                         {
1606                                                                 outBuffer[0] = ELECT_LEADER_RES;
1607                                                                 outBuffer[1] = 0;
1608                                                                 write2(&outBuffer[2], numHosts);
1609                                                                 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1610                                                                         * numHosts);
1611                                                                 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1612                                                                         electionParent);
1613                                                         }
1614                                                         break;
1615                                                 case REBUILD0_STATE:
1616                                                         outBuffer[0] = REBUILD_REQ;
1617                                                         udpSend(outBuffer, 1, leader);
1618                                                         break;
1619                                                 case REBUILD1_STATE:
1620                                                         outBuffer[0] = JOIN_REQ;
1621                                                         write4(&outBuffer[1], myHostData.maxKeyCapacity);
1622                                                         udpSend(outBuffer, 5, leader);
1623                                                         break;
1624                                                 case REBUILD5_STATE:
1625                                                         outBuffer[0] = FILL_DHT_RES;
1626                                                         udpSend(outBuffer, 1, leader);
1627                                                         break;
1628                                                 case LEAD_REBUILD1_STATE:
1629                                                         outBuffer[0] = REBUILD_CMD;
1630                                                         udpSendAll(outBuffer, 1);
1631                                                         break;
1632                                                 case LEAD_REBUILD2_STATE:
1633                                                         outBuffer[0] = DHT_UPDATE_CMD;
1634                                                         write2(&outBuffer[1], numHosts);
1635                                                         write2(&outBuffer[3], numBlocks);
1636                                                         memcpy(&outBuffer[5], hostArray, numHosts
1637                                                                 * sizeof(struct hostData));
1638                                                         memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
1639                                                                 blockOwnerArray, numBlocks*2);
1640                                                         udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
1641                                                                 + 2 * numBlocks);
1642                                                         break;
1643                                                 case LEAD_REBUILD3_STATE:
1644                                                         outBuffer[0] = FILL_DHT_CMD;
1645                                                         udpSendAll(outBuffer, 1);
1646                                                         break;
1647                                                 case LEAD_REBUILD4_STATE:
1648                                                         outBuffer[0] = RESUME_NORMAL_CMD;
1649                                                         udpSendAll(outBuffer, 1);
1650                                                         break;
1651                                                 case EXIT1_STATE: //TODO...
1652                                                         break;
1653                                                 case NORMAL_STATE:
1654                                                 case LEAD_NORMAL1_STATE:
1655                                                 case LEAD_NORMAL2_STATE:
1656                                                 case REBUILD2_STATE:
1657                                                 case REBUILD3_STATE:
1658                                                 case REBUILD4_STATE:
1659                                                 case EXIT2_STATE: //we shouldn't get here
1660                                                         break;
1661                                         }
1662                                 }
1663                                 else
1664                                 {
1665                                         dhtLog("udpListen(): timed out in state %s after %d retries\n",
1666                                                 state_names[state], timeoutCntr);
1667                                         switch (state)
1668                                         {
1669                                                 case INIT1_STATE:
1670                                                         setState(EXIT2_STATE);
1671                                                         break;
1672                                                 case LEAD_NORMAL2_STATE:
1673                                                         setState(LEAD_REBUILD1_STATE);
1674                                                         outBuffer[0] = REBUILD_CMD;
1675                                                         udpSendAll(outBuffer, 1);
1676                                                         break;
1677                                                 case ELECT1_STATE:
1678                                                         dhtLog("removing unresponsive hosts, before:\n");
1679                                                         writeHostList();
1680                                                         removeUnresponsiveHosts();
1681                                                         dhtLog("after\n");
1682                                                         writeHostList();
1683                                                         setState(ELECT2_STATE);
1684                                                         if (electionOriginator == myHostData.ipAddr)
1685                                                         {
1686                                                                 leader = hostArray[0].ipAddr;
1687                                                                 if (leader == myHostData.ipAddr)
1688                                                                 { //I am the leader
1689                                                                         dhtLog("I am the leader!\n");
1690                                                                         setState(LEAD_REBUILD1_STATE);
1691                                                                         outBuffer[0] = REBUILD_CMD;
1692                                                                         udpSendAll(outBuffer, 1);
1693                                                                 }
1694                                                                 else
1695                                                                 { //notify leader
1696                                                                         outBuffer[0] = CONGRATS_CMD;
1697                                                                         write2(&outBuffer[1], numHosts);
1698                                                                         memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
1699                                                                                 * numHosts);
1700                                                                         udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1701                                                                                 leader);
1702                                                                 }
1703                                                         }
1704                                                         else
1705                                                         {
1706                                                                 outBuffer[0] = ELECT_LEADER_RES;
1707                                                                 outBuffer[1] = 0;
1708                                                                 write2(&outBuffer[2], numHosts);
1709                                                                 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1710                                                                         * numHosts);
1711                                                                 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1712                                                                         electionParent);
1713                                                         }
1714                                                         break;
1715                                                 case INIT2_STATE:
1716                                                 case ELECT2_STATE:
1717                                                 case REBUILD0_STATE:
1718                                                 case REBUILD1_STATE:
1719                                                 case REBUILD2_STATE:
1720                                                 case REBUILD3_STATE:
1721                                                 case REBUILD4_STATE:
1722                                                 case REBUILD5_STATE:
1723                                                 case LEAD_REBUILD1_STATE:
1724                                                 case LEAD_REBUILD2_STATE:
1725                                                 case LEAD_REBUILD3_STATE:
1726                                                 case LEAD_REBUILD4_STATE:
1727                                                         //start election
1728                                                         electionOriginator = myHostData.ipAddr;
1729                                                         setState(ELECT1_STATE);
1730                                                         outBuffer[0] = ELECT_LEADER_CMD;
1731                                                         write4(&outBuffer[1], myHostData.ipAddr); //originator = me
1732                                                         udpSendAll(outBuffer, 5);
1733                                                         break;
1734                                                 case EXIT1_STATE:
1735                                                         setState(EXIT2_STATE);
1736                                                         break;
1737                                                 case NORMAL_STATE:
1738                                                 case LEAD_NORMAL1_STATE:
1739                                                 case EXIT2_STATE: //we shouldn't get here
1740                                                         break;
1741                                         }
1742                                 }
1743                         }
1744                 }
1745                 if (state != oldState)
1746                         pthread_cond_broadcast(&stateCond);
1747                 pthread_mutex_unlock(&stateMutex);
1748         }
1749 }
1750