Commit e2e3cc70 authored by Branislav Beke's avatar Branislav Beke 🙃

Merge branch '58-options-for-feeder-to-generate-jobs' into 'master'

Resolve "Options for feeder to generate jobs"

Closes #58

See merge request !43
parents ab244bed 2c97b262
Pipeline #1536 passed with stage
in 15 minutes and 56 seconds
......@@ -5,7 +5,7 @@ cmake_minimum_required(VERSION 2.8 FATAL_ERROR)
set(CPACK_PACKAGE_VERSION_MAJOR "0")
set(CPACK_PACKAGE_VERSION_MINOR "2")
set(CPACK_PACKAGE_VERSION_PATCH "0")
set(CPACK_PACKAGE_VERSION_RELEASE "0.4")
set(CPACK_PACKAGE_VERSION_RELEASE "0.5")
include(cmake/cmake_base.cmake)
......
......@@ -193,7 +193,7 @@ bool TaskPool::handlePipe(void * p)
// handle mJobs
// Nothing for now i think
}
else if (state == TaskState::running)
else if (state == TaskState::running || state == TaskState::killed)
{
ts->state(TaskState::idle);
ts->pid(0);
......
......@@ -99,33 +99,22 @@ void TaskState::killTask()
if (mPID == 0)
return;
int rc = kill(mPID, SIGTERM);
int rc = kill(mPID, SIGKILL);
if (rc == 0)
{
SPD_DEBUG("JOB [{}:{}] PID [{}] was killed", mTask->jobid(), mTask->taskid(), mPID);
}
else
{
SPD_WARN("JOB [{}:{}] PID [{}] could not be killed !!! Trying to kill it via [SIGKILL] ...",
SPD_WARN("JOB [{}:{}] PID [{}] was killed by [SIGKILL] signal",
mTask->jobid(),
mTask->taskid(),
mPID);
rc = kill(mPID, SIGKILL);
if (rc == 0)
{
SPD_WARN("JOB [{}:{}] PID [{}] was killed by [SIGKILL] signal",
mTask->jobid(),
mTask->taskid(),
mPID);
}
else
{
SPD_ERROR("JOB [{}:{}] PID [{}] could not be killed it [SIGKILL] !!!",
mTask->jobid(),
mTask->taskid(),
mPID);
}
}
else
{
SPD_ERROR("JOB [{}:{}] PID [{}] could not be killed it [SIGKILL] !!!",
mTask->jobid(),
mTask->taskid(),
mPID);
}
// mPID = 0;
mState = killed;
}
......
......@@ -9,7 +9,7 @@
#define SALSA_VERSION_MAJOR(version) (static_cast<uint32_t>(version) >> 22)
#define SALSA_VERSION_MINOR(version) ((static_cast<uint32_t>(version) >> 12) & 0x3ff)
#define SALSA_VERSION_PATCH(version) (static_cast<uint32_t>(version) & 0xfff)
#define SALSA_VERSION_RELEASE @CPACK_PACKAGE_VERSION_RELEASE@ // Maybe unused?
#define SALSA_VERSION_RELEASE "@CPACK_PACKAGE_VERSION_RELEASE@"
#define SALSA_VERSION SALSA_MAKE_VERSION(@CPACK_PACKAGE_VERSION_MAJOR@, @CPACK_PACKAGE_VERSION_MINOR@, @CPACK_PACKAGE_VERSION_PATCH@)
......
......@@ -5,6 +5,7 @@
#include <memory>
#include <salsa.hh>
#include <spdlog/sinks/stdout_sinks.h>
#include <sstream>
#include <zyre.h>
[[noreturn]] void help()
{
......@@ -31,6 +32,8 @@ int main(int argc, char ** argv)
int debugLevel = static_cast<int>(spdlog::level::info);
// input filename
std::string inputFilename = "";
// input template
std::string inputTemplate = "";
// dicovery url
std::string discoveryUrl = "";
// endpoint url
......@@ -39,10 +42,11 @@ int main(int argc, char ** argv)
int port = 10000;
// ***** Default values END *****
std::string short_options = "hf:p:d:W;";
std::string short_options = "hf:t:p:d:W;";
struct option long_options[] = {
{"help", no_argument, nullptr, 'h'},
{"file", required_argument, nullptr, 'f'},
{"template", required_argument, nullptr, 't'},
{"port", required_argument, nullptr, 'p'},
{"discovery", required_argument, nullptr, 'd'},
{"endpoint", required_argument, nullptr, 'e'},
......@@ -65,6 +69,11 @@ int main(int argc, char ** argv)
help();
case 'f':
inputFilename = optarg;
inputTemplate = "";
break;
case 't':
inputTemplate = optarg;
inputFilename = "";
break;
case 'p':
port = atoi(optarg);
......@@ -89,7 +98,7 @@ int main(int argc, char ** argv)
SALSA_VERSION_PATCH(SALSA_VERSION),
SALSA_VERSION_RELEASE);
if (inputFilename.empty())
if (inputFilename.empty() && inputTemplate.empty())
help();
zuuid_t * jobuuid = zuuid_new();
......@@ -97,33 +106,64 @@ int main(int argc, char ** argv)
zuuid_destroy(&jobuuid);
std::string salsa_type = "FEEDER";
Salsa::Job jobs(salsa_jobid, salsa_type);
Salsa::Job jobs(salsa_jobid, salsa_type);
uint32_t id = 0;
std::string payload;
std::string command;
if (!inputFilename.empty())
{
std::ifstream ifs(inputFilename);
std::ifstream ifs(inputFilename);
uint32_t id = 0;
std::string command = "";
std::string payload = "";
do
do
{
std::getline(ifs, command);
if (command.empty())
{
continue;
}
Salsa::TaskInfo * ji = new Salsa::TaskInfo();
ji->set_clientid(getuid());
ji->set_groupid(getgid());
ji->set_jobid(salsa_jobid);
ji->set_taskid(id);
ji->set_data(command);
ji->clear_logtargets();
// ji->add_logtargets(fmt::format("file:///tmp/salsa-{}.log", id));
jobs.addTask(id, ji, Salsa::Job::pending);
id++;
} while (!command.empty());
}
if (!inputTemplate.empty())
{
std::getline(ifs, command);
if (command.empty())
std::stringstream ss(inputTemplate);
std::string item;
std::vector<std::string> elems;
while (std::getline(ss, item, ':'))
{
continue;
elems.push_back(std::move(item));
}
uint32_t n = atoi(elems[1].c_str());
for (uint32_t i = 0; i < n; ++i)
{
command = elems[0];
Salsa::TaskInfo * ji = new Salsa::TaskInfo();
ji->set_clientid(getuid());
ji->set_groupid(getgid());
ji->set_jobid(salsa_jobid);
ji->set_taskid(id);
ji->set_data(command);
ji->clear_logtargets();
// ji->add_logtargets(fmt::format("file:///tmp/salsa-{}.log", id));
jobs.addTask(id, ji, Salsa::Job::pending);
id++;
}
Salsa::TaskInfo * ji = new Salsa::TaskInfo();
ji->set_clientid(getuid());
ji->set_groupid(getgid());
ji->set_jobid(salsa_jobid);
ji->set_taskid(id);
ji->set_data(command);
ji->clear_logtargets();
ji->add_logtargets(fmt::format("file:///tmp/salsa-{}.log", id));
// ji.SerializeToString(&payload);
jobs.addTask(id, ji, Salsa::Job::pending);
id++;
} while (!command.empty());
}
zyre_t * feeder = zyre_new("feeder");
zyre_set_header(feeder, "X-SALSA-NODE-TYPE", "%s", salsa_type.c_str());
......
......@@ -151,7 +151,8 @@ void ActorZmq::SalsaActorForkFn(zsock_t * pPipe, void *)
if (pid == 0)
{
// TODDO this needs to be improved if we cannot set UUID
// TODO this needs to be improved if we cannot set UID
// TODO Chekc if uid is equal to process uid (in non-root case)
if (getuid() == 0)
{
SPD_TRACE(
......@@ -240,12 +241,17 @@ void ActorZmq::SalsaActorForkFn(zsock_t * pPipe, void *)
zactor_destroy(&pWatcherActor);
close(pipefd[0]);
int rc = WEXITSTATUS(stat);
// In case of kill -9 : returning 137
if (stat == 9)
rc = 137;
SPD_TRACE("PA Parent: Exit [{}]", stat);
SPD_TRACE("PA Parent: Exit [{}] rc [{}]", stat, rc);
{
zmsg_t * pTx = zmsg_new();
zmsg_addstr(pTx, "$EXIT");
zmsg_addstrf(pTx, "%d", WEXITSTATUS(stat));
zmsg_addstrf(pTx, "%d", rc);
zmsg_addstr(pTx, pWorker);
zmsg_addstr(pTx, pUpsream);
zmsg_addstr(pTx, pClient);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment