if (isDead) {
// If dead then just kill this part and move on
- newPart.setDead();
+ newPart->setDead();
return;
}
- CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
-
+ CommitPart *previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
+
if (previoslySeenPart != NULL) {
// Set dead the old one since the new one is a rescued version of this part
- previoslySeenPart.setDead();
- } else if (newPart.isLastPart()) {
+ previoslySeenPart->setDead();
+ } else if (newPart->isLastPart()) {
missingParts = new HashSet<int32_t>();
hasLastPart = true;
-
- for (int i = 0; i < newPart.getPartNumber(); i++) {
- if (parts.get(i) == NULL) {
- missingParts.add(i);
+
+ for (int i = 0; i < newPart->getPartNumber(); i++) {
+ if (parts->get(i) == NULL) {
+ missingParts->add(i);
}
}
}
-
+
if (!fldisComplete && hasLastPart) {
-
+
// We have seen this part so remove it from the set of missing parts
- missingParts.remove(newPart.getPartNumber());
-
+ missingParts->remove(newPart->getPartNumber());
+
// Check if all the parts have been seen
- if (missingParts.size() == 0) {
-
+ if (missingParts->size() == 0) {
+
// We have all the parts
fldisComplete = true;
-
+
// Decode all the parts and create the key value guard and update sets
decodeCommitData();
-
+
// Get the sequence number and arbitrator of this transaction
- sequenceNumber = parts.get(0).getSequenceNumber();
- machineId = parts.get(0).getMachineId();
- transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
+ sequenceNumber = parts->get(0)->getSequenceNumber();
+ machineId = parts->get(0)->getMachineId();
+ transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
}
}
}
- int64_t getSequenceNumber() {
- return sequenceNumber;
- }
-
- int64_t getTransactionSequenceNumber() {
- return transactionSequenceNumber;
- }
+int64_t Commit::getSequenceNumber() {
+ return sequenceNumber;
+}
- Hashtable<int32_t, CommitPart> getParts() {
- return parts;
- }
+int64_t Commit::getTransactionSequenceNumber() {
+ return transactionSequenceNumber;
+}
- void addKV(KeyValue kv) {
- keyValueUpdateSet.add(kv);
- liveKeys.add(kv.getKey());
- }
+Hashtable<int32_t, CommitPart *> *Commit::getParts() {
+ return parts;
+}
- void invalidateKey(IoTString key) {
- liveKeys.remove(key);
+void Commit::addKV(KeyValue *kv) {
+ keyValueUpdateSet->add(kv);
+ liveKeys->add(kv->getKey());
+}
- if (liveKeys.size() == 0) {
- setDead();
- }
- }
+void Commit::invalidateKey(IoTString *key) {
+ liveKeys->remove(key);
- Set<KeyValue> getKeyValueUpdateSet() {
- return keyValueUpdateSet;
- }
+ if (liveKeys->size() == 0) {
+ setDead();
+ }
+}
-int32_t getNumberOfParts() {
- return parts.size();
+Hashset<KeyValue *> *Commit::getKeyValueUpdateSet() {
+ return keyValueUpdateSet;
}
- void setDead() {
- if (isDead) {
- // Already dead
- return;
- }
+int32_t Commit::getNumberOfParts() {
+ return parts->size();
+}
- // Set dead
- isDead = true;
+void Commit::setDead() {
+ if (isDead) {
+ // Already dead
+ return;
+ }
- // Make all the parts of this transaction dead
- for (int32_t partNumber : parts.keySet()) {
- CommitPart part = parts.get(partNumber);
- part.setDead();
- }
- }
+ // Set dead
+ isDead = true;
- CommitPart getPart(int index) {
- return parts.get(index);
- }
+ // Make all the parts of this transaction dead
+ for (int32_t partNumber : parts->keySet()) {
+ CommitPart part = parts->get(partNumber);
+ part->setDead();
+ }
+}
- void createCommitParts() {
+CommitPart *Commit::getPart(int index) {
+ return parts->get(index);
+}
- parts.clear();
+void Commit::createCommitParts() {
+ parts->clear();
- // Convert to chars
- char[] charData = convertDataToBytes();
+ // Convert to chars
+ Array<char> *charData = convertDataToBytes();
- int commitPartCount = 0;
- int currentPosition = 0;
- int remaining = charData.length;
+ int commitPartCount = 0;
+ int currentPosition = 0;
+ int remaining = charData->length();
- while (remaining > 0) {
+ while (remaining > 0) {
- Boolean isLastPart = false;
- // determine how much to copy
- int copySize = CommitPart.MAX_NON_HEADER_SIZE;
- if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
- copySize = remaining;
- isLastPart = true; // last bit of data so last part
- }
+ bool isLastPart = false;
+ // determine how much to copy
+ int copySize = CommitPart_MAX_NON_HEADER_SIZE;
+ if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
+ copySize = remaining;
+ isLastPart = true;// last bit of data so last part
+ }
- // Copy to a smaller version
- char[] partData = new char[copySize];
- System.arraycopy(charData, currentPosition, partData, 0, copySize);
+ // Copy to a smaller version
+ Array<char> *partData = new Array<char>(copySize);
+ System->arraycopy(charData, currentPosition, partData, 0, copySize);
- CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
- parts.put(part.getPartNumber(), part);
+ CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
+ parts->put(part->getPartNumber(), part);
- // Update position, count and remaining
- currentPosition += copySize;
- commitPartCount++;
- remaining -= copySize;
- }
- }
+ // Update position, count and remaining
+ currentPosition += copySize;
+ commitPartCount++;
+ remaining -= copySize;
+ }
+}
- void decodeCommitData() {
+void Commit::decodeCommitData() {
- // Calculate the size of the data section
- int dataSize = 0;
- for (int i = 0; i < parts.keySet().size(); i++) {
- CommitPart tp = parts.get(i);
- dataSize += tp.getDataSize();
- }
+ // Calculate the size of the data section
+ int dataSize = 0;
+ for (int i = 0; i < parts->keySet()->size(); i++) {
+ CommitPart *tp = parts->get(i);
+ dataSize += tp->getDataSize();
+ }
- char[] combinedData = new char[dataSize];
- int currentPosition = 0;
+ Array<char> *combinedData = new Array<char>(dataSize);
+ int currentPosition = 0;
- // Stitch all the data sections together
- for (int i = 0; i < parts.keySet().size(); i++) {
- CommitPart tp = parts.get(i);
- System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
- currentPosition += tp.getDataSize();
- }
+ // Stitch all the data sections together
+ for (int i = 0; i < parts->keySet()->size(); i++) {
+ CommitPart *tp = parts->get(i);
+ System->arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
+ currentPosition += tp->getDataSize();
+ }
- // Decoder Object
- ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
+ // Decoder Object
+ ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
- // Decode how many key value pairs need to be decoded
- int numberOfKVUpdates = bbDecode.getInt();
+ // Decode how many key value pairs need to be decoded
+ int numberOfKVUpdates = bbDecode->getInt();
- // Decode all the updates key values
- for (int i = 0; i < numberOfKVUpdates; i++) {
- KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
- keyValueUpdateSet.add(kv);
- liveKeys.add(kv.getKey());
- }
- }
+ // Decode all the updates key values
+ for (int i = 0; i < numberOfKVUpdates; i++) {
+ KeyValue *kv = (KeyValue *)KeyValue->decode(bbDecode);
+ keyValueUpdateSet->add(kv);
+ liveKeys->add(kv->getKey());
+ }
+}
- char[] convertDataToBytes() {
+Array<char> *convertDataToBytes() {
- // Calculate the size of the data
- int sizeOfData = sizeof(int32_t); // Number of Update KV's
- for (KeyValue kv : keyValueUpdateSet) {
- sizeOfData += kv.getSize();
- }
+ // Calculate the size of the data
+ int sizeOfData = sizeof(int32_t); // Number of Update KV's
+ for (KeyValue *kv : keyValueUpdateSet) {
+ sizeOfData += kv->getSize();
+ }
- // Data handlers and storage
- char[] dataArray = new char[sizeOfData];
- ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
+ // Data handlers and storage
+ Array<char> *dataArray = new Array<char>(sizeOfData);
+ ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
- // Encode the size of the updates and guard sets
- bbEncode.putInt(keyValueUpdateSet.size());
+ // Encode the size of the updates and guard sets
+ bbEncode->putInt(keyValueUpdateSet->size());
- // Encode all the updates
- for (KeyValue kv : keyValueUpdateSet) {
- kv.encode(bbEncode);
- }
+ // Encode all the updates
+ for (KeyValue *kv : keyValueUpdateSet) {
+ kv->encode(bbEncode);
+ }
- return bbEncode.array();
- }
+ return bbEncode->array();
+}
- void setKVsMap(Hashtable<IoTString, KeyValue> newKVs) {
- keyValueUpdateSet.clear();
- liveKeys.clear();
+void Commit::setKVsMap(Hashtable<IoTString *, KeyValue *> *newKVs) {
+ keyValueUpdateSet->clear();
+ liveKeys->clear();
- keyValueUpdateSet.addAll(newKVs.values());
- liveKeys.addAll(newKVs.keySet());
+ keyValueUpdateSet->addAll(newKVs->values());
+ liveKeys->addAll(newKVs->keySet());
- }
+}
- static Commit merge(Commit newer, Commit older, int64_t newSequenceNumber) {
+Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
- if (older == NULL) {
- return newer;
- } else if (newer == NULL) {
- return older;
- }
+ if (older == NULL) {
+ return newer;
+ } else if (newer == NULL) {
+ return older;
+ }
- Hashtable<IoTString, KeyValue> kvSet = new Hashtable<IoTString, KeyValue>();
- for (KeyValue kv : older.getKeyValueUpdateSet()) {
- kvSet.put(kv.getKey(), kv);
- }
+ Hashtable<IoTString *, KeyValue *> *kvSet = new Hashtable<IoTString *, KeyValue *>();
+ for (KeyValue *kv : older->getKeyValueUpdateSet()) {
+ kvSet->put(kv->getKey(), kv);
+ }
- for (KeyValue kv : newer.getKeyValueUpdateSet()) {
- kvSet.put(kv.getKey(), kv);
- }
+ for (KeyValue *kv : newer->getKeyValueUpdateSet()) {
+ kvSet->put(kv->getKey(), kv);
+ }
- int64_t transactionSequenceNumber = newer.getTransactionSequenceNumber();
+ int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
- if (transactionSequenceNumber == -1) {
- transactionSequenceNumber = older.getTransactionSequenceNumber();
- }
+ if (transactionSequenceNumber == -1) {
+ transactionSequenceNumber = older->getTransactionSequenceNumber();
+ }
- Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber);
+ Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
- newCommit.setKVsMap(kvSet);
+ newCommit->setKVsMap(kvSet);
- return newCommit;
- }
+ return newCommit;
}