1 /*******************************************************************************
4 * High-performance Distributed Hash Table for finding the location of objects
5 * in a Distributed Shared Transactional Memory system.
10 * 1) Instead of having dhtInsertMult, dhtSearchMult, etc. call their single-key
11 * counterparts repeatedly, define some new messages to handle it more
13 * 2) Improve the efficiency of functions that work with hostArray, hostReplied,
14 * and blockOwnerArray.
15 * 3) Currently a join or leave causes a rebuild of the entire hash table.
16 * Implement more graceful join and leave procedures.
17 * 4) Fine tune timeout values for performance, possibly implement a backoff
18 * algorithm to prevent overloading the network.
19 * 5) Whatever else I'm forgetting
21 *******************************************************************************/
22 /*******************************************************************************
24 *******************************************************************************/
26 #include <netinet/in.h>
27 #include <arpa/inet.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <sys/ioctl.h>
41 #include <linux/sockios.h>
42 #include <sys/queue.h>
44 #include "clookup.h" //this works for now, do we need anything better?
47 /*******************************************************************************
48 * Local Defines, Structs
49 *******************************************************************************/
51 #define MAX_MSG_SIZE 1500
53 #define INIT_HOST_ALLOC 3
54 #define INIT_NUM_BLOCKS 16
55 #define DEFAULT_INTERFACE "eth0"
56 #define TIMEOUT_PERIOD 100
57 #define INSERT_TIMEOUT_MS 500
58 #define INSERT_RETRIES 50
59 #define REMOVE_TIMEOUT_MS 500
60 #define REMOVE_RETRIES 50
61 #define SEARCH_TIMEOUT_MS 500
62 #define SEARCH_RETRIES 50
65 //make sure this matches msg_types global var
95 //make sure this matches state_names, timeout_vals, and retry_vals global vars
132 unsigned int maxKeyCapacity;
135 /*******************************************************************************
136 * Local Function Prototypes
137 *******************************************************************************/
139 int msgSizeOk(unsigned char *msg, unsigned int size);
140 unsigned short read2(unsigned char *msg);
141 unsigned int read4(unsigned char *msg);
142 void write2(unsigned char *ptr, unsigned short tmp);
143 void write4(unsigned char *ptr, unsigned int tmp);
144 unsigned int getMyIpAddr(const char *interfaceStr);
145 int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp);
146 int udpSendAll(unsigned char *msg, unsigned int size);
147 unsigned int hash(unsigned int x);
148 unsigned int getKeyOwner(unsigned int key);
149 void setState(unsigned int newState);
150 void makeAssignments();
151 int addHost(struct hostData newHost);
152 int removeHost(unsigned int ipAddr);
153 void removeUnresponsiveHosts();
154 int checkReplied(unsigned int ipAddr);
156 void writeHostList();
157 void dhtLog(const char *format, ...);
161 /*******************************************************************************
163 *******************************************************************************/
165 //make sure this matches enumeration above
166 const char *msg_types[NUM_MSG_TYPES] =
193 const char *state_names[NUM_STATES] =
198 "LEAD_NORMAL1_STATE",
199 "LEAD_NORMAL2_STATE",
208 "LEAD_REBUILD1_STATE",
209 "LEAD_REBUILD2_STATE",
210 "LEAD_REBUILD3_STATE",
211 "LEAD_REBUILD4_STATE",
216 //note: { 0, 0 } means no timeout
217 struct timeval timeout_vals[NUM_STATES] ={
218 { 0, 500000 }, //INIT1_STATE
219 { 0, 500000 }, //INIT2_STATE
220 { 0, 0 }, //NORMAL_STATE
221 { 0, 0 }, //LEAD_NORMAL1_STATE
222 { 3, 0 }, //LEAD_NORMAL2_STATE
223 { 1, 0 }, //ELECT1_STATE
224 { 1, 0 }, //ELECT2_STATE
225 { 0, 500000 }, //REBUILD0_STATE
226 { 0, 500000 }, //REBUILD1_STATE
227 { 10, 0 }, //REBUILD2_STATE
228 { 10, 0 }, //REBUILD3_STATE
229 { 10, 0 }, //REBUILD4_STATE
230 { 1, 0 }, //REBUILD5_STATE
231 { 1, 0 }, //LEAD_REBUILD1_STATE
232 { 1, 0 }, //LEAD_REBUILD2_STATE
233 { 10, 0 }, //LEAD_REBUILD3_STATE
234 { 10, 0 }, //LEAD_REBUILD4_STATE
235 { 0, 500000 }, //EXIT1_STATE
236 { 0, 0 } //EXIT2_STATE
239 int retry_vals[NUM_STATES] =
244 0, //LEAD_NORMAL1_STATE
245 0, //LEAD_NORMAL2_STATE
254 10, //LEAD_REBUILD1_STATE
255 10, //LEAD_REBUILD2_STATE
256 10, //LEAD_REBUILD3_STATE
257 10, //LEAD_REBUILD4_STATE
263 struct hostData myHostData;
264 pthread_t threadUdpListen;
265 pthread_t threadFillTask;
266 //status of fillTask: 0 = ready to run, 1 = running, 2 = completed, 3 = error
268 struct pollfd udpPollSock;
272 unsigned int electionOriginator;
273 unsigned int electionParent;
274 unsigned int hostArraySize = 0;
275 struct hostData *hostArray = NULL;
276 unsigned int numBlocks = 0;
277 unsigned short *blockOwnerArray = NULL;
278 unsigned char *hostReplied = NULL;
279 pthread_mutex_t stateMutex;
280 pthread_cond_t stateCond;
281 chashtable_t *myHashTable;
282 unsigned int numHosts;
283 struct timeval timer;
287 /*******************************************************************************
288 * Interface Function Definitions
289 *******************************************************************************/
291 void dhtInit(unsigned int seedIpAddr, unsigned int maxKeyCapacity) {
292 struct in_addr tmpAddr;
293 char filename[23] = "dht-";
294 struct sockaddr_in myAddr;
295 struct sockaddr_in seedAddr;
296 socklen_t socklen = sizeof(struct sockaddr_in);
299 tmpAddr.s_addr = htonl(getMyIpAddr(DEFAULT_INTERFACE));
300 strcat(filename, inet_ntoa(tmpAddr));
301 strcat(filename, ".log");
302 printf("log file: %s\n", filename);
304 logfile = fopen(filename, "w");
305 dhtLog("dhtInit(): inializing...\n");
307 myHostData.ipAddr = getMyIpAddr(DEFAULT_INTERFACE);
308 myHostData.maxKeyCapacity = maxKeyCapacity;
312 electionOriginator = 0;
314 hostArraySize = INIT_HOST_ALLOC;
315 hostArray = calloc(hostArraySize, sizeof(struct hostData));
316 hostReplied = calloc(hostArraySize, sizeof(unsigned char));
317 hostArray[0] = myHostData;
319 numBlocks = INIT_NUM_BLOCKS;
320 blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
321 pthread_mutex_init(&stateMutex, NULL);
322 pthread_cond_init(&stateCond, NULL);
323 myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
325 udpPollSock.fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
326 if (udpPollSock.fd < 0)
327 perror("dhtInit():socket()");
329 udpPollSock.events = POLLIN;
331 bzero(&myAddr, socklen);
332 myAddr.sin_family = AF_INET;
333 myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
334 myAddr.sin_port = htons(UDP_PORT);
336 if (bind(udpPollSock.fd, (struct sockaddr *)&myAddr, socklen) < 0)
337 perror("dhtInit():bind()");
340 dhtLog("I am the leader\n");
341 leader = myHostData.ipAddr;
342 setState(LEAD_NORMAL1_STATE);
345 initMsg = WHO_IS_LEADER_CMD;
346 udpSend(&initMsg, 1, seed);
347 setState(INIT1_STATE);
350 if (pthread_create(&threadUdpListen, NULL, udpListen, NULL) != 0)
351 dhtLog("dhtInit() - ERROR creating threadUdpListen\n");
356 void dhtExit() { //TODO: do this gracefully, wait for response from leader, etc.
360 udpSend(&msg, 1, leader);
361 dhtLog("dhtExit(): cleaning up...\n");
362 pthread_cancel(threadUdpListen);
363 close(udpPollSock.fd);
366 free(blockOwnerArray);
372 int dhtInsert(unsigned int key, unsigned int val) {
373 struct sockaddr_in toAddr;
374 struct sockaddr_in fromAddr;
375 socklen_t socklen = sizeof(struct sockaddr_in);
376 struct pollfd pollsock;
384 bzero((char *)&toAddr, socklen);
385 toAddr.sin_family = AF_INET;
386 toAddr.sin_port = htons(UDP_PORT);
388 while (status != OPERATION_OK){
389 pthread_mutex_lock(&stateMutex);
390 while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
391 || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
392 || state == LEAD_REBUILD3_STATE))
393 pthread_cond_wait(&stateCond, &stateMutex);
394 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
395 pthread_mutex_unlock(&stateMutex);
397 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0){
398 perror("dhtInsert():socket()");
401 pollsock.events = POLLIN;
403 outBuffer[0] = INSERT_CMD;
404 write4(&outBuffer[1], key);
405 write4(&outBuffer[5], val);
407 for (i = 0; i < INSERT_RETRIES; i++)
409 if (sendto(pollsock.fd, outBuffer, 9, 0, (struct sockaddr *)&toAddr,
411 perror("dhtInsert():sendto()");
414 retval = poll(&pollsock, 1, INSERT_TIMEOUT_MS);
416 perror("dhtInsert():poll()");
420 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
421 (struct sockaddr *)&fromAddr, &socklen);
422 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
423 && fromAddr.sin_port == toAddr.sin_port
424 && bytesRcvd == 2 && inBuffer[0] == INSERT_RES){
425 status = inBuffer[1]; //status from remote host
430 if (status != OPERATION_OK){
431 pthread_mutex_lock(&stateMutex);
432 setState(REBUILD0_STATE);
433 outBuffer[0] = REBUILD_REQ;
434 udpSend(outBuffer, 1, leader);
435 pthread_mutex_unlock(&stateMutex);
444 int dhtInsertMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals) {
449 for (i = 0; i < numKeys; i++)
451 if (dhtInsert(keys[i], vals[i]) != 0)
457 int dhtRemove(unsigned int key) {
458 struct sockaddr_in toAddr;
459 struct sockaddr_in fromAddr;
460 socklen_t socklen = sizeof(struct sockaddr_in);
461 struct pollfd pollsock;
469 bzero((char *)&toAddr, socklen);
470 toAddr.sin_family = AF_INET;
471 toAddr.sin_port = htons(UDP_PORT);
473 while (!(status == OPERATION_OK || status == KEY_NOT_FOUND)){
474 pthread_mutex_lock(&stateMutex);
475 while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
476 || state == LEAD_NORMAL2_STATE))
477 pthread_cond_wait(&stateCond, &stateMutex);
478 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
479 pthread_mutex_unlock(&stateMutex);
481 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0){
482 perror("dhtRemove():socket()");
485 pollsock.events = POLLIN;
487 outBuffer[0] = REMOVE_CMD;
488 write4(&outBuffer[1], key);
490 for (i = 0; i < REMOVE_RETRIES; i++)
492 if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
494 perror("dhtRemove():sendto()");
497 retval = poll(&pollsock, 1, REMOVE_TIMEOUT_MS);
499 perror("dhtRemove():poll()");
503 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
504 (struct sockaddr *)&fromAddr, &socklen);
505 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
506 && fromAddr.sin_port == toAddr.sin_port
507 && bytesRcvd == 2 && inBuffer[0] == REMOVE_RES){
508 status = inBuffer[1]; //status from remote host
513 if (!(status == OPERATION_OK || status == KEY_NOT_FOUND)){
514 pthread_mutex_lock(&stateMutex);
515 setState(REBUILD0_STATE);
516 outBuffer[0] = REBUILD_REQ;
517 udpSend(outBuffer, 1, leader);
518 pthread_mutex_unlock(&stateMutex);
527 int dhtRemoveMult(unsigned int numKeys, unsigned int *keys) {
532 for (i = 0; i < numKeys; i++)
534 if (dhtRemove(keys[i]) != 0)
540 int dhtSearch(unsigned int key, unsigned int *val) {
541 struct sockaddr_in toAddr;
542 struct sockaddr_in fromAddr;
543 socklen_t socklen = sizeof(struct sockaddr_in);
544 struct pollfd pollsock;
552 bzero((char *)&toAddr, socklen);
553 toAddr.sin_family = AF_INET;
554 toAddr.sin_port = htons(UDP_PORT);
556 while (!(status == OPERATION_OK || status == KEY_NOT_FOUND)){
557 pthread_mutex_lock(&stateMutex);
558 while (numBlocks == 0)
559 pthread_cond_wait(&stateCond, &stateMutex);
560 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
561 pthread_mutex_unlock(&stateMutex);
563 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0){
564 perror("dhtSearch():socket()");
567 pollsock.events = POLLIN;
569 outBuffer[0] = SEARCH_CMD;
570 write4(&outBuffer[1], key);
572 for (i = 0; i < SEARCH_RETRIES; i++)
574 if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
576 perror("dhtSearch():sendto()");
579 retval = poll(&pollsock, 1, SEARCH_TIMEOUT_MS);
581 perror("dhtSearch():poll()");
585 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 6, 0,
586 (struct sockaddr *)&fromAddr, &socklen);
587 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
588 && fromAddr.sin_port == toAddr.sin_port
589 && bytesRcvd == 6 && inBuffer[0] == SEARCH_RES){
590 status = inBuffer[1]; //status from remote host
591 *val = read4(&inBuffer[2]);
596 if (!(status == OPERATION_OK || status == KEY_NOT_FOUND)){
597 pthread_mutex_lock(&stateMutex);
598 setState(REBUILD0_STATE);
599 outBuffer[0] = REBUILD_REQ;
600 udpSend(outBuffer, 1, leader);
601 pthread_mutex_unlock(&stateMutex);
610 int dhtSearchMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals) {
613 for (i = 0; i < numKeys; i++)
615 if (dhtSearch(keys[i], &vals[i]) != 0)
621 /*******************************************************************************
622 * Local Function Definitions
623 *******************************************************************************/
625 int msgSizeOk(unsigned char *msg, unsigned int size) {
626 unsigned short tmpNumHosts;
627 unsigned short tmpNumBlocks;
633 case WHO_IS_LEADER_CMD:
641 case RESUME_NORMAL_CMD:
642 case RESUME_NORMAL_RES:
652 case WHO_IS_LEADER_RES:
654 case ELECT_LEADER_CMD:
666 tmpNumHosts = read2(&msg[1]);
667 tmpNumBlocks = read2(&msg[3]);
668 return (size == (5+sizeof(struct hostData)*tmpNumHosts+2*tmpNumBlocks));
670 case ELECT_LEADER_RES:
677 tmpNumHosts = read2(&msg[2]);
678 return (size == (4 + sizeof(struct hostData) * tmpNumHosts));
683 tmpNumHosts = read2(&msg[1]);
684 return (size == (3 + sizeof(struct hostData) * tmpNumHosts));
/*
 * Decode a 16-bit value stored little-endian at ptr[0..1]
 * (ptr[0] is the low byte, ptr[1] the high byte).
 */
unsigned short read2(unsigned char *ptr) {
  unsigned short lowByte = ptr[0];
  unsigned short highByte = ptr[1];

  return (unsigned short)((highByte << 8) | lowByte);
}
/*
 * Decode a 32-bit value stored little-endian at ptr[0..3]
 * (ptr[0] is the least significant byte).
 *
 * Each byte is widened to unsigned int before shifting: an unsigned char is
 * otherwise promoted to (signed) int, and shifting a byte >= 0x80 left by 24
 * would overflow into the sign bit, which is undefined behaviour.
 */
unsigned int read4(unsigned char *ptr) {
  return ((unsigned int)ptr[3] << 24)
       | ((unsigned int)ptr[2] << 16)
       | ((unsigned int)ptr[1] << 8)
       |  (unsigned int)ptr[0];
}
/*
 * Encode the 16-bit value tmp little-endian into ptr[0..1]
 * (low byte first). Inverse of read2().
 */
void write2(unsigned char *ptr, unsigned short tmp) {
  ptr[0] = (unsigned char)(tmp & 0xFF);
  ptr[1] = (unsigned char)((tmp >> 8) & 0xFF);
}
/*
 * Encode the 32-bit value tmp little-endian into ptr[0..3]
 * (least significant byte first). Inverse of read4().
 */
void write4(unsigned char *ptr, unsigned int tmp) {
  int byteIdx;

  for (byteIdx = 0; byteIdx < 4; byteIdx++)
    ptr[byteIdx] = (unsigned char)((tmp >> (8 * byteIdx)) & 0xFF);
}
715 unsigned int getMyIpAddr(const char *interfaceStr) {
717 struct ifreq interfaceInfo;
718 struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
720 memset(&interfaceInfo, 0, sizeof(struct ifreq));
722 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
723 perror("getMyIpAddr():socket()");
727 strcpy(interfaceInfo.ifr_name, interfaceStr);
728 myAddr->sin_family = AF_INET;
730 if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0){
731 perror("getMyIpAddr():ioctl()");
735 return ntohl(myAddr->sin_addr.s_addr);
738 int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp) {
739 struct sockaddr_in peerAddr;
740 socklen_t socklen = sizeof(struct sockaddr_in);
742 bzero(&peerAddr, socklen);
743 peerAddr.sin_family = AF_INET;
744 peerAddr.sin_addr.s_addr = htonl(destIp);
745 peerAddr.sin_port = htons(UDP_PORT);
748 if (msg[0] < NUM_MSG_TYPES)
749 dhtLog("udpSend(): sending %s to %s, %d bytes\n", msg_types[msg[0]],
750 inet_ntoa(peerAddr.sin_addr), size);
752 dhtLog("udpSend(): sending unknown message to %s, %d bytes\n",
753 inet_ntoa(peerAddr.sin_addr), size);
756 if (sendto(udpPollSock.fd, (void *)msg, size, 0, (struct sockaddr *)&peerAddr,
758 perror("udpSend():sendto()");
765 int udpSendAll(unsigned char *msg, unsigned int size) {
768 for (i = 0; i < numHosts; i++)
770 if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr)){
771 if (udpSend(msg, size, hostArray[i].ipAddr) != 0)
778 //note: make sure this is only executed in a valid state, where numBlocks != 0
779 unsigned int hash(unsigned int x) {
780 return (x % numBlocks);
783 //note: make sure this is only executed in a valid state, where these arrays
784 // are allocated and the index mappings are consistent
785 unsigned int getKeyOwner(unsigned int key) {
786 return hostArray[blockOwnerArray[hash(key)]].ipAddr;
789 //sets state and timer, if applicable
790 void setState(unsigned int newState) {
794 gettimeofday(&now, NULL);
796 if (newState >= NUM_STATES){
797 dhtLog("setState(): ERROR: invalid state %d\n", newState);
800 if (timeout_vals[newState].tv_sec == 0
801 && timeout_vals[newState].tv_usec == 0){ //no timer
805 timeradd(&now, &timeout_vals[newState], &timer);
810 //TODO: only do this for states that require it
811 for (i = 0; i < numHosts; i++)
814 dhtLog("setState(): state set to %s\n", state_names[state]);
820 //TODO: improve these simple and inefficient functions
821 int checkReplied(unsigned int ipAddr) {
824 i = findHost(ipAddr);
837 for (i = 0; i < numHosts; i++)
838 if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr))
844 int findHost(unsigned int ipAddr) {
847 for (i = 0; i < numHosts; i++)
848 if (hostArray[i].ipAddr == ipAddr)
849 return i; //found, return index
851 return -1; //not found
854 int removeHost(unsigned int ipAddr) {
857 i = findHost(ipAddr);
862 for (j = 0; j < numBlocks; j++)
864 if (blockOwnerArray[j] == i)
865 blockOwnerArray[j] = 0; //TODO: is this what I want to have happen?
866 else if (blockOwnerArray[j] > i)
867 blockOwnerArray[j]--;
870 for (; i < numHosts - 1; i++)
872 hostArray[i] = hostArray[i+1];
873 hostReplied[i] = hostReplied[i+1];
880 void removeUnresponsiveHosts() {
883 for (i = 0; i < numHosts; i++)
885 if (!hostReplied[i] && hostArray[i].ipAddr != myHostData.ipAddr)
886 removeHost(hostArray[i].ipAddr);
890 int addHost(struct hostData newHost) {
891 struct hostData *newHostArray;
892 unsigned char *newHostReplied;
896 for (i = 0; i < numHosts; i++)
898 if (hostArray[i].ipAddr == newHost.ipAddr){
899 hostArray[i] = newHost;
902 } else if (hostArray[i].ipAddr > newHost.ipAddr) {
903 if (numHosts == hostArraySize){
904 newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
905 newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
906 memcpy(newHostArray, hostArray, (i * sizeof(struct hostData)));
907 memcpy(newHostReplied, hostReplied, (i * sizeof(unsigned char)));
908 newHostArray[i] = newHost;
909 newHostReplied[i] = 0;
910 memcpy(&newHostArray[i+1], &hostArray[i], ((numHosts - i) *
911 sizeof(struct hostData)));
912 memcpy(&newHostReplied[i+1], &hostReplied[i], ((numHosts - i) *
913 sizeof(unsigned char)));
916 hostArray = newHostArray;
917 hostReplied = newHostReplied;
918 hostArraySize = 2 * hostArraySize;
921 for (j = numHosts; j > i; j--)
923 hostArray[j] = hostArray[j-1];
924 hostReplied[j] = hostReplied[j-1];
926 hostArray[i] = newHost;
929 for(j = 0; j < numBlocks; j++)
931 if (blockOwnerArray[j] >= i)
932 blockOwnerArray[j]++;
939 //nothing greater, add to end
940 if (numHosts == hostArraySize){
941 newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
942 newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
943 memcpy(newHostArray, hostArray, (numHosts * sizeof(struct hostData)));
944 memcpy(newHostReplied, hostReplied, (numHosts * sizeof(unsigned char)));
947 hostArray = newHostArray;
948 hostReplied = newHostReplied;
949 hostArraySize = 2 * hostArraySize;
952 hostArray[numHosts] = newHost;
953 hostReplied[numHosts] = 0;
958 void makeAssignments() {
961 if (numBlocks < numHosts){
962 free(blockOwnerArray);
963 while (numBlocks < numHosts)
965 blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
968 for (i = 0; i < numBlocks; i++)
969 blockOwnerArray[i] = i % numHosts;
974 void writeHostList() {
976 struct in_addr tmpAddr;
978 fprintf(logfile, "numHosts = %d\n", numHosts);
979 for (i = 0; i < numHosts; i++)
981 tmpAddr.s_addr = htonl(hostArray[i].ipAddr);
982 fprintf(logfile, "%d) %s, %d\n", i, inet_ntoa(tmpAddr),
983 hostArray[i].maxKeyCapacity);
988 void dhtLog(const char *format, ...) {
990 // struct timeval now;
992 // if (gettimeofday(&now, NULL) < 0)
993 // { perror("dhtLog():gettimeofday()"); }
994 va_start(args, format);
995 // if (fprintf(logfile, "%d.%06d:", now.tv_sec, now.tv_usec) < 0)
996 // { perror("dhtLog():fprintf()"); }
997 if (vfprintf(logfile, format, args) < 0){
998 perror("dhtLog():vfprintf()");
1000 if (fflush(logfile) == EOF){
1001 perror("dhtLog():fflush()");
1011 unsigned int numKeys;
1014 vals = mhashGetKeys(&numKeys); //note: key of mhash is val of dht
1015 keys = calloc(numKeys, sizeof(unsigned int));
1017 for (i = 0; i < numKeys; i++)
1018 keys[i] = myHostData.ipAddr;
1020 if (dhtInsertMult(numKeys, keys, vals) == 0)
1030 struct sockaddr_in peerAddr;
1031 unsigned int peerIp;
1032 socklen_t socklen = sizeof(struct sockaddr_in);
1033 unsigned char inBuffer[MAX_MSG_SIZE];
1034 unsigned char outBuffer[MAX_MSG_SIZE];
1037 struct in_addr tmpAddr;
1038 struct hostData tmpHost;
1039 unsigned int tmpKey;
1040 unsigned int tmpVal;
1041 struct hostData *hostDataPtr;
1042 unsigned short *uShortPtr;
1043 unsigned int tmpUInt;
1044 unsigned int tmpUShort;
1046 unsigned int oldState;
1048 dhtLog("udpListen(): linstening on port %d...\n", UDP_PORT);
1051 pollret = poll(&udpPollSock, 1, TIMEOUT_PERIOD);
1052 pthread_mutex_lock(&stateMutex);
1055 perror("udpListen():poll()");
1056 } else if (pollret > 0) {
1057 bytesRcvd = recvfrom(udpPollSock.fd, inBuffer, MAX_MSG_SIZE, 0,
1058 (struct sockaddr *)&peerAddr, &socklen);
1060 dhtLog("udpListen(): ERROR: bytesRcvd = %d\n", bytesRcvd);
1061 } else if (inBuffer[0] >= NUM_MSG_TYPES) {
1062 dhtLog("udpListen(): ERROR: unknown msg type = %d\n", inBuffer[0]);
1063 } else if (!msgSizeOk(inBuffer, bytesRcvd)) {
1064 dhtLog("udpListen(): ERROR: msg size not ok: type = %s\n, size = %d\n",
1065 msg_types[inBuffer[0]], bytesRcvd);
1066 } else if (state == EXIT2_STATE) {
1068 } else if (state == INIT1_STATE) { //after initialization with seed, do not proceed until seed replies
1069 dhtLog("udpListen(): received %s from %s, %d bytes\n",
1070 msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
1071 for (i = 0; i < bytesRcvd; i++)
1072 dhtLog(" %x", inBuffer[i]);
1074 peerIp = ntohl(peerAddr.sin_addr.s_addr);
1075 if (peerIp == seed && inBuffer[0] == WHO_IS_LEADER_RES){
1076 tmpHost.ipAddr = peerIp;
1077 tmpHost.maxKeyCapacity = 0;
1080 leader = read4(&inBuffer[1]);
1081 tmpAddr.s_addr = htonl(leader);
1082 dhtLog("leader = %s\n", inet_ntoa(tmpAddr));
1084 setState(INIT2_STATE);
1085 outBuffer[0] = JOIN_REQ;
1086 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1087 udpSend(outBuffer, 5, leader);
1090 electionOriginator = myHostData.ipAddr;
1091 setState(ELECT1_STATE);
1092 outBuffer[0] = ELECT_LEADER_CMD;
1093 write4(&outBuffer[1], myHostData.ipAddr); //originator = me
1094 udpSendAll(outBuffer, 5);
1099 dhtLog("udpListen(): received %s from %s, %d bytes\n",
1100 msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
1101 for (i = 0; i < bytesRcvd; i++)
1102 dhtLog(" %x", inBuffer[i]);
1104 peerIp = ntohl(peerAddr.sin_addr.s_addr);
1105 switch (inBuffer[0]){
1107 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1108 || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
1109 || state == REBUILD5_STATE || state == LEAD_REBUILD3_STATE){
1110 tmpKey = read4(&inBuffer[1]);
1111 tmpVal = read4(&inBuffer[5]);
1112 outBuffer[0] = INSERT_RES;
1113 if (getKeyOwner(tmpKey) == myHostData.ipAddr){
1114 if (chashInsert(myHashTable, tmpKey, (void *)tmpVal) == 0)
1115 outBuffer[1] = OPERATION_OK;
1117 outBuffer[1] = INTERNAL_ERROR;
1120 outBuffer[1] = NOT_KEY_OWNER;
1122 //reply to client socket
1123 sendto(udpPollSock.fd, outBuffer, 2, 0,
1124 (struct sockaddr *)&peerAddr, socklen);
1129 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1130 || state == LEAD_NORMAL2_STATE){
1131 tmpKey = read4(&inBuffer[1]);
1132 outBuffer[0] = REMOVE_RES;
1133 if (getKeyOwner(tmpKey) == myHostData.ipAddr){
1134 if (chashRemove(myHashTable, tmpKey) == 0)
1135 outBuffer[1] = OPERATION_OK;
1137 outBuffer[1] = KEY_NOT_FOUND;
1140 outBuffer[1] = NOT_KEY_OWNER;
1142 //reply to client socket
1143 sendto(udpPollSock.fd, outBuffer, 2, 0,
1144 (struct sockaddr *)&peerAddr, socklen);
1149 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1150 || state == LEAD_NORMAL2_STATE){
1151 tmpKey = read4(&inBuffer[1]);
1152 outBuffer[0] = SEARCH_RES;
1153 if (getKeyOwner(tmpKey) == myHostData.ipAddr){
1154 if ((tmpVal = (unsigned int)chashSearch(myHashTable, tmpKey)) != 0){
1155 outBuffer[1] = OPERATION_OK;
1156 write4(&outBuffer[2], tmpVal);
1159 outBuffer[1] = KEY_NOT_FOUND;
1160 write4(&outBuffer[2], 0);
1164 outBuffer[1] = NOT_KEY_OWNER;
1165 write4(&outBuffer[2], 0);
1167 //reply to client socket
1168 sendto(udpPollSock.fd, outBuffer, 6, 0,
1169 (struct sockaddr *)&peerAddr, socklen);
1173 case WHO_IS_LEADER_CMD:
1174 tmpHost.ipAddr = peerIp;
1175 tmpHost.maxKeyCapacity = 0;
1178 outBuffer[0] = WHO_IS_LEADER_RES;
1179 //leader == 0 means I don't know who it is
1180 write4(&outBuffer[1], leader);
1181 udpSend(outBuffer, 5, peerIp);
1185 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE){
1186 tmpHost.ipAddr = peerIp;
1187 tmpHost.maxKeyCapacity = read4(&inBuffer[1]);
1190 if (state == LEAD_NORMAL1_STATE)
1191 setState(LEAD_NORMAL2_STATE);
1192 outBuffer[0] = JOIN_RES;
1193 outBuffer[1] = 0; //status, success
1194 udpSend(outBuffer, 2, peerIp);
1195 } else if (state == LEAD_REBUILD1_STATE) {
1196 //note: I don't need to addHost().
1197 checkReplied(peerIp);
1198 outBuffer[0] = JOIN_RES;
1199 outBuffer[1] = 0; //status, success
1200 udpSend(outBuffer, 2, peerIp);
1203 setState(LEAD_REBUILD2_STATE);
1204 outBuffer[0] = DHT_UPDATE_CMD;
1205 write2(&outBuffer[1], numHosts);
1206 write2(&outBuffer[3], numBlocks);
1207 memcpy(&outBuffer[5], hostArray, numHosts*sizeof(struct hostData));
1208 memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
1209 blockOwnerArray, numBlocks*2);
1210 udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
1217 if (state == REBUILD1_STATE){
1218 setState(REBUILD2_STATE);
1219 } else if (state == INIT2_STATE) {
1220 setState(NORMAL_STATE);
1225 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE){ //TODO: make this graceful, instead of just rebuilding
1227 if (state != LEAD_NORMAL2_STATE)
1228 setState(LEAD_NORMAL2_STATE);
1232 case DHT_UPDATE_CMD:
1233 if (state == REBUILD2_STATE && peerIp == leader){
1235 free(blockOwnerArray);
1236 numHosts = read2(&inBuffer[1]);
1237 numBlocks = read2(&inBuffer[3]);
1238 while (hostArraySize < numHosts)
1240 hostArray = calloc(hostArraySize, sizeof(struct hostData));
1241 blockOwnerArray = calloc(numBlocks, 2);
1242 memcpy(hostArray, &inBuffer[5], numHosts*sizeof(struct hostData));
1243 memcpy(blockOwnerArray, &inBuffer[5+numHosts*sizeof(struct hostData)], numBlocks*2);
1245 setState(REBUILD3_STATE);
1246 outBuffer[0] = DHT_UPDATE_RES;
1247 udpSend(outBuffer, 1, peerIp);
1251 case DHT_UPDATE_RES:
1252 if (state == LEAD_REBUILD2_STATE){
1253 checkReplied(peerIp);
1255 setState(LEAD_REBUILD3_STATE);
1256 outBuffer[0] = FILL_DHT_CMD;
1257 udpSendAll(outBuffer, 1);
1258 if (fillStatus != 0)
1259 dhtLog("udpListen(): ERROR: fillTask already running\n");
1261 if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
1262 dhtLog("udpListen(): ERROR creating threadFillTask\n");
1267 case ELECT_LEADER_CMD:
1268 tmpUInt = read4(&inBuffer[1]);
1269 if ((state == ELECT1_STATE || state == ELECT2_STATE)
1270 && tmpUInt >= electionOriginator){ //already participating in a higher-priority election
1271 outBuffer[0] = ELECT_LEADER_RES;
1272 outBuffer[1] = 0xFF;
1273 udpSend(outBuffer, 2, peerIp);
1276 electionOriginator = tmpUInt;
1277 electionParent = peerIp;
1278 setState(ELECT1_STATE);
1279 outBuffer[0] = ELECT_LEADER_CMD;
1280 write4(&outBuffer[1], electionOriginator);
1281 //don't bother forwarding the message to originator or parent
1282 checkReplied(electionOriginator);
1283 checkReplied(electionParent);
1284 if (allReplied()){ //in case that is everybody I know of
1285 setState(ELECT2_STATE);
1286 outBuffer[0] = ELECT_LEADER_RES;
1288 write2(&outBuffer[2], numHosts);
1289 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1291 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1295 udpSendAll(outBuffer, 5);
1300 case ELECT_LEADER_RES:
1301 if (state == ELECT1_STATE){
1302 checkReplied(peerIp);
1303 if (inBuffer[1] != 0xFF){
1304 tmpUShort = read2(&inBuffer[2]);
1305 hostDataPtr = (struct hostData *)&inBuffer[4];
1306 for (i = 0; i < tmpUShort; i++)
1307 addHost(hostDataPtr[i]);
1311 setState(ELECT2_STATE);
1312 if (electionOriginator == myHostData.ipAddr){
1313 leader = hostArray[0].ipAddr;
1314 if (leader == myHostData.ipAddr){ //I am the leader
1315 dhtLog("I am the leader!\n");
1316 setState(LEAD_REBUILD1_STATE);
1317 outBuffer[0] = REBUILD_CMD;
1318 udpSendAll(outBuffer, 1);
1321 outBuffer[0] = CONGRATS_CMD;
1322 write2(&outBuffer[1], numHosts);
1323 hostDataPtr = (struct hostData *)&outBuffer[3];
1324 for (i = 0; i < numHosts; i++)
1325 hostDataPtr[i] = hostArray[i];
1326 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1331 outBuffer[0] = ELECT_LEADER_RES;
1333 write2(&outBuffer[2], numHosts);
1334 hostDataPtr = (struct hostData *)&outBuffer[4];
1335 for (i = 0; i < numHosts; i++)
1336 hostDataPtr[i] = hostArray[i];
1337 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1345 if (state == ELECT2_STATE){ //I am the leader
1346 leader = myHostData.ipAddr;
1347 dhtLog("I am the leader!\n");
1348 tmpUShort = read2(&inBuffer[1]);
1349 hostDataPtr = (struct hostData *)&inBuffer[3];
1350 for (i = 0; i < tmpUShort; i++)
1351 addHost(hostDataPtr[i]);
1353 setState(LEAD_REBUILD1_STATE);
1354 outBuffer[0] = REBUILD_CMD;
1355 udpSendAll(outBuffer, 1);
1360 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE){
1361 setState(LEAD_REBUILD1_STATE);
1362 outBuffer[0] = REBUILD_CMD;
1363 udpSendAll(outBuffer, 1);
1368 leader = peerIp; //consider this a declaration of authority
1369 setState(REBUILD1_STATE);
1370 outBuffer[0] = JOIN_REQ;
1371 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1372 udpSend(outBuffer, 5, leader);
1376 if (state == REBUILD3_STATE && peerIp == leader){
1377 setState(REBUILD4_STATE);
1378 if (fillStatus != 0)
1379 dhtLog("udpListen(): ERROR: fillTask already running\n");
1381 if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
1382 dhtLog("udpListen(): ERROR creating threadFillTask\n");
1387 if (state == LEAD_REBUILD3_STATE){
1388 checkReplied(peerIp);
1389 if (allReplied() && fillStatus == 2){
1391 setState(LEAD_REBUILD4_STATE);
1392 outBuffer[0] = RESUME_NORMAL_CMD;
1393 udpSendAll(outBuffer, 1);
1398 case RESUME_NORMAL_CMD:
1399 if (state == REBUILD5_STATE && peerIp == leader){
1400 setState(NORMAL_STATE);
1401 outBuffer[0] = RESUME_NORMAL_RES;
1402 udpSend(outBuffer, 1, leader);
1406 case RESUME_NORMAL_RES:
1407 if (state == LEAD_REBUILD4_STATE){
1408 checkReplied(peerIp);
1410 setState(LEAD_NORMAL1_STATE);
1417 if (state == REBUILD4_STATE){
1418 switch (fillStatus){
1419 case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in REBUILD4_STATE\n");
1422 case 1: //do nothing
1425 case 2: //done filling the dht, notify leader
1427 setState(REBUILD5_STATE);
1428 outBuffer[0] = FILL_DHT_RES;
1429 udpSend(outBuffer, 1, leader);
1432 case 3: //error encountered -> restart rebuild
1434 setState(REBUILD0_STATE);
1435 outBuffer[0] = REBUILD_REQ;
1436 udpSend(outBuffer, 1, leader);
1440 if (state == LEAD_REBUILD3_STATE){
1441 switch (fillStatus){
1442 case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in LEAD_REBUILD3_STATE\n");
1445 case 1: //do nothing
1448 case 2: //I'm done, now is everybody else also done?
1451 setState(LEAD_REBUILD4_STATE);
1452 outBuffer[0] = RESUME_NORMAL_CMD;
1453 udpSendAll(outBuffer, 1);
1457 case 3: //error encountered -> restart rebuild
1459 setState(LEAD_REBUILD1_STATE);
1460 outBuffer[0] = REBUILD_CMD;
1461 udpSendAll(outBuffer, 1);
1466 gettimeofday(&now, NULL);
1467 if (timercmp(&now, &timer, >)){
1468 if (timeoutCntr < retry_vals[state]){
1470 timeradd(&now, &timeout_vals[state], &timer);
1471 dhtLog("udpListen(): retry: %d\n", timeoutCntr);
1474 outBuffer[0] = WHO_IS_LEADER_CMD;
1475 udpSend(outBuffer, 1, seed);
1479 outBuffer[0] = JOIN_REQ;
1480 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1481 udpSend(outBuffer, 5, leader);
1485 outBuffer[0] = ELECT_LEADER_CMD;
1486 write4(&outBuffer[1], electionOriginator);
1487 udpSendAll(outBuffer, 5);
1491 if (electionOriginator == myHostData.ipAddr){ //retry notify leader
1492 outBuffer[0] = CONGRATS_CMD;
1493 write2(&outBuffer[1], numHosts);
1494 memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
1496 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1500 outBuffer[0] = ELECT_LEADER_RES;
1502 write2(&outBuffer[2], numHosts);
1503 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1505 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1510 case REBUILD0_STATE:
1511 outBuffer[0] = REBUILD_REQ;
1512 udpSend(outBuffer, 1, leader);
1515 case REBUILD1_STATE:
1516 outBuffer[0] = JOIN_REQ;
1517 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1518 udpSend(outBuffer, 5, leader);
1521 case REBUILD5_STATE:
1522 outBuffer[0] = FILL_DHT_RES;
1523 udpSend(outBuffer, 1, leader);
1526 case LEAD_REBUILD1_STATE:
1527 outBuffer[0] = REBUILD_CMD;
1528 udpSendAll(outBuffer, 1);
1531 case LEAD_REBUILD2_STATE:
1532 outBuffer[0] = DHT_UPDATE_CMD;
1533 write2(&outBuffer[1], numHosts);
1534 write2(&outBuffer[3], numBlocks);
1535 memcpy(&outBuffer[5], hostArray, numHosts
1536 * sizeof(struct hostData));
1537 memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
1538 blockOwnerArray, numBlocks*2);
1539 udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
1543 case LEAD_REBUILD3_STATE:
1544 outBuffer[0] = FILL_DHT_CMD;
1545 udpSendAll(outBuffer, 1);
1548 case LEAD_REBUILD4_STATE:
1549 outBuffer[0] = RESUME_NORMAL_CMD;
1550 udpSendAll(outBuffer, 1);
1553 case EXIT1_STATE: //TODO...
1557 case LEAD_NORMAL1_STATE:
1558 case LEAD_NORMAL2_STATE:
1559 case REBUILD2_STATE:
1560 case REBUILD3_STATE:
1561 case REBUILD4_STATE:
1562 case EXIT2_STATE: //we shouldn't get here
1567 dhtLog("udpListen(): timed out in state %s after %d retries\n",
1568 state_names[state], timeoutCntr);
1571 setState(EXIT2_STATE);
1574 case LEAD_NORMAL2_STATE:
1575 setState(LEAD_REBUILD1_STATE);
1576 outBuffer[0] = REBUILD_CMD;
1577 udpSendAll(outBuffer, 1);
1581 dhtLog("removing unresponsive hosts, before:\n");
1583 removeUnresponsiveHosts();
1586 setState(ELECT2_STATE);
1587 if (electionOriginator == myHostData.ipAddr){
1588 leader = hostArray[0].ipAddr;
1589 if (leader == myHostData.ipAddr){ //I am the leader
1590 dhtLog("I am the leader!\n");
1591 setState(LEAD_REBUILD1_STATE);
1592 outBuffer[0] = REBUILD_CMD;
1593 udpSendAll(outBuffer, 1);
1596 outBuffer[0] = CONGRATS_CMD;
1597 write2(&outBuffer[1], numHosts);
1598 memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
1600 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1605 outBuffer[0] = ELECT_LEADER_RES;
1607 write2(&outBuffer[2], numHosts);
1608 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1610 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1617 case REBUILD0_STATE:
1618 case REBUILD1_STATE:
1619 case REBUILD2_STATE:
1620 case REBUILD3_STATE:
1621 case REBUILD4_STATE:
1622 case REBUILD5_STATE:
1623 case LEAD_REBUILD1_STATE:
1624 case LEAD_REBUILD2_STATE:
1625 case LEAD_REBUILD3_STATE:
1626 case LEAD_REBUILD4_STATE:
1628 electionOriginator = myHostData.ipAddr;
1629 setState(ELECT1_STATE);
1630 outBuffer[0] = ELECT_LEADER_CMD;
1631 write4(&outBuffer[1], myHostData.ipAddr); //originator = me
1632 udpSendAll(outBuffer, 5);
1636 setState(EXIT2_STATE);
1640 case LEAD_NORMAL1_STATE:
1641 case EXIT2_STATE: //we shouldn't get here
1647 if (state != oldState)
1648 pthread_cond_broadcast(&stateCond);
1649 pthread_mutex_unlock(&stateMutex);