From e9b05be069860b5a810b55b30ae07c2ce48af407 Mon Sep 17 00:00:00 2001 From: bdemsky Date: Sat, 20 Jan 2018 00:35:47 -0800 Subject: [PATCH] edits --- version2/src/C/CloudComm.cc | 46 ++++++++++++++++++------------- version2/src/C/Commit.cc | 32 ++++++--------------- version2/src/C/CommitPart.cc | 4 --- version2/src/C/CommitPart.h | 1 - version2/src/C/Entry.h | 4 +-- version2/src/C/Table.cc | 18 ++---------- version2/src/C/TransactionPart.cc | 4 --- version2/src/C/TransactionPart.h | 1 - version2/src/C/URL.h | 3 ++ 9 files changed, 42 insertions(+), 71 deletions(-) diff --git a/version2/src/C/CloudComm.cc b/version2/src/C/CloudComm.cc index 54b92da..edc0706 100644 --- a/version2/src/C/CloudComm.cc +++ b/version2/src/C/CloudComm.cc @@ -86,7 +86,6 @@ void CloudComm::initCrypt() { if (password == NULL) { return; } - try { key = initKey(); password = NULL;// drop password @@ -101,33 +100,40 @@ void CloudComm::initCrypt() { * Builds the URL for the given request. */ URL *CloudComm::buildRequest(bool isput, int64_t sequencenumber, int64_t maxentries) { - IoTString *reqstring = isput ? "req=putslot" : "req=getslot"; - IoTString *urlstr = baseurl + "?" + reqstring + "&seq=" + sequencenumber; + const char *reqstring = isput ? "req=putslot" : "req=getslot"; + char * buffer = (char *) malloc(baseurl->length() + 200); + memcpy(buffer, baseurl->internalBytes(), baseurl->length()); + int offset = baseurl->length(); + offset+=sprintf(&buffer[offset], "?%s&seq=%" PRId64, reqstring, sequencenumber); if (maxentries != 0) - urlstr += "&max=" + maxentries; + sprintf(&buffer[offset], "&max=%" PRId64, maxentries); + IoTString *urlstr = new IoTString(buffer); + free(buffer); return new URL(urlstr); } void CloudComm::setSalt() { - if (salt != NULL) { - // Salt already sent to server so dont set it again + // Salt already sent to server so don't set it again return; } - + try { Array *saltTmp = new Array(CloudComm_SALT_SIZE); random->nextBytes(saltTmp); - for (int i = 0; i < CloudComm_SALT_SIZE; i++) { - printf("%d\n", (int)saltTmp->get(i) & 255); - } - - URL *url = new URL(baseurl + "?req=setsalt"); + char * buffer = (char *) malloc(baseurl->length() + 100); + memcpy(buffer, baseurl->internalBytes(), baseurl->length()); + int offset = baseurl->length(); + offset+=sprintf(&buffer[offset], "?req=setsalt"); + IoTString *urlstr = new IoTString(buffer); + free(buffer); + + URL *url = new URL(urlstr); timer->startTime(); URLConnection *con = url->openConnection(); HttpURLConnection *http = (HttpURLConnection *) con; - + http->setRequestMethod("POST"); http->setFixedLengthStreamingMode(saltTmp->length()); http->setDoOutput(true); @@ -140,8 +146,6 @@ void CloudComm::setSalt() { int responsecode = http->getResponseCode(); if (responsecode != HttpURLConnection_HTTP_OK) { - // TODO: Remove this print - printf("%d\n", responsecode); throw new Error("Invalid response"); } @@ -159,7 +163,14 @@ bool CloudComm::getSalt() { HttpURLConnection *http = NULL; try { - url = new URL(baseurl + "?req=getsalt"); + char * buffer = (char *) malloc(baseurl->length() + 100); + memcpy(buffer, baseurl->internalBytes(), baseurl->length()); + int offset = baseurl->length(); + offset+=sprintf(&buffer[offset], "?req=getsalt"); + IoTString *urlstr = new IoTString(buffer); + free(buffer); + + url = new URL(urlstr); } catch (Exception *e) { throw new Error("getSlot failed"); } @@ -219,9 +230,7 @@ Array *CloudComm::encryptSlotAndPrependIV(Array *rawData, Arrayinit(Cipher_ENCRYPT_MODE, key, ivSpec); - Array *encryptedBytes = cipher->doFinal(rawData); - Array *chars = new Array(encryptedBytes->length() + CloudComm_IV_SIZE); System_arraycopy(ivBytes, 0, chars, 0, ivBytes->length()); System_arraycopy(encryptedBytes, 0, chars, CloudComm_IV_SIZE, encryptedBytes->length()); @@ -238,7 +247,6 @@ Array *CloudComm::stripIVAndDecryptSlot(Array *rawData) { Array *encryptedBytes = new Array(rawData->length() - CloudComm_IV_SIZE); System_arraycopy(rawData, 0, ivBytes, 0, CloudComm_IV_SIZE); System_arraycopy(rawData, CloudComm_IV_SIZE, encryptedBytes, 0, encryptedBytes->length); - IvParameterSpec *ivSpec = new IvParameterSpec(ivBytes); Cipher *cipher = Cipher_getInstance("AES/CTR/NoPadding"); cipher->init(Cipher_DECRYPT_MODE, key, ivSpec); diff --git a/version2/src/C/Commit.cc b/version2/src/C/Commit.cc index 34d51dc..6019732 100644 --- a/version2/src/C/Commit.cc +++ b/version2/src/C/Commit.cc @@ -16,7 +16,6 @@ Commit::Commit() : liveKeys(new Hashset) { } - Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) : parts(new Hashtable()), missingParts(NULL), @@ -109,18 +108,13 @@ int32_t Commit::getNumberOfParts() { } void Commit::setDead() { - if (isDead) { - // Already dead - return; - } - - // Set dead - isDead = true; - - // Make all the parts of this transaction dead - for (int32_t partNumber : parts->keySet()) { - CommitPart *part = parts->get(partNumber); - part->setDead(); + if (!isDead) { + isDead = true; + // Make all the parts of this transaction dead + for (int32_t partNumber : parts->keySet()) { + CommitPart *part = parts->get(partNumber); + part->setDead(); + } } } @@ -130,17 +124,14 @@ CommitPart *Commit::getPart(int index) { void Commit::createCommitParts() { parts->clear(); - // Convert to chars Array *charData = convertDataToBytes(); - int commitPartCount = 0; int currentPosition = 0; int remaining = charData->length(); while (remaining > 0) { - bool isLastPart = false; // determine how much to copy int copySize = CommitPart_MAX_NON_HEADER_SIZE; @@ -164,7 +155,6 @@ void Commit::createCommitParts() { } void Commit::decodeCommitData() { - // Calculate the size of the data section int dataSize = 0; for (int i = 0; i < parts->keySet()->size(); i++) { @@ -221,37 +211,31 @@ Array *convertDataToBytes() { void Commit::setKVsMap(Hashtable *newKVs) { keyValueUpdateSet->clear(); - liveKeys->clear(); - keyValueUpdateSet->addAll(newKVs->values()); + liveKeys->clear(); liveKeys->addAll(newKVs->keySet()); } Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) { - if (older == NULL) { return newer; } else if (newer == NULL) { return older; } - Hashtable *kvSet = new Hashtable(); for (KeyValue *kv : older->getKeyValueUpdateSet()) { kvSet->put(kv->getKey(), kv); } - for (KeyValue *kv : newer->getKeyValueUpdateSet()) { kvSet->put(kv->getKey(), kv); } int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber(); - if (transactionSequenceNumber == -1) { transactionSequenceNumber = older->getTransactionSequenceNumber(); } Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber); - newCommit->setKVsMap(kvSet); return newCommit; diff --git a/version2/src/C/CommitPart.cc b/version2/src/C/CommitPart.cc index 3f3208d..2e08c4b 100644 --- a/version2/src/C/CommitPart.cc +++ b/version2/src/C/CommitPart.cc @@ -24,10 +24,6 @@ int CommitPart::getSize() { return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data->length(); } -void CommitPart::setSlot(Slot *s) { - parentslot = s; -} - int CommitPart::getPartNumber() { return partNumber; } diff --git a/version2/src/C/CommitPart.h b/version2/src/C/CommitPart.h index 8c8e926..a569f1c 100644 --- a/version2/src/C/CommitPart.h +++ b/version2/src/C/CommitPart.h @@ -24,7 +24,6 @@ public: CommitPart(Slot *s, int64_t _machineId, int64_t _sequenceNumber, int64_t _transactionSequenceNumber, int _partNumber, Array *_data, bool _isLastPart); ~CommitPart(); int getSize(); - void setSlot(Slot *s); int getPartNumber(); int getDataSize(); Array *getData(); diff --git a/version2/src/C/Entry.h b/version2/src/C/Entry.h index 1a8c4b5..e006b4a 100644 --- a/version2/src/C/Entry.h +++ b/version2/src/C/Entry.h @@ -23,8 +23,7 @@ class Entry : public Liveness { superceded by a newer update. */ private: bool islive; - -protected: + protected: Slot *parentslot; public: @@ -63,6 +62,7 @@ public: * Returns a copy of the Entry that can be added to a different slot. */ virtual Entry *getCopy(Slot *s) = 0; + friend Entry *Entry_decode(Slot *slot, ByteBuffer *bb); }; /** diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 2c8b8e8..5e69e13 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -316,10 +316,7 @@ bool Table::update() { Array *newSlots = cloud->getSlots(sequenceNumber + 1); validateAndUpdate(newSlots, false); sendToServer(NULL); - - updateLiveTransactionsAndStatus(); - return true; } catch (Exception *e) { for (int64_t m : localCommunicationTable->keySet()) { @@ -336,7 +333,6 @@ bool Table::createNewKey(IoTString *keyName, int64_t machineId) { // There is already an arbitrator return false; } - NewKey *newKey = new NewKey(NULL, keyName, machineId); if (sendToServer(newKey)) { @@ -370,7 +366,6 @@ void Table::addKV(IoTString *key, IoTString *value) { } TransactionStatus *Table::commitTransaction() { - if (pendingTransactionBuilder->getKVUpdates()->size() == 0) { // transaction with no updates will have no effect on the system return new TransactionStatus(TransactionStatus_StatusNoEffect, -1); @@ -444,13 +439,8 @@ int64_t Table::getLocalSequenceNumber() { return localSequenceNumber; } - -bool lastInsertedNewKey = false; - bool Table::sendToServer(NewKey *newKey) { - bool fromRetry = false; - try { if (hadPartialSendToServer) { Array *newSlots = cloud->getSlots(sequenceNumber + 1); @@ -467,26 +457,23 @@ bool Table::sendToServer(NewKey *newKey) { for (Transaction *transaction : lastTransactionPartsSent->keySet()) { transaction->resetServerFailure(); - // Update which transactions parts still need to be sent transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - // Add the transaction status to the outstanding list outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); // Update the transaction status transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - // Check if all the transaction parts were successfully sent and if so then remove it from pending + // Check if all the transaction parts were successfully + // sent and if so then remove it from pending if (transaction->didSendAllParts()) { transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); pendingTransactionQueue->remove(transaction); } } } else { - newSlots = sendSlotsReturn->getThird(); - bool isInserted = false; for (uint si = 0; si < newSlots->length(); si++) { Slot *s = newSlots->get(si); @@ -1134,7 +1121,6 @@ ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey if (newKeyEntry != NULL) { newKeyEntry->setSlot(slot); if (slot->hasSpace(newKeyEntry)) { - slot->addEntry(newKeyEntry); inserted = true; } diff --git a/version2/src/C/TransactionPart.cc b/version2/src/C/TransactionPart.cc index 86393d0..fc13ead 100644 --- a/version2/src/C/TransactionPart.cc +++ b/version2/src/C/TransactionPart.cc @@ -8,10 +8,6 @@ int TransactionPart::getSize() { return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data->length(); } -void TransactionPart::setSlot(Slot *s) { - parentslot = s; -} - Pair TransactionPart::getTransactionId() { return transactionId; } diff --git a/version2/src/C/TransactionPart.h b/version2/src/C/TransactionPart.h index f832c30..c8aa649 100644 --- a/version2/src/C/TransactionPart.h +++ b/version2/src/C/TransactionPart.h @@ -35,7 +35,6 @@ public: } int getSize(); - void setSlot(Slot *s); Pair getTransactionId(); int64_t getArbitratorId(); Pair getPartId(); diff --git a/version2/src/C/URL.h b/version2/src/C/URL.h index 762c1ae..72139c3 100644 --- a/version2/src/C/URL.h +++ b/version2/src/C/URL.h @@ -1,5 +1,8 @@ #ifndef URL_H #define URL_H +#include "common.h" class URL { + public: + URL(IoTString *string); }; #endif -- 2.34.1