bower_components
test/bower_components
+/src/bin
+
+
# Ignoring those pesky .DS_Store files on mac
.DS_Store
--- /dev/null
+((nil . ((indent-tabs-mode . t))))
+
--- /dev/null
+package iotcloud;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This Entry records the abort sent by a given machine.
+ * @author Ali Younis <ayounis@uci.edu>
+ * @version 1.0
+ */
+
+
+class Abort extends Entry {
+ private long seqnum;
+ private long machineid;
+
+ Abort(Slot slot, long _seqnum, long _machineid) {
+ super(slot);
+ seqnum=_seqnum;
+ machineid=_machineid;
+ }
+
+ long getMachineID() {
+ return machineid;
+ }
+
+ long getSequenceNumber() {
+ return seqnum;
+ }
+
+ static Entry decode(Slot slot, ByteBuffer bb) {
+ long seqnum=bb.getLong();
+ long machineid=bb.getLong();
+ return new Abort(slot, seqnum, machineid);
+ }
+
+ void encode(ByteBuffer bb) {
+ bb.put(Entry.TypeAbort);
+ bb.putLong(seqnum);
+ bb.putLong(machineid);
+ }
+
+ int getSize() {
+ return 2*Long.BYTES+Byte.BYTES;
+ }
+
+ byte getType() {
+ return Entry.TypeAbort;
+ }
+
+ Entry getCopy(Slot s) {
+ return new Abort(s, seqnum, machineid);
+ }
+}
\ No newline at end of file
--- /dev/null
+package iotcloud;
+import java.io.*;
+import java.net.*;
+import java.util.Arrays;
+import javax.crypto.*;
+import javax.crypto.spec.*;
+import java.security.SecureRandom;
+
+/**
+ * This class provides a communication API to the webserver. It also
+ * validates the HMACs on the slots and handles encryption.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
+
+class CloudComm {
+ String baseurl;
+ Cipher encryptCipher;
+ Cipher decryptCipher;
+ Mac mac;
+ String password;
+ SecureRandom random;
+ static final int SALT_SIZE = 8;
+ byte salt[];
+ Table table;
+
+ /**
+ * Empty Constructor needed for child class.
+ */
+
+ CloudComm() {
+ }
+
+ /**
+ * Constructor for actual use. Takes in the url and password.
+ */
+
+ CloudComm(Table _table, String _baseurl, String _password) {
+ this.table=_table;
+ this.baseurl=_baseurl;
+ this.password = _password;
+ this.random = new SecureRandom();
+ }
+
+ /**
+ * Generates Key from password.
+ */
+
+ private SecretKeySpec initKey() {
+ try {
+ PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray(), salt, 65536, 128);
+ SecretKey tmpkey = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
+ return new SecretKeySpec(tmpkey.getEncoded(), "AES");
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Failed generating key.");
+ }
+ }
+
+ /**
+ * Inits the HMAC generator.
+ */
+
+ private void initCrypt() {
+ try {
+ SecretKeySpec key=initKey();
+ password = null; // drop password
+ mac = Mac.getInstance("HmacSHA256");
+ mac.init(key);
+ encryptCipher =Cipher.getInstance("AES/ECB/PKCS5Padding");
+ encryptCipher.init(Cipher.ENCRYPT_MODE, key);
+ decryptCipher =Cipher.getInstance("AES/ECB/PKCS5Padding");
+ decryptCipher.init(Cipher.DECRYPT_MODE, key);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Failed To Initialize Ciphers");
+ }
+ }
+
+ /*
+ * Builds the URL for the given request.
+ */
+
+ private URL buildRequest(boolean isput, long sequencenumber, long maxentries) throws IOException {
+ String reqstring=isput?"req=putslot":"req=getslot";
+ String urlstr=baseurl+"?"+reqstring+"&seq="+sequencenumber;
+ if (maxentries != 0)
+ urlstr += "&max="+maxentries;
+ return new URL(urlstr);
+ }
+
+ public void setSalt() {
+ try {
+ salt = new byte[SALT_SIZE];
+ random.nextBytes(salt);
+ URL url=new URL(baseurl+"?req=setsalt");
+ URLConnection con=url.openConnection();
+ HttpURLConnection http = (HttpURLConnection) con;
+ http.setRequestMethod("POST");
+ http.setFixedLengthStreamingMode(salt.length);
+ http.setDoOutput(true);
+ http.connect();
+ OutputStream os=http.getOutputStream();
+ os.write(salt);
+ int responsecode=http.getResponseCode();
+ if (responsecode != HttpURLConnection.HTTP_OK)
+ throw new Error("Invalid response");
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Failed setting salt");
+ }
+ initCrypt();
+ }
+
+ private void getSalt() throws Exception {
+ URL url=new URL(baseurl+"?req=getsalt");
+ URLConnection con=url.openConnection();
+ HttpURLConnection http = (HttpURLConnection) con;
+ http.setRequestMethod("POST");
+ http.connect();
+
+ InputStream is=http.getInputStream();
+ DataInputStream dis=new DataInputStream(is);
+ int salt_length=dis.readInt();
+ byte [] tmp=new byte[salt_length];
+ dis.readFully(tmp);
+ salt=tmp;
+ }
+
+ /*
+ * API for putting a slot into the queue. Returns null on success.
+ * On failure, the server will send slots with newer sequence
+ * numbers.
+ */
+
+ Slot[] putSlot(Slot slot, int max) {
+ try {
+ if (salt == null) {
+ getSalt();
+ initCrypt();
+ }
+
+ long sequencenumber=slot.getSequenceNumber();
+ byte[] bytes=slot.encode(mac);
+ bytes = encryptCipher.doFinal(bytes);
+
+ URL url=buildRequest(true, sequencenumber, max);
+ URLConnection con=url.openConnection();
+ HttpURLConnection http = (HttpURLConnection) con;
+
+ http.setRequestMethod("POST");
+ http.setFixedLengthStreamingMode(bytes.length);
+ http.setDoOutput(true);
+ http.connect();
+
+ OutputStream os=http.getOutputStream();
+ os.write(bytes);
+
+ InputStream is=http.getInputStream();
+ DataInputStream dis=new DataInputStream(is);
+ byte[] resptype=new byte[7];
+ dis.readFully(resptype);
+ if (Arrays.equals(resptype, "getslot".getBytes()))
+ return processSlots(dis);
+ else if (Arrays.equals(resptype, "putslot".getBytes()))
+ return null;
+ else
+ throw new Error("Bad response to putslot");
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("putSlot failed");
+ }
+ }
+
+ /**
+ * Request the server to send all slots with the given
+ * sequencenumber or newer.
+ */
+
+ Slot[] getSlots(long sequencenumber) {
+ try {
+ if (salt == null) {
+ getSalt();
+ initCrypt();
+ }
+
+ URL url=buildRequest(false, sequencenumber, 0);
+ URLConnection con=url.openConnection();
+ HttpURLConnection http = (HttpURLConnection) con;
+ http.setRequestMethod("POST");
+ http.connect();
+ InputStream is=http.getInputStream();
+
+ DataInputStream dis=new DataInputStream(is);
+
+ byte[] resptype=new byte[7];
+ dis.readFully(resptype);
+ if (!Arrays.equals(resptype, "getslot".getBytes()))
+ throw new Error("Bad Response: "+new String(resptype));
+ else
+ return processSlots(dis);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("getSlots failed");
+ }
+ }
+
+ /**
+ * Method that actually handles building Slot objects from the
+ * server response. Shared by both putSlot and getSlots.
+ */
+
+ private Slot[] processSlots(DataInputStream dis) throws Exception {
+ int numberofslots=dis.readInt();
+ int[] sizesofslots=new int[numberofslots];
+ Slot[] slots=new Slot[numberofslots];
+ for(int i=0; i<numberofslots; i++)
+ sizesofslots[i]=dis.readInt();
+
+ for(int i=0; i<numberofslots; i++) {
+ byte[] data=new byte[sizesofslots[i]];
+ dis.readFully(data);
+
+ data = decryptCipher.doFinal(data);
+
+ slots[i]=Slot.decode(table, data, mac);
+ }
+ dis.close();
+ return slots;
+ }
+}
--- /dev/null
+package iotcloud;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.HashSet;
+
+/**
+ * This Entry records the abort sent by a given machine.
+ * @author Ali Younis <ayounis@uci.edu>
+ * @version 1.0
+ */
+
+
+class Commit extends Entry {
+ private long seqnum;
+ private Set<KeyValue> keyValueUpdateSet;
+
+
+ public Commit(Slot slot, long _seqnum, long _machineid) {
+ super(slot);
+ seqnum=_seqnum;
+ machineid=_machineid;
+ }
+
+ public long getSequenceNumber() {
+ return seqnum;
+ }
+
+
+
+
+
+ static Entry decode(Slot slot, ByteBuffer bb) {
+ long seqnum=bb.getLong();
+ long machineid=bb.getLong();
+ return new Abort(slot, seqnum, machineid);
+ }
+
+ public void encode(ByteBuffer bb) {
+ bb.put(Entry.TypeAbort);
+ bb.putLong(seqnum);
+ bb.putLong(machineid);
+ }
+
+ public int getSize() {
+ return 2*Long.BYTES+Byte.BYTES;
+ }
+
+ public byte getType() {
+ return Entry.TypeAbort;
+ }
+
+ public Entry getCopy(Slot s) {
+ return new Abort(s, seqnum, machineid);
+ }
+}
\ No newline at end of file
--- /dev/null
+package iotcloud;
+import java.nio.ByteBuffer;
+
+/**
+ * Generic class that wraps all the different types of information
+ * that can be stored in a Slot.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
+abstract class Entry implements Liveness {
+
+
+ static final byte TypeCommit = 1;
+ static final byte TypeAbort = 2;
+ static final byte TypeTransaction = 3;
+ static final byte TypeNewKey = 4;
+ static final byte TypeLastMessage = 5;
+ static final byte TypeRejectedMessage = 6;
+ static final byte TypeTableStatus = 7;
+
+
+
+ /* Records whether the information is still live or has been
+ superceded by a newer update. */
+
+ private boolean islive = true;
+ private Slot parentslot;
+
+ public Entry(Slot _parentslot) {
+ parentslot = _parentslot;
+ }
+
+ /**
+ * Static method for decoding byte array into Entry objects. First
+ * byte tells the type of entry.
+ */
+
+ static Entry decode(Slot slot, ByteBuffer bb) {
+ byte type = bb.get();
+ switch (type) {
+
+ case TypeLastMessage:
+ return LastMessage.decode(slot, bb);
+
+ case TypeRejectedMessage:
+ return RejectedMessage.decode(slot, bb);
+
+ case TypeTableStatus:
+ return TableStatus.decode(slot, bb);
+
+ default:
+ throw new Error("Unrecognized Entry Type: " + type);
+ }
+ }
+
+ /**
+ * Returns true if the Entry object is still live.
+ */
+
+ public boolean isLive() {
+ return islive;
+ }
+
+ /**
+ * Flags the entry object as dead. Also decrements the live count
+ * of the parent slot.
+ */
+
+ public void setDead() {
+ islive = false;
+ parentslot.decrementLiveCount();
+ }
+
+ /**
+ * Serializes the Entry object into the byte buffer.
+ */
+
+ abstract void encode(ByteBuffer bb);
+
+ /**
+ * Returns the size in bytes the entry object will take in the byte
+ * array.
+ */
+
+ abstract int getSize();
+
+ /**
+ * Returns a byte encoding the type of the entry object.
+ */
+
+ abstract byte getType();
+
+ /**
+ * Returns a copy of the Entry that can be added to a different slot.
+ */
+ abstract Entry getCopy(Slot s);
+
+}
--- /dev/null
+package iotcloud;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.nio.ByteBuffer;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+import java.lang.NullPointerException;
+
+
+class Guard {
+
+ static final byte Equal = 1;
+ static final byte NotEqual = 2;
+
+ private IoTString booleanExpression;
+
+ public Guard() {
+ booleanExpression = null;
+ }
+
+ public Guard(IoTString _booleanExpression) {
+ booleanExpression = _booleanExpression;
+ }
+
+ /**
+ * Create an equality expression for a key value.
+ *
+ */
+ public static String createExpression(IoTString keyName, IoTString keyValue, byte op) {
+ if (op == Equal) {
+ return keyName.toString() + "=='" + keyValue.toString() + "'";
+ } else if (op == NotEqual) {
+ return keyName.toString() + "!='" + keyValue.toString() + "'";
+ }
+
+ // Unrecognized op
+ return null;
+ }
+
+ /**
+ * Add a boolean expression to the guard.
+ *
+ */
+ public void setGuardExpression(String expr) {
+ booleanExpression = new IoTString(expr);
+ }
+
+ /**
+ * Evaluate the guard expression for a given set of key value pairs.
+ *
+ */
+ public boolean evaluate(Set<KeyValue> kvSet) throws ScriptException, NullPointerException {
+
+ // There are no conditions to evaluate
+ if (booleanExpression == null) {
+ return true;
+ }
+
+ // All the current key value pairs that we need to evaluate the condition
+ String[] variables = new String[kvSet.size()];
+
+ // Fill the variables array
+ int i = 0;
+ for (KeyValue kv : kvSet) {
+ variables[i] = kv.getKey() + " ='" + kv.getValue() + "'";
+ i++;
+ }
+
+ // Prep the evaluation engine (script engine)
+ ScriptEngine engine = new ScriptEngineManager().getEngineByName("JavaScript");
+ for (String s : variables) {
+ engine.eval(s);
+ }
+
+ // Evaluate the guard condition
+ return 1 == (Integer)engine.eval(booleanExpression.toString());
+ }
+
+ /**
+ * Get the size of the guard condition
+ *
+ */
+ public int getSize() {
+ return Integer.BYTES + booleanExpression.length();
+ }
+
+ public void encode(ByteBuffer bb) {
+ bb.putInt(booleanExpression.length());
+ bb.put(booleanExpression.internalBytes());
+ }
+
+ static Guard decode(ByteBuffer bb) {
+ int exprLength = bb.getInt();
+ byte[] expr = new byte[exprLength];
+ bb.get(expr);
+ return new Guard(IoTString.shallow(expr));
+ }
+}
\ No newline at end of file
--- /dev/null
+package iotcloud;
+
+import java.util.Arrays;
+
+/**
+ * IoTString is wraps the underlying byte string. We don't use the
+ * standard String class as we have bytes and not chars.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
+
+final public class IoTString {
+ byte[] array;
+ int hashcode;
+
+ private IoTString() {
+ }
+
+ /**
+ * Builds an IoTString object around the byte array. This
+ * constructor makes a copy, so the caller is free to modify the byte array.
+ */
+
+ public IoTString(byte[] _array) {
+ array=(byte[]) _array.clone();
+ hashcode=Arrays.hashCode(array);
+ }
+
+ /**
+ * Converts the String object to a byte representation and stores it
+ * into the IoTString object.
+ */
+
+ public IoTString(String str) {
+ array=str.getBytes();
+ hashcode=Arrays.hashCode(array);
+ }
+
+ /**
+ * Internal methods to build an IoTString using the byte[] passed
+ * in. Caller is responsible for ensuring the byte[] is never
+ * modified.
+ */
+
+ static IoTString shallow(byte[] _array) {
+ IoTString i=new IoTString();
+ i.array = _array;
+ i.hashcode = Arrays.hashCode(_array);
+ return i;
+ }
+
+ /**
+ * Internal method to grab a reference to our byte array. Caller
+ * must not modify it.
+ */
+
+ byte[] internalBytes() {
+ return array;
+ }
+
+ /**
+ * Returns the hashCode as computed by Arrays.hashcode(byte[]).
+ */
+
+ public int hashCode() {
+ return hashcode;
+ }
+
+ /**
+ * Returns a String representation of the IoTString.
+ */
+
+ public String toString() {
+ return new String(array);
+ }
+
+ /**
+ * Returns a copy of the underlying byte string.
+ */
+
+ public byte[] getBytes() {
+ return (byte[]) array.clone();
+ }
+
+ /**
+ * Returns true if two byte strings have the same content.
+ */
+
+ public boolean equals(Object o) {
+ if (o instanceof IoTString) {
+ IoTString i=(IoTString)o;
+ return Arrays.equals(array, i.array);
+ }
+ return false;
+ }
+
+ /**
+ * Returns the length in bytes of the IoTString.
+ */
+
+ public int length() {
+ return array.length;
+ }
+}
--- /dev/null
+package iotcloud;
+import java.nio.ByteBuffer;
+
+/**
+ * KeyValue entry for Slot.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
+class KeyValue /*extends Entry */ {
+ private IoTString key;
+ private IoTString value;
+
+ public KeyValue(IoTString _key, IoTString _value) {
+ key=_key;
+ value=_value;
+ }
+
+ public IoTString getKey() {
+ return key;
+ }
+
+ public IoTString getValue() {
+ return value;
+ }
+
+ static KeyValue decode(ByteBuffer bb) {
+ int keylength=bb.getInt();
+ int valuelength=bb.getInt();
+ byte[] key=new byte[keylength];
+ byte[] value=new byte[valuelength];
+ bb.get(key);
+ bb.get(value);
+ return new KeyValue(IoTString.shallow(key), IoTString.shallow(value));
+ }
+
+ public void encode(ByteBuffer bb) {
+ bb.putInt(key.length());
+ bb.putInt(value.length());
+ bb.put(key.internalBytes());
+ bb.put(value.internalBytes());
+ }
+
+ public int getSize() {
+ return 2*Integer.BYTES+key.length()+value.length();
+ }
+
+ public String toString() {
+ return value.toString();
+ }
+}
--- /dev/null
+package iotcloud;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This Entry records the last message sent by a given machine.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
+
+class LastMessage extends Entry {
+ private long machineid;
+ private long seqnum;
+
+ public LastMessage(Slot slot, long _machineid, long _seqnum) {
+ super(slot);
+ machineid=_machineid;
+ seqnum=_seqnum;
+ }
+
+ public long getMachineID() {
+ return machineid;
+ }
+
+ public long getSequenceNumber() {
+ return seqnum;
+ }
+
+ static Entry decode(Slot slot, ByteBuffer bb) {
+ long machineid=bb.getLong();
+ long seqnum=bb.getLong();
+ return new LastMessage(slot, machineid, seqnum);
+ }
+
+ public void encode(ByteBuffer bb) {
+ bb.put(Entry.TypeLastMessage);
+ bb.putLong(machineid);
+ bb.putLong(seqnum);
+ }
+
+ public int getSize() {
+ return 2*Long.BYTES+Byte.BYTES;
+ }
+
+ public byte getType() {
+ return Entry.TypeLastMessage;
+ }
+
+ public Entry getCopy(Slot s) {
+ return new LastMessage(s, machineid, seqnum);
+ }
+}
+
+
--- /dev/null
+package iotcloud;
+
+/**
+ * Interface common to both classes that record information about the
+ * last message sent by a machine. (Either a Slot or a LastMessage.
+ * @author Brian Demsky <bdemsky@uci.edu>
+ * @version 1.0
+ */
+
+interface Liveness {
+}
--- /dev/null
+all: server
+
+JAVAC = javac
+JAVADOC = javadoc
+BIN_DIR = bin
+DOCS_DIR = docs
+
+server:
+ $(JAVAC) -d $(BIN_DIR) *.java
+
+doc: server
+ $(JAVADOC) -private -d $(DOCS_DIR) *.java
+
+clean:
+ rm -r bin/*
+ rm -r docs/*
+ rm *~
--- /dev/null
+package iotcloud;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This Entry records the abort sent by a given machine.
+ * @author Ali Younis <ayounis@uci.edu>
+ * @version 1.0
+ */
+
+
+class NewKey extends Entry {
+ private IoTString key;
+ private long machineid;
+
+ public NewKey(Slot slot, IoTString _key, long _machineid) {
+ super(slot);
+ key = _key;
+ machineid = _machineid;
+ }
+
+ public long getMachineID() {
+ return machineid;
+ }
+
+ public IoTString getKey() {
+ return key;
+ }
+
+ static Entry decode(Slot slot, ByteBuffer bb) {
+ int keylength = bb.getInt();
+ byte[] key = new byte[keylength];
+ bb.get(key);
+ long machineid = bb.getLong();
+
+ return new NewKey(slot, IoTString.shallow(key), machineid);
+ }
+
+ public void encode(ByteBuffer bb) {
+ bb.put(Entry.TypeAbort);
+ bb.putInt(key.length());
+ bb.put(key.internalBytes());
+ bb.putLong(machineid);
+ }
+
+ public int getSize() {
+ return Long.BYTES + Byte.BYTES + key.length();
+ }
+
+ public byte getType() {
+ return Entry.TypeNewKey;
+ }
+
+ public Entry getCopy(Slot s) {
+ return new NewKey(s, key, machineid);
+ }
+}
\ No newline at end of file
--- /dev/null
+package iotcloud;
+
+class Pair<A,B> {
+ private A a;
+ private B b;
+
+ Pair(A a, B b) {
+ this.a=a;
+ this.b=b;
+ }
+
+ A getFirst() {
+ return a;
+ }
+
+ B getSecond() {
+ return b;
+ }
+
+ public String toString() {
+ return "<"+a+","+b+">";
+ }
+}
--- /dev/null
+package iotcloud;
+
+import java.util.Set;
+import java.util.HashSet;
+
+import javax.script.ScriptException;
+import java.lang.NullPointerException;
+
+
+class PendingTransaction {
+
+ static final byte Equal = Guard.Equal;
+ static final byte NotEqual = Guard.NotEqual;
+
+ private Set<KeyValue> keyValueUpdateSet;
+ private Guard guard;
+
+ public PendingTransaction() {
+ keyValueUpdateSet = new HashSet<KeyValue>();
+ guard = new Guard();
+ }
+
+ /**
+ * Add a new key value to the updates
+ *
+ */
+ public void addKV(KeyValue newKV) {
+
+ // Make sure there are no duplicates
+ for (KeyValue kv : keyValueUpdateSet) {
+ if (kv.getKey().equals(newKV.getKey())) {
+
+ // Remove key if we are adding a newer version of the same key
+ keyValueUpdateSet.remove(kv);
+ break;
+ }
+ }
+
+ // Add the key to the hash set
+ keyValueUpdateSet.add(newKV);
+ }
+
+ /**
+ * Get the key value update set
+ *
+ */
+ public Set<KeyValue> getKVUpdates() {
+ return keyValueUpdateSet;
+ }
+
+ /**
+ * Get the guard
+ *
+ */
+ public Guard getGuard() {
+ return guard;
+ }
+
+ /**
+ * Add a guard to this transaction
+ *
+ */
+ public void addGuard(Guard _guard) {
+ guard = _guard;
+ }
+
+ /**
+ * Evaluate the guard expression for a given transaction using a set of key value pairs.
+ *
+ */
+ public boolean evaluate(Set<KeyValue> kvSet) throws ScriptException, NullPointerException {
+
+ // Evaluate the guard using the current KV Set
+ return guard.evaluate(kvSet);
+ }
+
+ /**
+ * Add a boolean expression to the guard.
+ *
+ */
+ public void setGuardExpression(String expr) {
+ guard.setGuardExpression(expr);
+ }
+
+ /**
+ * Trampoline static method.
+ *
+ */
+ public static String createExpression(IoTString keyName, IoTString keyValue, byte op) {
+ return Guard.createExpression(keyName, keyValue, op);
+ }
+}
\ No newline at end of file
--- /dev/null
+package iotcloud;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+
+/**
+ * Entry for tracking messages that the server rejected. We have to
+ * make sure that all clients know that this message was rejected to
+ * prevent the server from reusing these messages in an attack.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+
+class RejectedMessage extends Entry {
+ /* Machine identifier */
+ private long machineid;
+ /* Oldest sequence number in range */
+ private long oldseqnum;
+ /* Newest sequence number in range */
+ private long newseqnum;
+ /* Is the machine identifier of the relevant slots equal to (or not
+ * equal to) the specified machine identifier. */
+ private boolean equalto;
+ /* Set of machines that have not received notification. */
+ private HashSet<Long> watchset;
+
+ RejectedMessage(Slot slot, long _machineid, long _oldseqnum, long _newseqnum, boolean _equalto) {
+ super(slot);
+ machineid=_machineid;
+ oldseqnum=_oldseqnum;
+ newseqnum=_newseqnum;
+ equalto=_equalto;
+ }
+
+ long getOldSeqNum() {
+ return oldseqnum;
+ }
+
+ long getNewSeqNum() {
+ return newseqnum;
+ }
+
+ boolean getEqual() {
+ return equalto;
+ }
+
+ long getMachineID() {
+ return machineid;
+ }
+
+ static Entry decode(Slot slot, ByteBuffer bb) {
+ long machineid=bb.getLong();
+ long oldseqnum=bb.getLong();
+ long newseqnum=bb.getLong();
+ byte equalto=bb.get();
+ return new RejectedMessage(slot, machineid, oldseqnum, newseqnum, equalto==1);
+ }
+
+ void setWatchSet(HashSet<Long> _watchset) {
+ watchset=_watchset;
+ }
+
+ void removeWatcher(long machineid) {
+ if (watchset.remove(machineid))
+ if (watchset.isEmpty())
+ setDead();
+ }
+
+ void encode(ByteBuffer bb) {
+ bb.put(Entry.TypeRejectedMessage);
+ bb.putLong(machineid);
+ bb.putLong(oldseqnum);
+ bb.putLong(newseqnum);
+ bb.put(equalto?(byte)1:(byte)0);
+ }
+
+ int getSize() {
+ return 3*Long.BYTES + 2*Byte.BYTES;
+ }
+
+ byte getType() {
+ return Entry.TypeRejectedMessage;
+ }
+
+ Entry getCopy(Slot s) {
+ return new RejectedMessage(s, machineid, oldseqnum, newseqnum, equalto);
+ }
+}
--- /dev/null
+package iotcloud;
+import java.util.Vector;
+import java.nio.ByteBuffer;
+import javax.crypto.Mac;
+import java.util.Arrays;
+
+/**
+ * Data structuring for holding Slot information.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+class Slot implements Liveness {
+ /** Sets the slot size. */
+ static final int SLOT_SIZE=2048;
+ /** Sets the size for the HMAC. */
+ static final int HMAC_SIZE=32;
+
+ /** Sequence number of the slot. */
+ private long seqnum;
+ /** HMAC of previous slot. */
+ private byte[] prevhmac;
+ /** HMAC of this slot. */
+ private byte[] hmac;
+ /** Machine that sent this slot. */
+ private long machineid;
+ /** Vector of entries in this slot. */
+ private Vector<Entry> entries;
+ /** Pieces of information that are live. */
+ private int livecount;
+ /** Flag that indicates whether this slot is still live for
+ * recording the machine that sent it. */
+ private boolean seqnumlive;
+ /** Number of bytes of free space. */
+ private int freespace;
+ /** Reference to Table */
+ private Table table;
+
+ Slot(Table _table, long _seqnum, long _machineid, byte[] _prevhmac, byte[] _hmac) {
+ seqnum=_seqnum;
+ machineid=_machineid;
+ prevhmac=_prevhmac;
+ hmac=_hmac;
+ entries=new Vector<Entry>();
+ livecount=1;
+ seqnumlive=true;
+ freespace = SLOT_SIZE - getBaseSize();
+ table=_table;
+ }
+
+ Slot(Table _table, long _seqnum, long _machineid, byte[] _prevhmac) {
+ this(_table, _seqnum, _machineid, _prevhmac, null);
+ }
+
+ Slot(Table _table, long _seqnum, long _machineid) {
+ this(_table, _seqnum, _machineid, new byte[HMAC_SIZE], null);
+ }
+
+ byte[] getHMAC() {
+ return hmac;
+ }
+
+ byte[] getPrevHMAC() {
+ return prevhmac;
+ }
+
+ void addEntry(Entry e) {
+ e=e.getCopy(this);
+ entries.add(e);
+ livecount++;
+ freespace -= e.getSize();
+ }
+
+ private void addShallowEntry(Entry e) {
+ entries.add(e);
+ livecount++;
+ freespace -= e.getSize();
+ }
+
+ /**
+ * Returns true if the slot has free space to hold the entry without
+ * using its reserved space. */
+
+ boolean hasSpace(Entry e) {
+ int newfreespace = freespace - e.getSize();
+ return newfreespace >= 0;
+ }
+
+ Vector<Entry> getEntries() {
+ return entries;
+ }
+
+ static Slot decode(Table table, byte[] array, Mac mac) {
+ mac.update(array, HMAC_SIZE, array.length-HMAC_SIZE);
+ byte[] realmac=mac.doFinal();
+
+ ByteBuffer bb=ByteBuffer.wrap(array);
+ byte[] hmac=new byte[HMAC_SIZE];
+ byte[] prevhmac=new byte[HMAC_SIZE];
+ bb.get(hmac);
+ bb.get(prevhmac);
+ if (!Arrays.equals(realmac, hmac))
+ throw new Error("Server Error: Invalid HMAC! Potential Attack!");
+
+ long seqnum=bb.getLong();
+ long machineid=bb.getLong();
+ int numentries=bb.getInt();
+ Slot slot=new Slot(table, seqnum, machineid, prevhmac, hmac);
+
+ for(int i=0; i<numentries; i++) {
+ slot.addShallowEntry(Entry.decode(slot, bb));
+ }
+
+ return slot;
+ }
+
+ byte[] encode(Mac mac) {
+ byte[] array=new byte[SLOT_SIZE];
+ ByteBuffer bb=ByteBuffer.wrap(array);
+ /* Leave space for the slot HMAC. */
+ bb.position(HMAC_SIZE);
+ bb.put(prevhmac);
+ bb.putLong(seqnum);
+ bb.putLong(machineid);
+ bb.putInt(entries.size());
+ for(Entry entry:entries) {
+ entry.encode(bb);
+ }
+ /* Compute our HMAC */
+ mac.update(array, HMAC_SIZE, array.length-HMAC_SIZE);
+ byte[] realmac=mac.doFinal();
+ hmac = realmac;
+ bb.position(0);
+ bb.put(realmac);
+ return array;
+ }
+
+ /**
+ * Returns the empty size of a Slot. Includes 2 HMACs, the machine
+ * identifier, the sequence number, and the number of entries.
+ */
+ int getBaseSize() {
+ return 2*HMAC_SIZE+2*Long.BYTES+Integer.BYTES;
+ }
+
+ /**
+ * Returns the live set of entries for this Slot. Generates a fake
+ * LastMessage entry to represent the information stored by the slot
+ * itself.
+ */
+
+ Vector<Entry> getLiveEntries(boolean resize) {
+ Vector<Entry> liveEntries=new Vector<Entry>();
+ for(Entry entry: entries) {
+ if (entry.isLive()) {
+ if (!resize || entry.getType() != Entry.TypeTableStatus)
+ liveEntries.add(entry);
+ }
+ }
+
+ if (seqnumlive && !resize)
+ liveEntries.add(new LastMessage(this, machineid, seqnum));
+
+ return liveEntries;
+ }
+
+ /**
+ * Returns the sequence number of the slot.
+ */
+
+ long getSequenceNumber() {
+ return seqnum;
+ }
+
+ /**
+ * Returns the machine that sent this slot.
+ */
+
+ long getMachineID() {
+ return machineid;
+ }
+
+ /**
+ * Records that a newer slot records the fact that this slot was
+ * sent by the relevant machine.
+ */
+
+ void setDead() {
+ seqnumlive=false;
+ decrementLiveCount();
+ }
+
+ /**
+ * Update the count of live entries.
+ */
+
+ void decrementLiveCount() {
+ livecount--;
+ if (livecount==0)
+ table.decrementLiveCount();
+ }
+
+ /**
+ * Returns whether the slot stores any live information.
+ */
+
+ boolean isLive() {
+ return livecount > 0;
+ }
+
+ public String toString() {
+ return "<"+getSequenceNumber()+">";
+ }
+}
--- /dev/null
+package iotcloud;
+
+/**
+ * Circular buffer that holds the live set of slots.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+class SlotBuffer {
+ static final int DEFAULT_SIZE = 128;
+
+ private Slot[] array;
+ private int head;
+ private int tail;
+ private long oldestseqn;
+
+ SlotBuffer() {
+ array=new Slot[DEFAULT_SIZE+1];
+ head=tail=0;
+ oldestseqn=0;
+ }
+
+ int size() {
+ if (head >= tail)
+ return head - tail;
+ return (array.length + head) - tail;
+ }
+
+ int capacity() {
+ return array.length - 1;
+ }
+
+ void resize(int newsize) {
+ if (newsize == (array.length-1))
+ return;
+ Slot[] newarray = new Slot[newsize+1];
+ int currsize = size();
+ int index = tail;
+ for(int i=0; i < currsize; i++) {
+ newarray[i] = array[index];
+ if ((++index) == array.length)
+ index = 0;
+ }
+ array = newarray;
+ tail = 0;
+ head = currsize;
+ }
+
+ private void incrementHead() {
+ head++;
+ if (head >= array.length)
+ head=0;
+ }
+
+ private void incrementTail() {
+ tail++;
+ if (tail >= array.length)
+ tail=0;
+ }
+
+ void putSlot(Slot s) {
+ array[head]=s;
+ incrementHead();
+
+ if (oldestseqn==0)
+ oldestseqn = s.getSequenceNumber();
+
+ if (head==tail) {
+ incrementTail();
+ oldestseqn++;
+ }
+ }
+
+ Slot getSlot(long seqnum) {
+ int diff=(int) (seqnum-oldestseqn);
+ int index=diff + tail;
+ if (index >= array.length) {
+ if (head >= tail)
+ return null;
+ index-= array.length;
+ }
+
+ if (index >= array.length)
+ return null;
+
+ if (head >= tail && index >= head)
+ return null;
+
+ return array[index];
+ }
+
+ long getOldestSeqNum() {
+ return oldestseqn;
+ }
+
+ long getNewestSeqNum() {
+ return oldestseqn + size() - 1;
+ }
+}
--- /dev/null
+package iotcloud;
+
+/**
+ * Slot indexer allows slots in both the slot buffer and the new
+ * server response to looked up in a consistent fashion.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+class SlotIndexer {
+ private Slot[] updates;
+ private SlotBuffer buffer;
+ private long firstslotseqnum;
+
+ SlotIndexer(Slot[] _updates, SlotBuffer _buffer) {
+ buffer = _buffer;
+ updates = _updates;
+ firstslotseqnum = updates[0].getSequenceNumber();
+ }
+
+ Slot getSlot(long seqnum) {
+ if (seqnum >= firstslotseqnum) {
+ int offset = (int) (seqnum - firstslotseqnum);
+ if (offset >= updates.length)
+ throw new Error("Invalid Slot Sequence Number Reference");
+ else
+ return updates[offset];
+ } else
+ return buffer.getSlot(seqnum);
+ }
+}
--- /dev/null
+package iotcloud;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.Vector;
+import java.util.Random;
+import java.util.Queue;
+import java.util.LinkedList;
+/**
+ * IoTTable data structure. Provides client inferface.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+final public class Table {
+ private int numslots; //number of slots stored in buffer
+
+ //table of key-value pairs
+ private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
+
+ // machine id -> (sequence number, Slot or LastMessage); records last message by each client
+ private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
+ // machine id -> ...
+ private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
+ private Vector<Long> rejectedmessagelist = new Vector<Long>();
+ private SlotBuffer buffer;
+ private CloudComm cloud;
+ private long sequencenumber; //Largest sequence number a client has received
+ private long localmachineid;
+ private TableStatus lastTableStatus;
+ static final int FREE_SLOTS = 10; //number of slots that should be kept free
+ static final int SKIP_THRESHOLD = 10;
+ private long liveslotcount = 0;
+ private int chance;
+ static final double RESIZE_MULTIPLE = 1.2;
+ static final double RESIZE_THRESHOLD = 0.75;
+ static final int REJECTED_THRESHOLD = 5;
+ private int resizethreshold;
+ private long lastliveslotseqn; //smallest sequence number with a live entry
+ private Random random = new Random();
+
+ private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
+ private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
+
+
+ public Table(String baseurl, String password, long _localmachineid) {
+ localmachineid = _localmachineid;
+ buffer = new SlotBuffer();
+ numslots = buffer.capacity();
+ setResizeThreshold();
+ sequencenumber = 0;
+ cloud = new CloudComm(this, baseurl, password);
+ lastliveslotseqn = 1;
+
+ pendingTransQueue = new LinkedList<PendingTransaction>();
+ }
+
+ public Table(CloudComm _cloud, long _localmachineid) {
+ localmachineid = _localmachineid;
+ buffer = new SlotBuffer();
+ numslots = buffer.capacity();
+ setResizeThreshold();
+ sequencenumber = 0;
+ cloud = _cloud;
+
+ pendingTransQueue = new LinkedList<PendingTransaction>();
+ }
+
+ public void rebuild() {
+ Slot[] newslots = cloud.getSlots(sequencenumber + 1);
+ validateandupdate(newslots, true);
+ }
+
+ public void update() {
+ Slot[] newslots = cloud.getSlots(sequencenumber + 1);
+
+ validateandupdate(newslots, false);
+ }
+
+ public IoTString get(IoTString key) {
+ KeyValue kv = table.get(key);
+ if (kv != null)
+ return kv.getValue();
+ else
+ return null;
+ }
+
+ public void initTable() {
+ cloud.setSalt();//Set the salt
+ Slot s = new Slot(this, 1, localmachineid);
+ TableStatus status = new TableStatus(s, numslots);
+ s.addEntry(status);
+ Slot[] array = cloud.putSlot(s, numslots);
+ if (array == null) {
+ array = new Slot[] {s};
+ /* update data structure */
+ validateandupdate(array, true);
+ } else {
+ throw new Error("Error on initialization");
+ }
+ }
+
+ public String toString() {
+ return table.toString();
+ }
+
+ public void startTransaction() {
+ // Create a new transaction, invalidates any old pending transactions.
+ pendingTransBuild = new PendingTransaction();
+ }
+
+ public void commitTransaction() {
+
+ // Add the pending transaction to the queue
+ pendingTransQueue.add(pendingTransBuild);
+
+ while (!pendingTransQueue.isEmpty()) {
+ if (tryput( pendingTransQueue.peek(), false)) {
+ pendingTransQueue.poll();
+ }
+ }
+ }
+
+ public void addKV(IoTString key, IoTString value) {
+ KeyValue kv = new KeyValue(key, value);
+ pendingTransBuild.addKV(kv);
+ }
+
+ public void addGuard(IoTString key, IoTString value) {
+ KeyValue kv = new KeyValue(key, value);
+ pendingTransBuild.addKV(kv);
+ }
+
+
+
+
+
+
+ void decrementLiveCount() {
+ liveslotcount--;
+ }
+
+
+ private void setResizeThreshold() {
+ int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
+ resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
+ }
+
+ private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
+ Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+ int newsize = 0;
+ if (liveslotcount > resizethreshold) {
+ resize = true; //Resize is forced
+ }
+
+ if (resize) {
+ newsize = (int) (numslots * RESIZE_MULTIPLE);
+ TableStatus status = new TableStatus(s, newsize);
+ s.addEntry(status);
+ }
+
+ if (! rejectedmessagelist.isEmpty()) {
+ /* TODO: We should avoid generating a rejected message entry if
+ * there is already a sufficient entry in the queue (e.g.,
+ * equalsto value of true and same sequence number). */
+
+ long old_seqn = rejectedmessagelist.firstElement();
+ if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
+ long new_seqn = rejectedmessagelist.lastElement();
+ RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
+ s.addEntry(rm);
+ } else {
+ long prev_seqn = -1;
+ int i = 0;
+ /* Go through list of missing messages */
+ for (; i < rejectedmessagelist.size(); i++) {
+ long curr_seqn = rejectedmessagelist.get(i);
+ Slot s_msg = buffer.getSlot(curr_seqn);
+ if (s_msg != null)
+ break;
+ prev_seqn = curr_seqn;
+ }
+ /* Generate rejected message entry for missing messages */
+ if (prev_seqn != -1) {
+ RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+ s.addEntry(rm);
+ }
+ /* Generate rejected message entries for present messages */
+ for (; i < rejectedmessagelist.size(); i++) {
+ long curr_seqn = rejectedmessagelist.get(i);
+ Slot s_msg = buffer.getSlot(curr_seqn);
+ long machineid = s_msg.getMachineID();
+ RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+ s.addEntry(rm);
+ }
+ }
+ }
+
+ long newestseqnum = buffer.getNewestSeqNum();
+ long oldestseqnum = buffer.getOldestSeqNum();
+ if (lastliveslotseqn < oldestseqnum)
+ lastliveslotseqn = oldestseqnum;
+
+ long seqn = lastliveslotseqn;
+ boolean seenliveslot = false;
+ long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
+ long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
+
+ for (; seqn < threshold; seqn++) {
+ Slot prevslot = buffer.getSlot(seqn);
+ //Push slot number forward
+ if (! seenliveslot)
+ lastliveslotseqn = seqn;
+
+ if (! prevslot.isLive())
+ continue;
+ seenliveslot = true;
+ Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+ for (Entry liveentry : liveentries) {
+ if (s.hasSpace(liveentry)) {
+ s.addEntry(liveentry);
+ } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
+ if (!resize) {
+ System.out.print("B"); //?
+ return tryput(pendingTrans, true);
+ }
+ }
+ }
+ }
+
+
+ Transaction trans = new Transaction(s,
+ s.getSequenceNumber(),
+ localmachineid,
+ pendingTrans.getKVUpdates(),
+ pendingTrans.getGuard());
+ boolean insertedTrans = false;
+ if (s.hasSpace(trans)) {
+ s.addEntry(trans);
+ insertedTrans=true;
+ }
+
+ /* now go through live entries from least to greatest sequence number until
+ * either all live slots added, or the slot doesn't have enough room
+ * for SKIP_THRESHOLD consecutive entries*/
+ int skipcount = 0;
+ search:
+ for (; seqn <= newestseqnum; seqn++) {
+ Slot prevslot = buffer.getSlot(seqn);
+ //Push slot number forward
+ if (!seenliveslot)
+ lastliveslotseqn = seqn;
+
+ if (!prevslot.isLive())
+ continue;
+ seenliveslot = true;
+ Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+ for (Entry liveentry : liveentries) {
+ if (s.hasSpace(liveentry))
+ s.addEntry(liveentry);
+ else {
+ skipcount++;
+ if (skipcount > SKIP_THRESHOLD)
+ break search;
+ }
+ }
+ }
+
+ int max = 0;
+ if (resize)
+ max = newsize;
+ Slot[] array = cloud.putSlot(s, max);
+ if (array == null) {
+ array = new Slot[] {s};
+ rejectedmessagelist.clear();
+ } else {
+ if (array.length == 0)
+ throw new Error("Server Error: Did not send any slots");
+ rejectedmessagelist.add(s.getSequenceNumber());
+ insertedTrans = false;
+ }
+
+ /* update data structure */
+ validateandupdate(array, true);
+
+ return insertedTrans;
+ }
+
+ private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
+ /* The cloud communication layer has checked slot HMACs already
+ before decoding */
+ if (newslots.length == 0) return;
+
+ long firstseqnum = newslots[0].getSequenceNumber();
+ if (firstseqnum <= sequencenumber)
+ throw new Error("Server Error: Sent older slots!");
+
+ SlotIndexer indexer = new SlotIndexer(newslots, buffer);
+ checkHMACChain(indexer, newslots);
+
+ HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
+
+ initExpectedSize(firstseqnum);
+ for (Slot slot : newslots) {
+ processSlot(indexer, slot, acceptupdatestolocal, machineSet);
+ updateExpectedSize();
+ }
+
+ /* If there is a gap, check to see if the server sent us everything. */
+ if (firstseqnum != (sequencenumber + 1)) {
+ checkNumSlots(newslots.length);
+ if (!machineSet.isEmpty())
+ throw new Error("Missing record for machines: " + machineSet);
+ }
+
+ commitNewMaxSize();
+
+ /* Commit new to slots. */
+ for (Slot slot : newslots) {
+ buffer.putSlot(slot);
+ liveslotcount++;
+ }
+ sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
+ }
+
+ private int expectedsize, currmaxsize;
+
+ private void checkNumSlots(int numslots) {
+ if (numslots != expectedsize)
+ throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
+ }
+
+ private void initExpectedSize(long firstsequencenumber) {
+ long prevslots = firstsequencenumber;
+ expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
+ currmaxsize = numslots;
+ }
+
+ private void updateExpectedSize() {
+ expectedsize++;
+ if (expectedsize > currmaxsize)
+ expectedsize = currmaxsize;
+ }
+
+ private void updateCurrMaxSize(int newmaxsize) {
+ currmaxsize = newmaxsize;
+ }
+
+ private void commitNewMaxSize() {
+ if (numslots != currmaxsize)
+ buffer.resize(currmaxsize);
+
+ numslots = currmaxsize;
+ setResizeThreshold();
+ }
+
+ // private void processEntry(KeyValue entry, SlotIndexer indexer) {
+ // IoTString key=entry.getKey();
+ // KeyValue oldvalue=table.get(key);
+ // if (oldvalue != null) {
+ // oldvalue.setDead();
+ // }
+ // table.put(key, entry);
+ // }
+
+ private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
+ updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
+ }
+
+ private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
+ long oldseqnum = entry.getOldSeqNum();
+ long newseqnum = entry.getNewSeqNum();
+ boolean isequal = entry.getEqual();
+ long machineid = entry.getMachineID();
+ for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
+ Slot slot = indexer.getSlot(seqnum);
+ if (slot != null) {
+ long slotmachineid = slot.getMachineID();
+ if (isequal != (slotmachineid == machineid)) {
+ throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
+ }
+ }
+ }
+
+ HashSet<Long> watchset = new HashSet<Long>();
+ for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
+ long entry_mid = lastmsg_entry.getKey();
+ /* We've seen it, don't need to continue to watch. Our next
+ * message will implicitly acknowledge it. */
+ if (entry_mid == localmachineid)
+ continue;
+ Pair<Long, Liveness> v = lastmsg_entry.getValue();
+ long entry_seqn = v.getFirst();
+ if (entry_seqn < newseqnum) {
+ addWatchList(entry_mid, entry);
+ watchset.add(entry_mid);
+ }
+ }
+ if (watchset.isEmpty())
+ entry.setDead();
+ else
+ entry.setWatchSet(watchset);
+ }
+
+ private void addWatchList(long machineid, RejectedMessage entry) {
+ HashSet<RejectedMessage> entries = watchlist.get(machineid);
+ if (entries == null)
+ watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
+ entries.add(entry);
+ }
+
+ private void processEntry(TableStatus entry, SlotIndexer indexer) {
+ int newnumslots = entry.getMaxSlots();
+ updateCurrMaxSize(newnumslots);
+ if (lastTableStatus != null)
+ lastTableStatus.setDead();
+ lastTableStatus = entry;
+ }
+
+ private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+ machineSet.remove(machineid);
+
+ HashSet<RejectedMessage> watchset = watchlist.get(machineid);
+ if (watchset != null) {
+ for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
+ RejectedMessage rm = rmit.next();
+ if (rm.getNewSeqNum() <= seqnum) {
+ /* Remove it from our watchlist */
+ rmit.remove();
+ /* Decrement machines that need to see this notification */
+ rm.removeWatcher(machineid);
+ }
+ }
+ }
+
+ if (machineid == localmachineid) {
+ /* Our own messages are immediately dead. */
+ if (liveness instanceof LastMessage) {
+ ((LastMessage)liveness).setDead();
+ } else if (liveness instanceof Slot) {
+ ((Slot)liveness).setDead();
+ } else {
+ throw new Error("Unrecognized type");
+ }
+ }
+
+
+ Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
+ if (lastmsgentry == null)
+ return;
+
+ long lastmsgseqnum = lastmsgentry.getFirst();
+ Liveness lastentry = lastmsgentry.getSecond();
+ if (machineid != localmachineid) {
+ if (lastentry instanceof LastMessage) {
+ ((LastMessage)lastentry).setDead();
+ } else if (lastentry instanceof Slot) {
+ ((Slot)lastentry).setDead();
+ } else {
+ throw new Error("Unrecognized type");
+ }
+ }
+
+ if (machineid == localmachineid) {
+ if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
+ throw new Error("Server Error: Mismatch on local machine sequence number");
+ } else {
+ if (lastmsgseqnum > seqnum)
+ throw new Error("Server Error: Rollback on remote machine sequence number");
+ }
+ }
+
+ private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+ updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
+ for (Entry entry : slot.getEntries()) {
+ switch (entry.getType()) {
+ // case Entry.TypeKeyValue:
+ // processEntry((KeyValue)entry, indexer);
+ // break;
+
+ case Entry.TypeLastMessage:
+ processEntry((LastMessage)entry, indexer, machineSet);
+ break;
+
+ case Entry.TypeRejectedMessage:
+ processEntry((RejectedMessage)entry, indexer);
+ break;
+
+ case Entry.TypeTableStatus:
+ processEntry((TableStatus)entry, indexer);
+ break;
+
+ default:
+ throw new Error("Unrecognized type: " + entry.getType());
+ }
+ }
+ }
+
+ private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
+ for (int i = 0; i < newslots.length; i++) {
+ Slot currslot = newslots[i];
+ Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
+ if (prevslot != null &&
+ !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
+ throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);
+ }
+ }
+}
--- /dev/null
+package iotcloud;
+import java.nio.ByteBuffer;
+
+/**
+ * TableStatus entries record the current size of the data structure
+ * in slots. Used to remember the size and to perform resizes.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+
+class TableStatus extends Entry {
+ private int maxslots;
+
+ TableStatus(Slot slot, int _maxslots) {
+ super(slot);
+ maxslots=_maxslots;
+ }
+
+ int getMaxSlots() {
+ return maxslots;
+ }
+
+ static Entry decode(Slot slot, ByteBuffer bb) {
+ int maxslots=bb.getInt();
+ return new TableStatus(slot, maxslots);
+ }
+
+ void encode(ByteBuffer bb) {
+ bb.put(Entry.TypeTableStatus);
+ bb.putInt(maxslots);
+ }
+
+ int getSize() {
+ return Integer.BYTES+Byte.BYTES;
+ }
+
+ byte getType() {
+ return Entry.TypeTableStatus;
+ }
+
+ Entry getCopy(Slot s) {
+ return new TableStatus(s, maxslots);
+ }
+}
--- /dev/null
+package iotcloud;
+
+/**
+ * Test cases.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+public class Test {
+ public static void main(String[] args) {
+ // if(args[0].equals("2"))
+ // test2();
+ // else if(args[0].equals("3"))
+ // test3();
+ // else if(args[0].equals("4"))
+ // test4();
+ // else if(args[0].equals("5"))
+ // test5();
+
+ }
+
+
+
+ // static Thread buildThread(String prefix, Table t) {
+ // return new Thread() {
+ // public void run() {
+ // for(int i=0; i<10000; i++) {
+ // String a=prefix+i;
+ // IoTString ia=new IoTString(a);
+ // t.put(ia, ia);
+ // System.out.println(ia+"->"+t.get(ia));
+ // }
+ // }
+ // };
+ // }
+
+ // static void test5() {
+ // Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+ // t1.rebuild();
+ // System.out.println(t1);
+ // }
+
+ // static void test4() {
+ // Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+ // Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+ // t1.rebuild();
+ // t2.rebuild();
+ // Thread thr1=buildThread("p1", t1);
+ // Thread thr2=buildThread("p2", t2);
+ // thr1.start();
+ // thr2.start();
+ // try {
+ // thr1.join();
+ // thr2.join();
+ // } catch (Exception e) {
+ // e.printStackTrace();
+ // }
+ // }
+
+ // static void test3() {
+ // Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+ // Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+ // t1.rebuild();
+ // t2.rebuild();
+ // for(int i=0; i<600; i++) {
+ // String a="STR"+i;
+ // String b="ABR"+i;
+ // IoTString ia=new IoTString(a);
+ // IoTString ib=new IoTString(b);
+ // t1.put(ia, ia);
+ // t2.put(ib, ib);
+ // t1.update();
+ // System.out.println(ib+"->"+t1.get(ib));
+ // System.out.println(ia+"->"+t2.get(ia));
+ // }
+ // }
+
+ // static void test2() {
+ // Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+ // t1.initTable();
+ // Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+ // t2.update();
+ // for(int i=0; i<600; i++) {
+ // String a="STR"+i;
+ // String b="ABR"+i;
+ // IoTString ia=new IoTString(a);
+ // IoTString ib=new IoTString(b);
+ // t1.put(ia, ia);
+ // t2.put(ib, ib);
+ // t1.update();
+ // System.out.println(ib+"->"+t1.get(ib));
+ // System.out.println(ia+"->"+t2.get(ia));
+ // }
+ // }
+}
--- /dev/null
+package iotcloud;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.HashSet;
+
+class Transaction extends Entry {
+
+ private long seqnum;
+ private long machineid;
+ private Set<KeyValue> keyValueUpdateSet;
+ private Guard guard;
+
+ public Transaction(Slot slot, long _seqnum, long _machineid, Set<KeyValue> _keyValueUpdateSet, Guard _guard) {
+ super(slot);
+ seqnum = _seqnum;
+ machineid = _machineid;
+ keyValueUpdateSet = _keyValueUpdateSet;
+ guard = _guard;
+ }
+
+ public long getMachineID() {
+ return machineid;
+ }
+
+ public long getSequenceNumber() {
+ return seqnum;
+ }
+
+ public byte getType() {
+ return Entry.TypeLastMessage;
+ }
+
+ public int getSize() {
+ int size = 2 * Long.BYTES + Byte.BYTES; // seq, machine id, entry type
+ size += Integer.BYTES; // number of KV's
+
+ // Size of each KV
+ for (KeyValue kv : keyValueUpdateSet) {
+ size += kv.getSize();
+ }
+
+ // Size of the guard
+ size += guard.getSize();
+
+ return size;
+ }
+
+
+ public void encode(ByteBuffer bb) {
+ bb.put(Entry.TypeTransaction);
+ bb.putLong(seqnum);
+ bb.putLong(machineid);
+
+ for (KeyValue kv : keyValueUpdateSet) {
+ kv.encode(bb);
+ }
+
+ guard.encode(bb);
+ }
+
+ static Entry decode(Slot slot, ByteBuffer bb) {
+ long seqnum = bb.getLong();
+ long machineid = bb.getLong();
+ int numberOfKeys = bb.getInt();
+
+ Set<KeyValue> kvSet = new HashSet<KeyValue>();
+
+ for (int i = 0; i < numberOfKeys; i++) {
+ KeyValue kv = KeyValue.decode(bb);
+ kvSet.add(kv);
+ }
+
+ Guard guard = Guard.decode(bb);
+
+ return new Transaction(slot, seqnum, machineid, kvSet, guard);
+ }
+
+
+
+ public Entry getCopy(Slot s) {
+ return new Transaction(s, seqnum, machineid, keyValueUpdateSet, guard);
+ }
+}
\ No newline at end of file
--- /dev/null
+1) add better resizing support...gets stuck when it is full now...
+2) Transaction does not check arbitrator is the same for all keys and guards
\ No newline at end of file
--- /dev/null
+indent_with_tabs = 2
+indent_cmt_with_tabs = True
+indent_columns = 2
+indent_class = True
+output_tab_size = 2
+nl_if_brace = Remove
+nl_brace_else = Remove
+nl_elseif_brace = Remove
+nl_struct_brace = Remove
+nl_union_brace = Remove
+nl_fcall_brace = Remove
+nl_for_brace = Remove
+nl_fdef_brace = Remove
+nl_while_brace = Remove
+nl_do_brace = Remove
+nl_brace_while = Remove
+nl_switch_brace = Remove
+nl_before_case = True
+nl_try_brace = Remove
+nl_catch_brace = Remove
+nl_brace_catch = Remove
+sp_func_proto_paren = Remove
+sp_func_def_paren = Remove
+sp_inside_fparens = remove
+sp_inside_fparen = remove
+sp_func_call_paren = Remove
+sp_fparen_brace = Add
+sp_sparen_brace = Add
+sp_paren_brace = Add
+sp_else_brace = Add
+sp_brace_else = Add
+sp_catch_brace = Add
+sp_brace_catch = Add
+sp_try_brace = Add
+sp_after_sparen = Add
+sp_cond_colon = remove
+sp_cond_question = remove
--- /dev/null
+indent_with_tabs = 2
+indent_cmt_with_tabs = True
+indent_columns = 2
+indent_class = True
+output_tab_size = 2
+nl_if_brace = Remove
+nl_brace_else = Remove
+nl_elseif_brace = Remove
+nl_struct_brace = Remove
+nl_union_brace = Remove
+nl_fcall_brace = Remove
+nl_for_brace = Remove
+nl_fdef_brace = Remove
+nl_while_brace = Remove
+nl_do_brace = Remove
+nl_brace_while = Remove
+nl_switch_brace = Remove
+nl_before_case = True
+nl_try_brace = Remove
+nl_catch_brace = Remove
+nl_brace_catch = Remove
+sp_func_proto_paren = Remove
+sp_func_def_paren = Remove
+sp_inside_fparens = remove
+sp_inside_fparen = remove
+sp_func_call_paren = Remove
+sp_fparen_brace = Add
+sp_sparen_brace = Add
+sp_paren_brace = Add
+sp_else_brace = Add
+sp_brace_else = Add
+sp_catch_brace = Add
+sp_brace_catch = Add
+sp_try_brace = Add
+sp_after_sparen = Add
+sp_cond_colon = remove
+sp_cond_question = remove
--- /dev/null
+tabbing:
+ uncrustify -c java.cfg --no-backup $$(find .. -name "*.java")
+ uncrustify -c C.cfg --no-backup $$(find .. -name "*.cpp")
+ uncrustify -c C.cfg --no-backup $$(find .. -name "*.h")
--- /dev/null
+((nil . ((indent-tabs-mode . t))))
+
--- /dev/null
+CPPFLAGS=-O0 -g -Wall
+
+all: iotcloud.fcgi
+
+iotcloud.fcgi: iotcloud.o iotquery.o
+ g++ $(CPPFLAGS) -o iotcloud.fcgi iotcloud.o iotquery.o -lfcgi -lfcgi++
+
+iotcloud.o: iotcloud.cpp iotquery.h
+ g++ $(CPPFLAGS) -c -o iotcloud.o iotcloud.cpp
+
+iotquery.o: iotquery.cpp iotquery.h
+ g++ $(CPPFLAGS) -c -o iotquery.o iotquery.cpp
+
+clean:
+ rm *.o iotcloud.fcgi
--- /dev/null
+1) Requires apache2
+2) Requires fastcgi (libapache2-mod-fastcgi and libfcgi-dev)
+
+Setup on ubuntu
+1) Install modules
+
+2) Add .htaccess file in /var/www/html
+RewriteEngine on
+RewriteBase /
+SetHandler cgi-script
+RewriteRule ^([a-zA-Z0-9._]*\.iotcloud/([a-zA-Z0-9._]*))$ /cgi-bin/iotcloud.fcgi/$1
+
+3) Create account directory. For example, create the directory test.iotcloud in /var/www/html
+ -- To password protect, create the following .htaccess file in the account directory:
+AuthType Basic
+AuthName "Private"
+AuthUserFile /var/www/html/foo.iotcloud/.htpasswd
+Require valid-user
+
+4) In apache2.conf, add to the /var/www directory section:
+AllowOverride FileInfo AuthConfig
+
+5) In the sites-enabled/000-default.conf file, add the line:
+SetEnv IOTCLOUD_ROOT /iotcloud/
+
+6) Create the /iotcloud directory.
+
+7) Create the account directory in the /iotcloud directory. For example, test.iotcloud and give it permissions that the apache daemon can write to.
+
+8) Compile cloud server by typing make
+
+9) Copy it to the cgi-bin directory.
--- /dev/null
+#include <iostream>
+#include "iotquery.h"
+
+using namespace std;
+
+
+int main(void) {
+ // Backup the stdio streambufs
+ streambuf * cin_streambuf = cin.rdbuf();
+ streambuf * cout_streambuf = cout.rdbuf();
+ streambuf * cerr_streambuf = cerr.rdbuf();
+
+ FCGX_Request request;
+
+ FCGX_Init();
+ FCGX_InitRequest(&request, 0, 0);
+
+ while (FCGX_Accept_r(&request) == 0) {
+ fcgi_streambuf cin_fcgi_streambuf(request.in);
+ fcgi_streambuf cout_fcgi_streambuf(request.out);
+ fcgi_streambuf cerr_fcgi_streambuf(request.err);
+
+ cin.rdbuf(&cin_fcgi_streambuf);
+ cout.rdbuf(&cout_fcgi_streambuf);
+ cerr.rdbuf(&cerr_fcgi_streambuf);
+
+ IoTQuery * iotquery=new IoTQuery(&request);
+ iotquery->processQuery();
+
+ delete iotquery;
+ }
+
+ // restore stdio streambufs
+ cin.rdbuf(cin_streambuf);
+ cout.rdbuf(cout_streambuf);
+ cerr.rdbuf(cerr_streambuf);
+
+ return 0;
+}
+
--- /dev/null
+#include "iotquery.h"
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/file.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <netinet/in.h>
+
+using namespace std;
+
+const char * query_str="QUERY_STRING";
+const char * uri_str="REQUEST_URI";
+const char * method_str="REQUEST_METHOD";
+const char * iotcloudroot_str="IOTCLOUD_ROOT";
+const char * length_str="CONTENT_LENGTH";
+
+IoTQuery::IoTQuery(FCGX_Request *request) :
+ request(request),
+ data(NULL),
+ directory(NULL),
+ uri(NULL),
+ query(NULL),
+ method(NULL),
+ iotcloudroot(NULL),
+ length(0),
+ oldestentry(0),
+ newestentry(0),
+ requestsequencenumber(0),
+ numqueueentries(DEFAULT_SIZE),
+ fd(-1),
+ reqGetSlot(false),
+ reqPutSlot(false),
+ reqSetSalt(false),
+ reqGetSalt(false) {
+}
+
+IoTQuery::~IoTQuery() {
+ if (fd >= 0)
+ close(fd);
+ if (directory)
+ delete directory;
+ if (data)
+ delete data;
+}
+
+/**
+ * Returns true if the account directory exists.
+ */
+
+bool IoTQuery::checkDirectory() {
+ struct stat s;
+ int err=stat(directory, &s);
+ if (-1 == err)
+ return false;
+ return S_ISDIR(s.st_mode);
+}
+
+/**
+ * Decodes query string from client. Extracts type of request,
+ * sequence number, and whether the request changes the number of
+ * slots.
+ */
+
+void IoTQuery::decodeQuery() {
+ int len=strlen(query);
+ char * str=new char[len+1];
+ memcpy(str, query, len+1);
+ char *tok_ptr=str;
+
+ /* Parse commands */
+ char *command=strsep(&tok_ptr, "&");
+ if (strncmp(command, "req=putslot", 11) == 0)
+ reqPutSlot = true;
+ else if (strncmp(command, "req=getslot", 11) == 0)
+ reqGetSlot = true;
+ else if (strncmp(command, "req=setsalt", 11) == 0)
+ reqSetSalt = true;
+ else if (strncmp(command, "req=getsalt", 11) == 0)
+ reqGetSalt = true;
+
+ /* Load Sequence Number for request */
+ char *sequencenumber_str = strsep(&tok_ptr, "&");
+ if (sequencenumber_str != NULL &&
+ strncmp(sequencenumber_str, "seq=", 4) == 0) {
+ sequencenumber_str = strchr(sequencenumber_str, '=');
+ if (sequencenumber_str != NULL) {
+ requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
+ }
+ }
+
+ /* don't allow a really old sequence number */
+ if (requestsequencenumber < oldestentry)
+ requestsequencenumber = oldestentry;
+
+ /* Update size if we get request */
+ char * numqueueentries_str = tok_ptr;
+ if (numqueueentries_str != NULL &&
+ strncmp(numqueueentries_str, "max=", 4) == 0) {
+ numqueueentries_str = strchr(numqueueentries_str, '=') + 1;
+ numqueueentries = strtoll(numqueueentries_str, NULL, 10);
+ }
+
+ delete str;
+}
+
+/**
+ * Helper function to write data to file.
+ */
+
+void doWrite(int fd, char *data, long long length) {
+ long long offset=0;
+ do {
+ long long byteswritten=write(fd, &data[offset], length);
+ if (byteswritten > 0) {
+ length -= byteswritten;
+ offset += byteswritten;
+ } else {
+ cerr << "Bytes not written" << endl;
+ if (byteswritten < 0) {
+ cerr << strerror(errno) << " error writing slot file" << endl;
+ }
+ return;
+ }
+ } while(length != 0);
+}
+
+/** Helper function to read data from file. */
+bool doRead(int fd, void *buf, int numbytes) {
+ int offset=0;
+ char *ptr=(char *)buf;
+ do {
+ int bytesread=read(fd, ptr+offset, numbytes);
+ if (bytesread > 0) {
+ offset += bytesread;
+ numbytes -= bytesread;
+ } else
+ return false;
+ } while (numbytes!=0);
+ return true;
+}
+
+/**
+ * Function that handles a getSlot request.
+ */
+
+void IoTQuery::getSlot() {
+ int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
+ if (numrequeststosend < 0)
+ numrequeststosend = 0;
+ long long numbytes = 0;
+ int filesizes[numrequeststosend];
+ int fdarray[numrequeststosend];
+ int index=0;
+ for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
+ struct stat st;
+ char *filename=getSlotFileName(seqn);
+ if (stat(filename, &st) == 0) {
+ fdarray[index]=open(filename, O_RDONLY);
+ filesizes[index]=st.st_size;
+ numbytes+=filesizes[index];
+ } else {
+ fdarray[index]=-1;
+ filesizes[index]=0;
+ }
+ delete filename;
+ }
+ const char header[]="getslot";
+
+ /* Size is the header + the payload + space for number of requests
+ plus sizes of each slot */
+
+ long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes;
+ char * response = new char[size];
+ long long offset=0;
+ memcpy(response, header, sizeof(header)-1);
+ offset+=sizeof(header)-1;
+ int numreq=htonl(numrequeststosend);
+ memcpy(response + offset, &numreq, sizeof(numreq));
+ offset+=sizeof(numrequeststosend);
+ for(int i=0; i<numrequeststosend; i++) {
+ int filesize=htonl(filesizes[i]);
+ memcpy(response + offset, &filesize, sizeof(filesize));
+ offset+=sizeof(int);
+ }
+
+ /* Read the file data into the buffer */
+ for(int i=0; i<numrequeststosend; i++) {
+ if (fdarray[i]>=0) {
+ doRead(fdarray[i], response+offset, filesizes[i]);
+ offset+=filesizes[i];
+ }
+ }
+
+ /* Send the response out to the webserver. */
+ sendResponse(response, size);
+
+ /* Delete the response buffer and close the files. */
+ delete response;
+ for(int i=0; i<numrequeststosend; i++) {
+ if (fdarray[i] >= 0)
+ close(fdarray[i]);
+ }
+}
+
+/**
+ * The method setSalt handles a setSalt request from the client.
+ */
+
+void IoTQuery::setSalt() {
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSaltFileName();
+ int saltfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
+ doWrite(saltfd, data, length);
+ char response[0];
+ sendResponse(response, 0);
+ close(saltfd);
+ delete filename;
+}
+
+/**
+ * The method getSalt handles a setSalt request from the client.
+ */
+
+void IoTQuery::getSalt() {
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSaltFileName();
+ int filesize = 0;
+ struct stat st;
+ if (stat(filename, &st) == 0) {
+ filesize=st.st_size;
+ } else {
+ delete filename;
+ return;
+ }
+ int saltfd = open(filename, O_RDONLY);
+ int responsesize = filesize + sizeof(int);
+ char * response = new char[responsesize];
+ doRead(saltfd, response+ sizeof(int), filesize);
+ int n_filesize=htonl(filesize);
+ *((int*) response) = n_filesize;
+ sendResponse(response, responsesize);
+ close(saltfd);
+ delete filename;
+ delete response;
+}
+
+/**
+ * The method putSlot handles a putSlot request from the client
+ */
+
+void IoTQuery::putSlot() {
+ /* Check if the request is stale and send update in that case. This
+ servers as an implicit failure of the request. */
+ if (requestsequencenumber!=(newestentry+1)) {
+ getSlot();
+ return;
+ }
+
+ /* See if we have too many slots and if so, delete the old one */
+ int numberofliveslots=(int) ((newestentry-oldestentry)+1);
+ if (numberofliveslots >= numqueueentries) {
+ removeOldestSlot();
+ }
+
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSlotFileName(requestsequencenumber);
+ int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
+ doWrite(slotfd, data, length);
+ close(slotfd);
+ delete filename;
+ newestentry = requestsequencenumber;
+
+ /* Update the seuqence numbers and other status file information. */
+ updateStatusFile();
+
+ /* Send response acknowledging success */
+ char command[]="putslot";
+ sendResponse(command, sizeof(command)-1);
+}
+
+/**
+ * Method sends response. It wraps in appropriate headers for web
+ * server.
+ */
+
+void IoTQuery::sendResponse(char * bytes, int len) {
+ cout << "Accept-Ranges: bytes\r\n"
+ << "Content-Length: " << len << "\r\n"
+ << "\r\n";
+ cout.write(bytes, len);
+}
+
+/**
+ * Computes the name for a slot file for the given sequence number.
+ */
+
+char * IoTQuery::getSlotFileName(long long seqnum) {
+ int directorylen=strlen(directory);
+
+ /* Size is 19 digits for ASCII representation of a long + 4
+ characters for SLOT string + 1 character for null termination +
+ directory size*/
+
+ char * filename=new char[25+directorylen];
+ snprintf(filename, 25+directorylen, "%s/SLOT%lld", directory, seqnum);
+ return filename;
+}
+
+/**
+ * Computes the name for a salt file
+ */
+
+char * IoTQuery::getSaltFileName() {
+ int directorylen=strlen(directory);
+
+ /* Size is 4 characters for SALT string + 1 character for null
+ termination + directory size*/
+
+ char * filename=new char[6+directorylen];
+ snprintf(filename, 6+directorylen, "%s/SALT", directory);
+ return filename;
+}
+
+/**
+ * Removes the oldest slot file
+ */
+
+void IoTQuery::removeOldestSlot() {
+ if (oldestentry!=0) {
+ char * filename=getSlotFileName(oldestentry);
+ unlink(filename);
+ delete filename;
+ }
+ oldestentry++;
+}
+
+/**
+ * Processes the query sent to the fastcgi handler.
+ */
+
+void IoTQuery::processQuery() {
+ getQuery();
+ getDirectory();
+ readData();
+
+ /* Verify that we receive a post request. */
+ if (strncmp(method, "POST", 4) != 0) {
+ cerr << "Not POST Request" << endl;
+ return;
+ }
+
+ /* Make sure the directory is okay. */
+ if (directory == NULL ||
+ !checkDirectory()) {
+ cerr << "Directory " << directory << " does not exist" << endl;
+ return;
+ }
+
+ /* Get queue state from the status file. If it doesn't exist,
+ create it. */
+ if (!openStatusFile()) {
+ cerr << "Failed to open status file" << endl;
+ return;
+ }
+
+ /* Lock status file to keep other requests out. */
+ flock(fd, LOCK_EX);
+
+ /* Decode query. */
+ decodeQuery();
+
+ /* Handle request. */
+ if (reqGetSlot)
+ getSlot();
+ else if (reqPutSlot)
+ putSlot();
+ else if (reqSetSalt)
+ setSalt();
+ else if (reqGetSalt)
+ getSalt();
+ else {
+ cerr << "No recognized request" << endl;
+ return;
+ }
+}
+
+/**
+ * Reads in data for request. This is used for the slot to be
+ * inserted.
+ */
+
+void IoTQuery::readData() {
+ if (length) {
+ data = new char[length+1];
+ memset(data, 0, length+1);
+ cin.read(data, length);
+ }
+ do {
+ char dummy;
+ cin >> dummy;
+ } while (!cin.eof());
+}
+
+
+/**
+ * Reads relevant environmental variables to find out the request.
+ */
+
+void IoTQuery::getQuery() {
+ uri = FCGX_GetParam(uri_str, request->envp);
+ query = FCGX_GetParam(query_str, request->envp);
+ method = FCGX_GetParam(method_str, request->envp);
+ iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
+
+ /** We require the content-length header to be sent. */
+ char * reqlength = FCGX_GetParam(length_str, request->envp);
+ if (reqlength) {
+ length=strtoll(reqlength, NULL, 10);
+ } else {
+ length=0;
+ }
+}
+
+/**
+ * Initializes directory field from environmental variables.
+ */
+
+void IoTQuery::getDirectory() {
+ char * split = strchr((char *)uri, '?');
+ if (split == NULL)
+ return;
+ int split_len = (int) (split-uri);
+ int rootdir_len = strlen(iotcloudroot);
+ int directory_len = split_len + rootdir_len + 1;
+ directory = new char[directory_len];
+ memcpy(directory, iotcloudroot, rootdir_len);
+ memcpy(directory + rootdir_len, uri, split_len);
+ directory[directory_len-1]=0;
+}
+
+/**
+ * Helper function that is used to read the status file.
+ */
+
+int doread(int fd, void *ptr, size_t count, off_t offset) {
+ do {
+ size_t bytesread=pread(fd, ptr, count, offset);
+ if (bytesread==count) {
+ return 1;
+ } else if (bytesread==0) {
+ return 0;
+ }
+ } while(1);
+}
+
+
+/**
+ * Writes the current state to the status file.
+ */
+
+void IoTQuery::updateStatusFile() {
+ pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
+ pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
+ pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
+}
+
+/**
+ * Reads in queue state from the status file. Returns true if
+ * successful.
+ */
+
+bool IoTQuery::openStatusFile() {
+ char statusfile[]="queuestatus";
+ int len=strlen(directory);
+
+ char * filename=new char[len+sizeof(statusfile)+2];
+ memcpy(filename, directory, len);
+ filename[len]='/';
+ memcpy(filename+len+1, statusfile, sizeof(statusfile));
+ filename[len+sizeof(statusfile)+1]=0;
+ fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
+ delete filename;
+
+ if (fd < 0) {
+ cerr << strerror(errno) << " error opening statusfile" << endl;
+ return false;
+ }
+
+ /* Read in queue size, oldest sequence number, and newest sequence number. */
+ int size;
+ int needwrite=0;
+ if (doread(fd, &size, sizeof(size), OFFSET_MAX))
+ numqueueentries=size;
+ else
+ needwrite=1;
+
+ long long entry;
+ if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
+ oldestentry=entry;
+ else
+ needwrite=1;
+
+ if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))
+ newestentry=entry;
+ else
+ needwrite=1;
+
+ if (needwrite)
+ updateStatusFile();
+
+ return true;
+}
+
+
--- /dev/null
+#ifndef IOTQUERY_H
+#define IOTQUERY_H
+#include <iostream>
+#include "fcgio.h"
+#include "fcgi_stdio.h"
+
+#define DEFAULT_SIZE 128
+#define OFFSET_MAX 0
+#define OFFSET_OLD 4
+#define OFFSET_NEW 12
+
+class IoTQuery {
+public:
+ IoTQuery(FCGX_Request * request);
+ ~IoTQuery();
+ void processQuery();
+
+private:
+ void sendResponse(char *data, int length);
+ void getQuery();
+ void getDirectory();
+ void readData();
+ bool checkDirectory();
+ bool openStatusFile();
+ void updateStatusFile();
+ void decodeQuery();
+ void getSlot();
+ void putSlot();
+ void setSalt();
+ void getSalt();
+ void removeOldestSlot();
+ char * getSlotFileName(long long);
+ char * getSaltFileName();
+
+ FCGX_Request * request;
+ char *data;
+ /* Directory slot files are placed in. */
+ char *directory;
+ /* Full URI from Apache */
+ const char * uri;
+ /* Query portion of URI */
+ const char * query;
+ /* Type of request: GET or PUT */
+ const char * method;
+ /* Root directory for all accounts */
+ const char * iotcloudroot;
+ /* Expected length of data from client */
+ long long length;
+ /* Sequence number for oldest slot */
+ long long oldestentry;
+ /* Sequence number for newest slot */
+ long long newestentry;
+ /* Sequence number from request */
+ long long requestsequencenumber;
+ /* Size of queue */
+ int numqueueentries;
+ /* fd for queuestatus file */
+ int fd;
+ /* Is the request to get a slot? */
+ bool reqGetSlot;
+ /* Is the request to put a slot? */
+ bool reqPutSlot;
+ /* Is the request to set the salt? */
+ bool reqSetSalt;
+ /* Is the request to get the salt? */
+ bool reqGetSalt;
+};
+#endif