+
+ private void setResizeThreshold() {
+ int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
+ resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
+ }
+
+ private boolean tryput(IoTString key, IoTString value, boolean resize) {
+ Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+ int newsize = 0;
+ if (liveslotcount > resizethreshold) {
+ resize=true; //Resize is forced
+ }
+
+ if (resize) {
+ newsize = (int) (numslots * RESIZE_MULTIPLE);
+ TableStatus status=new TableStatus(s, newsize);
+ s.addEntry(status);
+ }
+
+ if (! rejectedmessagelist.isEmpty()) {
+ /* TODO: We should avoid generating a rejected message entry if
+ * there is already a sufficient entry in the queue (e.g.,
+ * equalsto value of true and same sequence number). */
+
+ long old_seqn=rejectedmessagelist.firstElement();
+ if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
+ long new_seqn=rejectedmessagelist.lastElement();
+ RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
+ s.addEntry(rm);
+ } else {
+ long prev_seqn = -1;
+ int i=0;
+ /* Go through list of missing messages */
+ for(; i<rejectedmessagelist.size(); i++) {
+ long curr_seqn = rejectedmessagelist.get(i);
+ Slot s_msg = buffer.getSlot(curr_seqn);
+ if (s_msg != null)
+ break;
+ prev_seqn=curr_seqn;
+ }
+ /* Generate rejected message entry for missing messages */
+ if (prev_seqn != -1) {
+ RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+ s.addEntry(rm);
+ }
+ /* Generate rejected message entries for present messages */
+ for(; i<rejectedmessagelist.size(); i++) {
+ long curr_seqn=rejectedmessagelist.get(i);
+ Slot s_msg=buffer.getSlot(curr_seqn);
+ long machineid=s_msg.getMachineID();
+ RejectedMessage rm=new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+ s.addEntry(rm);
+ }
+ }
+ }
+
+ long newestseqnum = buffer.getNewestSeqNum();
+ long oldestseqnum = buffer.getOldestSeqNum();
+ if (lastliveslotseqn < oldestseqnum)
+ lastliveslotseqn = oldestseqnum;
+
+ long seqn = lastliveslotseqn;
+ boolean seenliveslot = false;
+ long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
+ long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
+
+ for(; seqn < threshold; seqn++) {
+ Slot prevslot=buffer.getSlot(seqn);
+ //Push slot number forward
+ if (! seenliveslot)
+ lastliveslotseqn = seqn;
+
+ if (! prevslot.isLive())
+ continue;
+ seenliveslot = true;
+ Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+ for(Entry liveentry:liveentries) {
+ if (s.hasSpace(liveentry)) {
+ s.addEntry(liveentry);
+ } else if (seqn==firstiffull) { //if there's no space but the entry is about to fall off the queue
+ if (!resize) {
+ System.out.print("B"); //?
+ return tryput(key, value, true);
+ }
+ }
+ }
+ }
+
+ KeyValue kv=new KeyValue(s, key, value);
+ boolean insertedkv=false;
+ if (s.hasSpace(kv)) {
+ s.addEntry(kv);
+ insertedkv=true;
+ }
+
+ /* now go through live entries from least to greatest sequence number until
+ * either all live slots added, or the slot doesn't have enough room
+ * for SKIP_THRESHOLD consecutive entries*/
+ int skipcount=0;
+ search:
+ for(; seqn <= newestseqnum; seqn++) {
+ Slot prevslot=buffer.getSlot(seqn);
+ //Push slot number forward
+ if (!seenliveslot)
+ lastliveslotseqn = seqn;
+
+ if (!prevslot.isLive())
+ continue;
+ seenliveslot = true;
+ Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+ for(Entry liveentry:liveentries) {
+ if (s.hasSpace(liveentry))
+ s.addEntry(liveentry);
+ else {
+ skipcount++;
+ if (skipcount > SKIP_THRESHOLD)
+ break search;
+ }
+ }
+ }
+
+ int max=0;
+ if (resize)
+ max = newsize;
+ Slot[] array=cloud.putSlot(s, max);
+ if (array == null) {
+ array = new Slot[] {s};
+ rejectedmessagelist.clear();
+ } else {
+ if (array.length == 0)
+ throw new Error("Server Error: Did not send any slots");
+ rejectedmessagelist.add(s.getSequenceNumber());
+ insertedkv=false;
+ }
+
+ /* update data structure */
+ validateandupdate(array, true);
+
+ return insertedkv;
+ }
+
+ private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
+ /* The cloud communication layer has checked slot HMACs already
+ before decoding */
+ if (newslots.length==0) return;