Commit 78aeb6c7 authored by Martin Vala's avatar Martin Vala

Merge branch '28-poller-for-job-logs' into 'master'

Resolve "Poller for job logs"

Closes #28

See merge request !26
parents 35b06952 0df55b81
Pipeline #945 passed with stage
in 6 minutes and 12 seconds
......@@ -65,9 +65,6 @@ void ActorZmq::SalsaActorForkFn(zsock_t * pipe, void *)
pid_t pid = 0;
std::shared_ptr<spdlog::logger> logger = spdlog::get("console");
zpoller_t * watcherPoller = zpoller_new(nullptr);
zpoller_add(watcherPoller, pipe);
while (true)
{
zmsg_t * rx = zmsg_recv(pipe);
......@@ -138,7 +135,13 @@ void ActorZmq::SalsaActorForkFn(zsock_t * pipe, void *)
} while (tmp);
cmd_[count] = nullptr;
SPD_TRACE("[[CHILD]] -> Sleeping for 100ms");
std::this_thread::sleep_for(std::chrono::milliseconds(100));
SPD_TRACE("[[CHILD]] -> Configuring pipes");
// After these lines you'll be unable to log anything to console, so don't even
// try. It's literally a waste of time.
close(pipefd[0]);
dup2(pipefd[1], STDOUT_FILENO);
dup2(pipefd[1], STDERR_FILENO);
......@@ -159,51 +162,24 @@ void ActorZmq::SalsaActorForkFn(zsock_t * pipe, void *)
zmsg_destroy(&tx);
}
const int LIMIT = PIPE_BUF;
int stat = -1;
char buffer[LIMIT + 1] = {'\0'};
int stat = -1;
close(pipefd[1]);
zactor_t * watcherActor = zactor_new(actor_procwait_support_, &pid);
zpoller_add(watcherPoller, watcherActor);
log.set_fd(pipefd[0]);
zactor_t * watcherActor = zactor_new(actor_procwait_support_, &log);
SPD_DEBUG("[[PARENT]] Running command...");
// Read from pipe until child dies
int waitPeriod = 200;
while (true)
{
if (waitPeriod != -1)
waitpid(pid, &stat, WUNTRACED);
if (WIFEXITED(stat) || WIFSIGNALED(stat))
{
ssize_t readRet = read(pipefd[0], buffer, LIMIT);
if (readRet > 0)
{
if (buffer[0] != '\0')
{
log.write(buffer);
memset(buffer, 0, sizeof(buffer));
}
}
else if (readRet == 0)
{
waitPeriod = -1;
SPD_DEBUG("[[ PARENT ]] END of file, waitPeriod=-1");
}
}
// Possible death race condition... I'm looking at you Valgrind
void * recvSock = zpoller_wait(watcherPoller, waitPeriod);
if (recvSock == watcherActor)
{
char * msg = zstr_recv(watcherActor);
std::string recvStat = msg;
free(msg);
stat = std::stoi(recvStat);
SPD_TRACE("[[PARENT]] Actor ended [{}]", stat);
zstr_sendf(watcherActor, "$EXIT");
break;
}
}
zpoller_remove(watcherPoller, watcherActor);
zactor_destroy(&watcherActor);
close(pipefd[0]);
......@@ -233,28 +209,55 @@ void ActorZmq::SalsaActorForkFn(zsock_t * pipe, void *)
} // END execute command
} // END while (true)
zpoller_destroy(&watcherPoller);
SPD_TRACE("Terminating persistent actor");
return;
}
void ActorZmq::actor_procwait_support_(zsock_t * pipe, void * pPID)
void ActorZmq::actor_procwait_support_(zsock_t * pipe, void * loggerObject)
{
zsock_signal(pipe, 0);
pid_t PID = *static_cast<pid_t *>(pPID);
SPD_DEBUG("[[PSupport]] Got PID [{}]", PID);
Log & commandLogger = *(static_cast<Log *>(loggerObject));
const int LIMIT = PIPE_BUF;
char buffer[LIMIT + 1] = {'\0'};
int fd = commandLogger.get_fd();
zpoller_t * poller = zpoller_new(nullptr);
zpoller_add(poller, pipe);
zpoller_add(poller, &fd);
int stat = 0;
while (true)
{
waitpid(PID, &stat, WUNTRACED);
if (WIFEXITED(stat) || WIFSIGNALED(stat))
// Possible death race condition... I'm looking at you Valgrind
void * recvSock = zpoller_wait(poller, -1);
if (recvSock == pipe)
{
zstr_sendf(pipe, "%d", stat);
break;
char * msg = zstr_recv(pipe);
std::string recvMsg = msg;
free(msg);
if (recvMsg == "$EXIT")
{
break;
}
}
else if (recvSock == &fd)
{
ssize_t readRet = read(fd, buffer, LIMIT);
if (readRet > 0)
{
if (buffer[0] != '\0')
{
commandLogger.write(buffer);
memset(buffer, 0, sizeof(buffer));
}
}
}
}
zpoller_remove(poller, pipe);
zpoller_remove(poller, &fd);
zpoller_destroy(&poller);
return;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment