From: rtrimana Date: Tue, 31 Jan 2017 23:36:14 +0000 (-0800) Subject: Tested C++ RPS/RMI libraries for arbitrary objects and arbitrary remote calls X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=4d5080a1607fb27cfe8a769ef8ed34beef42859d;p=iot2.git Tested C++ RPS/RMI libraries for arbitrary objects and arbitrary remote calls --- diff --git a/iotjava/iotrmi/C++/ConcurrentLinkedListQueue.cpp b/iotjava/iotrmi/C++/ConcurrentLinkedListQueue.cpp index 3fd0273..9b6826d 100644 --- a/iotjava/iotrmi/C++/ConcurrentLinkedListQueue.cpp +++ b/iotjava/iotrmi/C++/ConcurrentLinkedListQueue.cpp @@ -6,9 +6,10 @@ using namespace std; -Node::Node(void* val) { +Node::Node(char* val, int len) { value = val; + length = len; next = NULL; } @@ -26,12 +27,18 @@ Node::~Node() { } -void* Node::getValue() { +char* Node::getValue() { return value; } +int Node::getLength() { + + return length; +} + + Node* Node::getNext() { return next; @@ -53,21 +60,21 @@ ConcurrentLinkedListQueue::ConcurrentLinkedListQueue() { ConcurrentLinkedListQueue::~ConcurrentLinkedListQueue() { - void* val = NULL; + char* val = NULL; do { // Dequeue and free everything up val = dequeue(); } while(val != NULL); } -void ConcurrentLinkedListQueue::enqueue(void* value) { +void ConcurrentLinkedListQueue::enqueue(char* value, int length) { - lock_guard guard(mtx); + lock_guard guard(queueMutex); if (tail == NULL && head == NULL) { // first element - tail = new Node(value); + tail = new Node(value, length); head = tail; // Both tail and head point to the first element } else { // Next elements - Node* newEl = new Node(value); + Node* newEl = new Node(value, length); tail->setNext(newEl); tail = newEl; } @@ -75,9 +82,9 @@ void ConcurrentLinkedListQueue::enqueue(void* value) { // Return element and remove from list -void* ConcurrentLinkedListQueue::dequeue() { +char* ConcurrentLinkedListQueue::dequeue() { - lock_guard guard(mtx); + lock_guard guard(queueMutex); if (tail == NULL && head == NULL) { // empty return NULL; } else { @@ -87,7 +94,7 @@ void* ConcurrentLinkedListQueue::dequeue() { tail = NULL; } else head = head->getNext(); - void* retVal = retEl->getValue(); + char* retVal = retEl->getValue(); // Prepare retEl for deletion retEl->setNext(NULL); delete retEl; @@ -96,80 +103,33 @@ void* ConcurrentLinkedListQueue::dequeue() { } } +// Return element, length, and remove it from list +char* ConcurrentLinkedListQueue::deQAndGetLength(int* length) { -void enQ(ConcurrentLinkedListQueue* methodQueue, char** test) { - - /*char* test = new char[3]; - test[0] = 'a'; - test[1] = 'b'; - test[2] = 'c';*/ - void* ptr = test; - for(int i=0; i<10; i++ ) { - cout << "Enqueuing: " << test << " address: " << ptr << endl; - methodQueue->enqueue(ptr); - } -} - - -void deQ(ConcurrentLinkedListQueue* methodQueue) { - - for(int i=0; i<12; i++) { - void* result = methodQueue->dequeue(); - if (result != NULL) { - cout << "Dequeue result: " << result << endl; - cout << "Dequeue result: " << *((char**) result) << endl; - } else { - cout << "Result is NULL!!! End of queue!" << endl; - } + lock_guard guard(queueMutex); + if (tail == NULL && head == NULL) { // empty + *length = 0; + return 0; + } else { + Node* retEl = head; + if (head->getNext() == NULL) { + head = NULL; + tail = NULL; + } else + head = head->getNext(); + char* retVal = retEl->getValue(); + *length = retEl->getLength(); + // Prepare retEl for deletion + retEl->setNext(NULL); + delete retEl; + // Return just the value + //cout << "Print bytes inside dequeue: "; + //IoTRMIUtil::printBytes(*((char**) retVal), *length, false); + //cout << "Dequeuing: " << *((char**) retVal) << endl; + //cout << "Dequeuing address: " << std::ref(retVal) << endl; + return retVal; } } -int main(int argc, char *argv[]) -{ - ConcurrentLinkedListQueue* methodQueue = new ConcurrentLinkedListQueue(); - /*cout << "Dequeue result: " << methodQueue->dequeue() << endl; - string str = "this is a test string"; - const char* test = str.c_str(); - const char** test2 = &test; - cout << "Initial result: " << test << endl; - cout << "Initial result 2: " << *test2 << endl; - void* ptr = &test; - cout << "Pointer: " << ptr << endl; - methodQueue->enqueue(ptr); - methodQueue->enqueue(ptr); - void* result = methodQueue->dequeue(); - cout << "Result: " << result << endl; - cout << "Dequeue result: " << *((const char**) result) << endl; - void* result2 = methodQueue->dequeue(); - cout << "Dequeue result: " << *((const char**) result2) << endl; - void* result3 = methodQueue->dequeue(); - cout << "Dequeue result: " << result3 << endl;*/ - //thread t1,t2; - - //t1 = thread(enQ, methodQueue); - //t2 = thread(deQ, methodQueue); - - //t1.join(); - //t2.join(); - - char* test = new char[3]; - test[0] = 'a'; - test[1] = 'b'; - test[2] = 'c'; - void* ptr = &test; - methodQueue->enqueue(ptr); - void* result = methodQueue->dequeue(); - cout << "Dequeue result: " << *((char**) result) << endl; - - thread t1,t2; - - t1 = thread(enQ, methodQueue, &test); - t2 = thread(deQ, methodQueue); - - t1.join(); - t2.join(); - - return 0; -} diff --git a/iotjava/iotrmi/C++/ConcurrentLinkedListQueue.hpp b/iotjava/iotrmi/C++/ConcurrentLinkedListQueue.hpp index e18e5e4..2007323 100644 --- a/iotjava/iotrmi/C++/ConcurrentLinkedListQueue.hpp +++ b/iotjava/iotrmi/C++/ConcurrentLinkedListQueue.hpp @@ -3,10 +3,12 @@ #include #include +#include "IoTRMIUtil.hpp" + /** Class ConcurrentLinkedListQueue is a queue that can handle * concurrent requests and packets for IoT communication via socket. *

- * It stores object through a void pointer. + * It stores object through a char pointer. * * @author Rahmadi Trimananda * @version 1.0 @@ -15,18 +17,20 @@ using namespace std; -mutex mtx; +mutex queueMutex; class Node { private: Node* next; - void* value; + char* value; + int length; public: - Node(void* val); + Node(char* val, int len); ~Node(); - void* getValue(); + char* getValue(); + int getLength(); Node* getNext(); void setNext(Node* nxt); @@ -42,7 +46,8 @@ class ConcurrentLinkedListQueue { public: ConcurrentLinkedListQueue(); ~ConcurrentLinkedListQueue(); - void enqueue(void* value); // Enqueue to tail - void* dequeue(); // Dequeue from tail + void enqueue(char* value, int length); // Enqueue to tail + char* dequeue(); // Dequeue from tail + char* deQAndGetLength(int* length); // Dequeue from tail and return length }; #endif diff --git a/iotjava/iotrmi/C++/IoTRMICall.hpp b/iotjava/iotrmi/C++/IoTRMICall.hpp index a630798..a92f3e3 100644 --- a/iotjava/iotrmi/C++/IoTRMICall.hpp +++ b/iotjava/iotrmi/C++/IoTRMICall.hpp @@ -37,7 +37,6 @@ class IoTRMICall { void** getReturnObjects(char* retBytes, string retCls[], int numRet, void* retObj[]); private: - map mapSign2MethodId; IoTRMIUtil *rmiUtil; IoTSocketClient *rmiClient; diff --git a/iotjava/iotrmi/C++/IoTRMIComm.hpp b/iotjava/iotrmi/C++/IoTRMIComm.hpp new file mode 100644 index 0000000..9f45c60 --- /dev/null +++ b/iotjava/iotrmi/C++/IoTRMIComm.hpp @@ -0,0 +1,477 @@ +/** Class IoTRMIComm combines the functionalities + * of IoTRMICall and IoTRMIObject to create a single + * communication class with two sockets serving one + * directional traffic for each. + * + * @author Rahmadi Trimananda + * @version 1.0 + * @since 2017-01-28 + */ +#ifndef _IOTRMICOMM_HPP__ +#define _IOTRMICOMM_HPP__ + +#include +#include +#include +#include +#include +#include + +#include "IoTSocketServer.hpp" +#include "IoTSocketClient.hpp" +#include "ConcurrentLinkedListQueue.cpp" + +using namespace std; + +std::atomic didGetMethodBytes(false); +std::atomic didGetReturnBytes(false); + +mutex regSkelMutex; +mutex regStubMutex; +mutex retValMutex; +mutex remoteCallMutex; +mutex sendReturnObjMutex; + +class IoTRMIComm { + public: + IoTRMIComm(); + ~IoTRMIComm(); + // Public methods + virtual void sendReturnObj(void* retObj, string type, char* methodBytes) = 0; + virtual void sendReturnObj(void* retObj[], string type[], int numRet, char* methodBytes) = 0; + int returnLength(void* retObj[], string retCls[], int numRet); + char* returnToBytes(void* retObj[], string retCls[], char* retBytes, int numRet); + char* getMethodBytes(); + int getMethodLength(); + int getObjectIdFromMethod(); + static int getObjectId(char* packetBytes); + static int getMethodId(char* packetBytes); + static int getPacketType(char* packetBytes); + void** getMethodParams(string paramCls[], int numParam, void* paramObj[], char* methodBytes); + void registerSkeleton(int objectId, bool* methodReceived); + void registerStub(int objectId, int methodId, bool* retValueReceived); + int getObjectIdCounter(); + void setObjectIdCounter(int objIdCounter); + void decrementObjectIdCounter(); + + int methodLength(string paramCls[], void* paramObj[], int numParam); + char* methodToBytes(int objectId, int methId, string paramCls[], void* paramObj[], + char* method, int numParam); + virtual void remoteCall(int objectId, int methodId, string paramCls[], + void* paramObj[], int numParam) = 0; + void* getReturnValue(string retType, void* retObj); + + void** getStructObjects(string retType[], int numRet, void* retObj[]); + void** getReturnObjects(char* retBytes, string retCls[], int numRet, void* retObj[]); + + protected: + IoTRMIUtil *rmiUtil; + char* methodBytes; + int methodLen; + char* retValueBytes; + int retValueLen; + ConcurrentLinkedListQueue methodQueue; + ConcurrentLinkedListQueue returnQueue; + map mapSkeletonId; + map mapStubId; + int objectIdCounter = std::numeric_limits::max(); + + private: + // Private methods + void wakeUpThreadOnMethodCall(IoTRMIComm* rmiComm); + void wakeUpThreadOnReturnValue(IoTRMIComm* rmiComm); +}; + + +// Constructor +IoTRMIComm::IoTRMIComm() { + + rmiUtil = new IoTRMIUtil(); + methodBytes = NULL; + retValueBytes = NULL; + methodLen = 0; + retValueLen = 0; + thread th1 (&IoTRMIComm::wakeUpThreadOnMethodCall, this, this); + th1.detach(); + thread th2 (&IoTRMIComm::wakeUpThreadOnReturnValue, this, this); + th2.detach(); + +} + + +// Destructor +IoTRMIComm::~IoTRMIComm() { + + // Clean up + if (rmiUtil != NULL) { + delete rmiUtil; + rmiUtil = NULL; + } +} + + +void IoTRMIComm::wakeUpThreadOnMethodCall(IoTRMIComm* rmiComm) { + + int methLen = 0; + //cout << "Starting wakeUpThreadOnMethodCall()" << endl; + while(true) { + // Convert back to char* + char* queueHead = rmiComm->methodQueue.deQAndGetLength(&methLen); + if (queueHead != NULL) { + rmiComm->methodBytes = queueHead; + rmiComm->methodLen = methLen; + //IoTRMIUtil::printBytes(rmiComm->methodBytes, rmiComm->methodLen, false); + int currObjId = rmiComm->getObjectId(rmiComm->methodBytes); + auto search = rmiComm->mapSkeletonId.find(currObjId); + bool* methRecv = search->second; + didGetMethodBytes.exchange(false); + *methRecv = true; + while(!didGetMethodBytes); + } + } +} + + +void IoTRMIComm::wakeUpThreadOnReturnValue(IoTRMIComm* rmiComm) { + + int retLen = 0; + //cout << "Starting wakeUpThreadOnReturnValue()" << endl; + while(true) { + // Convert back to char* + char* queueHead = rmiComm->returnQueue.deQAndGetLength(&retLen); + if (queueHead != NULL) { + rmiComm->retValueBytes = queueHead; + rmiComm->retValueLen = retLen; + //IoTRMIUtil::printBytes(rmiComm->retValueBytes, rmiComm->retValueLen, false); + int objectId = rmiComm->getObjectId(rmiComm->retValueBytes); + int methodId = rmiComm->getMethodId(rmiComm->retValueBytes); + string strKey = to_string(objectId) + "-" + to_string(methodId); + auto search = rmiComm->mapStubId.find(strKey); + bool* retRecv = search->second; + didGetReturnBytes.exchange(false); + *retRecv = true; + while(!didGetReturnBytes); + } + } +} + + +// registerSkeleton() registers the skeleton to be woken up +void IoTRMIComm::registerSkeleton(int objectId, bool* methodReceived) { + + lock_guard guard(regSkelMutex); + mapSkeletonId.insert(make_pair(objectId, methodReceived)); +} + + +// registerStub() registers the skeleton to be woken up +void IoTRMIComm::registerStub(int objectId, int methodId, bool* retValueReceived) { + + lock_guard guard(regStubMutex); + string strKey = to_string(objectId) + "-" + to_string(methodId); + mapStubId.insert(make_pair(strKey, retValueReceived)); +} + + +// getObjectIdCounter() gets object Id counter +int IoTRMIComm::getObjectIdCounter() { + + return objectIdCounter; +} + + +// setObjectIdCounter() sets object Id counter +void IoTRMIComm::setObjectIdCounter(int objIdCounter) { + + objectIdCounter = objIdCounter; +} + + +// decrementObjectIdCounter() gets object Id counter +void IoTRMIComm::decrementObjectIdCounter() { + + objectIdCounter--; +} + + +// Get method bytes from the socket +char* IoTRMIComm::getMethodBytes() { + + // Get method bytes + return methodBytes; +} + + +// Get method length from the socket +int IoTRMIComm::getMethodLength() { + + // Get method bytes + return methodLen; +} + + +// Get object Id from bytes +int IoTRMIComm::getObjectIdFromMethod() { + + char objectIdBytes[IoTRMIUtil::OBJECT_ID_LEN]; + memcpy(objectIdBytes, methodBytes, IoTRMIUtil::OBJECT_ID_LEN); + // Get method signature + int objectId = 0; + IoTRMIUtil::byteArrayToInt(&objectId, objectIdBytes); + + return objectId; +} + + +// Get object Id from bytes (static version) +int IoTRMIComm::getObjectId(char* packetBytes) { + + char objectIdBytes[IoTRMIUtil::OBJECT_ID_LEN]; + memcpy(objectIdBytes, packetBytes, IoTRMIUtil::OBJECT_ID_LEN); + // Get method signature + int objectId = 0; + IoTRMIUtil::byteArrayToInt(&objectId, objectIdBytes); + + return objectId; +} + + +// Get methodId from bytes (static version) +int IoTRMIComm::getMethodId(char* packetBytes) { + + // Get method Id + char methodIdBytes[IoTRMIUtil::METHOD_ID_LEN]; + int offset = IoTRMIUtil::OBJECT_ID_LEN; + memcpy(methodIdBytes, packetBytes + offset, IoTRMIUtil::METHOD_ID_LEN); + // Get method signature + int methodId = 0; + IoTRMIUtil::byteArrayToInt(&methodId, methodIdBytes); + + return methodId; +} + + +// Get methodId from bytes (static version) +int IoTRMIComm::getPacketType(char* packetBytes) { + + // Get method Id + char packetTypeBytes[IoTRMIUtil::METHOD_ID_LEN]; + int offset = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN; + memcpy(packetTypeBytes, packetBytes + offset, IoTRMIUtil::PACKET_TYPE_LEN); + // Get method signature + int packetType = 0; + IoTRMIUtil::byteArrayToInt(&packetType, packetTypeBytes); + + return packetType; +} + + +// Get method parameters and return an array of parameter objects +// +// For primitive objects: +// | 32-bit method ID | m-bit actual data (fixed length) | +// +// For string, arrays, and non-primitive objects: +// | 32-bit method ID | 32-bit length | n-bit actual data | ... +void** IoTRMIComm::getMethodParams(string paramCls[], int numParam, void* paramObj[], char* methodBytes) { + + // Byte scanning position + int pos = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN; + for (int i = 0; i < numParam; i++) { + int paramLen = rmiUtil->getTypeSize(paramCls[i]); + // Get the 32-bit field in the byte array to get the actual + // length (this is a param with indefinite length) + if (paramLen == -1) { + char bytPrmLen[IoTRMIUtil::PARAM_LEN]; + memcpy(bytPrmLen, methodBytes + pos, IoTRMIUtil::PARAM_LEN); + pos = pos + IoTRMIUtil::PARAM_LEN; + int* prmLenPtr = IoTRMIUtil::byteArrayToInt(¶mLen, bytPrmLen); + paramLen = *prmLenPtr; + } + char paramBytes[paramLen]; + memcpy(paramBytes, methodBytes + pos, paramLen); + pos = pos + paramLen; + paramObj[i] = IoTRMIUtil::getParamObject(paramObj[i], paramCls[i].c_str(), paramBytes, paramLen); + } + + return paramObj; +} + + +// Find the bytes length of a return object (struct that has more than 1 member) +int IoTRMIComm::returnLength(void* retObj[], string retCls[], int numRet) { + + // Get byte arrays and calculate return bytes length + int returnLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN; + for (int i = 0; i < numRet; i++) { + // Find the return length + int retObjLen = rmiUtil->getTypeSize(retCls[i]); + if (retObjLen == -1) { // Store the length of the field - indefinite length + retObjLen = rmiUtil->getVarTypeSize(retCls[i], retObj[i]); + // Some space for return length, i.e. 32 bits for integer + returnLen = returnLen + IoTRMIUtil::RETURN_LEN; + } + // Calculate returnLen + returnLen = returnLen + retObjLen; + } + + return returnLen; +} + + +// Convert return object (struct members) into bytes +char* IoTRMIComm::returnToBytes(void* retObj[], string retCls[], char* retBytes, int numRet) { + + int pos = 0; + // Get byte arrays and calculate return bytes length + for (int i = 0; i < numRet; i++) { + // Find the return length + int retObjLen = rmiUtil->getTypeSize(retCls[i]); + if (retObjLen == -1) { // Store the length of the field - indefinite length + retObjLen = rmiUtil->getVarTypeSize(retCls[i], retObj[i]); + // Write the return length + char retLenBytes[IoTRMIUtil::RETURN_LEN]; + IoTRMIUtil::intToByteArray(retObjLen, retLenBytes); + memcpy(retBytes + pos, retLenBytes, IoTRMIUtil::RETURN_LEN); + pos = pos + IoTRMIUtil::RETURN_LEN; + } + // Get array of bytes and put it in the array of array of bytes + char objBytes[retObjLen]; + IoTRMIUtil::getObjectBytes(objBytes, retObj[i], retCls[i].c_str()); + memcpy(retBytes + pos, objBytes, retObjLen); + pos = pos + retObjLen; + } + + return retBytes; +} + + +// Get return value for single values (non-structs) +void* IoTRMIComm::getReturnValue(string retType, void* retObj) { + + // Receive return value and return it to caller + lock_guard guard(retValMutex); + // Copy just the actual return value bytes + int headerLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN; + int retActualLen = retValueLen - headerLen; + //char *retActualBytes = new char[retActualLen]; + char retActualBytes[retActualLen]; + memcpy(retActualBytes, retValueBytes + headerLen, retActualLen); + //IoTRMIUtil::printBytes(retActualBytes, retActualLen, false); + retObj = IoTRMIUtil::getParamObject(retObj, retType.c_str(), retActualBytes, retActualLen); + // Delete received bytes object + delete[] retValueBytes; + //delete[] retActualBytes; + + return retObj; +} + + +// Get a set of return objects (struct) +void** IoTRMIComm::getStructObjects(string retType[], int numRet, void* retObj[]) { + + // Critical section that is used by different objects + lock_guard guard(retValMutex); + // Copy just the actual return value bytes + int headerLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN; + int retActualLen = retValueLen - headerLen; + char retActualBytes[retActualLen]; + memcpy(retActualBytes, retValueBytes + headerLen, retActualLen); + // Return size of array of struct + retObj = getReturnObjects(retActualBytes, retType, numRet, retObj); + // Delete received bytes object + delete[] retValueBytes; + + return retObj; +} + + +// Find the bytes length of a method +int IoTRMIComm::methodLength(string paramCls[], void* paramObj[], int numParam) { + + // Get byte arrays and calculate method bytes length + // Start from the object Id + method Id... + int methodLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN; + for (int i = 0; i < numParam; i++) { + // Find the parameter length + int paramLen = rmiUtil->getTypeSize(paramCls[i]); + if (paramLen == -1) { // Store the length of the field - indefinite length + paramLen = rmiUtil->getVarTypeSize(paramCls[i], paramObj[i]); + // Some space for param length, i.e. 32 bits for integer + methodLen = methodLen + IoTRMIUtil::PARAM_LEN; + } + // Calculate methodLen + methodLen = methodLen + paramLen; + } + return methodLen; +} + + +// Convert method and its parameters into bytes +char* IoTRMIComm::methodToBytes(int objectId, int methId, string paramCls[], + void* paramObj[], char* method, int numParam) { + + // Get object Id in bytes + char objId[IoTRMIUtil::OBJECT_ID_LEN]; + IoTRMIUtil::intToByteArray(objectId, objId); + memcpy(method, objId, IoTRMIUtil::OBJECT_ID_LEN); + int pos = IoTRMIUtil::OBJECT_ID_LEN; + // Get method Id in bytes + char methodId[IoTRMIUtil::METHOD_ID_LEN]; + IoTRMIUtil::intToByteArray(methId, methodId); + memcpy(method + pos, methodId, IoTRMIUtil::METHOD_ID_LEN); + pos = pos + IoTRMIUtil::METHOD_ID_LEN; + char packetType[IoTRMIUtil::PACKET_TYPE_LEN]; + IoTRMIUtil::intToByteArray(IoTRMIUtil::METHOD_TYPE, methodId); + memcpy(method + pos, methodId, IoTRMIUtil::PACKET_TYPE_LEN); + pos = pos + IoTRMIUtil::PACKET_TYPE_LEN; + // Get byte arrays and calculate method bytes length + for (int i = 0; i < numParam; i++) { + // Find the parameter length + int paramLen = rmiUtil->getTypeSize(paramCls[i]); + if (paramLen == -1) { // Store the length of the field - indefinite length + paramLen = rmiUtil->getVarTypeSize(paramCls[i], paramObj[i]); + // Write the parameter length + char prmLenBytes[IoTRMIUtil::PARAM_LEN]; + IoTRMIUtil::intToByteArray(paramLen, prmLenBytes); + memcpy(method + pos, prmLenBytes, IoTRMIUtil::PARAM_LEN); + pos = pos + IoTRMIUtil::PARAM_LEN; + } + // Get array of bytes and put it in the array of array of bytes + char objBytes[paramLen]; + IoTRMIUtil::getObjectBytes(objBytes, paramObj[i], paramCls[i].c_str()); + memcpy(method + pos, objBytes, paramLen); + pos = pos + paramLen; + } + + return method; +} + + +// Get return objects for structs +void** IoTRMIComm::getReturnObjects(char* retBytes, string retCls[], int numRet, void* retObj[]) { + + // Byte scanning position + int pos = 0; + for (int i = 0; i < numRet; i++) { + int retLen = rmiUtil->getTypeSize(retCls[i]); + // Get the 32-bit field in the byte array to get the actual + // length (this is a param with indefinite length) + if (retLen == -1) { + char bytRetLen[IoTRMIUtil::RETURN_LEN]; + memcpy(bytRetLen, retBytes + pos, IoTRMIUtil::RETURN_LEN); + pos = pos + IoTRMIUtil::RETURN_LEN; + int* retLenPtr = IoTRMIUtil::byteArrayToInt(&retLen, bytRetLen); + retLen = *retLenPtr; + } + char retObjBytes[retLen]; + memcpy(retObjBytes, retBytes + pos, retLen); + pos = pos + retLen; + retObj[i] = IoTRMIUtil::getParamObject(retObj[i], retCls[i].c_str(), retObjBytes, retLen); + } + + return retObj; +} +#endif + + diff --git a/iotjava/iotrmi/C++/IoTRMICommClient.hpp b/iotjava/iotrmi/C++/IoTRMICommClient.hpp new file mode 100644 index 0000000..0328059 --- /dev/null +++ b/iotjava/iotrmi/C++/IoTRMICommClient.hpp @@ -0,0 +1,175 @@ +/** Class IoTRMICommClient implements the client side + * of IoTRMIComm class. + * + * @author Rahmadi Trimananda + * @version 1.0 + * @since 2017-01-28 + */ +#ifndef _IOTRMICOMMCLIENT_HPP__ +#define _IOTRMICOMMCLIENT_HPP__ + +#include +#include +#include +#include +#include +#include + +#include "IoTRMIComm.hpp" + +using namespace std; + +mutex clientRemoteCallMutex; +mutex clientSendReturnObjMutex; + +class IoTRMICommClient : public IoTRMIComm { + public: + IoTRMICommClient(int _portSend, int _portRecv, const char* _address, int _rev, bool* _bResult); + ~IoTRMICommClient(); + // Public methods + void sendReturnObj(void* retObj, string type, char* methodBytes); + void sendReturnObj(void* retObj[], string type[], int numRet, char* methodBytes); + void remoteCall(int objectId, int methodId, string paramCls[], void* paramObj[], int numParam); + //void waitForPackets(); + //void waitForPackets(IoTRMICommClient* rmiComm); + + private: + IoTSocketClient *rmiClientSend; + IoTSocketClient *rmiClientRecv; + + // Private methods + void waitForPackets(IoTRMICommClient* rmiComm); +}; + + +// Constructor +IoTRMICommClient::IoTRMICommClient(int _portSend, int _portRecv, const char* _address, int _rev, bool* _bResult) : IoTRMIComm() { + + rmiClientRecv = new IoTSocketClient(_portSend, _address, _rev, _bResult); + rmiClientSend = new IoTSocketClient(_portRecv, _address, _rev, _bResult); + thread th1 (&IoTRMICommClient::waitForPackets, this, this); + th1.detach(); + +} + + +// Destructor +IoTRMICommClient::~IoTRMICommClient() { + + // Clean up + if (rmiClientRecv != NULL) { + delete rmiClientRecv; + rmiClientRecv = NULL; + } + if (rmiClientSend != NULL) { + delete rmiClientSend; + rmiClientSend = NULL; + } +} + + +void IoTRMICommClient::waitForPackets(IoTRMICommClient* rmiComm) { + + char* packetBytes = NULL; + int packetLen = 0; + while(true) { + fflush(NULL); + packetBytes = rmiClientRecv->receiveBytes(packetBytes, &packetLen); + fflush(NULL); + if (packetBytes != NULL) { // If there is method bytes + //IoTRMIUtil::printBytes(packetBytes, packetLen, false); + //packetBytesPtr = &packetBytes; + int packetType = getPacketType(packetBytes); + if (packetType == IoTRMIUtil::METHOD_TYPE) { + rmiComm->methodQueue.enqueue(packetBytes, packetLen); + } else if (packetType == IoTRMIUtil::RET_VAL_TYPE) { + rmiComm->returnQueue.enqueue(packetBytes, packetLen); + } else { + // TODO: We need to log error message when we come to running this using IoTSlave + // TODO: Beware that using "cout" in the process will kill it (as IoTSlave is loaded in Sentinel) + cerr << "IoTRMICommClient: Packet type is unknown: " << packetType << endl; + exit(1); + } + } + packetBytes = NULL; + packetLen = 0; + } +} + + +// Send return values in bytes to the caller +void IoTRMICommClient::sendReturnObj(void* retObj, string type, char* methodBytes) { + + // Critical section that is used by different objects + lock_guard guard(sendReturnObjMutex); + // Find the length of return object in bytes + int retLen = rmiUtil->getTypeSize(type); + if (retLen == -1) { + retLen = rmiUtil->getVarTypeSize(type, retObj); + } + // Copy the header and object bytes + int objAndMethIdLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN; + int headerLen = objAndMethIdLen + IoTRMIUtil::PACKET_TYPE_LEN; + char retAllObjBytes[headerLen+retLen]; + // Copy object and method Id first + memcpy(retAllObjBytes, methodBytes, objAndMethIdLen); + // Copy objectId + methodId + packet type in bytes + char packType[IoTRMIUtil::PACKET_TYPE_LEN]; + IoTRMIUtil::intToByteArray(IoTRMIUtil::RET_VAL_TYPE, packType); + memcpy(retAllObjBytes + objAndMethIdLen, packType, IoTRMIUtil::PACKET_TYPE_LEN); + // Copy object into byte array + char retObjBytes[retLen]; + IoTRMIUtil::getObjectBytes(retObjBytes, retObj, type.c_str()); + memcpy(retAllObjBytes + headerLen, retObjBytes, retLen); + fflush(NULL); + rmiClientSend->sendBytes(retAllObjBytes, headerLen+retLen); + fflush(NULL); +} + + +// Send return values in bytes to the caller (for more than one object - struct) +void IoTRMICommClient::sendReturnObj(void* retObj[], string type[], int numRet, char* methodBytes) { + + // Critical section that is used by different objects + lock_guard guard(sendReturnObjMutex); + // Find the length of return object in bytes + int retLen = returnLength(retObj, type, numRet); + // Copy the header and object bytes + int objAndMethIdLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN; + int headerLen = objAndMethIdLen + IoTRMIUtil::PACKET_TYPE_LEN; + char retAllObjBytes[headerLen+retLen]; + // Copy object and method Id first + memcpy(retAllObjBytes, methodBytes, objAndMethIdLen); + // Copy objectId + methodId + packet type in bytes + char packType[IoTRMIUtil::PACKET_TYPE_LEN]; + IoTRMIUtil::intToByteArray(IoTRMIUtil::RET_VAL_TYPE, packType); + memcpy(retAllObjBytes + objAndMethIdLen, packType, IoTRMIUtil::PACKET_TYPE_LEN); + // Copy object into byte array + char retObjBytes[retLen]; + returnToBytes(retObj, type, retObjBytes, numRet); + memcpy(retAllObjBytes + headerLen, retObjBytes, retLen); + fflush(NULL); + rmiClientSend->sendBytes(retAllObjBytes, headerLen+retLen); + fflush(NULL); +} + + +// Calls a method remotely by passing in parameters and getting a return object +void IoTRMICommClient::remoteCall(int objectId, int methodId, string paramCls[], + void* paramObj[], int numParam) { + + // Critical section that is used by different objects + lock_guard guard(remoteCallMutex); + // Send input parameters + int len = methodLength(paramCls, paramObj, numParam); + char method[len]; + methodToBytes(objectId, methodId, paramCls, paramObj, method, numParam); + // Send bytes + fflush(NULL); + rmiClientSend->sendBytes(method, len); + fflush(NULL); + +} +#endif + + diff --git a/iotjava/iotrmi/C++/IoTRMICommServer.hpp b/iotjava/iotrmi/C++/IoTRMICommServer.hpp new file mode 100644 index 0000000..18f59c2 --- /dev/null +++ b/iotjava/iotrmi/C++/IoTRMICommServer.hpp @@ -0,0 +1,193 @@ +/** Class IoTRMICommServer implements the server side + * of IoTRMIComm class. + * + * @author Rahmadi Trimananda + * @version 1.0 + * @since 2017-01-28 + */ +#ifndef _IOTRMICOMMSERVER_HPP__ +#define _IOTRMICOMMSERVER_HPP__ + +#include +#include +#include +#include +#include +#include + +#include "IoTRMIComm.hpp" + +using namespace std; + + +class IoTRMICommServer : public IoTRMIComm { + public: + IoTRMICommServer(int _portSend, int _portRecv, bool* _bResult); + ~IoTRMICommServer(); + // Public methods + void sendReturnObj(void* retObj, string type, char* methodBytes); + void sendReturnObj(void* retObj[], string type[], int numRet, char* methodBytes); + void remoteCall(int objectId, int methodId, string paramCls[], void* paramObj[], int numParam); + + private: + IoTSocketServer *rmiServerSend; + IoTSocketServer *rmiServerRecv; + + // Private methods + void waitForConnectionOnServerRecv(); + void waitForConnectionOnServerSend(); + void waitForPackets(IoTRMICommServer* rmiComm); +}; + + +// Constructor +IoTRMICommServer::IoTRMICommServer(int _portSend, int _portRecv, bool* _bResult) : IoTRMIComm() { + + rmiServerSend = new IoTSocketServer(_portSend, _bResult); + rmiServerRecv = new IoTSocketServer(_portRecv, _bResult); + thread th1 (&IoTRMICommServer::waitForConnectionOnServerSend, this); + thread th2 (&IoTRMICommServer::waitForConnectionOnServerRecv, this); + th1.join(); + th2.join(); + thread th3 (&IoTRMICommServer::waitForPackets, this, this); + th3.detach(); +} + + +// Destructor +IoTRMICommServer::~IoTRMICommServer() { + + // Clean up + if (rmiServerSend != NULL) { + delete rmiServerSend; + rmiServerSend = NULL; + } + if (rmiServerRecv != NULL) { + delete rmiServerRecv; + rmiServerRecv = NULL; + } +} + + +void IoTRMICommServer::waitForConnectionOnServerRecv() { + + cout << "Wait on connection ServerRecv!" << endl; + rmiServerRecv->connect(); + cout << "Connected on connection ServerRecv!" << endl; +} + + +void IoTRMICommServer::waitForConnectionOnServerSend() { + + cout << "Wait on connection ServerSend!" << endl; + rmiServerSend->connect(); + cout << "Connected on connection ServerSend!" << endl; +} + + +void IoTRMICommServer::waitForPackets(IoTRMICommServer* rmiComm) { + + char* packetBytes = NULL; + int packetLen = 0; + //cout << "Starting waitForPacketsOnServer()" << endl; + while(true) { + fflush(NULL); + packetBytes = rmiComm->rmiServerRecv->receiveBytes(packetBytes, &packetLen); + fflush(NULL); + if (packetBytes != NULL) { // If there is method bytes + //IoTRMIUtil::printBytes(packetBytes, packetLen, false); + int packetType = IoTRMIComm::getPacketType(packetBytes); + if (packetType == IoTRMIUtil::METHOD_TYPE) { + rmiComm->methodQueue.enqueue(packetBytes, packetLen); + } else if (packetType == IoTRMIUtil::RET_VAL_TYPE) { + rmiComm->returnQueue.enqueue(packetBytes, packetLen); + } else { + // TODO: We need to log error message when we come to running this using IoTSlave + // TODO: Beware that using "cout" in the process will kill it (as IoTSlave is loaded in Sentinel) + cerr << "IoTRMICommServer: Packet type is unknown: " << packetType << endl; + exit(1); + } + } + packetBytes = NULL; + packetLen = 0; + } +} + + +// Send return values in bytes to the caller +void IoTRMICommServer::sendReturnObj(void* retObj, string type, char* methodBytes) { + + // Critical section that is used by different objects + lock_guard guard(sendReturnObjMutex); + // Find the length of return object in bytes + int retLen = rmiUtil->getTypeSize(type); + if (retLen == -1) { + retLen = rmiUtil->getVarTypeSize(type, retObj); + } + // Copy the header and object bytes + int objAndMethIdLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN; + int headerLen = objAndMethIdLen + IoTRMIUtil::PACKET_TYPE_LEN; + char retAllObjBytes[headerLen+retLen]; + // Copy object and method Id first + memcpy(retAllObjBytes, methodBytes, objAndMethIdLen); + // Copy objectId + methodId + packet type in bytes + char packType[IoTRMIUtil::PACKET_TYPE_LEN]; + IoTRMIUtil::intToByteArray(IoTRMIUtil::RET_VAL_TYPE, packType); + memcpy(retAllObjBytes + objAndMethIdLen, packType, IoTRMIUtil::PACKET_TYPE_LEN); + // Copy object into byte array + char retObjBytes[retLen]; + IoTRMIUtil::getObjectBytes(retObjBytes, retObj, type.c_str()); + memcpy(retAllObjBytes + headerLen, retObjBytes, retLen); + fflush(NULL); + IoTRMIUtil::printBytes(retAllObjBytes, headerLen+retLen, false); + rmiServerSend->sendBytes(retAllObjBytes, headerLen+retLen); + fflush(NULL); +} + + +// Send return values in bytes to the caller (for more than one object - struct) +void IoTRMICommServer::sendReturnObj(void* retObj[], string type[], int numRet, char* methodBytes) { + + // Critical section that is used by different objects + lock_guard guard(sendReturnObjMutex); + // Find the length of return object in bytes + int retLen = returnLength(retObj, type, numRet); + // Copy the header and object bytes + int objAndMethIdLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN; + int headerLen = objAndMethIdLen + IoTRMIUtil::PACKET_TYPE_LEN; + char retAllObjBytes[headerLen+retLen]; + // Copy object and method Id first + memcpy(retAllObjBytes, methodBytes, objAndMethIdLen); + // Copy objectId + methodId + packet type in bytes + char packType[IoTRMIUtil::PACKET_TYPE_LEN]; + IoTRMIUtil::intToByteArray(IoTRMIUtil::RET_VAL_TYPE, packType); + memcpy(retAllObjBytes + objAndMethIdLen, packType, IoTRMIUtil::PACKET_TYPE_LEN); + // Copy object into byte array + char retObjBytes[retLen]; + returnToBytes(retObj, type, retObjBytes, numRet); + memcpy(retAllObjBytes + headerLen, retObjBytes, retLen); + fflush(NULL); + rmiServerSend->sendBytes(retAllObjBytes, headerLen+retLen); + fflush(NULL); +} + + +// Calls a method remotely by passing in parameters and getting a return object +void IoTRMICommServer::remoteCall(int objectId, int methodId, string paramCls[], + void* paramObj[], int numParam) { + + // Critical section that is used by different objects + lock_guard guard(remoteCallMutex); + // Send input parameters + int len = methodLength(paramCls, paramObj, numParam); + char method[len]; + methodToBytes(objectId, methodId, paramCls, paramObj, method, numParam); + // Send bytes + fflush(NULL); + rmiServerSend->sendBytes(method, len); + fflush(NULL); + +} +#endif + + diff --git a/iotjava/iotrmi/C++/IoTRMIUtil.hpp b/iotjava/iotrmi/C++/IoTRMIUtil.hpp index 2c3be17..74f6d32 100644 --- a/iotjava/iotrmi/C++/IoTRMIUtil.hpp +++ b/iotjava/iotrmi/C++/IoTRMIUtil.hpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -97,11 +98,19 @@ class IoTRMIUtil { // Constants const static int OBJECT_ID_LEN = 4; // 4 bytes = 32 bits const static int METHOD_ID_LEN = 4; // 4 bytes = 32 bits + const static int PACKET_TYPE_LEN = 4;// 4 bytes = 32 bits const static int PARAM_LEN = 4; // 4 bytes = 32 bits (4-byte field that stores the length of the param) const static int RETURN_LEN = 4; // 4 bytes = 32 bits (4-byte field that stores the length of the return object) const static int CHAR_LEN = 2; // 2 bytes (we follow Java convention) const static int BYTE_LEN = 1; // 1 byte const static int BOOL_LEN = 1; // 1 byte + const static int METHOD_TYPE = 1; // Packet type of method + const static int RET_VAL_TYPE = -1; // Packet type of return value + + // Static containers + static map* mapStub; // Map object to its stub ID + static map* mapSkel; // Map object to its skeleton + static map* mapSkelId; // Map object to its skeleton ID private: map mapPrimitives; @@ -109,6 +118,10 @@ class IoTRMIUtil { map mapNonPrimitives; }; +map* IoTRMIUtil::mapStub = new map(); +map* IoTRMIUtil::mapSkel = new map(); +map* IoTRMIUtil::mapSkelId = new map(); + // Constructor IoTRMIUtil::IoTRMIUtil() { diff --git a/iotjava/iotrmi/C++/IoTSocket.hpp b/iotjava/iotrmi/C++/IoTSocket.hpp index 3bbf74f..52907ba 100644 --- a/iotjava/iotrmi/C++/IoTSocket.hpp +++ b/iotjava/iotrmi/C++/IoTSocket.hpp @@ -40,6 +40,10 @@ static const int MSG_LEN_SIZE = 4; #define SD_SEND 0x01 #define SD_BOTH 0x02 +mutex sendBytesMutex; +mutex recvBytesMutex; +mutex sendAckMutex; +mutex recvAckMutex; class IoTSocket { public: @@ -92,6 +96,9 @@ IoTSocket::~IoTSocket() { // Send bytes over the wire bool IoTSocket::sendBytes(char* pVals, int iLen) { + // Critical section that is used by different objects + lock_guard guard(sendBytesMutex); + int i = 0; char size[MSG_LEN_SIZE]; // Convert int to byte array and fix endianness @@ -102,10 +109,13 @@ bool IoTSocket::sendBytes(char* pVals, int iLen) { return false; } + IoTRMIUtil::printBytes(size, 4, false); + if (send(m_iSock, (char *) pVals, iLen, 0) == -1) { perror("IoTSocket: Send bytes error!"); return false; } + #ifdef DEBUG_ACK if (!receiveAck()) return false; @@ -121,6 +131,9 @@ bool IoTSocket::sendBytes(char* pVals, int iLen) { // Generate an array of char on the heap and return it char* IoTSocket::receiveBytes(char* pVals, int* len) { + // Critical section that is used by different objects + lock_guard guard(recvBytesMutex); + int i = 0; int j = 0; char* pTemp = NULL; @@ -162,6 +175,7 @@ char* IoTSocket::receiveBytes(char* pVals, int* len) if (iTotalBytes == iLen) bEnd = true; } + #ifdef DEBUG_ACK if (!sendAck()) return NULL; @@ -187,6 +201,8 @@ bool IoTSocket::close() // Receive a short ack from the client bool IoTSocket::receiveAck() { + // Critical section that is used by different objects + lock_guard guard(recvAckMutex); char temp[1]; int iTotal = 0; int iResult = 0; @@ -208,6 +224,8 @@ bool IoTSocket::receiveAck() // Send a short ack to the client bool IoTSocket::sendAck() { + // Critical section that is used by different objects + lock_guard guard(sendAckMutex); char temp[1]; temp[0] = 42; diff --git a/iotjava/iotrmi/C++/basics/CallBack.hpp b/iotjava/iotrmi/C++/basics/CallBack.hpp index 3aa71ae..7b7c8a1 100644 --- a/iotjava/iotrmi/C++/basics/CallBack.hpp +++ b/iotjava/iotrmi/C++/basics/CallBack.hpp @@ -39,7 +39,7 @@ void CallBack::setInt(int _i) { void CallBack::needCallback(TestClassComplete* tc) { - cout << "Short from TestClass: " << tc->getShort(1234); + cout << endl << "Short from TestClass: " << tc->getShort(1234) << endl << endl; } #endif diff --git a/iotjava/iotrmi/C++/basics/TestClass.hpp b/iotjava/iotrmi/C++/basics/TestClass.hpp index e4918b5..ca96349 100644 --- a/iotjava/iotrmi/C++/basics/TestClass.hpp +++ b/iotjava/iotrmi/C++/basics/TestClass.hpp @@ -105,13 +105,6 @@ TestClass::TestClass(int _int, float _float, string _string) { } -void TestClass::registerCallback(CallBackInterfaceWithCallBack* _cb) { - - cbvec.push_back(_cb); - cout << "Registering callback object!" << endl; -} - - void TestClass::registerCallbackArray(vector _cb) { for (CallBackInterfaceWithCallBack* cb : _cb) { @@ -142,13 +135,26 @@ void TestClass::registerCallbackComplex(int in, vectorneedCallback(this); + //cout << "Sum: " << sum << endl; sum = sum + cb->printInt(); + cb->needCallback(this); + //cb->needCallback(this); + TestClass* tc = new TestClass(); + cb->needCallback(tc); + //cout << "Sum after: " << sum << endl; } + cout << "About to return sum: " << sum << endl; return sum; } diff --git a/iotjava/iotrmi/C++/basics/TestClassComplete_Stub.cpp b/iotjava/iotrmi/C++/basics/TestClassComplete_Stub.cpp index 4003d89..e524f4d 100644 --- a/iotjava/iotrmi/C++/basics/TestClassComplete_Stub.cpp +++ b/iotjava/iotrmi/C++/basics/TestClassComplete_Stub.cpp @@ -8,34 +8,24 @@ using namespace std; -TestClassComplete_Stub::TestClassComplete_Stub(int _port, const char* _skeletonAddress, string _callbackAddress, int _rev, bool* _bResult, vector _ports) { - callbackAddress = _callbackAddress; - ports = _ports; - rmiCall = new IoTRMICall(_port, _skeletonAddress, _rev, _bResult); - set0Allowed.insert(-9998); - //thread th1 (&TestClassComplete_Stub::___initCallBack, this); - //th1.detach(); - ___regCB(); +TestClassComplete_Stub::TestClassComplete_Stub(int _portSend, int _portRecv, const char* _skeletonAddress, int _rev, bool* _bResult) { + rmiComm = new IoTRMICommClient(_portSend, _portRecv, _skeletonAddress, _rev, _bResult); + rmiComm->registerStub(objectId, 0, &retValueReceived0); + rmiComm->registerStub(objectId, 2, &retValueReceived2); + IoTRMIUtil::mapStub->insert(make_pair(objectId, this)); } -TestClassComplete_Stub::TestClassComplete_Stub(IoTRMICall* _rmiCall, string _callbackAddress, int _objIdCnt, vector _ports) { - callbackAddress = _callbackAddress; - rmiCall = _rmiCall; - objIdCnt = _objIdCnt; - set0Allowed.insert(-9998); - //thread th1 (&TestClassComplete_Stub::___initCallBack, this); - //th1.detach(); - ___regCB(); +TestClassComplete_Stub::TestClassComplete_Stub(IoTRMIComm* _rmiComm, int _objectId) { + rmiComm = _rmiComm; + objectId = _objectId; + rmiComm->registerStub(objectId, 0, &retValueReceived0); + rmiComm->registerStub(objectId, 2, &retValueReceived2); } TestClassComplete_Stub::~TestClassComplete_Stub() { - if (rmiCall != NULL) { - delete rmiCall; - rmiCall = NULL; - } - if (rmiObj != NULL) { - delete rmiObj; - rmiObj = NULL; + if (rmiComm != NULL) { + delete rmiComm; + rmiComm = NULL; } for(CallBackInterface* cb : vecCallbackObj) { delete cb; @@ -43,32 +33,45 @@ TestClassComplete_Stub::~TestClassComplete_Stub() { } } +mutex mtxMethodExec1; // TODO: We probably need to correlate this always with class name, e.g. methodExecCallBackInterfaceWithCallBack void TestClassComplete_Stub::registerCallback(CallBackInterface* _cb) { - //CallBackInterface_CallbackSkeleton* skel0 = new CallBackInterface_CallbackSkeleton(_cb, callbackAddress, objIdCnt++); - CallBackInterface_Skeleton* skel0 = new CallBackInterface_Skeleton(_cb, callbackAddress, objIdCnt++); - vecCallbackObj.push_back(skel0); - int ___paramCB0 = 1; + lock_guard guard(mtxMethodExec1); + int objIdSent = 0; + auto it = IoTRMIUtil::mapSkel->find(_cb); + if (it == IoTRMIUtil::mapSkel->end()) { // Not in the map, so new object + objIdSent = rmiComm->getObjectIdCounter(); + rmiComm->decrementObjectIdCounter(); + CallBackInterface_Skeleton* skel0 = new CallBackInterface_Skeleton(_cb, rmiComm, objIdSent); + vecCallbackObj.push_back(skel0); + IoTRMIUtil::mapSkel->insert(make_pair(_cb, skel0)); + IoTRMIUtil::mapSkelId->insert(make_pair(_cb, objIdSent)); + cout << "Create new skeleton for TestClass! ID=" << objIdSent << endl; + //thread th0 (&CallBackInterface_Skeleton::___waitRequestInvokeMethod, std::ref(skel0)); + thread th0 (&CallBackInterface_Skeleton::___waitRequestInvokeMethod, std::ref(skel0), std::ref(skel0)); + th0.detach(); + //while(!didAlreadyInitWaitInvoke); + while(!skel0->didInitWaitInvoke()); + } else { + auto itId = IoTRMIUtil::mapSkelId->find(_cb); + objIdSent = itId->second; + cout << "Skeleton exists for TestClass! ID=" << objIdSent << endl; + } + + //int ___paramCB0 = 1; + int ___paramCB0 = objIdSent; int methodId = 1; string retType = "void"; int numParam = 1; string paramCls[] = { "int" }; void* paramObj[] = { &___paramCB0 }; void* retObj = NULL; - rmiCall->remoteCall(objectId, methodId, retType, paramCls, paramObj, numParam, retObj); -} - -void TestClassComplete_Stub::___regCB() { - int numParam = 3; - int methodId = -9999; - string retType = "void"; - string paramCls[] = { "int*", "String", "int" }; - int rev = 0; - void* paramObj[] = { &ports, &callbackAddress, &rev }; - void* retObj = NULL; - rmiCall->remoteCall(objectId, methodId, retType, paramCls, paramObj, numParam, retObj); + rmiComm->remoteCall(objectId, methodId, paramCls, paramObj, numParam); } -short TestClassComplete_Stub::getShort(short in) { +mutex mtxMethodExec0; // TODO: We probably need to correlate this always with class name, e.g. methodExecCallBackInterfaceWithCallBack +short TestClassComplete_Stub::getShort(short in) { + lock_guard guard(mtxMethodExec0); + cout << "getShort() is called!!!" << endl << endl; int methodId = 0; string retType = "short"; int numParam = 1; @@ -76,11 +79,23 @@ short TestClassComplete_Stub::getShort(short in) { void* paramObj[] = { &in }; short retVal = 0; void* retObj = &retVal; - rmiCall->remoteCall(objectId, methodId, retType, paramCls, paramObj, numParam, retObj); + cout << "Calling remote call!" << endl; + rmiComm->remoteCall(objectId, methodId, paramCls, paramObj, numParam); + cout << "Finished calling remote call!" << endl; + // Waiting for return value + while(!retValueReceived0); + rmiComm->getReturnValue(retType, retObj); + //retValueReceived0.exchange(false); + retValueReceived0 = false; + didGetReturnBytes.exchange(true); + cout << "Getting return value for getShort(): " << retVal << endl; + return retVal; } -int TestClassComplete_Stub::callBack() { +mutex mtxMethodExec2; // TODO: We probably need to correlate this always with class name, e.g. methodExecCallBackInterfaceWithCallBack +int TestClassComplete_Stub::callBack() { + lock_guard guard(mtxMethodExec2); int methodId = 2; string retType = "int"; int numParam = 0; @@ -88,7 +103,16 @@ int TestClassComplete_Stub::callBack() { void* paramObj[] = { }; int retVal = 0; void* retObj = &retVal; - rmiCall->remoteCall(objectId, methodId, retType, paramCls, paramObj, numParam, retObj); + rmiComm->remoteCall(objectId, methodId, paramCls, paramObj, numParam); + // Waiting for return value + while(!retValueReceived2); + rmiComm->getReturnValue(retType, retObj); + //retValueReceived2.exchange(false); + retValueReceived2 = false; + didGetReturnBytes.exchange(true); + + cout << "Getting return value for callback(): " << retVal << endl; + return retVal; } @@ -96,7 +120,8 @@ int TestClassComplete_Stub::callBack() { int main(int argc, char *argv[]) { - int port = 5010; + int portSend = 5000; + int portRecv = 6000; const char* address = "localhost"; //const char* address = "192.168.2.191"; // RPi2 //const char* skeletonAddress = "128.195.136.170"; // dc-9.calit2.uci.edu @@ -106,17 +131,29 @@ int main(int argc, char *argv[]) //const char* callbackAddress = "192.168.2.191"; // RPi2 int rev = 0; bool bResult = false; - vector ports; - ports.push_back(12345); - ports.push_back(22346); + //vector ports; + //ports.push_back(12345); + //ports.push_back(22346); //ports.push_back(32344); //ports.push_back(43212); - TestClassComplete *tcStub = new TestClassComplete_Stub(port, skeletonAddress, callbackAddress, rev, &bResult, ports); + TestClassComplete *tcStub = new TestClassComplete_Stub(portSend, portRecv, skeletonAddress, rev, &bResult); + //cout << "Getting return value from getShort(): " << tcStub->getShort(1234) << endl; + //cout << "Getting return value from getShort(): " << tcStub->getShort(4321) << endl; + //cout << "Getting return value from getShort(): " << tcStub->getShort(5678) << endl; cout << "==== CALLBACK ====" << endl; CallBackInterface *cbSingle = new CallBack(2354); tcStub->registerCallback(cbSingle); + //tcStub->registerCallback(cbSingle); + CallBackInterface *cbSingle1 = new CallBack(2646); + tcStub->registerCallback(cbSingle1); + CallBackInterface *cbSingle2 = new CallBack(2000); + tcStub->registerCallback(cbSingle2); cout << "Return value from callback: " << tcStub->callBack() << endl; + //cout << "Return value from callback: " << tcStub->callBack() << endl; + + // TODO: we need this while loop at the end to keep the threads running + while(true); return 0; } diff --git a/iotjava/iotrmi/C++/basics/TestClassInterface_Skeleton.cpp b/iotjava/iotrmi/C++/basics/TestClassInterface_Skeleton.cpp index 0ad7586..65524f7 100644 --- a/iotjava/iotrmi/C++/basics/TestClassInterface_Skeleton.cpp +++ b/iotjava/iotrmi/C++/basics/TestClassInterface_Skeleton.cpp @@ -7,32 +7,30 @@ using namespace std; -TestClassInterface_Skeleton::TestClassInterface_Skeleton(TestClassInterface *_mainObj, string _callbackAddress, int _port) { +TestClassInterface_Skeleton::TestClassInterface_Skeleton(TestClassInterface *_mainObj, int _portSend, int _portRecv) { bool _bResult = false; mainObj = _mainObj; - callbackAddress = _callbackAddress; - rmiObj = new IoTRMIObject(_port, &_bResult); - set0Allowed.insert(-9999); - ___waitRequestInvokeMethod(); + rmiComm = new IoTRMICommServer(_portSend, _portRecv, &_bResult); + IoTRMIUtil::mapSkel->insert(make_pair(_mainObj, this)); + IoTRMIUtil::mapSkelId->insert(make_pair(_mainObj, objectId)); + rmiComm->registerSkeleton(objectId, &methodReceived); + thread th1 (&TestClassInterface_Skeleton::___waitRequestInvokeMethod, this, this); +// th1.detach(); + th1.join(); } -TestClassInterface_Skeleton::TestClassInterface_Skeleton(TestClassInterface *_mainObj, int _objIdCnt, string _callbackAddress) { +TestClassInterface_Skeleton::TestClassInterface_Skeleton(TestClassInterface *_mainObj, IoTRMIComm *_rmiComm, int _objectId) { bool _bResult = false; mainObj = _mainObj; - objIdCnt = _objIdCnt; - callbackAddress = _callbackAddress; - set0Allowed.insert(-9999); - ___waitRequestInvokeMethod(); + rmiComm = _rmiComm; + objectId = _objectId; + rmiComm->registerSkeleton(objectId, &methodReceived); } TestClassInterface_Skeleton::~TestClassInterface_Skeleton() { - if (rmiObj != NULL) { - delete rmiObj; - rmiObj = NULL; - } - if (rmiCall != NULL) { - delete rmiCall; - rmiCall = NULL; + if (rmiComm != NULL) { + delete rmiComm; + rmiComm = NULL; } for(CallBackInterfaceWithCallBack* cb : vecCallbackObj) { delete cb; @@ -40,6 +38,11 @@ TestClassInterface_Skeleton::~TestClassInterface_Skeleton() { } } +bool TestClassInterface_Skeleton::didInitWaitInvoke() { + + return didAlreadyInitWaitInvoke; +} + short TestClassInterface_Skeleton::getShort(short in) { return mainObj->getShort(in); } @@ -48,84 +51,112 @@ void TestClassInterface_Skeleton::registerCallback(CallBackInterfaceWithCallBack mainObj->registerCallback(_cb); } -void TestClassInterface_Skeleton::___regCB() { - int numParam = 3; - vector param1; - string param2 = ""; - int param3 = 0; - string paramCls[] = { "int*", "String", "int" }; - void* paramObj[] = { ¶m1, ¶m2, ¶m3 }; - rmiObj->getMethodParams(paramCls, numParam, paramObj); - bool bResult = false; - rmiCall = new IoTRMICall(param1[1], param2.c_str(), param3, &bResult); -} - int TestClassInterface_Skeleton::callBack() { return mainObj->callBack(); } -void TestClassInterface_Skeleton::___getShort() { +void TestClassInterface_Skeleton::___getShort(TestClassInterface_Skeleton* skel) { + char* localMethodBytes = new char[methodLen]; + memcpy(localMethodBytes, skel->methodBytes, methodLen); + //cout << "Bytes inside getShort: " << endl; + //IoTRMIUtil::printBytes(localMethodBytes, methodLen, false); + didGetMethodBytes.exchange(true); string paramCls[] = { "short" }; int numParam = 1; short in; void* paramObj[] = { &in }; - rmiObj->getMethodParams(paramCls, numParam, paramObj); + skel->rmiComm->getMethodParams(paramCls, numParam, paramObj, localMethodBytes); short retVal = getShort(in); + cout << "Getting return value getShort(): " << retVal << endl; void* retObj = &retVal; - rmiObj->sendReturnObj(retObj, "short"); + skel->rmiComm->sendReturnObj(retObj, "short", localMethodBytes); + cout << "Sent return value for getShort()" << endl; + delete[] localMethodBytes; } -void TestClassInterface_Skeleton::___registerCallback() { +void TestClassInterface_Skeleton::___registerCallback(TestClassInterface_Skeleton* skel) { + char* localMethodBytes = new char[methodLen]; + memcpy(localMethodBytes, skel->methodBytes, methodLen); + didGetMethodBytes.exchange(true); string paramCls[] = { "int" }; int numParam = 1; int numStubs0 = 0; void* paramObj[] = { &numStubs0 }; - rmiObj->getMethodParams(paramCls, numParam, paramObj); - //CallBackInterfaceWithCallBack* stub0 = new CallBackInterfaceWithCallBack_CallbackStub(rmiCall, callbackAddress, objIdCnt, ports); - CallBackInterfaceWithCallBack* stub0 = new CallBackInterfaceWithCallBack_Stub(rmiCall, callbackAddress, objIdCnt, ports); - vecCallbackObj.push_back(stub0); - objIdCnt++; - registerCallback(stub0); + skel->rmiComm->getMethodParams(paramCls, numParam, paramObj, localMethodBytes); + // Choosing the right stub + int objIdRecv = numStubs0; + CallBackInterfaceWithCallBack* stub0 = NULL; + auto it = IoTRMIUtil::mapStub->find(objIdRecv); + if (it == IoTRMIUtil::mapStub->end()) { // Not in the map, so new object + stub0 = new CallBackInterfaceWithCallBack_Stub(rmiComm, objIdRecv); + IoTRMIUtil::mapStub->insert(make_pair(objIdRecv, stub0)); + cout << "Create new stub for Callback! ID=" << objIdRecv << endl; + rmiComm->setObjectIdCounter(objIdRecv); + rmiComm->decrementObjectIdCounter(); + } else { + stub0 = (CallBackInterfaceWithCallBack_Stub*) it->second; + cout << "Stub exists for Callback! ID=" << objIdRecv << endl; + } + skel->vecCallbackObj.push_back(stub0); + skel->registerCallback(stub0); + delete[] localMethodBytes; } -void TestClassInterface_Skeleton::___callBack() { +void TestClassInterface_Skeleton::___callBack(TestClassInterface_Skeleton* skel) { + char* localMethodBytes = new char[methodLen]; + memcpy(localMethodBytes, skel->methodBytes, methodLen); + didGetMethodBytes.exchange(true); string paramCls[] = { }; int numParam = 0; void* paramObj[] = { }; - rmiObj->getMethodParams(paramCls, numParam, paramObj); + skel->rmiComm->getMethodParams(paramCls, numParam, paramObj, localMethodBytes); int retVal = callBack(); void* retObj = &retVal; - rmiObj->sendReturnObj(retObj, "int"); + skel->rmiComm->sendReturnObj(retObj, "int", localMethodBytes); + delete[] localMethodBytes; } -void TestClassInterface_Skeleton::___waitRequestInvokeMethod() { +void TestClassInterface_Skeleton::___waitRequestInvokeMethod(TestClassInterface_Skeleton* skel) { + cout << "Running loop!" << endl; + //didAlreadyInitWaitInvoke.exchange(true); + skel->didAlreadyInitWaitInvoke = true; while (true) { - rmiObj->getMethodBytes(); - int _objectId = rmiObj->getObjectId(); - int methodId = rmiObj->getMethodId(); - if (_objectId == object0Id) { - if (set0Allowed.find(methodId) == set0Allowed.end()) { + if (!methodReceived) + continue; + skel->methodBytes = skel->rmiComm->getMethodBytes(); + skel->methodLen = skel->rmiComm->getMethodLength(); + cout << endl; + // TODO: Get method length as well!!! + //methodReceived.exchange(false); + methodReceived = false; + int _objectId = skel->rmiComm->getObjectId(skel->methodBytes); + int methodId = skel->rmiComm->getMethodId(skel->methodBytes); + if (_objectId == objectId) { + if (skel->set0Allowed.find(methodId) == skel->set0Allowed.end()) { cerr << "Object with object Id: " << _objectId << " is not allowed to access method: " << methodId << endl; return; } } - else { - cerr << "Object Id: " << _objectId << " not recognized!" << endl; - return; - } + else + continue; switch (methodId) { - case 0: ___getShort(); break; - case 1: ___registerCallback(); break; - case 2: ___callBack(); break; - case -9999: ___regCB(); break; + case 0: { thread th0 (&TestClassInterface_Skeleton::___getShort, std::ref(skel), skel); th0.detach(); break; } + //___getShort(skel); break; + case 1: { thread th1 (&TestClassInterface_Skeleton::___registerCallback, std::ref(skel), skel); th1.detach(); break; } + //___registerCallback(skel); break; + case 2: { thread th2 (&TestClassInterface_Skeleton::___callBack, std::ref(skel), skel); th2.detach(); break; } + //___callBack(skel); break; default: cerr << "Method Id " << methodId << " not recognized!" << endl; - throw exception(); + //throw exception(); + return; } + cout << "Out of switch statement!" << endl; } } + int main(int argc, char *argv[]) { // First argument is port number @@ -139,10 +170,11 @@ int main(int argc, char *argv[]) cout << argv3 << endl; cout << argv4 << endl;*/ - int port = 5010; + int portSend = 5000; + int portRecv = 6000; //TestClassInterface *tc = new TestClass(argv2, argv3, argv4); TestClassInterface *tc = new TestClass(123, 2.345, "test"); - TestClassInterface_Skeleton *tcSkel = new TestClassInterface_Skeleton(tc, "localhost", port); + TestClassInterface_Skeleton *tcSkel = new TestClassInterface_Skeleton(tc, portSend, portRecv); //delete tc; //delete tcSkel; diff --git a/iotjava/iotrmi/Java/basics/TestClassCallbacks_Stub.java b/iotjava/iotrmi/Java/basics/TestClassCallbacks_Stub.java index 1b0bc08..f3b2cbc 100644 --- a/iotjava/iotrmi/Java/basics/TestClassCallbacks_Stub.java +++ b/iotjava/iotrmi/Java/basics/TestClassCallbacks_Stub.java @@ -38,10 +38,10 @@ public class TestClassCallbacks_Stub { System.out.println("Registered callback!"); System.out.println("Return value from callback 1: " + tcstub.callBack() + "\n\n"); - //System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)4576) + "\n\n"); - //System.out.println("Return value from callback 2: " + tcstub.callBack() + "\n\n"); - //System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)1233) + "\n\n"); - //System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)1321) + "\n\n"); + System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)4576) + "\n\n"); + System.out.println("Return value from callback 2: " + tcstub.callBack() + "\n\n"); + System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)1233) + "\n\n"); + System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)1321) + "\n\n"); while(true) {} } }