summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoredwin <edwin@99606f1f-bbb2-4562-8e6b-387ddef1dc61>2010-01-09 14:15:51 (GMT)
committer edwin <edwin@99606f1f-bbb2-4562-8e6b-387ddef1dc61>2010-01-09 14:15:51 (GMT)
commitff0d33071d3bdc77251c81ffec346129aa1fc76c (patch)
treee06c65feb468ca8cdfdd3e09ee22cecc7b2329c1
parent4fa34f9db503d278ef7cfa874fc73c20e915082a (diff)
improved shutdown procedures
git-svn-id: https://open.syn3.nl/syn3/svndav/default/trunk/projects/synapse@165 99606f1f-bbb2-4562-8e6b-387ddef1dc61
-rw-r--r--CMakeLists.txt5
-rw-r--r--ccallman.cpp25
-rw-r--r--ccallman.h3
-rw-r--r--cmessageman.cpp55
-rw-r--r--cmessageman.h8
-rw-r--r--cnetman.cpp25
-rw-r--r--cnetman.h9
-rw-r--r--csession.h9
-rw-r--r--cuserman.cpp16
-rw-r--r--cuserman.h4
-rw-r--r--modules/conn_json.module/module.cpp13
-rw-r--r--modules/core.module/module.cpp76
-rw-r--r--modules/lirc.module/module.cpp17
-rw-r--r--modules/net.module/module.cpp17
-rw-r--r--modules/test.module/module.cpp30
15 files changed, 257 insertions, 55 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3699d0e..9f9f75d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -2,6 +2,7 @@ cmake_minimum_required(VERSION 2.6)
PROJECT(synapse)
+
#uncomment this for releases or debugs
#SET(CMAKE_BUILD_TYPE Release)
SET(CMAKE_BUILD_TYPE Debug)
@@ -10,7 +11,7 @@ SET(CMAKE_BUILD_TYPE Debug)
SET(CMAKE_VERBOSE_MAKEFILE ON)
#add definitions, compiler switches, etc.
-ADD_DEFINITIONS(-Wall -O2 -pthread)
+ADD_DEFINITIONS(-Wall -O2 -pthread -pg)
#list all source files here
file(GLOB sources *.cpp)
@@ -19,7 +20,7 @@ ADD_EXECUTABLE(synapse ${sources})
#need to link to some other libraries ? just add them here
-TARGET_LINK_LIBRARIES(synapse boost_thread-mt dl boost_regex-mt boost_system-mt)
+TARGET_LINK_LIBRARIES(synapse boost_thread-mt dl boost_regex-mt boost_system-mt -pg)
SUBDIRS(modules)
diff --git a/ccallman.cpp b/ccallman.cpp
index 50acd97..b99a513 100644
--- a/ccallman.cpp
+++ b/ccallman.cpp
@@ -75,14 +75,17 @@ void CcallMan::endCall(CcallList::iterator callI)
/*!
\fn CcallMan::print()
*/
-void CcallMan::print()
+void CcallMan::print(int verbose)
{
- DEB( statsTotal << " calls processed, " << callList.size() << " calls queued" );
+ if (verbose)
+ DEB( statsTotal << " calls processed, " << callList.size() << " calls queued" );
string status;
for (CcallList::iterator callI=callList.begin(); callI!=callList.end(); callI++)
{
if (callI->started)
status="RUNNING";
+ else if (!verbose)
+ continue;
else
status="QUEUED ";
@@ -125,4 +128,22 @@ bool CcallMan::interruptCall(string event, int src, int dst)
}
}
return false;
+}
+
+//interrupt all running calls (used for shutdown)
+bool CcallMan::interruptAll()
+{
+ for (CcallList::iterator callI=callList.begin(); callI!=callList.end(); callI++)
+ {
+ //send interrupt
+ if (callI->started && callI->threadPtr)
+ {
+ DEB("Interrupting call: " << callI->msg->event << " FROM " << callI->msg->src << " TO " <<
+ callI->dst->id << ":" << callI->dst->user->getName() << "@" << callI->dst->module->name
+ << callI->msg->getPrint(" |"));
+
+ callI->threadPtr->interrupt();
+ }
+ }
+ return false;
} \ No newline at end of file
diff --git a/ccallman.h b/ccallman.h
index b64efa5..4ff1700 100644
--- a/ccallman.h
+++ b/ccallman.h
@@ -30,7 +30,8 @@ public:
CcallList::iterator startCall(const CthreadPtr & threadPtr);
void endCall(CcallList::iterator callI);
bool interruptCall(string event, int src, int dst);
- void print();
+ bool interruptAll();
+ void print(int verbose=1);
int statsTotal;
CcallList callList;
diff --git a/cmessageman.cpp b/cmessageman.cpp
index 5560fa5..d8fba71 100644
--- a/cmessageman.cpp
+++ b/cmessageman.cpp
@@ -56,11 +56,23 @@ bool CmessageMan::sendMessage(const CmodulePtr &module, const CmsgPtr & msg)
// -our job is to verify if everything is ok and populate the call queue
// -internally the core only works with smartpointers, so most stuff thats not in msg will be a smartpointer.
+ if (shutdown)
+ return false;
//no src session specified means use default session of module:
//NOTE: this is the only case where modify the actual msg object.
if (!msg->src)
- msg->src=module->defaultSessionId;
+ {
+ if (module->defaultSessionId!=SESSION_DISABLED)
+ {
+ msg->src=module->defaultSessionId;
+ }
+ else
+ {
+ ERROR("send: module " << module->name << " want to send " << msg->event << " from its default session, but is doesnt have one." );
+ return false;
+ }
+ }
//resolve source session id to session a pointer
@@ -70,7 +82,7 @@ bool CmessageMan::sendMessage(const CmodulePtr &module, const CmsgPtr & msg)
if (!src)
{
//not found. we cant send an error back yet, so just return false
- ERROR("send: in module " << module->name << " session " << msg->src << " does not exist");
+ ERROR("send: module " << module->name << " want to send " << msg->event << " from non-existing session " << msg->src );
return false;
}
@@ -78,7 +90,7 @@ bool CmessageMan::sendMessage(const CmodulePtr &module, const CmsgPtr & msg)
if (src->module!=module)
{
//module is not the session owner. we cant send an error back yet, so just return false
- ERROR("send: module " << module->name << " does not own session " << msg->src );
+ ERROR("send: module " << module->name << " wants to send " << msg->event << " from session " << msg->src << ", but isnt the owner of this session.");
return false;
}
@@ -240,7 +252,7 @@ void CmessageMan::operator()()
}
//get next call
- while ((callI=callMan.startCall(threadPtr)) == CcallList::iterator() || shutdown)
+ while ((callI=callMan.startCall(threadPtr)) == CcallList::iterator())
{
//no call ready...
//indicate we're idle
@@ -329,14 +341,13 @@ void CmessageMan::activeThread()
/*!
- \fn CmessageMan::endThread()
called when thread is ready and does nothing
*/
bool CmessageMan::idleThread()
{
activeThreads--;
//we want less threads? let this one die by returning false
- if (wantCurrentThreads<currentThreads || shutdown)
+ if (wantCurrentThreads<currentThreads)
{
currentThreads--;
return false;
@@ -351,8 +362,6 @@ bool CmessageMan::idleThread()
*/
void CmessageMan::checkThread()
{
- if (shutdown)
- return;
//if all threads are active, indicate that we want one want one more
//(there always should be a least one idle thread)
@@ -382,18 +391,6 @@ void CmessageMan::checkThread()
*/
int CmessageMan::run(string coreName, string moduleName)
{
-/* INFO("test");
- Cvar v;
-
-// INFO("print" << v["geert"]["sub"]);
- v["geert"].clear();
- v="test";
- INFO(v.getPrint());
-
-
- INFO("k");
-return(1);
-*/
//load the first module as user core UNLOCKED!
loadModule(coreName, "core");
this->firstModuleName=moduleName;
@@ -402,7 +399,7 @@ return(1);
checkThread();
//thread manager loop
- while (1)
+ while (! (shutdown && callMan.callList.empty() ))
{
sleep(10);
@@ -413,9 +410,6 @@ return(1);
callMan.print();
userMan.print();
- if (shutdown)
- break;
-
if (maxActiveThreads<wantCurrentThreads-1)
{
wantCurrentThreads--;
@@ -427,16 +421,18 @@ return(1);
}
}
+ //loop exits when shutdown=true and callist is empty.
//shutdown loop
+ wantCurrentThreads=0;
while(1)
{
-
{
lock_guard<mutex> lock(threadMutex);
if (currentThreads)
{
INFO("shutting down - waiting for threads to end:" << currentThreads);
+ //send all running calls an interrupt
threadCond.notify_all();
}
else
@@ -463,6 +459,12 @@ CsessionPtr CmessageMan::loadModule(string path, string userName)
{
CmodulePtr module(new Cmodule);
+ if (shutdown)
+ {
+ ERROR("Shutting down, cant load new module: " << path);
+ return (CsessionPtr());
+ }
+
//modules get unloaded automaticly when the module-object is deleted:
if (module->load(path))
{
@@ -541,7 +543,8 @@ bool CmessageMan::isModuleReady(string path)
void CmessageMan::doShutdown(int exit=0)
{
WARNING("Shutdown requested, exit code="<<exit);
- shutdown=1;
+ shutdown=true;
+ userMan.doShutdown();
this->exit=exit;
threadCond.notify_all();
}
diff --git a/cmessageman.h b/cmessageman.h
index 5e7adf7..43afe65 100644
--- a/cmessageman.h
+++ b/cmessageman.h
@@ -46,10 +46,14 @@ public:
CmessageMan();
~CmessageMan();
- bool sendMessage(const CmodulePtr & modulePtr, const CmsgPtr & msg);
- //these are the threads:
+
+ //these 2 are 'the' threads, and do their own locking:
void operator()();
int run(string coreName,string moduleName);
+
+ //the rest is not thread safe, so callers are responsible for locking:
+ bool sendMessage(const CmodulePtr & modulePtr, const CmsgPtr & msg);
+
void checkThread();
CsessionPtr loadModule(string path, string userName);
CeventPtr getEvent(const string & name);
diff --git a/cnetman.cpp b/cnetman.cpp
index a135ed8..2158e8b 100644
--- a/cnetman.cpp
+++ b/cnetman.cpp
@@ -152,7 +152,7 @@ bool CnetMan::doDisconnect(int id)
lock_guard<mutex> lock(threadMutex);
if (nets.find(id)==nets.end())
{
- ERROR("id " << id << " does not exist, ignoring disconnect request");
+ DEB("id " << id << " does not exist, ignoring disconnect request");
return false;
}
nets[id]->doDisconnect();
@@ -175,6 +175,28 @@ bool CnetMan::doWrite(int id, string & data)
}
+void CnetMan::doShutdown()
+{
+ {
+ lock_guard<mutex> lock(threadMutex);
+ //close all ports
+ DEB("Closing all open ports");
+ for (CacceptorMap::iterator acceptorI=acceptors.begin(); acceptorI!=acceptors.end(); acceptorI++)
+ {
+ acceptorI->second->get_io_service().post(bind(&CnetMan::closeHandler,this,acceptorI->first));
+ }
+
+ //FIXME: we need a sleep here? since it takes a while to post and process the request, and in that time new connections could arrive?
+
+ //disconnect all connections
+ DEB("Disconneting all connections");
+ for (CnetMap::iterator netI=nets.begin(); netI!=nets.end(); netI++)
+ {
+ netI->second->doDisconnect();
+ }
+
+ }
+}
void CnetMan::listening(int port)
{
@@ -226,3 +248,4 @@ void CnetMan::read(int id, asio::streambuf &readBuffer, std::size_t bytesTransfe
DEB("Read data " << id << ":" << &readBuffer);
}
+
diff --git a/cnetman.h b/cnetman.h
index 8f74944..5778784 100644
--- a/cnetman.h
+++ b/cnetman.h
@@ -46,10 +46,15 @@ class CnetMan
//for both client and server:
bool doDisconnect(int id);
bool doWrite(int id, string & data);
+ void doShutdown();
private:
- map<int, CnetPtr> nets;
- map<int, CacceptorPtr> acceptors;
+ typedef map<int, CnetPtr> CnetMap;
+ CnetMap nets;
+
+ typedef map<int, CacceptorPtr> CacceptorMap;
+ CacceptorMap acceptors;
+
mutex threadMutex;
void closeHandler(int port);
diff --git a/csession.h b/csession.h
index 1e82ad8..6f60930 100644
--- a/csession.h
+++ b/csession.h
@@ -21,9 +21,16 @@ typedef shared_ptr<class Csession> CsessionPtr;
#include "cmodule.h"
#include "cuser.h"
-
+/*
+ disabled sessions have id SESSION_DISABLED (-1)
+ broadcasts go to sessions id 0
+ core always has session id 1
+ rest of the world has 2 and higher.
+*/
#define SESSION_DISABLED -1
+
+
/**
@author
*/
diff --git a/cuserman.cpp b/cuserman.cpp
index c38eb97..5f72694 100644
--- a/cuserman.cpp
+++ b/cuserman.cpp
@@ -22,6 +22,7 @@ CuserMan::CuserMan()
{
sessionCounter=0;
sessionMaxPerUser=1000;
+ shutdown=false;
addGroup(CgroupPtr(new Cgroup("core")));
addGroup(CgroupPtr(new Cgroup("modules")));
@@ -141,9 +142,15 @@ CgroupPtr CuserMan::getGroup(const string &groupName)
*/
int CuserMan::addSession( CsessionPtr session)
{
+ if (shutdown)
+ {
+ ERROR("Shutting down, cant add new session.");
+ return (SESSION_DISABLED);
+ }
+
//too much for this user already?
int userSessions=0;
- for (int sessionId=0; sessionId<MAX_SESSIONS; sessionId++)
+ for (int sessionId=1; sessionId<MAX_SESSIONS; sessionId++)
{
CsessionPtr chkSession;
chkSession=getSession(sessionId);
@@ -214,7 +221,7 @@ bool CuserMan::delSession(const int sessionId)
}
//reset shared ptr.
//as soon as nobody uses the session object anymore it will be destroyed.
-//NIET: sessions[sessionId]->id=SESSION_DISABLED;
+//NIET, is onhandig in de praktijk, ook bij shutdown: sessions[sessionId]->id=SESSION_DISABLED;
sessions[sessionId].reset();
return (true);
}
@@ -241,3 +248,8 @@ void CuserMan::print()
}
}
+
+void CuserMan::doShutdown()
+{
+ shutdown=true;
+} \ No newline at end of file
diff --git a/cuserman.h b/cuserman.h
index d2410f0..ad131c5 100644
--- a/cuserman.h
+++ b/cuserman.h
@@ -39,13 +39,15 @@ public:
CsessionPtr getSession(const int & sessionId);
bool delSession(const int id);
void print();
+ void doShutdown();
private:
+ bool shutdown;
list<CuserPtr> users;
list<CgroupPtr> groups;
//performance: we use an oldskool array, so session lookups are quick
int sessionCounter;
- CsessionPtr sessions[MAX_SESSIONS];
+ CsessionPtr sessions[MAX_SESSIONS+1];
int sessionMaxPerUser;
};
diff --git a/modules/conn_json.module/module.cpp b/modules/conn_json.module/module.cpp
index 7ac6188..3eb266a 100644
--- a/modules/conn_json.module/module.cpp
+++ b/modules/conn_json.module/module.cpp
@@ -82,11 +82,14 @@ SYNAPSE_HANDLER(all)
jsonMsg.push_back(msg.dst);
jsonMsg.push_back(msg.event);
- //convert the parameters
- Value jsonPars;
- Cvar2Value(msg,jsonPars);
-
- jsonMsg.push_back(jsonPars);
+ //convert the parameters, if any
+ if (!msg.isEmpty())
+ {
+ Value jsonPars;
+ Cvar2Value(msg,jsonPars);
+
+ jsonMsg.push_back(jsonPars);
+ }
INFO("json: " << write( jsonMsg));
}
diff --git a/modules/core.module/module.cpp b/modules/core.module/module.cpp
index 5bbb4c4..9ff0e84 100644
--- a/modules/core.module/module.cpp
+++ b/modules/core.module/module.cpp
@@ -78,6 +78,13 @@ SYNAPSE_REGISTER(module_Init)
out["recvGroup"]="core";
out.send();
+ out.clear();
+ out.event="core_ChangeEvent";
+ out["event"]="module_Shutdown";
+ out["modifyGroup"]="core";
+ out["sendGroup"]="core";
+ out["recvGroup"]="modules";
+ out.send();
/// core_Login
out.clear();
@@ -440,6 +447,65 @@ SYNAPSE_REGISTER(core_NewSession)
}
+SYNAPSE_REGISTER(core_Shutdown)
+{
+ //this ends all sessions, so that all modules are eventually unloaded and the core shuts down.
+
+ Cmsg endmsg;
+ lock_guard<mutex> lock(messageMan->threadMutex);
+
+ WARNING("Shutdown requested, ending all sessions.");
+
+ //first tell all the modules we want them to shut down
+ endmsg.clear();
+ endmsg.event="module_Shutdown";
+ endmsg.dst=0;
+ //use lowlevel sendMessage, since endmsg.send would deadlock
+ messageMan->sendMessage((CmodulePtr)module,CmsgPtr(new Cmsg(endmsg)));
+
+
+ //now tell all the sessions they are ended, and actually delete them
+ //do this PER session.
+ for (int sessionId=2; sessionId<MAX_SESSIONS; sessionId++)
+ {
+
+ CsessionPtr session=messageMan->userMan.getSession(sessionId);
+ if (session)
+ {
+ //send endmessage to the (still existing ) session:
+ endmsg.clear();
+ endmsg.event="module_SessionEnd";
+ endmsg.dst=sessionId;
+ //use lowlevel sendMessage, since endmsg.send would deadlock
+ messageMan->sendMessage((CmodulePtr)module,CmsgPtr(new Cmsg(endmsg)));
+
+ //inform everyone the session has ended
+ endmsg.clear();
+ endmsg.event="module_SessionEnded";
+ endmsg["session"]=sessionId;
+ endmsg.dst=0;
+ messageMan->sendMessage((CmodulePtr)module,CmsgPtr(new Cmsg(endmsg)));
+
+ //now actually delete the session
+ //Csession object stays intact as long as there are shared_ptr's referring to it from the call queue
+ if (!messageMan->userMan.delSession(sessionId))
+ ERROR("cant delete session" << sessionId);
+
+ //when the last session for a module is gone the module is unloaded.
+ //when the last module is unloaded the program shuts down.
+ }
+ }
+
+ //delete the core session, so nobody can do any corestuff from now on
+ messageMan->userMan.delSession(1);
+
+ //make the shutdown flag true:
+ //-this prevents new sessions from being created and modules from being loaded.
+ //-also all calls to sendMessage will be silently ignored.
+ messageMan->doShutdown(msg["exit"]);
+}
+
+
SYNAPSE_REGISTER(core_Logout)
{
string error;
@@ -460,6 +526,7 @@ SYNAPSE_REGISTER(core_Logout)
endmsg.dst=msg.src;
endmsg.send();
+ //inform the rest of the world
endmsg.event="module_SessionEnded";
endmsg["session"]=endmsg.dst;
endmsg.dst=0;
@@ -469,7 +536,7 @@ SYNAPSE_REGISTER(core_Logout)
{
lock_guard<mutex> lock(messageMan->threadMutex);
if (!messageMan->userMan.delSession(msg.src))
- ERROR("cant delete session");
+ ERROR("cant delete session" << msg.src);
}
}
}
@@ -562,13 +629,6 @@ SYNAPSE_REGISTER(core_Interrupt)
out.send();
}
-SYNAPSE_REGISTER(core_Shutdown)
-{
- {
- lock_guard<mutex> lock(messageMan->threadMutex);
- messageMan->doShutdown((int)msg["exit"]);
- }
-}
SYNAPSE_REGISTER(core_ChangeLogging)
{
diff --git a/modules/lirc.module/module.cpp b/modules/lirc.module/module.cpp
index 38c5684..677a617 100644
--- a/modules/lirc.module/module.cpp
+++ b/modules/lirc.module/module.cpp
@@ -96,3 +96,20 @@ SYNAPSE_REGISTER(lirc_Disconnect)
net.doDisconnect(msg.src);
}
+/** When a session ends, make sure the corresponding network connection is disconnected as well.
+ *
+ */
+SYNAPSE_REGISTER(module_SessionEnded)
+{
+ net.doDisconnect(msg["session"]);
+}
+
+/** Called when synapse whats the module to shutdown completely
+ * This makes sure that all ports and network connections are closed, so there wont be any 'hanging' threads left.
+ * If you care about data-ordering, send this to session-id that sended you the net_Connected.
+ */
+SYNAPSE_REGISTER(module_Shutdown)
+{
+ //let the net module shut down to fix the rest
+ net.doShutdown();
+}
diff --git a/modules/net.module/module.cpp b/modules/net.module/module.cpp
index 058e9f9..4348485 100644
--- a/modules/net.module/module.cpp
+++ b/modules/net.module/module.cpp
@@ -150,6 +150,7 @@ SYNAPSE_REGISTER(net_Close)
net.doClose(msg["port"]);
}
+
/** Disconnections the connection related to src
*/
SYNAPSE_REGISTER(net_Disconnect)
@@ -167,5 +168,21 @@ SYNAPSE_REGISTER(net_Write)
net.doWrite(msg.src, msg["data"]);
}
+/** When a session ends, make sure the corresponding network connection is disconnected as well.
+ *
+ */
+SYNAPSE_REGISTER(module_SessionEnded)
+{
+ net.doDisconnect(msg["session"]);
+}
+/** Called when synapse whats the module to shutdown completely
+ * This makes sure that all ports and network connections are closed, so there wont be any 'hanging' threads left.
+ * If you care about data-ordering, send this to session-id that sended you the net_Connected.
+ */
+SYNAPSE_REGISTER(module_Shutdown)
+{
+ //let the net module shut down to fix the rest
+ net.doShutdown();
+}
diff --git a/modules/test.module/module.cpp b/modules/test.module/module.cpp
index 6f61ae0..5c2fd1d 100644
--- a/modules/test.module/module.cpp
+++ b/modules/test.module/module.cpp
@@ -34,19 +34,45 @@ SYNAPSE_REGISTER(module_Init)
out["path"]="modules/conn_json.module/libconn_json.so";
out.send();
+// out.clear();
+// out.event="core_ChangeLogging";
+// out["logSends"]=0;
+// out["logReceives"]=0;
+// out.send();
- while(1)
+// out.clear();
+// out.event="loop";
+// out["bla"]="loopback test";
+// out.dst=msg.dst;
+// out.send();
+
+
+ for (int bla=0; bla < 2; bla++)
{
out.clear();
out.event="test";
out.dst=0;
-out.clear();
+ out.clear();
out.send();
sleep(1);
}
+
+ out.event="core_Shutdown";
+ out.dst=0;
+ out.send();
}
+SYNAPSE_REGISTER(loop)
+{
+
+ Cmsg out;
+ out.clear();
+ out.event="loop";
+ out["bla"]="loopback test";
+ out.dst=msg.dst;
+ out.send();
+}
SYNAPSE_REGISTER(lirc_Ready)