Commit 86ee4c17 authored by Martin Vala's avatar Martin Vala

Super cleanup in NodeZyre.

Nodes should print enter,exit,whisper
parent c852abeb
......@@ -35,6 +35,7 @@
# VS
.vscode/*
!.vscode/settings.json
!.vscode/launch.json
# Project specific
pars
......
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "(gdb) Launch",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/build/src/salsa",
"args": [
"-c",
"${workspaceFolder}/etc/salsa.json",
"-n",
"rdrG",
"--debug"
],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"environment": [],
"externalConsole": true,
"MIMode": "gdb",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
}
]
}
......@@ -3,6 +3,7 @@
"files.insertFinalNewline": true,
"C_Cpp.clang_format_path": "/usr/bin/clang-format",
"C_Cpp.clang_format_fallbackStyle": "LLVM",
"C_Cpp.errorSquiggles": "Disabled",
"editor.formatOnSave": true,
"files.associations": {
"*.C": "cpp",
......@@ -58,6 +59,11 @@
"hash_map": "cpp",
"hash_set": "cpp",
"unordered_set": "cpp",
"numeric": "cpp"
}
"numeric": "cpp",
"filesystem": "cpp",
"list": "cpp",
"random": "cpp"
},
"C_Cpp.intelliSenseEngineFallback": "Disabled",
"C_Cpp.configurationWarnings": "Disabled"
}
......@@ -68,6 +68,7 @@ endif()
add_subdirectory(base)
add_subdirectory(zmq)
add_subdirectory(src)
add_subdirectory(src/test)
add_subdirectory(doc)
if (CMAKE_INSTALL_PREFIX STREQUAL "/usr")
......
# SALSA - Scalable Adaptive Large Structures Analysis
The goal of the project is to develop framework, that will distribute large job between workers and process it in parallel.
The goal of the project is to develop framework, that will distribute large job between workers and process it in parallel.
Framework should be able to adapt to different conditions in order to analyse data as effectively as possible.
# Installation
Install `OpenBrain` repository
```
yum install yum-plugin-copr -y
yum copr enable mvala/ob -y
```
Install salsa package
```
yum install salsa -y
```
......@@ -32,21 +34,23 @@ scripts/make.sh
```
### scripts/make.sh options
| Option | Description |
| :-------- | :--------------------------------------------------------------- |
| `install` | Install after compilation. |
| `clean` | Destroy build folder before building. |
| `clang` | Use Clang compiler instead of system default. |
| `strict` | Enable all warnings (requires `clang`). Not `-Werror`! |
| `ninja` | Use Ninja build system instead of system default. |
| `doc` | Build documentation (via Doxygen) |
| Option | Description |
| :-------- | :------------------------------------------------------------------------------------------- |
| `install` | Install after compilation. |
| `clean` | Destroy build folder before building. |
| `clang` | Use Clang compiler instead of system default. |
| `strict` | Enable all warnings (requires `clang`). Not `-Werror`! |
| `ninja` | Use Ninja build system instead of system default. |
| `doc` | Build documentation (via Doxygen) |
| `release` | Build as release. (This is used in pipeline.) Infers `doc` and `strict`. Warnings as errors. |
`strict` option was created for development purposes, so you can compile with all warnings, while not failing build (no `-Werror`).
If you are contributing, make sure that you pass compilation with at least `release` option. Even better if you pass with `release clang` (thanks to Clang's `-Weverything`).
## Example running master-worker arch
```bash
./build/src/salsa -c etc/salsa.json -nH_1m,H_1w:4 # Runs 4 workers. Beware! Every worker gets all cores of machine (thus overloading it)!
./build/src/salsa -n master,worker:4 # Runs 4 workers. Beware! Every worker gets all cores of machine (thus overloading it)!
./build/src/salsa-feeder path/to/list # Command list is one command per line.
```
......@@ -9,13 +9,7 @@ set (MY_INCLUDE_DIRS
${PROTOBUF_INCLUDE_DIR}
)
set (MY_LINK_DIRS
${CMAKE_CURRENT_BINARY_DIR}
${PROTOBUF_LIBRARIES}
)
include_directories( ${MY_INCLUDE_DIRS} )
link_directories( "${MY_LINK_DIRS}")
add_library (${PACKAGE} ${SRCS} ${PROTO_SRCS} ${PROTO_HDRS})
target_link_libraries (${PACKAGE} ${JSONCPP_LIBRARIES} ${PROTOBUF_LIBRARIES} SalsaProto)
......@@ -23,8 +17,6 @@ target_link_libraries (${PACKAGE} ${JSONCPP_LIBRARIES} ${PROTOBUF_LIBRARIES} Sal
file(GLOB HDRS RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.hh" )
set_target_properties(${PACKAGE} PROPERTIES PUBLIC_HEADER "${HDRS}")
# message("${HDRS}")
install(TARGETS ${PACKAGE}
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT LIBRARY
......
#include "Consumer.hh"
#include "NodeManager.hh"
namespace Salsa
{
Consumer::Consumer(std::string uuid, std::shared_ptr<Socket> pipe, NodeManager * nm)
: Distributor(uuid, pipe, nm)
{
///
/// Constructor
///
}
Consumer::~Consumer()
{
///
/// Destructor
///
}
void Consumer::onEnter(Message * inMsg, std::vector<std::string> & out, std::string /*type*/)
{
///
/// onEnter
///
std::vector<std::string> & in = inMsg->content();
for (auto s : in)
{
SPD_TRACE("::onEnter IN [{}]", s.c_str());
}
out.push_back("AUTHOK");
SPD_TRACE("AUTHOK");
}
void Consumer::onExit(Message * inMsg, std::vector<std::string> & /*out*/)
{
///
/// onExit
///
std::vector<std::string> & in = inMsg->content();
for (auto s : in)
{
SPD_TRACE("::onExit IN [{}]", s.c_str());
}
SPD_TRACE("Handling EXIT from [{}]", inMsg->uuid());
std::vector<std::string> jobs;
mNodeManager->jobs(inMsg->uuid(), jobs);
for (auto j : jobs)
{
SPD_TRACE("Terminating job [{}] from upstream [{}]", j, inMsg->uuid());
mNodeManager->terminateJob(j);
}
}
void Consumer::onWhisper(Message * inMsg, std::vector<std::string> & out)
{
///
/// onWhisper
///
std::vector<std::string> & in = inMsg->content();
for (auto s : in)
{
SPD_TRACE("::onWhisper IN [{}]", s.c_str());
}
if (in[0] == "START")
{
int32_t n = 1;
n = mNodeManager->nSlots(1.5);
if (n != 0)
{
out.push_back("SENDTASKS");
out.push_back(fmt::format("{}", n));
SPD_TRACE("SENDTASKS");
}
}
else if (in[0] == "TASK")
{
SPD_TRACE("TASK");
Salsa::TaskInfo * ti = new Salsa::TaskInfo();
{
if (!ti->ParseFromString(in[1].c_str()))
{
SPD_ERROR("Message does not contain ProtoBuf message!");
return;
}
}
// TODO : now we need to tell all feeders that theat they should subscribe to workers
// (probably in addTask when creating new Job)
mNodeManager->addTask(ti, mUUID, inMsg->uuid());
}
else if (in[0] == "NOMORETASKS")
{
SPD_DEBUG("WE NEED TO SET haveMoreTasks to false");
mNodeManager->noMoreTasks(in[1]);
}
else
{
out.push_back("START");
SPD_TRACE("START");
}
}
} // namespace Salsa
#pragma once
#include <Distributor.hh>
#include <TaskInfo.pb.h>
namespace Salsa
{
///
/// \class Consumer
///
/// \brief Base Consumer class
/// \author Matej Fedor <matej.fedor.mf@gmail.com>
/// \author Martin Vala <mvala@cern.ch>
/// \author Branislav Beke <beke.public@yandex.com>
///
class Consumer : public Distributor
{
public:
Consumer(std::string uuid, std::shared_ptr<Socket> pipe, NodeManager * nm);
virtual ~Consumer();
void onEnter(Message * inMsg, std::vector<std::string> & out, std::string type);
void onExit(Message * inMsg, std::vector<std::string> & out);
void onWhisper(Message * inMsg, std::vector<std::string> & out);
// void resultTask(std::string uuid, Salsa::TaskInfo * task);
};
} // namespace Salsa
#include "Distributor.hh"
namespace Salsa
{
Distributor::Distributor(std::string uuid, std::shared_ptr<Socket> pipe, NodeManager * nm)
: Object()
, mUUID(uuid)
, mPipe(pipe)
, mNodeManager(nm)
{
///
/// Constructor
///
}
Distributor::~Distributor()
{
///
/// Destructor
///
delete mNodeInfo;
}
void Distributor::addClient(std::string uuid, std::string type)
{
///
/// Adds client
///
mClients.insert(std::make_pair(uuid, type));
}
void Distributor::removeClient(std::string uuid)
{
///
/// Remove client
///
mClients.erase(uuid);
}
void Distributor::addOther(std::string uuid, std::string type)
{
///
/// Adds other
///
mOthers.insert(std::make_pair(uuid, type));
}
void Distributor::removeOther(std::string uuid)
{
///
/// Remove other
///
mOthers.erase(uuid);
}
void Distributor::print() const
{
///
/// Prints info
///
SPD_DEBUG("clients [{}] others [{}] pipe [{}]",
mClients.size(),
mOthers.size(),
static_cast<void *>(mPipe.get()));
}
void Distributor::onEnter(Message * /*inMsg*/,
std::vector<std::string> & /*out*/,
std::string /*type*/)
{
///
/// onEnter
///
}
void Distributor::onExit(Message * /*inMsg*/, std::vector<std::string> & /*out*/)
{
///
/// onExit
///
}
void Distributor::onWhisper(Message * /*inMsg*/, std::vector<std::string> & /*out*/)
{
///
/// onWhisper
///
}
std::shared_ptr<Socket> Distributor::pipe() const
{
///
/// Returns pipe socket
///
return mPipe;
}
std::string Distributor::uuid() const
{
///
/// Returns uuid
///
return std::move(mUUID);
}
NodeInfo * Distributor::nodeInfo() const
{
///
/// Returns node info
///
return mNodeInfo;
}
} // namespace Salsa
#pragma once
#include <NodeInfo.pb.h>
#include <Socket.hh>
#include <map>
namespace Salsa
{
///
/// \class Distributor
///
/// \brief Base Distributor class
/// \author Matej Fedor <matej.fedor.mf@gmail.com>
/// \author Martin Vala <mvala@cern.ch>
/// \author Branislav Beke <beke.public@yandex.com>
///
class NodeManager;
class Distributor : public Object
{
public:
Distributor(std::string uuid, std::shared_ptr<Socket> pipe, NodeManager * nm);
virtual ~Distributor();
std::string uuid() const;
std::shared_ptr<Socket> pipe() const;
void print() const;
void addClient(std::string uuid, std::string type);
void removeClient(std::string uuid);
void addOther(std::string uuid, std::string type);
void removeOther(std::string uuid);
NodeInfo * nodeInfo() const;
virtual void onEnter(Message * inMsg, std::vector<std::string> & out, std::string type);
virtual void onExit(Message * inMsg, std::vector<std::string> & out);
virtual void onWhisper(Message * inMsg, std::vector<std::string> & out);
protected:
std::string mUUID{}; ///< Self uuid
std::shared_ptr<Socket> mPipe = nullptr; ///< Pipe for messages
std::map<std::string, std::string> mClients{}; ///< List of clients
std::map<std::string, std::string> mOthers{}; ///< List of others
NodeManager * mNodeManager = nullptr; ///< Node Manager
NodeInfo * mNodeInfo{new NodeInfo()}; ///< Node Info
};
} // namespace Salsa
#include "Feeder.hh"
#include "NodeManager.hh"
using namespace fmt::literals;
namespace Salsa
{
Feeder::Feeder(std::string uuid, std::shared_ptr<Socket> pipe, NodeManager * nm)
: Distributor(uuid, pipe, nm)
{
///
/// Constructor
///
mNodeInfo->set_uuid(mUUID);
}
Feeder::~Feeder()
{
///
/// Destructor
///
}
void Feeder::onEnter(Message * /*inMsg*/, std::vector<std::string> & out, std::string type)
{
///
/// onEnter
///
if (type == "WORKER" && mNodeManager->hasJobs())
{
out.push_back("SUB");
}
}
void Feeder::onExit(Message * inMsg, std::vector<std::string> & /*out*/)
{
///
/// onExit
///
uint32_t slots = 0;
uint32_t i = 0;
bool found = false;
for (auto m : mNodeInfo->hosts())
{
if (inMsg->uuid() == m.uuid())
{
found = true;
}
else
{
slots += m.slots();
if (!found)
i++;
}
}
mNodeInfo->mutable_hosts()->DeleteSubrange(i, 1);
mNodeInfo->set_slots(slots);
// mNodeInfo->PrintDebugString();
}
void Feeder::onWhisper(Message * inMsg, std::vector<std::string> & out)
{
///
/// onWhisper
///
std::vector<std::string> inContent = inMsg->content();
SPD_TRACE("::onWhisper inMSG [{}]", inContent[0]);
if (inContent[0] == "FREESLOT")
{
SPD_TRACE("Searching for task in one of jobs");
Salsa::TaskInfo * task = mNodeManager->getNextTask();
if (task == nullptr)
{
SPD_TRACE("Sending back NOMORETASKS");
out.push_back("NOMORETASKS");
out.push_back(inContent[1]);
}
else
{
out.push_back("TASK");
std::string payload;
task->SerializeToString(&payload);
out.push_back(payload);
out.push_back(inContent[1]);
}
}
else if (inContent[0] == "TASK_RESULT")
{
std::string payload = inContent[1];
Salsa::TaskInfo * task = new Salsa::TaskInfo();
{
if (!task->ParseFromString(payload))
{
SPD_ERROR("Message does not contain ProtoBuf message!");
for (auto s : inContent)
{
SPD_ERROR("::onWhisper inMSG [{}]", s);
}
return;
}
}
SPD_TRACE("ResultTask for JOB [{}]", task->jobid());
// task is deleted in next line
mNodeManager->resultTask(task);
}
else if (inContent[0] == "NODEINFO")
{
std::string payload = inContent[1];
Salsa::NodeInfo * ni = mNodeInfo->add_hosts();
if (!ni->ParseFromString(payload))
{
SPD_ERROR("[NodeInfo] Message does not contain ProtoBuf message!");
}
uint32_t slots = 0;
for (auto m : mNodeInfo->hosts())
{
slots += m.slots();
}
mNodeInfo->set_slots(slots);
// mNodeInfo->PrintDebugString();
}
}
void Feeder::subscribe(std::string uuid)
{
///
/// subscribe
///
SPD_INFO("Client [{}] started", uuid);
SPD_TRACE("Feeders -> [{}]", mClients.size());
for (auto w : mClients)
{
std::vector<std::string> out;
out.push_back("SUB");
mNodeManager->sendWhisper(pipe().get(), w.first, out);
}
}
void Feeder::terminateJob(std::string uuid)
{
///
/// Terminate job
///
for (auto w : mClients)
{
std::vector<std::string> out;
out.push_back("TERMINATEJOB");
out.push_back(uuid);
mNodeManager->sendWhisper(pipe().get(), w.first, out);
}
SPD_INFO("JOB [{}] has finished", uuid);
}
} // namespace Salsa
#pragma once
#include <Distributor.hh>
namespace Salsa
{
///
/// \class Feeder
///
/// \brief Base Feeder class
/// \author Matej Fedor <matej.fedor.mf@gmail.com>
/// \author Martin Vala <mvala@cern.ch>
/// \author Branislav Beke <beke.public@yandex.com>
///
class Feeder : public Distributor
{
public:
Feeder(std::string uuid, std::shared_ptr<Socket> pipe, NodeManager * nm);
virtual ~Feeder();
virtual void onEnter(Message * inMsg, std::vector<std::string> & out, std::string type);
virtual void onExit(Message * inMsg, std::vector<std::string> & out);
virtual void onWhisper(Message * inMsg, std::vector<std::string> & out);
void subscribe(std::string uuid);
void terminateJob(std::string uuid);
};
} // namespace Salsa
......@@ -14,8 +14,8 @@ int nVertex; // number vertex
HyperCube::HyperCube(int power, int start)
: Object()
, _power(power)
, _start(start)
, mPower(power)
, mStart(start)
{
nVertex = pow(2, power);
......@@ -41,16 +41,16 @@ void HyperCube::createAdjMatrix()
std::bitset<64> b(i);
std::string bits = b.to_string<char, std::char_traits<char>, std::allocator<char>>();
bits.erase(0, 64 - (_power + 1));
bits.erase(0, 64 - (mPower + 1));
bitsVector[i] = bits;
}
_adjMatrix.resize(nVertex);
mAdjMatrix.resize(nVertex);
for (std::size_t i = 0; i < bitsVector.size(); i++)
{
_adjMatrix[i].resize(nVertex);
mAdjMatrix[i].resize(nVertex);
for (std::size_t j = 0; j < bitsVector.size(); j++)
{
int diff = 0;
......@@ -60,7 +60,7 @@ void HyperCube::createAdjMatrix()
diff++;
}
if (diff == 1)
_adjMatrix[i][j] = 1;
mAdjMatrix[i][j] = 1;
}
}
}
......@@ -70,34 +70,34 @@ void HyperCube::creatPaths()
bool flag = false;
std::vector<int> initVector = {_start};
_paths.push_back(initVector);
_passedNodes.push_back(_start);
std::vector<int> initVector = {mStart};
mPaths.push_back(initVector);
mPassedNodes.push_back(mStart);
do
{
std::vector<int> tmpVector;
for (std::size_t i = 0; i < _paths[_paths.size() - 1].size(); i++)
for (std::size_t i = 0; i < mPaths[mPaths.size() - 1].size(); i++)
{
std::size_t pos = _paths[_paths.size() - 1][i] - 1;
for (std::size_t j = 0; j < _adjMatrix[pos].size(); j++)
std::size_t pos = mPaths[mPaths.size() - 1][i] - 1;
for (std::size_t j = 0; j < mAdjMatrix[pos].size(); j++)
{
bool passed = std::find(_passedNodes.begin(), _passedNodes.end(), j + 1) !=
_passedNodes.end();
bool passed = std::find(mPassedNodes.begin(), mPassedNodes.end(), j + 1) !=
mPassedNodes.end();
if (_adjMatrix[pos][j] == 1 && !passed)
if (mAdjMatrix[pos][j] == 1 && !passed)
{
tmpVector.push_back(j + 1);
_passedNodes.push_back(j + 1);
mPassedNodes.push_back(j + 1);
}
}
}
if (tmpVector.size() > 0)
{
_paths.push_back(tmpVector);
mPaths.push_back(tmpVector);
flag = true;
}
else
......@@ -109,22 +109,22 @@ void HyperCube::creatPaths()
void HyperCube::print() const
{
for (std::size_t i = 0; i < _adjMatrix.size(); ++i)
for (std::size_t i = 0; i < mAdjMatrix.size(); ++i)
{
std::string s;
for (std::size_t j = 0; j < _adjMatrix[i].size(); ++j)
for (std::size_t j = 0; j < mAdjMatrix[i].size(); ++j)
{
s.append(std::to_string(_adjMatrix[i][j]));