Commit ddee395b authored by Martin Vala's avatar Martin Vala

Fixing issue when more redirectors

parent 53c43ece
Pipeline #1486 passed with stage
in 15 minutes and 31 seconds
......@@ -36,6 +36,8 @@ void Feeder::onExit(Message * inMsg, std::vector<std::string> & /*out*/)
/// onExit
///
SPD_TRACE("::onExit inMSG [{}]", inMsg->uuid());
uint32_t slots = 0;
uint32_t i = 0;
bool found = false;
......@@ -52,47 +54,50 @@ void Feeder::onExit(Message * inMsg, std::vector<std::string> & /*out*/)
i++;
}
}
mNodeInfo->mutable_hosts()->DeleteSubrange(i, 1);
mNodeInfo->set_slots(slots);
SPD_TRACE("::onExit inMSG [{}]", inMsg->uuid());
for (auto t : mWorkerTasks[inMsg->uuid()])
// TODO: Continue if worker (Better check is needed)
if (found)
{
SPD_WARN("WORKER [{}] exit. Moving task [{}:{}] to Job::pending",
inMsg->uuid(),
t->jobid(),
t->taskid());
Job * j = mNodeManager->job(t->jobid());
if (j)
mNodeInfo->mutable_hosts()->DeleteSubrange(i, 1);
mNodeInfo->set_slots(slots);
for (auto t : mWorkerTasks[inMsg->uuid()])
{
if (j->isTaskInQueue(t->taskid(), Salsa::Job::running))
j->moveTask(t->taskid(), Salsa::Job::running, Salsa::Job::pending);
SPD_WARN("WORKER [{}] exit. Moving task [{}:{}] to Job::pending",
inMsg->uuid(),
t->jobid(),
t->taskid());
Job * j = mNodeManager->job(t->jobid());
if (j)
{
if (j->isTaskInQueue(t->taskid(), Salsa::Job::running))
j->moveTask(t->taskid(), Salsa::Job::running, Salsa::Job::pending);
else if (j->isTaskInQueue(t->taskid(), Salsa::Job::assigned))
j->moveTask(t->taskid(), Salsa::Job::assigned, Salsa::Job::pending);
else if (j->isTaskInQueue(t->taskid(), Salsa::Job::assigned))
j->moveTask(t->taskid(), Salsa::Job::assigned, Salsa::Job::pending);
if (!getenv("SALSA_FAST"))
{
std::shared_ptr<Consumer> c = mNodeManager->consumer(j->consumer());
std::vector<std::string> o;
o.push_back("JOBRESUBMITED");
std::string payload;
t->SerializeToString(&payload);
o.push_back(payload);
mNodeManager->sendWhisper(c->pipe().get(), j->feeder(), o);
if (!getenv("SALSA_FAST"))
{
std::shared_ptr<Consumer> c = mNodeManager->consumer(j->consumer());
std::vector<std::string> o;
o.push_back("JOBRESUBMITED");
std::string payload;
t->SerializeToString(&payload);
o.push_back(payload);
mNodeManager->sendWhisper(c->pipe().get(), j->feeder(), o);
}
}
}
}
mWorkerTasks[inMsg->uuid()].clear();
mWorkerTasks.erase(inMsg->uuid());
mClients.erase(inMsg->uuid());
mWorkerTasks[inMsg->uuid()].clear();
mWorkerTasks.erase(inMsg->uuid());
mClients.erase(inMsg->uuid());
if (mNodeManager->haveMoreTasks())
{
subscribe(inMsg->uuid());
if (mNodeManager->haveMoreTasks())
{
subscribe(inMsg->uuid());
}
mNodeManager->print();
}
mNodeManager->print();
}
void Feeder::onWhisper(Message * inMsg, std::vector<std::string> & out)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment