Commit 12dd6ed3 authored by Branislav Beke's avatar Branislav Beke 🙃

Merge branch '53-publish-redirector-state' into 'master'

Resolve "Publish redirector state"

Closes #53

See merge request !36
parents b1ab932a 53824a0e
Pipeline #1472 passed with stage
in 15 minutes and 33 seconds
......@@ -51,6 +51,14 @@ If you are contributing, make sure that you pass compilation with at least `rele
## Example running master-worker arch
```bash
./build/src/salsa -n master,worker:4 # Runs 4 workers. Beware! Every worker gets all cores of machine (thus overloading it)!
./build/src/salsa-feeder path/to/list # Command list is one command per line.
$SALSA_ROOT/build/src/salsa -c $SALSA_ROOT/etc/salsa.json -n rdr,worker:4 # Runs 4 workers. Beware! Every worker gets all cores of machine (thus overloading it)!
$SALSA_ROOT/build/src/salsa-feeder path/to/list # Command list is one command per line.
```
## Publish
Run before (on rdr)
```
export SALSA_PUB_URL=">tcp://localhost:5555"
```
......@@ -105,6 +105,9 @@ void Feeder::onWhisper(Message * inMsg, std::vector<std::string> & out)
Job * j = mNodeManager->job(task->jobid());
if (j)
{
if (j->isTaskInQueue(task->taskid(), Salsa::Job::assigned))
j->moveTask(task->taskid(), Salsa::Job::assigned, Salsa::Job::running);
std::shared_ptr<Consumer> c = mNodeManager->consumer(j->consumer());
std::vector<std::string> o;
o.push_back(inContent[0]);
......
......@@ -147,6 +147,22 @@ void Job::print() const
SPD_TRACE("Feeder [{}] Consumer [{}]", mFeederUUID, mConsumerUUID);
}
void Job::json(Json::Value & json)
{
///
/// Export data in json format
///
Json::Value d;
d["name"] = mUUID;
d["P"] = static_cast<Json::Value::UInt64>(mTasks[QueueType::pending].size());
d["A"] = static_cast<Json::Value::UInt64>(mTasks[QueueType::assigned].size());
d["R"] = static_cast<Json::Value::UInt64>(mTasks[QueueType::running].size());
d["D"] = static_cast<Json::Value::UInt64>(mTasks[QueueType::done].size());
d["F"] = static_cast<Json::Value::UInt64>(mTasks[QueueType::failed].size());
json.append(d);
}
int Job::size(QueueType t)
{
///
......
......@@ -2,7 +2,7 @@
#include <Object.hh>
#include <TaskInfo.pb.h>
#include <json/json.h>
namespace Salsa
{
///
......@@ -31,6 +31,7 @@ public:
virtual ~Job();
void print() const;
void json(Json::Value & json);
Salsa::TaskInfo * nextJob();
void tasks(std::vector<Salsa::TaskInfo *> & v, QueueType type);
bool addTask(uint32_t id, Salsa::TaskInfo * job, QueueType q);
......
#include "NodeManager.hh"
#include <json/json.h>
namespace Salsa
{
NodeManager::NodeManager()
......@@ -23,6 +25,7 @@ NodeManager::~NodeManager()
}
mJobs.clear();
delete mTaskPool;
delete mPublisher;
}
void NodeManager::print(std::string /*opt*/) const
......@@ -307,11 +310,11 @@ Salsa::TaskInfo * NodeManager::getNextTask()
search->first,
task->jobid(),
task->taskid());
print();
// print();
return task;
}
}
print();
// print();
// removing jobstring from mActiveJobs
mActiveJobs.erase(std::remove(begin(mActiveJobs), end(mActiveJobs), jobstr),
end(mActiveJobs));
......@@ -326,39 +329,42 @@ void NodeManager::resultTask(Salsa::TaskInfo * task)
/// Handle result of task
///
auto search = mJobs.find(task->jobid());
if (search != mJobs.end())
Job * j = job(task->jobid());
if (j == nullptr)
return;
SPD_TRACE("TASK ENDED JOB[{}:{}]", task->jobid(), task->taskid());
// search->second->moveTask(task->taskid(), task, Salsa::Job::running, Salsa::Job::done);
if (j->isTaskInQueue(task->taskid(), Salsa::Job::assigned))
j->removeTask(task->taskid(), Salsa::Job::assigned);
else
j->removeTask(task->taskid(), Salsa::Job::running);
std::shared_ptr<Consumer> c = consumer(j->consumer());
std::vector<std::string> out;
out.push_back("TASK_RESULT");
std::string payload;
task->SerializeToString(&payload);
out.push_back(payload);
int32_t slots = nSlots();
// TODO only for testing, REMOVE IT later
if (getenv("SALSA_FAKE"))
slots *= 10;
if (j->size(Job::pending) < slots)
{
SPD_TRACE("TASK ENDED JOB[{}:{}]", task->jobid(), task->taskid());
// search->second->moveTask(task->taskid(), task, Salsa::Job::running, Salsa::Job::done);
search->second->removeTask(task->taskid(), Salsa::Job::running);
std::shared_ptr<Consumer> c = consumer(search->second->consumer());
std::vector<std::string> out;
out.push_back("TASK_RESULT");
std::string payload;
task->SerializeToString(&payload);
out.push_back(payload);
int32_t slots = nSlots();
// TODO only for testing, REMOVE IT later
if (getenv("SALSA_FAKE"))
slots *= 10;
if (search->second->size(Job::pending) < slots)
if (j->haveMoreTasks())
{
if (search->second->haveMoreTasks())
{
SPD_DEBUG("We are requesting new tasks [{}] haveMotreTasks [{}]",
slots,
search->second->haveMoreTasks());
out.push_back("&");
out.push_back("SENDTASKS");
out.push_back(fmt::format("{}", slots));
}
SPD_DEBUG(
"We are requesting new tasks [{}] haveMotreTasks [{}]", slots, j->haveMoreTasks());
out.push_back("&");
out.push_back("SENDTASKS");
out.push_back(fmt::format("{}", slots));
}
sendWhisper(c->pipe().get(), search->second->feeder(), out);
}
sendWhisper(c->pipe().get(), j->feeder(), out);
}
void NodeManager::terminateJob(std::string uuid)
......@@ -527,4 +533,45 @@ bool NodeManager::haveMoreTasks(std::string jobuuid)
return false;
}
void NodeManager::publisher(Publisher * p)
{
///
/// Sets publisher
///
mPublisher = p;
}
Publisher * NodeManager::publisher() const
{
///
/// Returns publisher
///
return mPublisher;
}
void NodeManager::publish() const
{
///
/// Publishes node manager state
///
if (!mPublisher)
return;
if (mJobs.size() == 0)
return;
std::string name = "xxx";
Json::Value json;
for (auto j : mJobs)
{
j.second->json(json["tasks"]);
}
Json::StreamWriterBuilder wbuilder;
wbuilder["indentation"] = "";
std::string data = Json::writeString(wbuilder, json);
print();
SPD_TRACE("Publish name [{}] data [{}] ", name, data);
mPublisher->publish(name, data);
}
} // namespace Salsa
......@@ -3,6 +3,7 @@
#include <Feeder.hh>
#include <Job.hh>
#include <Object.hh>
#include <Publisher.hh>
#include <Socket.hh>
#include <TaskPool.hh>
#include <Worker.hh>
......@@ -55,12 +56,17 @@ public:
virtual bool handleTaskPool(void * p);
virtual bool sendWhisper(Socket * s, std::string to, std::vector<std::string> & v);
virtual void publisher(Publisher * p);
virtual Publisher * publisher() const;
virtual void publish() const;
protected:
std::map<std::string, Job *> mJobs{}; ///< List of jobs
std::vector<std::string> mActiveJobs{}; ///< List of active jobs
std::map<std::string, std::shared_ptr<Worker>> mWorkers{}; ///< List of Workers
std::map<std::string, std::shared_ptr<Consumer>> mConsumers{}; ///< List of Consumers
std::map<std::string, std::shared_ptr<Feeder>> mFeeders{}; ///< List of Feeders
TaskPool * mTaskPool = nullptr; ///< Task pool
std::map<std::string, Job *> mJobs{}; ///< List of jobs
std::vector<std::string> mActiveJobs{}; ///< List of active jobs
std::map<std::string, std::shared_ptr<Worker>> mWorkers{}; ///< List of Workers
std::map<std::string, std::shared_ptr<Consumer>> mConsumers{}; ///< List of Consumers
std::map<std::string, std::shared_ptr<Feeder>> mFeeders{}; ///< List of Feeders
TaskPool * mTaskPool = nullptr; ///< Task pool
Publisher * mPublisher = nullptr; ///< Publisher
};
} // namespace Salsa
......@@ -391,7 +391,7 @@ std::shared_ptr<Salsa::Node> create_root_node( //
}
else if (props.mType == "gossip")
{
SPD_TRACE("Using gossip : [{}] [{}]...", props.mDiscovery, props.mUrl);
// SPD_TRACE("Using gossip : [{}] [{}]...", props.mDiscovery, props.mUrl);
SPD_INFO("Using gossip : [{}] [{}]...", props.mDiscovery, props.mUrl);
if (!props.mUrl.empty())
zyre_set_endpoint(pSocketZyre->zyre(), "%s", props.mUrl.c_str());
......
......@@ -44,12 +44,12 @@ int main(int argc, char ** args)
nextOption = getopt_long(argc, args, short_options.data(), long_options, nullptr);
}
SPD_TRACE("Starting {} v{}.{}.{}-{} ...",
SALSA_NAME,
SALSA_VERSION_MAJOR(SALSA_VERSION),
SALSA_VERSION_MINOR(SALSA_VERSION),
SALSA_VERSION_PATCH(SALSA_VERSION),
SALSA_VERSION_RELEASE);
SPD_INFO("Starting {} v{}.{}.{}-{} ...",
SALSA_NAME,
SALSA_VERSION_MAJOR(SALSA_VERSION),
SALSA_VERSION_MINOR(SALSA_VERSION),
SALSA_VERSION_PATCH(SALSA_VERSION),
SALSA_VERSION_RELEASE);
// MAIN
handle_debug_from_env(debugLevel);
......@@ -57,7 +57,7 @@ int main(int argc, char ** args)
// Adding '@' to bind
url.insert(0, 1, '@');
SPD_TRACE("Listening at '{}' with subscription '{}'", url.data(), subscribe.data());
SPD_INFO("Listening at '{}' with subscription '{}'", url.data(), subscribe.data());
// Creating socket
zsock_t * sub = zsock_new_sub(url.data(), subscribe.data());
......@@ -67,8 +67,12 @@ int main(int argc, char ** args)
msg = zmsg_recv(sub);
if (!msg)
continue;
zmsg_print(msg);
char * name = zmsg_popstr(msg);
char * data = zmsg_popstr(msg);
SPD_INFO("name [{}] data [{}]", name, data);
zmsg_destroy(&msg);
free(name);
free(data);
}
// Clean socket
......
#include "NodeManagerZyre.hh"
#include <MessageZyre.hh>
#include <PublisherZmq.hh>
#include <SocketZyre.hh>
#include <TaskExecutorFake.hh>
#include <TaskExecutorForkZmq.hh>
......@@ -13,7 +14,11 @@ NodeManagerZyre::NodeManagerZyre(NodeZyre * nz)
/// Constructor
///
// mNodeInfo.set_uuid();
char * pubUrl = getenv("SALSA_PUB_URL");
if (pubUrl)
{
mPublisher = new PublisherZmq(pubUrl);
}
}
NodeManagerZyre::~NodeManagerZyre()
{
......
......@@ -161,6 +161,7 @@ int NodeZyre::exec()
mNodeManager->onWhisper(zyre_uuid(pSocket->zyre()), msg);
}
mNodeManager->publish();
delete msg;
pSocket = nullptr;
}
......
......@@ -9,7 +9,7 @@ PublisherZmq::PublisherZmq(std::string url)
/// Constructor
///
mpSocket = zsock_new_pub(Publisher::mURL.c_str());
mpSocket = zsock_new_pub(url.c_str());
}
PublisherZmq::~PublisherZmq()
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment