import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/** Class IoTRMICall is a class that serves method calls on stub.
* <p>
*/
private IoTRMIUtil rmiUtil;
private IoTSocketClient rmiClient;
- //private List<String> listMethodId; // Map from method ID to signature
-
-
+ private byte[] retValueBytes;
+ //private AtomicBoolean didGetReturnValue;
+ //private Map<String,byte[]> mapReturnValue; // Store the return value received in a map
+ private ConcurrentLinkedQueue<byte[]> returnQueue;
+ private Map<String,AtomicBoolean> mapStubId;
+ private AtomicBoolean didGetReturnBytes;
+ private int objectIdCounter = Integer.MAX_VALUE;
+
/**
* Constructors
*/
public IoTRMICall(int _port, String _address, int _rev) throws IOException {
+ //didGetReturnValue = new AtomicBoolean(false);
rmiUtil = new IoTRMIUtil();
rmiClient = new IoTSocketClient(_port, _address, _rev);
+ retValueBytes = null;
+ returnQueue = new ConcurrentLinkedQueue<byte[]>();
+ mapStubId = new HashMap<String,AtomicBoolean>();
+ didGetReturnBytes = new AtomicBoolean(false);
+ //mapReturnValue = new HashMap<String,byte[]>();
+ waitForReturnValue();
+ wakeUpThread();
+ }
+
+
+ public IoTRMICall(int _localPort, int _port, String _address, int _rev) throws IOException {
+
+ //didGetReturnValue = new AtomicBoolean(false);
+ rmiUtil = new IoTRMIUtil();
+ rmiClient = new IoTSocketClient(_localPort, _port, _address, _rev);
+ retValueBytes = null;
+ returnQueue = new ConcurrentLinkedQueue<byte[]>();
+ mapStubId = new HashMap<String,AtomicBoolean>();
+ didGetReturnBytes = new AtomicBoolean(false);
+ //mapReturnValue = new HashMap<String,byte[]>();
+ waitForReturnValue();
+ wakeUpThread();
}
/**
- * remoteCall() calls a method remotely by passing in parameters and getting a return Object
+ * waitForReturnValue() starts a thread that waits for return value for a method invocation
+ */
+ public void waitForReturnValue() {
+
+ Thread thread = new Thread() {
+ public void run() {
+ byte[] retBytes = null;
+ while(true) {
+ try {
+ retBytes = rmiClient.receiveBytes(retBytes);
+ if (retBytes != null) {
+ System.out.println("Return value not null: " + Arrays.toString(retBytes));
+ //byte[] keyBytes = getObjectAndMethodIdBytes();
+ //String strKeyBytes = new String(keyBytes);
+ returnQueue.offer(retBytes);
+ } else
+ Thread.sleep(100);
+ retBytes = null;
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ throw new Error("IoTRMICall: Error receiving return value bytes!");
+ }
+ }
+ }
+ };
+ thread.start();
+ }
+
+
+ /**
+ * wakeUpThread() wakes up the correct thread
+ */
+ public void wakeUpThread() {
+
+ Thread thread = new Thread() {
+ public void run() {
+ while(true) {
+ // Take the current method from the queue and wake up the correct thread
+ retValueBytes = returnQueue.poll();
+ if (retValueBytes != null) { // If there is method bytes
+ System.out.println("methodBytes in wake up thread: " + Arrays.toString(retValueBytes));
+ int objectId = getObjectId();
+ int methodId = getMethodId();
+ String strKey = objectId + "-" + methodId;
+ AtomicBoolean retRecv = mapStubId.get(strKey);
+ System.out.println("boolean status: " + retRecv + " with key: " + strKey);
+ didGetReturnBytes.set(false);
+ while(!retRecv.compareAndSet(false, true));
+ System.out.println("boolean status: " + retRecv + " - map has: " + mapStubId.size());
+ while(!didGetReturnBytes.get()); // While skeleton is still processing
+ }
+ }
+ }
+ };
+ thread.start();
+ }
+
+
+ /**
+ * registerStub() registers the skeleton to be woken up
+ */
+ public synchronized void registerStub(int objectId, int methodId, AtomicBoolean retValueReceived) {
+
+ String strKey = objectId + "-" + methodId;
+ System.out.println("Key exist? " + mapStubId.containsKey(strKey));
+ mapStubId.put(strKey, retValueReceived);
+ System.out.println("\n\nAdding keyBytes: " + strKey + " now size: " + mapStubId.size() + "\n\n");
+ }
+
+
+ /**
+ * getObjectIdCounter() gets object Id counter
+ */
+ public int getObjectIdCounter() {
+
+ return objectIdCounter;
+ }
+
+
+ /**
+ * setObjectIdCounter() sets object Id counter
+ */
+ public void setObjectIdCounter(int objIdCounter) {
+
+ objectIdCounter = objIdCounter;
+ }
+
+
+ /**
+ * decrementObjectIdCounter() gets object Id counter
+ */
+ public void decrementObjectIdCounter() {
+
+ objectIdCounter--;
+ }
+
+
+
+ /**
+ * setGetReturnBytes() set boolean if there is a new return value already
+ */
+ public synchronized boolean setGetReturnBytes() {
+
+ return didGetReturnBytes.compareAndSet(false, true);
+ }
+
+
+ /**
+ * getObjectAndMethodIdBytes() extracts object Id and method Id from method bytes
+ */
+ public byte[] getObjectAndMethodIdBytes() {
+
+ int objMethIdLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
+ byte[] objectMethodIdBytes = new byte[objMethIdLen];
+ System.arraycopy(retValueBytes, 0, objectMethodIdBytes, 0, objMethIdLen);
+ return objectMethodIdBytes;
+ }
+
+
+ /**
+ * getObjectAndMethodIdBytes() gets object and method Id in bytes
+ */
+ public byte[] getObjectAndMethodIdBytes(int objectId, int methodId) {
+
+ int objMethIdLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
+ byte[] objectMethodIdBytes = new byte[objMethIdLen];
+ byte[] objIdBytes = IoTRMIUtil.intToByteArray(objectId);
+ byte[] methIdBytes = IoTRMIUtil.intToByteArray(methodId);
+ System.arraycopy(objIdBytes, 0, objectMethodIdBytes, 0, IoTRMIUtil.OBJECT_ID_LEN);
+ System.arraycopy(methIdBytes, 0, objectMethodIdBytes, IoTRMIUtil.OBJECT_ID_LEN, IoTRMIUtil.METHOD_ID_LEN);
+ return objectMethodIdBytes;
+ }
+
+
+ /**
+ * getObjectId() gets object Id from bytes
+ */
+ public int getObjectId() {
+
+ // Get object Id bytes
+ byte[] objectIdBytes = new byte[IoTRMIUtil.OBJECT_ID_LEN];
+ System.arraycopy(retValueBytes, 0, objectIdBytes, 0, IoTRMIUtil.OBJECT_ID_LEN);
+ // Get object Id
+ int objectId = IoTRMIUtil.byteArrayToInt(objectIdBytes);
+ return objectId;
+ }
+
+
+ /**
+ * getMethodId() gets method Id from bytes
+ */
+ public int getMethodId() {
+
+ // Get method Id bytes
+ byte[] methodIdBytes = new byte[IoTRMIUtil.METHOD_ID_LEN];
+ // Method Id is positioned after object Id in the byte array
+ System.arraycopy(retValueBytes, IoTRMIUtil.OBJECT_ID_LEN, methodIdBytes, 0, IoTRMIUtil.METHOD_ID_LEN);
+ // Get method Id
+ int methodId = IoTRMIUtil.byteArrayToInt(methodIdBytes);
+ // Get method Id
+ return methodId;
+ }
+
+
+ /**
+ * remoteCall() calls a method remotely by passing in parameters and getting a return Object (DEPRECATED)
*/
- public synchronized Object remoteCall(int objectId, int methodId, Class<?> retType, Class<?> retGenTypeKey,
+ public synchronized Object remoteCall(int objectId, int methodId, Class<?> retType,
Class<?> retGenTypeVal, Class<?>[] paramCls, Object[] paramObj) {
// Send method info
ex.printStackTrace();
throw new Error("IoTRMICall: Error when receiving bytes - rmiClient.receiveBytes()");
}
- System.out.println("Return object bytes: " + Arrays.toString(retObjBytes));
- retObj = IoTRMIUtil.getParamObject(retType, retGenTypeKey, retGenTypeVal, retObjBytes);
+ retObj = IoTRMIUtil.getParamObject(retType, retGenTypeVal, retObjBytes);
+ }
+ return retObj;
+ }
+
+
+ public synchronized void remoteCall(int objectId, int methodId, Class<?>[] paramCls, Object[] paramObj) {
+
+ // Send method info
+ byte[] methodBytes = methodToBytes(objectId, methodId, paramCls, paramObj);
+ try {
+ rmiClient.sendBytes(methodBytes);
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ throw new Error("IoTRMICall: Error when sending bytes - rmiClient.sendBytes()");
}
+ }
+
+
+ /**
+ * getReturnValue() returns return value
+ */
+ public synchronized Object getReturnValue(Class<?> retType, Class<?> retGenTypeVal) {
+
+ // Receive return value and return it to caller
+ // Now just strip off the object ID and method ID
+ int valByteLen = retValueBytes.length - (IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN);
+ byte[] retValBytes = new byte[valByteLen];
+ // Method Id is positioned after object Id in the byte array
+ System.arraycopy(retValueBytes, IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN, retValBytes, 0, valByteLen);
+ Object retObj = IoTRMIUtil.getParamObject(retType, retGenTypeVal, retValBytes);
+ // This means the right object and method have gotten the return value, so we set this back to false
+ return retObj;
+ }
+
+
+ /**
+ * getReturnValue() returns return value
+ */
+ public synchronized Object getReturnValue(Class<?> retType, Class<?> retGenTypeVal, byte[] retValueBytes) {
+
+ // Receive return value and return it to caller
+ // Now just strip off the object ID and method ID
+ int valByteLen = retValueBytes.length - (IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN);
+ byte[] retValBytes = new byte[valByteLen];
+ // Method Id is positioned after object Id in the byte array
+ System.arraycopy(retValueBytes, IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN, retValBytes, 0, valByteLen);
+ Object retObj = IoTRMIUtil.getParamObject(retType, retGenTypeVal, retValBytes);
+ // This means the right object and method have gotten the return value, so we set this back to false
return retObj;
}
}
- public static void main(String[] args) throws Exception {
-
- String[] test = { "123", "456", "789" };
- byte[] b = IoTRMIUtil.getObjectBytes(test);
-
- Boolean[] test2 = new Boolean[] { true, false, false };
- byte[] b2 = IoTRMIUtil.getObjectBytes(test2);
-
- System.out.println(Arrays.toString(b));
- System.out.println(Arrays.toString(b2));
-
- String[] c = (String[]) IoTRMIUtil.getParamObjectArray(String[].class, b);
- System.out.println(Arrays.toString(c));
-
- Boolean[] c2 = (Boolean[]) IoTRMIUtil.getParamObjectArray(Boolean[].class, b2);
- System.out.println(Arrays.toString(c2));
-
- // Set
- /*Set<String> set = new HashSet<String>();
- set.add("1234");
- set.add("5678");
-
- byte[] objBytes = IoTRMIUtil.getObjectBytes(set);
- System.out.println(Arrays.toString(objBytes));
- Object obj = IoTRMIUtil.getParamObject(Set.class, null, String.class, objBytes);
-
- @SuppressWarnings("unchecked")
- Set<String> setStr = (Set<String>) obj;
- System.out.println("Set: " + setStr.toString());*/
-
- // List
- /*List<Long> list = new ArrayList<Long>();
- list.add(12345678l);
- list.add(23455432l);
- list.add(34566543l);
-
- byte[] objBytes = IoTRMIUtil.getObjectBytes(list);
- System.out.println(Arrays.toString(objBytes));
- Object obj = IoTRMIUtil.getParamObject(List.class, null, Long.class, objBytes);
+ /**
+ * remoteCall() calls a method remotely by passing in parameters and getting a return Object
+ */
+ public synchronized Object[] getStructObjects(Class<?>[] retType, Class<?>[] retGenTypeVal) {
- @SuppressWarnings("unchecked")
- List<Long> listStr = (List<Long>) obj;
- System.out.println("List: " + listStr.toString());*/
+ // Receive return value and return it to caller
+ Object[] retObj = null;
+ byte[] retObjBytes = null;
+ try {
+ retObjBytes = rmiClient.receiveBytes(retObjBytes);
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ throw new Error("IoTRMICall: Error when receiving bytes - rmiClient.receiveBytes()");
+ }
+ retObj = getReturnObjects(retObjBytes, retType, retGenTypeVal);
- // Map
- Map<Long,Integer> map = new HashMap<Long,Integer>();
- map.put(12345678l, 1234);
- map.put(23455432l, 5678);
- map.put(34566543l, 4321);
+ return retObj;
+ }
- byte[] objBytes = IoTRMIUtil.getObjectBytes(map);
- System.out.println(Arrays.toString(objBytes));
- Object obj = IoTRMIUtil.getParamObject(Map.class, Long.class, Integer.class, objBytes);
- map = (Map<Long,Integer>) obj;
- System.out.println("Received map: " + map.toString());
+ public Object[] getReturnObjects(byte[] retBytes, Class<?>[] arrCls, Class<?>[] arrGenValCls) {
- //@SuppressWarnings("unchecked")
- //List<Long> listStr = (List<Long>) obj;
- //System.out.println("List: " + listStr.toString());
+ // Byte scanning position
+ int pos = 0;
+ Object[] retObj = new Object[arrCls.length];
+ for (int i=0; i < arrCls.length; i++) {
+
+ String retType = arrCls[i].getSimpleName();
+ int retSize = rmiUtil.getTypeSize(retType);
+ // Get the 32-bit field in the byte array to get the actual
+ // length (this is a param with indefinite length)
+ if (retSize == -1) {
+ byte[] bytRetLen = new byte[IoTRMIUtil.RETURN_LEN];
+ System.arraycopy(retBytes, pos, bytRetLen, 0, IoTRMIUtil.RETURN_LEN);
+ pos = pos + IoTRMIUtil.RETURN_LEN;
+ retSize = IoTRMIUtil.byteArrayToInt(bytRetLen);
+ }
+ byte[] retObjBytes = new byte[retSize];
+ System.arraycopy(retBytes, pos, retObjBytes, 0, retSize);
+ pos = pos + retSize;
+ retObj[i] = IoTRMIUtil.getParamObject(arrCls[i], arrGenValCls[i], retObjBytes);
+ }
+ return retObj;
}
}