Commit 91653fdf authored by Branislav Beke's avatar Branislav Beke 🙃 Committed by Martin Vala

Major code refactoring

parent cb7a0b7f
......@@ -7,8 +7,8 @@ build-c7:
stage: build
script: scripts/make.sh release
build-fc28:
image: obuser/salsa-fedora-28
build-fc29:
image: obuser/salsa-fedora-29
stage: build
script: scripts/make.sh release
......
......@@ -45,8 +45,8 @@ if (CMAKE_BUILD_TYPE MATCHES "^[Rr]elease" OR CMAKE_BUILD_TYPE MATCHES "^[Rr]elW
# Ignore certain ((sometimes) unavoidable) warnings
add_compile_options("-Wno-padded")
add_compile_options("-Wno-exit-time-destructors")
add_compile_options("-Wno-global-constructors")
#add_compile_options("-Wno-exit-time-destructors")
#add_compile_options("-Wno-global-constructors")
if (NOT DEFINED WARN_EVERYTHING)
set (WARN_EVERYTHING YES) # Enabling privileges of Clang masterrace
......
......@@ -2,7 +2,7 @@
namespace Salsa
{
std::sig_atomic_t Actor::_gInterrupted = 0;
std::sig_atomic_t Actor::msInterrupted = 0;
Actor::Actor()
: Object()
......@@ -25,7 +25,7 @@ void Actor::signalHandler(int signalNumber)
///
interrupted(signalNumber);
SPD_DEBUG("Interrupted with signal : {}", _gInterrupted);
SPD_DEBUG("Interrupted with signal [{}]", msInterrupted);
SPD_INFO("Shutting down...");
}
......
......@@ -36,20 +36,20 @@ public:
/// Returns if salsa is interrupted
static std::sig_atomic_t interrupted()
{
return _gInterrupted;
return msInterrupted;
}
/// Setter salsa interruption
static void interrupted(int interrupted)
static void interrupted(std::sig_atomic_t sig)
{
_gInterrupted = interrupted;
msInterrupted = sig;
}
/// Setter salsa interruption
static void signalHandler(int signalNumber);
private:
static std::sig_atomic_t _gInterrupted; ///< flag if salsa is interrupted
static std::sig_atomic_t msInterrupted; ///< flag if salsa is interrupted
};
} // namespace Salsa
set(PACKAGE SalsaBase)
file(GLOB SRCS "*.cxx")
file(GLOB SRCS "*.cc")
set (MY_INCLUDE_DIRS
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR}
......@@ -17,7 +17,7 @@ set (MY_LINK_DIRS
include_directories( ${MY_INCLUDE_DIRS} )
link_directories( "${MY_LINK_DIRS}")
add_library (${PACKAGE} ${SRCS} ${PROTO_SRCS} ${PROTO_HDRS})
add_library (${PACKAGE} ${SRCS} ${PROTO_SRCS} ${PROTO_HDRS})
target_link_libraries (${PACKAGE} ${JSONCPP_LIBRARIES} ${PROTOBUF_LIBRARIES} SalsaProto)
file(GLOB HDRS RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.hh" )
......
......@@ -2,27 +2,38 @@
namespace Salsa
{
uint64_t Log::id_ = 0;
uint64_t Log::msID = 0;
Log::Log()
: mName(std::to_string(msID++))
{
}
Log::~Log()
{
}
int Log::add(std::string where)
{
if (where == "console" || where == "")
{
// Console (STD out/err)
sinks.push_back(std::make_shared<spdlog::sinks::stdout_sink_st>());
mSinks.push_back(std::make_shared<spdlog::sinks::stdout_sink_st>());
}
else if (where.find("file://") == 0)
{
// Simple file
// auto logger = spdlog::basic_logger_mt("mylogger", "log.txt");
sinks.push_back(
mSinks.push_back(
std::make_shared<spdlog::sinks::basic_file_sink_mt>(where.substr(7).c_str(), true));
}
else if (where.find("zmq://") == 0)
{
// TODO implement ZMQ to SPDLog
;
}
else
{
// throw std::runtime_error("Specified sink not found! sink: [" + where + "]");
return 1;
}
return 0;
......@@ -31,24 +42,27 @@ int Log::add(std::string where)
int Log::create()
{
// Sanity check
if (target == nullptr)
// We could remove this to "reset" the logger, but there may be some risks to this
if (mTarget == nullptr)
{
// Create shared logger
// TODO find a way to drop sink?
if (id == "")
if (mName == "")
{
id = fmt::format("salsa-runlog-{}", id_);
mName = fmt::format("salsa-runlog-{}", msID);
}
target = std::make_shared<spdlog::logger>(id.c_str(), sinks.begin(), sinks.end());
target->set_pattern("%v[[--ENDL--]]");
mTarget = std::make_shared<spdlog::logger>(mName.c_str(), mSinks.begin(), mSinks.end());
mTarget->set_pattern("%v[[--ENDL--]]");
// mTarget->set_formatter(std::make_shared<spdlog::pattern_formatter>(
// logFormat, spdlog::pattern_time_type::local, ""));
}
return 0;
}
int Log::write(char * msg)
int Log::write(char const * msg)
{
// VERY self-explanatory
target->info("{}", msg);
mTarget->info("{}", msg);
return 0;
}
} // namespace Salsa
......@@ -18,52 +18,59 @@ namespace Salsa
class Log
{
public:
Log()
: id(std::to_string(id_++))
Log();
~Log();
/// Add output sink (file, console, zmq) for SPDLOG
int add(std::string);
/// Set name of job (only used for spdlog logger identification)
void name(char const * newName)
{
mName = newName;
}
~Log()
/// Set name of job (only used for spdlog logger identification)
void name(std::string newName)
{
mName = newName;
}
/// Add output sink (file, console, zmq) for SPDLOG
int add(std::string);
/// Set name of job (only used for spdlog logger identification)
void setName(char const * nID)
/// Get name of job (only used for spdlog logger identification)
std::string name() const
{
id = nID;
return mName;
}
/// Create SPDLOG loger
int create();
/// Write to logger
int write(char *);
int write(char const *);
/// Get SPDLOG logger handle
std::shared_ptr<spdlog::logger> & spd()
std::shared_ptr<spdlog::logger> spd()
{
return target;
return mTarget;
}
/// Get info about sinks
int empty()
{
return sinks.empty();
return mSinks.empty();
}
/// Set FD of pipe to watch
void set_fd(int newfd)
void fd(int newFD)
{
fd = newfd;
mFD = newFD;
}
/// Get FD of currently watched pipe
int get_fd() const
int fd() const
{
return fd;
return mFD;
}
private:
int fd = -1; ///< FD of current pipe
static uint64_t id_; ///< Static Job ID (holds index)
std::string id = nullptr; ///< ID (name) of current job)
std::vector<spdlog::sink_ptr> sinks; ///< Sinks for SPDLOG
std::shared_ptr<spdlog::logger> target = nullptr; ///< SPDLOG logger handle
int mFD = -1; ///< FD of current pipe
static uint64_t msID; ///< Static Job newName (holds index)
std::string mName = nullptr; ///< newName (name) of current job)
std::vector<spdlog::sink_ptr> mSinks; ///< Sinks for SPDLOG
std::shared_ptr<spdlog::logger> mTarget = nullptr; ///< SPDLOG logger handle
};
} // namespace Salsa
......@@ -17,7 +17,7 @@ class Message : public Object
{
public:
/// Node event type
enum ENodeEvent
enum EventType
{
UNKNOWN = 0,
ENTER,
......@@ -32,7 +32,7 @@ public:
virtual void print() const = 0;
/// Returns node event type
virtual ENodeEvent nodeEvent() const = 0;
virtual EventType event() const = 0;
/// Returns node uuid
virtual std::string uuid() const = 0;
......
......@@ -3,34 +3,28 @@
#include <iostream>
namespace Salsa
{
Node::Node(std::string name, std::string uuid)
: _name(name)
, _uuid(uuid)
Node::Node(std::string newName, std::string uuid)
: mName(newName)
, mUUID(uuid)
{
///
/// Constructor
///
SPD_DEBUG("Constructing node name [{}] UUID [{}]", name, uuid);
SPD_DEBUG("Constructing node name [{}] UUID [{}]", mName, mUUID);
}
Node::~Node()
{
///
/// Destructor
///
for (auto n : _nodes)
{
if (!dynamic_cast<Actor *>(n))
{
delete n;
}
}
_nodes.clear();
SPD_TRACE("### Destroy Node [{}] ###", mName);
for (auto p : _publishers)
for (auto & p : mPublishers)
{
p->publish(name(), "");
delete p;
p->publish(name(), ""); // Eh, why?
// delete p;
}
}
......@@ -46,7 +40,7 @@ void Node::json(Json::Value & /*root*/)
//
// And that, kids, is how I made a function meaningless with mere 4 chars...
/*
for (auto n : _nodes)
for (auto n : mChildNodes)
{
spdlog::get("console")->debug("Node::json() : name={} uuid={}", n->name(), n->uuid());
......@@ -87,7 +81,7 @@ void Node::json(Json::Value & /*root*/)
}
*/
// if (spdlog::get("console")->level() < 2 && !_parent)
// if (spdlog::get("console")->level() < 2 && !mpParent)
// std::cout << root << std::endl;
}
......@@ -96,47 +90,67 @@ void Node::print() const
///
/// Prints node info
///
SPD_DEBUG("Node::print() : name={} nodes={} publishers={} this={} parent={}",
_name,
_nodes.size(),
_publishers.size(),
std::shared_ptr<Node> pParent = nullptr;
try
{
pParent = static_cast<std::shared_ptr<Node>>(mpParent);
}
catch (std::bad_weak_ptr &)
{
}
SPD_DEBUG("Node::print() : name [{}] nodes [{}] publishers [{}] this [{}] parent [{}]",
mName,
mChildNodes.size(),
mPublishers.size(),
reinterpret_cast<void const *>(this),
static_cast<void *>(_parent));
static_cast<void *>(pParent.get()));
for (auto n : _nodes)
n->print();
for (auto const & node : mChildNodes)
{
node->print();
}
}
Node * Node::find(std::string name) const
std::shared_ptr<Node> Node::find(std::string whatName) const
{
///
/// Returns node by name
///
for (auto n : _nodes)
// We should start using algos...
// But that is really not necessary for like 10-20 nodes
for (auto node : mChildNodes)
{
if (!name.compare(n->name()))
return n;
if (name() == whatName)
{
return node;
}
}
return nullptr;
}
void Node::removeByUuid(std::string uuid)
void Node::removeByUuid(std::string whatUUID)
{
///
/// Removes node by uuid
///
int i = 0;
for (auto n : _nodes)
// I'm sure there's a more elegant way
// I don't have internet rn tho, so
// TODO
int iNode = 0;
for (auto node : mChildNodes)
{
if (!uuid.compare(n->uuid()))
if (whatUUID == node->uuid())
{
delete n;
_nodes.erase(_nodes.begin() + i);
// delete n; // Unnecessary. Smart pointers FTW
// Also, one very important concept: Delete only what you create
mChildNodes.erase(mChildNodes.begin() + iNode);
}
i++;
iNode++;
}
}
......@@ -146,33 +160,46 @@ void Node::publish()
/// Publish network status
///
Node * p = this;
while (p->parent())
// Get to highest node in stack?
// TODO @mvala explanation?
Node * pNode = this;
while (true)
{
p = p->parent();
}
try
{
if (!p)
return;
std::shared_ptr<Node> pParent = //
static_cast<std::shared_ptr<Node>>(pNode->parent());
SPD_DEBUG("Node::publish() Publishing from {} nPublishers={}", p->name(), _publishers.size());
pNode = pParent.get();
}
catch (std::bad_weak_ptr &)
{
break;
}
}
if (p->publishers().empty())
if (pNode->publishers().empty())
{
SPD_DEBUG("Node::publish() No publisher defined !!! Exiting ...");
SPD_DEBUG("Node::publish() No publisher defined! Aborting publish()");
return;
}
Json::Value json;
p->json(json);
SPD_DEBUG(
"Node::publish() Publishing from [{}] nPublishers [{}]", pNode->name(), mPublishers.size());
Json::Value jsonData;
pNode->json(jsonData);
Json::StreamWriterBuilder builder;
builder.settings_["indentation"] = "";
std::string out = Json::writeString(builder, json);
for (auto pub : p->publishers())
std::string outStr = Json::writeString(builder, jsonData);
for (auto const & pub : pNode->publishers())
{
SPD_DEBUG("Node::publish() name={} data={}", p->name(), out);
pub->publish(p->name(), out);
SPD_DEBUG("Node::publish() name [{}] data [{}]", pNode->name(), outStr);
pub->publish(pNode->name(), outStr);
}
}
......
#pragma once
// TODO? #include <Object.hh>
#include <json/json.h>
#include <memory>
#include <salsa.hh>
#include <string>
......@@ -17,7 +19,7 @@ namespace Salsa
/// \author Branislav Beke <beke.public@yandex.com>
///
class Node
class Node : public std::enable_shared_from_this<Node> // TODO ? public Object
{
public:
Node(std::string name = "", std::string uuid = "");
......@@ -30,67 +32,68 @@ public:
/// Rerurns node name
std::string name() const
{
return _name;
return mName;
}
/// Rerurns node uuid
std::string uuid() const
{
return _uuid;
return mUUID;
}
/// Returns parent node
Node * parent() const
std::weak_ptr<Node> parent() const
{
return _parent;
return mpParent;
}
/// Returns nodes
std::vector<Node *> nodes() const
std::vector<std::shared_ptr<Node>> nodes() const
{
return _nodes;
return mChildNodes;
}
/// Sets node name
void name(std::string n)
{
_name = n;
mName = n;
}
/// Sets node uuid
void uuid(std::string uuid)
{
_uuid = uuid;
mUUID = uuid;
}
/// Sets parent
void parent(Node * node)
void parent(std::weak_ptr<Node> node)
{
_parent = node;
mpParent = node;
}
/// Adds node to the list of nodes
void add(Node * node)
void add(std::shared_ptr<Node> node)
{
_nodes.push_back(node);
mChildNodes.push_back(node);
node->parent(shared_from_this());
}
/// Find node by name
Node * find(std::string name) const;
std::shared_ptr<Node> find(std::string name) const;
/// Remove node by uuid
void removeByUuid(std::string uuid);
/// Adds publisher to the node
void add(Publisher * p)
void add(std::shared_ptr<Publisher> p)
{
_publishers.push_back(p);
mPublishers.push_back(p);
}
/// Returns publishers
std::vector<Publisher *> publishers() const
std::vector<std::shared_ptr<Publisher>> publishers() const
{
return _publishers;
return mPublishers;
}
protected:
std::string _name = ""; ///< Node name
std::string _uuid = ""; ///< Node uuid
Node * _parent = nullptr; ///< Parent node
std::vector<Node *> _nodes = {}; ///< List of nodes
std::vector<Publisher *> _publishers = {}; ///< List of publishers
std::string mName = ""; ///< Node name
std::string mUUID = ""; ///< Node uuid
std::weak_ptr<Node> mpParent; ///< Parent node
std::vector<std::shared_ptr<Node>> mChildNodes = {}; ///< List of nodes
std::vector<std::shared_ptr<Publisher>> mPublishers = {}; ///< List of publishers
};
} // namespace Salsa
......@@ -14,5 +14,5 @@ Object::~Object()
///
}
std::shared_ptr<spdlog::logger> Object::consoleLogger = spdlog::stdout_color_mt("salsa");
std::shared_ptr<spdlog::logger> Object::mpConsoleLogger = spdlog::stdout_color_mt("salsa");
} // namespace Salsa
......@@ -23,16 +23,16 @@ public:
static std::shared_ptr<spdlog::logger> get_console_output()
{
// This method is inline, so compiler gets hinted very strongly to inline it.
return consoleLogger;
return mpConsoleLogger;
}
/// Sets console log level
static void set_console_level(spdlog::level::level_enum level)
{
consoleLogger->set_level(level);
mpConsoleLogger->set_level(level);
}
private:
static std::shared_ptr<spdlog::logger> consoleLogger; ///< Pointer to spd logger
static std::shared_ptr<spdlog::logger> mpConsoleLogger; ///< Pointer to spd logger
};
} // namespace Salsa
......@@ -7,17 +7,18 @@ namespace Salsa
{
Packetizer::Packetizer(EType type)
: Object()
, nodeType(type)
, mcNodeType(type)
{
switch (nodeType)
SPD_INFO("Packetizer - DISREGARD FOLLOWING");
switch (mcNodeType)
{
case MASTER:
case master:
SPD_INFO("Running Packetizer [type=MASTER]");
break;
case WORKER:
case worker:
SPD_INFO("Running Packetizer [type=WORKER]");
break;
case NONE:
case none:
SPD_INFO("No packetizer. You shouldn't see this.");
}
}
......@@ -32,35 +33,37 @@ Packetizer::~Packetizer()
void Packetizer::addNode(std::string node)
{
SPD_TRACE("--> Adding node [{}]", node);
nodes.push_back(node);
mNodes.push_back(node);
}
void Packetizer::removeNode(std::string node)
{
SPD_TRACE("--> Removing node [{}]", node);
nodes.erase(std::find(nodes.begin(), nodes.end(), node));
mNodes.erase(std::find(mNodes.begin(), mNodes.end(), node));
}
int Packetizer::addJob(Salsa::JobInfo command)
{
waiting.push(command);
mWaitingJobs.push(command);
return 0;
}
int Packetizer::finishJob(JobInfo job)
{
int i = 0;
for (auto & runjob : running)
// Highly ineffective, needs opt (but not right now...)
int iJob = 0;
for (auto & runjob : mRunningJobs)
{
if (runjob.clientid() == job.clientid() && runjob.jobid() == job.jobid() &&
runjob.taskid() == job.taskid())
{
running.erase(running.begin() + i);
finished.push_back(job);
mRunningJobs.erase(mRunningJobs.begin() + iJob);
mFinishedJobs.push_back(job);
break; // TODO maybe?
}
else
{
i++;
iJob++;
}
}
return 0;
......@@ -70,9 +73,9 @@ JobInfo Packetizer::getNextJob()
{
if (nextJobAvailable())
{
JobInfo firstInQ = waiting.front();
waiting.pop();
running.push_back(firstInQ);
JobInfo firstInQ = mWaitingJobs.front();
mWaitingJobs.pop();
mRunningJobs.push_back(firstInQ);
return firstInQ;
}
return JobInfo();
......@@ -80,20 +83,20 @@ JobInfo Packetizer::getNextJob()
bool Packetizer::nextJobAvailable()
{
return waiting.size() > 0;
return mWaitingJobs.size() > 0;
}
bool Packetizer::finishedJobAvailable()
{
return finished.size() > 0;
return mFinishedJobs.size() > 0;
}
JobInfo Packetizer::getFinishedJob()
{
if (finishedJobAvailable())
{
JobInfo ji = finished.back();
finished.pop_back();
JobInfo ji = mFinishedJobs.back();
mFinishedJobs.pop_back();
return ji;
}
......@@ -102,12 +105,12 @@ JobInfo Packetizer::getFinishedJob()
std::vector<std::string> Packetizer::getNodes()
{
return nodes;
return mNodes;
}
size_t Packetizer::getRunningCount()
{
return running.size();
return mRunningJobs.size();
}
} // namespace Salsa
......@@ -24,10 +24,11 @@ public:
/// Type of the packetizer (deprecated)