3 import java.io.IOException;
4 import java.nio.ByteBuffer;
5 import java.util.Arrays;
6 import java.util.ArrayList;
7 import java.util.HashMap;
11 import java.lang.reflect.*;
13 import java.util.concurrent.*;
14 import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Abstract class IoTRMIComm combines the roles of IoTRMIObject and IoTRMICall.
 * <p>
 * Incoming packets are arbitrated into two queues (method calls and return values) and
 * the matching thread/caller is woken up for each packet.
 * Traffic is separated one-directionally.
 *
 * @author Rahmadi Trimananda <rtrimana @ uci.edu>
 */
26 public abstract class IoTRMIComm {
31 protected IoTRMIUtil rmiUtil;
32 protected byte[] methodBytes;
33 protected byte[] retValueBytes;
34 protected ConcurrentLinkedQueue<byte[]> methodQueue;
35 protected ConcurrentLinkedQueue<byte[]> returnQueue;
36 protected Map<Integer,AtomicBoolean> mapSkeletonId;
37 protected Map<String,AtomicBoolean> mapStubId;
38 protected AtomicBoolean didGetMethodBytes;
39 protected AtomicBoolean didGetReturnBytes;
40 protected int objectIdCounter = Integer.MAX_VALUE;
/**
 * Constructor (for skeleton).
 * <p>
 * Creates the utility object, the two packet queues, the registration maps and the
 * handshake flags, and finally starts the two dispatcher threads.
 * <p>
 * The registration maps are ConcurrentHashMap instances: the dispatcher threads look
 * entries up without holding the monitor that registerSkeleton() / registerStub()
 * acquire, which is not safe on a plain HashMap. Note that ConcurrentHashMap rejects
 * null values, so a null flag can no longer be registered.
 *
 * @throws ClassNotFoundException  declared by this constructor (source not visible here;
 *                                 presumably raised while creating IoTRMIUtil - confirm)
 * @throws InstantiationException  see above
 * @throws IllegalAccessException  see above
 * @throws IOException             see above
 */
public IoTRMIComm() throws
        ClassNotFoundException, InstantiationException,
        IllegalAccessException, IOException {

    rmiUtil = new IoTRMIUtil();
    methodQueue = new ConcurrentLinkedQueue<byte[]>();
    returnQueue = new ConcurrentLinkedQueue<byte[]>();
    mapSkeletonId = new ConcurrentHashMap<Integer,AtomicBoolean>();
    mapStubId = new ConcurrentHashMap<String,AtomicBoolean>();
    didGetMethodBytes = new AtomicBoolean(false);
    didGetReturnBytes = new AtomicBoolean(false);
    // Start the dispatchers last so that every field they use is already initialized
    wakeUpThreadOnMethodCall();
    wakeUpThreadOnReturnValue();
}
64 * wakeUpThreadOnMethodCall() wakes up the correct thread when receiving method call
66 private void wakeUpThreadOnMethodCall() {
68 Thread thread = new Thread() {
71 // Take the current method from the queue and wake up the correct thread
72 methodBytes = methodQueue.poll();
73 if (methodBytes != null) { // If there is method bytes
74 int currObjId = getObjectId(methodBytes);
75 AtomicBoolean methRecv = mapSkeletonId.get(currObjId);
76 didGetMethodBytes.set(false);
77 while(!methRecv.compareAndSet(false, true));
78 while(!didGetMethodBytes.get()); // While skeleton is still processing
88 * wakeUpThreadOnReturnValue() wakes up the correct thread when receiving return value
90 private void wakeUpThreadOnReturnValue() {
92 Thread thread = new Thread() {
95 // Take the current method from the queue and wake up the correct thread
96 retValueBytes = returnQueue.poll();
97 if (retValueBytes != null) { // If there is method bytes
98 System.out.println("retValBytes in wake up thread: " + Arrays.toString(retValueBytes));
99 int objectId = getObjectId(retValueBytes);
100 int methodId = getMethodId(retValueBytes);
101 String strKey = objectId + "-" + methodId;
102 AtomicBoolean retRecv = mapStubId.get(strKey);
103 //System.out.println("boolean status: " + retRecv + " with key: " + strKey);
104 didGetReturnBytes.set(false);
105 while(!retRecv.compareAndSet(false, true));
106 //System.out.println("boolean status: " + retRecv + " - map has: " + mapStubId.size());
107 while(!didGetReturnBytes.get()); // While skeleton is still processing
117 * registerSkeleton() registers the skeleton to be woken up
119 public synchronized void registerSkeleton(int objectId, AtomicBoolean methodReceived) {
121 mapSkeletonId.put(objectId, methodReceived);
126 * registerStub() registers the skeleton to be woken up
128 public synchronized void registerStub(int objectId, int methodId, AtomicBoolean retValueReceived) {
130 String strKey = objectId + "-" + methodId;
131 //System.out.println("Key exist? " + mapStubId.containsKey(strKey));
132 mapStubId.put(strKey, retValueReceived);
133 //System.out.println("\n\nAdding keyBytes: " + strKey + " now size: " + mapStubId.size() + "\n\n");
138 * getObjectIdCounter() gets object Id counter
140 public int getObjectIdCounter() {
142 return objectIdCounter;
147 * setObjectIdCounter() sets object Id counter
149 public void setObjectIdCounter(int objIdCounter) {
151 objectIdCounter = objIdCounter;
156 * decrementObjectIdCounter() gets object Id counter
158 public void decrementObjectIdCounter() {
165 * setGetMethodBytes() set didGetMethodBytes to true after getting the bytes
167 public boolean setGetMethodBytes() {
169 return didGetMethodBytes.compareAndSet(false, true);
174 * setGetReturnBytes() set didGetReturnBytes if there is a new return value already
176 public synchronized boolean setGetReturnBytes() {
178 return didGetReturnBytes.compareAndSet(false, true);
183 * getMethodBytes() get the method in bytes
185 public byte[] getMethodBytes() throws IOException {
187 // Just return the methodBytes content
193 * static version of getObjectId()
195 public static int getObjectId(byte[] packetBytes) {
197 // Get object Id bytes
198 byte[] objectIdBytes = new byte[IoTRMIUtil.OBJECT_ID_LEN];
199 System.arraycopy(packetBytes, 0, objectIdBytes, 0, IoTRMIUtil.OBJECT_ID_LEN);
201 int objectId = IoTRMIUtil.byteArrayToInt(objectIdBytes);
207 * static version of getMethodId()
209 public static int getMethodId(byte[] packetBytes) {
211 // Get method Id bytes
212 byte[] methodIdBytes = new byte[IoTRMIUtil.METHOD_ID_LEN];
213 // Method Id is positioned after object Id in the byte array
214 System.arraycopy(packetBytes, IoTRMIUtil.OBJECT_ID_LEN, methodIdBytes, 0, IoTRMIUtil.METHOD_ID_LEN);
216 int methodId = IoTRMIUtil.byteArrayToInt(methodIdBytes);
223 * static version of getPacketType() - either method or return value (position is after object Id and method Id)
225 public static int getPacketType(byte[] packetBytes) {
227 // Get packet type bytes
228 byte[] packetTypeBytes = new byte[IoTRMIUtil.PACKET_TYPE_LEN];
229 int offset = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
230 System.arraycopy(packetBytes, offset, packetTypeBytes, 0, IoTRMIUtil.PACKET_TYPE_LEN);
231 // Get packet type (for now we assume 1 as method and -1 as return value
232 int packetType = IoTRMIUtil.byteArrayToInt(packetTypeBytes);
238 * getMethodParams() gets method params based on byte array received
240 * Basically this is the format of a method in bytes:
241 * 1) 32-bit value of object ID
242 * 2) 32-bit value of method ID
243 * 3) m parameters with n-bit value each (m x n-bit)
244 * For the parameters that don't have definite length,
245 * we need to extract the length from a preceding 32-bit
246 * field in front of it.
248 * For primitive objects:
249 * | 32-bit object ID | 32-bit method ID | m-bit actual data (fixed length) | ...
251 * For string, arrays, and non-primitive objects:
252 * | 32-bit object ID | 32-bit method ID | 32-bit length | n-bit actual data | ...
255 public Object[] getMethodParams(Class<?>[] arrCls, Class<?>[] arrGenValCls, byte[] methodBytes) {
257 // Byte scanning position
258 int pos = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + IoTRMIUtil.PACKET_TYPE_LEN;
259 Object[] paramObj = new Object[arrCls.length];
260 for (int i=0; i < arrCls.length; i++) {
262 String paramType = arrCls[i].getSimpleName();
263 int paramSize = rmiUtil.getTypeSize(paramType);
264 // Get the 32-bit field in the byte array to get the actual
265 // length (this is a param with indefinite length)
266 if (paramSize == -1) {
267 byte[] bytPrmLen = new byte[IoTRMIUtil.PARAM_LEN];
268 System.arraycopy(methodBytes, pos, bytPrmLen, 0, IoTRMIUtil.PARAM_LEN);
269 pos = pos + IoTRMIUtil.PARAM_LEN;
270 paramSize = IoTRMIUtil.byteArrayToInt(bytPrmLen);
272 byte[] paramBytes = new byte[paramSize];
273 System.arraycopy(methodBytes, pos, paramBytes, 0, paramSize);
274 pos = pos + paramSize;
275 paramObj[i] = IoTRMIUtil.getParamObject(arrCls[i], arrGenValCls[i], paramBytes);
283 * sendReturnObj() abstract version
285 public abstract void sendReturnObj(Object retObj, byte[] methodBytes);
289 * sendReturnObj() abstract version
291 public abstract void sendReturnObj(Class<?>[] retCls, Object[] retObj, byte[] methodBytes);
295 * returnToBytes() takes array of objects and generates bytes
297 public byte[] returnToBytes(Class<?>[] retCls, Object[] retObj) {
299 // Get byte arrays and calculate method bytes length
300 int numbRet = retObj.length;
302 byte[][] objBytesArr = new byte[numbRet][];
303 for (int i = 0; i < numbRet; i++) {
304 // Get byte arrays for the objects
305 objBytesArr[i] = IoTRMIUtil.getObjectBytes(retObj[i]);
306 String clsName = retCls[i].getSimpleName();
307 int retObjLen = rmiUtil.getTypeSize(clsName);
308 if (retObjLen == -1) { // indefinite length - store the length first
309 retLen = retLen + IoTRMIUtil.RETURN_LEN;
311 retLen = retLen + objBytesArr[i].length;
313 // Construct return in byte array
314 byte[] retBytes = new byte[retLen];
316 // Iteration for copying bytes
317 for (int i = 0; i < numbRet; i++) {
319 String clsName = retCls[i].getSimpleName();
320 int retObjLen = rmiUtil.getTypeSize(clsName);
321 if (retObjLen == -1) { // indefinite length
322 retObjLen = objBytesArr[i].length;
323 byte[] retLenBytes = IoTRMIUtil.intToByteArray(retObjLen);
324 System.arraycopy(retLenBytes, 0, retBytes, pos, IoTRMIUtil.RETURN_LEN);
325 pos = pos + IoTRMIUtil.RETURN_LEN;
327 System.arraycopy(objBytesArr[i], 0, retBytes, pos, retObjLen);
328 pos = pos + retObjLen;
336 * remoteCall() abstract version
338 public abstract void remoteCall(int objectId, int methodId, Class<?>[] paramCls, Object[] paramObj);
342 * getReturnValue() returns return value object
344 public Object getReturnValue(Class<?> retType, Class<?> retGenTypeVal) {
346 // Receive return value and return it to caller
347 // Now just strip off the object ID and method ID
348 int headerLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + IoTRMIUtil.PACKET_TYPE_LEN;
349 int valByteLen = retValueBytes.length - headerLen;
350 byte[] retValBytes = new byte[valByteLen];
351 // Method Id is positioned after object Id in the byte array
352 System.arraycopy(retValueBytes, headerLen, retValBytes, 0, valByteLen);
353 Object retObj = IoTRMIUtil.getParamObject(retType, retGenTypeVal, retValBytes);
354 // This means the right object and method have gotten the return value, so we set this back to false
360 * methodToBytes() returns byte representation of a method
362 public byte[] methodToBytes(int objectId, int methId, Class<?>[] paramCls, Object[] paramObj) {
364 // Initialized to the length of method ID
365 int methodLen = IoTRMIUtil.OBJECT_ID_LEN;
366 byte[] objId = IoTRMIUtil.intToByteArray(objectId);
367 // Get method ID in bytes
368 byte[] methodId = IoTRMIUtil.intToByteArray(methId);
369 // Get byte arrays and calculate method bytes length
370 int numbParam = paramObj.length;
371 methodLen = methodLen + IoTRMIUtil.METHOD_ID_LEN;
372 methodLen = methodLen + IoTRMIUtil.PACKET_TYPE_LEN;
373 byte[][] objBytesArr = new byte[numbParam][];
374 for (int i = 0; i < numbParam; i++) {
375 // Get byte arrays for the objects
376 objBytesArr[i] = IoTRMIUtil.getObjectBytes(paramObj[i]);
377 String clsName = paramCls[i].getSimpleName();
378 int paramLen = rmiUtil.getTypeSize(clsName);
379 if (paramLen == -1) { // indefinite length - store the length first
380 methodLen = methodLen + IoTRMIUtil.PARAM_LEN;
382 methodLen = methodLen + objBytesArr[i].length;
384 // Construct method in byte array
385 byte[] method = new byte[methodLen];
387 System.arraycopy(objId, 0, method, 0, IoTRMIUtil.METHOD_ID_LEN);
388 pos = pos + IoTRMIUtil.OBJECT_ID_LEN;
389 System.arraycopy(methodId, 0, method, pos, IoTRMIUtil.METHOD_ID_LEN);
390 pos = pos + IoTRMIUtil.METHOD_ID_LEN;
391 int packetType = IoTRMIUtil.METHOD_TYPE; // This is a method
392 byte[] packetTypeBytes = IoTRMIUtil.intToByteArray(packetType);
393 System.arraycopy(packetTypeBytes, 0, method, pos, IoTRMIUtil.PACKET_TYPE_LEN);
394 pos = pos + IoTRMIUtil.PACKET_TYPE_LEN;
395 // Second iteration for copying bytes
396 for (int i = 0; i < numbParam; i++) {
398 String clsName = paramCls[i].getSimpleName();
399 int paramLen = rmiUtil.getTypeSize(clsName);
400 if (paramLen == -1) { // indefinite length
401 paramLen = objBytesArr[i].length;
402 byte[] paramLenBytes = IoTRMIUtil.intToByteArray(paramLen);
403 System.arraycopy(paramLenBytes, 0, method, pos, IoTRMIUtil.PARAM_LEN);
404 pos = pos + IoTRMIUtil.PARAM_LEN;
406 System.arraycopy(objBytesArr[i], 0, method, pos, paramLen);
407 pos = pos + paramLen;
415 * getStructObjects() calls a method remotely by passing in parameters and getting a return Object
417 /*public synchronized Object[] getStructObjects(Class<?>[] retType, Class<?>[] retGenTypeVal) {
419 // Receive return value and return it to caller
420 Object[] retObj = null;
421 byte[] retObjBytes = null;
423 retObjBytes = rmiClientRecv.receiveBytes(retObjBytes);
424 } catch (IOException ex) {
425 ex.printStackTrace();
426 throw new Error("IoTRMICall: Error when receiving bytes - rmiClient.receiveBytes()");
428 retObj = getReturnObjects(retObjBytes, retType, retGenTypeVal);
 * getReturnObjects() extracts the return objects from a byte array, one object per entry of arrCls
437 public Object[] getReturnObjects(byte[] retBytes, Class<?>[] arrCls, Class<?>[] arrGenValCls) {
439 // Byte scanning position
441 Object[] retObj = new Object[arrCls.length];
442 for (int i=0; i < arrCls.length; i++) {
444 String retType = arrCls[i].getSimpleName();
445 int retSize = rmiUtil.getTypeSize(retType);
446 // Get the 32-bit field in the byte array to get the actual
447 // length (this is a param with indefinite length)
449 byte[] bytRetLen = new byte[IoTRMIUtil.RETURN_LEN];
450 System.arraycopy(retBytes, pos, bytRetLen, 0, IoTRMIUtil.RETURN_LEN);
451 pos = pos + IoTRMIUtil.RETURN_LEN;
452 retSize = IoTRMIUtil.byteArrayToInt(bytRetLen);
454 byte[] retObjBytes = new byte[retSize];
455 System.arraycopy(retBytes, pos, retObjBytes, 0, retSize);
457 retObj[i] = IoTRMIUtil.getParamObject(arrCls[i], arrGenValCls[i], retObjBytes);