Commit ad8de65b authored by Martin Vala's avatar Martin Vala

Moved functionality to Executors

parent d35f9902
Pipeline #1444 passed with stage
in 15 minutes and 31 seconds
......@@ -423,12 +423,27 @@ std::shared_ptr<Worker> NodeManager::worker(std::string uuid) const
return nullptr;
}
void NodeManager::reserveTaskSlot()
Job * NodeManager::job(std::string uuid)
{
///
/// Returns job
/// /param uuid UUID
///
auto search = mJobs.find(uuid);
if (search != mJobs.end())
{
return search->second;
}
return nullptr;
}
void NodeManager::addTaskSlot()
{
///
/// Reserve task slot
///
mTaskPool = new TaskPool();
if (mTaskPool == nullptr)
mTaskPool = new TaskPool(this);
}
bool NodeManager::handleTaskPool(void * /*p*/)
......
......@@ -38,9 +38,10 @@ public:
std::shared_ptr<Feeder> feeder(std::string uuid) const;
std::shared_ptr<Consumer> consumer(std::string uuid) const;
std::shared_ptr<Worker> worker(std::string uuid) const;
Job * job(std::string uuid);
TaskPool * taskPool();
virtual void reserveTaskSlot();
virtual void addTaskSlot();
bool hasJobs() const;
int32_t nSlots(double mult = 1.0) const;
void jobs(std::string client_uuid, std::vector<std::string> & jobs) const;
......
......@@ -14,5 +14,27 @@ TaskExecutor::~TaskExecutor()
/// Destructor
///
}
void TaskExecutor::taskState(TaskState * ts)
{
///
/// Sets task state
///
mTaskState = ts;
}
TaskState * TaskExecutor::taskState() const
{
///
/// Returns task state
///
return mTaskState;
}
void * TaskExecutor::pipe() const
{
///
/// Returns pointer to pipe
///
return nullptr;
}
} // namespace Salsa
#pragma once
#include <Object.hh>
#include <TaskState.hh>
namespace Salsa
{
......@@ -12,7 +12,6 @@ namespace Salsa
/// \author Martin Vala <mvala@cern.ch>
/// \author Branislav Beke <beke.public@yandex.com>
///
class TaskExecutor : public Object
{
public:
......@@ -22,10 +21,20 @@ public:
/// Init
virtual bool init() = 0;
/// Assigne
virtual bool assigne() = 0;
virtual bool assigne(std::string, std::string) = 0;
/// Running
virtual bool run() = 0;
/// Exit
virtual bool exit() = 0;
/// Pipe
virtual void * pipe() const;
/// Handle pipe
virtual bool handlePipe(std::vector<std::string> &) = 0;
void taskState(TaskState * ts);
TaskState * taskState() const;
protected:
TaskState * mTaskState = nullptr; ///< Task state
};
} // namespace Salsa
......@@ -21,7 +21,7 @@ bool TaskExecutorFake::init()
///
return true;
}
bool TaskExecutorFake::assigne()
bool TaskExecutorFake::assigne(std::string /*worker*/, std::string /*upstream*/)
{
///
/// Assigne
......@@ -43,4 +43,12 @@ bool TaskExecutorFake::exit()
return true;
}
void * TaskExecutorFake::pipe() const
{
///
/// Returns pipe
///
return nullptr;
}
} // namespace Salsa
......@@ -19,9 +19,10 @@ public:
TaskExecutorFake();
virtual ~TaskExecutorFake();
virtual bool init();
virtual bool assigne();
virtual bool run();
virtual bool exit();
virtual bool init();
virtual bool assigne(std::string, std::string);
virtual bool run();
virtual bool exit();
virtual void * pipe() const;
};
} // namespace Salsa
#include "TaskPool.hh"
#include <Job.hh>
#include <NodeManager.hh>
#include <TaskExecutor.hh>
namespace Salsa
{
TaskPool::TaskPool()
TaskPool::TaskPool(NodeManager * nm)
: Object()
, mNodeManager(nm)
{
///
/// Constructor
......@@ -158,4 +161,79 @@ void TaskPool::print(bool verbose) const
stat[TaskState::running]);
}
bool TaskPool::handlePipe(void * p)
{
///
/// Handle pipe
///
TaskState * ts = find(p);
if (ts == nullptr)
{
SPD_ERROR("ts by actor [{}] is null!!!", static_cast<void *>(p));
return false;
}
if (ts->executor() == nullptr)
{
SPD_ERROR("ts->executor() by actor [{}] is null!!!", static_cast<void *>(p));
return false;
}
if (ts->executor()->pipe() == nullptr)
{
SPD_ERROR("ts->executor()->pipe() by actor [{}] is null!!!", static_cast<void *>(p));
return false;
}
std::vector<std::string> extra;
ts->executor()->handlePipe(extra);
TaskState::State state = ts->state();
if (state == TaskState::assigned)
{
ts->state(TaskState::running);
// handle mJobs
// Nothing for now i think
}
else if (state == TaskState::running)
{
ts->state(TaskState::idle);
ts->pid(0);
std::string wkuuid = extra[0];
std::string upstream = extra[1];
// handle mJobs
Job * job = mNodeManager->job(ts->task()->jobid());
if (job != nullptr)
{
SPD_TRACE("TASK ENDED JOB [{}:{}]", ts->task()->jobid(), ts->task()->taskid());
job->removeTask(ts->task()->taskid(), Salsa::Job::running);
}
std::vector<std::string> out;
out.push_back("TASK_RESULT");
std::string payload;
ts->task()->SerializeToString(&payload);
out.push_back(payload);
ts = findFreeTask();
if (ts && ts->id() > 0)
{
SPD_TRACE("AFTER TASK_RESULT sending reserving task [{}]", ts->id());
out.push_back("&");
out.push_back("FREESLOT");
out.push_back(fmt::format("{}", ts->id()));
ts->state(TaskState::assigned);
}
SPD_TRACE("Searching to worker [{}]", wkuuid);
std::shared_ptr<Salsa::Worker> wk = mNodeManager->worker(wkuuid);
if (wk)
{
SPD_TRACE("Sending via wk [{}] to feeder [{}]", wkuuid, upstream);
mNodeManager->sendWhisper(wk->pipe().get(), upstream, out);
}
}
print();
return true;
}
} // namespace Salsa
......@@ -13,10 +13,11 @@ namespace Salsa
/// \author Branislav Beke <beke.public@yandex.com>
///
class Job;
class NodeManager;
class TaskPool : public Object
{
public:
TaskPool();
TaskPool(NodeManager * nm);
virtual ~TaskPool();
void add(void * p, TaskState * t);
......@@ -26,10 +27,12 @@ public:
void changeState(uint32_t id, TaskState::State state);
uint32_t nSlotFree();
bool terminateJob(Job * job);
bool handlePipe(void * p);
void print(bool verbose = false) const;
protected:
std::map<void *, TaskState *> mTasks{}; ///< List of task slots
std::map<void *, TaskState *> mTasks{}; ///< List of task slots
NodeManager * mNodeManager = nullptr; ///< Node manager
};
} // namespace Salsa
#include "TaskState.hh"
#include <TaskExecutor.hh>
namespace Salsa
{
......
#pragma once
#include <TaskExecutor.hh>
#include <Object.hh>
#include <TaskInfo.pb.h>
namespace Salsa
......@@ -13,7 +13,7 @@ namespace Salsa
/// \author Martin Vala <mvala@cern.ch>
/// \author Branislav Beke <beke.public@yandex.com>
///
class TaskExecutor;
class TaskState : public Object
{
public:
......
......@@ -43,7 +43,7 @@ Worker::Worker(std::string uuid, std::shared_ptr<Socket> pipe, NodeManager * nm)
for (int i = 0; i < mNumCores; i++)
{
SPD_TRACE("Worker [{}] slot [{}]", mUUID, i);
mNodeManager->reserveTaskSlot();
mNodeManager->addTaskSlot();
}
mNodeInfo->set_uuid(mUUID);
mNodeInfo->set_slots(mNumCores);
......
#include "NodeManagerZyre.hh"
#include <MessageZyre.hh>
#include <SocketZyre.hh>
#include <TaskExecutorForkZmq.hh>
#include <TaskStateZmq.hh>
namespace Salsa
{
......@@ -55,17 +56,20 @@ bool NodeManagerZyre::onWhisper(std::string self, Message * msg)
return sendWhisper(s, msg->uuid(), values);
}
void NodeManagerZyre::reserveTaskSlot()
void NodeManagerZyre::addTaskSlot()
{
///
/// Reserve task slot
///
TaskStateZmq * ts = new TaskStateZmq(zactor_new(Salsa::ActorZmq::SalsaActorForkFn, nullptr));
mNodeZyre->pollerZmq()->add(ts->actor());
if (mTaskPool == nullptr)
mTaskPool = new TaskPool();
mTaskPool->add(ts->actor(), ts);
mTaskPool = new TaskPool(this);
zactor_t * act = zactor_new(Salsa::ActorZmq::SalsaActorForkFn, nullptr);
TaskExecutor * te = new TaskExecutorForkZmq(act);
TaskState * ts = new TaskState(te);
te->taskState(ts);
mNodeZyre->pollerZmq()->add(static_cast<zactor_t *>(ts->executor()->pipe()));
mTaskPool->add(ts->executor()->pipe(), ts);
}
void NodeManagerZyre::runTask(TaskState * ts, std::string wk, std::string upstream)
......@@ -77,19 +81,7 @@ void NodeManagerZyre::runTask(TaskState * ts, std::string wk, std::string upstre
SPD_TRACE(
"Task [{}:{}] from upstream [{}]", ts->task()->jobid(), ts->task()->taskid(), upstream);
zmsg_t * outMsg = zmsg_new();
zmsg_addstrf(outMsg, "%s", ts->task()->data().c_str());
zmsg_addstr(outMsg, wk.c_str());
zmsg_addstr(outMsg, upstream.c_str());
zmsg_addstr(outMsg, ts->task()->jobid().c_str());
for (int i = 0; i < ts->task()->logtargets_size(); i++)
{
zmsg_addstrf(outMsg, "%s", ts->task()->logtargets(i).c_str());
}
TaskStateZmq * tsz = static_cast<TaskStateZmq *>(ts);
zsock_send(tsz->actor(), "m", outMsg);
zmsg_destroy(&outMsg);
ts->executor()->assigne(wk, upstream);
}
bool NodeManagerZyre::handleTaskPool(void * p)
......@@ -101,88 +93,7 @@ bool NodeManagerZyre::handleTaskPool(void * p)
if (mTaskPool == nullptr)
return false;
TaskStateZmq * ts = dynamic_cast<TaskStateZmq *>(mTaskPool->find(p));
if (ts == nullptr)
{
SPD_ERROR("ts by actor [{}] is null!!!", static_cast<void *>(p));
return false;
}
zmsg_t * message = zmsg_recv(ts->actor());
if (zframe_streq(zmsg_first(message), "$PID"))
{
char * pidStr = zframe_strdup(zmsg_next(message));
uint32_t pid = static_cast<uint32_t>(strtoul(pidStr, nullptr, 0));
ts->pid(pid);
ts->state(TaskState::State::running);
std::string payload;
ts->task()->SerializeToString(&payload);
SPD_DEBUG(
"JOB [{}:{}] PID [{}] started", ts->task()->jobid(), ts->task()->taskid(), pidStr);
free(pidStr);
}
else if (zframe_streq(zmsg_first(message), "$EXIT"))
{
char * exitStatusStr = zframe_strdup(zmsg_next(message));
uint32_t exitStatus = static_cast<uint32_t>(strtoul(exitStatusStr, nullptr, 0));
free(exitStatusStr);
ts->task()->set_returncode(exitStatus);
SPD_DEBUG("JOB [{}:{}] PID [{}] finished with rc [{}] killed [{}]",
ts->task()->jobid(),
ts->task()->taskid(),
ts->pid(),
ts->task()->returncode(),
ts->state() == TaskState::killed);
ts->state(TaskState::State::idle);
ts->pid(0);
std::string payload;
ts->task()->SerializeToString(&payload);
char * wkuuid = zframe_strdup(zmsg_next(message));
char * upstream = zframe_strdup(zmsg_next(message));
auto search = mJobs.find(ts->task()->jobid());
if (search != mJobs.end())
{
SPD_TRACE("TASK ENDED JOB [{}:{}]", ts->task()->jobid(), ts->task()->taskid());
search->second->removeTask(ts->task()->taskid(), Salsa::Job::running);
}
std::vector<std::string> out;
out.push_back("TASK_RESULT");
out.push_back(payload);
if (mTaskPool)
{
TaskState * ts = mTaskPool->findFreeTask();
if (ts && ts->id() > 0)
{
SPD_TRACE("AFTER TASK_RESULT sending reserving task [{}]", ts->id());
out.push_back("&");
out.push_back("FREESLOT");
out.push_back(fmt::format("{}", ts->id()));
ts->state(TaskState::assigned);
}
SPD_TRACE("Searching to worker [{}]", wkuuid);
std::shared_ptr<Salsa::Worker> wk = worker(wkuuid);
if (wk)
{
SPD_TRACE("Sending via wk [{}] to feeder [{}]", wkuuid, upstream);
sendWhisper(wk->pipe().get(), upstream, out);
}
}
free(wkuuid);
free(upstream);
}
zmsg_destroy(&message);
if (mTaskPool)
mTaskPool->print();
return true;
return mTaskPool->handlePipe(p);
}
bool NodeManagerZyre::sendWhisper(Socket * s, std::string to, std::vector<std::string> & v)
......
......@@ -25,7 +25,7 @@ public:
bool onWhisper(std::string self, Message * msg);
virtual bool handleTaskPool(void * p);
virtual void reserveTaskSlot();
virtual void addTaskSlot();
virtual void runTask(TaskState * ts, std::string wk, std::string upstream);
virtual bool sendWhisper(Socket * s, std::string to, std::vector<std::string> & v);
......
#include "TaskExecutorForkZmq.hh"
#include <TaskState.hh>
namespace Salsa
{
TaskExecutorForkZmq::TaskExecutorForkZmq()
TaskExecutorForkZmq::TaskExecutorForkZmq(zactor_t * actor)
: TaskExecutor()
, mZactor(actor)
{
///
/// Constructor
......@@ -13,7 +15,10 @@ TaskExecutorForkZmq::~TaskExecutorForkZmq()
///
/// Destructor
///
if (mZactor)
zactor_destroy(&mZactor);
}
bool TaskExecutorForkZmq::init()
{
///
......@@ -21,11 +26,30 @@ bool TaskExecutorForkZmq::init()
///
return true;
}
bool TaskExecutorForkZmq::assigne()
bool TaskExecutorForkZmq::assigne(std::string worker, std::string upstream)
{
///
/// Assigne
///
if (mTaskState == nullptr)
return false;
if (pipe() == nullptr)
return false;
zmsg_t * outMsg = zmsg_new();
zmsg_addstrf(outMsg, "%s", mTaskState->task()->data().c_str());
zmsg_addstr(outMsg, worker.c_str());
zmsg_addstr(outMsg, upstream.c_str());
zmsg_addstr(outMsg, mTaskState->task()->jobid().c_str());
for (int i = 0; i < mTaskState->task()->logtargets_size(); i++)
{
zmsg_addstrf(outMsg, "%s", mTaskState->task()->logtargets(i).c_str());
}
zsock_send(pipe(), "m", outMsg);
zmsg_destroy(&outMsg);
return true;
}
bool TaskExecutorForkZmq::run()
......@@ -42,5 +66,66 @@ bool TaskExecutorForkZmq::exit()
///
return true;
}
void * TaskExecutorForkZmq::pipe() const
{
///
/// Returns pipe
///
return mZactor;
}
bool TaskExecutorForkZmq::handlePipe(std::vector<std::string> & extra)
{
///
/// Handle pipe
///
zmsg_t * message = zmsg_recv(pipe());
if (zframe_streq(zmsg_first(message), "$PID"))
{
char * pidStr = zframe_strdup(zmsg_next(message));
uint32_t pid = static_cast<uint32_t>(strtoul(pidStr, nullptr, 0));
mTaskState->pid(pid);
// mTaskState->state(TaskState::State::running);
std::string payload;
mTaskState->task()->SerializeToString(&payload);
SPD_DEBUG("JOB [{}:{}] PID [{}] started",
mTaskState->task()->jobid(),
mTaskState->task()->taskid(),
pidStr);
free(pidStr);
}
else if (zframe_streq(zmsg_first(message), "$EXIT"))
{
char * exitStatusStr = zframe_strdup(zmsg_next(message));
uint32_t exitStatus = static_cast<uint32_t>(strtoul(exitStatusStr, nullptr, 0));
free(exitStatusStr);
mTaskState->task()->set_returncode(exitStatus);
SPD_DEBUG("JOB [{}:{}] PID [{}] finished with rc [{}] killed [{}]",
mTaskState->task()->jobid(),
mTaskState->task()->taskid(),
mTaskState->pid(),
mTaskState->task()->returncode(),
mTaskState->state() == TaskState::killed);
// mTaskState->state(TaskState::State::idle);
// mTaskState->pid(0);
// std::string payload;
// mTaskState->task()->SerializeToString(&payload);
char * wkuuid = zframe_strdup(zmsg_next(message));
extra.push_back(wkuuid);
char * upstream = zframe_strdup(zmsg_next(message));
extra.push_back(upstream);
free(wkuuid);
free(upstream);
}
zmsg_destroy(&message);
return true;
}
} // namespace Salsa
#pragma once
#include <ActorZmq.hh>
#include <TaskExecutor.hh>
namespace Salsa
......@@ -16,12 +17,17 @@ namespace Salsa
class TaskExecutorForkZmq : public TaskExecutor
{
public:
TaskExecutorForkZmq();
TaskExecutorForkZmq(zactor_t * actor = nullptr);
virtual ~TaskExecutorForkZmq();
virtual bool init();
virtual bool assigne();
virtual bool run();
virtual bool exit();
virtual bool init();
virtual bool assigne(std::string worker, std::string upstream);
virtual bool run();
virtual bool exit();
virtual void * pipe() const;
virtual bool handlePipe(std::vector<std::string> & extra);
protected:
zactor_t * mZactor = nullptr; ///< ZMQ Actor pointer
};
} // namespace Salsa
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment