Commit 24283d1a authored by Martin Vala's avatar Martin Vala Committed by Branislav Beke

Running is reported

To disable set SALSA_FAST env
parent 157da5d1
#include "Feeder.hh"
#include "NodeManager.hh"
#include <Job.hh>
#include <NodeManager.hh>
using namespace fmt::literals;
namespace Salsa
{
......@@ -83,6 +84,34 @@ void Feeder::onWhisper(Message * inMsg, std::vector<std::string> & out)
out.push_back(inContent[1]);
}
}
else if (inContent[0] == "TASK_IS_RUNNING")
{
SPD_TRACE("TASK_IS_RUNNING");
std::string payload = inContent[1];
Salsa::TaskInfo * task = new Salsa::TaskInfo();
{
if (!task->ParseFromString(payload))
{
SPD_ERROR("Message does not contain ProtoBuf message!");
for (auto s : inContent)
{
SPD_ERROR("::onWhisper inMSG [{}]", s);
}
return;
}
}
Job * j = mNodeManager->job(task->jobid());
if (j)
{
std::shared_ptr<Consumer> c = mNodeManager->consumer(j->consumer());
std::vector<std::string> o;
o.push_back(inContent[0]);
o.push_back(inContent[1]);
mNodeManager->sendWhisper(c->pipe().get(), j->feeder(), o);
}
}
else if (inContent[0] == "TASK_RESULT")
{
std::string payload = inContent[1];
......
......@@ -100,7 +100,7 @@ Salsa::TaskInfo * Job::nextJob()
return nullptr;
Salsa::TaskInfo * ji = j->second;
moveTask(j->first, QueueType::pending, QueueType::running);
moveTask(j->first, QueueType::pending, QueueType::assigned);
return ji;
}
......@@ -117,6 +117,21 @@ void Job::tasks(std::vector<Salsa::TaskInfo *> & v, QueueType type)
mTasks[type].clear();
}
bool Job::isTaskInQueue(uint32_t id, QueueType type) const
{
///
/// Returns if task is i queue
///
auto search = mTasks[type].find(id);
if (search != mTasks[type].end())
{
return true;
}
return false;
}
void Job::print() const
{
///
......
......@@ -45,6 +45,7 @@ public:
std::string feeder() const;
bool haveMoreTasks() const;
void haveMoreTasks(bool hasMoreTasks);
bool isTaskInQueue(uint32_t id, QueueType type) const;
protected:
std::map<uint32_t, Salsa::TaskInfo *> mTasks[all] = {}; ///< Lists of jobs
......
......@@ -205,7 +205,7 @@ bool TaskPool::handlePipe(void * p)
Job * job = mNodeManager->job(ts->task()->jobid());
if (job != nullptr)
{
SPD_DEBUG("TASK ENDED JOB [{}:{}]", ts->task()->jobid(), ts->task()->taskid());
SPD_TRACE("TASK ENDED JOB [{}:{}]", ts->task()->jobid(), ts->task()->taskid());
job->removeTask(ts->task()->taskid(), Salsa::Job::running);
}
std::vector<std::string> out;
......@@ -216,18 +216,18 @@ bool TaskPool::handlePipe(void * p)
ts = findFreeTask();
if (ts && ts->id() > 0)
{
SPD_DEBUG("AFTER TASK_RESULT sending reserving task [{}]", ts->id());
SPD_TRACE("AFTER TASK_RESULT sending reserving task [{}]", ts->id());
out.push_back("&");
out.push_back("FREESLOT");
out.push_back(fmt::format("{}", ts->id()));
ts->state(TaskState::assigned);
}
SPD_DEBUG("Searching to worker [{}]", wkuuid);
SPD_TRACE("Searching to worker [{}]", wkuuid);
std::shared_ptr<Salsa::Worker> wk = mNodeManager->worker(wkuuid);
if (wk)
{
SPD_DEBUG("Sending via wk [{}] to feeder [{}]", wkuuid, upstream);
SPD_TRACE("Sending via wk [{}] to feeder [{}]", wkuuid, upstream);
mNodeManager->sendWhisper(wk->pipe().get(), upstream, out);
}
}
......
......@@ -150,11 +150,18 @@ void Worker::onWhisper(Message * inMsg, std::vector<std::string> & out)
mNodeManager->addTask(task, mUUID, inMsg->uuid(), Salsa::Job::running);
mNodeManager->runTask(ts, mUUID, inMsg->uuid());
if (!getenv("SALSA_FAST"))
{
out.push_back("TASK_IS_RUNNING");
out.push_back(payload);
}
ts = mNodeManager->taskPool()->findFreeTask();
if (ts && ts->id() > 0)
{
SPD_TRACE("AFTER TASK reserving task [{}]", ts->id());
// TODO if free core available then do
out.push_back("&");
out.push_back("FREESLOT");
out.push_back(fmt::format("{}", ts->id()));
ts->state(TaskState::assigned);
......
......@@ -232,6 +232,32 @@ int main(int argc, char ** argv)
jobs.print();
}
}
else if (s == "TASK_IS_RUNNING")
{
char * payload_str = zmsg_popstr(msg);
std::string payload = payload_str;
free(payload_str);
Salsa::TaskInfo * taskInfo = new Salsa::TaskInfo();
{
if (!taskInfo->ParseFromString(payload))
{
SPD_ERROR("Message does not contain ProtoBuf message!");
continue;
}
}
SPD_TRACE("TASK_IS_RUNNING from [{} JOBID [{}] TASKID [{}] is done rc [{}]",
zyre_event_peer_uuid(event),
taskInfo->jobid(),
taskInfo->taskid(),
taskInfo->returncode());
if (jobs.isTaskInQueue(taskInfo->taskid(), Salsa::Job::assigned))
jobs.moveTask(taskInfo->taskid(), Salsa::Job::assigned, Salsa::Job::running);
jobs.print();
delete taskInfo;
}
else if (s == "TASK_RESULT")
{
......@@ -253,8 +279,12 @@ int main(int argc, char ** argv)
taskInfo->jobid(),
taskInfo->taskid(),
taskInfo->returncode());
Salsa::Job::QueueType qt = Salsa::Job::running;
if (jobs.isTaskInQueue(taskInfo->taskid(), Salsa::Job::assigned))
qt = Salsa::Job::assigned;
jobs.moveTask(taskInfo->taskid(),
Salsa::Job::running,
qt,
taskInfo->returncode() == 0 ? Salsa::Job::done : Salsa::Job::failed);
jobs.print();
......
......@@ -120,6 +120,9 @@ bool NodeManagerZyre::sendWhisper(Socket * s, std::string to, std::vector<std::s
{
SocketZyre * sz = static_cast<SocketZyre *>(s);
zmsg_t * m = nullptr;
if (v[0] == "&")
v.erase(v.begin());
for (auto str : v)
{
if (m == nullptr)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment