Tested C++ RPS/RMI libraries for arbitrary objects and arbitrary remote calls
authorrtrimana <rtrimana@uci.edu>
Tue, 31 Jan 2017 23:36:14 +0000 (15:36 -0800)
committerrtrimana <rtrimana@uci.edu>
Tue, 31 Jan 2017 23:36:14 +0000 (15:36 -0800)
13 files changed:
iotjava/iotrmi/C++/ConcurrentLinkedListQueue.cpp
iotjava/iotrmi/C++/ConcurrentLinkedListQueue.hpp
iotjava/iotrmi/C++/IoTRMICall.hpp
iotjava/iotrmi/C++/IoTRMIComm.hpp [new file with mode: 0644]
iotjava/iotrmi/C++/IoTRMICommClient.hpp [new file with mode: 0644]
iotjava/iotrmi/C++/IoTRMICommServer.hpp [new file with mode: 0644]
iotjava/iotrmi/C++/IoTRMIUtil.hpp
iotjava/iotrmi/C++/IoTSocket.hpp
iotjava/iotrmi/C++/basics/CallBack.hpp
iotjava/iotrmi/C++/basics/TestClass.hpp
iotjava/iotrmi/C++/basics/TestClassComplete_Stub.cpp
iotjava/iotrmi/C++/basics/TestClassInterface_Skeleton.cpp
iotjava/iotrmi/Java/basics/TestClassCallbacks_Stub.java

index 3fd02736249b7dbe30cdffade393acda05c274f4..9b6826d765d184ada8cde7b3de0fd8d1a6b62ef8 100644 (file)
@@ -6,9 +6,10 @@
 using namespace std;
 
 
-Node::Node(void* val) {
+Node::Node(char* val, int len) {
 
        value = val;
+       length = len;
        next = NULL;
 }
 
@@ -26,12 +27,18 @@ Node::~Node() {
 }
 
 
-void* Node::getValue() {
+char* Node::getValue() {
 
        return value;
 }
 
 
+int Node::getLength() {
+
+       return length;
+}
+
+
 Node* Node::getNext() {
 
        return next;
@@ -53,21 +60,21 @@ ConcurrentLinkedListQueue::ConcurrentLinkedListQueue() {
 
 ConcurrentLinkedListQueue::~ConcurrentLinkedListQueue() {
 
-       void* val = NULL;
+       char* val = NULL;
        do {    // Dequeue and free everything up
                val = dequeue();
        } while(val != NULL);
 }
 
 
-void ConcurrentLinkedListQueue::enqueue(void* value) {
+void ConcurrentLinkedListQueue::enqueue(char* value, int length) {
 
-       lock_guard<mutex> guard(mtx);
+       lock_guard<mutex> guard(queueMutex);
        if (tail == NULL && head == NULL) {     // first element
-               tail = new Node(value);
+               tail = new Node(value, length);
                head = tail;    // Both tail and head point to the first element
        } else {        // Next elements
-               Node* newEl = new Node(value);
+               Node* newEl = new Node(value, length);
                tail->setNext(newEl);
                tail = newEl;
        }
@@ -75,9 +82,9 @@ void ConcurrentLinkedListQueue::enqueue(void* value) {
 
 
 // Return element and remove from list
-void* ConcurrentLinkedListQueue::dequeue() {
+char* ConcurrentLinkedListQueue::dequeue() {
 
-       lock_guard<mutex> guard(mtx);
+       lock_guard<mutex> guard(queueMutex);
        if (tail == NULL && head == NULL) {     // empty
                return NULL;
        } else {
@@ -87,7 +94,7 @@ void* ConcurrentLinkedListQueue::dequeue() {
                        tail = NULL;
                } else
                        head = head->getNext();
-               void* retVal = retEl->getValue();
+               char* retVal = retEl->getValue();
                // Prepare retEl for deletion
                retEl->setNext(NULL);
                delete retEl;
@@ -96,80 +103,33 @@ void* ConcurrentLinkedListQueue::dequeue() {
        }
 }
 
+// Return element, length, and remove it from list
+char* ConcurrentLinkedListQueue::deQAndGetLength(int* length) {
 
-void enQ(ConcurrentLinkedListQueue* methodQueue, char** test) {
-
-       /*char* test = new char[3];
-       test[0] = 'a';
-       test[1] = 'b';
-       test[2] = 'c';*/
-       void* ptr = test;
-       for(int i=0; i<10; i++ ) {
-               cout << "Enqueuing: " << test << " address: " << ptr << endl;
-               methodQueue->enqueue(ptr);
-       }
-}
-
-
-void deQ(ConcurrentLinkedListQueue* methodQueue) {
-
-       for(int i=0; i<12; i++) {
-               void* result = methodQueue->dequeue();
-               if (result != NULL) {
-                       cout << "Dequeue result: " << result << endl;
-                       cout << "Dequeue result: " << *((char**) result) << endl;               
-               } else {
-                       cout << "Result is NULL!!! End of queue!" << endl;
-               }
+       lock_guard<mutex> guard(queueMutex);
+       if (tail == NULL && head == NULL) {     // empty
+               *length = 0;
+               return 0;
+       } else {
+               Node* retEl = head;
+               if (head->getNext() == NULL) {
+                       head = NULL;
+                       tail = NULL;
+               } else
+                       head = head->getNext();
+               char* retVal = retEl->getValue();
+               *length = retEl->getLength();
+               // Prepare retEl for deletion
+               retEl->setNext(NULL);
+               delete retEl;
+               // Return just the value
+               //cout << "Print bytes inside dequeue: ";
+               //IoTRMIUtil::printBytes(*((char**) retVal), *length, false);
+               //cout << "Dequeuing: " << *((char**) retVal) << endl;
+               //cout << "Dequeuing address: " << std::ref(retVal) << endl;
+               return retVal;
        }
 }
 
 
-int main(int argc, char *argv[])
-{
-       ConcurrentLinkedListQueue* methodQueue = new ConcurrentLinkedListQueue();
-       /*cout << "Dequeue result: " << methodQueue->dequeue() << endl;
-       string str = "this is a test string";
-       const char* test = str.c_str();
-       const char** test2 = &test;
-       cout << "Initial result: " << test << endl;
-       cout << "Initial result 2: " << *test2 << endl;
-       void* ptr = &test;
-       cout << "Pointer: " << ptr << endl;
-       methodQueue->enqueue(ptr);
-       methodQueue->enqueue(ptr);
-       void* result = methodQueue->dequeue();
-       cout << "Result: " << result << endl;
-       cout << "Dequeue result: " << *((const char**) result) << endl;
-       void* result2 = methodQueue->dequeue();
-       cout << "Dequeue result: " << *((const char**) result2) << endl;
-       void* result3 = methodQueue->dequeue();
-       cout << "Dequeue result: " << result3 << endl;*/
-       //thread t1,t2;
-
-       //t1 = thread(enQ, methodQueue);
-       //t2 = thread(deQ, methodQueue);
-
-       //t1.join();
-       //t2.join();
-
-       char* test = new char[3];
-       test[0] = 'a';
-       test[1] = 'b';
-       test[2] = 'c';
-       void* ptr = &test;
-       methodQueue->enqueue(ptr);
-       void* result = methodQueue->dequeue();
-       cout << "Dequeue result: " << *((char**) result) << endl;
-
-       thread t1,t2;
-
-       t1 = thread(enQ, methodQueue, &test);
-       t2 = thread(deQ, methodQueue);
-
-       t1.join();
-       t2.join();
-
-       return 0;
-}
 
index e18e5e44f4991da763655a996dc2ae1b529438bf..20073236b629d05d70ed2b27a975d11a37f0b114 100644 (file)
@@ -3,10 +3,12 @@
 #include <iostream>
 #include <mutex>
 
+#include "IoTRMIUtil.hpp"
+
 /** Class ConcurrentLinkedListQueue is a queue that can handle
  *  concurrent requests and packets for IoT communication via socket.
  *  <p>
- *  It stores object through a void pointer.
+ *  It stores object through a char pointer.
  *
  * @author      Rahmadi Trimananda <rtrimana @ uci.edu>
  * @version     1.0
 
 using namespace std;
 
-mutex mtx;
+mutex queueMutex;
 
 class Node {
 
        private:
                Node* next;
-               void* value;
+               char* value;
+               int length;
 
        public:
-               Node(void* val);
+               Node(char* val, int len);
                ~Node();
-               void* getValue();
+               char* getValue();
+               int getLength();
                Node* getNext();
                void setNext(Node* nxt);
 
@@ -42,7 +46,8 @@ class ConcurrentLinkedListQueue {
        public:
                ConcurrentLinkedListQueue();
                ~ConcurrentLinkedListQueue();
-               void enqueue(void* value);      // Enqueue to tail
-               void* dequeue();                        // Dequeue from tail
+               void enqueue(char* value, int length);  // Enqueue to tail
+               char* dequeue();                                                // Dequeue from tail
+               char* deQAndGetLength(int* length);             // Dequeue from tail and return length
 };
 #endif
index a63079846b4c7797bc9a37adebfe676b227dd3d9..a92f3e3732da99b090073d334cf5d96d47abd3f8 100644 (file)
@@ -37,7 +37,6 @@ class IoTRMICall {
                void**  getReturnObjects(char* retBytes, string retCls[], int numRet, void* retObj[]);
 
        private:
-               map<string,int>         mapSign2MethodId;
                IoTRMIUtil                      *rmiUtil;
                IoTSocketClient         *rmiClient;
 
diff --git a/iotjava/iotrmi/C++/IoTRMIComm.hpp b/iotjava/iotrmi/C++/IoTRMIComm.hpp
new file mode 100644 (file)
index 0000000..9f45c60
--- /dev/null
@@ -0,0 +1,477 @@
+/** Class IoTRMIComm combines the functionalities
+ *  of IoTRMICall and IoTRMIObject to create a single
+ *  communication class with two sockets serving one
+ *  directional traffic for each.
+ *
+ * @author      Rahmadi Trimananda <rtrimana @ uci.edu>
+ * @version     1.0
+ * @since       2017-01-28
+ */
+#ifndef _IOTRMICOMM_HPP__
+#define _IOTRMICOMM_HPP__
+
+#include <iostream>
+#include <string>
+#include <atomic>
+#include <limits>
+#include <thread>
+#include <mutex>
+
+#include "IoTSocketServer.hpp"
+#include "IoTSocketClient.hpp"
+#include "ConcurrentLinkedListQueue.cpp"
+
+using namespace std;
+
+std::atomic<bool> didGetMethodBytes(false);
+std::atomic<bool> didGetReturnBytes(false);
+
+mutex regSkelMutex;
+mutex regStubMutex;
+mutex retValMutex;
+mutex remoteCallMutex;
+mutex sendReturnObjMutex;
+
+class IoTRMIComm {
+       public:
+               IoTRMIComm();
+               ~IoTRMIComm();
+               // Public methods
+               virtual void            sendReturnObj(void* retObj, string type, char* methodBytes) = 0;
+               virtual void            sendReturnObj(void* retObj[], string type[], int numRet, char* methodBytes) = 0;
+               int                                     returnLength(void* retObj[], string retCls[], int numRet);
+               char*                           returnToBytes(void* retObj[], string retCls[], char* retBytes, int numRet);
+               char*                           getMethodBytes();
+               int                                     getMethodLength();
+               int                                     getObjectIdFromMethod();
+               static int                      getObjectId(char* packetBytes);
+               static int                      getMethodId(char* packetBytes);
+               static int                      getPacketType(char* packetBytes);
+               void**                          getMethodParams(string paramCls[], int numParam, void* paramObj[], char* methodBytes);
+               void                            registerSkeleton(int objectId, bool* methodReceived);
+               void                            registerStub(int objectId, int methodId, bool* retValueReceived);
+               int                                     getObjectIdCounter();
+               void                            setObjectIdCounter(int objIdCounter);
+               void                            decrementObjectIdCounter();
+
+               int                                     methodLength(string paramCls[], void* paramObj[], int numParam);
+               char*                           methodToBytes(int objectId, int methId, string paramCls[], void* paramObj[],
+                                                               char* method, int numParam);
+               virtual void            remoteCall(int objectId, int methodId, string paramCls[], 
+                                                               void* paramObj[], int numParam) = 0;
+               void*                           getReturnValue(string retType, void* retObj);
+               
+               void**                          getStructObjects(string retType[], int numRet, void* retObj[]);
+               void**                          getReturnObjects(char* retBytes, string retCls[], int numRet, void* retObj[]);
+
+       protected:
+               IoTRMIUtil                                      *rmiUtil;
+               char*                                           methodBytes;
+               int                                                     methodLen;
+               char*                                           retValueBytes;
+               int                                                     retValueLen;
+               ConcurrentLinkedListQueue       methodQueue;
+               ConcurrentLinkedListQueue       returnQueue;
+               map<int,bool*>                          mapSkeletonId;
+               map<string,bool*>                       mapStubId;
+               int                                                     objectIdCounter = std::numeric_limits<int>::max();
+
+       private:
+               // Private methods
+               void                            wakeUpThreadOnMethodCall(IoTRMIComm* rmiComm);
+               void                            wakeUpThreadOnReturnValue(IoTRMIComm* rmiComm);
+};
+
+
+// Constructor
+IoTRMIComm::IoTRMIComm() {
+
+       rmiUtil = new IoTRMIUtil();
+       methodBytes = NULL;
+       retValueBytes = NULL;
+       methodLen = 0;
+       retValueLen = 0;
+       thread th1 (&IoTRMIComm::wakeUpThreadOnMethodCall, this, this);
+       th1.detach();
+       thread th2 (&IoTRMIComm::wakeUpThreadOnReturnValue, this, this);
+       th2.detach();   
+
+}
+
+
+// Destructor
+IoTRMIComm::~IoTRMIComm() {
+
+       // Clean up
+       if (rmiUtil != NULL) {  
+               delete rmiUtil;
+               rmiUtil = NULL;         
+       }
+}
+
+
+void IoTRMIComm::wakeUpThreadOnMethodCall(IoTRMIComm* rmiComm) {
+
+       int methLen = 0;
+       //cout << "Starting wakeUpThreadOnMethodCall()" << endl;
+       while(true) {
+               // Convert back to char*
+               char* queueHead = rmiComm->methodQueue.deQAndGetLength(&methLen);
+               if (queueHead != NULL) {
+                       rmiComm->methodBytes = queueHead;
+                       rmiComm->methodLen = methLen;
+                       //IoTRMIUtil::printBytes(rmiComm->methodBytes, rmiComm->methodLen, false);
+                       int currObjId = rmiComm->getObjectId(rmiComm->methodBytes);
+                       auto search = rmiComm->mapSkeletonId.find(currObjId);
+                       bool* methRecv = search->second;
+                       didGetMethodBytes.exchange(false);
+                       *methRecv = true;
+                       while(!didGetMethodBytes);
+               }
+       }
+}
+
+
+void IoTRMIComm::wakeUpThreadOnReturnValue(IoTRMIComm* rmiComm) {
+
+       int retLen = 0;
+       //cout << "Starting wakeUpThreadOnReturnValue()" << endl;
+       while(true) {
+               // Convert back to char*
+               char* queueHead = rmiComm->returnQueue.deQAndGetLength(&retLen);
+               if (queueHead != NULL) {
+                       rmiComm->retValueBytes = queueHead;
+                       rmiComm->retValueLen = retLen;
+                       //IoTRMIUtil::printBytes(rmiComm->retValueBytes, rmiComm->retValueLen, false);
+                       int objectId = rmiComm->getObjectId(rmiComm->retValueBytes);
+                       int methodId = rmiComm->getMethodId(rmiComm->retValueBytes);
+                       string strKey = to_string(objectId) + "-" + to_string(methodId);
+                       auto search = rmiComm->mapStubId.find(strKey);
+                       bool* retRecv = search->second;
+                       didGetReturnBytes.exchange(false);
+                       *retRecv = true;
+                       while(!didGetReturnBytes);
+               }
+       }
+}
+
+
+// registerSkeleton() registers the skeleton to be woken up
+void IoTRMIComm::registerSkeleton(int objectId, bool* methodReceived) {
+
+       lock_guard<mutex> guard(regSkelMutex);
+       mapSkeletonId.insert(make_pair(objectId, methodReceived));
+}
+
+
+// registerStub() registers the skeleton to be woken up
+void IoTRMIComm::registerStub(int objectId, int methodId, bool* retValueReceived) {
+
+       lock_guard<mutex> guard(regStubMutex);
+       string strKey = to_string(objectId) + "-" + to_string(methodId);
+       mapStubId.insert(make_pair(strKey, retValueReceived));
+}
+
+
+// getObjectIdCounter() gets object Id counter
+int    IoTRMIComm::getObjectIdCounter() {
+
+       return objectIdCounter;
+}
+
+
+// setObjectIdCounter() sets object Id counter
+void IoTRMIComm::setObjectIdCounter(int objIdCounter) {
+
+       objectIdCounter = objIdCounter;
+}
+
+
+// decrementObjectIdCounter() gets object Id counter
+void IoTRMIComm::decrementObjectIdCounter() {
+
+       objectIdCounter--;
+}
+
+
+// Get method bytes from the socket
+char* IoTRMIComm::getMethodBytes() {
+
+       // Get method bytes
+       return methodBytes;
+}
+
+
+// Get method length from the socket
+int IoTRMIComm::getMethodLength() {
+
+       // Get method bytes
+       return methodLen;
+}
+
+
+// Get object Id from bytes
+int IoTRMIComm::getObjectIdFromMethod() {
+
+       char objectIdBytes[IoTRMIUtil::OBJECT_ID_LEN];
+       memcpy(objectIdBytes, methodBytes, IoTRMIUtil::OBJECT_ID_LEN);
+       // Get method signature 
+       int objectId = 0;
+       IoTRMIUtil::byteArrayToInt(&objectId, objectIdBytes);
+       
+       return objectId;
+}
+
+
+// Get object Id from bytes (static version)
+int IoTRMIComm::getObjectId(char* packetBytes) {
+
+       char objectIdBytes[IoTRMIUtil::OBJECT_ID_LEN];
+       memcpy(objectIdBytes, packetBytes, IoTRMIUtil::OBJECT_ID_LEN);
+       // Get method signature 
+       int objectId = 0;
+       IoTRMIUtil::byteArrayToInt(&objectId, objectIdBytes);
+       
+       return objectId;
+}
+
+
+// Get methodId from bytes (static version)
+int IoTRMIComm::getMethodId(char* packetBytes) {
+
+       // Get method Id
+       char methodIdBytes[IoTRMIUtil::METHOD_ID_LEN];
+       int offset = IoTRMIUtil::OBJECT_ID_LEN;
+       memcpy(methodIdBytes, packetBytes + offset, IoTRMIUtil::METHOD_ID_LEN);
+       // Get method signature 
+       int methodId = 0;
+       IoTRMIUtil::byteArrayToInt(&methodId, methodIdBytes);
+       
+       return methodId;
+}
+
+
+// Get methodId from bytes (static version)
+int IoTRMIComm::getPacketType(char* packetBytes) {
+
+       // Get method Id
+       char packetTypeBytes[IoTRMIUtil::METHOD_ID_LEN];
+       int offset = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN;
+       memcpy(packetTypeBytes, packetBytes + offset, IoTRMIUtil::PACKET_TYPE_LEN);
+       // Get method signature 
+       int packetType = 0;
+       IoTRMIUtil::byteArrayToInt(&packetType, packetTypeBytes);
+       
+       return packetType;
+}
+
+
+// Get method parameters and return an array of parameter objects
+//
+// For primitive objects:
+// | 32-bit method ID | m-bit actual data (fixed length)  |
+// 
+// For string, arrays, and non-primitive objects:
+// | 32-bit method ID | 32-bit length | n-bit actual data | ...
+void** IoTRMIComm::getMethodParams(string paramCls[], int numParam, void* paramObj[], char* methodBytes) {
+
+       // Byte scanning position
+       int pos = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
+       for (int i = 0; i < numParam; i++) {
+               int paramLen = rmiUtil->getTypeSize(paramCls[i]);
+               // Get the 32-bit field in the byte array to get the actual
+               //              length (this is a param with indefinite length)
+               if (paramLen == -1) {
+                       char bytPrmLen[IoTRMIUtil::PARAM_LEN];
+                       memcpy(bytPrmLen, methodBytes + pos, IoTRMIUtil::PARAM_LEN);
+                       pos = pos + IoTRMIUtil::PARAM_LEN;
+                       int* prmLenPtr = IoTRMIUtil::byteArrayToInt(&paramLen, bytPrmLen);
+                       paramLen = *prmLenPtr;
+               }
+               char paramBytes[paramLen];
+               memcpy(paramBytes, methodBytes + pos, paramLen);
+               pos = pos + paramLen;
+               paramObj[i] = IoTRMIUtil::getParamObject(paramObj[i], paramCls[i].c_str(), paramBytes, paramLen);
+       }
+
+       return paramObj;
+}
+
+
+// Find the bytes length of a return object (struct that has more than 1 member)
+int    IoTRMIComm::returnLength(void* retObj[], string retCls[], int numRet) {
+
+       // Get byte arrays and calculate return bytes length
+       int returnLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
+       for (int i = 0; i < numRet; i++) {
+               // Find the return length
+               int retObjLen = rmiUtil->getTypeSize(retCls[i]);
+               if (retObjLen == -1) { // Store the length of the field - indefinite length
+                       retObjLen = rmiUtil->getVarTypeSize(retCls[i], retObj[i]);
+                       // Some space for return length, i.e. 32 bits for integer               
+                       returnLen = returnLen + IoTRMIUtil::RETURN_LEN;
+               }
+               // Calculate returnLen
+               returnLen = returnLen + retObjLen;
+       }
+
+       return returnLen;
+}
+
+
+// Convert return object (struct members) into bytes
+char* IoTRMIComm::returnToBytes(void* retObj[], string retCls[], char* retBytes, int numRet) {
+
+       int pos = 0;
+       // Get byte arrays and calculate return bytes length
+       for (int i = 0; i < numRet; i++) {
+               // Find the return length
+               int retObjLen = rmiUtil->getTypeSize(retCls[i]);
+               if (retObjLen == -1) { // Store the length of the field - indefinite length
+                       retObjLen = rmiUtil->getVarTypeSize(retCls[i], retObj[i]);
+                       // Write the return length
+                       char retLenBytes[IoTRMIUtil::RETURN_LEN];
+                       IoTRMIUtil::intToByteArray(retObjLen, retLenBytes);
+                       memcpy(retBytes + pos, retLenBytes, IoTRMIUtil::RETURN_LEN);                    
+                       pos = pos + IoTRMIUtil::RETURN_LEN;
+               }
+               // Get array of bytes and put it in the array of array of bytes
+               char objBytes[retObjLen];
+               IoTRMIUtil::getObjectBytes(objBytes, retObj[i], retCls[i].c_str());
+               memcpy(retBytes + pos, objBytes, retObjLen);
+               pos = pos + retObjLen;
+       }
+
+       return retBytes;
+}
+
+
+// Get return value for single values (non-structs)
+void* IoTRMIComm::getReturnValue(string retType, void* retObj) {
+
+       // Receive return value and return it to caller
+       lock_guard<mutex> guard(retValMutex);
+       // Copy just the actual return value bytes
+       int headerLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
+       int retActualLen = retValueLen - headerLen;
+       //char *retActualBytes = new char[retActualLen];
+       char retActualBytes[retActualLen];
+       memcpy(retActualBytes, retValueBytes + headerLen, retActualLen);
+       //IoTRMIUtil::printBytes(retActualBytes, retActualLen, false);
+       retObj = IoTRMIUtil::getParamObject(retObj, retType.c_str(), retActualBytes, retActualLen);
+       // Delete received bytes object
+       delete[] retValueBytes;
+       //delete[] retActualBytes;
+       
+       return retObj;
+}
+
+
+// Get a set of return objects (struct)
+void** IoTRMIComm::getStructObjects(string retType[], int numRet, void* retObj[]) {
+
+       // Critical section that is used by different objects
+       lock_guard<mutex> guard(retValMutex);
+       // Copy just the actual return value bytes
+       int headerLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
+       int retActualLen = retValueLen - headerLen;
+       char retActualBytes[retActualLen];
+       memcpy(retActualBytes, retValueBytes + headerLen, retActualLen);
+       // Return size of array of struct
+       retObj = getReturnObjects(retActualBytes, retType, numRet, retObj);
+       // Delete received bytes object
+       delete[] retValueBytes;
+       
+       return retObj;
+}
+
+
+// Find the bytes length of a method
+int IoTRMIComm::methodLength(string paramCls[], void* paramObj[], int numParam) {
+
+       // Get byte arrays and calculate method bytes length
+       // Start from the object Id + method Id...
+       int methodLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
+       for (int i = 0; i < numParam; i++) {
+               // Find the parameter length
+               int paramLen = rmiUtil->getTypeSize(paramCls[i]);
+               if (paramLen == -1) { // Store the length of the field - indefinite length
+                       paramLen = rmiUtil->getVarTypeSize(paramCls[i], paramObj[i]);
+                       // Some space for param length, i.e. 32 bits for integer                
+                       methodLen = methodLen + IoTRMIUtil::PARAM_LEN;
+               }
+               // Calculate methodLen
+               methodLen = methodLen + paramLen;
+       }
+       return methodLen;
+}
+
+
+// Convert method and its parameters into bytes
+char* IoTRMIComm::methodToBytes(int objectId, int methId, string paramCls[], 
+               void* paramObj[], char* method, int numParam) {
+
+       // Get object Id in bytes
+       char objId[IoTRMIUtil::OBJECT_ID_LEN];
+       IoTRMIUtil::intToByteArray(objectId, objId);
+       memcpy(method, objId, IoTRMIUtil::OBJECT_ID_LEN);
+       int pos = IoTRMIUtil::OBJECT_ID_LEN;
+       // Get method Id in bytes
+       char methodId[IoTRMIUtil::METHOD_ID_LEN];
+       IoTRMIUtil::intToByteArray(methId, methodId);
+       memcpy(method + pos, methodId, IoTRMIUtil::METHOD_ID_LEN);
+       pos = pos + IoTRMIUtil::METHOD_ID_LEN;
+       char packetType[IoTRMIUtil::PACKET_TYPE_LEN];
+       IoTRMIUtil::intToByteArray(IoTRMIUtil::METHOD_TYPE, methodId);
+       memcpy(method + pos, methodId, IoTRMIUtil::PACKET_TYPE_LEN);
+       pos = pos + IoTRMIUtil::PACKET_TYPE_LEN;
+       // Get byte arrays and calculate method bytes length
+       for (int i = 0; i < numParam; i++) {
+               // Find the parameter length
+               int paramLen = rmiUtil->getTypeSize(paramCls[i]);
+               if (paramLen == -1) { // Store the length of the field - indefinite length
+                       paramLen = rmiUtil->getVarTypeSize(paramCls[i], paramObj[i]);
+                       // Write the parameter length
+                       char prmLenBytes[IoTRMIUtil::PARAM_LEN];
+                       IoTRMIUtil::intToByteArray(paramLen, prmLenBytes);
+                       memcpy(method + pos, prmLenBytes, IoTRMIUtil::PARAM_LEN);                       
+                       pos = pos + IoTRMIUtil::PARAM_LEN;
+               }
+               // Get array of bytes and put it in the array of array of bytes
+               char objBytes[paramLen];
+               IoTRMIUtil::getObjectBytes(objBytes, paramObj[i], paramCls[i].c_str());
+               memcpy(method + pos, objBytes, paramLen);
+               pos = pos + paramLen;
+       }
+
+       return method;
+}
+
+
+// Get return objects for structs
+void** IoTRMIComm::getReturnObjects(char* retBytes, string retCls[], int numRet, void* retObj[]) {
+
+       // Byte scanning position
+       int pos = 0;
+       for (int i = 0; i < numRet; i++) {
+               int retLen = rmiUtil->getTypeSize(retCls[i]);
+               // Get the 32-bit field in the byte array to get the actual
+               //              length (this is a param with indefinite length)
+               if (retLen == -1) {
+                       char bytRetLen[IoTRMIUtil::RETURN_LEN];
+                       memcpy(bytRetLen, retBytes + pos, IoTRMIUtil::RETURN_LEN);
+                       pos = pos + IoTRMIUtil::RETURN_LEN;
+                       int* retLenPtr = IoTRMIUtil::byteArrayToInt(&retLen, bytRetLen);
+                       retLen = *retLenPtr;
+               }
+               char retObjBytes[retLen];
+               memcpy(retObjBytes, retBytes + pos, retLen);
+               pos = pos + retLen;
+               retObj[i] = IoTRMIUtil::getParamObject(retObj[i], retCls[i].c_str(), retObjBytes, retLen);
+       }
+
+       return retObj;
+}
+#endif
+
+
diff --git a/iotjava/iotrmi/C++/IoTRMICommClient.hpp b/iotjava/iotrmi/C++/IoTRMICommClient.hpp
new file mode 100644 (file)
index 0000000..0328059
--- /dev/null
@@ -0,0 +1,175 @@
+/** Class IoTRMICommClient implements the client side
+ *  of IoTRMIComm class.
+ *
+ * @author      Rahmadi Trimananda <rtrimana @ uci.edu>
+ * @version     1.0
+ * @since       2017-01-28
+ */
+#ifndef _IOTRMICOMMCLIENT_HPP__
+#define _IOTRMICOMMCLIENT_HPP__
+
+#include <iostream>
+#include <string>
+#include <atomic>
+#include <limits>
+#include <thread>
+#include <mutex>
+
+#include "IoTRMIComm.hpp"
+
+using namespace std;
+
+mutex clientRemoteCallMutex;
+mutex clientSendReturnObjMutex;
+
+class IoTRMICommClient : public IoTRMIComm {
+       public:
+               IoTRMICommClient(int _portSend, int _portRecv, const char* _address, int _rev, bool* _bResult);
+               ~IoTRMICommClient();
+               // Public methods
+               void                            sendReturnObj(void* retObj, string type, char* methodBytes);
+               void                            sendReturnObj(void* retObj[], string type[], int numRet, char* methodBytes);
+               void                            remoteCall(int objectId, int methodId, string paramCls[], void* paramObj[], int numParam);
+               //void                          waitForPackets();
+               //void                          waitForPackets(IoTRMICommClient* rmiComm);
+
+       private:
+               IoTSocketClient         *rmiClientSend;
+               IoTSocketClient         *rmiClientRecv;
+
+               // Private methods
+               void                            waitForPackets(IoTRMICommClient* rmiComm);
+};
+
+
+// Constructor
+IoTRMICommClient::IoTRMICommClient(int _portSend, int _portRecv, const char* _address, int _rev, bool* _bResult) : IoTRMIComm() {
+
+       rmiClientRecv = new IoTSocketClient(_portSend, _address, _rev, _bResult);
+       rmiClientSend = new IoTSocketClient(_portRecv, _address, _rev, _bResult);
+       thread th1 (&IoTRMICommClient::waitForPackets, this, this);
+       th1.detach();
+
+}
+
+
+// Destructor
+IoTRMICommClient::~IoTRMICommClient() {
+
+       // Clean up
+       if (rmiClientRecv != NULL) {    
+               delete rmiClientRecv;
+               rmiClientRecv = NULL;           
+       }
+       if (rmiClientSend != NULL) {    
+               delete rmiClientSend;
+               rmiClientSend = NULL;           
+       }
+}
+
+
+void IoTRMICommClient::waitForPackets(IoTRMICommClient* rmiComm) {
+
+       char* packetBytes = NULL;
+       int packetLen = 0;
+       while(true) {
+               fflush(NULL);
+               packetBytes = rmiClientRecv->receiveBytes(packetBytes, &packetLen);
+               fflush(NULL);
+               if (packetBytes != NULL) { // If there is method bytes
+                       //IoTRMIUtil::printBytes(packetBytes, packetLen, false);
+                       //packetBytesPtr = &packetBytes;
+                       int packetType = getPacketType(packetBytes);
+                       if (packetType == IoTRMIUtil::METHOD_TYPE) {
+                               rmiComm->methodQueue.enqueue(packetBytes, packetLen);
+                       } else if (packetType == IoTRMIUtil::RET_VAL_TYPE) {
+                               rmiComm->returnQueue.enqueue(packetBytes, packetLen);
+                       } else {
+                               // TODO: We need to log error message when we come to running this using IoTSlave
+                               // TODO: Beware that using "cout" in the process will kill it (as IoTSlave is loaded in Sentinel)
+                               cerr << "IoTRMICommClient: Packet type is unknown: " << packetType << endl;
+                               exit(1);
+                       }
+               }
+               packetBytes = NULL;
+               packetLen = 0;
+       }
+}
+
+
+// Send return values in bytes to the caller
+void IoTRMICommClient::sendReturnObj(void* retObj, string type, char* methodBytes) {
+
+       // Critical section that is used by different objects
+       lock_guard<mutex> guard(sendReturnObjMutex);
+       // Find the length of return object in bytes
+       int retLen = rmiUtil->getTypeSize(type);
+       if (retLen == -1) {
+               retLen = rmiUtil->getVarTypeSize(type, retObj);
+       }
+       // Copy the header and object bytes
+       int objAndMethIdLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN;
+       int headerLen = objAndMethIdLen + IoTRMIUtil::PACKET_TYPE_LEN;
+       char retAllObjBytes[headerLen+retLen];
+       // Copy object and method Id first
+       memcpy(retAllObjBytes, methodBytes, objAndMethIdLen);
+       // Copy objectId + methodId + packet type in bytes
+       char packType[IoTRMIUtil::PACKET_TYPE_LEN];
+       IoTRMIUtil::intToByteArray(IoTRMIUtil::RET_VAL_TYPE, packType);
+       memcpy(retAllObjBytes + objAndMethIdLen, packType, IoTRMIUtil::PACKET_TYPE_LEN);
+       // Copy object into byte array
+       char retObjBytes[retLen];
+       IoTRMIUtil::getObjectBytes(retObjBytes, retObj, type.c_str());
+       memcpy(retAllObjBytes + headerLen, retObjBytes, retLen);
+       fflush(NULL);
+       rmiClientSend->sendBytes(retAllObjBytes, headerLen+retLen);
+       fflush(NULL);
+}
+
+
+// Send return values in bytes to the caller (for more than one object - struct)
+void IoTRMICommClient::sendReturnObj(void* retObj[], string type[], int numRet, char* methodBytes) {
+
+       // Critical section that is used by different objects
+       lock_guard<mutex> guard(sendReturnObjMutex);
+       // Find the length of return object in bytes
+       int retLen = returnLength(retObj, type, numRet);
+       // Copy the header and object bytes
+       int objAndMethIdLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN;
+       int headerLen = objAndMethIdLen + IoTRMIUtil::PACKET_TYPE_LEN;
+       char retAllObjBytes[headerLen+retLen];
+       // Copy object and method Id first
+       memcpy(retAllObjBytes, methodBytes, objAndMethIdLen);
+       // Copy objectId + methodId + packet type in bytes
+       char packType[IoTRMIUtil::PACKET_TYPE_LEN];
+       IoTRMIUtil::intToByteArray(IoTRMIUtil::RET_VAL_TYPE, packType);
+       memcpy(retAllObjBytes + objAndMethIdLen, packType, IoTRMIUtil::PACKET_TYPE_LEN);
+       // Copy object into byte array
+       char retObjBytes[retLen];
+       returnToBytes(retObj, type, retObjBytes, numRet);
+       memcpy(retAllObjBytes + headerLen, retObjBytes, retLen);
+       fflush(NULL);
+       rmiClientSend->sendBytes(retAllObjBytes, headerLen+retLen);
+       fflush(NULL);
+}
+
+
+// Calls a method remotely by passing in parameters and getting a return object
+void IoTRMICommClient::remoteCall(int objectId, int methodId, string paramCls[], 
+               void* paramObj[], int numParam) {
+
+       // Critical section that is used by different objects
+       lock_guard<mutex> guard(remoteCallMutex);
+       // Send input parameters
+       int len = methodLength(paramCls, paramObj, numParam);
+       char method[len];
+       methodToBytes(objectId, methodId, paramCls, paramObj, method, numParam);
+       // Send bytes
+       fflush(NULL);
+       rmiClientSend->sendBytes(method, len);
+       fflush(NULL);
+
+}
+#endif
+
+
diff --git a/iotjava/iotrmi/C++/IoTRMICommServer.hpp b/iotjava/iotrmi/C++/IoTRMICommServer.hpp
new file mode 100644 (file)
index 0000000..18f59c2
--- /dev/null
@@ -0,0 +1,193 @@
+/** Class IoTRMICommServer implements the server side
+ *  of IoTRMIComm class.
+ *
+ * @author      Rahmadi Trimananda <rtrimana @ uci.edu>
+ * @version     1.0
+ * @since       2017-01-28
+ */
+#ifndef _IOTRMICOMMSERVER_HPP__
+#define _IOTRMICOMMSERVER_HPP__
+
+#include <iostream>
+#include <string>
+#include <atomic>
+#include <limits>
+#include <thread>
+#include <mutex>
+
+#include "IoTRMIComm.hpp"
+
+using namespace std;
+
+
+class IoTRMICommServer : public IoTRMIComm {
+       public:
+               IoTRMICommServer(int _portSend, int _portRecv, bool* _bResult);
+               ~IoTRMICommServer();
+               // Public methods
+               void            sendReturnObj(void* retObj, string type, char* methodBytes);
+               void            sendReturnObj(void* retObj[], string type[], int numRet, char* methodBytes);
+               void            remoteCall(int objectId, int methodId, string paramCls[], void* paramObj[], int numParam);
+
+       private:
+               IoTSocketServer         *rmiServerSend;
+               IoTSocketServer         *rmiServerRecv;
+
+               // Private methods
+               void            waitForConnectionOnServerRecv();
+               void            waitForConnectionOnServerSend();
+               void            waitForPackets(IoTRMICommServer* rmiComm);
+};
+
+
+// Constructor
+IoTRMICommServer::IoTRMICommServer(int _portSend, int _portRecv, bool* _bResult) : IoTRMIComm() {
+
+       rmiServerSend = new IoTSocketServer(_portSend, _bResult);
+       rmiServerRecv = new IoTSocketServer(_portRecv, _bResult);
+       thread th1 (&IoTRMICommServer::waitForConnectionOnServerSend, this);
+       thread th2 (&IoTRMICommServer::waitForConnectionOnServerRecv, this);
+       th1.join();
+       th2.join();
+       thread th3 (&IoTRMICommServer::waitForPackets, this, this);
+       th3.detach();
+}
+
+
+// Destructor
+IoTRMICommServer::~IoTRMICommServer() {
+
+       // Clean up
+       if (rmiServerSend != NULL) {    
+               delete rmiServerSend;
+               rmiServerSend = NULL;           
+       }
+       if (rmiServerRecv != NULL) {    
+               delete rmiServerRecv;
+               rmiServerRecv = NULL;           
+       }
+}
+
+
+void IoTRMICommServer::waitForConnectionOnServerRecv() {
+
+       cout << "Wait on connection ServerRecv!" << endl;
+       rmiServerRecv->connect();
+       cout << "Connected on connection ServerRecv!" << endl;
+}
+
+
+void IoTRMICommServer::waitForConnectionOnServerSend() {
+
+       cout << "Wait on connection ServerSend!" << endl;
+       rmiServerSend->connect();
+       cout << "Connected on connection ServerSend!" << endl;
+}
+
+
+void IoTRMICommServer::waitForPackets(IoTRMICommServer* rmiComm) {
+
+       char* packetBytes = NULL;
+       int packetLen = 0;
+       //cout << "Starting waitForPacketsOnServer()" << endl;
+       while(true) {
+               fflush(NULL);
+               packetBytes = rmiComm->rmiServerRecv->receiveBytes(packetBytes, &packetLen);
+               fflush(NULL);
+               if (packetBytes != NULL) { // If there is method bytes
+                       //IoTRMIUtil::printBytes(packetBytes, packetLen, false);
+                       int packetType = IoTRMIComm::getPacketType(packetBytes);
+                       if (packetType == IoTRMIUtil::METHOD_TYPE) {
+                               rmiComm->methodQueue.enqueue(packetBytes, packetLen);
+                       } else if (packetType == IoTRMIUtil::RET_VAL_TYPE) {
+                               rmiComm->returnQueue.enqueue(packetBytes, packetLen);
+                       } else {
+                               // TODO: We need to log error message when we come to running this using IoTSlave
+                               // TODO: Beware that using "cout" in the process will kill it (as IoTSlave is loaded in Sentinel)
+                               cerr << "IoTRMICommServer: Packet type is unknown: " << packetType << endl;
+                               exit(1);
+                       }
+               }
+               packetBytes = NULL;
+               packetLen = 0;
+       }
+}
+
+
+// Send return values in bytes to the caller
+void IoTRMICommServer::sendReturnObj(void* retObj, string type, char* methodBytes) {
+
+       // Critical section that is used by different objects
+       lock_guard<mutex> guard(sendReturnObjMutex);
+       // Find the length of return object in bytes
+       int retLen = rmiUtil->getTypeSize(type);
+       if (retLen == -1) {
+               retLen = rmiUtil->getVarTypeSize(type, retObj);
+       }
+       // Copy the header and object bytes
+       int objAndMethIdLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN;
+       int headerLen = objAndMethIdLen + IoTRMIUtil::PACKET_TYPE_LEN;
+       char retAllObjBytes[headerLen+retLen];
+       // Copy object and method Id first
+       memcpy(retAllObjBytes, methodBytes, objAndMethIdLen);
+       // Copy objectId + methodId + packet type in bytes
+       char packType[IoTRMIUtil::PACKET_TYPE_LEN];
+       IoTRMIUtil::intToByteArray(IoTRMIUtil::RET_VAL_TYPE, packType);
+       memcpy(retAllObjBytes + objAndMethIdLen, packType, IoTRMIUtil::PACKET_TYPE_LEN);
+       // Copy object into byte array
+       char retObjBytes[retLen];
+       IoTRMIUtil::getObjectBytes(retObjBytes, retObj, type.c_str());
+       memcpy(retAllObjBytes + headerLen, retObjBytes, retLen);
+       fflush(NULL);
+       IoTRMIUtil::printBytes(retAllObjBytes, headerLen+retLen, false);
+       rmiServerSend->sendBytes(retAllObjBytes, headerLen+retLen);
+       fflush(NULL);
+}
+
+
+// Send return values in bytes to the caller (for more than one object - struct)
+void IoTRMICommServer::sendReturnObj(void* retObj[], string type[], int numRet, char* methodBytes) {
+
+       // Critical section that is used by different objects
+       lock_guard<mutex> guard(sendReturnObjMutex);
+       // Find the length of return object in bytes
+       int retLen = returnLength(retObj, type, numRet);
+       // Copy the header and object bytes
+       int objAndMethIdLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN;
+       int headerLen = objAndMethIdLen + IoTRMIUtil::PACKET_TYPE_LEN;
+       char retAllObjBytes[headerLen+retLen];
+       // Copy object and method Id first
+       memcpy(retAllObjBytes, methodBytes, objAndMethIdLen);
+       // Copy objectId + methodId + packet type in bytes
+       char packType[IoTRMIUtil::PACKET_TYPE_LEN];
+       IoTRMIUtil::intToByteArray(IoTRMIUtil::RET_VAL_TYPE, packType);
+       memcpy(retAllObjBytes + objAndMethIdLen, packType, IoTRMIUtil::PACKET_TYPE_LEN);
+       // Copy object into byte array
+       char retObjBytes[retLen];
+       returnToBytes(retObj, type, retObjBytes, numRet);
+       memcpy(retAllObjBytes + headerLen, retObjBytes, retLen);
+       fflush(NULL);
+       rmiServerSend->sendBytes(retAllObjBytes, headerLen+retLen);
+       fflush(NULL);
+}
+
+
+// Calls a method remotely by passing in parameters and getting a return object
+void IoTRMICommServer::remoteCall(int objectId, int methodId, string paramCls[], 
+               void* paramObj[], int numParam) {
+
+       // Critical section that is used by different objects
+       lock_guard<mutex> guard(remoteCallMutex);
+       // Send input parameters
+       int len = methodLength(paramCls, paramObj, numParam);
+       char method[len];
+       methodToBytes(objectId, methodId, paramCls, paramObj, method, numParam);
+       // Send bytes
+       fflush(NULL);
+       rmiServerSend->sendBytes(method, len);
+       fflush(NULL);
+
+}
+#endif
+
+
index 2c3be17cfc9aab7372d1b647cf175257cf3681f1..74f6d320dee46a7d22da6b0c97b1f77831f8e491 100644 (file)
@@ -16,6 +16,7 @@
 #include <cstdlib>
 #include <memory>
 #include <typeinfo>
+#include <map>
 
 #include <iostream>
 #include <string>
@@ -97,11 +98,19 @@ class IoTRMIUtil {
                // Constants
                const static int        OBJECT_ID_LEN = 4;      // 4 bytes = 32 bits
                const static int        METHOD_ID_LEN = 4;      // 4 bytes = 32 bits
+               const static int        PACKET_TYPE_LEN = 4;// 4 bytes = 32 bits
                const static int        PARAM_LEN = 4;          // 4 bytes = 32 bits (4-byte field that stores the length of the param)
                const static int        RETURN_LEN = 4;         // 4 bytes = 32 bits (4-byte field that stores the length of the return object)
                const static int        CHAR_LEN = 2;           // 2 bytes (we follow Java convention)
                const static int        BYTE_LEN = 1;           // 1 byte
                const static int        BOOL_LEN = 1;           // 1 byte
+               const static int        METHOD_TYPE = 1;        // Packet type of method
+               const static int        RET_VAL_TYPE = -1;      // Packet type of return value
+
+               // Static containers
+               static map<int,void*>* mapStub;         // Map object to its stub ID
+               static map<void*,void*>* mapSkel;       // Map object to its skeleton
+               static map<void*,int>* mapSkelId;       // Map object to its skeleton ID
                
        private:
                map<string,string>      mapPrimitives;
@@ -109,6 +118,10 @@ class IoTRMIUtil {
                map<string,string>      mapNonPrimitives;
 };
 
+map<int,void*>* IoTRMIUtil::mapStub = new map<int,void*>();
+map<void*,void*>* IoTRMIUtil::mapSkel = new map<void*,void*>();
+map<void*,int>* IoTRMIUtil::mapSkelId = new map<void*,int>();
+
 
 // Constructor
 IoTRMIUtil::IoTRMIUtil() {
index 3bbf74fa6f4ab2b963a8b8a661c8b212d5c56fdd..52907ba0f6dc095b6d5a3dfeb947d1059d012b0a 100644 (file)
@@ -40,6 +40,10 @@ static const int MSG_LEN_SIZE = 4;
 #define SD_SEND         0x01
 #define SD_BOTH         0x02
 
+mutex sendBytesMutex;
+mutex recvBytesMutex;
+mutex sendAckMutex;
+mutex recvAckMutex;
 
 class IoTSocket {
        public:
@@ -92,6 +96,9 @@ IoTSocket::~IoTSocket() {
 // Send bytes over the wire
 bool IoTSocket::sendBytes(char* pVals, int iLen) {
 
+       // Critical section that is used by different objects
+       lock_guard<mutex> guard(sendBytesMutex);
+
        int i = 0;
        char size[MSG_LEN_SIZE];
        // Convert int to byte array and fix endianness
@@ -102,10 +109,13 @@ bool IoTSocket::sendBytes(char* pVals, int iLen) {
                return false;
        }
 
+       IoTRMIUtil::printBytes(size, 4, false);
+
        if (send(m_iSock, (char *) pVals, iLen, 0) == -1) {
                perror("IoTSocket: Send bytes error!");
                return false;
        }
+
 #ifdef DEBUG_ACK
        if (!receiveAck())
                return false;
@@ -121,6 +131,9 @@ bool IoTSocket::sendBytes(char* pVals, int iLen) {
 // Generate an array of char on the heap and return it
 char* IoTSocket::receiveBytes(char* pVals, int* len)
 {
+       // Critical section that is used by different objects
+       lock_guard<mutex> guard(recvBytesMutex);
+
        int                     i                               = 0;
        int                     j                               = 0;
        char*           pTemp                   = NULL;
@@ -162,6 +175,7 @@ char* IoTSocket::receiveBytes(char* pVals, int* len)
                if (iTotalBytes == iLen)
                        bEnd = true;
        }
+
 #ifdef DEBUG_ACK
        if (!sendAck())
                return NULL;
@@ -187,6 +201,8 @@ bool IoTSocket::close()
 // Receive a short ack from the client 
 bool IoTSocket::receiveAck()
 {
+       // Critical section that is used by different objects
+       lock_guard<mutex> guard(recvAckMutex);
        char temp[1];
        int iTotal = 0;
        int iResult = 0;
@@ -208,6 +224,8 @@ bool IoTSocket::receiveAck()
 // Send a short ack to the client 
 bool IoTSocket::sendAck()
 {
+       // Critical section that is used by different objects
+       lock_guard<mutex> guard(sendAckMutex);
        char temp[1];
        temp[0] = 42;
 
index 3aa71ae54428f6c58e13534cff695e4db8b5365b..7b7c8a183589efad64c265b374dd2d15e570f200 100644 (file)
@@ -39,7 +39,7 @@ void CallBack::setInt(int _i) {
 
 void CallBack::needCallback(TestClassComplete* tc) {
 
-       cout << "Short from TestClass: " << tc->getShort(1234);
+       cout << endl << "Short from TestClass: " << tc->getShort(1234) << endl << endl;
 }
 
 #endif
index e4918b58209319c26703bd3aae0ebaf6dac56811..ca963490351ab0ad57adc281a7f45d58588ebda8 100644 (file)
@@ -105,13 +105,6 @@ TestClass::TestClass(int _int, float _float, string _string) {
 }
 
 
-void TestClass::registerCallback(CallBackInterfaceWithCallBack* _cb) {
-
-       cbvec.push_back(_cb);
-       cout << "Registering callback object!" << endl;
-}
-
-
 void TestClass::registerCallbackArray(vector<CallBackInterfaceWithCallBack*> _cb) {
 
        for (CallBackInterfaceWithCallBack* cb : _cb) {
@@ -142,13 +135,26 @@ void TestClass::registerCallbackComplex(int in, vector<CallBackInterfaceWithCall
 }
 
 
+void TestClass::registerCallback(CallBackInterfaceWithCallBack* _cb) {
+
+       cbvec.push_back(_cb);
+       cout << "Registering callback object!" << endl;
+}
+
+
 int TestClass::callBack() {
 
        int sum = 0;
        for (CallBackInterfaceWithCallBack* cb : cbvec) {
-               //cb->needCallback(this);
+               //cout << "Sum: " << sum << endl;
                sum = sum + cb->printInt();
+               cb->needCallback(this);
+               //cb->needCallback(this);
+               TestClass* tc = new TestClass();
+               cb->needCallback(tc);
+               //cout << "Sum after: " << sum << endl;
        }
+       cout << "About to return sum: " << sum << endl;
 
        return sum;
 }
index 4003d89d07df99f9ae6e3bfd3768d39fcaec0123..e524f4dc16924c9077b98fd1e251ce13fe768e82 100644 (file)
@@ -8,34 +8,24 @@
 using namespace std;
 
 
-TestClassComplete_Stub::TestClassComplete_Stub(int _port, const char* _skeletonAddress, string _callbackAddress, int _rev, bool* _bResult, vector<int> _ports) {
-       callbackAddress = _callbackAddress;
-       ports = _ports;
-       rmiCall = new IoTRMICall(_port, _skeletonAddress, _rev, _bResult);
-       set0Allowed.insert(-9998);
-       //thread th1 (&TestClassComplete_Stub::___initCallBack, this);
-       //th1.detach();
-       ___regCB();
+TestClassComplete_Stub::TestClassComplete_Stub(int _portSend, int _portRecv, const char* _skeletonAddress, int _rev, bool* _bResult) {
+       rmiComm = new IoTRMICommClient(_portSend, _portRecv, _skeletonAddress, _rev, _bResult);
+       rmiComm->registerStub(objectId, 0, &retValueReceived0);
+       rmiComm->registerStub(objectId, 2, &retValueReceived2);
+       IoTRMIUtil::mapStub->insert(make_pair(objectId, this));
 }
 
-TestClassComplete_Stub::TestClassComplete_Stub(IoTRMICall* _rmiCall, string _callbackAddress, int _objIdCnt, vector<int> _ports) {
-       callbackAddress = _callbackAddress;
-       rmiCall = _rmiCall;
-       objIdCnt = _objIdCnt;
-       set0Allowed.insert(-9998);
-       //thread th1 (&TestClassComplete_Stub::___initCallBack, this);
-       //th1.detach();
-       ___regCB();
+TestClassComplete_Stub::TestClassComplete_Stub(IoTRMIComm* _rmiComm, int _objectId) {
+       rmiComm = _rmiComm;
+       objectId = _objectId;
+       rmiComm->registerStub(objectId, 0, &retValueReceived0);
+       rmiComm->registerStub(objectId, 2, &retValueReceived2);
 }
 
 TestClassComplete_Stub::~TestClassComplete_Stub() {
-       if (rmiCall != NULL) {
-               delete rmiCall;
-               rmiCall = NULL;
-       }
-       if (rmiObj != NULL) {
-               delete rmiObj;
-               rmiObj = NULL;
+       if (rmiComm != NULL) {
+               delete rmiComm;
+               rmiComm = NULL;
        }
        for(CallBackInterface* cb : vecCallbackObj) {
                delete cb;
@@ -43,32 +33,45 @@ TestClassComplete_Stub::~TestClassComplete_Stub() {
        }
 }
 
+mutex mtxMethodExec1;  // TODO: We probably need to correlate this always with class name, e.g. methodExecCallBackInterfaceWithCallBack
 void TestClassComplete_Stub::registerCallback(CallBackInterface* _cb) { 
-       //CallBackInterface_CallbackSkeleton* skel0 = new CallBackInterface_CallbackSkeleton(_cb, callbackAddress, objIdCnt++);
-       CallBackInterface_Skeleton* skel0 = new CallBackInterface_Skeleton(_cb, callbackAddress, objIdCnt++);
-       vecCallbackObj.push_back(skel0);
-       int ___paramCB0 = 1;
+       lock_guard<mutex> guard(mtxMethodExec1);
+       int objIdSent = 0;
+       auto it = IoTRMIUtil::mapSkel->find(_cb);
+       if (it == IoTRMIUtil::mapSkel->end()) { // Not in the map, so new object
+               objIdSent = rmiComm->getObjectIdCounter();
+               rmiComm->decrementObjectIdCounter();
+               CallBackInterface_Skeleton* skel0 = new CallBackInterface_Skeleton(_cb, rmiComm, objIdSent);
+               vecCallbackObj.push_back(skel0);
+               IoTRMIUtil::mapSkel->insert(make_pair(_cb, skel0));
+               IoTRMIUtil::mapSkelId->insert(make_pair(_cb, objIdSent));
+               cout << "Create new skeleton for TestClass! ID=" << objIdSent << endl;
+               //thread th0 (&CallBackInterface_Skeleton::___waitRequestInvokeMethod, std::ref(skel0));
+               thread th0 (&CallBackInterface_Skeleton::___waitRequestInvokeMethod, std::ref(skel0), std::ref(skel0));  
+               th0.detach();
+               //while(!didAlreadyInitWaitInvoke);
+               while(!skel0->didInitWaitInvoke());
+       } else {
+               auto itId = IoTRMIUtil::mapSkelId->find(_cb);
+               objIdSent = itId->second;
+               cout << "Skeleton exists for TestClass! ID=" << objIdSent << endl;
+       }
+
+       //int ___paramCB0 = 1;
+       int ___paramCB0 = objIdSent;
        int methodId = 1;
        string retType = "void";
        int numParam = 1;
        string paramCls[] = { "int" };
        void* paramObj[] = { &___paramCB0 };
        void* retObj = NULL;
-       rmiCall->remoteCall(objectId, methodId, retType, paramCls, paramObj, numParam, retObj);
-}
-
-void TestClassComplete_Stub::___regCB() {
-       int numParam = 3;
-       int methodId = -9999;
-       string retType = "void";
-       string paramCls[] = { "int*", "String", "int" };
-       int rev = 0;
-       void* paramObj[] = { &ports, &callbackAddress, &rev };
-       void* retObj = NULL;
-       rmiCall->remoteCall(objectId, methodId, retType, paramCls, paramObj, numParam, retObj);
+       rmiComm->remoteCall(objectId, methodId, paramCls, paramObj, numParam);
 }
 
-short TestClassComplete_Stub::getShort(short in) { 
+mutex mtxMethodExec0;  // TODO: We probably need to correlate this always with class name, e.g. methodExecCallBackInterfaceWithCallBack
+short TestClassComplete_Stub::getShort(short in) {
+       lock_guard<mutex> guard(mtxMethodExec0);
+       cout << "getShort() is called!!!" << endl << endl;
        int methodId = 0;
        string retType = "short";
        int numParam = 1;
@@ -76,11 +79,23 @@ short TestClassComplete_Stub::getShort(short in) {
        void* paramObj[] = { &in };
        short retVal = 0;
        void* retObj = &retVal;
-       rmiCall->remoteCall(objectId, methodId, retType, paramCls, paramObj, numParam, retObj);
+       cout << "Calling remote call!" << endl;
+       rmiComm->remoteCall(objectId, methodId, paramCls, paramObj, numParam);
+       cout << "Finished calling remote call!" << endl;
+       // Waiting for return value
+       while(!retValueReceived0);
+       rmiComm->getReturnValue(retType, retObj);
+       //retValueReceived0.exchange(false);
+       retValueReceived0 = false;
+       didGetReturnBytes.exchange(true);
+       cout << "Getting return value for getShort(): " << retVal << endl;
+
        return retVal;
 }
 
-int TestClassComplete_Stub::callBack() { 
+mutex mtxMethodExec2;  // TODO: We probably need to correlate this always with class name, e.g. methodExecCallBackInterfaceWithCallBack
+int TestClassComplete_Stub::callBack() {
+       lock_guard<mutex> guard(mtxMethodExec2);
        int methodId = 2;
        string retType = "int";
        int numParam = 0;
@@ -88,7 +103,16 @@ int TestClassComplete_Stub::callBack() {
        void* paramObj[] = {  };
        int retVal = 0;
        void* retObj = &retVal;
-       rmiCall->remoteCall(objectId, methodId, retType, paramCls, paramObj, numParam, retObj);
+       rmiComm->remoteCall(objectId, methodId, paramCls, paramObj, numParam);
+       // Waiting for return value
+       while(!retValueReceived2);
+       rmiComm->getReturnValue(retType, retObj);
+       //retValueReceived2.exchange(false);
+       retValueReceived2 = false;
+       didGetReturnBytes.exchange(true);
+
+       cout << "Getting return value for callback(): " << retVal << endl;
+
        return retVal;
 }
 
@@ -96,7 +120,8 @@ int TestClassComplete_Stub::callBack() {
 int main(int argc, char *argv[])
 {
 
-       int port = 5010;
+       int portSend = 5000;
+       int portRecv = 6000;
        const char* address = "localhost";
        //const char* address = "192.168.2.191";        // RPi2
        //const char* skeletonAddress = "128.195.136.170";      // dc-9.calit2.uci.edu
@@ -106,17 +131,29 @@ int main(int argc, char *argv[])
        //const char* callbackAddress = "192.168.2.191";        // RPi2
        int rev = 0;
        bool bResult = false;
-       vector<int> ports;
-       ports.push_back(12345);
-       ports.push_back(22346);
+       //vector<int> ports;
+       //ports.push_back(12345);
+       //ports.push_back(22346);
        //ports.push_back(32344);
        //ports.push_back(43212);
 
-       TestClassComplete *tcStub = new TestClassComplete_Stub(port, skeletonAddress, callbackAddress, rev, &bResult, ports);
+       TestClassComplete *tcStub = new TestClassComplete_Stub(portSend, portRecv, skeletonAddress, rev, &bResult);
+       //cout << "Getting return value from getShort(): " << tcStub->getShort(1234) << endl;
+       //cout << "Getting return value from getShort(): " << tcStub->getShort(4321) << endl;
+       //cout << "Getting return value from getShort(): " << tcStub->getShort(5678) << endl;
        cout << "==== CALLBACK ====" << endl;
        CallBackInterface *cbSingle = new CallBack(2354);
        tcStub->registerCallback(cbSingle);
+       //tcStub->registerCallback(cbSingle);
+       CallBackInterface *cbSingle1 = new CallBack(2646);
+       tcStub->registerCallback(cbSingle1);
+       CallBackInterface *cbSingle2 = new CallBack(2000);
+       tcStub->registerCallback(cbSingle2);
        cout << "Return value from callback: " << tcStub->callBack() << endl;
+       //cout << "Return value from callback: " << tcStub->callBack() << endl;
+
+       // TODO: we need this while loop at the end to keep the threads running
+       while(true);
 
        return 0;
 }
index 0ad75869f1436ca13846c5badae0fca7c810ab0b..65524f76eb52e7152c8ffd7f0ae0dc7abe2e7862 100644 (file)
@@ -7,32 +7,30 @@
 
 using namespace std;
 
-TestClassInterface_Skeleton::TestClassInterface_Skeleton(TestClassInterface *_mainObj, string _callbackAddress, int _port) {
+TestClassInterface_Skeleton::TestClassInterface_Skeleton(TestClassInterface *_mainObj, int _portSend, int _portRecv) {
        bool _bResult = false;
        mainObj = _mainObj;
-       callbackAddress = _callbackAddress;
-       rmiObj = new IoTRMIObject(_port, &_bResult);
-       set0Allowed.insert(-9999);
-       ___waitRequestInvokeMethod();
+       rmiComm = new IoTRMICommServer(_portSend, _portRecv, &_bResult);
+       IoTRMIUtil::mapSkel->insert(make_pair(_mainObj, this));
+       IoTRMIUtil::mapSkelId->insert(make_pair(_mainObj, objectId));
+       rmiComm->registerSkeleton(objectId, &methodReceived);
+       thread th1 (&TestClassInterface_Skeleton::___waitRequestInvokeMethod, this, this);
+//     th1.detach();
+       th1.join();
 }
 
-TestClassInterface_Skeleton::TestClassInterface_Skeleton(TestClassInterface *_mainObj, int _objIdCnt, string _callbackAddress) {
+TestClassInterface_Skeleton::TestClassInterface_Skeleton(TestClassInterface *_mainObj, IoTRMIComm *_rmiComm, int _objectId) {
        bool _bResult = false;
        mainObj = _mainObj;
-       objIdCnt = _objIdCnt;
-       callbackAddress = _callbackAddress;
-       set0Allowed.insert(-9999);
-       ___waitRequestInvokeMethod();
+       rmiComm = _rmiComm;
+       objectId = _objectId;
+       rmiComm->registerSkeleton(objectId, &methodReceived);
 }
 
 TestClassInterface_Skeleton::~TestClassInterface_Skeleton() {
-       if (rmiObj != NULL) {
-               delete rmiObj;
-               rmiObj = NULL;
-       }
-       if (rmiCall != NULL) {
-               delete rmiCall;
-               rmiCall = NULL;
+       if (rmiComm != NULL) {
+               delete rmiComm;
+               rmiComm = NULL;
        }
        for(CallBackInterfaceWithCallBack* cb : vecCallbackObj) {
                delete cb;
@@ -40,6 +38,11 @@ TestClassInterface_Skeleton::~TestClassInterface_Skeleton() {
        }
 }
 
+bool TestClassInterface_Skeleton::didInitWaitInvoke() {
+
+       return didAlreadyInitWaitInvoke;
+}
+
 short TestClassInterface_Skeleton::getShort(short in) {
        return mainObj->getShort(in);
 }
@@ -48,84 +51,112 @@ void TestClassInterface_Skeleton::registerCallback(CallBackInterfaceWithCallBack
        mainObj->registerCallback(_cb);
 }
 
-void TestClassInterface_Skeleton::___regCB() {
-       int numParam = 3;
-       vector<int> param1;
-       string param2 = "";
-       int param3 = 0;
-       string paramCls[] = { "int*", "String", "int" };
-       void* paramObj[] = { &param1, &param2, &param3 };
-       rmiObj->getMethodParams(paramCls, numParam, paramObj);
-       bool bResult = false;
-       rmiCall = new IoTRMICall(param1[1], param2.c_str(), param3, &bResult);
-}
-
 int TestClassInterface_Skeleton::callBack() {
        return mainObj->callBack();
 }
 
-void TestClassInterface_Skeleton::___getShort() {
+void TestClassInterface_Skeleton::___getShort(TestClassInterface_Skeleton* skel) {
+       char* localMethodBytes = new char[methodLen];
+       memcpy(localMethodBytes, skel->methodBytes, methodLen);
+       //cout << "Bytes inside getShort: " << endl;
+       //IoTRMIUtil::printBytes(localMethodBytes, methodLen, false);
+       didGetMethodBytes.exchange(true);
        string paramCls[] = { "short" };
        int numParam = 1;
        short in;
        void* paramObj[] = { &in };
-       rmiObj->getMethodParams(paramCls, numParam, paramObj);
+       skel->rmiComm->getMethodParams(paramCls, numParam, paramObj, localMethodBytes);
        short retVal = getShort(in);
+       cout << "Getting return value getShort(): " << retVal << endl;
        void* retObj = &retVal;
-       rmiObj->sendReturnObj(retObj, "short");
+       skel->rmiComm->sendReturnObj(retObj, "short", localMethodBytes);
+       cout << "Sent return value for getShort()" << endl;
+       delete[] localMethodBytes;
 }
 
-void TestClassInterface_Skeleton::___registerCallback() {
+void TestClassInterface_Skeleton::___registerCallback(TestClassInterface_Skeleton* skel) {
+       char* localMethodBytes = new char[methodLen];
+       memcpy(localMethodBytes, skel->methodBytes, methodLen);
+       didGetMethodBytes.exchange(true);
        string paramCls[] = { "int" };
        int numParam = 1;
        int numStubs0 = 0;
        void* paramObj[] = { &numStubs0 };
-       rmiObj->getMethodParams(paramCls, numParam, paramObj);
-       //CallBackInterfaceWithCallBack* stub0 = new CallBackInterfaceWithCallBack_CallbackStub(rmiCall, callbackAddress, objIdCnt, ports);
-       CallBackInterfaceWithCallBack* stub0 = new CallBackInterfaceWithCallBack_Stub(rmiCall, callbackAddress, objIdCnt, ports);
-       vecCallbackObj.push_back(stub0);
-       objIdCnt++;
-       registerCallback(stub0);
+       skel->rmiComm->getMethodParams(paramCls, numParam, paramObj, localMethodBytes);
+       // Choosing the right stub
+       int objIdRecv = numStubs0;
+       CallBackInterfaceWithCallBack* stub0 = NULL;
+       auto it = IoTRMIUtil::mapStub->find(objIdRecv);
+       if (it == IoTRMIUtil::mapStub->end()) { // Not in the map, so new object
+               stub0 = new CallBackInterfaceWithCallBack_Stub(rmiComm, objIdRecv);
+               IoTRMIUtil::mapStub->insert(make_pair(objIdRecv, stub0));
+               cout << "Create new stub for Callback! ID=" << objIdRecv << endl;
+               rmiComm->setObjectIdCounter(objIdRecv);
+               rmiComm->decrementObjectIdCounter();
+       } else {
+               stub0 = (CallBackInterfaceWithCallBack_Stub*) it->second;
+               cout << "Stub exists for Callback! ID=" << objIdRecv << endl;
+       }
+       skel->vecCallbackObj.push_back(stub0);
+       skel->registerCallback(stub0);
+       delete[] localMethodBytes;
 }
 
-void TestClassInterface_Skeleton::___callBack() {
+void TestClassInterface_Skeleton::___callBack(TestClassInterface_Skeleton* skel) {
+       char* localMethodBytes = new char[methodLen];
+       memcpy(localMethodBytes, skel->methodBytes, methodLen);
+       didGetMethodBytes.exchange(true);
        string paramCls[] = {  };
        int numParam = 0;
        void* paramObj[] = {  };
-       rmiObj->getMethodParams(paramCls, numParam, paramObj);
+       skel->rmiComm->getMethodParams(paramCls, numParam, paramObj, localMethodBytes);
        int retVal = callBack();
        void* retObj = &retVal;
-       rmiObj->sendReturnObj(retObj, "int");
+       skel->rmiComm->sendReturnObj(retObj, "int", localMethodBytes);
+       delete[] localMethodBytes;
 }
 
-void TestClassInterface_Skeleton::___waitRequestInvokeMethod() {
+void TestClassInterface_Skeleton::___waitRequestInvokeMethod(TestClassInterface_Skeleton* skel) {
+       cout << "Running loop!" << endl;
+       //didAlreadyInitWaitInvoke.exchange(true);
+       skel->didAlreadyInitWaitInvoke = true;
        while (true) {
-               rmiObj->getMethodBytes();
-               int _objectId = rmiObj->getObjectId();
-               int methodId = rmiObj->getMethodId();
-               if (_objectId == object0Id) {
-                       if (set0Allowed.find(methodId) == set0Allowed.end()) {
+               if (!methodReceived)
+                       continue;
+               skel->methodBytes = skel->rmiComm->getMethodBytes();
+               skel->methodLen = skel->rmiComm->getMethodLength();
+               cout << endl;
+               // TODO: Get method length as well!!!
+               //methodReceived.exchange(false);
+               methodReceived = false;
+               int _objectId = skel->rmiComm->getObjectId(skel->methodBytes);
+               int methodId = skel->rmiComm->getMethodId(skel->methodBytes);
+               if (_objectId == objectId) {
+                       if (skel->set0Allowed.find(methodId) == skel->set0Allowed.end()) {
                                cerr << "Object with object Id: " << _objectId << "  is not allowed to access method: " << methodId << endl;
                                return;
                        }
                }
-               else {
-                       cerr << "Object Id: " << _objectId << " not recognized!" << endl;
-                       return;
-               }
+               else
+                       continue;
                switch (methodId) {
-                       case 0: ___getShort(); break;
-                       case 1: ___registerCallback(); break;
-                       case 2: ___callBack(); break;
-                       case -9999: ___regCB(); break;
+                       case 0: { thread th0 (&TestClassInterface_Skeleton::___getShort, std::ref(skel), skel); th0.detach(); break; }
+                                       //___getShort(skel); break;
+                       case 1: { thread th1 (&TestClassInterface_Skeleton::___registerCallback, std::ref(skel), skel); th1.detach(); break; }
+                                       //___registerCallback(skel); break;
+                       case 2: { thread th2 (&TestClassInterface_Skeleton::___callBack, std::ref(skel), skel); th2.detach(); break; }
+                                       //___callBack(skel); break;
                        default: 
                        cerr << "Method Id " << methodId << " not recognized!" << endl;
-                       throw exception();
+                       //throw exception();
+                       return;
                }
+               cout << "Out of switch statement!" << endl;
        }
 }
 
 
+
 int main(int argc, char *argv[])
 {
        // First argument is port number
@@ -139,10 +170,11 @@ int main(int argc, char *argv[])
        cout << argv3 << endl;
        cout << argv4 << endl;*/
 
-       int port = 5010;
+       int portSend = 5000;
+       int portRecv = 6000;
        //TestClassInterface *tc = new TestClass(argv2, argv3, argv4);
        TestClassInterface *tc = new TestClass(123, 2.345, "test");
-       TestClassInterface_Skeleton *tcSkel = new TestClassInterface_Skeleton(tc, "localhost", port);
+       TestClassInterface_Skeleton *tcSkel = new TestClassInterface_Skeleton(tc, portSend, portRecv);
 
        //delete tc;
        //delete tcSkel;
index 1b0bc08d5f9fe51a7d14a8b554262e9e0b01ba4b..f3b2cbc498d1ab61a684589b0bb5a286cdc92ac0 100644 (file)
@@ -38,10 +38,10 @@ public class TestClassCallbacks_Stub {
                System.out.println("Registered callback!");
 
                System.out.println("Return value from callback 1: " + tcstub.callBack() + "\n\n");
-               //System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)4576) + "\n\n");
-               //System.out.println("Return value from callback 2: " + tcstub.callBack() + "\n\n");
-               //System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)1233) + "\n\n");
-               //System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)1321) + "\n\n");
+               System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)4576) + "\n\n");
+               System.out.println("Return value from callback 2: " + tcstub.callBack() + "\n\n");
+               System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)1233) + "\n\n");
+               System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)1321) + "\n\n");
                while(true) {}
        }
 }