Commit 157da5d1 authored by Martin Vala

Merge branch '51-fake-task-mode'

parents a568cf11 d2f6f05d
Pipeline #1452 passed with stage
in 14 minutes and 51 seconds
......@@ -67,6 +67,8 @@ void Consumer::onWhisper(Message * inMsg, std::vector<std::string> & out)
int32_t n = 1;
n = mNodeManager->nSlots(1.5);
if (getenv("SALSA_FAKE"))
n *= 10;
if (n != 0)
{
out.push_back("SENDTASKS");
......
......@@ -340,6 +340,10 @@ void NodeManager::resultTask(Salsa::TaskInfo * task)
out.push_back(payload);
int32_t slots = nSlots();
// TODO only for testing, REMOVE IT later
if (getenv("SALSA_FAKE"))
slots *= 10;
if (search->second->size(Job::pending) < slots)
{
if (search->second->haveMoreTasks())
......@@ -423,12 +427,27 @@ std::shared_ptr<Worker> NodeManager::worker(std::string uuid) const
return nullptr;
}
void NodeManager::reserveTaskSlot()
Job * NodeManager::job(std::string uuid)
{
    ///
    /// Looks up a job in the list of jobs
    /// \param uuid UUID of the requested job
    /// \return job stored under the UUID, or nullptr when no such job is known
    ///

    const auto it = mJobs.find(uuid);
    return (it == mJobs.end()) ? nullptr : it->second;
}
void NodeManager::addTaskSlot()
{
    ///
    /// Add task slot
    ///
    /// The base implementation only makes sure that the task pool exists.
    /// The pool is created on the first call and reused by every later call,
    /// so slots that were already registered in it are never discarded.
    ///

    if (mTaskPool == nullptr)
        mTaskPool = new TaskPool(this);
}
bool NodeManager::handleTaskPool(void * /*p*/)
......
......@@ -38,9 +38,10 @@ public:
std::shared_ptr<Feeder> feeder(std::string uuid) const;
std::shared_ptr<Consumer> consumer(std::string uuid) const;
std::shared_ptr<Worker> worker(std::string uuid) const;
Job * job(std::string uuid);
TaskPool * taskPool();
virtual void reserveTaskSlot();
virtual void addTaskSlot();
bool hasJobs() const;
int32_t nSlots(double mult = 1.0) const;
void jobs(std::string client_uuid, std::vector<std::string> & jobs) const;
......@@ -55,11 +56,11 @@ public:
virtual bool sendWhisper(Socket * s, std::string to, std::vector<std::string> & v);
protected:
std::map<std::string, Job *> mJobs{}; ///< List of jobs
std::vector<std::string> mActiveJobs{}; ///< List of active jobs
std::map<std::string, std::shared_ptr<Worker>> mWorkers{}; ///< List of Workers
std::map<std::string, std::shared_ptr<Consumer>> mConsumers{}; ///< List of Consumers
std::map<std::string, std::shared_ptr<Feeder>> mFeeders{}; ///< List of Feeders
TaskPool * mTaskPool = 0; ///< Task pool
std::map<std::string, Job *> mJobs{}; ///< List of jobs
std::vector<std::string> mActiveJobs{}; ///< List of active jobs
std::map<std::string, std::shared_ptr<Worker>> mWorkers{}; ///< List of Workers
std::map<std::string, std::shared_ptr<Consumer>> mConsumers{}; ///< List of Consumers
std::map<std::string, std::shared_ptr<Feeder>> mFeeders{}; ///< List of Feeders
TaskPool * mTaskPool = nullptr; ///< Task pool
};
} // namespace Salsa
#include "TaskExecutor.hh"
namespace Salsa
{
TaskExecutor::TaskExecutor()
    : Object()
{
    ///
    /// Default constructor; the task state stays unset until taskState(TaskState *) is called
    ///
}
TaskExecutor::~TaskExecutor()
{
    ///
    /// Destructor; the attached task state is not deleted here
    ///
}
void * TaskExecutor::pipe() const
{
    ///
    /// Returns pointer to pipe
    /// \return always nullptr in the base class; derived executors override this
    ///

    return nullptr;
}
void TaskExecutor::taskState(TaskState * ts)
{
    ///
    /// Attaches a task state to this executor
    /// \param ts task state to remember (stored as a plain pointer)
    ///

    mTaskState = ts;
}
TaskState * TaskExecutor::taskState() const
{
    ///
    /// Returns the attached task state
    /// \return task state set via taskState(TaskState *), nullptr when none was set
    ///

    return mTaskState;
}
} // namespace Salsa
#pragma once
#include <TaskState.hh>
namespace Salsa
{
///
/// \class TaskExecutor
///
/// \brief Base TaskExecutor class
/// \author Matej Fedor <matej.fedor.mf@gmail.com>
/// \author Martin Vala <mvala@cern.ch>
/// \author Branislav Beke <beke.public@yandex.com>
///
class TaskExecutor : public Object
{
public:
    TaskExecutor();
    virtual ~TaskExecutor();

