12 const char * query_str="QUERY_STRING";
13 const char * uri_str="REQUEST_URI";
14 const char * method_str="REQUEST_METHOD";
15 const char * iotcloudroot_str="IOTCLOUD_ROOT";
16 const char * length_str="CONTENT_LENGTH";
18 IoTQuery::IoTQuery(FCGX_Request *request) :
29 requestsequencenumber(0),
30 numqueueentries(DEFAULT_SIZE),
34 IoTQuery::~IoTQuery() {
43 bool IoTQuery::checkDirectory() {
45 int err=stat(directory, &s);
48 return S_ISDIR(s.st_mode);
51 void IoTQuery::decodeQuery() {
52 int len=strlen(query);
53 char * str=new char[len+1];
54 memcpy(str, query, len+1);
58 char *command=strsep(&tok_ptr, "&");
59 if (strncmp(command, "putslot", 7) == 0)
62 if (strncmp(command, "getslot", 7) == 0)
65 /* Load Sequence Number for request */
66 char *sequencenumber_str = strsep(&tok_ptr, "&");
68 if (sequencenumber_str != NULL)
69 requestsequencenumber = strtoll(sequencenumber_str, NULL, 10);
71 /* Update size if we get request */
72 char * numqueueentries_str = tok_ptr;
73 if (numqueueentries_str != NULL)
74 numqueueentries = strtoll(numqueueentries_str, NULL, 10);
79 void doWrite(int fd, char *data, long long length) {
82 long long byteswritten=write(fd, &data[offset], length);
83 if (byteswritten > 0) {
84 length -= byteswritten;
85 offset += byteswritten;
87 cerr << "Bytes not written";
93 bool doRead(int fd, void *buf, int numbytes) {
95 char *ptr=(char *)buf;
97 int bytesread=read(fd, ptr+offset, numbytes);
100 numbytes -= bytesread;
103 } while (numbytes!=0);
107 void IoTQuery::getSlot() {
108 int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
109 long long numbytes = 0;
110 int filesizes[numrequeststosend];
111 int fdarray[numrequeststosend];
113 for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
115 char *filename=getSlotFileName(seqn);
116 if (stat(filename, &st) == 0) {
117 fdarray[index]=open(filename, O_RDONLY);
118 filesizes[index]=st.st_size;
119 numbytes+=filesizes[index];
126 const char header[]="getdata";
127 long long size=sizeof(header)+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes
128 char * response = new char[size];
130 memcpy(response, header, sizeof(header));
131 offset+=sizeof(header);
132 memcpy(response + offset, &numrequeststosend, sizeof(numrequeststosend));
133 offset+=sizeof(numrequeststosend);
134 for(int i=0;i<numrequeststosend;i++) {
135 memcpy(response + offset, &filesizes[i], sizeof(int));
140 for(int i=0;i<numrequeststosend;i++) {
142 doRead(fdarray[i], response+offset, filesizes[i]);
143 offset+=filesizes[i];
148 sendResponse(response, size);
152 for(int i=0;i<numrequeststosend;i++) {
158 void IoTQuery::putSlot() {
159 if (requestsequencenumber!=(newestentry+1)) {
164 int numberofliveslots=(int) ((newestentry-oldestentry)+1);
165 if (numberofliveslots >= numqueueentries) {
169 //write slot data out to file
170 char *filename = getSlotFileName(requestsequencenumber);
171 int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
172 doWrite(slotfd, data, length);
175 newestentry = requestsequencenumber; // update sequence number
176 updateStatusFile(); // update counts
177 char command[]="putdata";
178 sendResponse(command, sizeof(command));
181 void IoTQuery::sendResponse(char * bytes, int len) {
182 cout << "Accept-Ranges: bytes\r\n"
183 << "Content-Length: " << len << "\r\n"
185 cout.write(bytes, len);
188 char * IoTQuery::getSlotFileName(long long slot) {
189 int directorylen=strlen(directory);
190 char * filename=new char[25+directorylen];//19 digits for long number + 4 characters for SLOT + 1 character for null termination
191 snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, slot);
195 void IoTQuery::removeOldestSlot() {
196 if (oldestentry!=0) {
197 char * filename=getSlotFileName(oldestentry);
204 void IoTQuery::processQuery() {
209 if (strncmp(method, "POST", 4) != 0)
212 if (directory == NULL ||
216 if (!openStatusFile())
230 void IoTQuery::readData() {
232 data = new char[length+1];
233 memset(data, 0, length+1);
234 cin.read(data, length);
239 } while (!cin.eof());
242 void IoTQuery::getQuery() {
243 uri = FCGX_GetParam(uri_str, request->envp);
244 query = FCGX_GetParam(query_str, request->envp);
245 method = FCGX_GetParam(method_str, request->envp);
246 iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
248 char * reqlength = FCGX_GetParam(length_str, request->envp);
250 length=strtoll(reqlength, NULL, 10);
256 void IoTQuery::getDirectory() {
257 char * split = strchr((char *)uri, '?');
260 int split_len = (int) (split-uri);
261 int rootdir_len = strlen(iotcloudroot);
262 int directory_len = split_len + rootdir_len + 1;
263 directory = new char[directory_len];
264 memcpy(directory, iotcloudroot, rootdir_len);
265 memcpy(directory + rootdir_len, uri, split_len);
266 directory[directory_len]=0;
269 int doread(int fd, void *ptr, size_t count, off_t offset) {
271 size_t bytesread=pread(fd, ptr, count, offset);
272 if (bytesread==count) {
274 } else if (bytesread==0) {
280 void IoTQuery::updateStatusFile() {
281 pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
282 pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
283 pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
286 bool IoTQuery::openStatusFile() {
287 char statusfile[]="queuestatus";
288 int len=strlen(directory);
290 char * filename=new char[len+sizeof(statusfile)+2];
291 memcpy(filename, directory, len);
293 memcpy(filename+len+1, statusfile, sizeof(statusfile));
294 filename[len+sizeof(statusfile)+1]=0;
295 fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
303 if (doread(fd, &size, sizeof(size), OFFSET_MAX))
304 numqueueentries=size;
309 if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
314 if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))