1 package iotruntime.master;
4 import iotruntime.slave.IoTAddress;
5 import iotruntime.slave.IoTDeviceAddress;
6 import iotruntime.messages.*;
9 import org.objectweb.asm.ClassReader;
10 import org.objectweb.asm.ClassWriter;
11 import org.objectweb.asm.ClassVisitor;
16 import java.io.BufferedReader;
17 import java.io.InputStream;
18 import java.io.InputStreamReader;
20 import java.io.FileInputStream;
21 import java.io.FileOutputStream;
22 import java.io.ObjectInputStream;
23 import java.io.ObjectOutputStream;
24 import java.io.IOException;
25 import java.lang.ClassNotFoundException;
26 import java.lang.Class;
27 import java.lang.reflect.*;
28 import java.net.Socket;
29 import java.net.ServerSocket;
31 import static java.lang.Math.toIntExact;
/** Class IoTMaster is responsible for using ClassRuntimeInstrumenterMaster
 * to instrument the controller/device bytecode and for starting multiple
 * IoTSlave instances running on different JVMs in a distributed fashion.
 *
 * @author Rahmadi Trimananda <rahmadi.trimananda @ uci.edu>
 */
41 public class IoTMaster {
44 * IoTMaster class properties
46 * CommunicationHandler maintains the data structure for hostnames and ports
47 * LoadBalancer assigns a job onto a host based on certain metrics
49 private CommunicationHandler commHan;
50 private LoadBalancer lbIoT;
51 private RouterConfig routerConfig;
52 private ObjectInitHandler objInitHand;
53 private ObjectAddressInitHandler objAddInitHand;
54 private String[] strObjectNames;
55 private Map<String,ClassRuntimeInstrumenterMaster> mapClassNameToCrim;
57 * These properties hold information of a certain object
60 private String strObjName;
61 private String strObjClassName;
62 private String strObjClassInterfaceName;
63 private String strObjStubClsIntfaceName;
64 private String strIoTMasterHostAdd;
65 private String strIoTSlaveControllerHostAdd;
66 private String strIoTSlaveObjectHostAdd;
67 private Class[] arrFieldClasses;
68 private Object[] arrFieldValues;
69 private Socket filesocket;
71 // Constants that are to be extracted from config file
72 private static String STR_MASTER_MAC_ADD;
73 private static String STR_IOT_CODE_PATH;
74 private static String STR_CONT_PATH;
75 private static String STR_RUNTIME_DIR;
76 private static String STR_CLS_PATH;
77 private static String STR_RMI_PATH;
78 private static String STR_RMI_HOSTNAME;
79 private static String STR_LOG_FILE_PATH;
80 private static String STR_SSH_USERNAME;
81 private static String STR_ROUTER_ADD;
82 private static String STR_MONITORING_HOST;
83 private static String STR_ZB_GATEWAY_ADDRESS;
84 private static String STR_ZB_GATEWAY_PORT;
85 private static String STR_ZB_IOTMASTER_PORT;
86 private static String STR_NUM_CALLBACK_PORTS;
87 private static String STR_JVM_INIT_HEAP_SIZE;
88 private static String STR_JVM_MAX_HEAP_SIZE;
89 private static boolean BOOL_VERBOSE;
92 * IoTMaster class constants
94 * Name constants - not to be configured by users
96 private static final String STR_IOT_MASTER_NAME = "IoTMaster";
97 private static final String STR_CFG_FILE_EXT = ".config";
98 private static final String STR_CLS_FILE_EXT = ".class";
99 private static final String STR_JAR_FILE_EXT = ".jar";
100 private static final String STR_ZIP_FILE_EXT = ".zip";
101 private static final String STR_TCP_PROTOCOL = "tcp";
102 private static final String STR_UDP_PROTOCOL = "udp";
103 private static final String STR_TCPGW_PROTOCOL = "tcpgw";
104 private static final String STR_NO_PROTOCOL = "nopro";
105 private static final String STR_SELF_MAC_ADD = "00:00:00:00:00:00";
106 private static final String STR_INTERFACE_CLS_CFG = "INTERFACE_CLASS";
107 private static final String STR_INT_STUB_CLS_CFG = "INTERFACE_STUB_CLASS";
108 private static final String STR_FILE_TRF_CFG = "ADDITIONAL_ZIP_FILE";
109 private static final String STR_YES = "Yes";
110 private static final String STR_NO = "No";
113 * Runtime class name constants - not to be configured by users
115 private static final String STR_REL_INSTRUMENTER_CLS = "iotruntime.master.RelationInstrumenter";
116 private static final String STR_SET_INSTRUMENTER_CLS = "iotruntime.master.SetInstrumenter";
117 private static final String STR_IOT_SLAVE_CLS = "iotruntime.slave.IoTSlave";
118 private static final String STR_IOT_DEV_ADD_CLS = "IoTDeviceAddress";
119 private static final String STR_IOT_ZB_ADD_CLS = "IoTZigbeeAddress";
120 private static final String STR_IOT_ADD_CLS = "IoTAddress";
/**
 * Creates an IoTMaster for the given object names.
 *
 * <p>Only {@code argObjNms} is stored; every other field is put into its
 * "not initialized yet" state. The live data structures are expected to be
 * built later by {@code initLiveDataStructure()} and the configuration
 * values by {@code parseIoTMasterConfigFile()}.
 *
 * <p>NOTE: the configuration values are {@code static}, so constructing a
 * second IoTMaster clears the configuration seen by every instance.
 *
 * @param argObjNms object names; the array is stored as-is (not copied)
 */
public IoTMaster(String[] argObjNms) {

    // Live data structures
    commHan = null;
    lbIoT = null;
    routerConfig = null;
    objInitHand = null;
    objAddInitHand = null;
    mapClassNameToCrim = null;

    strObjectNames = argObjNms;

    // Per-object scratch state
    strObjName = null;
    strObjClassName = null;
    strObjClassInterfaceName = null;
    strObjStubClsIntfaceName = null;
    strIoTMasterHostAdd = null;
    strIoTSlaveControllerHostAdd = null;
    strIoTSlaveObjectHostAdd = null;
    arrFieldClasses = null;
    arrFieldValues = null;
    filesocket = null;

    // Configuration values (static) - every declared value is reset,
    // including STR_CLS_PATH and STR_RMI_PATH
    STR_MASTER_MAC_ADD = null;
    STR_IOT_CODE_PATH = null;
    STR_CONT_PATH = null;
    STR_RUNTIME_DIR = null;
    STR_CLS_PATH = null;
    STR_RMI_PATH = null;
    STR_RMI_HOSTNAME = null;
    STR_LOG_FILE_PATH = null;
    STR_SSH_USERNAME = null;
    STR_ROUTER_ADD = null;
    STR_MONITORING_HOST = null;
    STR_ZB_GATEWAY_ADDRESS = null;
    STR_ZB_GATEWAY_PORT = null;
    STR_ZB_IOTMASTER_PORT = null;
    STR_NUM_CALLBACK_PORTS = null;
    STR_JVM_INIT_HEAP_SIZE = null;
    STR_JVM_MAX_HEAP_SIZE = null;
    BOOL_VERBOSE = false;
}
167 * A method to initialize CommunicationHandler, LoadBalancer, RouterConfig and ObjectInitHandler
171 private void initLiveDataStructure() {
173 commHan = new CommunicationHandler(BOOL_VERBOSE);
174 lbIoT = new LoadBalancer(BOOL_VERBOSE);
175 lbIoT.setupLoadBalancer();
176 routerConfig = new RouterConfig();
177 routerConfig.getAddressList(STR_ROUTER_ADD);
178 objInitHand = new ObjectInitHandler(BOOL_VERBOSE);
179 objAddInitHand = new ObjectAddressInitHandler(BOOL_VERBOSE);
180 mapClassNameToCrim = new HashMap<String,ClassRuntimeInstrumenterMaster>();
184 * A method to initialize constants from config file
188 private void parseIoTMasterConfigFile() {
189 // Parse configuration file
190 Properties prop = new Properties();
191 String strCfgFileName = STR_IOT_MASTER_NAME + STR_CFG_FILE_EXT;
192 File file = new File(strCfgFileName);
193 FileInputStream fis = null;
195 fis = new FileInputStream(file);
198 } catch (IOException ex) {
199 System.out.println("IoTMaster: Error reading config file: " + strCfgFileName);
200 ex.printStackTrace();
202 // Initialize constants from config file
203 STR_MASTER_MAC_ADD = prop.getProperty("MAC_ADDRESS");
204 STR_IOT_CODE_PATH = prop.getProperty("IOT_CODE_PATH");
205 STR_CONT_PATH = prop.getProperty("CONTROLLERS_CODE_PATH");
206 STR_RUNTIME_DIR = prop.getProperty("RUNTIME_DIR");
207 STR_CLS_PATH = prop.getProperty("CLASS_PATH");
208 STR_RMI_PATH = prop.getProperty("RMI_PATH");
209 STR_RMI_HOSTNAME = prop.getProperty("RMI_HOSTNAME");
210 STR_LOG_FILE_PATH = prop.getProperty("LOG_FILE_PATH");
211 STR_SSH_USERNAME = prop.getProperty("SSH_USERNAME");
212 STR_ROUTER_ADD = prop.getProperty("ROUTER_ADD");
213 STR_MONITORING_HOST = prop.getProperty("MONITORING_HOST");
214 STR_ZB_GATEWAY_ADDRESS = prop.getProperty("ZIGBEE_GATEWAY_ADDRESS");
215 STR_ZB_GATEWAY_PORT = prop.getProperty("ZIGBEE_GATEWAY_PORT");
216 STR_ZB_IOTMASTER_PORT = prop.getProperty("ZIGBEE_IOTMASTER_PORT");
217 STR_NUM_CALLBACK_PORTS = prop.getProperty("NUMBER_CALLBACK_PORTS");
218 STR_JVM_INIT_HEAP_SIZE = prop.getProperty("JVM_INIT_HEAP_SIZE");
219 STR_JVM_MAX_HEAP_SIZE = prop.getProperty("JVM_MAX_HEAP_SIZE");
220 if(prop.getProperty("VERBOSE").equals(STR_YES)) {
224 RuntimeOutput.print("IoTMaster: Extracting information from config file: " + strCfgFileName, BOOL_VERBOSE);
225 RuntimeOutput.print("STR_MASTER_MAC_ADD=" + STR_MASTER_MAC_ADD, BOOL_VERBOSE);
226 RuntimeOutput.print("STR_IOT_CODE_PATH=" + STR_IOT_CODE_PATH, BOOL_VERBOSE);
227 RuntimeOutput.print("STR_CONT_PATH=" + STR_CONT_PATH, BOOL_VERBOSE);
228 RuntimeOutput.print("STR_RUNTIME_DIR=" + STR_RUNTIME_DIR, BOOL_VERBOSE);
229 RuntimeOutput.print("STR_CLS_PATH=" + STR_CLS_PATH, BOOL_VERBOSE);
230 RuntimeOutput.print("STR_RMI_PATH=" + STR_RMI_PATH, BOOL_VERBOSE);
231 RuntimeOutput.print("STR_RMI_HOSTNAME=" + STR_RMI_HOSTNAME, BOOL_VERBOSE);
232 RuntimeOutput.print("STR_LOG_FILE_PATH=" + STR_LOG_FILE_PATH, BOOL_VERBOSE);
233 RuntimeOutput.print("STR_SSH_USERNAME=" + STR_SSH_USERNAME, BOOL_VERBOSE);
234 RuntimeOutput.print("STR_ROUTER_ADD=" + STR_ROUTER_ADD, BOOL_VERBOSE);
235 RuntimeOutput.print("STR_MONITORING_HOST=" + STR_MONITORING_HOST, BOOL_VERBOSE);
236 RuntimeOutput.print("STR_ZB_GATEWAY_ADDRESS=" + STR_ZB_GATEWAY_ADDRESS, BOOL_VERBOSE);
237 RuntimeOutput.print("STR_ZB_GATEWAY_PORT=" + STR_ZB_GATEWAY_PORT, BOOL_VERBOSE);
238 RuntimeOutput.print("STR_ZB_IOTMASTER_PORT=" + STR_ZB_IOTMASTER_PORT, BOOL_VERBOSE);
239 RuntimeOutput.print("STR_NUM_CALLBACK_PORTS=" + STR_NUM_CALLBACK_PORTS, BOOL_VERBOSE);
240 RuntimeOutput.print("STR_JVM_INIT_HEAP_SIZE=" + STR_JVM_INIT_HEAP_SIZE, BOOL_VERBOSE);
241 RuntimeOutput.print("STR_JVM_MAX_HEAP_SIZE=" + STR_JVM_MAX_HEAP_SIZE, BOOL_VERBOSE);
242 RuntimeOutput.print("BOOL_VERBOSE=" + BOOL_VERBOSE, BOOL_VERBOSE);
243 RuntimeOutput.print("IoTMaster: Information extracted successfully!", BOOL_VERBOSE);
247 * A method to parse information from a config file
249 * @param strCfgFileName Config file name
250 * @param strCfgField Config file field name
253 private String parseConfigFile(String strCfgFileName, String strCfgField) {
254 // Parse configuration file
255 Properties prop = new Properties();
256 File file = new File(strCfgFileName);
257 FileInputStream fis = null;
259 fis = new FileInputStream(file);
262 } catch (IOException ex) {
263 System.out.println("IoTMaster: Error reading config file: " + strCfgFileName);
264 ex.printStackTrace();
266 System.out.println("IoTMaster: Reading " + strCfgField +
267 " from config file: " + strCfgFileName + " with value: " +
268 prop.getProperty(strCfgField, null));
269 // NULL is returned if the property isn't found
270 return prop.getProperty(strCfgField, null);
274 * A method to send files from IoTMaster
276 * @param filesocket File socket object
277 * @param sFileName File name
278 * @param lFLength File length
281 private void sendFile(Socket filesocket, String sFileName, long lFLength) throws IOException {
283 File file = new File(sFileName);
284 byte[] bytFile = new byte[toIntExact(lFLength)];
285 InputStream inFileStream = new FileInputStream(file);
287 OutputStream outFileStream = filesocket.getOutputStream();
289 while ((iCount = inFileStream.read(bytFile)) > 0) {
290 outFileStream.write(bytFile, 0, iCount);
293 RuntimeOutput.print("IoTMaster: File sent!", BOOL_VERBOSE);
297 * A method to create a thread
299 * @param sSSHCmd SSH command
302 private void createThread(String sSSHCmd) throws IOException {
304 // Start a new thread to start a new JVM
306 Runtime runtime = Runtime.getRuntime();
307 Process process = runtime.exec(sSSHCmd);
309 RuntimeOutput.print("Executing: " + sSSHCmd, BOOL_VERBOSE);
313 * A method to send command from master and receive reply from slave
315 * @params msgSend Message object
316 * @params strPurpose String that prints purpose message
317 * @params inStream Input stream
318 * @params outStream Output stream
321 private void commMasterToSlave(Message msgSend, String strPurpose,
322 ObjectInputStream inStream, ObjectOutputStream outStream)
323 throws IOException, ClassNotFoundException {
325 // Send message/command from master
326 outStream.writeObject(msgSend);
327 RuntimeOutput.print("IoTMaster: Send message: " + strPurpose, BOOL_VERBOSE);
329 // Get reply from slave as acknowledgment
330 Message msgReply = (Message) inStream.readObject();
331 RuntimeOutput.print("IoTMaster: Reply message: " + msgReply.getMessage(), BOOL_VERBOSE);
335 * A private method to instrument IoTSet device
337 * @params strFieldIdentifier String field name + object ID
338 * @params strFieldName String field name
339 * @params strIoTSlaveObjectHostAdd String slave host address
340 * @params inStream ObjectInputStream communication
341 * @params inStream ObjectOutputStream communication
344 private void instrumentIoTSetDevice(String strFieldIdentifier, String strObjName, String strFieldName, String strIoTSlaveObjectHostAdd,
345 ObjectInputStream inStream, ObjectOutputStream outStream)
346 throws IOException, ClassNotFoundException, InterruptedException {
348 // Get information from the set
349 List<Object[]> listObject = objAddInitHand.getFields(strFieldIdentifier);
350 // Create a new IoTSet
351 Message msgCrtIoTSet = new MessageCreateSetRelation(IoTCommCode.CREATE_NEW_IOTSET, strFieldName);
352 commMasterToSlave(msgCrtIoTSet, "Create new IoTSet for IoTDeviceAddress!", inStream, outStream);
353 int iRows = listObject.size();
354 RuntimeOutput.print("IoTMaster: Number of rows for IoTDeviceAddress: " + iRows, BOOL_VERBOSE);
355 // Transfer the address
356 for(int iRow=0; iRow<iRows; iRow++) {
357 arrFieldValues = listObject.get(iRow);
358 // Get device address - if 00:00:00:00:00:00 that means it needs the driver object address (self)
359 String strDeviceAddress = null;
360 String strDeviceAddressKey = null;
361 if (arrFieldValues[0].equals(STR_SELF_MAC_ADD)) {
362 strDeviceAddress = strIoTSlaveObjectHostAdd;
363 strDeviceAddressKey = strObjName + "-" + strIoTSlaveObjectHostAdd;
365 strDeviceAddress = routerConfig.getIPFromMACAddress((String) arrFieldValues[0]);
366 strDeviceAddressKey = strObjName + "-" + strDeviceAddress;
368 int iDestDeviceDriverPort = (int) arrFieldValues[1];
369 String strProtocol = (String) arrFieldValues[2];
370 // Check for wildcard feature
371 boolean bSrcPortWildCard = false;
372 boolean bDstPortWildCard = false;
373 if (arrFieldValues.length > 3) {
374 bSrcPortWildCard = (boolean) arrFieldValues[3];
375 bDstPortWildCard = (boolean) arrFieldValues[4];
377 // Add the port connection into communication handler - if it's not assigned yet
378 if (commHan.getComPort(strDeviceAddressKey) == null) {
379 commHan.addPortConnection(strIoTSlaveObjectHostAdd, strDeviceAddressKey);
383 System.out.println("\n\n DEBUG: InstrumentSetDevice: Object Name: " + strObjName);
384 System.out.println("DEBUG: InstrumentSetDevice: Port number: " + commHan.getComPort(strDeviceAddressKey));
385 System.out.println("DEBUG: InstrumentSetDevice: Device address: " + strDeviceAddressKey + "\n\n");
387 // Send address one by one
388 Message msgGetIoTSetObj = null;
389 if (bDstPortWildCard) {
390 String strUniqueDev = strDeviceAddressKey + ":" + iRow;
391 msgGetIoTSetObj = new MessageGetDeviceObject(IoTCommCode.GET_DEVICE_IOTSET_OBJECT,
392 strDeviceAddress, commHan.getAdditionalPort(strUniqueDev), iDestDeviceDriverPort, bSrcPortWildCard, bDstPortWildCard);
394 msgGetIoTSetObj = new MessageGetDeviceObject(IoTCommCode.GET_DEVICE_IOTSET_OBJECT,
395 strDeviceAddress, commHan.getComPort(strDeviceAddressKey), iDestDeviceDriverPort, bSrcPortWildCard, bDstPortWildCard);
396 commMasterToSlave(msgGetIoTSetObj, "Get IoTSet objects!", inStream, outStream);
398 // Reinitialize IoTSet on device object
399 commMasterToSlave(new MessageSimple(IoTCommCode.REINITIALIZE_IOTSET_FIELD),
400 "Reinitialize IoTSet fields!", inStream, outStream);
405 * A private method to instrument IoTSet Zigbee device
407 * @params Map.Entry<String,Object> Entry of map IoTSet instrumentation
408 * @params strFieldName String field name
409 * @params strIoTSlaveObjectHostAdd String slave host address
410 * @params inStream ObjectInputStream communication
411 * @params inStream ObjectOutputStream communication
// Instruments an IoTSet of IoTZigbeeAddress entries on a slave: asks the slave to
// create a new IoTSet, sets a gateway policy and sends one message per row of the
// SetInstrumenter, closes the gateway connection and finally asks the slave to
// reinitialize the IoTSet field. Overwrites the field arrFieldValues with each row.
//
// NOTE(review): in this listing the statements starting with
// "ZigbeeConfig zbConfig = new ZigbeeConfig(" and "new MessageGetSimpleDeviceObject("
// end in a comma; their remaining arguments are not visible here and have to be
// confirmed against the complete source before this block is changed.
// NOTE(review): zbConfig.closeConnection() is only reached when no call before it
// throws - consider a try/finally once the full statements are available.
414 private void instrumentIoTSetZBDevice(Map.Entry<String,Object> map, String strObjName, String strFieldName, String strIoTSlaveObjectHostAdd,
415 ObjectInputStream inStream, ObjectOutputStream outStream)
416 throws IOException, ClassNotFoundException, InterruptedException {
418 // Get information from the set
// The map entry's value is expected to be a SetInstrumenter; any other type fails the cast
419 SetInstrumenter setInstrumenter = (SetInstrumenter) map.getValue();
420 // Create a new IoTSet
421 Message msgCrtIoTSet = new MessageCreateSetRelation(IoTCommCode.CREATE_NEW_IOTSET, strFieldName);
422 commMasterToSlave(msgCrtIoTSet, "Create new IoTSet for IoTZigbeeAddress!", inStream, outStream);
423 // Prepare ZigbeeConfig
// The configured gateway address is passed through the MAC-to-IP lookup
424 String strZigbeeGWAddress = routerConfig.getIPFromMACAddress(STR_ZB_GATEWAY_ADDRESS);
// Port lookup key: object name + "-" + gateway IP
425 String strZigbeeGWAddressKey = strObjName + "-" + strZigbeeGWAddress;
// Both ports come from the config file as strings; parseInt throws NumberFormatException on bad input
426 int iZigbeeGWPort = Integer.parseInt(STR_ZB_GATEWAY_PORT);
427 int iZigbeeIoTMasterPort = Integer.parseInt(STR_ZB_IOTMASTER_PORT);
428 commHan.addDevicePort(iZigbeeIoTMasterPort);
429 ZigbeeConfig zbConfig = new ZigbeeConfig(strZigbeeGWAddress, iZigbeeGWPort, iZigbeeIoTMasterPort,
431 // Add the port connection into communication handler - if it's not assigned yet
432 if (commHan.getComPort(strZigbeeGWAddressKey) == null) {
433 commHan.addPortConnection(strIoTSlaveObjectHostAdd, strZigbeeGWAddressKey);
435 int iRows = setInstrumenter.numberOfRows();
436 RuntimeOutput.print("IoTMaster: Number of rows for IoTZigbeeAddress: " + iRows, BOOL_VERBOSE);
// The debug output below is printed unconditionally (it does not go through RuntimeOutput)
439 System.out.println("\n\n DEBUG: InstrumentZigbeeDevice: Object Name: " + strObjName);
440 System.out.println("DEBUG: InstrumentZigbeeDevice: Port number: " + commHan.getComPort(strZigbeeGWAddressKey));
441 System.out.println("DEBUG: InstrumentZigbeeDevice: Device address: " + strZigbeeGWAddress + "\n\n");
443 // Transfer the address
444 for(int iRow=0; iRow<iRows; iRow++) {
// The instance field arrFieldValues is reused as the per-row buffer
445 arrFieldValues = setInstrumenter.fieldValues(iRow);
446 // Get device address
447 String strZBDevAddress = (String) arrFieldValues[0];
448 // Send policy to Zigbee gateway - TODO: Need to clear policy first?
449 zbConfig.setPolicy(strIoTSlaveObjectHostAdd, commHan.getComPort(strZigbeeGWAddressKey), strZBDevAddress);
450 // Send address one by one
451 Message msgGetIoTSetZBObj = new MessageGetSimpleDeviceObject(IoTCommCode.GET_ZB_DEV_IOTSET_OBJECT,
453 commMasterToSlave(msgGetIoTSetZBObj, "Get IoTSet objects!", inStream, outStream);
455 zbConfig.closeConnection();
456 // Reinitialize IoTSet on device object
457 commMasterToSlave(new MessageSimple(IoTCommCode.REINITIALIZE_IOTSET_FIELD), "Reinitialize IoTSet fields!", inStream, outStream);
462 * A private method to instrument IoTSet of addresses
464 * @params strFieldIdentifier String field name + object ID
465 * @params strFieldName String field name
466 * @params inStream ObjectInputStream communication
467 * @params inStream ObjectOutputStream communication
// Instruments an IoTSet of IoTAddress entries on a slave: asks the slave to create
// a new IoTSet, sends one message per address row stored under strFieldIdentifier
// and finally asks the slave to reinitialize the IoTSet field. Overwrites the field
// arrFieldValues with each row.
//
// NOTE(review): in this listing the statement starting with
// "new MessageGetSimpleDeviceObject(" ends in a comma; its remaining argument(s)
// are not visible here - presumably strAddress, which is otherwise unused; confirm
// against the complete source.
470 private void instrumentIoTSetAddress(String strFieldIdentifier, String strFieldName,
471 ObjectInputStream inStream, ObjectOutputStream outStream)
472 throws IOException, ClassNotFoundException, InterruptedException {
474 // Get information from the set
// Rows are read back from objAddInitHand under the same field identifier
475 List<Object[]> listObject = objAddInitHand.getFields(strFieldIdentifier);
476 // Create a new IoTSet
477 Message msgCrtIoTSet = new MessageCreateSetRelation(IoTCommCode.CREATE_NEW_IOTSET, strFieldName);
478 commMasterToSlave(msgCrtIoTSet, "Create new IoTSet for IoTAddress!", inStream, outStream);
479 int iRows = listObject.size();
480 RuntimeOutput.print("IoTMaster: Number of rows for IoTAddress: " + iRows, BOOL_VERBOSE);
481 // Transfer the address
482 for(int iRow=0; iRow<iRows; iRow++) {
// The instance field arrFieldValues is reused as the per-row buffer
483 arrFieldValues = listObject.get(iRow);
484 // Get device address
// Element 0 of the row is read as the address string
485 String strAddress = (String) arrFieldValues[0];
486 // Send address one by one
487 Message msgGetIoTSetAddObj = new MessageGetSimpleDeviceObject(IoTCommCode.GET_ADD_IOTSET_OBJECT,
489 commMasterToSlave(msgGetIoTSetAddObj, "Get IoTSet objects!", inStream, outStream);
491 // Reinitialize IoTSet on device object
492 commMasterToSlave(new MessageSimple(IoTCommCode.REINITIALIZE_IOTSET_FIELD),
493 "Reinitialize IoTSet fields!", inStream, outStream);
498 * A private method to instrument an object on a specific machine and setting up policies
500 * @params strFieldObjectID String field object ID
503 private void instrumentObject(String strFieldObjectID) throws IOException {
505 // Extract the interface name for RMI
506 // e.g. ProximitySensorInterface, TempSensorInterface, etc.
508 String strObjCfgFile = STR_IOT_CODE_PATH + strObjClassName + "/" + strObjClassName + STR_CFG_FILE_EXT;
509 strObjClassInterfaceName = parseConfigFile(strObjCfgFile, STR_INTERFACE_CLS_CFG);
510 strObjStubClsIntfaceName = parseConfigFile(strObjCfgFile, STR_INT_STUB_CLS_CFG);
511 // Create an object name, e.g. ProximitySensorImplPS1
512 strObjName = strObjClassName + strFieldObjectID;
513 // Check first if host exists
514 if(commHan.objectExists(strObjName)) {
515 // If this object exists already ...
516 // Re-read IoTSlave object hostname for further reference
517 strIoTSlaveObjectHostAdd = commHan.getHostAddress(strObjName);
518 RuntimeOutput.print("IoTMaster: Object with name: " + strObjName + " has existed!", BOOL_VERBOSE);
520 // If this is a new object ... then create one
521 // Get host address for IoTSlave from LoadBalancer
522 //strIoTSlaveObjectHostAdd = lbIoT.selectHost();
523 strIoTSlaveObjectHostAdd = routerConfig.getIPFromMACAddress(lbIoT.selectHost());
524 if (strIoTSlaveControllerHostAdd == null)
525 throw new Error("IoTMaster: Could not translate MAC to IP address! Please check the router's /tmp/dhcp.leases!");
526 RuntimeOutput.print("IoTMaster: Object name: " + strObjName, BOOL_VERBOSE);
527 // Add port connection and get port numbers
528 // Naming for objects ProximitySensor becomes ProximitySensor0, ProximitySensor1, etc.
529 commHan.addPortConnection(strIoTSlaveObjectHostAdd, strObjName);
530 commHan.addActiveControllerObject(strFieldObjectID, strObjName, strObjClassName, strObjClassInterfaceName,
531 strObjStubClsIntfaceName, strIoTSlaveObjectHostAdd, arrFieldValues, arrFieldClasses);
532 // ROUTING POLICY: IoTMaster and device/controller object
533 // Master-slave communication
534 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTMasterHostAdd,
535 strIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL, commHan.getComPort(strObjName));
536 // ROUTING POLICY: Send the same routing policy to both the hosts
537 routerConfig.configureHostMainPolicies(strIoTMasterHostAdd, strIoTMasterHostAdd,
538 strIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL, commHan.getComPort(strObjName));
539 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTMasterHostAdd,
540 strIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL, commHan.getComPort(strObjName));
541 // Need to accommodate callback functions here - open ports for TCP
542 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTSlaveControllerHostAdd,
543 strIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL);
544 routerConfig.configureHostMainPolicies(strIoTSlaveControllerHostAdd, strIoTSlaveControllerHostAdd,
545 strIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL);
546 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveControllerHostAdd,
547 strIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL);
548 // Instrument the IoTSet declarations inside the class file
549 instrumentObjectIoTSet(strFieldObjectID);
551 // Send routing policy to router for controller object
552 // ROUTING POLICY: RMI communication - RMI registry and stub ports
553 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
554 STR_TCP_PROTOCOL, commHan.getRMIRegPort(strObjName));
555 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
556 STR_TCP_PROTOCOL, commHan.getRMIStubPort(strObjName));
557 // Send the same set of routing policies to compute nodes
558 routerConfig.configureHostMainPolicies(strIoTSlaveControllerHostAdd, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
559 STR_TCP_PROTOCOL, commHan.getRMIRegPort(strObjName));
560 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
561 STR_TCP_PROTOCOL, commHan.getRMIRegPort(strObjName));
562 routerConfig.configureHostMainPolicies(strIoTSlaveControllerHostAdd, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
563 STR_TCP_PROTOCOL, commHan.getRMIStubPort(strObjName));
564 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
565 STR_TCP_PROTOCOL, commHan.getRMIStubPort(strObjName));
566 // Send the same set of routing policies for callback ports
567 setCallbackPortsPolicy(strObjName, STR_ROUTER_ADD, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL);
571 * A private method to set router policies for callback ports
573 * @params strRouterAdd String router address
574 * @params strIoTSlaveControllerHostAdd String slave controller host address
575 * @params strIoTSlaveObjectHostAdd String slave object host address
576 * @params strProtocol String protocol
577 * @return iPort Integer port number
579 private void setCallbackPortsPolicy(String strObjName, String strRouterAdd, String strIoTSlaveControllerHostAdd,
580 String strIoTSlaveObjectHostAdd, String strProtocol) {
582 int iNumCallbackPorts = Integer.parseInt(STR_NUM_CALLBACK_PORTS);
583 Integer[] rmiCallbackPorts = commHan.getCallbackPorts(strObjName, iNumCallbackPorts);
585 // Iterate over port numbers and set up policies
586 for (int i=0; i<iNumCallbackPorts; i++) {
587 routerConfig.configureRouterMainPolicies(strRouterAdd, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
588 strProtocol, rmiCallbackPorts[i]);
589 routerConfig.configureHostMainPolicies(strIoTSlaveControllerHostAdd, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
590 strProtocol, rmiCallbackPorts[i]);
591 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
592 strProtocol, rmiCallbackPorts[i]);
597 * A private method to set router policies for IoTDeviceAddress objects
599 * @params strFieldIdentifier String field name + object ID
600 * @params Map.Entry<String,Object> Entry of map IoTSet instrumentation
601 * @params strIoTSlaveObjectHostAdd String slave host address
604 private void setRouterPolicyIoTSetDevice(String strFieldIdentifier, Map.Entry<String,Object> map,
605 String strIoTSlaveObjectHostAdd) {
607 // Get information from the set
608 SetInstrumenter setInstrumenter = (SetInstrumenter) map.getValue();
609 int iRows = setInstrumenter.numberOfRows();
610 RuntimeOutput.print("IoTMaster: Number of rows for IoTDeviceAddress: " + iRows, BOOL_VERBOSE);
611 // Transfer the address
612 for(int iRow=0; iRow<iRows; iRow++) {
613 arrFieldValues = setInstrumenter.fieldValues(iRow);
614 objAddInitHand.addField(strFieldIdentifier, arrFieldValues); // Save this for object instantiation
615 // Get device address - if 00:00:00:00:00:00 that means it needs the driver object address (self)
616 String strDeviceAddress = null;
617 String strDeviceAddressKey = null;
618 if (arrFieldValues[0].equals(STR_SELF_MAC_ADD)) {
619 strDeviceAddress = strIoTSlaveObjectHostAdd;
620 strDeviceAddressKey = strObjName + "-" + strIoTSlaveObjectHostAdd;
621 } else { // Concatenate object name and IP address to give unique key - for a case where there is one device for multiple drivers
622 strDeviceAddress = routerConfig.getIPFromMACAddress((String) arrFieldValues[0]);
623 strDeviceAddressKey = strObjName + "-" + strDeviceAddress;
625 int iDestDeviceDriverPort = (int) arrFieldValues[1];
626 String strProtocol = (String) arrFieldValues[2];
627 // Add the port connection into communication handler - if it's not assigned yet
628 if (commHan.getComPort(strDeviceAddressKey) == null)
629 commHan.addPortConnection(strIoTSlaveObjectHostAdd, strDeviceAddressKey);
630 boolean bDstPortWildCard = false;
631 // Recognize this and allocate different ports for it
632 if (arrFieldValues.length > 3) {
633 bDstPortWildCard = (boolean) arrFieldValues[4];
634 if (bDstPortWildCard) { // This needs a unique source port
635 String strUniqueDev = strDeviceAddressKey + ":" + iRow;
636 commHan.addAdditionalPort(strUniqueDev);
641 System.out.println("\n\n DEBUG: InstrumentPolicySetDevice: Object Name: " + strObjName);
642 System.out.println("DEBUG: InstrumentPolicySetDevice: Port number: " + commHan.getComPort(strDeviceAddressKey));
643 System.out.println("DEBUG: InstrumentPolicySetDevice: Device address: " + strDeviceAddressKey + "\n\n");
646 // Send routing policy to router for device drivers and devices
647 // ROUTING POLICY: RMI communication - RMI registry and stub ports
648 if((iDestDeviceDriverPort == -1) && (!strProtocol.equals(STR_NO_PROTOCOL))) {
649 // Port number -1 means that we don't set the policy strictly to port number level
650 // "nopro" = no protocol specified for just TCP or just UDP (can be both used as well)
651 // ROUTING POLICY: Device driver and device
652 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTSlaveObjectHostAdd, strDeviceAddress, strProtocol);
653 // ROUTING POLICY: Send to the compute node where the device driver is
654 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveObjectHostAdd, strDeviceAddress, strProtocol);
655 } else if((iDestDeviceDriverPort == -1) && (strProtocol.equals(STR_NO_PROTOCOL))) {
656 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTSlaveObjectHostAdd, strDeviceAddress);
657 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveObjectHostAdd, strDeviceAddress);
658 } else if(strProtocol.equals(STR_TCPGW_PROTOCOL)) {
659 // This is a TCP protocol that connects, e.g. a phone to our runtime system
660 // that provides a gateway access (accessed through destination port number)
661 commHan.addDevicePort(iDestDeviceDriverPort);
662 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTSlaveObjectHostAdd, strDeviceAddress, STR_TCP_PROTOCOL, iDestDeviceDriverPort);
663 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveObjectHostAdd, strDeviceAddress, STR_TCP_PROTOCOL, iDestDeviceDriverPort);
664 routerConfig.configureRouterHTTPPolicies(STR_ROUTER_ADD, strIoTSlaveObjectHostAdd, strDeviceAddress);
665 routerConfig.configureHostHTTPPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveObjectHostAdd, strDeviceAddress);
667 // Other port numbers...
668 commHan.addDevicePort(iDestDeviceDriverPort);
669 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTSlaveObjectHostAdd, strDeviceAddress, strProtocol, commHan.getComPort(strDeviceAddressKey),
670 iDestDeviceDriverPort);
671 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveObjectHostAdd, strDeviceAddress, strProtocol, commHan.getComPort(strDeviceAddressKey),
672 iDestDeviceDriverPort);
678 * A private method to set router policies for IoTAddress objects
680 * @params strFieldIdentifier String field name + object ID
681 * @params Map.Entry<String,Object> Entry of map IoTSet instrumentation
682 * @params strHostAddress String host address
685 private void setRouterPolicyIoTSetAddress(String strFieldIdentifier, Map.Entry<String,Object> map,
686 String strHostAddress) {
688 // Get information from the set
689 SetInstrumenter setInstrumenter = (SetInstrumenter) map.getValue();
690 int iRows = setInstrumenter.numberOfRows();
691 RuntimeOutput.print("IoTMaster: Number of rows for IoTAddress: " + iRows, BOOL_VERBOSE);
692 // Transfer the address
693 for(int iRow=0; iRow<iRows; iRow++) {
694 arrFieldValues = setInstrumenter.fieldValues(iRow);
695 objAddInitHand.addField(strFieldIdentifier, arrFieldValues); // Save this for object instantiation
696 // Get device address
697 String strAddress = (String) arrFieldValues[0];
698 // Setting up router policies for HTTP/HTTPs
699 routerConfig.configureRouterHTTPPolicies(STR_ROUTER_ADD, strHostAddress, strAddress);
700 routerConfig.configureHostHTTPPolicies(strHostAddress, strHostAddress, strAddress);
705 * A private method to instrument an object's IoTSet and IoTRelation field to up policies
707 * Mostly the IoTSet fields would contain IoTDeviceAddress objects
709 * @params strFieldObjectID String field object ID
// Instruments the class file of the current device object (instance field strObjClassName),
// caches the resulting ClassRuntimeInstrumenterMaster in mapClassNameToCrim, and sets
// router policies for every IoTSet address field found in the class.
// Throws Error when a field is not an IoTSet of one of the three supported address types.
712 private void instrumentObjectIoTSet(String strFieldObjectID) throws IOException {
714 // If this is a new object ... then create one
715 // Instrument the class source code and look for IoTSet for device addresses
716 // e.g. @config private IoTSet<IoTDeviceAddress> lb_addresses;
// Class file path is <STR_IOT_CODE_PATH><class>/<class><STR_CLS_FILE_EXT>.
717 String strObjectClassNamePath = STR_IOT_CODE_PATH + strObjClassName + "/" + strObjClassName + STR_CLS_FILE_EXT;
// NOTE(review): no close of fis and no use of cr is visible in this excerpt -- confirm that
// the stream is closed (ideally via try-with-resources) and that the reader is actually run.
718 FileInputStream fis = new FileInputStream(strObjectClassNamePath);
719 ClassReader cr = new ClassReader(fis);
720 ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
721 // We need Object ID to instrument IoTDeviceAddress
722 ClassRuntimeInstrumenterMaster crim = new ClassRuntimeInstrumenterMaster(cw, strFieldObjectID, BOOL_VERBOSE);
725 RuntimeOutput.print("IoTMaster: Going to instrument for " + strObjClassName + " with objectID " +
726 strFieldObjectID, BOOL_VERBOSE);
727 // Get the object and the class names
728 // Build objects for IoTSet and IoTRelation fields in the device object classes
// Cache key is class name + object ID; createObject looks the instrumenter up with the same key.
729 mapClassNameToCrim.put(strObjClassName + strFieldObjectID, crim);
730 HashMap<String,Object> hmObjectFieldObjects = crim.getFieldObjects();
731 for(Map.Entry<String,Object> map : hmObjectFieldObjects.entrySet()) {
732 RuntimeOutput.print("IoTMaster: Object name: " + map.getValue().getClass().getName(), BOOL_VERBOSE);
733 // Iterate over HashMap and choose between processing
734 String strFieldName = map.getKey();
735 String strClassName = map.getValue().getClass().getName();
// Field identifier = field name + object ID, used as the key for address initialization.
736 String strFieldIdentifier = strFieldName + strFieldObjectID;
// Dispatch is done by comparing class-name strings, not with instanceof.
737 if(strClassName.equals(STR_SET_INSTRUMENTER_CLS)) {
738 SetInstrumenter setInstrumenter = (SetInstrumenter) map.getValue();
739 if(setInstrumenter.getObjTableName().equals(STR_IOT_DEV_ADD_CLS)) {
740 // Instrument the normal IoTDeviceAddress
741 setRouterPolicyIoTSetDevice(strFieldIdentifier, map, strIoTSlaveObjectHostAdd);
742 } else if(setInstrumenter.getObjTableName().equals(STR_IOT_ADD_CLS)) {
743 // Instrument the IoTAddress
744 setRouterPolicyIoTSetAddress(strFieldIdentifier, map, strIoTSlaveObjectHostAdd);
745 } else if(setInstrumenter.getObjTableName().equals(STR_IOT_ZB_ADD_CLS)) {
746 // Instrument the IoTZigbeeAddress - special feature for Zigbee device support
747 RuntimeOutput.print("IoTMaster: IoTZigbeeAddress found! No router policy is set here..",
750 String strErrMsg = "IoTMaster: Device driver object" +
751 " can only have IoTSet<IoTAddress>, IoTSet<IoTDeviceAddress>," +
752 " or IoTSet<IoTZigbeeAddress>!";
// NOTE(review): java.lang.Error is thrown for what looks like a configuration problem.
753 throw new Error(strErrMsg);
756 String strErrMsg = "IoTMaster: Device driver object can only have IoTSet for addresses!";
757 throw new Error(strErrMsg);
764 * A private method to create an object on a specific machine
766 * @param strObjName String object name
767 * @param strObjClassName String object class name
768 * @param strObjClassInterfaceName String object class interface name
769 * @param strIoTSlaveObjectHostAdd String IoTSlave host address
770 * @param strFieldObjectID String field object ID
771 * @param arrFieldValues Array of field values
772 * @param arrFieldClasses Array of field classes
// Creates one device/driver object on a remote IoTSlave:
//   1. builds an ssh command line that starts an IoTSlave JVM on strIoTSlaveObjectHostAdd,
//   2. waits for the slave to connect back on the object's communication port,
//   3. transfers the object's JAR file,
//   4. sends a CREATE_OBJECT message,
//   5. replays the IoTSet address fields recorded for this object, and
//   6. sends END_SESSION and closes the server socket.
// Timing for each phase is printed to System.out.
// NOTE(review): several parameters (strObjName, strObjClassName, strObjClassInterfaceName,
// strObjStubClsIntfaceName, strIoTSlaveObjectHostAdd, arrFieldValues, arrFieldClasses) have
// the same names as instance fields and shadow them inside this method.
775 private void createObject(String strObjName, String strObjClassName, String strObjClassInterfaceName, String strObjStubClsIntfaceName,
776 String strIoTSlaveObjectHostAdd, String strFieldObjectID, Object[] arrFieldValues, Class[] arrFieldClasses)
777 throws IOException, FileNotFoundException, ClassNotFoundException, InterruptedException {
// NOTE(review): FileNotFoundException is a subclass of IOException, so listing it is redundant.
// start/result are declared in lines not shown in this excerpt.
784 start = System.currentTimeMillis();
786 // Construct ssh command line
787 // e.g. ssh rtrimana@dw-2.eecs.uci.edu cd <path>;
788 // java -cp $CLASSPATH:./*.jar
789 // -Djava.rmi.server.codebase=file:./*.jar
790 // iotruntime.IoTSlave dw-1.eecs.uci.edu 46151 23829 42874 &
791 // The In-Port for IoTMaster is the Out-Port for IoTSlave and vice versa
// NOTE(review): the command is assembled by string concatenation from host names and
// configuration constants -- confirm all of these come from trusted configuration.
792 String strSSHCommand = STR_SSH_USERNAME + strIoTSlaveObjectHostAdd + " cd " + STR_RUNTIME_DIR + " sudo java " +
793 STR_CLS_PATH + " " + STR_RMI_PATH + " " + STR_RMI_HOSTNAME +
794 strIoTSlaveObjectHostAdd + " " + STR_IOT_SLAVE_CLS + " " + strIoTMasterHostAdd + " " +
795 commHan.getComPort(strObjName) + " " + commHan.getRMIRegPort(strObjName) + " " +
796 commHan.getRMIStubPort(strObjName) + " >& " + STR_LOG_FILE_PATH + strObjName + ".log &";
797 RuntimeOutput.print(strSSHCommand, BOOL_VERBOSE);
798 // Start a new thread to start a new JVM
799 createThread(strSSHCommand);
// The master listens on the object's communication port; accept() blocks until the slave connects.
800 ServerSocket serverSocket = new ServerSocket(commHan.getComPort(strObjName));
801 Socket socket = serverSocket.accept();
802 ObjectInputStream inStream = new ObjectInputStream(socket.getInputStream());
803 ObjectOutputStream outStream = new ObjectOutputStream(socket.getOutputStream());
806 result = System.currentTimeMillis()-start;
807 System.out.println("\n\n ==> Time needed to start JVM for " + strObjName + ": " + result + "\n\n");
810 start = System.currentTimeMillis();
812 // Create message to transfer file first
813 String sFileName = strObjClassName + STR_JAR_FILE_EXT;
814 String sPath = STR_IOT_CODE_PATH + strObjClassName + "/" + sFileName;
815 File file = new File(sPath);
// Announce file name and length first, then stream the bytes over a second accepted connection.
816 commMasterToSlave(new MessageSendFile(IoTCommCode.TRANSFER_FILE, sFileName, file.length()),
817 "Sending file!", inStream, outStream);
818 // Send file - JAR file for object creation
819 sendFile(serverSocket.accept(), sPath, file.length());
// The slave replies on the control stream once the file has been handled.
820 Message msgReply = (Message) inStream.readObject();
821 RuntimeOutput.print("IoTMaster: Reply message: " + msgReply.getMessage(), BOOL_VERBOSE);
824 result = System.currentTimeMillis()-start;
825 System.out.println("\n\n ==> Time needed to send JAR file for " + strObjName + ": " + result + "\n\n");
828 start = System.currentTimeMillis();
830 // Pack object information to create object on a IoTSlave
831 Message msgObjIoTSlave = new MessageCreateObject(IoTCommCode.CREATE_OBJECT, strIoTSlaveObjectHostAdd,
832 strObjClassName, strObjName, strObjClassInterfaceName, strObjStubClsIntfaceName, commHan.getRMIRegPort(strObjName),
833 commHan.getRMIStubPort(strObjName), arrFieldValues, arrFieldClasses);
835 commMasterToSlave(msgObjIoTSlave, "Sending object information", inStream, outStream);
836 // Instrument the class source code and look for IoTSet for device addresses
837 // e.g. @config private IoTSet<IoTDeviceAddress> lb_addresses;
838 RuntimeOutput.print("IoTMaster: Instantiating for " + strObjClassName + " with objectID " +
839 strFieldObjectID, BOOL_VERBOSE);
840 // Get the object and the class names
841 // Build objects for IoTSet and IoTRelation fields in the device object classes
// Looks up the instrumenter cached by instrumentObjectIoTSet under the same key.
// NOTE(review): get() returns null when no entry exists, which would make the next line throw
// NullPointerException. This method is also run from worker threads (see
// createControllerObjects); the map's concrete type is not visible here -- confirm it is
// safe for concurrent reads.
842 ClassRuntimeInstrumenterMaster crim = mapClassNameToCrim.get(strObjClassName + strFieldObjectID);
843 HashMap<String,Object> hmObjectFieldObjects = crim.getFieldObjects();
844 for(Map.Entry<String,Object> map : hmObjectFieldObjects.entrySet()) {
845 RuntimeOutput.print("IoTMaster: Object name: " + map.getValue().getClass().getName(), BOOL_VERBOSE);
846 // Iterate over HashMap and choose between processing
847 String strFieldName = map.getKey();
848 String strClassName = map.getValue().getClass().getName();
849 String strFieldIdentifier = strFieldName + strFieldObjectID;
// Same class-name dispatch as instrumentObjectIoTSet, but here the address sets are sent to the slave.
850 if(strClassName.equals(STR_SET_INSTRUMENTER_CLS)) {
851 SetInstrumenter setInstrumenter = (SetInstrumenter) map.getValue();
852 if(setInstrumenter.getObjTableName().equals(STR_IOT_DEV_ADD_CLS)) {
853 // Instrument the normal IoTDeviceAddress
855 instrumentIoTSetDevice(strFieldIdentifier, strObjName, strFieldName, strIoTSlaveObjectHostAdd, inStream, outStream);
857 } else if(setInstrumenter.getObjTableName().equals(STR_IOT_ZB_ADD_CLS)) {
858 // Instrument the IoTZigbeeAddress - special feature for Zigbee device support
860 instrumentIoTSetZBDevice(map, strObjName, strFieldName, strIoTSlaveObjectHostAdd, inStream, outStream);
862 } else if(setInstrumenter.getObjTableName().equals(STR_IOT_ADD_CLS)) {
863 // Instrument the IoTAddress
865 instrumentIoTSetAddress(strFieldIdentifier, strFieldName, inStream, outStream);
868 String strErrMsg = "IoTMaster: Device driver object" +
869 " can only have IoTSet<IoTAddress>, IoTSet<IoTDeviceAddress>," +
870 " or IoTSet<IoTZigbeeAddress>!";
871 throw new Error(strErrMsg);
874 String strErrMsg = "IoTMaster: Device driver object can only have IoTSet for addresses!";
875 throw new Error(strErrMsg);
879 // TODO: Change this later
// END_SESSION is written directly; unlike commMasterToSlave calls above, no reply is read here.
880 outStream.writeObject(new MessageSimple(IoTCommCode.END_SESSION));
883 result = System.currentTimeMillis()-start;
884 System.out.println("\n\n ==> Time needed to create object " + strObjName + " and instrument IoTDeviceAddress: " + result + "\n\n");
// NOTE(review): no try/finally is visible around this close -- confirm the server socket
// (and the accepted socket/streams) are released when an exception is thrown earlier.
890 serverSocket.close();
895 * A private method to create controller objects
// Creates all active controller objects in parallel: one worker thread per object name,
// each calling createObject with the creation info stored in commHan, then waits on the
// collected threads.
899 private void createControllerObjects() throws InterruptedException {
901 // Create a list of threads
902 List<Thread> threads = new ArrayList<Thread>();
903 // Get the list of active controller objects and loop it
904 List<String> listActiveControllerObject = commHan.getActiveControllerObjectList();
905 for(String strObjName : listActiveControllerObject) {
// The loop variable shadows the instance field strObjName and is captured by the anonymous Runnable.
907 ObjectCreationInfo objCrtInfo = commHan.getObjectCreationInfo(strObjName);
908 Thread objectThread = new Thread(new Runnable() {
// All per-object data is fetched from commHan by object name.
912 createObject(strObjName, objCrtInfo.getObjectClassName(), objCrtInfo.getObjectClassInterfaceName(),
913 objCrtInfo.getObjectStubClassInterfaceName(), objCrtInfo.getIoTSlaveObjectHostAdd(),
914 commHan.getFieldObjectID(strObjName), commHan.getArrayFieldValues(strObjName),
915 commHan.getArrayFieldClasses(strObjName));
// NOTE(review): a failure in one worker is only printed; the caller is not told that an
// object could not be created.
916 } catch (IOException |
917 ClassNotFoundException |
918 InterruptedException ex) {
919 ex.printStackTrace();
924 threads.add(objectThread);
925 objectThread.start();
// Presumably each thread is joined in this loop (the call itself is not shown in this excerpt).
928 for (Thread thread : threads) {
// NOTE(review): the interrupt is printed but no re-interrupt of the current thread is visible,
// even though the method itself declares InterruptedException.
931 } catch (InterruptedException ex) {
932 ex.printStackTrace();
939 * A private method to instrument IoTSet
941 * @param map Map.Entry<String,Object> Entry of map IoTSet instrumentation
942 * @param strFieldName String field name
// Processes one IoTSet field of a controller: registers the field with objInitHand as
// CREATE_NEW_IOTSET, then for every row instruments the referenced object and records it
// (host, names, RMI ports, callback ports) as a member of the field.
945 private void instrumentIoTSet(Map.Entry<String,Object> map, String strFieldName)
946 throws IOException, ClassNotFoundException, InterruptedException {
948 // Get information from the set
949 SetInstrumenter setInstrumenter = (SetInstrumenter) map.getValue();
950 objInitHand.addField(strFieldName, IoTCommCode.CREATE_NEW_IOTSET);
952 int iRows = setInstrumenter.numberOfRows();
953 for(int iRow=0; iRow<iRows; iRow++) {
954 // Get field classes and values
// These assignments appear to target the shared instance fields, which instrumentObject
// presumably reads -- confirm against its definition.
955 arrFieldClasses = setInstrumenter.fieldClasses(iRow);
956 arrFieldValues = setInstrumenter.fieldValues(iRow);
957 // Get object ID and class name
958 String strObjID = setInstrumenter.fieldObjectID(iRow);
959 strObjClassName = setInstrumenter.fieldEntryType(strObjID);
960 // Call the method to create an object
961 instrumentObject(strObjID);
// NOTE(review): this parse is loop-invariant and could be hoisted out of the loop.
962 int iNumOfPorts = Integer.parseInt(STR_NUM_CALLBACK_PORTS);
// strIoTSlaveObjectHostAdd, strObjName and the interface names are instance fields that are
// not assigned in this method; presumably instrumentObject sets them for the current row.
963 objInitHand.addObjectIntoField(strFieldName, strIoTSlaveObjectHostAdd, strObjName,
964 strObjClassName, strObjClassInterfaceName, strObjStubClsIntfaceName, commHan.getRMIRegPort(strObjName),
965 commHan.getRMIStubPort(strObjName), commHan.getCallbackPorts(strObjName, iNumOfPorts));
971 * A private method to instrument IoTRelation
973 * @param map Map.Entry<String,Object> Entry of map IoTRelation instrumentation
974 * @param strFieldName String field name
// Processes one IoTRelation field of a controller: registers the field with objInitHand as
// CREATE_NEW_IOTRELATION, then for every row instruments the first and the second object,
// records both in objInitHand, and configures TCP policies between their two hosts on the
// router and on each host.
977 private void instrumentIoTRelation(Map.Entry<String,Object> map, String strFieldName)
978 throws IOException, ClassNotFoundException, InterruptedException {
980 // Get information from the set
981 RelationInstrumenter relationInstrumenter = (RelationInstrumenter) map.getValue();
982 int iRows = relationInstrumenter.numberOfRows();
983 objInitHand.addField(strFieldName, IoTCommCode.CREATE_NEW_IOTRELATION);
985 for(int iRow=0; iRow<iRows; iRow++) {
986 // Operate on the first set first
987 arrFieldClasses = relationInstrumenter.firstFieldClasses(iRow);
988 arrFieldValues = relationInstrumenter.firstFieldValues(iRow);
989 String strObjID = relationInstrumenter.firstFieldObjectID(iRow);
990 strObjClassName = relationInstrumenter.firstEntryFieldType(strObjID);
991 // Call the method to create an object
992 instrumentObject(strObjID);
993 // Get the first object controller host address
// Snapshot of the host field taken before the second instrumentObject call, which
// presumably reassigns it (instrumentObject is not shown here).
994 String strFirstIoTSlaveObjectHostAdd = strIoTSlaveObjectHostAdd;
995 int iNumOfPorts = Integer.parseInt(STR_NUM_CALLBACK_PORTS);
996 objInitHand.addObjectIntoField(strFieldName, strIoTSlaveObjectHostAdd, strObjName,
997 strObjClassName, strObjClassInterfaceName, strObjStubClsIntfaceName,
998 commHan.getRMIRegPort(strObjName), commHan.getRMIStubPort(strObjName),
999 commHan.getCallbackPorts(strObjName, iNumOfPorts));
1000 // Operate on the second set
1001 arrFieldClasses = relationInstrumenter.secondFieldClasses(iRow);
1002 arrFieldValues = relationInstrumenter.secondFieldValues(iRow);
1003 strObjID = relationInstrumenter.secondFieldObjectID(iRow);
1004 strObjClassName = relationInstrumenter.secondEntryFieldType(strObjID);
1005 // Call the method to create an object
1006 instrumentObject(strObjID);
1007 // Get the second object controller host address
1008 String strSecondIoTSlaveObjectHostAdd = strIoTSlaveObjectHostAdd;
// The second object goes into the parallel "second object" list of the same field.
1009 objInitHand.addSecondObjectIntoField(strFieldName, strIoTSlaveObjectHostAdd, strObjName,
1010 strObjClassName, strObjClassInterfaceName, strObjStubClsIntfaceName,
1011 commHan.getRMIRegPort(strObjName), commHan.getRMIStubPort(strObjName),
1012 commHan.getCallbackPorts(strObjName, iNumOfPorts));
1013 // ROUTING POLICY: first and second controller objects in IoTRelation
1014 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strFirstIoTSlaveObjectHostAdd,
1015 strSecondIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL);
1016 // ROUTING POLICY: Send the same routing policy to both the hosts
// First argument selects the host being configured; the remaining arguments are identical in both calls.
1017 routerConfig.configureHostMainPolicies(strFirstIoTSlaveObjectHostAdd, strFirstIoTSlaveObjectHostAdd,
1018 strSecondIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL);
1019 routerConfig.configureHostMainPolicies(strSecondIoTSlaveObjectHostAdd, strFirstIoTSlaveObjectHostAdd,
1020 strSecondIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL);
1025 * A method to reinitialize IoTSet and IoTRelation in the code based on ObjectInitHandler information
1027 * @param inStream ObjectInputStream communication
1028 * @param outStream ObjectOutputStream communication
// Replays everything recorded in objInitHand to the controller's IoTSlave: for each field it
// sends a "create" message, one "get object" message per recorded member (two per row for
// relations), and finally a "reinitialize field" message.
1031 private void initializeSetsAndRelations(ObjectInputStream inStream, ObjectOutputStream outStream)
1032 throws IOException, ClassNotFoundException {
1033 // Get list of fields
1034 List<String> strFields = objInitHand.getListOfFields();
1035 // Iterate on HostAddress
1036 for(String str : strFields) {
// The message code stored for the field decides between the IoTSet and the IoTRelation protocol.
1037 IoTCommCode iotcommMsg = objInitHand.getFieldMessage(str);
1038 if (iotcommMsg == IoTCommCode.CREATE_NEW_IOTSET) {
1039 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO CREATE IOTSET
1040 Message msgCrtIoTSet = new MessageCreateSetRelation(IoTCommCode.CREATE_NEW_IOTSET, str);
1041 commMasterToSlave(msgCrtIoTSet, "Create new IoTSet!", inStream, outStream);
1042 List<ObjectInitInfo> listObject = objInitHand.getListObjectInitInfo(str);
1043 for (ObjectInitInfo objInitInfo : listObject) {
1044 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO FILL IN IOTSET
1045 commMasterToSlave(new MessageGetObject(IoTCommCode.GET_IOTSET_OBJECT, objInitInfo.getIoTSlaveObjectHostAdd(),
1046 objInitInfo.getObjectName(), objInitInfo.getObjectClassName(), objInitInfo.getObjectClassInterfaceName(),
1047 objInitInfo.getObjectStubClassInterfaceName(), objInitInfo.getRMIRegistryPort(), objInitInfo.getRMIStubPort(),
1048 objInitInfo.getRMICallbackPorts()), "Get IoTSet object!", inStream, outStream);
1051 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO REINITIALIZE IOTSET FIELD
1052 commMasterToSlave(new MessageSimple(IoTCommCode.REINITIALIZE_IOTSET_FIELD),
1053 "Renitialize IoTSet field!", inStream, outStream);
1054 } else if (iotcommMsg == IoTCommCode.CREATE_NEW_IOTRELATION) {
1055 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO CREATE IOTRELATION
1056 Message msgCrtIoTRel = new MessageCreateSetRelation(IoTCommCode.CREATE_NEW_IOTRELATION, str);
1057 commMasterToSlave(msgCrtIoTRel, "Create new IoTRelation!", inStream, outStream);
1058 List<ObjectInitInfo> listObject = objInitHand.getListObjectInitInfo(str);
1059 List<ObjectInitInfo> listSecondObject = objInitHand.getSecondObjectInitInfo(str);
// First and second objects are kept in two parallel lists and are paired by position.
// NOTE(review): raw Iterator type; Iterator<ObjectInitInfo> would remove the cast below.
1060 Iterator it = listSecondObject.iterator();
1061 for (ObjectInitInfo objInitInfo : listObject) {
1062 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO FILL IN IOTRELATION (FIRST OBJECT)
1063 commMasterToSlave(new MessageGetObject(IoTCommCode.GET_IOTRELATION_FIRST_OBJECT,
1064 objInitInfo.getIoTSlaveObjectHostAdd(), objInitInfo.getObjectName(), objInitInfo.getObjectClassName(),
1065 objInitInfo.getObjectClassInterfaceName(), objInitInfo.getObjectStubClassInterfaceName(),
1066 objInitInfo.getRMIRegistryPort(), objInitInfo.getRMIStubPort(), objInitInfo.getRMICallbackPorts()),
1067 "Get IoTRelation first object!", inStream, outStream);
// NOTE(review): next() is called without hasNext(); it throws NoSuchElementException if the
// second list is shorter than the first.
1068 ObjectInitInfo objSecObj = (ObjectInitInfo) it.next();
1069 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO FILL IN IOTRELATION (SECOND OBJECT)
1070 commMasterToSlave(new MessageGetObject(IoTCommCode.GET_IOTRELATION_SECOND_OBJECT,
1071 objSecObj.getIoTSlaveObjectHostAdd(), objSecObj.getObjectName(), objSecObj.getObjectClassName(),
1072 objSecObj.getObjectClassInterfaceName(), objSecObj.getObjectStubClassInterfaceName(),
1073 objSecObj.getRMIRegistryPort(), objSecObj.getRMIStubPort(), objSecObj.getRMICallbackPorts()),
1074 "Get IoTRelation second object!", inStream, outStream);
1076 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO REINITIALIZE IOTRELATION FIELD
1077 commMasterToSlave(new MessageSimple(IoTCommCode.REINITIALIZE_IOTRELATION_FIELD),
1078 "Renitialize IoTRelation field!", inStream, outStream);
1084 * A method to set router basic policies at once
1086 * @param strRouter String router name
// Configures the router's baseline policies in one place: ICMP and SSH (both given the
// monitoring host), DHCP, DNS, and finally the reject policies.
1089 private void setRouterBasicPolicies(String strRouter) {
// The monitoring host is configured by MAC address and resolved to an IP here.
// NOTE(review): createObjects treats a null result of getIPFromMACAddress as an error;
// no such check is made here.
1091 String strMonitorHost = routerConfig.getIPFromMACAddress(STR_MONITORING_HOST);
1092 routerConfig.configureRouterICMPPolicies(strRouter, strMonitorHost);
1093 routerConfig.configureRouterDHCPPolicies(strRouter);
1094 routerConfig.configureRouterDNSPolicies(strRouter);
1095 routerConfig.configureRouterSSHPolicies(strRouter, strMonitorHost);
// Reject policies are configured last, after all the allowances above.
1096 routerConfig.configureRejectPolicies(strRouter);
1100 * A method to set host basic policies at once
1102 * @param strHost String host name
// Configures a host's baseline policies in one place: DHCP, DNS, ICMP and SSH (the
// monitoring host itself uses the one-argument variants), SQL when the host is the
// IoTMaster host, and finally the reject policies.
1105 private void setHostBasicPolicies(String strHost) {
1107 String strMonitorHost = routerConfig.getIPFromMACAddress(STR_MONITORING_HOST);
1108 routerConfig.configureHostDHCPPolicies(strHost);
1109 routerConfig.configureHostDNSPolicies(strHost);
1110 if (strHost.equals(strMonitorHost)) {
1111 // Check if this is the monitoring host
1112 routerConfig.configureHostICMPPolicies(strHost);
1113 routerConfig.configureHostSSHPolicies(strHost);
// Presumably the else branch (the branch line itself is not shown in this excerpt): any
// other host gets ICMP/SSH policies that name the monitoring host.
1115 routerConfig.configureHostICMPPolicies(strHost, strMonitorHost);
1116 routerConfig.configureHostSSHPolicies(strHost, strMonitorHost);
1118 // Apply SQL allowance policies to master host
1119 if (strHost.equals(strIoTMasterHostAdd)) {
1120 routerConfig.configureHostSQLPolicies(strHost);
// Reject policies are configured last, after all the allowances above.
1122 routerConfig.configureRejectPolicies(strHost);
1126 * A method to create a thread for policy deployment
1128 * @param strRouterAddress String router address to configure
1129 * @param setHostAddresses Set of strings for host addresses to configure
// Deploys policies in parallel: starts one thread per host address that calls
// routerConfig.sendHostPolicies, plus one thread for the router that calls
// routerConfig.sendRouterPolicies, then waits on the collected threads.
1132 private void createPolicyThreads(String strRouterAddress, Set<String> setHostAddresses) throws IOException {
1134 // Create a list of threads
1135 List<Thread> threads = new ArrayList<Thread>();
1136 // Start threads for hosts
1137 for(String strAddress : setHostAddresses) {
1138 Thread policyThread = new Thread(new Runnable() {
// NOTE(review): inside the anonymous Runnable, "this" is the Runnable instance itself. Every
// thread therefore locks a different object, so this block gives no mutual exclusion between
// the policy threads. If routerConfig needs serialized access, a shared lock object would be
// required -- confirm the intent.
1140 synchronized(this) {
1141 routerConfig.sendHostPolicies(strAddress);
1145 threads.add(policyThread);
1146 policyThread.start();
1147 RuntimeOutput.print("Deploying policies for: " + strAddress, BOOL_VERBOSE);
1149 // A thread for router
1150 Thread policyThread = new Thread(new Runnable() {
// Same per-instance lock as in the host threads above.
1152 synchronized(this) {
1153 routerConfig.sendRouterPolicies(strRouterAddress);
1157 threads.add(policyThread);
1158 policyThread.start();
1159 RuntimeOutput.print("Deploying policies on router: " + strRouterAddress, BOOL_VERBOSE);
// Presumably each thread is joined in this loop (the call itself is not shown in this excerpt).
1161 for (Thread thread : threads) {
// NOTE(review): the interrupt is only printed; no re-interrupt of the current thread is visible.
1164 } catch (InterruptedException ex) {
1165 ex.printStackTrace();
1172 * A method to assign objects to multiple JVMs, including
1173 * the controller/device object that uses other objects
1174 * in IoTSet and IoTRelation
// Top-level driver. For every controller name in strObjectNames it:
//   1. picks a slave host through the load balancer and resolves its IP address,
//   2. configures TCP policies between the IoTMaster host and that slave,
//   3. starts an IoTSlave JVM over ssh and waits for it to connect back,
//   4. transfers the controller JAR (and an optional ZIP) and creates the main object,
//   5. instruments the controller class to discover its IoTSet/IoTRelation fields,
//   6. deploys router/host policies (basic policies only for the last controller),
//   7. creates the device objects, initializes sets and relations, and invokes init().
// IOException, InterruptedException and ClassNotFoundException are caught at the end of the
// method and printed.
1178 private void createObjects() {
1185 // Extract hostname for this IoTMaster from MySQL DB
1186 strIoTMasterHostAdd = routerConfig.getIPFromMACAddress(STR_MASTER_MAC_ADD);
1187 // Loop as we can still find controller/device classes
1188 for(int i=0; i<strObjectNames.length; i++) {
// start/result are declared in lines not shown in this excerpt; they time each phase.
1190 start = System.currentTimeMillis();
1192 // Assign a new list of PrintWriter objects
1193 routerConfig.renewPrintWriter();
1194 // Get controller names one by one
1195 String strObjControllerName = strObjectNames[i];
1196 // Use LoadBalancer to assign a host address
1197 //strIoTSlaveControllerHostAdd = lbIoT.selectHost();
// selectHost() is passed to getIPFromMACAddress, i.e. it is treated as a MAC address here.
1198 strIoTSlaveControllerHostAdd = routerConfig.getIPFromMACAddress(lbIoT.selectHost());
1199 if (strIoTSlaveControllerHostAdd == null)
1200 throw new Error("IoTMaster: Could not translate MAC to IP address! Please check the router's /tmp/dhcp.leases!");
1201 // == START INITIALIZING CONTROLLER/DEVICE IOTSLAVE ==
1202 // Add port connection and get port numbers
1203 // Naming for objects ProximitySensor becomes ProximitySensor0, ProximitySensor1, etc.
1204 commHan.addPortConnection(strIoTSlaveControllerHostAdd, strObjControllerName);
1205 // ROUTING POLICY: IoTMaster and main controller object
1206 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTMasterHostAdd,
1207 strIoTSlaveControllerHostAdd, STR_TCP_PROTOCOL, commHan.getComPort(strObjControllerName));
1208 // ROUTING POLICY: Send the same routing policy to both the hosts
1209 routerConfig.configureHostMainPolicies(strIoTMasterHostAdd, strIoTMasterHostAdd,
1210 strIoTSlaveControllerHostAdd, STR_TCP_PROTOCOL, commHan.getComPort(strObjControllerName));
1211 routerConfig.configureHostMainPolicies(strIoTSlaveControllerHostAdd, strIoTMasterHostAdd,
1212 strIoTSlaveControllerHostAdd, STR_TCP_PROTOCOL, commHan.getComPort(strObjControllerName));
1214 // Construct ssh command line and create a controller thread for e.g. AcmeProximity
// Unlike the command built in createObject, this one also passes the JVM heap-size options
// and does not include STR_RMI_HOSTNAME.
// NOTE(review): the command is assembled by string concatenation -- confirm all inputs come
// from trusted configuration.
1215 String strSSHCommand = STR_SSH_USERNAME + strIoTSlaveControllerHostAdd + " cd " +
1216 STR_RUNTIME_DIR + " sudo java " + STR_JVM_INIT_HEAP_SIZE + " " +
1217 STR_JVM_MAX_HEAP_SIZE + " " + STR_CLS_PATH + " " +
1218 STR_RMI_PATH + " " + STR_IOT_SLAVE_CLS + " " + strIoTMasterHostAdd + " " +
1219 commHan.getComPort(strObjControllerName) + " " +
1220 commHan.getRMIRegPort(strObjControllerName) + " " +
1221 commHan.getRMIStubPort(strObjControllerName) + " >& " +
1222 STR_LOG_FILE_PATH + strObjControllerName + ".log &";
1223 RuntimeOutput.print(strSSHCommand, BOOL_VERBOSE);
1224 createThread(strSSHCommand);
1225 // Wait for connection
1226 // Create a new socket for communication
// accept() blocks until the freshly started IoTSlave connects back on the controller's port.
1227 ServerSocket serverSocket = new ServerSocket(commHan.getComPort(strObjControllerName));
1228 Socket socket = serverSocket.accept();
1229 ObjectInputStream inStream = new ObjectInputStream(socket.getInputStream());
1230 ObjectOutputStream outStream = new ObjectOutputStream(socket.getOutputStream());
1231 RuntimeOutput.print("IoTMaster: Communication established!", BOOL_VERBOSE);
1234 result = System.currentTimeMillis()-start;
1235 System.out.println("\n\n ==> From start until after SSH for main controller: " + result);
1237 start = System.currentTimeMillis();
1239 // Send files for every controller class
1240 // e.g. AcmeProximity.jar and AcmeProximity.zip
// The class-file path is only used further below for instrumentation; the JAR is what is sent.
1241 String strControllerClassName = strObjControllerName + STR_CLS_FILE_EXT;
1242 String strControllerClassNamePath = STR_CONT_PATH + strObjControllerName + "/" +
1243 strControllerClassName;
1245 String strControllerJarName = strObjControllerName + STR_JAR_FILE_EXT;
1246 String strControllerJarNamePath = STR_CONT_PATH + strObjControllerName + "/" +
1247 strControllerJarName;
1248 File file = new File(strControllerJarNamePath);
// Announce file name and length first, then stream the bytes over a second accepted connection.
1249 commMasterToSlave(new MessageSendFile(IoTCommCode.TRANSFER_FILE, strControllerJarName, file.length()),
1250 "Sending file!", inStream, outStream);
1251 // Send file - Class file for object creation
1252 sendFile(serverSocket.accept(), strControllerJarNamePath, file.length());
1253 Message msgReply = (Message) inStream.readObject();
1254 RuntimeOutput.print("IoTMaster: Reply message: " + msgReply.getMessage(), BOOL_VERBOSE);
1255 // Send .zip file if additional zip file is specified
1256 String strObjCfgFile = strObjControllerName + STR_CFG_FILE_EXT;
1257 String strObjCfgFilePath = STR_CONT_PATH + strObjControllerName + "/" + strObjCfgFile;
// NOTE(review): if parseConfigFile can return null when the key is absent, the equals call
// below throws NullPointerException -- confirm, or compare as STR_YES.equals(...).
1258 String strAdditionalFile = parseConfigFile(strObjCfgFilePath, STR_FILE_TRF_CFG);
1259 if (strAdditionalFile.equals(STR_YES)) {
1260 String strControllerCmpName = strObjControllerName + STR_ZIP_FILE_EXT;
1261 String strControllerCmpNamePath = STR_CONT_PATH + strObjControllerName + "/" +
1262 strControllerCmpName;
1263 file = new File(strControllerCmpNamePath);
1264 commMasterToSlave(new MessageSendFile(IoTCommCode.TRANSFER_FILE, strControllerCmpName, file.length()),
1265 "Sending file!", inStream, outStream);
1266 // Send file - Class file for object creation
1267 sendFile(serverSocket.accept(), strControllerCmpNamePath, file.length());
1268 msgReply = (Message) inStream.readObject();
1269 RuntimeOutput.print("IoTMaster: Reply message: " + msgReply.getMessage(), BOOL_VERBOSE);
1271 // Create main controller/device object
1272 commMasterToSlave(new MessageCreateMainObject(IoTCommCode.CREATE_MAIN_OBJECT, strObjControllerName),
1273 "Create main object!", inStream, outStream);
1276 result = System.currentTimeMillis()-start;
1277 System.out.println("\n\n ==> From IoTSlave start until main controller object is created: " + result);
1278 System.out.println(" ==> Including file transfer times!\n\n");
1280 start = System.currentTimeMillis();
1282 // == END INITIALIZING CONTROLLER/DEVICE IOTSLAVE ==
1283 // Instrumenting one file
1284 RuntimeOutput.print("IoTMaster: Opening class file: " + strControllerClassName, BOOL_VERBOSE);
1285 RuntimeOutput.print("IoTMaster: Class file path: " + strControllerClassNamePath, BOOL_VERBOSE);
// NOTE(review): no close of fis and no use of cr is visible in this excerpt -- confirm that
// the stream is closed and that the reader is actually run.
1286 FileInputStream fis = new FileInputStream(strControllerClassNamePath);
1287 ClassReader cr = new ClassReader(fis);
1288 ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
// The controller itself is instrumented with a null object ID (device objects pass theirs).
1289 ClassRuntimeInstrumenterMaster crim = new ClassRuntimeInstrumenterMaster(cw, null, BOOL_VERBOSE);
1292 // Get the object and the class names
1293 // Build objects for IoTSet and IoTRelation fields in the controller/device classes
1294 HashMap<String,Object> hmControllerFieldObjects = crim.getFieldObjects();
1295 for(Map.Entry<String,Object> map : hmControllerFieldObjects.entrySet()) {
1296 RuntimeOutput.print("IoTMaster: Object name: " + map.getValue().getClass().getName(), BOOL_VERBOSE);
1297 // Iterate over HashMap and choose between processing
1298 // SetInstrumenter vs. RelationInstrumenter
1299 String strFieldName = map.getKey();
1300 String strClassName = map.getValue().getClass().getName();
1301 if(strClassName.equals(STR_SET_INSTRUMENTER_CLS)) {
1302 SetInstrumenter setInstrumenter = (SetInstrumenter) map.getValue();
// A controller may not hold device or Zigbee addresses directly; both cases throw Error.
1303 if(setInstrumenter.getObjTableName().equals(STR_IOT_DEV_ADD_CLS)) {
1304 String strErrMsg = "IoTMaster: Controller object" +
1305 " cannot have IoTSet<IoTDeviceAddress>!";
1306 throw new Error(strErrMsg);
1307 } else if(setInstrumenter.getObjTableName().equals(STR_IOT_ZB_ADD_CLS)) {
1308 String strErrMsg = "IoTMaster: Controller object" +
1309 " cannot have IoTSet<ZigbeeAddress>!";
1310 throw new Error(strErrMsg);
1311 } else if(setInstrumenter.getObjTableName().equals(STR_IOT_ADD_CLS)) {
1312 // Instrument the IoTAddress
// For the controller the plain field name serves as the field identifier (no object ID suffix).
1313 setRouterPolicyIoTSetAddress(strFieldName, map, strIoTSlaveControllerHostAdd);
1314 instrumentIoTSetAddress(strFieldName, strFieldName, inStream, outStream);
// Presumably the fall-through branch: any other IoTSet is a set of device objects.
1317 instrumentIoTSet(map, strFieldName);
1319 } else if (strClassName.equals(STR_REL_INSTRUMENTER_CLS)) {
1320 instrumentIoTRelation(map, strFieldName);
1324 result = System.currentTimeMillis()-start;
1325 System.out.println("\n\n ==> Time needed to instrument device driver objects: " + result + "\n\n");
1326 System.out.println(" ==> #Objects: " + commHan.getActiveControllerObjectList().size() + "\n\n");
1329 start = System.currentTimeMillis();
1331 // ROUTING POLICY: Deploy basic policies if this is the last controller
1332 if (i == strObjectNames.length-1) {
1333 // ROUTING POLICY: implement basic policies to reject all other irrelevant traffics
1334 for(String s: commHan.getHosts()) {
1335 setHostBasicPolicies(s);
1337 // We retain all the basic policies for router,
1338 // but we delete the initial allowance policies for internal all TCP and UDP communications
1339 setRouterBasicPolicies(STR_ROUTER_ADD);
1341 // Close access to policy files and deploy policies
1342 routerConfig.close();
1343 // Deploy the policy
// The master host is added explicitly to the set of hosts that receive policies.
1344 HashSet<String> setAddresses = new HashSet<String>(commHan.getHosts());
1345 setAddresses.add(strIoTMasterHostAdd);
1346 createPolicyThreads(STR_ROUTER_ADD, setAddresses);
1349 result = System.currentTimeMillis()-start;
1350 System.out.println("\n\n ==> Time needed to send policy files and deploy them : " + result + "\n\n");
1353 start = System.currentTimeMillis();
1355 // Separating object creations and Set/Relation initializations
1356 createControllerObjects();
1359 result = System.currentTimeMillis()-start;
1360 System.out.println("\n\n ==> Time needed to instantiate objects: " + result + "\n\n");
1362 start = System.currentTimeMillis();
1364 // Sets and relations initializations
1365 initializeSetsAndRelations(inStream, outStream);
1368 result = System.currentTimeMillis()-start;
1369 System.out.println("\n\n ==> Time needed to initialize sets and relations: " + result + "\n\n");
1371 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO EXECUTE INIT METHOD
1372 commMasterToSlave(new MessageSimple(IoTCommCode.INVOKE_INIT_METHOD),
1373 "Invoke init() method!", inStream, outStream);
1374 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO END PROCESS
// END_SESSION is written directly; no reply is read here.
1375 outStream.writeObject(new MessageSimple(IoTCommCode.END_SESSION));
// NOTE(review): no try/finally is visible around this close -- confirm the server socket
// (and the accepted socket/streams) are released when an exception is thrown earlier.
1379 serverSocket.close();
1380 commHan.printLists();
1381 lbIoT.printHostInfo();
// NOTE(review): InterruptedException is handled together with the I/O failures and only
// reported; no re-interrupt of the current thread is visible.
1384 } catch (IOException |
1385 InterruptedException |
1386 ClassNotFoundException ex) {
1387 System.out.println("IoTMaster: Exception: "
1389 ex.printStackTrace();
// Program entry point: builds an IoTMaster from the command-line arguments, reads its
// configuration file, initializes its runtime data structures, and starts object creation.
1393 public static void main(String args[]) {
1395 // Detect the available controller/device classes
1396 // Input args[] should be used to list the controllers/devices
1397 // e.g. java IoTMaster AcmeProximity AcmeThermostat AcmeVentController
1398 IoTMaster iotMaster = new IoTMaster(args);
// Configuration is parsed before the live data structures are initialized.
1400 iotMaster.parseIoTMasterConfigFile();
1401 // Initialize CommunicationHandler, LoadBalancer, and RouterConfig
1402 iotMaster.initLiveDataStructure();
// Runs the whole deployment for every controller given on the command line.
1404 iotMaster.createObjects();