    /// Runs the task for the given worker and upstream
    virtual bool run(std::string worker, std::string upstream) = 0;
    /// Handles pipe; implementations may append values to extra
    virtual bool handlePipe(std::vector<std::string> & extra) = 0;
    /// Returns pointer to pipe (nullptr in the base class)
    virtual void * pipe() const;

    /// Sets task state
    void taskState(TaskState * ts);
    /// Returns task state
    TaskState * taskState() const;

protected:
    TaskState * mTaskState{nullptr}; ///< Task state
};
} // namespace Salsa
#include "TaskExecutorFake.hh"
#include <TaskPool.hh>
namespace Salsa
{
TaskExecutorFake::TaskExecutorFake(TaskPool * tp)
: TaskExecutor()
, mTaskPool(tp)
{
///
/// Constructor
///
}
TaskExecutorFake::~TaskExecutorFake()
{
    ///
    /// Destructor; releases the object that serves as the fake pipe
    ///

    delete mPointer;
}
bool TaskExecutorFake::run(std::string worker, std::string upstream)
{
    ///
    /// Run fake task
    ///
    /// Remembers worker and upstream, marks the attached task state as running
    /// and immediately passes the fake pipe to TaskPool::handlePipe(), which
    /// processes a task state found in the running state as a finished task.
    ///
    /// \param worker   worker identifier, later reported by handlePipe()
    /// \param upstream upstream identifier, later reported by handlePipe()
    /// \return false when no task state or no task pool is attached, true otherwise
    ///

    // Same guards as TaskExecutorForkZmq::run(): never dereference unset pointers.
    if (mTaskState == nullptr)
        return false;
    if (mTaskPool == nullptr)
        return false;

    mWorker   = worker;
    mUpstream = upstream;
    SPD_DEBUG("Running fake task worker [{}] upstream [{}]", mWorker, mUpstream);

    mTaskState->state(TaskState::State::running);
    mTaskPool->handlePipe(mPointer);
    return true;
}
bool TaskExecutorFake::handlePipe(std::vector<std::string> & extra)
{
    ///
    /// Handle pipe
    /// \param extra output list; worker and upstream are appended, in that order
    /// \return always true
    ///

    SPD_DEBUG("Handling pipe for fake task worker [{}] upstream [{}]", mWorker, mUpstream);

    extra.emplace_back(mWorker);
    extra.emplace_back(mUpstream);
    return true;
}
void * TaskExecutorFake::pipe() const
{
    ///
    /// Returns pipe
    /// \return address of the internally allocated object used in place of a real pipe
    ///

    return mPointer;
}
} // namespace Salsa
#pragma once
#include <TaskExecutor.hh>
namespace Salsa
{
///
/// \class TaskExecutorFake
///
/// \brief TaskExecutorFake class
/// \author Matej Fedor <matej.fedor.mf@gmail.com>
/// \author Martin Vala <mvala@cern.ch>
/// \author Branislav Beke <beke.public@yandex.com>
///
class TaskPool;
class TaskExecutorFake : public TaskExecutor
{
public:
TaskExecutorFake(TaskPool * tp);
virtual ~TaskExecutorFake();
virtual bool run(std::string, std::string);
virtual void * pipe() const;
virtual bool handlePipe(std::vector<std::string> & extra);
private:
Object * mPointer{new Object()}; ///< Fake pointer
std::string mWorker{}; ///< Worker
std::string mUpstream{}; ///< Upstream
TaskPool * mTaskPool = nullptr; ///< Fake pointer
};
} // namespace Salsa
#include "TaskPool.hh"
#include <Job.hh>
#include <NodeManager.hh>
#include <TaskExecutor.hh>
namespace Salsa
{
TaskPool::TaskPool()
TaskPool::TaskPool(NodeManager * nm)
: Object()
, mNodeManager(nm)
{
///
/// Constructor
......@@ -158,4 +161,79 @@ void TaskPool::print(bool verbose) const
stat[TaskState::running]);
}
bool TaskPool::handlePipe(void * p)
{
///
/// Handle pipe
///
TaskState * ts = find(p);
if (ts == nullptr)
{
SPD_ERROR("ts by actor [{}] is null!!!", static_cast<void *>(p));
return false;
}
if (ts->executor() == nullptr)
{
SPD_ERROR("ts->executor() by actor [{}] is null!!!", static_cast<void *>(p));
return false;
}
if (ts->executor()->pipe() == nullptr)
{
SPD_ERROR("ts->executor()->pipe() by actor [{}] is null!!!", static_cast<void *>(p));
return false;
}
std::vector<std::string> extra;
ts->executor()->handlePipe(extra);
TaskState::State state = ts->state();
if (state == TaskState::assigned)
{
ts->state(TaskState::running);
// handle mJobs
// Nothing for now i think
}
else if (state == TaskState::running)
{
ts->state(TaskState::idle);
ts->pid(0);
std::string wkuuid = extra[0];
std::string upstream = extra[1];
// handle mJobs
Job * job = mNodeManager->job(ts->task()->jobid());
if (job != nullptr)
{
SPD_DEBUG("TASK ENDED JOB [{}:{}]", ts->task()->jobid(), ts->task()->taskid());
job->removeTask(ts->task()->taskid(), Salsa::Job::running);
}
std::vector<std::string> out;
out.push_back("TASK_RESULT");
std::string payload;
ts->task()->SerializeToString(&payload);
out.push_back(payload);
ts = findFreeTask();
if (ts && ts->id() > 0)
{
SPD_DEBUG("AFTER TASK_RESULT sending reserving task [{}]", ts->id());
out.push_back("&");
out.push_back("FREESLOT");
out.push_back(fmt::format("{}", ts->id()));
ts->state(TaskState::assigned);
}
SPD_DEBUG("Searching to worker [{}]", wkuuid);
std::shared_ptr<Salsa::Worker> wk = mNodeManager->worker(wkuuid);
if (wk)
{
SPD_DEBUG("Sending via wk [{}] to feeder [{}]", wkuuid, upstream);
mNodeManager->sendWhisper(wk->pipe().get(), upstream, out);
}
}
print();
return true;
}
} // namespace Salsa
......@@ -13,10 +13,11 @@ namespace Salsa
/// \author Branislav Beke <beke.public@yandex.com>
///
class Job;
class NodeManager;
class TaskPool : public Object
{
public:
TaskPool();
TaskPool(NodeManager * nm);
virtual ~TaskPool();
void add(void * p, TaskState * t);
......@@ -26,10 +27,12 @@ public:
void changeState(uint32_t id, TaskState::State state);
uint32_t nSlotFree();
bool terminateJob(Job * job);
bool handlePipe(void * p);
void print(bool verbose = false) const;
protected:
std::map<void *, TaskState *> mTasks{}; ///< List of task slots
std::map<void *, TaskState *> mTasks{}; ///< List of task slots
NodeManager * mNodeManager = nullptr; ///< Node manager
};
} // namespace Salsa
#include "TaskState.hh"
#include <TaskExecutor.hh>
namespace Salsa
{
TaskState::TaskState()
TaskState::TaskState(TaskExecutor * te)
: Object()
, mTaskExecutor(te)
{
///
/// Constructor
......@@ -15,6 +17,7 @@ TaskState::~TaskState()
/// Destructor
///
delete mTask;
delete mTaskExecutor;
}
void TaskState::id(uint32_t id)
......@@ -79,6 +82,14 @@ void TaskState::task(TaskInfo * t)
mTask = t;
}
TaskExecutor * TaskState::executor()
{
    ///
    /// Returns task executor
    /// \return executor held by this task state (may be nullptr)
    ///

    return mTaskExecutor;
}
void TaskState::killTask()
{
///
......
......@@ -13,7 +13,7 @@ namespace Salsa
/// \author Martin Vala <mvala@cern.ch>
/// \author Branislav Beke <beke.public@yandex.com>
///
class TaskExecutor;
class TaskState : public Object
{
public:
......@@ -27,25 +27,27 @@ public:
all
};
TaskState();
TaskState(TaskExecutor * te = nullptr);
virtual ~TaskState();
void id(uint32_t id);
uint32_t id() const;
void state(State s);
State state() const;
TaskInfo * task() const;
void task(TaskInfo * t);
void print(bool verbose = false) const;
void pid(uint32_t pid);
uint32_t pid() const;
void killTask();
void id(uint32_t id);
uint32_t id() const;
void state(State s);
State state() const;
TaskInfo * task() const;
void task(TaskInfo * t);
void print(bool verbose = false) const;
void pid(uint32_t pid);
uint32_t pid() const;
void killTask();
TaskExecutor * executor();
protected:
uint32_t mId = 0; ///< ID of task state
State mState = idle; ///< Status of actor
TaskInfo * mTask = nullptr; ///< TaskInfo held by said actor
uint32_t mPID = 0; ///< Task PID
uint32_t mId = 0; ///< ID of task state
State mState = idle; ///< Status of actor
TaskInfo * mTask = nullptr; ///< TaskInfo held by said actor
uint32_t mPID = 0; ///< Task PID
TaskExecutor * mTaskExecutor = nullptr; ///< Task Executor
};
} // namespace Salsa
......@@ -43,7 +43,7 @@ Worker::Worker(std::string uuid, std::shared_ptr<Socket> pipe, NodeManager * nm)
for (int i = 0; i < mNumCores; i++)
{
SPD_TRACE("Worker [{}] slot [{}]", mUUID, i);
mNodeManager->reserveTaskSlot();
mNodeManager->addTaskSlot();
}
mNodeInfo->set_uuid(mUUID);
mNodeInfo->set_slots(mNumCores);
......
#include "NodeManagerZyre.hh"
#include <MessageZyre.hh>
#include <SocketZyre.hh>
#include <TaskStateZmq.hh>
#include <TaskExecutorFake.hh>
#include <TaskExecutorForkZmq.hh>
namespace Salsa
{
NodeManagerZyre::NodeManagerZyre(NodeZyre * nz)
......@@ -55,17 +56,32 @@ bool NodeManagerZyre::onWhisper(std::string self, Message * msg)
return sendWhisper(s, msg->uuid(), values);
}
void NodeManagerZyre::reserveTaskSlot()
void NodeManagerZyre::addTaskSlot()
{
///
/// Reserve task slot
///
TaskStateZmq * ts = new TaskStateZmq(zactor_new(Salsa::ActorZmq::SalsaActorForkFn, nullptr));
mNodeZyre->pollerZmq()->add(ts->actor());
if (mTaskPool == nullptr)
mTaskPool = new TaskPool();
mTaskPool->add(ts->actor(), ts);
mTaskPool = new TaskPool(this);
if (getenv("SALSA_FAKE"))
{
SPD_DEBUG("Fake jobs");
TaskExecutor * te = new TaskExecutorFake(mTaskPool);
TaskState * ts = new TaskState(te);
te->taskState(ts);
mTaskPool->add(ts->executor()->pipe(), ts);
}
else
{
zactor_t * act = zactor_new(Salsa::ActorZmq::SalsaActorForkFn, nullptr);
TaskExecutor * te = new TaskExecutorForkZmq(act);
TaskState * ts = new TaskState(te);
te->taskState(ts);
mNodeZyre->pollerZmq()->add(static_cast<zactor_t *>(ts->executor()->pipe()));
mTaskPool->add(ts->executor()->pipe(), ts);
}
}
void NodeManagerZyre::runTask(TaskState * ts, std::string wk, std::string upstream)
......@@ -74,22 +90,12 @@ void NodeManagerZyre::runTask(TaskState * ts, std::string wk, std::string upstre
/// Run task
///
SPD_TRACE(
"Task [{}:{}] from upstream [{}]", ts->task()->jobid(), ts->task()->taskid(), upstream);
zmsg_t * outMsg = zmsg_new();
zmsg_addstrf(outMsg, "%s", ts->task()->data().c_str());
zmsg_addstr(outMsg, wk.c_str());
zmsg_addstr(outMsg, upstream.c_str());
zmsg_addstr(outMsg, ts->task()->jobid().c_str());
for (int i = 0; i < ts->task()->logtargets_size(); i++)
{
zmsg_addstrf(outMsg, "%s", ts->task()->logtargets(i).c_str());
}
TaskStateZmq * tsz = static_cast<TaskStateZmq *>(ts);
zsock_send(tsz->actor(), "m", outMsg);
zmsg_destroy(&outMsg);
SPD_TRACE("Task [{}:{}] wk [{}] upstream [{}]",
ts->task()->jobid(),
ts->task()->taskid(),
wk,
upstream);
ts->executor()->run(wk, upstream);
}
bool NodeManagerZyre::handleTaskPool(void * p)
......@@ -101,88 +107,7 @@ bool NodeManagerZyre::handleTaskPool(void * p)
if (mTaskPool == nullptr)
return false;
TaskStateZmq * ts = dynamic_cast<TaskStateZmq *>(mTaskPool->find(p));
if (ts == nullptr)
{
SPD_ERROR("ts by actor [{}] is null!!!", static_cast<void *>(p));
return false;
}
zmsg_t * message = zmsg_recv(ts->actor());
if (zframe_streq(zmsg_first(message), "$PID"))
{
char * pidStr = zframe_strdup(zmsg_next(message));
uint32_t pid = static_cast<uint32_t>(strtoul(pidStr, nullptr, 0));
ts->pid(pid);
ts->state(TaskState::State::running);
std::string payload;
ts->task()->SerializeToString(&payload);
SPD_DEBUG(
"JOB [{}:{}] PID [{}] started", ts->task()->jobid(), ts->task()->taskid(), pidStr);
free(pidStr);
}
else if (zframe_streq(zmsg_first(message), "$EXIT"))
{
char * exitStatusStr = zframe_strdup(zmsg_next(message));
uint32_t exitStatus = static_cast<uint32_t>(strtoul(exitStatusStr, nullptr, 0));
free(exitStatusStr);
ts->task()->set_returncode(exitStatus);
SPD_DEBUG("JOB [{}:{}] PID [{}] finished with rc [{}] killed [{}]",
ts->task()->jobid(),
ts->task()->taskid(),
ts->pid(),
ts->task()->returncode(),
ts->state() == TaskState::killed);
ts->state(TaskState::State::idle);
ts->pid(0);
std::string payload;
ts->task()->SerializeToString(&payload);
char * wkuuid = zframe_strdup(zmsg_next(message));
char * upstream = zframe_strdup(zmsg_next(message));
auto search = mJobs.find(ts->task()->jobid());
if (search != mJobs.end())
{
SPD_TRACE("TASK ENDED JOB [{}:{}]", ts->task()->jobid(), ts->task()->taskid());
search->second->removeTask(ts->task()->taskid(), Salsa::Job::running);
}
std::vector<std::string> out;
out.push_back("TASK_RESULT");
out.push_back(payload);
if (mTaskPool)
{
TaskState * ts = mTaskPool->findFreeTask();
if (ts && ts->id() > 0)
{
SPD_TRACE("AFTER TASK_RESULT sending reserving task [{}]", ts->id());
out.push_back("&");
out.push_back("FREESLOT");
out.push_back(fmt::format("{}", ts->id()));
ts->state(TaskState::assigned);
}
SPD_TRACE("Searching to worker [{}]", wkuuid);
std::shared_ptr<Salsa::Worker> wk = worker(wkuuid);
if (wk)
{
SPD_TRACE("Sending via wk [{}] to feeder [{}]", wkuuid, upstream);
sendWhisper(wk->pipe().get(), upstream, out);
}
}
free(wkuuid);
free(upstream);
}
zmsg_destroy(&message);
if (mTaskPool)
mTaskPool->print();
return true;
return mTaskPool->handlePipe(p);
}
bool NodeManagerZyre::sendWhisper(Socket * s, std::string to, std::vector<std::string> & v)
......
......@@ -25,7 +25,7 @@ public:
bool onWhisper(std::string self, Message * msg);
virtual bool handleTaskPool(void * p);
virtual void reserveTaskSlot();
virtual void addTaskSlot();
virtual void runTask(TaskState * ts, std::string wk, std::string upstream);
virtual bool sendWhisper(Socket * s, std::string to, std::vector<std::string> & v);
......
#include "TaskExecutorForkZmq.hh"
#include <TaskState.hh>
namespace Salsa
{
TaskExecutorForkZmq::TaskExecutorForkZmq(zactor_t * actor)
: TaskExecutor()
, mZactor(actor)
{
///
/// Constructor
///
}
TaskExecutorForkZmq::~TaskExecutorForkZmq()
{
    ///
    /// Destructor; destroys the actor when one is set
    ///

    if (mZactor != nullptr)
    {
        zactor_destroy(&mZactor);
    }
}
bool TaskExecutorForkZmq::run(std::string worker, std::string upstream)
{
    ///
    /// Run
    ///
    /// Builds one multi-frame message and sends it to the actor pipe. Frames,
    /// in order: task data, worker, upstream, job id, then every log target of
    /// the task.
    ///
    /// \param worker   worker identifier placed in the second frame
    /// \param upstream upstream identifier placed in the third frame
    /// \return false when the task state, its task or the pipe is missing, or
    ///         when sending fails; true when the message was sent
    ///

    if (mTaskState == nullptr)
        return false;
    if (pipe() == nullptr)
        return false;

    // A task state without a task has nothing to send.
    auto * task = mTaskState->task();
    if (task == nullptr)
        return false;

    zmsg_t * outMsg = zmsg_new();
    zmsg_addstrf(outMsg, "%s", task->data().c_str());
    zmsg_addstr(outMsg, worker.c_str());
    zmsg_addstr(outMsg, upstream.c_str());
    zmsg_addstr(outMsg, task->jobid().c_str());
    for (int i = 0; i < task->logtargets_size(); i++)
    {
        zmsg_addstrf(outMsg, "%s", task->logtargets(i).c_str());
    }

    // The message is destroyed here exactly as before; only the send result is now reported.
    const int rc = zsock_send(pipe(), "m", outMsg);
    zmsg_destroy(&outMsg);
    return rc == 0;
}
void * TaskExecutorForkZmq::pipe() const
{
    ///
    /// Returns pipe
    /// \return the actor passed to the constructor (nullptr when none)
    ///

    return mZactor;
}
bool TaskExecutorForkZmq::handlePipe(std::vector<std::string> & extra)
{
///
/// Handle pipe
///
zmsg_t * message = zmsg_recv(pipe());
if (zframe_streq(zmsg_first(message), "$PID"))
{
char * pidStr = zframe_strdup(zmsg_next(message));
uint32_t pid = static_cast<uint32_t>(strtoul(pidStr, nullptr, 0));
mTaskState->pid(pid);
// mTaskState->state(TaskState::State::running);
std::string payload;
mTaskState->task()->SerializeToString(&payload);
SPD_DEBUG("JOB [{}:{}] PID [{}] started",
mTaskState->task()->jobid(),
mTaskState->task()->taskid(),
pidStr);
free(pidStr);
}
else if (zframe_streq(zmsg_first(message), "$EXIT"))
{
char * exitStatusStr = zframe_strdup(zmsg_next(message));
uint32_t exitStatus = static_cast<uint32_t>(strtoul(exitStatusStr, nullptr, 0));
free(exitStatusStr);
mTaskState->task()->set_returncode(exitStatus);
SPD_DEBUG("JOB [{}:{}] PID [{}] finished with rc [{}] killed [{}]",
mTaskState->task()->jobid(),
mTaskState->task()->taskid(),
mTaskState->pid(),
mTaskState->task()->returncode(),
mTaskState->state() == TaskState::killed);
// mTaskState->state(TaskState::State::idle);
// mTaskState->pid(0);
// std::string payload;
// mTaskState->task()->SerializeToString(&payload);
char * wkuuid = zframe_strdup(zmsg_next(message));
extra.push_back(wkuuid);
char * upstream = zframe_strdup(zmsg_next(message));