change
[IRC.git] / Robust / src / Runtime / DSTM / interface / dht.c
1 /*******************************************************************************
2 *                                 dht.c
3 *
4 *  High-performance Distributed Hash Table for finding the location of objects
5 * in a Distributed Shared Transactional Memory system.
6 *
7 * Creator: Erik Rubow
8 *
9 * TODO:
10 * 1) Instead of having dhtInsertMult, dhtSearchMult, etc. call their single-key
11 * counterparts repeatedly, define some new messages to handle it more
12 * efficiently.
13 * 2) Improve the efficiency of functions that work with hostArray, hostReplied,
14 * and blockOwnerArray.
15 * 3) Currently a join or leave causes a rebuild of the entire hash table.
16 * Implement more graceful join and leave procedures.
17 * 4) Fine tune timeout values for performance, possibly implement a backoff
18 * algorithm to prevent overloading the network.
19 * 5) Whatever else I'm forgetting
20 *
21 *******************************************************************************/
22 /*******************************************************************************
23 *                              Includes
24 *******************************************************************************/
25
26 #include <netinet/in.h>
27 #include <arpa/inet.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <sys/ioctl.h>
31 #include <stdio.h>
32 #include <stdarg.h>
33 #include <string.h>
34 #include <stdlib.h>
35 #include <unistd.h>
36 #include <pthread.h>
37 #include <sys/time.h>
38 #include <sys/poll.h>
39 #include <netdb.h>
40 #include <net/if.h>
41 #include <linux/sockios.h>
42 #include <sys/queue.h>
43 #include "dht.h"
44 #include "clookup.h" //this works for now, do we need anything better?
45 #include "mlookup.h"
46
47 /*******************************************************************************
48 *                           Local Defines, Structs
49 *******************************************************************************/
50
#define MAX_MSG_SIZE 1500         //size in bytes of the udpListen() message buffers
#define UDP_PORT 2157             //UDP port the DHT socket is bound to and messages are sent to
#define INIT_HOST_ALLOC 3         //initial capacity of hostArray/hostReplied (see dhtInit)
#define INIT_NUM_BLOCKS 16        //initial length of blockOwnerArray (see dhtInit)
#define DEFAULT_INTERFACE "eth0"  //interface whose address is used as this host's IP
#define TIMEOUT_PERIOD 100        //poll() timeout, in ms, of the udpListen() loop
#define INSERT_TIMEOUT_MS 500     //poll() timeout, in ms, per dhtInsert() attempt
#define INSERT_RETRIES 50         //send attempts per dhtInsert() round
#define REMOVE_TIMEOUT_MS 500     //poll() timeout, in ms, per dhtRemove() attempt
#define REMOVE_RETRIES 50         //send attempts per dhtRemove() round
#define SEARCH_TIMEOUT_MS 500     //poll() timeout, in ms, per dhtSearch() attempt
#define SEARCH_RETRIES 50         //send attempts per dhtSearch() round
63
//message types: the value stored in the first byte of every DHT datagram
//the msg_types name table is indexed by these values, keep the two in sync
enum
{
  INSERT_CMD        = 0,
  INSERT_RES        = 1,
  REMOVE_CMD        = 2,
  REMOVE_RES        = 3,
  SEARCH_CMD        = 4,
  SEARCH_RES        = 5,
  WHO_IS_LEADER_CMD = 6,
  WHO_IS_LEADER_RES = 7,
  JOIN_REQ          = 8,
  JOIN_RES          = 9,
  LEAVE_REQ         = 10,
  LEAVE_RES         = 11,
  DHT_UPDATE_CMD    = 12,
  DHT_UPDATE_RES    = 13,
  ELECT_LEADER_CMD  = 14,
  ELECT_LEADER_RES  = 15,
  CONGRATS_CMD      = 16,
  REBUILD_REQ       = 17,
  REBUILD_CMD       = 18,
  FILL_DHT_CMD      = 19,
  FILL_DHT_RES      = 20,
  RESUME_NORMAL_CMD = 21,
  RESUME_NORMAL_RES = 22,
  NUM_MSG_TYPES     = 23   //number of message types, not a message itself
};
93
//protocol states of this node
//state_names, timeout_vals, and retry_vals are indexed by these values,
//keep all of them in sync
enum
{
  INIT1_STATE         = 0,
  INIT2_STATE         = 1,
  NORMAL_STATE        = 2,
  LEAD_NORMAL1_STATE  = 3,
  LEAD_NORMAL2_STATE  = 4,
  ELECT1_STATE        = 5,
  ELECT2_STATE        = 6,
  REBUILD0_STATE      = 7,
  REBUILD1_STATE      = 8,
  REBUILD2_STATE      = 9,
  REBUILD3_STATE      = 10,
  REBUILD4_STATE      = 11,
  REBUILD5_STATE      = 12,
  LEAD_REBUILD1_STATE = 13,
  LEAD_REBUILD2_STATE = 14,
  LEAD_REBUILD3_STATE = 15,
  LEAD_REBUILD4_STATE = 16,
  EXIT1_STATE         = 17,
  EXIT2_STATE         = 18,
  NUM_STATES          = 19   //number of states, not a state itself
};
119
//status codes carried in the second byte of INSERT_RES, REMOVE_RES and
//SEARCH_RES replies and returned by dhtInsert/dhtRemove/dhtSearch
enum
{
  OPERATION_OK   = 0,
  KEY_NOT_FOUND  = 1,
  NOT_KEY_OWNER  = 2,
  NOT_LEADER     = 3,
  INTERNAL_ERROR = 4
};
129
//one entry of the host table (hostArray); msgSizeOk() also uses
//sizeof(struct hostData) as the per-host size inside host-list messages
struct hostData {
  unsigned int ipAddr;          //IPv4 address in host byte order (htonl() is applied when sending)
  unsigned int maxKeyCapacity;  //capacity value supplied to dhtInit() by the host
};
134
135 /*******************************************************************************
136 *                         Local Function Prototypes
137 *******************************************************************************/
138
//message validation and little-endian (de)serialization helpers
int msgSizeOk(unsigned char *msg, unsigned int size);
unsigned short read2(unsigned char *msg);
unsigned int read4(unsigned char *msg);
void write2(unsigned char *ptr, unsigned short tmp);
void write4(unsigned char *ptr, unsigned int tmp);
//networking helpers
unsigned int getMyIpAddr(const char *interfaceStr);
int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp);
int udpSendAll(unsigned char *msg, unsigned int size);
//key-to-host mapping
unsigned int hash(unsigned int x);
unsigned int getKeyOwner(unsigned int key);
//state machine and host table maintenance
void setState(unsigned int newState);
void makeAssignments();
int addHost(struct hostData newHost);
//findHost() is called by checkReplied() and removeHost() before its
//definition, so it needs a prototype here (implicit declarations are invalid
//since C99)
int findHost(unsigned int ipAddr);
int removeHost(unsigned int ipAddr);
void removeUnresponsiveHosts();
int checkReplied(unsigned int ipAddr);
int allReplied();
//logging
void writeHostList();
void dhtLog(const char *format, ...);
//thread entry points
void *fillTask();
void *udpListen();
160
161 /*******************************************************************************
162 *                           Global Variables
163 *******************************************************************************/
164
165 //make sure this matches enumeration above
166 const char *msg_types[NUM_MSG_TYPES] =
167 {
168   "INSERT_CMD",
169   "INSERT_RES",
170   "REMOVE_CMD",
171   "REMOVE_RES",
172   "SEARCH_CMD",
173   "SEARCH_RES",
174   "WHO_IS_LEADER_CMD",
175   "WHO_IS_LEADER_RES",
176   "JOIN_REQ",
177   "JOIN_RES",
178   "LEAVE_REQ",
179   "LEAVE_RES",
180   "DHT_UPDATE_CMD",
181   "DHT_UPDATE_RES",
182   "ELECT_LEADER_CMD",
183   "ELECT_LEADER_RES",
184   "CONGRATS_CMD",
185   "REBUILD_REQ",
186   "REBUILD_CMD",
187   "FILL_DHT_CMD",
188   "FILL_DHT_RES",
189   "RESUME_NORMAL_CMD",
190   "RESUME_NORMAL_RES"
191 };
192
193 const char *state_names[NUM_STATES] =
194 {
195   "INIT1_STATE",
196   "INIT2_STATE",
197   "NORMAL_STATE",
198   "LEAD_NORMAL1_STATE",
199   "LEAD_NORMAL2_STATE",
200   "ELECT1_STATE",
201   "ELECT2_STATE",
202   "REBUILD0_STATE",
203   "REBUILD1_STATE",
204   "REBUILD2_STATE",
205   "REBUILD3_STATE",
206   "REBUILD4_STATE",
207   "REBUILD5_STATE",
208   "LEAD_REBUILD1_STATE",
209   "LEAD_REBUILD2_STATE",
210   "LEAD_REBUILD3_STATE",
211   "LEAD_REBUILD4_STATE",
212   "EXIT1_STATE",
213   "EXIT2_STATE",
214 };
215
216 //note: { 0, 0 } means no timeout
217 struct timeval timeout_vals[NUM_STATES] ={
218   { 0, 500000 },       //INIT1_STATE
219   { 0, 500000 },       //INIT2_STATE
220   { 0, 0 },       //NORMAL_STATE
221   { 0, 0 },       //LEAD_NORMAL1_STATE
222   { 3, 0 },       //LEAD_NORMAL2_STATE
223   { 1, 0 },       //ELECT1_STATE
224   { 1, 0 },       //ELECT2_STATE
225   { 0, 500000 },       //REBUILD0_STATE
226   { 0, 500000 },       //REBUILD1_STATE
227   { 10, 0 },       //REBUILD2_STATE
228   { 10, 0 },       //REBUILD3_STATE
229   { 10, 0 },       //REBUILD4_STATE
230   { 1, 0 },       //REBUILD5_STATE
231   { 1, 0 },       //LEAD_REBUILD1_STATE
232   { 1, 0 },       //LEAD_REBUILD2_STATE
233   { 10, 0 },       //LEAD_REBUILD3_STATE
234   { 10, 0 },       //LEAD_REBUILD4_STATE
235   { 0, 500000 },       //EXIT1_STATE
236   { 0, 0 }       //EXIT2_STATE
237 };
238
239 int retry_vals[NUM_STATES] =
240 {
241   100,       //INIT1_STATE
242   10,       //INIT2_STATE
243   0,       //NORMAL_STATE
244   0,       //LEAD_NORMAL1_STATE
245   0,       //LEAD_NORMAL2_STATE
246   10,       //ELECT1_STATE
247   10,       //ELECT2_STATE
248   10,       //REBUILD0_STATE
249   10,       //REBUILD1_STATE
250   0,       //REBUILD2_STATE
251   0,       //REBUILD3_STATE
252   0,       //REBUILD4_STATE
253   10,       //REBUILD5_STATE
254   10,       //LEAD_REBUILD1_STATE
255   10,       //LEAD_REBUILD2_STATE
256   10,       //LEAD_REBUILD3_STATE
257   10,       //LEAD_REBUILD4_STATE
258   10,       //EXIT1_STATE
259   0       //EXIT2_STATE
260 };
261
//log stream opened by dhtInit() and written by dhtLog()/writeHostList()
FILE *logfile;
//this host's own table entry (address of DEFAULT_INTERFACE + capacity)
struct hostData myHostData;
//thread running udpListen(), created in dhtInit()
pthread_t threadUdpListen;
//presumably the thread running fillTask(); its creation is not in view here
pthread_t threadFillTask;
//status of fillTask: 0 = ready to run, 1 = running, 2 = completed, 3 = error
int fillStatus;
//the bound UDP socket plus its poll() descriptor, used by udpListen()/udpSend()
struct pollfd udpPollSock;
//current protocol state (one of the *_STATE values); changed via setState()
unsigned int state;
//IP of the seed host passed to dhtInit(), 0 if this node started the DHT
unsigned int seed;
//IP of the current leader, 0 while unknown
unsigned int leader;
//election bookkeeping; set to this host's IP when it starts an election
//NOTE(review): full semantics live in udpListen(), confirm there
unsigned int electionOriginator;
unsigned int electionParent;
//allocated capacity (in entries) of hostArray and hostReplied
unsigned int hostArraySize = 0;
//host table, kept sorted by ascending ipAddr by addHost()
struct hostData *hostArray = NULL;
//length of blockOwnerArray; also the modulus used by hash()
unsigned int numBlocks = 0;
//for each hash block, the index into hostArray of the owning host
unsigned short *blockOwnerArray = NULL;
//parallel to hostArray: nonzero once that host has replied in the current state
unsigned char *hostReplied = NULL;
//stateMutex is held around state and host/block table accesses;
//stateCond is waited on by the dht* calls until a usable state is reached
pthread_mutex_t stateMutex;
pthread_cond_t stateCond;
//local hash table created in dhtInit()
chashtable_t *myHashTable;
//number of valid entries in hostArray/hostReplied
unsigned int numHosts;
//absolute expiry time of the current state's timeout (valid when timerSet != 0)
struct timeval timer;
int timerSet;
//reset to 0 by setState() on every state change
int timeoutCntr;
286
287 /*******************************************************************************
288 *                      Interface Function Definitions
289 *******************************************************************************/
290
291 void dhtInit(unsigned int seedIpAddr, unsigned int maxKeyCapacity) {
292   struct in_addr tmpAddr;
293   char filename[23] = "dht-";
294   struct sockaddr_in myAddr;
295   struct sockaddr_in seedAddr;
296   socklen_t socklen = sizeof(struct sockaddr_in);
297   char initMsg;
298
299   tmpAddr.s_addr = htonl(getMyIpAddr(DEFAULT_INTERFACE));
300   strcat(filename, inet_ntoa(tmpAddr));
301   strcat(filename, ".log");
302   printf("log file: %s\n", filename);
303
304   logfile = fopen(filename, "w");
305   dhtLog("dhtInit(): inializing...\n");
306
307   myHostData.ipAddr = getMyIpAddr(DEFAULT_INTERFACE);
308   myHostData.maxKeyCapacity = maxKeyCapacity;
309
310   seed = seedIpAddr;
311   leader = 0;
312   electionOriginator = 0;
313   electionParent = 0;
314   hostArraySize = INIT_HOST_ALLOC;
315   hostArray = calloc(hostArraySize, sizeof(struct hostData));
316   hostReplied = calloc(hostArraySize, sizeof(unsigned char));
317   hostArray[0] = myHostData;
318   numHosts = 1;
319   numBlocks = INIT_NUM_BLOCKS;
320   blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
321   pthread_mutex_init(&stateMutex, NULL);
322   pthread_cond_init(&stateCond, NULL);
323   myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
324
325   udpPollSock.fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
326   if (udpPollSock.fd < 0)
327     perror("dhtInit():socket()");
328
329   udpPollSock.events = POLLIN;
330
331   bzero(&myAddr, socklen);
332   myAddr.sin_family = AF_INET;
333   myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
334   myAddr.sin_port = htons(UDP_PORT);
335
336   if (bind(udpPollSock.fd, (struct sockaddr *)&myAddr, socklen) < 0)
337     perror("dhtInit():bind()");
338
339   if (seed == 0) {
340     dhtLog("I am the leader\n");
341     leader = myHostData.ipAddr;
342     setState(LEAD_NORMAL1_STATE);
343   } else
344   {
345     initMsg = WHO_IS_LEADER_CMD;
346     udpSend(&initMsg, 1, seed);
347     setState(INIT1_STATE);
348   }
349
350   if (pthread_create(&threadUdpListen, NULL, udpListen, NULL) != 0)
351     dhtLog("dhtInit() - ERROR creating threadUdpListen\n");
352
353   return;
354 }
355
356 void dhtExit() { //TODO: do this gracefully, wait for response from leader, etc.
357   char msg;
358
359   msg = LEAVE_REQ;
360   udpSend(&msg, 1, leader);
361   dhtLog("dhtExit(): cleaning up...\n");
362   pthread_cancel(threadUdpListen);
363   close(udpPollSock.fd);
364   free(hostArray);
365   free(hostReplied);
366   free(blockOwnerArray);
367   fclose(logfile);
368
369   return;
370 }
371
372 int dhtInsert(unsigned int key, unsigned int val) {
373   struct sockaddr_in toAddr;
374   struct sockaddr_in fromAddr;
375   socklen_t socklen = sizeof(struct sockaddr_in);
376   struct pollfd pollsock;
377   char inBuffer[2];
378   char outBuffer[9];
379   ssize_t bytesRcvd;
380   int i;
381   int retval;
382   int status = -1;
383
384   bzero((char *)&toAddr, socklen);
385   toAddr.sin_family = AF_INET;
386   toAddr.sin_port = htons(UDP_PORT);
387
388   while (status != OPERATION_OK) {
389     pthread_mutex_lock(&stateMutex);
390     while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
391              || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
392              || state == LEAD_REBUILD3_STATE))
393       pthread_cond_wait(&stateCond, &stateMutex);
394     toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
395     pthread_mutex_unlock(&stateMutex);
396
397     if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
398       perror("dhtInsert():socket()");
399       return -1;
400     }
401     pollsock.events = POLLIN;
402
403     outBuffer[0] = INSERT_CMD;
404     write4(&outBuffer[1], key);
405     write4(&outBuffer[5], val);
406
407     for (i = 0; i < INSERT_RETRIES; i++) {
408       if (sendto(pollsock.fd, outBuffer, 9, 0, (struct sockaddr *)&toAddr,
409                  socklen) < 0) {
410         perror("dhtInsert():sendto()");
411         break;
412       }
413       retval = poll(&pollsock, 1, INSERT_TIMEOUT_MS);
414       if (retval < 0) {
415         perror("dhtInsert():poll()");
416         break;
417       }
418       if (retval > 0) {
419         bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
420                              (struct sockaddr *)&fromAddr, &socklen);
421         if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
422             && fromAddr.sin_port == toAddr.sin_port
423             && bytesRcvd == 2 && inBuffer[0] == INSERT_RES) {
424           status = inBuffer[1];                               //status from remote host
425           break;
426         }
427       }
428     }
429     if (status != OPERATION_OK) {
430       pthread_mutex_lock(&stateMutex);
431       setState(REBUILD0_STATE);
432       outBuffer[0] = REBUILD_REQ;
433       udpSend(outBuffer, 1, leader);
434       pthread_mutex_unlock(&stateMutex);
435     }
436   }
437
438   close(pollsock.fd);
439
440   return status;
441 }
442
/*
 * Inserts numKeys (keys[i], vals[i]) pairs, one dhtInsert() call per pair.
 * Returns 0 if every insert returned 0 (OPERATION_OK), -1 otherwise.
 */
int dhtInsertMult(unsigned int numKeys, unsigned int *keys,     unsigned int *vals) {
  int status = 0;
  unsigned int i;   //unsigned to match numKeys

  for (i = 0; i < numKeys; i++) {
    if (dhtInsert(keys[i], vals[i]) != 0)
      status = -1;
  }
  return status;
}
454
455 int dhtRemove(unsigned int key) {
456   struct sockaddr_in toAddr;
457   struct sockaddr_in fromAddr;
458   socklen_t socklen = sizeof(struct sockaddr_in);
459   struct pollfd pollsock;
460   char inBuffer[2];
461   char outBuffer[5];
462   ssize_t bytesRcvd;
463   int i;
464   int retval;
465   int status = -1;
466
467   bzero((char *)&toAddr, socklen);
468   toAddr.sin_family = AF_INET;
469   toAddr.sin_port = htons(UDP_PORT);
470
471   while (!(status == OPERATION_OK || status == KEY_NOT_FOUND)) {
472     pthread_mutex_lock(&stateMutex);
473     while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
474              || state == LEAD_NORMAL2_STATE))
475       pthread_cond_wait(&stateCond, &stateMutex);
476     toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
477     pthread_mutex_unlock(&stateMutex);
478
479     if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
480       perror("dhtRemove():socket()");
481       return -1;
482     }
483     pollsock.events = POLLIN;
484
485     outBuffer[0] = REMOVE_CMD;
486     write4(&outBuffer[1], key);
487
488     for (i = 0; i < REMOVE_RETRIES; i++) {
489       if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
490                  socklen) < 0) {
491         perror("dhtRemove():sendto()");
492         break;
493       }
494       retval = poll(&pollsock, 1, REMOVE_TIMEOUT_MS);
495       if (retval < 0) {
496         perror("dhtRemove():poll()");
497         break;
498       }
499       if (retval > 0) {
500         bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
501                              (struct sockaddr *)&fromAddr, &socklen);
502         if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
503             && fromAddr.sin_port == toAddr.sin_port
504             && bytesRcvd == 2 && inBuffer[0] == REMOVE_RES) {
505           status = inBuffer[1];                               //status from remote host
506           break;
507         }
508       }
509     }
510     if (!(status == OPERATION_OK || status == KEY_NOT_FOUND)) {
511       pthread_mutex_lock(&stateMutex);
512       setState(REBUILD0_STATE);
513       outBuffer[0] = REBUILD_REQ;
514       udpSend(outBuffer, 1, leader);
515       pthread_mutex_unlock(&stateMutex);
516     }
517   }
518
519   close(pollsock.fd);
520
521   return status;
522 }
523
/*
 * Removes numKeys keys, one dhtRemove() call per key.
 * Returns 0 if every removal returned 0 (OPERATION_OK); any other result,
 * including KEY_NOT_FOUND, makes the return value -1.
 */
