Commit d2f6f05d authored by Martin Vala's avatar Martin Vala

Fake job ready

parent ec3ac82a
Pipeline #1448 passed with stage
in 14 minutes and 54 seconds
......@@ -67,6 +67,8 @@ void Consumer::onWhisper(Message * inMsg, std::vector<std::string> & out)
int32_t n = 1;
n = mNodeManager->nSlots(1.5);
if (getenv("SALSA_FAKE"))
n *= 10;
if (n != 0)
{
out.push_back("SENDTASKS");
......
......@@ -340,6 +340,10 @@ void NodeManager::resultTask(Salsa::TaskInfo * task)
out.push_back(payload);
int32_t slots = nSlots();
// TODO only for testing, REMOVE IT later
if (getenv("SALSA_FAKE"))
slots *= 10;
if (search->second->size(Job::pending) < slots)
{
if (search->second->haveMoreTasks())
......
#include "TaskExecutorFake.hh"
#include <TaskPool.hh>
namespace Salsa
{
TaskExecutorFake::TaskExecutorFake()
TaskExecutorFake::TaskExecutorFake(TaskPool * tp)
: TaskExecutor()
, mTaskPool(tp)
{
///
/// Constructor
......@@ -13,12 +15,30 @@ TaskExecutorFake::~TaskExecutorFake()
///
/// Destructor
///
delete mPointer;
}
bool TaskExecutorFake::run(std::string /*worker*/, std::string /*upstream*/)
bool TaskExecutorFake::run(std::string worker, std::string upstream)
{
///
/// Run
///
mWorker = worker;
mUpstream = upstream;
SPD_DEBUG("Running fake task worker [{}] upstream [{}]", mWorker, mUpstream);
mTaskState->state(TaskState::State::running);
mTaskPool->handlePipe(mPointer);
return true;
}
bool TaskExecutorFake::handlePipe(std::vector<std::string> & extra)
{
///
/// Handle pipe
///
SPD_DEBUG("Handling pipe for fake task worker [{}] upstream [{}]", mWorker, mUpstream);
extra.push_back(mWorker);
extra.push_back(mUpstream);
return true;
}
void * TaskExecutorFake::pipe() const
......@@ -26,7 +46,7 @@ void * TaskExecutorFake::pipe() const
///
/// Returns pipe
///
return nullptr;
return mPointer;
}
} // namespace Salsa
......@@ -12,14 +12,21 @@ namespace Salsa
/// \author Martin Vala <mvala@cern.ch>
/// \author Branislav Beke <beke.public@yandex.com>
///
class TaskPool;
class TaskExecutorFake : public TaskExecutor
{
public:
TaskExecutorFake();
TaskExecutorFake(TaskPool * tp);
virtual ~TaskExecutorFake();
virtual bool run(std::string, std::string);
virtual void * pipe() const;
virtual bool handlePipe(std::vector<std::string> & extra);
private:
Object * mPointer{new Object()}; ///< Fake pointer
std::string mWorker{}; ///< Worker
std::string mUpstream{}; ///< Upstream
TaskPool * mTaskPool = nullptr; ///< Fake pointer
};
} // namespace Salsa
......@@ -205,7 +205,7 @@ bool TaskPool::handlePipe(void * p)
Job * job = mNodeManager->job(ts->task()->jobid());
if (job != nullptr)
{
SPD_TRACE("TASK ENDED JOB [{}:{}]", ts->task()->jobid(), ts->task()->taskid());
SPD_DEBUG("TASK ENDED JOB [{}:{}]", ts->task()->jobid(), ts->task()->taskid());
job->removeTask(ts->task()->taskid(), Salsa::Job::running);
}
std::vector<std::string> out;
......@@ -216,18 +216,18 @@ bool TaskPool::handlePipe(void * p)
ts = findFreeTask();
if (ts && ts->id() > 0)
{
SPD_TRACE("AFTER TASK_RESULT sending reserving task [{}]", ts->id());
SPD_DEBUG("AFTER TASK_RESULT sending reserving task [{}]", ts->id());
out.push_back("&");
out.push_back("FREESLOT");
out.push_back(fmt::format("{}", ts->id()));
ts->state(TaskState::assigned);
}
SPD_TRACE("Searching to worker [{}]", wkuuid);
SPD_DEBUG("Searching to worker [{}]", wkuuid);
std::shared_ptr<Salsa::Worker> wk = mNodeManager->worker(wkuuid);
if (wk)
{
SPD_TRACE("Sending via wk [{}] to feeder [{}]", wkuuid, upstream);
SPD_DEBUG("Sending via wk [{}] to feeder [{}]", wkuuid, upstream);
mNodeManager->sendWhisper(wk->pipe().get(), upstream, out);
}
}
......
#include "NodeManagerZyre.hh"
#include <MessageZyre.hh>
#include <SocketZyre.hh>
#include <TaskExecutorFake.hh>
#include <TaskExecutorForkZmq.hh>
namespace Salsa
{
......@@ -63,12 +64,24 @@ void NodeManagerZyre::addTaskSlot()
if (mTaskPool == nullptr)
mTaskPool = new TaskPool(this);
zactor_t * act = zactor_new(Salsa::ActorZmq::SalsaActorForkFn, nullptr);
TaskExecutor * te = new TaskExecutorForkZmq(act);
TaskState * ts = new TaskState(te);
te->taskState(ts);
mNodeZyre->pollerZmq()->add(static_cast<zactor_t *>(ts->executor()->pipe()));
mTaskPool->add(ts->executor()->pipe(), ts);
if (getenv("SALSA_FAKE"))
{
SPD_DEBUG("Fake jobs");
TaskExecutor * te = new TaskExecutorFake(mTaskPool);
TaskState * ts = new TaskState(te);
te->taskState(ts);
mTaskPool->add(ts->executor()->pipe(), ts);
}
else
{
zactor_t * act = zactor_new(Salsa::ActorZmq::SalsaActorForkFn, nullptr);
TaskExecutor * te = new TaskExecutorForkZmq(act);
TaskState * ts = new TaskState(te);
te->taskState(ts);
mNodeZyre->pollerZmq()->add(static_cast<zactor_t *>(ts->executor()->pipe()));
mTaskPool->add(ts->executor()->pipe(), ts);
}
}
void NodeManagerZyre::runTask(TaskState * ts, std::string wk, std::string upstream)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment