Commit 53c43ece authored by Branislav Beke's avatar Branislav Beke 🙃

Merge branch '44-handle-worker-kill' into 'master'

Resolve "Handle worker kill"

Closes #44

See merge request !38
parents a4be0314 985bc747
Pipeline #1485 passed with stage
in 15 minutes and 29 seconds
......@@ -62,7 +62,11 @@
"numeric": "cpp",
"filesystem": "cpp",
"list": "cpp",
"random": "cpp"
"random": "cpp",
"bit": "cpp",
"map": "cpp",
"set": "cpp",
"string": "cpp"
},
"C_Cpp.intelliSenseEngineFallback": "Disabled",
"C_Cpp.configurationWarnings": "Disabled"
......
......@@ -95,7 +95,6 @@ void Consumer::onWhisper(Message * inMsg, std::vector<std::string> & out)
}
else if (in[0] == "NOMORETASKS")
{
SPD_DEBUG("WE NEED TO SET haveMoreTasks to false");
mNodeManager->noMoreTasks(in[1]);
}
else
......
......@@ -54,7 +54,45 @@ void Feeder::onExit(Message * inMsg, std::vector<std::string> & /*out*/)
}
mNodeInfo->mutable_hosts()->DeleteSubrange(i, 1);
mNodeInfo->set_slots(slots);
// mNodeInfo->PrintDebugString();
SPD_TRACE("::onExit inMSG [{}]", inMsg->uuid());
for (auto t : mWorkerTasks[inMsg->uuid()])
{
SPD_WARN("WORKER [{}] exit. Moving task [{}:{}] to Job::pending",
inMsg->uuid(),
t->jobid(),
t->taskid());
Job * j = mNodeManager->job(t->jobid());
if (j)
{
if (j->isTaskInQueue(t->taskid(), Salsa::Job::running))
j->moveTask(t->taskid(), Salsa::Job::running, Salsa::Job::pending);
else if (j->isTaskInQueue(t->taskid(), Salsa::Job::assigned))
j->moveTask(t->taskid(), Salsa::Job::assigned, Salsa::Job::pending);
if (!getenv("SALSA_FAST"))
{
std::shared_ptr<Consumer> c = mNodeManager->consumer(j->consumer());
std::vector<std::string> o;
o.push_back("JOBRESUBMITED");
std::string payload;
t->SerializeToString(&payload);
o.push_back(payload);
mNodeManager->sendWhisper(c->pipe().get(), j->feeder(), o);
}
}
}
mWorkerTasks[inMsg->uuid()].clear();
mWorkerTasks.erase(inMsg->uuid());
mClients.erase(inMsg->uuid());
if (mNodeManager->haveMoreTasks())
{
subscribe(inMsg->uuid());
}
mNodeManager->print();
}
void Feeder::onWhisper(Message * inMsg, std::vector<std::string> & out)
{
......@@ -82,6 +120,10 @@ void Feeder::onWhisper(Message * inMsg, std::vector<std::string> & out)
task->SerializeToString(&payload);
out.push_back(payload);
out.push_back(inContent[1]);
mWorkerTasks[inMsg->uuid()].push_back(task);
SPD_TRACE("mWorkerTasks[{}] vector size [{}]",
inMsg->uuid(),
mWorkerTasks[inMsg->uuid()].size());
}
}
else if (inContent[0] == "TASK_IS_RUNNING")
......@@ -132,7 +174,18 @@ void Feeder::onWhisper(Message * inMsg, std::vector<std::string> & out)
}
}
SPD_TRACE("ResultTask for JOB [{}]", task->jobid());
// TODO Simplify this
int i = 0;
for (auto t : mWorkerTasks[inMsg->uuid()])
{
if (t->taskid() == task->taskid())
{
mWorkerTasks[inMsg->uuid()].erase(mWorkerTasks[inMsg->uuid()].begin() + i);
break;
}
i++;
}
// task is deleted in next line
mNodeManager->resultTask(task);
}
......
#pragma once
#include <Distributor.hh>
#include <TaskInfo.pb.h>
namespace Salsa
{
......@@ -25,5 +26,8 @@ public:
void subscribe(std::string uuid);
void terminateJob(std::string uuid);
protected:
std::map<std::string, std::vector<TaskInfo *>> mWorkerTasks{}; ///< Worker tasks
};
} // namespace Salsa
......@@ -296,6 +296,7 @@ Salsa::TaskInfo * NodeManager::getNextTask()
/// Return Next task from job
///
Salsa::TaskInfo * task = nullptr;
SPD_TRACE("mActiveJobs.size() [{}]", mActiveJobs.size());
while (mActiveJobs.size() > 0 && task == nullptr)
{
int index = rand() % mActiveJobs.size();
......@@ -523,6 +524,32 @@ void NodeManager::noMoreTasks(std::string jobuuid)
}
}
bool NodeManager::haveMoreTasks()
{
///
/// Sets no more tasks to any job
///
bool rc = false;
for (auto j : mJobs)
{
if (j.second->Job::size(Job::pending))
{
j.second->haveMoreTasks(true);
bool isJob = false;
for (auto s : mActiveJobs)
{
if (s == j.first)
isJob = true;
}
if (!isJob)
mActiveJobs.push_back(j.first);
rc = true;
}
}
return rc;
}
bool NodeManager::haveMoreTasks(std::string jobuuid)
{
///
......
......@@ -49,9 +49,11 @@ public:
Salsa::TaskInfo * getNextTask();
void resultTask(Salsa::TaskInfo * task);
void noMoreTasks(std::string jobuuid);
bool haveMoreTasks();
bool haveMoreTasks(std::string jobuuid);
/// Run task interface
virtual void runTask(TaskState * ts, std::string wk, std::string upstream) = 0;
virtual void terminateJob(std::string uuid);
virtual bool handleTaskPool(void * p);
virtual bool sendWhisper(Socket * s, std::string to, std::vector<std::string> & v);
......
......@@ -258,7 +258,6 @@ int main(int argc, char ** argv)
jobs.print();
delete taskInfo;
}
else if (s == "TASK_RESULT")
{
char * payload_str = zmsg_popstr(msg);
......@@ -290,6 +289,32 @@ int main(int argc, char ** argv)
jobs.print();
delete taskInfo;
}
else if (s == "JOBRESUBMITED")
{
char * payload_str = zmsg_popstr(msg);
std::string payload = payload_str;
free(payload_str);
Salsa::TaskInfo * taskInfo = new Salsa::TaskInfo();
{
if (!taskInfo->ParseFromString(payload))
{
SPD_ERROR("Message does not contain ProtoBuf message!");
continue;
}
}
SPD_TRACE("JOBRESUBMITED from [{} JOBID [{}] TASKID [{}] is done rc [{}]",
zyre_event_peer_uuid(event),
taskInfo->jobid(),
taskInfo->taskid(),
taskInfo->returncode());
if (jobs.isTaskInQueue(taskInfo->taskid(), Salsa::Job::running))
jobs.moveTask(taskInfo->taskid(), Salsa::Job::running, Salsa::Job::assigned);
jobs.print();
delete taskInfo;
}
zmsg_destroy(&msg);
}
zyre_event_destroy(&event);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment