9f45c60695e68c3e0e88f025980ef24536acc83c
[iot2.git] / iotjava / iotrmi / C++ / IoTRMIComm.hpp
/** Class IoTRMIComm combines the functionality of
 *  IoTRMICall and IoTRMIObject into a single
 *  communication class that uses two sockets, each
 *  carrying traffic in one direction.
 *
 * @author      Rahmadi Trimananda <rtrimana @ uci.edu>
 * @version     1.0
 * @since       2017-01-28
 */
10 #ifndef _IOTRMICOMM_HPP__
11 #define _IOTRMICOMM_HPP__
12
#include <atomic>
#include <cstring>
#include <iostream>
#include <limits>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "IoTSocketServer.hpp"
#include "IoTSocketClient.hpp"
#include "ConcurrentLinkedListQueue.cpp"
23
24 using namespace std;
25
// Handshake flags used by the wake-up threads in this file: a thread clears
// its flag right before raising a registered "received" flag, then spins
// until the flag reads true again.
// NOTE(review): these live at namespace scope, so every IoTRMIComm instance
// shares them - confirm that is intended.
std::atomic<bool> didGetMethodBytes{false};
std::atomic<bool> didGetReturnBytes{false};

// Locked by registerSkeleton() while inserting into mapSkeletonId.
std::mutex regSkelMutex;
// Locked by registerStub() while inserting into mapStubId.
std::mutex regStubMutex;
// Locked by getReturnValue() and getStructObjects().
std::mutex retValMutex;
// Not locked anywhere in this file; presumably used by remoteCall()
// implementations in subclasses - TODO confirm.
std::mutex remoteCallMutex;
// Not locked anywhere in this file; presumably used by sendReturnObj()
// implementations in subclasses - TODO confirm.
std::mutex sendReturnObjMutex;
34
35 class IoTRMIComm {
36         public:
37                 IoTRMIComm();
38                 ~IoTRMIComm();
39                 // Public methods
40                 virtual void            sendReturnObj(void* retObj, string type, char* methodBytes) = 0;
41                 virtual void            sendReturnObj(void* retObj[], string type[], int numRet, char* methodBytes) = 0;
42                 int                                     returnLength(void* retObj[], string retCls[], int numRet);
43                 char*                           returnToBytes(void* retObj[], string retCls[], char* retBytes, int numRet);
44                 char*                           getMethodBytes();
45                 int                                     getMethodLength();
46                 int                                     getObjectIdFromMethod();
47                 static int                      getObjectId(char* packetBytes);
48                 static int                      getMethodId(char* packetBytes);
49                 static int                      getPacketType(char* packetBytes);
50                 void**                          getMethodParams(string paramCls[], int numParam, void* paramObj[], char* methodBytes);
51                 void                            registerSkeleton(int objectId, bool* methodReceived);
52                 void                            registerStub(int objectId, int methodId, bool* retValueReceived);
53                 int                                     getObjectIdCounter();
54                 void                            setObjectIdCounter(int objIdCounter);
55                 void                            decrementObjectIdCounter();
56
57                 int                                     methodLength(string paramCls[], void* paramObj[], int numParam);
58                 char*                           methodToBytes(int objectId, int methId, string paramCls[], void* paramObj[],
59                                                                 char* method, int numParam);
60                 virtual void            remoteCall(int objectId, int methodId, string paramCls[], 
61                                                                 void* paramObj[], int numParam) = 0;
62                 void*                           getReturnValue(string retType, void* retObj);
63                 
64                 void**                          getStructObjects(string retType[], int numRet, void* retObj[]);
65                 void**                          getReturnObjects(char* retBytes, string retCls[], int numRet, void* retObj[]);
66
67         protected:
68                 IoTRMIUtil                                      *rmiUtil;
69                 char*                                           methodBytes;
70                 int                                                     methodLen;
71                 char*                                           retValueBytes;
72                 int                                                     retValueLen;
73                 ConcurrentLinkedListQueue       methodQueue;
74                 ConcurrentLinkedListQueue       returnQueue;
75                 map<int,bool*>                          mapSkeletonId;
76                 map<string,bool*>                       mapStubId;
77                 int                                                     objectIdCounter = std::numeric_limits<int>::max();
78
79         private:
80                 // Private methods
81                 void                            wakeUpThreadOnMethodCall(IoTRMIComm* rmiComm);
82                 void                            wakeUpThreadOnReturnValue(IoTRMIComm* rmiComm);
83 };
84
85
86 // Constructor
87 IoTRMIComm::IoTRMIComm() {
88
89         rmiUtil = new IoTRMIUtil();
90         methodBytes = NULL;
91         retValueBytes = NULL;
92         methodLen = 0;
93         retValueLen = 0;
94         thread th1 (&IoTRMIComm::wakeUpThreadOnMethodCall, this, this);
95         th1.detach();
96         thread th2 (&IoTRMIComm::wakeUpThreadOnReturnValue, this, this);
97         th2.detach();   
98
99 }
100
101
102 // Destructor
103 IoTRMIComm::~IoTRMIComm() {
104
105         // Clean up
106         if (rmiUtil != NULL) {  
107                 delete rmiUtil;
108                 rmiUtil = NULL;         
109         }
110 }
111
112
113 void IoTRMIComm::wakeUpThreadOnMethodCall(IoTRMIComm* rmiComm) {
114
115         int methLen = 0;
116         //cout << "Starting wakeUpThreadOnMethodCall()" << endl;
117         while(true) {
118                 // Convert back to char*
119                 char* queueHead = rmiComm->methodQueue.deQAndGetLength(&methLen);
120                 if (queueHead != NULL) {
121                         rmiComm->methodBytes = queueHead;
122                         rmiComm->methodLen = methLen;
123                         //IoTRMIUtil::printBytes(rmiComm->methodBytes, rmiComm->methodLen, false);
124                         int currObjId = rmiComm->getObjectId(rmiComm->methodBytes);
125                         auto search = rmiComm->mapSkeletonId.find(currObjId);
126                         bool* methRecv = search->second;
127                         didGetMethodBytes.exchange(false);
128                         *methRecv = true;
129                         while(!didGetMethodBytes);
130                 }
131         }
132 }
133
134
135 void IoTRMIComm::wakeUpThreadOnReturnValue(IoTRMIComm* rmiComm) {
136
137         int retLen = 0;
138         //cout << "Starting wakeUpThreadOnReturnValue()" << endl;
139         while(true) {
140                 // Convert back to char*
141                 char* queueHead = rmiComm->returnQueue.deQAndGetLength(&retLen);
142                 if (queueHead != NULL) {
143                         rmiComm->retValueBytes = queueHead;
144                         rmiComm->retValueLen = retLen;
145                         //IoTRMIUtil::printBytes(rmiComm->retValueBytes, rmiComm->retValueLen, false);
146                         int objectId = rmiComm->getObjectId(rmiComm->retValueBytes);
147                         int methodId = rmiComm->getMethodId(rmiComm->retValueBytes);
148                         string strKey = to_string(objectId) + "-" + to_string(methodId);
149                         auto search = rmiComm->mapStubId.find(strKey);
150                         bool* retRecv = search->second;
151                         didGetReturnBytes.exchange(false);
152                         *retRecv = true;
153                         while(!didGetReturnBytes);
154                 }
155         }
156 }
157
158
159 // registerSkeleton() registers the skeleton to be woken up
160 void IoTRMIComm::registerSkeleton(int objectId, bool* methodReceived) {
161
162         lock_guard<mutex> guard(regSkelMutex);
163         mapSkeletonId.insert(make_pair(objectId, methodReceived));
164 }
165
166
167 // registerStub() registers the skeleton to be woken up
168 void IoTRMIComm::registerStub(int objectId, int methodId, bool* retValueReceived) {
169
170         lock_guard<mutex> guard(regStubMutex);
171         string strKey = to_string(objectId) + "-" + to_string(methodId);
172         mapStubId.insert(make_pair(strKey, retValueReceived));
173 }
174
175
176 // getObjectIdCounter() gets object Id counter
177 int     IoTRMIComm::getObjectIdCounter() {
178
179         return objectIdCounter;
180 }
181
182
183 // setObjectIdCounter() sets object Id counter
184 void IoTRMIComm::setObjectIdCounter(int objIdCounter) {
185
186         objectIdCounter = objIdCounter;
187 }
188
189
190 // decrementObjectIdCounter() gets object Id counter
191 void IoTRMIComm::decrementObjectIdCounter() {
192
193         objectIdCounter--;
194 }
195
196
197 // Get method bytes from the socket
198 char* IoTRMIComm::getMethodBytes() {
199
200         // Get method bytes
201         return methodBytes;
202 }
203
204
205 // Get method length from the socket
206 int IoTRMIComm::getMethodLength() {
207
208         // Get method bytes
209         return methodLen;
210 }
211
212
213 // Get object Id from bytes
214 int IoTRMIComm::getObjectIdFromMethod() {
215
216         char objectIdBytes[IoTRMIUtil::OBJECT_ID_LEN];
217         memcpy(objectIdBytes, methodBytes, IoTRMIUtil::OBJECT_ID_LEN);
218         // Get method signature 
219         int objectId = 0;
220         IoTRMIUtil::byteArrayToInt(&objectId, objectIdBytes);
221         
222         return objectId;
223 }
224
225
226 // Get object Id from bytes (static version)
227 int IoTRMIComm::getObjectId(char* packetBytes) {
228
229         char objectIdBytes[IoTRMIUtil::OBJECT_ID_LEN];
230         memcpy(objectIdBytes, packetBytes, IoTRMIUtil::OBJECT_ID_LEN);
231         // Get method signature 
232         int objectId = 0;
233         IoTRMIUtil::byteArrayToInt(&objectId, objectIdBytes);
234         
235         return objectId;
236 }
237
238
239 // Get methodId from bytes (static version)
240 int IoTRMIComm::getMethodId(char* packetBytes) {
241
242         // Get method Id
243         char methodIdBytes[IoTRMIUtil::METHOD_ID_LEN];
244         int offset = IoTRMIUtil::OBJECT_ID_LEN;
245         memcpy(methodIdBytes, packetBytes + offset, IoTRMIUtil::METHOD_ID_LEN);
246         // Get method signature 
247         int methodId = 0;
248         IoTRMIUtil::byteArrayToInt(&methodId, methodIdBytes);
249         
250         return methodId;
251 }
252
253
254 // Get methodId from bytes (static version)
255 int IoTRMIComm::getPacketType(char* packetBytes) {
256
257         // Get method Id
258         char packetTypeBytes[IoTRMIUtil::METHOD_ID_LEN];
259         int offset = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN;
260         memcpy(packetTypeBytes, packetBytes + offset, IoTRMIUtil::PACKET_TYPE_LEN);
261         // Get method signature 
262         int packetType = 0;
263         IoTRMIUtil::byteArrayToInt(&packetType, packetTypeBytes);
264         
265         return packetType;
266 }
267
268
269 // Get method parameters and return an array of parameter objects
270 //
271 // For primitive objects:
272 // | 32-bit method ID | m-bit actual data (fixed length)  |
273 // 
274 // For string, arrays, and non-primitive objects:
275 // | 32-bit method ID | 32-bit length | n-bit actual data | ...
276 void** IoTRMIComm::getMethodParams(string paramCls[], int numParam, void* paramObj[], char* methodBytes) {
277
278         // Byte scanning position
279         int pos = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
280         for (int i = 0; i < numParam; i++) {
281                 int paramLen = rmiUtil->getTypeSize(paramCls[i]);
282                 // Get the 32-bit field in the byte array to get the actual
283                 //              length (this is a param with indefinite length)
284                 if (paramLen == -1) {
285                         char bytPrmLen[IoTRMIUtil::PARAM_LEN];
286                         memcpy(bytPrmLen, methodBytes + pos, IoTRMIUtil::PARAM_LEN);
287                         pos = pos + IoTRMIUtil::PARAM_LEN;
288                         int* prmLenPtr = IoTRMIUtil::byteArrayToInt(&paramLen, bytPrmLen);
289                         paramLen = *prmLenPtr;
290                 }
291                 char paramBytes[paramLen];
292                 memcpy(paramBytes, methodBytes + pos, paramLen);
293                 pos = pos + paramLen;
294                 paramObj[i] = IoTRMIUtil::getParamObject(paramObj[i], paramCls[i].c_str(), paramBytes, paramLen);
295         }
296
297         return paramObj;
298 }
299
300
301 // Find the bytes length of a return object (struct that has more than 1 member)
302 int     IoTRMIComm::returnLength(void* retObj[], string retCls[], int numRet) {
303
304         // Get byte arrays and calculate return bytes length
305         int returnLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
306         for (int i = 0; i < numRet; i++) {
307                 // Find the return length
308                 int retObjLen = rmiUtil->getTypeSize(retCls[i]);
309                 if (retObjLen == -1) { // Store the length of the field - indefinite length
310                         retObjLen = rmiUtil->getVarTypeSize(retCls[i], retObj[i]);
311                         // Some space for return length, i.e. 32 bits for integer               
312                         returnLen = returnLen + IoTRMIUtil::RETURN_LEN;
313                 }
314                 // Calculate returnLen
315                 returnLen = returnLen + retObjLen;
316         }
317
318         return returnLen;
319 }
320
321
322 // Convert return object (struct members) into bytes
323 char* IoTRMIComm::returnToBytes(void* retObj[], string retCls[], char* retBytes, int numRet) {
324
325         int pos = 0;
326         // Get byte arrays and calculate return bytes length
327         for (int i = 0; i < numRet; i++) {
328                 // Find the return length
329                 int retObjLen = rmiUtil->getTypeSize(retCls[i]);
330                 if (retObjLen == -1) { // Store the length of the field - indefinite length
331                         retObjLen = rmiUtil->getVarTypeSize(retCls[i], retObj[i]);
332                         // Write the return length
333                         char retLenBytes[IoTRMIUtil::RETURN_LEN];
334                         IoTRMIUtil::intToByteArray(retObjLen, retLenBytes);
335                         memcpy(retBytes + pos, retLenBytes, IoTRMIUtil::RETURN_LEN);                    
336                         pos = pos + IoTRMIUtil::RETURN_LEN;
337                 }
338                 // Get array of bytes and put it in the array of array of bytes
339                 char objBytes[retObjLen];
340                 IoTRMIUtil::getObjectBytes(objBytes, retObj[i], retCls[i].c_str());
341                 memcpy(retBytes + pos, objBytes, retObjLen);
342                 pos = pos + retObjLen;
343         }
344
345         return retBytes;
346 }
347
348
349 // Get return value for single values (non-structs)
350 void* IoTRMIComm::getReturnValue(string retType, void* retObj) {
351
352         // Receive return value and return it to caller
353         lock_guard<mutex> guard(retValMutex);
354         // Copy just the actual return value bytes
355         int headerLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
356         int retActualLen = retValueLen - headerLen;
357         //char *retActualBytes = new char[retActualLen];
358         char retActualBytes[retActualLen];
359         memcpy(retActualBytes, retValueBytes + headerLen, retActualLen);
360         //IoTRMIUtil::printBytes(retActualBytes, retActualLen, false);
361         retObj = IoTRMIUtil::getParamObject(retObj, retType.c_str(), retActualBytes, retActualLen);
362         // Delete received bytes object
363         delete[] retValueBytes;
364         //delete[] retActualBytes;
365         
366         return retObj;
367 }
368
369
370 // Get a set of return objects (struct)
371 void** IoTRMIComm::getStructObjects(string retType[], int numRet, void* retObj[]) {
372
373         // Critical section that is used by different objects
374         lock_guard<mutex> guard(retValMutex);
375         // Copy just the actual return value bytes
376         int headerLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
377         int retActualLen = retValueLen - headerLen;
378         char retActualBytes[retActualLen];
379         memcpy(retActualBytes, retValueBytes + headerLen, retActualLen);
380         // Return size of array of struct
381         retObj = getReturnObjects(retActualBytes, retType, numRet, retObj);
382         // Delete received bytes object
383         delete[] retValueBytes;
384         
385         return retObj;
386 }
387
388
389 // Find the bytes length of a method
390 int IoTRMIComm::methodLength(string paramCls[], void* paramObj[], int numParam) {
391
392         // Get byte arrays and calculate method bytes length
393         // Start from the object Id + method Id...
394         int methodLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
395         for (int i = 0; i < numParam; i++) {
396                 // Find the parameter length
397                 int paramLen = rmiUtil->getTypeSize(paramCls[i]);
398                 if (paramLen == -1) { // Store the length of the field - indefinite length
399                         paramLen = rmiUtil->getVarTypeSize(paramCls[i], paramObj[i]);
400                         // Some space for param length, i.e. 32 bits for integer                
401                         methodLen = methodLen + IoTRMIUtil::PARAM_LEN;
402                 }
403                 // Calculate methodLen
404                 methodLen = methodLen + paramLen;
405         }
406         return methodLen;
407 }
408
409
410 // Convert method and its parameters into bytes
411 char* IoTRMIComm::methodToBytes(int objectId, int methId, string paramCls[], 
412                 void* paramObj[], char* method, int numParam) {
413
414         // Get object Id in bytes
415         char objId[IoTRMIUtil::OBJECT_ID_LEN];
416         IoTRMIUtil::intToByteArray(objectId, objId);
417         memcpy(method, objId, IoTRMIUtil::OBJECT_ID_LEN);
418         int pos = IoTRMIUtil::OBJECT_ID_LEN;
419         // Get method Id in bytes
420         char methodId[IoTRMIUtil::METHOD_ID_LEN];
421         IoTRMIUtil::intToByteArray(methId, methodId);
422         memcpy(method + pos, methodId, IoTRMIUtil::METHOD_ID_LEN);
423         pos = pos + IoTRMIUtil::METHOD_ID_LEN;
424         char packetType[IoTRMIUtil::PACKET_TYPE_LEN];
425         IoTRMIUtil::intToByteArray(IoTRMIUtil::METHOD_TYPE, methodId);
426         memcpy(method + pos, methodId, IoTRMIUtil::PACKET_TYPE_LEN);
427         pos = pos + IoTRMIUtil::PACKET_TYPE_LEN;
428         // Get byte arrays and calculate method bytes length
429         for (int i = 0; i < numParam; i++) {
430                 // Find the parameter length
431                 int paramLen = rmiUtil->getTypeSize(paramCls[i]);
432                 if (paramLen == -1) { // Store the length of the field - indefinite length
433                         paramLen = rmiUtil->getVarTypeSize(paramCls[i], paramObj[i]);
434                         // Write the parameter length
435                         char prmLenBytes[IoTRMIUtil::PARAM_LEN];
436                         IoTRMIUtil::intToByteArray(paramLen, prmLenBytes);
437                         memcpy(method + pos, prmLenBytes, IoTRMIUtil::PARAM_LEN);                       
438                         pos = pos + IoTRMIUtil::PARAM_LEN;
439                 }
440                 // Get array of bytes and put it in the array of array of bytes
441                 char objBytes[paramLen];
442                 IoTRMIUtil::getObjectBytes(objBytes, paramObj[i], paramCls[i].c_str());
443                 memcpy(method + pos, objBytes, paramLen);
444                 pos = pos + paramLen;
445         }
446
447         return method;
448 }
449
450
451 // Get return objects for structs
452 void** IoTRMIComm::getReturnObjects(char* retBytes, string retCls[], int numRet, void* retObj[]) {
453
454         // Byte scanning position
455         int pos = 0;
456         for (int i = 0; i < numRet; i++) {
457                 int retLen = rmiUtil->getTypeSize(retCls[i]);
458                 // Get the 32-bit field in the byte array to get the actual
459                 //              length (this is a param with indefinite length)
460                 if (retLen == -1) {
461                         char bytRetLen[IoTRMIUtil::RETURN_LEN];
462                         memcpy(bytRetLen, retBytes + pos, IoTRMIUtil::RETURN_LEN);
463                         pos = pos + IoTRMIUtil::RETURN_LEN;
464                         int* retLenPtr = IoTRMIUtil::byteArrayToInt(&retLen, bytRetLen);
465                         retLen = *retLenPtr;
466                 }
467                 char retObjBytes[retLen];
468                 memcpy(retObjBytes, retBytes + pos, retLen);
469                 pos = pos + retLen;
470                 retObj[i] = IoTRMIUtil::getParamObject(retObj[i], retCls[i].c_str(), retObjBytes, retLen);
471         }
472
473         return retObj;
474 }
475 #endif
476
477