#include "MongoReplicable.hpp"
namespace sprawl
{
namespace serialization
{
void MongoReplicableSerializer::StartArray(String const& name, uint32_t& size, bool b)
{
ReplicableSerializer<MongoSerializer>::StartArray(name, size, b);
m_allArrays.insert(m_current_key);
}
uint32_t MongoReplicableSerializer::StartObject(String const& name, bool b)
{
uint32_t ret = ReplicableSerializer<MongoSerializer>::StartObject(name, b);
m_allObjs.insert(m_current_key);
return ret;
}
uint32_t MongoReplicableSerializer::StartMap(String const& name, bool b)
{
uint32_t ret = ReplicableSerializer<MongoSerializer>::StartMap(name, b);
m_allObjs.insert(m_current_key);
return ret;
}
void MongoReplicableSerializer::Mark()
{
ReplicableSerializer<MongoSerializer>::Mark();
m_markedArrays = std::move( m_allArrays );
m_markedObjs = std::move( m_allObjs );
m_markedObjData = std::move( m_objData );
m_array_tracker.clear();
}
void MongoReplicableSerializer::Discard()
{
ReplicableSerializer<MongoSerializer>::Discard();
m_allArrays.clear();
m_allObjs.clear();
m_objData.clear();
m_array_tracker.clear();
}
void MongoReplicableSerializer::Reset()
{
ReplicableSerializer<MongoSerializer>::Reset();
m_allArrays.clear();
m_markedArrays.clear();
m_allObjs.clear();
m_markedObjs.clear();
m_objData.clear();
m_markedObjData.clear();
m_array_tracker.clear();
}
void MongoReplicableSerializer::PushKey(String const& name, bool forArray)
{
bool parentIsArray = false;
if(!m_current_key.empty() && m_current_key[m_current_key.size() - 2] < 0)
{
parentIsArray = true;
}
if( this->m_name_index.count(name) == 0 )
{
this->m_reverse_name_index[this->m_highest_name] = name;
this->m_name_index[name] = this->m_highest_name++;
}
int key = this->m_name_index[name];
if(forArray)
{
//Use sign bit to encode this as an array member.
key = -key;
}
this->m_current_key.push_back(key);
if(parentIsArray)
{
int32_t& item = m_array_tracker.back();
++item;
m_current_key.push_back( item );
}
else
{
int16_t& depth = this->m_depth_tracker[this->m_current_key];
depth++;
this->m_current_key.push_back( depth );
}
if(forArray)
{
this->m_array_tracker.push_back(0);
}
}
void MongoReplicableSerializer::PopKey()
{
if( m_current_key[m_current_key.size() - 2] < 0 )
{
this->m_array_tracker.pop_back();
}
ReplicableSerializer<MongoSerializer>::PopKey();
}
std::vector<mongo::BSONObj> MongoReplicableSerializer::BuildDelta(MongoReplicableSerializer::BuildDeltaParams const& params)
{
ReplicationBSONMap const& objs = params.objs;
ReplicationMap const& data = params.data;
ReplicationMap const& markedData = params.markedData;
ReplicationSet const& allArrays = params.allArrays;
ReplicationSet const& markedArrays = params.markedArrays;
ReplicationSet const& allObjs = params.allObjs;
ReplicationSet const& markedObjs = params.markedObjs;
mongo::BSONObjBuilder updateQuery;
struct ObjectData
{
ObjectData()
: parentObject(nullptr)
, children()
, data()
, shortKey()
, fullKey()
, intKey()
, isArray(false)
, isNew(true)
, hasValidChildren(false)
, hasRemovedChildren(false)
, removeQuery()
{
//
}
void MarkNotNew()
{
isNew = false;
if(parentObject)
{
parentObject->MarkNotNew();
}
}
void SetRemovedChildren()
{
hasRemovedChildren = true;
if(parentObject)
{
parentObject->SetRemovedChildren();
}
}
void SetValidChildren()
{
hasValidChildren = true;
if(parentObject)
{
parentObject->SetValidChildren();
}
}
mongo::BSONElement BuildData()
{
if(children.empty())
{
return data;
}
if(isArray)
{
mongo::BSONArrayBuilder arrayBuilder;
for(auto& kvp : children)
{
arrayBuilder.append(kvp.second->BuildData());
}
mongo::BSONObjBuilder objBuilder;
objBuilder.append("firstElem", arrayBuilder.arr());
objData = objBuilder.obj();
}
else
{
mongo::BSONObjBuilder childObjsBuilder;
for(auto& kvp : children)
{
childObjsBuilder.appendAs(kvp.second->BuildData(), kvp.second->shortKey.c_str());
}
mongo::BSONObjBuilder objBuilder;
objBuilder.append("firstElem", childObjsBuilder.obj());
objData = objBuilder.obj();
}
return objData.firstElement();
}
ObjectData* parentObject;
std::map<ReplicationKey, ObjectData*, std::less<ReplicationKey>, sprawl::memory::StlWrapper<std::pair<ReplicationKey, ObjectData*>>> children;
mongo::BSONElement data;
mongo::BSONObj objData;
sprawl::String shortKey;
sprawl::String fullKey;
ReplicationKey intKey;
bool isArray;
bool isNew;
bool hasValidChildren;
bool hasRemovedChildren;
mongo::BSONObjBuilder removeQuery;
};
std::list<ObjectData, sprawl::memory::StlWrapper<ObjectData>> allObjects;
std::map<ReplicationKey, ObjectData*, std::less<ReplicationKey>, sprawl::memory::StlWrapper<std::pair<ReplicationKey, ObjectData*>>> allKeys;
ObjectData* currentObject;
typedef std::set<ObjectData*, std::less<ObjectData*>, sprawl::memory::StlWrapper<ObjectData*>> ObjectSet;
ObjectSet rootObjects;
auto buildObjectData = [
this,
&allKeys,
&allObjects,
¤tObject,
&rootObjects
] (
ReplicationKey const& keyToBuild,
bool newObj,
bool removal
)
{
ReplicationKey currentKey;
currentObject = nullptr;
uint32_t size = keyToBuild.size();
const int32_t* cArray = &keyToBuild[0];
for(uint32_t i = 1; i < size; i += 2)
{
currentKey.push_back(cArray[i - 1]);
currentKey.push_back(cArray[i]);
ObjectData* parentObject = currentObject;
auto it = allKeys.find(currentKey);
if(it != allKeys.end())
{
currentObject = it->second;
}
else
{
allObjects.emplace_back();
currentObject = &allObjects.back();
int key = currentKey[currentKey.size() - 2];
if(key < 0)
{
key = -key;
currentObject->isArray = true;
}
if(parentObject)
{
if(parentObject->isArray)
{
char buf[16];
#ifdef _WIN32
_snprintf( buf, 16, "%d", currentKey.back() - 1 );
#else
snprintf( buf, 16, "%d", currentKey.back() - 1 );
#endif
currentObject->shortKey = buf;
}
else
{
currentObject->shortKey = this->m_reverse_name_index[key];
}
currentObject->fullKey = parentObject->fullKey + "." + currentObject->shortKey;
parentObject->children.insert(std::make_pair(currentKey, currentObject));
}
else
{
currentObject->shortKey = this->m_reverse_name_index[key];
currentObject->fullKey = currentObject->shortKey;
rootObjects.insert(currentObject);
}
currentObject->parentObject = parentObject;
currentObject->intKey = currentKey;
allKeys.insert(std::make_pair(currentKey, currentObject));
}
if(!newObj)
{
currentObject->isNew = false;
}
if(removal)
{
currentObject->SetRemovedChildren();
}
else
{
currentObject->SetValidChildren();
}
}
};
bool changed_something = false;
//Adds and changes
for( auto& kvp : data )
{
auto it = markedData.find(kvp.first);
bool isNew = ( it == markedData.end() );
if( isNew || kvp.second != it->second )
{
changed_something = true;
mongo::BSONElement obj = objs.at(kvp.first)[this->m_reverse_name_index[kvp.first[kvp.first.size()-2]].c_str()];
buildObjectData(kvp.first, isNew, false);
if(currentObject)
{
currentObject->data = obj;
}
}
}
for( auto& key : allArrays )
{
if(!markedArrays.count(key))
{
if(!allKeys.count(key))
{
buildObjectData(key, true, false);
if(currentObject)
{
mongo::BSONObjBuilder b;
b.append("firstElement", mongo::BSONArray());
currentObject->objData = b.obj();
currentObject->data = currentObject->objData.firstElement();
}
changed_something = true;
}
}
else
{
auto kvp = allKeys.find(key);
if(kvp != allKeys.end())
{
kvp->second->MarkNotNew();
}
}
}
for( auto& key : allObjs )
{
if(!markedObjs.count(key))
{
if(!allKeys.count(key))
{
changed_something = true;
buildObjectData(key, true, false);
if(currentObject)
{
mongo::BSONObjBuilder b;
b.append("firstElement", mongo::BSONObj());
currentObject->objData = b.obj();
currentObject->data = currentObject->objData.firstElement();
}
}
}
else
{
auto kvp = allKeys.find(key);
if(kvp != allKeys.end())
{
kvp->second->MarkNotNew();
}
}
}
struct
{
void operator()(mongo::BSONObjBuilder& b, ObjectData* obj)
{
if(obj->isNew || obj->children.empty())
{
b.appendAs(obj->BuildData(), obj->fullKey.c_str());
}
else
{
for(auto& kvp : obj->children)
{
(*this)(b, kvp.second);
}
}
}
} getAddQuery;
if(changed_something)
{
mongo::BSONObjBuilder changed;
for(auto obj : rootObjects)
{
getAddQuery(changed, obj);
}
updateQuery.append("$set", changed.obj());
}
//Removes get their own special treatment
mongo::BSONObjBuilder removed;
bool removed_something = false;
for( auto& key : allArrays )
{
if(!allKeys.count(key))
{
buildObjectData(key, false, false);
}
}
for( auto& key : allObjs )
{
if(!allKeys.count(key))
{
buildObjectData(key, false, false);
}
}
for( auto& kvp : markedData )
{
auto it = data.find(kvp.first);
if( it == data.end() )
{
buildObjectData(kvp.first, false, true);
if(currentObject)
{
currentObject->removeQuery.append(currentObject->fullKey.c_str(), "");
}
removed_something = true;
}
}
for( auto& key : markedArrays )
{
if(!allArrays.count(key))
{
buildObjectData(key, false, true);
if(currentObject)
{
currentObject->removeQuery.append(currentObject->fullKey.c_str(), "");
}
removed_something = true;
}
}
for( auto& key : markedObjs )
{
if(!allObjs.count(key))
{
buildObjectData(key, false, true);
if(currentObject)
{
currentObject->removeQuery.append(currentObject->fullKey.c_str(), "");
}
removed_something = true;
}
}
struct
{
mongo::BSONObj operator()(ObjectData* obj)
{
if(obj->hasValidChildren)
{
for(auto& kvp : obj->children)
{
obj->removeQuery.appendElementsUnique((*this)(kvp.second));
}
return obj->removeQuery.obj();
}
else
{
return BSON(obj->fullKey.c_str() << "");
}
}
} getRemoveQuery;
if(removed_something)
{
for(ObjectData* obj : rootObjects)
{
if(obj->hasRemovedChildren)
{
removed.appendElementsUnique(getRemoveQuery(obj));
}
}
updateQuery.append("$unset", removed.obj());
}
std::vector<mongo::BSONObj> ret;
if(changed_something || removed_something)
{
ret.push_back(updateQuery.obj());
}
//If array members have been removed, we have to generate an arbitrary number of additional queries.
//This is one of the most frustrating aspects of Mongo: You can't directly remove an element from an array;
//you have to change it to a value that you know doesn't exist anywhere else in the array, then instruct
//mongo to remove all elements of that value from the array. Very backward.
//Worse than that, if you have nested arrays (say, [ [ 1, 2, 3 ], [ 4, 5, 6 ] ]) and you remove an item
//from the inner array and also remove an item from the outer array (say, changing it to [ [ 1, 2 ] ]
//mongo won't let you perform both removals at the same time. They have to be executed in separate steps.
//That means that we have to generate N additional queries to mongo to clean up these shrunk arrays. Blech.
struct
{
bool ArrayAlreadyAccountedFor(ObjectData* obj)
{
while(obj)
{
if(obj->isArray && objectsAccountedFor.count(obj))
{
return true;
}
obj = obj->parentObject;
}
return false;
}
void AddAccountedArray(ObjectData* obj)
{
while(obj)
{
if(obj->isArray)
{
objectsAccountedFor.insert(obj);
}
obj = obj->parentObject;
}
}
mongo::BSONObj operator()(ObjectData* obj)
{
if(obj->hasValidChildren)
{
mongo::BSONObjBuilder removedArrayIndexes;
for(auto& kvp : obj->children)
{
removedArrayIndexes.appendElementsUnique((*this)(kvp.second));
}
return removedArrayIndexes.obj();
}
else
{
if(obj->parentObject && obj->parentObject->isArray)
{
if(objectsAlreadyPulled.count(obj->parentObject))
{
return mongo::BSONObj();
}
if(ArrayAlreadyAccountedFor(obj->parentObject))
{
newRootObjects.insert(obj->parentObject);
return mongo::BSONObj();
}
pulled_something = true;
AddAccountedArray(obj->parentObject);
objectsAlreadyPulled.insert(obj->parentObject);
return BSON(obj->parentObject->fullKey.c_str() << mongo::BSONNULL);
}
else
{
return mongo::BSONObj();
}
}
}
void Reset()
{
newRootObjects.clear();
objectsAccountedFor.clear();
pulled_something = false;
}
ObjectSet newRootObjects;
ObjectSet objectsAccountedFor;
ObjectSet objectsAlreadyPulled;
bool pulled_something;
} getCorrectRemoveIndex;
if(removed_something)
{
while(!rootObjects.empty())
{
mongo::BSONObjBuilder pullQuery;
mongo::BSONObjBuilder pulled;
getCorrectRemoveIndex.Reset();
for(ObjectData* obj : rootObjects)
{
if(obj->hasRemovedChildren)
{
pulled.appendElementsUnique(getCorrectRemoveIndex(obj));
}
}
if(getCorrectRemoveIndex.pulled_something)
{
pullQuery.append("$pull", pulled.obj());
ret.push_back(pullQuery.obj());
}
rootObjects = std::move(getCorrectRemoveIndex.newRootObjects);
}
}
return std::move(ret);
}
void MongoReplicableSerializer::serialize(mongo::Date_t* var, String const& name, bool PersistToDB)
{
this->m_serializer->Reset();
this->PushKey(name);
m_serializer->serialize(var, name, PersistToDB);
m_data.insert(std::make_pair(m_current_key, m_serializer->Str()));
m_objData.insert(std::make_pair(m_current_key, m_serializer->Obj()));
if(!m_marked)
{
m_baseline->serialize(var, name, PersistToDB);
}
this->PopKey();
}
void MongoReplicableSerializer::serialize(mongo::BSONObj* var, String const& name, bool PersistToDB)
{
this->m_serializer->Reset();
this->PushKey(name);
m_serializer->serialize(var, name, PersistToDB);
m_data.insert(std::make_pair(m_current_key, m_serializer->Str()));
m_objData.insert(std::make_pair(m_current_key, m_serializer->Obj()));
if(!m_marked)
{
m_baseline->serialize(var, name, PersistToDB);
}
this->PopKey();
}
void MongoReplicableSerializer::serialize(mongo::OID* var, String const& name, bool PersistToDB)
{
this->m_serializer->Reset();
this->PushKey(name);
m_serializer->serialize(var, name, PersistToDB);
m_data.insert(std::make_pair(m_current_key, m_serializer->Str()));
m_objData.insert(std::make_pair(m_current_key, m_serializer->Obj()));
if(!m_marked)
{
m_baseline->serialize(var, name, PersistToDB);
}
this->PopKey();
}
void MongoReplicableDeserializer::serialize(mongo::OID* var, String const& name, bool PersistToDB)
{
m_serializer->Reset();
PushKey(name);
MongoSerializer serializer;
serializer % m_current_key;
//Do nothing if this element hasn't changed.
auto it = m_diffs.find(m_current_key);
if(it != m_diffs.end())
{
m_serializer->Data(it->second);
m_serializer->serialize(var, name, PersistToDB);
m_diffs.erase(it);
}
PopKey();
}
template class ReplicableSerializer<MongoSerializer>;
template class ReplicableDeserializer<MongoDeserializer>;
}
}
| # | Change | User | Description | Committed | |
|---|---|---|---|---|---|
| #1 | 23398 | ququlala | "Forking branch Mainline of shadauxcat-libsprawl to ququlala-libsprawl." | ||
| //guest/ShadauxCat/Sprawl/Mainline/serialization/mongo/MongoReplicable.cpp | |||||
| #2 | 14783 | ShadauxCat |
Style corrections (placement of const) #review-14784 |
||
| #1 | 11496 | ShadauxCat | Initial checkin: Current states for csbuild and libSprawl | ||