int dhtRemoveMult(unsigned int numKeys, unsigned int *keys) {
  int status = 0;
  unsigned int i;   //unsigned to match numKeys

  for (i = 0; i < numKeys; i++) {
    if (dhtRemove(keys[i]) != 0)
      status = -1;
  }
  return status;
}
535
536 int dhtSearch(unsigned int key, unsigned int *val) {
537   struct sockaddr_in toAddr;
538   struct sockaddr_in fromAddr;
539   socklen_t socklen = sizeof(struct sockaddr_in);
540   struct pollfd pollsock;
541   char inBuffer[6];
542   char outBuffer[5];
543   ssize_t bytesRcvd;
544   int i;
545   int retval;
546   int status = -1;
547
548   bzero((char *)&toAddr, socklen);
549   toAddr.sin_family = AF_INET;
550   toAddr.sin_port = htons(UDP_PORT);
551
552   while (!(status == OPERATION_OK || status == KEY_NOT_FOUND)) {
553     pthread_mutex_lock(&stateMutex);
554     while (numBlocks == 0)
555       pthread_cond_wait(&stateCond, &stateMutex);
556     toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
557     pthread_mutex_unlock(&stateMutex);
558
559     if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
560       perror("dhtSearch():socket()");
561       return -1;
562     }
563     pollsock.events = POLLIN;
564
565     outBuffer[0] = SEARCH_CMD;
566     write4(&outBuffer[1], key);
567
568     for (i = 0; i < SEARCH_RETRIES; i++) {
569       if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
570                  socklen) < 0) {
571         perror("dhtSearch():sendto()");
572         break;
573       }
574       retval = poll(&pollsock, 1, SEARCH_TIMEOUT_MS);
575       if (retval < 0) {
576         perror("dhtSearch():poll()");
577         break;
578       }
579       if (retval > 0) {
580         bytesRcvd = recvfrom(pollsock.fd, inBuffer, 6, 0,
581                              (struct sockaddr *)&fromAddr, &socklen);
582         if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
583             && fromAddr.sin_port == toAddr.sin_port
584             && bytesRcvd == 6 && inBuffer[0] == SEARCH_RES) {
585           status = inBuffer[1];                               //status from remote host
586           *val = read4(&inBuffer[2]);
587           break;
588         }
589       }
590     }
591     if (!(status == OPERATION_OK || status == KEY_NOT_FOUND)) {
592       pthread_mutex_lock(&stateMutex);
593       setState(REBUILD0_STATE);
594       outBuffer[0] = REBUILD_REQ;
595       udpSend(outBuffer, 1, leader);
596       pthread_mutex_unlock(&stateMutex);
597     }
598   }
599
600   close(pollsock.fd);
601
602   return status;
603 }
604
/*
 * Looks up numKeys keys, one dhtSearch() call per key, storing each result in
 * the matching slot of vals.
 * Returns 0 if every search returned 0 (OPERATION_OK); any other result,
 * including KEY_NOT_FOUND, makes the return value -1.
 */
int dhtSearchMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals) {
  int status = 0;
  unsigned int i;   //unsigned to match numKeys

  for (i = 0; i < numKeys; i++) {
    if (dhtSearch(keys[i], &vals[i]) != 0)
      status = -1;
  }
  return status;
}
614
615 /*******************************************************************************
616 *                      Local Function Definitions
617 *******************************************************************************/
618
619 int msgSizeOk(unsigned char *msg, unsigned int size) {
620   unsigned short tmpNumHosts;
621   unsigned short tmpNumBlocks;
622
623   if (size < 1)
624     return 1;
625
626   switch (msg[0]) {
627   case WHO_IS_LEADER_CMD:
628   case LEAVE_REQ:
629   case LEAVE_RES:
630   case DHT_UPDATE_RES:
631   case REBUILD_REQ:
632   case REBUILD_CMD:
633   case FILL_DHT_CMD:
634   case FILL_DHT_RES:
635   case RESUME_NORMAL_CMD:
636   case RESUME_NORMAL_RES:
637     return (size == 1);
638
639   case INSERT_RES:
640   case REMOVE_RES:
641   case JOIN_RES:
642     return (size == 2);
643
644   case REMOVE_CMD:
645   case SEARCH_CMD:
646   case WHO_IS_LEADER_RES:
647   case JOIN_REQ:
648   case ELECT_LEADER_CMD:
649     return (size == 5);
650
651   case SEARCH_RES:
652     return (size == 6);
653
654   case INSERT_CMD:
655     return (size == 9);
656
657   case DHT_UPDATE_CMD:
658     if (size < 5)
659       return 1;
660     tmpNumHosts = read2(&msg[1]);
661     tmpNumBlocks = read2(&msg[3]);
662     return (size == (5+sizeof(struct hostData)*tmpNumHosts+2*tmpNumBlocks));
663
664   case ELECT_LEADER_RES:
665     if (size < 2)
666       return 1;
667     if (msg[1] == 0xFF)
668       return (size == 2);
669     if (size < 4)
670       return 1;
671     tmpNumHosts = read2(&msg[2]);
672     return (size == (4 + sizeof(struct hostData) * tmpNumHosts));
673
674   case CONGRATS_CMD:
675     if (size < 3)
676       return 1;
677     tmpNumHosts = read2(&msg[1]);
678     return (size == (3 + sizeof(struct hostData) * tmpNumHosts));
679
680   default:
681     return 1;
682   }
683 }
684
//decodes a 16-bit little-endian value: ptr[0] is the low byte, ptr[1] the high byte
unsigned short read2(unsigned char *ptr) {
  unsigned int low = ptr[0];
  unsigned int high = ptr[1];

  return (unsigned short)((high << 8) | low);
}
689
//decodes a 32-bit little-endian value: ptr[0] is the lowest byte, ptr[3] the highest
unsigned int read4(unsigned char *ptr) {
  //widen each byte to unsigned before shifting: ptr[3] << 24 on the promoted
  //int would shift into the sign bit whenever ptr[3] >= 0x80
  unsigned int tmp = ((unsigned int)ptr[3] << 24) | ((unsigned int)ptr[2] << 16)
                     | ((unsigned int)ptr[1] << 8) | (unsigned int)ptr[0];
  return tmp;
}
694
//encodes tmp as 16-bit little-endian: low byte to ptr[0], high byte to ptr[1]
void write2(unsigned char *ptr, unsigned short tmp) {
  int idx;

  for (idx = 0; idx < 2; idx++)
    ptr[idx] = (unsigned char)((tmp >> (8 * idx)) & 0xFF);
}
700
//encodes tmp as 32-bit little-endian: lowest byte to ptr[0], highest to ptr[3]
void write4(unsigned char *ptr, unsigned int tmp) {
  int idx;

  for (idx = 0; idx < 4; idx++)
    ptr[idx] = (unsigned char)((tmp >> (8 * idx)) & 0xFF);
}
708
709 unsigned int getMyIpAddr(const char *interfaceStr) {
710   int sock;
711   struct ifreq interfaceInfo;
712   struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
713
714   memset(&interfaceInfo, 0, sizeof(struct ifreq));
715
716   if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
717     perror("getMyIpAddr():socket()");
718     return 1;
719   }
720
721   strcpy(interfaceInfo.ifr_name, interfaceStr);
722   myAddr->sin_family = AF_INET;
723
724   if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0) {
725     perror("getMyIpAddr():ioctl()");
726     return 1;
727   }
728
729   return ntohl(myAddr->sin_addr.s_addr);
730 }
731
732 int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp) {
733   struct sockaddr_in peerAddr;
734   socklen_t socklen = sizeof(struct sockaddr_in);
735
736   bzero(&peerAddr, socklen);
737   peerAddr.sin_family = AF_INET;
738   peerAddr.sin_addr.s_addr = htonl(destIp);
739   peerAddr.sin_port = htons(UDP_PORT);
740
741   if (size >= 1) {
742     if (msg[0] < NUM_MSG_TYPES)
743       dhtLog("udpSend(): sending %s to %s, %d bytes\n", msg_types[msg[0]],
744              inet_ntoa(peerAddr.sin_addr), size);
745     else
746       dhtLog("udpSend(): sending unknown message to %s, %d bytes\n",
747              inet_ntoa(peerAddr.sin_addr), size);
748   }
749
750   if (sendto(udpPollSock.fd, (void *)msg, size, 0, (struct sockaddr *)&peerAddr,
751              socklen) < 0) {
752     perror("udpSend():sendto()");
753     return -1;
754   }
755
756   return 0;
757 }
758
759 int udpSendAll(unsigned char *msg, unsigned int size) {
760   int i;
761   int status = 0;
762   for (i = 0; i < numHosts; i++) {
763     if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr)) {
764       if (udpSend(msg, size, hostArray[i].ipAddr) != 0)
765         status = -1;
766     }
767   }
768   return status;
769 }
770
771 //note: make sure this is only executed in a valid state, where numBlocks != 0
772 unsigned int hash(unsigned int x) {
773   return (x % numBlocks);
774 }
775
776 //note: make sure this is only executed in a valid state, where these arrays
777 // are allocated and the index mappings are consistent
778 unsigned int getKeyOwner(unsigned int key) {
779   return hostArray[blockOwnerArray[hash(key)]].ipAddr;
780 }
781
782 //sets state and timer, if applicable
783 void setState(unsigned int newState) {
784   struct timeval now;
785   int i;
786
787   gettimeofday(&now, NULL);
788
789   if (newState >= NUM_STATES) {
790     dhtLog("setState(): ERROR: invalid state %d\n", newState);
791   } else
792   {
793     if (timeout_vals[newState].tv_sec == 0
794         && timeout_vals[newState].tv_usec == 0) { //no timer
795       timerSet = 0;
796     } else
797     {
798       timeradd(&now, &timeout_vals[newState], &timer);
799       timerSet = 1;
800     }
801     timeoutCntr = 0;
802     state = newState;
803     //TODO: only do this for states that require it
804     for (i = 0; i < numHosts; i++)
805       hostReplied[i] = 0;
806
807     dhtLog("setState(): state set to %s\n", state_names[state]);
808   }
809
810   return;
811 }
812
813 //TODO: improve these simple and inefficient functions
814 int checkReplied(unsigned int ipAddr) {
815   int i;
816
817   i = findHost(ipAddr);
818
819   if (i == -1)
820     return -1;
821
822   hostReplied[i] = 1;
823
824   return 0;
825 }
826
827 int allReplied() {
828   int i;
829
830   for (i = 0; i < numHosts; i++)
831     if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr))
832       return 0;
833
834   return 1;
835 }
836
837 int findHost(unsigned int ipAddr) {
838   int i;
839
840   for (i = 0; i < numHosts; i++)
841     if (hostArray[i].ipAddr == ipAddr)
842       return i;                   //found, return index
843
844   return -1;       //not found
845 }
846
847 int removeHost(unsigned int ipAddr) {
848   int i, j;
849
850   i = findHost(ipAddr);
851
852   if (i == -1)
853     return -1;
854
855   for (j = 0; j < numBlocks; j++) {
856     if (blockOwnerArray[j] == i)
857       blockOwnerArray[j] = 0;                   //TODO: is this what I want to have happen?
858     else if (blockOwnerArray[j] > i)
859       blockOwnerArray[j]--;
860   }
861
862   for (; i < numHosts - 1; i++) {
863     hostArray[i] = hostArray[i+1];
864     hostReplied[i] = hostReplied[i+1];
865   }
866   numHosts--;
867
868   return 0;
869 }
870
871 void removeUnresponsiveHosts() {
872   int i;
873
874   for (i = 0; i < numHosts; i++) {
875     if (!hostReplied[i] && hostArray[i].ipAddr != myHostData.ipAddr)
876       removeHost(hostArray[i].ipAddr);
877   }
878 }
879
880 int addHost(struct hostData newHost) {
881   struct hostData *newHostArray;
882   unsigned char *newHostReplied;
883   int i;
884   int j;
885
886   for (i = 0; i < numHosts; i++) {
887     if (hostArray[i].ipAddr == newHost.ipAddr) {
888       hostArray[i] = newHost;
889       hostReplied[i] = 0;
890       return 0;
891     } else if (hostArray[i].ipAddr > newHost.ipAddr) {
892       if (numHosts == hostArraySize) {
893         newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
894         newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
895         memcpy(newHostArray, hostArray, (i * sizeof(struct hostData)));
896         memcpy(newHostReplied, hostReplied, (i * sizeof(unsigned char)));
897         newHostArray[i] = newHost;
898         newHostReplied[i] = 0;
899         memcpy(&newHostArray[i+1], &hostArray[i], ((numHosts - i) *
900                                                    sizeof(struct hostData)));
901         memcpy(&newHostReplied[i+1], &hostReplied[i], ((numHosts - i) *
902                                                        sizeof(unsigned char)));
903         free(hostArray);
904         free(hostReplied);
905         hostArray = newHostArray;
906         hostReplied = newHostReplied;
907         hostArraySize = 2 * hostArraySize;
908       } else
909       {
910         for (j = numHosts; j > i; j--) {
911           hostArray[j] = hostArray[j-1];
912           hostReplied[j] = hostReplied[j-1];
913         }
914         hostArray[i] = newHost;
915         hostReplied[i] = 0;
916       }
917       for(j = 0; j < numBlocks; j++) {
918         if (blockOwnerArray[j] >= i)
919           blockOwnerArray[j]++;
920       }
921       numHosts++;
922       return 1;
923     }
924   }
925
926   //nothing greater, add to end
927   if (numHosts == hostArraySize) {
928     newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
929     newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
930     memcpy(newHostArray, hostArray, (numHosts * sizeof(struct hostData)));
931     memcpy(newHostReplied, hostReplied, (numHosts * sizeof(unsigned char)));
932     free(hostArray);
933     free(hostReplied);
934     hostArray = newHostArray;
935     hostReplied = newHostReplied;
936     hostArraySize = 2 * hostArraySize;
937   }
938
939   hostArray[numHosts] = newHost;
940   hostReplied[numHosts] = 0;
941   numHosts++;
942   return 1;
943 }
944
945 void makeAssignments() {
946   int i;
947
948   if (numBlocks < numHosts) {
949     free(blockOwnerArray);
950     while (numBlocks < numHosts)
951       numBlocks *= 2;
952     blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
953   }
954
955   for (i = 0; i < numBlocks; i++)
956     blockOwnerArray[i]  = i % numHosts;
957
958   return;
959 }
960
961 void writeHostList() {
962   int i;
963   struct in_addr tmpAddr;
964
965   fprintf(logfile, "numHosts = %d\n", numHosts);
966   for (i = 0; i < numHosts; i++) {
967     tmpAddr.s_addr = htonl(hostArray[i].ipAddr);
968     fprintf(logfile, "%d) %s, %d\n", i, inet_ntoa(tmpAddr),
969             hostArray[i].maxKeyCapacity);
970   }
971   return;
972 }
973
974 void dhtLog(const char *format, ...) {
975   va_list args;
976 //      struct timeval now;
977
978 //      if (gettimeofday(&now, NULL) < 0)
979 //      {       perror("dhtLog():gettimeofday()"); }
980   va_start(args, format);
981 //      if (fprintf(logfile, "%d.%06d:", now.tv_sec, now.tv_usec) < 0)
982 //      {       perror("dhtLog():fprintf()"); }
983   if (vfprintf(logfile, format, args) < 0) {
984     perror("dhtLog():vfprintf()");
985   }
986   if (fflush(logfile) == EOF) {
987     perror("dhtLog():fflush()");
988   }
989   va_end(args);
990
991   return;
992 }
993
994 void *fillTask() {
995   unsigned int *vals;
996   unsigned int *keys;
997   unsigned int numKeys;
998   int i;
999
1000   vals = mhashGetKeys(&numKeys);       //note: key of mhash is val of dht
1001   keys = calloc(numKeys, sizeof(unsigned int));
1002
1003   for (i = 0; i < numKeys; i++)
1004     keys[i] = myHostData.ipAddr;
1005
1006   if (dhtInsertMult(numKeys, keys, vals) == 0)
1007     fillStatus = 2;
1008   else
1009     fillStatus = 3;
1010
1011   pthread_exit(NULL);
1012 }
1013
1014 void *udpListen() {
1015   ssize_t bytesRcvd;
1016   struct sockaddr_in peerAddr;
1017   unsigned int peerIp;
1018   socklen_t socklen = sizeof(struct sockaddr_in);
1019   unsigned char inBuffer[MAX_MSG_SIZE];
1020   unsigned char outBuffer[MAX_MSG_SIZE];
1021   int pollret;
1022   struct timeval now;
1023   struct in_addr tmpAddr;
1024   struct hostData tmpHost;
1025   unsigned int tmpKey;
1026   unsigned int tmpVal;
1027   struct hostData *hostDataPtr;
1028   unsigned short *uShortPtr;
1029   unsigned int tmpUInt;
1030   unsigned int tmpUShort;
1031   int i;
1032   unsigned int oldState;
1033
1034   dhtLog("udpListen(): linstening on port %d...\n", UDP_PORT);
1035
1036   while (1) {
1037     pollret = poll(&udpPollSock, 1, TIMEOUT_PERIOD);
1038     pthread_mutex_lock(&stateMutex);
1039     oldState = state;
1040     if (pollret < 0) {
1041       perror("udpListen():poll()");
1042     } else if (pollret > 0) {
1043       bytesRcvd = recvfrom(udpPollSock.fd, inBuffer, MAX_MSG_SIZE, 0,
1044                            (struct sockaddr *)&peerAddr, &socklen);
1045       if (bytesRcvd < 1) {
1046         dhtLog("udpListen(): ERROR: bytesRcvd = %d\n", bytesRcvd);
1047       } else if (inBuffer[0] >= NUM_MSG_TYPES) {
1048         dhtLog("udpListen(): ERROR: unknown msg type = %d\n", inBuffer[0]);
1049       } else if (!msgSizeOk(inBuffer, bytesRcvd)) {
1050         dhtLog("udpListen(): ERROR: msg size not ok: type = %s\n, size = %d\n",
1051                msg_types[inBuffer[0]], bytesRcvd);
1052       } else if (state == EXIT2_STATE) {
1053         //do nothing
1054       } else if (state == INIT1_STATE) {  //after initialization with seed, do not proceed until seed replies
1055         dhtLog("udpListen(): received %s from %s, %d bytes\n",
1056                msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
1057         for (i = 0; i < bytesRcvd; i++)
1058           dhtLog(" %x", inBuffer[i]);
1059         dhtLog("\n");
1060         peerIp = ntohl(peerAddr.sin_addr.s_addr);
1061         if (peerIp == seed && inBuffer[0] == WHO_IS_LEADER_RES) {
1062           tmpHost.ipAddr = peerIp;
1063           tmpHost.maxKeyCapacity = 0;
1064           addHost(tmpHost);
1065           writeHostList();
1066           leader = read4(&inBuffer[1]);
1067           tmpAddr.s_addr = htonl(leader);
1068           dhtLog("leader = %s\n", inet_ntoa(tmpAddr));
1069           if (leader != 0) {
1070             setState(INIT2_STATE);
1071             outBuffer[0] = JOIN_REQ;
1072             write4(&outBuffer[1], myHostData.maxKeyCapacity);
1073             udpSend(outBuffer, 5, leader);
1074           } else
1075           {
1076             electionOriginator = myHostData.ipAddr;
1077             setState(ELECT1_STATE);
1078             outBuffer[0] = ELECT_LEADER_CMD;
1079             write4(&outBuffer[1], myHostData.ipAddr);                                     //originator = me
1080             udpSendAll(outBuffer, 5);
1081           }
1082         }
1083       } else
1084       {
1085         dhtLog("udpListen(): received %s from %s, %d bytes\n",
1086                msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
1087         for (i = 0; i < bytesRcvd; i++)
1088           dhtLog(" %x", inBuffer[i]);
1089         dhtLog("\n");
1090         peerIp = ntohl(peerAddr.sin_addr.s_addr);
1091         switch (inBuffer[0]) {
1092         case INSERT_CMD:
1093           if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1094               || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
1095               || state == REBUILD5_STATE || state == LEAD_REBUILD3_STATE) {
1096             tmpKey = read4(&inBuffer[1]);
1097             tmpVal = read4(&inBuffer[5]);
1098             outBuffer[0] = INSERT_RES;
1099             if (getKeyOwner(tmpKey) == myHostData.ipAddr) {
1100               if (chashInsert(myHashTable, tmpKey, (void *)tmpVal) == 0)
1101                 outBuffer[1] = OPERATION_OK;
1102               else
1103                 outBuffer[1] = INTERNAL_ERROR;
1104             } else
1105             {
1106               outBuffer[1] = NOT_KEY_OWNER;
1107             }
1108             //reply to client socket
1109             sendto(udpPollSock.fd, outBuffer, 2, 0,
1110                    (struct sockaddr *)&peerAddr, socklen);
1111           }
1112           break;
1113
1114         case REMOVE_CMD:
1115           if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1116               || state == LEAD_NORMAL2_STATE) {
1117             tmpKey = read4(&inBuffer[1]);
1118             outBuffer[0] = REMOVE_RES;
1119             if (getKeyOwner(tmpKey) == myHostData.ipAddr) {
1120               if (chashRemove(myHashTable, tmpKey) == 0)
1121                 outBuffer[1] = OPERATION_OK;
1122               else
1123                 outBuffer[1] = KEY_NOT_FOUND;
1124             } else
1125             {
1126               outBuffer[1] = NOT_KEY_OWNER;
1127             }
1128             //reply to client socket
1129             sendto(udpPollSock.fd, outBuffer, 2, 0,
1130                    (struct sockaddr *)&peerAddr, socklen);
1131           }
1132           break;
1133
1134         case SEARCH_CMD:
1135           if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1136               || state == LEAD_NORMAL2_STATE) {
1137             tmpKey = read4(&inBuffer[1]);
1138             outBuffer[0] = SEARCH_RES;
1139             if (getKeyOwner(tmpKey) == myHostData.ipAddr) {
1140               if ((tmpVal = (unsigned int)chashSearch(myHashTable, tmpKey)) != 0) {
1141                 outBuffer[1] = OPERATION_OK;
1142                 write4(&outBuffer[2], tmpVal);
1143               } else
1144               {
1145                 outBuffer[1] = KEY_NOT_FOUND;
1146                 write4(&outBuffer[2], 0);
1147               }
1148             } else
1149             {
1150               outBuffer[1] = NOT_KEY_OWNER;
1151               write4(&outBuffer[2], 0);
1152             }
1153             //reply to client socket
1154             sendto(udpPollSock.fd, outBuffer, 6, 0,
1155                    (struct sockaddr *)&peerAddr, socklen);
1156           }
1157           break;
1158
1159         case WHO_IS_LEADER_CMD:
1160           tmpHost.ipAddr = peerIp;
1161           tmpHost.maxKeyCapacity = 0;
1162           addHost(tmpHost);
1163           writeHostList();
1164           outBuffer[0] = WHO_IS_LEADER_RES;
1165           //leader == 0 means I don't know who it is
1166           write4(&outBuffer[1], leader);
1167           udpSend(outBuffer, 5, peerIp);
1168           break;
1169
1170         case JOIN_REQ:
1171           if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE) {
1172             tmpHost.ipAddr = peerIp;
1173             tmpHost.maxKeyCapacity = read4(&inBuffer[1]);
1174             addHost(tmpHost);
1175             writeHostList();
1176             if (state == LEAD_NORMAL1_STATE)
1177               setState(LEAD_NORMAL2_STATE);
1178             outBuffer[0] = JOIN_RES;
1179             outBuffer[1] = 0;                                             //status, success
1180             udpSend(outBuffer, 2, peerIp);
1181           } else if (state == LEAD_REBUILD1_STATE) {
1182             //note: I don't need to addHost().
1183             checkReplied(peerIp);
1184             outBuffer[0] = JOIN_RES;
1185             outBuffer[1] = 0;                                             //status, success
1186             udpSend(outBuffer, 2, peerIp);
1187             if (allReplied()) {
1188               makeAssignments();
1189               setState(LEAD_REBUILD2_STATE);
1190               outBuffer[0] = DHT_UPDATE_CMD;
1191               write2(&outBuffer[1], numHosts);
1192               write2(&outBuffer[3], numBlocks);
1193               memcpy(&outBuffer[5], hostArray, numHosts*sizeof(struct hostData));
1194               memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
1195                      blockOwnerArray, numBlocks*2);
1196               udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
1197                          + 2 * numBlocks);
1198             }
1199           }
1200           break;
1201
1202         case JOIN_RES:
1203           if (state == REBUILD1_STATE) {
1204             setState(REBUILD2_STATE);
1205           } else if (state == INIT2_STATE) {
1206             setState(NORMAL_STATE);
1207           }
1208           break;
1209
1210         case LEAVE_REQ:
1211           if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE) { //TODO: make this graceful, instead of just rebuilding
1212             removeHost(peerIp);
1213             if (state != LEAD_NORMAL2_STATE)
1214               setState(LEAD_NORMAL2_STATE);
1215           }
1216           break;
1217
1218         case DHT_UPDATE_CMD:
1219           if (state == REBUILD2_STATE && peerIp == leader) {
1220             free(hostArray);
1221             free(blockOwnerArray);
1222             numHosts = read2(&inBuffer[1]);
1223             numBlocks = read2(&inBuffer[3]);
1224             while (hostArraySize < numHosts)
1225               hostArraySize *= 2;
1226             hostArray = calloc(hostArraySize, sizeof(struct hostData));
1227             blockOwnerArray = calloc(numBlocks, 2);
1228             memcpy(hostArray, &inBuffer[5], numHosts*sizeof(struct hostData));
1229             memcpy(blockOwnerArray, &inBuffer[5+numHosts*sizeof(struct hostData)], numBlocks*2);
1230             writeHostList();
1231             setState(REBUILD3_STATE);
1232             outBuffer[0] = DHT_UPDATE_RES;
1233             udpSend(outBuffer, 1, peerIp);
1234           }
1235           break;
1236
1237         case DHT_UPDATE_RES:
1238           if (state == LEAD_REBUILD2_STATE) {
1239             checkReplied(peerIp);
1240             if (allReplied()) {
1241               setState(LEAD_REBUILD3_STATE);
1242               outBuffer[0] = FILL_DHT_CMD;
1243               udpSendAll(outBuffer, 1);
1244               if (fillStatus != 0)
1245                 dhtLog("udpListen(): ERROR: fillTask already running\n");
1246               fillStatus = 1;
1247               if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
1248                 dhtLog("udpListen(): ERROR creating threadFillTask\n");
1249             }
1250           }
1251           break;
1252
1253         case ELECT_LEADER_CMD:
1254           tmpUInt = read4(&inBuffer[1]);
1255           if ((state == ELECT1_STATE || state == ELECT2_STATE)
1256               && tmpUInt >= electionOriginator) { //already participating in a higher-priority election
1257             outBuffer[0] = ELECT_LEADER_RES;
1258             outBuffer[1] = 0xFF;
1259             udpSend(outBuffer, 2, peerIp);
1260           } else
1261           {                                       //join election
1262             electionOriginator = tmpUInt;
1263             electionParent = peerIp;
1264             setState(ELECT1_STATE);
1265             outBuffer[0] = ELECT_LEADER_CMD;
1266             write4(&outBuffer[1], electionOriginator);
1267             //don't bother forwarding the message to originator or parent
1268             checkReplied(electionOriginator);
1269             checkReplied(electionParent);
1270             if (allReplied()) {                           //in case that is everybody I know of
1271               setState(ELECT2_STATE);
1272               outBuffer[0] = ELECT_LEADER_RES;
1273               outBuffer[1] = 0;
1274               write2(&outBuffer[2], numHosts);
1275               memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1276                      * numHosts);
1277               udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1278                       electionParent);
1279             } else
1280             {
1281               udpSendAll(outBuffer, 5);
1282             }
1283           }
1284           break;
1285
1286         case ELECT_LEADER_RES:
1287           if (state == ELECT1_STATE) {
1288             checkReplied(peerIp);
1289             if (inBuffer[1] != 0xFF) {
1290               tmpUShort = read2(&inBuffer[2]);
1291               hostDataPtr = (struct hostData *)&inBuffer[4];
1292               for (i = 0; i < tmpUShort; i++)
1293                 addHost(hostDataPtr[i]);
1294               writeHostList();
1295             }
1296             if (allReplied()) {
1297               setState(ELECT2_STATE);
1298               if (electionOriginator == myHostData.ipAddr) {
1299                 leader = hostArray[0].ipAddr;
1300                 if (leader == myHostData.ipAddr) {                        //I am the leader
1301                   dhtLog("I am the leader!\n");
1302                   setState(LEAD_REBUILD1_STATE);
1303                   outBuffer[0] = REBUILD_CMD;
1304                   udpSendAll(outBuffer, 1);
1305                 } else
1306                 {                                                         //notify leader
1307                   outBuffer[0] = CONGRATS_CMD;
1308                   write2(&outBuffer[1], numHosts);
1309                   hostDataPtr = (struct hostData *)&outBuffer[3];
1310                   for (i = 0; i < numHosts; i++)
1311                     hostDataPtr[i] = hostArray[i];
1312                   udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1313                           leader);
1314                 }
1315               } else
1316               {
1317                 outBuffer[0] = ELECT_LEADER_RES;
1318                 outBuffer[1] = 0;
1319                 write2(&outBuffer[2], numHosts);
1320                 hostDataPtr = (struct hostData *)&outBuffer[4];
1321                 for (i = 0; i < numHosts; i++)
1322                   hostDataPtr[i] = hostArray[i];
1323                 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1324                         electionParent);
1325               }
1326             }
1327           }
1328           break;
1329
1330         case CONGRATS_CMD:
1331           if (state == ELECT2_STATE) {            //I am the leader
1332             leader = myHostData.ipAddr;
1333             dhtLog("I am the leader!\n");
1334             tmpUShort = read2(&inBuffer[1]);
1335             hostDataPtr = (struct hostData *)&inBuffer[3];
1336             for (i = 0; i < tmpUShort; i++)
1337               addHost(hostDataPtr[i]);
1338             writeHostList();
1339             setState(LEAD_REBUILD1_STATE);
1340             outBuffer[0] = REBUILD_CMD;
1341             udpSendAll(outBuffer, 1);
1342           }
1343           break;
1344
1345         case REBUILD_REQ:
1346           if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE) {
1347             setState(LEAD_REBUILD1_STATE);
1348             outBuffer[0] = REBUILD_CMD;
1349             udpSendAll(outBuffer, 1);
1350           }
1351           break;
1352
1353         case REBUILD_CMD:
1354           leader = peerIp;                                       //consider this a declaration of authority
1355           setState(REBUILD1_STATE);
1356           outBuffer[0] = JOIN_REQ;
1357           write4(&outBuffer[1], myHostData.maxKeyCapacity);
1358           udpSend(outBuffer, 5, leader);
1359           break;
1360
1361         case FILL_DHT_CMD:
1362           if (state == REBUILD3_STATE && peerIp == leader) {
1363             setState(REBUILD4_STATE);
1364             if (fillStatus != 0)
1365               dhtLog("udpListen(): ERROR: fillTask already running\n");
1366             fillStatus = 1;
1367             if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
1368               dhtLog("udpListen(): ERROR creating threadFillTask\n");
1369           }
1370           break;
1371
1372         case FILL_DHT_RES:
1373           if (state == LEAD_REBUILD3_STATE) {
1374             checkReplied(peerIp);
1375             if (allReplied() && fillStatus == 2) {
1376               fillStatus = 0;
1377               setState(LEAD_REBUILD4_STATE);
1378               outBuffer[0] = RESUME_NORMAL_CMD;
1379               udpSendAll(outBuffer, 1);
1380             }
1381           }
1382           break;
1383
1384         case RESUME_NORMAL_CMD:
1385           if (state == REBUILD5_STATE && peerIp == leader) {
1386             setState(NORMAL_STATE);
1387             outBuffer[0] = RESUME_NORMAL_RES;
1388             udpSend(outBuffer, 1, leader);
1389           }
1390           break;
1391
1392         case RESUME_NORMAL_RES:
1393           if (state == LEAD_REBUILD4_STATE) {
1394             checkReplied(peerIp);
1395             if (allReplied()) {
1396               setState(LEAD_NORMAL1_STATE);
1397             }
1398           }
1399           break;
1400         }
1401       }
1402     }
1403     if (state == REBUILD4_STATE) {
1404       switch (fillStatus) {
1405       case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in REBUILD4_STATE\n");
1406         break;
1407
1408       case 1:                           //do nothing
1409         break;
1410
1411       case 2:                           //done filling the dht, notify leader
1412         fillStatus = 0;
1413         setState(REBUILD5_STATE);
1414         outBuffer[0] = FILL_DHT_RES;
1415         udpSend(outBuffer, 1, leader);
1416         break;
1417
1418       case 3:                           //error encountered -> restart rebuild
1419         fillStatus = 0;
1420         setState(REBUILD0_STATE);
1421         outBuffer[0] = REBUILD_REQ;
1422         udpSend(outBuffer, 1, leader);
1423         break;
1424       }
1425     }
1426     if (state == LEAD_REBUILD3_STATE) {
1427       switch (fillStatus) {
1428       case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in LEAD_REBUILD3_STATE\n");
1429         break;
1430
1431       case 1:                           //do nothing
1432         break;
1433
1434       case 2:                           //I'm done, now is everybody else also done?
1435         if (allReplied()) {
1436           fillStatus = 0;
1437           setState(LEAD_REBUILD4_STATE);
1438           outBuffer[0] = RESUME_NORMAL_CMD;
1439           udpSendAll(outBuffer, 1);
1440         }
1441         break;
1442
1443       case 3:                           //error encountered -> restart rebuild
1444         fillStatus = 0;
1445         setState(LEAD_REBUILD1_STATE);
1446         outBuffer[0] = REBUILD_CMD;
1447         udpSendAll(outBuffer, 1);
1448         break;
1449       }
1450     }
1451     if (timerSet) {
1452       gettimeofday(&now, NULL);
1453       if (timercmp(&now, &timer, >)) {
1454         if (timeoutCntr < retry_vals[state]) {
1455           timeoutCntr++;
1456           timeradd(&now, &timeout_vals[state], &timer);
1457           dhtLog("udpListen(): retry: %d\n", timeoutCntr);
1458           switch (state) {
1459           case INIT1_STATE:
1460             outBuffer[0] = WHO_IS_LEADER_CMD;
1461             udpSend(outBuffer, 1, seed);
1462             break;
1463
1464           case INIT2_STATE:
1465             outBuffer[0] = JOIN_REQ;
1466             write4(&outBuffer[1], myHostData.maxKeyCapacity);
1467             udpSend(outBuffer, 5, leader);
1468             break;
1469
1470           case ELECT1_STATE:
1471             outBuffer[0] = ELECT_LEADER_CMD;
1472             write4(&outBuffer[1], electionOriginator);
1473             udpSendAll(outBuffer, 5);
1474             break;
1475
1476           case ELECT2_STATE:
1477             if (electionOriginator == myHostData.ipAddr) { //retry notify leader
1478               outBuffer[0] = CONGRATS_CMD;
1479               write2(&outBuffer[1], numHosts);
1480               memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
1481                      * numHosts);
1482               udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1483                       leader);
1484             } else
1485             {
1486               outBuffer[0] = ELECT_LEADER_RES;
1487               outBuffer[1] = 0;
1488               write2(&outBuffer[2], numHosts);
1489               memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1490                      * numHosts);
1491               udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1492                       electionParent);
1493             }
1494             break;
1495
1496           case REBUILD0_STATE:
1497             outBuffer[0] = REBUILD_REQ;
1498             udpSend(outBuffer, 1, leader);
1499             break;
1500
1501           case REBUILD1_STATE:
1502             outBuffer[0] = JOIN_REQ;
1503             write4(&outBuffer[1], myHostData.maxKeyCapacity);
1504             udpSend(outBuffer, 5, leader);
1505             break;
1506
1507           case REBUILD5_STATE:
1508             outBuffer[0] = FILL_DHT_RES;
1509             udpSend(outBuffer, 1, leader);
1510             break;
1511
1512           case LEAD_REBUILD1_STATE:
1513             outBuffer[0] = REBUILD_CMD;
1514             udpSendAll(outBuffer, 1);
1515             break;
1516
1517           case LEAD_REBUILD2_STATE:
1518             outBuffer[0] = DHT_UPDATE_CMD;
1519             write2(&outBuffer[1], numHosts);
1520             write2(&outBuffer[3], numBlocks);
1521             memcpy(&outBuffer[5], hostArray, numHosts
1522                    * sizeof(struct hostData));
1523             memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
1524                    blockOwnerArray, numBlocks*2);
1525             udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
1526                        + 2 * numBlocks);
1527             break;
1528
1529           case LEAD_REBUILD3_STATE:
1530             outBuffer[0] = FILL_DHT_CMD;
1531             udpSendAll(outBuffer, 1);
1532             break;
1533
1534           case LEAD_REBUILD4_STATE:
1535             outBuffer[0] = RESUME_NORMAL_CMD;
1536             udpSendAll(outBuffer, 1);
1537             break;
1538
1539           case EXIT1_STATE:                                       //TODO...
1540             break;
1541
1542           case NORMAL_STATE:
1543           case LEAD_NORMAL1_STATE:
1544           case LEAD_NORMAL2_STATE:
1545           case REBUILD2_STATE:
1546           case REBUILD3_STATE:
1547           case REBUILD4_STATE:
1548           case EXIT2_STATE:                                       //we shouldn't get here
1549             break;
1550           }
1551         } else
1552         {
1553           dhtLog("udpListen(): timed out in state %s after %d retries\n",
1554                  state_names[state], timeoutCntr);
1555           switch (state) {
1556           case INIT1_STATE:
1557             setState(EXIT2_STATE);
1558             break;
1559
1560           case LEAD_NORMAL2_STATE:
1561             setState(LEAD_REBUILD1_STATE);
1562             outBuffer[0] = REBUILD_CMD;
1563             udpSendAll(outBuffer, 1);
1564             break;
1565
1566           case ELECT1_STATE:
1567             dhtLog("removing unresponsive hosts, before:\n");
1568             writeHostList();
1569             removeUnresponsiveHosts();
1570             dhtLog("after\n");
1571             writeHostList();
1572             setState(ELECT2_STATE);
1573             if (electionOriginator == myHostData.ipAddr) {
1574               leader = hostArray[0].ipAddr;
1575               if (leader == myHostData.ipAddr) {                  //I am the leader
1576                 dhtLog("I am the leader!\n");
1577                 setState(LEAD_REBUILD1_STATE);
1578                 outBuffer[0] = REBUILD_CMD;
1579                 udpSendAll(outBuffer, 1);
1580               } else
1581               {                                                   //notify leader
1582                 outBuffer[0] = CONGRATS_CMD;
1583                 write2(&outBuffer[1], numHosts);
1584                 memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
1585                        * numHosts);
1586                 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1587                         leader);
1588               }
1589             } else
1590             {
1591               outBuffer[0] = ELECT_LEADER_RES;
1592               outBuffer[1] = 0;
1593               write2(&outBuffer[2], numHosts);
1594               memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1595                      * numHosts);
1596               udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1597                       electionParent);
1598             }
1599             break;
1600
1601           case INIT2_STATE:
1602           case ELECT2_STATE:
1603           case REBUILD0_STATE:
1604           case REBUILD1_STATE:
1605           case REBUILD2_STATE:
1606           case REBUILD3_STATE:
1607           case REBUILD4_STATE:
1608           case REBUILD5_STATE:
1609           case LEAD_REBUILD1_STATE:
1610           case LEAD_REBUILD2_STATE:
1611           case LEAD_REBUILD3_STATE:
1612           case LEAD_REBUILD4_STATE:
1613             //start election
1614             electionOriginator = myHostData.ipAddr;
1615             setState(ELECT1_STATE);
1616             outBuffer[0] = ELECT_LEADER_CMD;
1617             write4(&outBuffer[1], myHostData.ipAddr);                                             //originator = me
1618             udpSendAll(outBuffer, 5);
1619             break;
1620
1621           case EXIT1_STATE:
1622             setState(EXIT2_STATE);
1623             break;
1624
1625           case NORMAL_STATE:
1626           case LEAD_NORMAL1_STATE:
1627           case EXIT2_STATE:                                       //we shouldn't get here
1628             break;
1629           }
1630         }
1631       }
1632     }
1633     if (state != oldState)
1634       pthread_cond_broadcast(&stateCond);
1635     pthread_mutex_unlock(&stateMutex);
1636   }
1637 }
1638