blob: a8138028d3ce42ebf5a76eb83f9d24ea25da7414 [file] [log] [blame]
/* $Header: /usr/sctpCVS/rserpool/lib/distributor.c,v 1.1 2006-05-30 18:02:01 randall Exp $ */
/*
* Copyright (C) 2002 Cisco Systems Inc,
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the project nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE PROJECT AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE PROJECT OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include <distributor.h>
/*
 * Allocate a hashableHash tagged with protocol id `pid` and give it a
 * hash table built by HashedTbl_create(string, sz).
 *
 * Returns the new object, or NULL if either the structure or its hash
 * table could not be created (the structure is released in that case).
 */
hashableHash *
create_hashableHash(int pid,char *string,int sz)
{
	hashableHash *hh = calloc(1, sizeof(hashableHash));

	if (hh == NULL)
		return (NULL);

	hh->hash = HashedTbl_create(string, sz);
	if (hh->hash == NULL) {
		/* no inner table, so give the shell back */
		free(hh);
		return (NULL);
	}
	hh->protocolId = pid;
	return (hh);
}
/*
 * Release a hashableHash made by create_hashableHash(): the embedded
 * hash table is handed to HashedTbl_destroy() and then the structure
 * itself is freed.
 *
 * A NULL argument is accepted and ignored, mirroring free(NULL).
 */
void
destroy_hashableHash(hashableHash *o)
{
	if(o == NULL){
		return;
	}
	HashedTbl_destroy(o->hash);
	free(o);
}
/*
 * Allocate a hashableDlist tagged with stream number `strm` and give
 * it an empty list from dlist_create().
 *
 * Returns the new object, or NULL if either the structure or its list
 * could not be created (the structure is released in that case).
 */
hashableDlist *
create_hashdlist(int strm)
{
	hashableDlist *hd = calloc(1, sizeof(hashableDlist));

	if (hd == NULL)
		return (NULL);

	hd->list = dlist_create();
	if (hd->list == NULL) {
		/* no list, so give the shell back */
		free(hd);
		return (NULL);
	}
	hd->streamNo = strm;
	return (hd);
}
/*
 * Release a hashableDlist made by create_hashdlist(): the embedded
 * list is handed to dlist_destroy() and then the structure itself is
 * freed.  Entries that were placed on the list are not freed here.
 *
 * A NULL argument is accepted and ignored, mirroring free(NULL).
 */
void
destroy_hashDlist(hashableDlist *o)
{
	if(o == NULL){
		return;
	}
	dlist_destroy(o->list);
	free(o);
}
distributor *
createDistributor()
{
int ret,i;
distributor *o;
o = (distributor *)calloc(1,sizeof(distributor));
if(o == NULL)
return(NULL);
o->fdSubscribersList = dlist_create();
if(o->fdSubscribersList == NULL){
/* failed to create */
free(o);
return(NULL);
}
o->lazyClockTickList = dlist_create();
if(o->lazyClockTickList == NULL){
dlist_destroy(o->fdSubscribersList);
free(o);
return(NULL);
}
o->startStopList = dlist_create();
if(o->startStopList == NULL){
dlist_destroy(o->fdSubscribersList);
dlist_destroy(o->lazyClockTickList);
free(o);
return(NULL);
}
/* we specify 11 subscribers */
o->fdSubscribers = HashedTbl_create("File Descriptor Subscribers", 11);
if(o->fdSubscribers == NULL){
/* failed to create */
dlist_destroy(o->fdSubscribersList);
dlist_destroy(o->lazyClockTickList);
dlist_destroy(o->startStopList);
free(o);
return(NULL);
}
o->msgSubscriberList = dlist_create();
if(o->msgSubscriberList == NULL){
/* failed to create */
HashedTbl_destroy(o->fdSubscribers);
dlist_destroy(o->fdSubscribersList);
dlist_destroy(o->lazyClockTickList);
dlist_destroy(o->startStopList);
free(o);
return(NULL);
}
o->msgSubscribers = HashedTbl_create("Master Hash of Hashes", 11);
if(o->msgSubscribers == NULL){
/* failed to create */
dlist_destroy(o->msgSubscriberList);
HashedTbl_destroy(o->fdSubscribers);
dlist_destroy(o->fdSubscribersList);
dlist_destroy(o->lazyClockTickList);
dlist_destroy(o->startStopList);
free(o);
return(NULL);
}
o->msgSubZero = create_hashableHash(DIST_SCTP_PROTO_ID_DEFAULT,
"Master Hash of Zero PID",
11);
if(o->msgSubZero == NULL){
/* failed to create */
HashedTbl_destroy(o->msgSubscribers);
dlist_destroy(o->msgSubscriberList);
HashedTbl_destroy(o->fdSubscribers);
dlist_destroy(o->fdSubscribersList);
dlist_destroy(o->lazyClockTickList);
dlist_destroy(o->startStopList);
free(o);
return(NULL);
}
o->msgSubZeroNoStrm = create_hashdlist(DIST_STREAM_DEFAULT);
if(o->msgSubZeroNoStrm == NULL){
/* failed to create */
destroy_hashableHash(o->msgSubZero);
HashedTbl_destroy(o->msgSubscribers);
dlist_destroy(o->msgSubscriberList);
HashedTbl_destroy(o->fdSubscribers);
dlist_destroy(o->fdSubscribersList);
dlist_destroy(o->lazyClockTickList);
dlist_destroy(o->startStopList);
free(o);
return(NULL);
}
o->numfds = DEFAULT_POLL_LIST_SZ;
o->numused = 0;
o->fdlist = (struct pollfd *)calloc(DEFAULT_POLL_LIST_SZ,sizeof(struct pollfd));
if(o->fdlist == NULL){
/* failed to create */
destroy_hashDlist(o->msgSubZeroNoStrm);
destroy_hashableHash(o->msgSubZero);
HashedTbl_destroy(o->msgSubscribers);
dlist_destroy(o->msgSubscriberList);
HashedTbl_destroy(o->fdSubscribers);
dlist_destroy(o->fdSubscribersList);
dlist_destroy(o->lazyClockTickList);
dlist_destroy(o->startStopList);
free(o);
return(NULL);
}
/* init fd structs */
for(i=0;i<o->numfds;i++){
o->fdlist[i].fd = -1;
o->fdlist[i].events = 0;
o->fdlist[i].revents = 0;
}
o->timerlist = dlist_create();
if(o->timerlist == NULL){
/* failed to create */
free(o->fdlist);
destroy_hashDlist(o->msgSubZeroNoStrm);
destroy_hashableHash(o->msgSubZero);
HashedTbl_destroy(o->msgSubscribers);
dlist_destroy(o->msgSubscriberList);
HashedTbl_destroy(o->fdSubscribers);
dlist_destroy(o->fdSubscribersList);
dlist_destroy(o->lazyClockTickList);
dlist_destroy(o->startStopList);
free(o);
return(NULL);
}
/* now that everything is created lets set some
* of the relationships up.
*/
/* we first insert into the msgSubZero Hash table the
* default stream list.
*/
ret = HashedTbl_enterKeyed(o->msgSubZero->hash,
o->msgSubZeroNoStrm->streamNo,
(void*)o->msgSubZeroNoStrm,
(void*)&o->msgSubZeroNoStrm->streamNo,
sizeof(int));
if(ret != LIB_STATUS_GOOD){
/* We have a problem. bail. */
dlist_destroy(o->timerlist);
free(o->fdlist);
destroy_hashDlist(o->msgSubZeroNoStrm);
destroy_hashableHash(o->msgSubZero);
HashedTbl_destroy(o->msgSubscribers);
dlist_destroy(o->msgSubscriberList);
HashedTbl_destroy(o->fdSubscribers);
dlist_destroy(o->fdSubscribersList);
dlist_destroy(o->lazyClockTickList);
dlist_destroy(o->startStopList);
free(o);
return(NULL);
}
/* Now we must put the default PID hash into
* the master hashtable.
*/
ret = HashedTbl_enterKeyed(o->msgSubscribers,
o->msgSubZero->protocolId,
(void*)o->msgSubZero,
(void*)&o->msgSubZero->protocolId,
sizeof(int));
if(ret != LIB_STATUS_GOOD){
/* We have a problem. bail. */
dlist_destroy(o->timerlist);
free(o->fdlist);
destroy_hashDlist(o->msgSubZeroNoStrm);
destroy_hashableHash(o->msgSubZero);
HashedTbl_destroy(o->msgSubscribers);
dlist_destroy(o->msgSubscriberList);
HashedTbl_destroy(o->fdSubscribers);
dlist_destroy(o->fdSubscribersList);
dlist_destroy(o->lazyClockTickList);
dlist_destroy(o->startStopList);
free(o);
return(NULL);
}
/* set the I am not done flag */
o->notdone = 1;
/* setup clock tick */
o->idleClockTick.tv_sec = DIST_IDLE_CLOCK_SEC;
o->idleClockTick.tv_usec = DIST_IDLE_CLOCK_USEC;
/* get the last known time setup */
gettimeofday(&o->lastKnownTime,(struct timezone *)NULL);
return(o);
}
void
destroyDistributor(distributor *o)
{
timerEntry *te;
fdEntry *fe;
hashableHash *hh;
hashableDlist *he;
msgConsumer *mc;
auditTick *at;
StartStop *sat;
void *keyP;
/* we must get each element out of the
* multi-keyed tables before destroying.
* since both hashable lists that we
* keep a pointer to for shortcuts in
* our main structure are in the individual lists
* and will be found that way there is no need
* to do a :
* destroy_hashDlist(o->msgSubZeroNoStrm);
* <or>
* destroy_hashableHash(o->msgSubZero);
*/
/* the list of timers first. */
dlist_reset(o->timerlist);
while((te = (timerEntry *)dlist_getNext(o->timerlist)) != NULL){
free(te);
}
dlist_destroy(o->timerlist);
/* now free audit tick list */
dlist_reset(o->lazyClockTickList);
while((at = (auditTick *)dlist_getNext(o->lazyClockTickList)) != NULL){
free(at);
}
dlist_destroy(o->lazyClockTickList);
/* now free the start/stop list */
dlist_reset(o->startStopList);
while((sat = (StartStop *)dlist_getNext(o->startStopList)) != NULL){
free(sat);
}
dlist_destroy(o->startStopList);
/* get rid of all the file descriptors */
free(o->fdlist);
/* here we dump the hash table since all entries
* are also maintained on the dlist too
*/
HashedTbl_destroy(o->fdSubscribers);
/* Now go through and free up all on our list */
dlist_reset(o->fdSubscribersList);
while((fe = (fdEntry *)dlist_getNext(o->fdSubscribersList)) != NULL){
free(fe);
}
dlist_destroy(o->fdSubscribersList);
/* Now we must go through the hash table
* of hashableHashes. And for each hashableHash
* pull all the hashableLists out. Each list is
* just destroyed since we will cleanup the underlying
* msgConsumers when we destroy the global msgSubscriberList.
*
*/
HashedTbl_rewind(o->msgSubscribers);
while((hh = (hashableHash *)HashedTbl_getNext(o->msgSubscribers,&keyP)) != NULL){
/* we have a hashable hash, clean it up */
HashedTbl_rewind(hh->hash);
while((he = (hashableDlist *)HashedTbl_getNext(hh->hash,&keyP)) != NULL){
/* we purge since list entries are all in msgSubscribers */
destroy_hashDlist(he);
}
destroy_hashableHash(hh);
}
HashedTbl_destroy(o->msgSubscribers);
/* Now lets go out and get the msg subscriber entries too. */
dlist_reset(o->msgSubscriberList);
while((mc = (msgConsumer *)dlist_getNext(o->msgSubscriberList)) != NULL){
free(mc);
}
dlist_destroy(o->msgSubscriberList);
/* ok free the base and we are clean */
free(o);
}
/*
 * Start a one-shot timer.
 *
 *  o    - distributor that owns the timer list
 *  f    - function to call on expiry
 *  a, b - delay from now, in seconds (a) and microseconds (b)
 *  c, d - stored as arg1/arg2 and passed to f on expiry
 *
 * The new entry is inserted into o->timerlist in front of the first
 * entry that expires later, so the list stays sorted by expiry time.
 *
 * Returns LIB_STATUS_GOOD on success, LIB_STATUS_BAD if memory could
 * not be allocated or the entry could not be placed on the list.
 */
int
dist_TimerStart(distributor *o,timerFunc f, u_long a, u_long b, void *c, void *d)
{
	int ret;
	timerEntry *te,*pe;

	/* allocate a new timer element */
	te = calloc(1,sizeof(timerEntry));
	if(te == NULL){
		return(LIB_STATUS_BAD);
	}
	te->arg1 = c;
	te->arg2 = d;
	te->action = f;
	te->tv_sec = a;
	te->tv_usec = b;
#ifdef MINIMIZE_TIME_CALLS
	/* reuse the time recorded by the main loop */
	te->started.tv_sec = o->lastKnownTime.tv_sec;
	te->started.tv_usec = o->lastKnownTime.tv_usec;
#else
	gettimeofday(&te->started,(struct timezone *)NULL);
	/* update the time, which is used before we go
	 * back into the poll()/select().
	 */
	o->lastKnownTime.tv_sec = te->started.tv_sec;
	o->lastKnownTime.tv_usec = te->started.tv_usec;
#endif
	/* now set up the expire time */
	te->expireTime.tv_sec = te->started.tv_sec + a;
	te->expireTime.tv_usec = te->started.tv_usec + b;
	/* Normalise so that tv_usec ends up in [0, 1000000).  The test
	 * must be >=: a value of exactly 1000000 is a whole second and
	 * would otherwise be left un-carried, which makes this entry
	 * compare as earlier than an equal time written as (sec+1, 0).
	 */
	while(te->expireTime.tv_usec >= 1000000){
		te->expireTime.tv_sec++;
		te->expireTime.tv_usec -= 1000000;
	}
	/* Start from a failure status so that a logic slip below can
	 * never report success without the entry being on the list.
	 */
	ret = LIB_STATUS_BAD;
	/* carefully place it on the list */
	dlist_reset(o->timerlist);
	while((pe = (timerEntry *)dlist_get(o->timerlist)) != NULL){
		if(pe->expireTime.tv_sec > te->expireTime.tv_sec){
			/* this one expires after the new one */
			ret = dlist_insertHere(o->timerlist,(void *)te);
			break;
		}else if(pe->expireTime.tv_sec == te->expireTime.tv_sec){
			/* same second, so the microseconds decide */
			if(pe->expireTime.tv_usec > te->expireTime.tv_usec){
				/* the new one expires sooner than this one */
				ret = dlist_insertHere(o->timerlist,(void *)te);
				break;
			}
		}
		/* otherwise the new one expires after this entry;
		 * move on to the next.
		 */
	}
	if(pe == NULL){
		/* ran off the end without inserting, so append */
		ret = dlist_append(o->timerlist,(void *)te);
	}
	dlist_reset(o->timerlist);
	if(ret != LIB_STATUS_GOOD){
		/* could not get it onto the list, clean up and fail */
		free(te);
		return(LIB_STATUS_BAD);
	}
	return(LIB_STATUS_GOOD);
}
/*
 * Cancel the first pending timer whose function and both arguments
 * match (f, a, b).
 *
 * Returns LIB_STATUS_GOOD when a timer was found, pulled from the
 * list and freed; LIB_STATUS_BAD when no matching timer is pending.
 * If the list hands back a different entry than the one just
 * examined the process is aborted (LIB_STATUS_BAD1 follows the
 * abort() call).
 */
int
dist_TimerStop(distributor *o,timerFunc f, void *a, void *b)
{
	timerEntry *cur, *pulled;

	dlist_reset(o->timerlist);
	for (;;) {
		cur = (timerEntry *)dlist_get(o->timerlist);
		if (cur == NULL) {
			/* walked the whole list, nothing matched */
			return (LIB_STATUS_BAD);
		}
		if (cur->action != f || cur->arg1 != a || cur->arg2 != b) {
			continue;
		}
		/* a match: take it off the list */
		pulled = dlist_getThis(o->timerlist);
		if (pulled != cur) {
			/* the list is inconsistent, debug-time trap */
			abort();
			return (LIB_STATUS_BAD1);
		}
		free(cur);
		return (LIB_STATUS_GOOD);
	}
}
int
dist_addFd(distributor *o,int fd,serviceFunc fdNotify, int mask,void *arg)
{
fdEntry *fe;
int i,fnd,ret;
/* first allocate a fd service structure and init it */
fe = calloc(1,sizeof(fdEntry));
if(fe == NULL){
return(LIB_STATUS_BAD);
}
/* init it */
fe->onTheList = 0;
fe->fdItServices = fd;
fe->fdNotify = fdNotify;
fe->arg = arg;
/* now place it in the hash table and verify its unique */
ret = HashedTbl_enterKeyed(o->fdSubscribers,
fe->fdItServices,
(void*)fe,
(void*)&fe->fdItServices,
sizeof(int));
if(ret != LIB_STATUS_GOOD){
/* no good, probably already in */
free(fe);
return(ret);
}
/* now add it to the linked list */
ret = dlist_append(o->fdSubscribersList,(void *)fe);
if(ret != LIB_STATUS_GOOD){
/* now we must back it out of the hash table */
HashedTbl_removeKeyed(o->fdSubscribers,
fe->fdItServices,
(void*)&fe->fdItServices,
sizeof(int),
(void **)NULL);
free(fe);
return(ret);
}
/* now plop in a place in the fd select array */
/* first do we need to grow array? */
if((o->numused+1) >= o->numfds){
/* yes, this will push us to have none spare */
struct pollfd *tp;
tp = (struct pollfd *)calloc((o->numfds*2),sizeof(struct pollfd));
if(tp == NULL){
void *v;
/* can't get bigger we are out of here */
HashedTbl_removeKeyed(o->fdSubscribers,
fe->fdItServices,
(void*)&fe->fdItServices,
sizeof(int),
(void **)NULL);
/* push pointer so next get will get the last one */
dlist_getToTheEnd(o->fdSubscribersList);
/* do that so we can do a getThis() */
v = dlist_get(o->fdSubscribersList);
/* now pull it */
v = dlist_getThis(o->fdSubscribersList);
free(fe);
return(LIB_STATUS_BAD1);
}
memcpy(tp,o->fdlist,(sizeof(struct pollfd) * o->numfds));
/* init the rest of the struct */
for(i=o->numfds;i<(o->numfds*2);i++){
tp[i].fd = -1;
tp[i].events = 0;
tp[i].revents = 0;
}
/* adjust the size */
o->numfds *= 2;
/* free old list */
free(o->fdlist);
/* place in our new one */
o->fdlist = tp;
}
fnd = 0;
/* now lets find a entry and use it */
for(i=o->numused;i<o->numfds;i++){
if(o->fdlist[i].fd == -1){
/* found a empty spot */
o->fdlist[i].fd = fd;
o->fdlist[i].events = mask;
o->numused++;
fnd = 1;
break;
}
}
if(!fnd){
for(i=0;i<o->numused;i++){
if(o->fdlist[i].fd == -1){
/* found a empty spot */
o->fdlist[i].fd = fd;
o->fdlist[i].events = mask;
o->numused++;
fnd = 1;
break;
}
}
}
if(!fnd){
/* TSNH */
void *v;
HashedTbl_removeKeyed(o->fdSubscribers,
fe->fdItServices,
(void*)&fe->fdItServices,
sizeof(int),
(void **)NULL);
/* push pointer so next get will get the last one */
dlist_getToTheEnd(o->fdSubscribersList);
/* do that so we can do a getThis() */
v = dlist_get(o->fdSubscribersList);
/* now pull it */
v = dlist_getThis(o->fdSubscribersList);
free(fe);
return(LIB_STATUS_BAD);
}
/* now add it to the list of service functions */
return(LIB_STATUS_GOOD);
}
/*
 * Unregister file descriptor `fd`.
 *
 * The entry is removed from the fd hash table and from
 * fdSubscribersList, its poll slot is cleared, the poll array is
 * collapsed so that the used slots stay contiguous, and the fdEntry
 * is freed.
 *
 * Always returns LIB_STATUS_GOOD, even if `fd` was not registered.
 *
 * NOTE(review): the entry is now really freed (it used to leak
 * because the search loop overwrote the pointer with NULL).
 * dist_process() looks at the fdEntry again after a service function
 * returns a value > 0, so a service function that deletes its own fd
 * should return <= 0 - confirm against callers.
 */
int
dist_deleteFd(distributor *o,int fd)
{
	fdEntry *fe,*cur;
	int i,removed;

	removed = -1;
	/* Look up the fd in the hash table and remove it */
	fe = HashedTbl_removeKeyed(o->fdSubscribers,
				   fd,
				   (void*)&fd,
				   sizeof(int),
				   (void **)NULL);
	/* Find it in the fdSubscribersList and remove it.  A separate
	 * cursor is used and the walk stops at the first match, so the
	 * pointer to the entry survives until the free() below.
	 */
	dlist_reset(o->fdSubscribersList);
	while((cur = (fdEntry *)dlist_get(o->fdSubscribersList)) != NULL){
		if(cur->fdItServices == fd){
			cur = dlist_getThis(o->fdSubscribersList);
			if(fe == NULL){
				/* not in the hash table, but on the list */
				fe = cur;
			}
			break;
		}
	}
	/* remove it from the fdlist and subtract current used */
	for(i=0;i<o->numfds;i++){
		if(o->fdlist[i].fd == fd){
			o->fdlist[i].fd = -1;
			o->fdlist[i].events = 0;
			o->fdlist[i].revents = 0;
			removed = i;
			o->numused--;
			break;
		}
	}
	/* now free the memory; free(NULL) is harmless */
	free(fe);
	/* For netbsd we need to collapse the list */
	if(removed != -1){
		if(removed != o->numused){
			/* The freed slot was not the last used one, so
			 * move the last used slot down into the hole.
			 * (removed == o->numused means the last one was
			 * removed and nothing has to move.)
			 */
			o->fdlist[removed].fd = o->fdlist[o->numused].fd;
			o->fdlist[removed].events = o->fdlist[o->numused].events;
			o->fdlist[removed].revents = o->fdlist[o->numused].revents;
			o->fdlist[o->numused].fd = -1;
			o->fdlist[o->numused].events = 0;
			o->fdlist[o->numused].revents = 0;
		}
	}
	return(LIB_STATUS_GOOD);
}
/*
 * Replace the poll event mask of the first poll slot that holds `fd`
 * with `newmask`.
 *
 * Always returns LIB_STATUS_GOOD, whether or not `fd` was found.
 */
int
dist_changeFd(distributor *o,int fd,int newmask)
{
	int slot = 0;

	while (slot < o->numfds && o->fdlist[slot].fd != fd) {
		slot++;
	}
	if (slot < o->numfds) {
		o->fdlist[slot].events = newmask;
	}
	return (LIB_STATUS_GOOD);
}
/*
 * Put `mc` on list `dl`, keeping the list ordered by ascending
 * precedence value: the consumer is inserted in front of the first
 * entry whose precedence is greater than its own, or appended when
 * there is no such entry.
 *
 * Returns the status of the insert/append call.
 */
int
__place_mc_in_dlist(dlist_t *dl,msgConsumer *mc)
{
	msgConsumer *cur;

	dlist_reset(dl);
	for (cur = (msgConsumer *)dlist_get(dl);
	     cur != NULL;
	     cur = (msgConsumer *)dlist_get(dl)) {
		if (mc->precedence < cur->precedence) {
			/* slip in ahead of the entry being looked at */
			return (dlist_insertHere(dl, (void *)mc));
		}
	}
	/* nothing ranks after it, so it goes last */
	return (dlist_append(dl, (void *)mc));
}
/*
 * Take consumer `mc` (matched by pointer) off list `dl`, if it is
 * there.  The consumer itself is not freed.
 */
void
__pull_from_dlist(dlist_t *dl,msgConsumer *mc)
{
	msgConsumer *cur;

	dlist_reset(dl);
	for (cur = (msgConsumer *)dlist_get(dl);
	     cur != NULL;
	     cur = (msgConsumer *)dlist_get(dl)) {
		if (cur != mc)
			continue;
		/* found it, unlink and stop */
		(void)dlist_getThis(dl);
		return;
	}
}
/*
 * Find the first consumer on `dl` whose protocol id, stream id,
 * notify function and argument all match, unlink it from the list
 * and return it.  The consumer is not freed.
 *
 * Returns NULL when nothing matches.  Aborts if the list hands back a
 * different entry than the one that matched.
 */
msgConsumer *
__pull_from_dlist2(dlist_t *dl,messageFunc mf,int sctp_proto, int stream_no,void *arg)
{
	msgConsumer *cur, *pulled;

	dlist_reset(dl);
	for (;;) {
		cur = (msgConsumer *)dlist_get(dl);
		if (cur == NULL)
			return (NULL);
		if (cur->SCTPprotocolId != sctp_proto ||
		    cur->SCTPstreamId != stream_no ||
		    cur->msgNotify != mf ||
		    cur->arg != arg) {
			continue;
		}
		pulled = (msgConsumer *)dlist_getThis(dl);
		if (pulled != cur) {
			/* list gave back something else */
			abort();
		}
		return (pulled);
	}
}
/*
 * Subscribe a message consumer.
 *
 *  mf         - function called for each matching message
 *  sctp_proto - protocol id to subscribe to
 *  stream_no  - stream number to subscribe to
 *  priority   - stored as the consumer's precedence; lists are kept
 *               in ascending precedence order
 *  arg        - passed back to mf
 *
 * The consumer is placed on two lists: the global msgSubscriberList
 * and the per-(protocol, stream) list.  The per-protocol hash and the
 * per-stream list are created on demand.  When both ids are the
 * defaults the lookups are skipped and the pre-built default list
 * (o->msgSubZeroNoStrm) is used.
 *
 * The default shortcut also registers the consumer on the global
 * list.  destroyDistributor() frees consumers only from that list and
 * dist_msgUnsubscribe() expects to find them there, so leaving them
 * off leaks every default subscriber when the distributor is torn
 * down.
 *
 * Returns LIB_STATUS_GOOD on success, LIB_STATUS_BAD on allocation or
 * table failure, or the status of the list insert that failed.
 */
int
dist_msgSubscribe(distributor *o,messageFunc mf,int sctp_proto, int stream_no,int priority, void *arg)
{
	int ret;
	msgConsumer *mc;
	hashableHash *hh;
	HashedTbl *hofl;
	hashableDlist *hdl;

	/* first get a msgConsumer */
	mc = (msgConsumer *)calloc(1,sizeof(msgConsumer));
	if(mc == NULL){
		/* no memory, out of here */
		return(LIB_STATUS_BAD);
	}
	mc->precedence = priority;
	mc->SCTPprotocolId = sctp_proto;
	mc->SCTPstreamId = stream_no;
	mc->msgNotify = mf;
	mc->arg = arg;

	if((sctp_proto == DIST_SCTP_PROTO_ID_DEFAULT) &&
	   (stream_no == DIST_STREAM_DEFAULT)){
		/* short cut: the default list is already at hand */
		hdl = o->msgSubZeroNoStrm;
	}else{
		/* first find the right hashableHash */
		hh = HashedTbl_lookupKeyed(o->msgSubscribers,
					   sctp_proto,
					   &sctp_proto,
					   sizeof(int),
					   (void **)NULL);
		if(hh == NULL){
			/* no hash exists for this protocol id yet,
			 * so create one and enter it in the master table
			 */
			char string[32];
			snprintf(string,sizeof(string),"protocolId:0x%x",(unsigned int)sctp_proto);
			hh = create_hashableHash(sctp_proto,string,11);
			if(hh == NULL){
				/* failed to alloc-create */
				free(mc);
				return(LIB_STATUS_BAD);
			}
			ret = HashedTbl_enterKeyed(o->msgSubscribers,hh->protocolId,(void *)hh,&hh->protocolId,sizeof(int));
			if(ret != LIB_STATUS_GOOD){
				/* something bad is happening */
				free(mc);
				destroy_hashableHash(hh);
				return(LIB_STATUS_BAD);
			}
		}
		hofl = hh->hash;
		/* now find the low level hashDlist for the stream */
		hdl = (hashableDlist *)HashedTbl_lookupKeyed(hofl,
							     stream_no,
							     &stream_no,
							     sizeof(int),
							     (void **)NULL);
		if(hdl == NULL){
			/* it does not exist, build one and enter it */
			hdl = create_hashdlist(stream_no);
			if(hdl == NULL){
				/* A per-protocol hash created above is
				 * deliberately left in place; it can be
				 * reused once memory is available again.
				 */
				free(mc);
				return(LIB_STATUS_BAD);
			}
			ret = HashedTbl_enterKeyed(hofl,hdl->streamNo,(void *)hdl,&hdl->streamNo,sizeof(int));
			if(ret != LIB_STATUS_GOOD){
				/* no way to add the dlist to the table */
				free(mc);
				destroy_hashDlist(hdl);
				return(LIB_STATUS_BAD);
			}
		}
	}
	/* place it on the global list ... */
	ret = __place_mc_in_dlist(o->msgSubscriberList,mc);
	if(ret != LIB_STATUS_GOOD){
		free(mc);
		return(ret);
	}
	/* ... and on the individual list */
	ret = __place_mc_in_dlist(hdl->list,mc);
	if(ret != LIB_STATUS_GOOD){
		/* undo the global registration before giving up */
		__pull_from_dlist(o->msgSubscriberList,mc);
		free(mc);
	}
	return(ret);
}
/*
 * Remove the subscription identified by (mf, sctp_proto, stream_no,
 * arg).
 *
 * The consumer is pulled from the global msgSubscriberList and, when
 * the per-protocol hash and per-stream list both exist, from that
 * stream list as well.  Whichever copy of the pointer was found is
 * freed once.  If both lists yield a consumer and the two pointers
 * differ, the process is aborted.
 *
 * Returns LIB_STATUS_GOOD if a consumer was found on either list,
 * LIB_STATUS_BAD if it was on neither.
 */
int
dist_msgUnsubscribe(distributor *o,messageFunc mf,int sctp_proto, int stream_no,void *arg)
{
	msgConsumer *fromGlobal, *fromStream;
	hashableHash *protoHash;
	hashableDlist *streamList;

	fromGlobal = __pull_from_dlist2(o->msgSubscriberList, mf, sctp_proto, stream_no, arg);
	/* until a stream list says otherwise, both views agree */
	fromStream = fromGlobal;

	protoHash = HashedTbl_lookupKeyed(o->msgSubscribers,
	    sctp_proto,
	    &sctp_proto,
	    sizeof(int),
	    (void **)NULL);
	if (protoHash != NULL) {
		streamList = (hashableDlist *)HashedTbl_lookupKeyed(protoHash->hash,
		    stream_no,
		    &stream_no,
		    sizeof(int),
		    (void **)NULL);
		if (streamList != NULL) {
			fromStream = __pull_from_dlist2(streamList->list, mf, sctp_proto, stream_no, arg);
		}
	}

	if (fromGlobal == NULL && fromStream == NULL) {
		/* not subscribed anywhere */
		return (LIB_STATUS_BAD);
	}
	if (fromGlobal != NULL && fromStream != NULL && fromGlobal != fromStream) {
		/* the two lists disagree about which consumer this is */
		abort();
	}
	/* prefer the global-list pointer, fall back to the stream-list one */
	free(fromGlobal != NULL ? fromGlobal : fromStream);
	return (LIB_STATUS_GOOD);
}
/*
 * Clear the distributor's "not done" flag, which is the loop
 * condition of dist_process().  Always returns LIB_STATUS_GOOD.
 */
int
dist_setDone(distributor *o)
{
	o->notdone = 0;

	return (LIB_STATUS_GOOD);
}
/*
 * Fire every timer that has expired.
 *
 * The current time is read into o->lastKnownTime.  Timers are then
 * taken from the front of o->timerlist for as long as their expiry
 * time is strictly earlier than now; the walk stops at the first
 * timer that is not yet due.  The expired timers are first collected
 * on a private chain and only afterwards called, so the timer list is
 * not being walked while the callbacks run.
 *
 * The private chain is built by appending at the tail so that the
 * callbacks run in the order the timers were taken from the list.
 * Pushing each link onto the head of the chain reverses that order.
 */
void
__check_time_out_list(distributor *o)
{
	timerEntry *te;
	dlist_dlink *dl,*head,*tail;

	head = NULL;
	tail = NULL;
	/* first get the time */
	gettimeofday(&o->lastKnownTime,(struct timezone *)NULL);
	/* get each one and have a look */
	dlist_reset(o->timerlist);
	while((te = (timerEntry *)dlist_get(o->timerlist)) != NULL){
		if((te->expireTime.tv_sec < o->lastKnownTime.tv_sec) ||
		   ((te->expireTime.tv_sec == o->lastKnownTime.tv_sec) &&
		    (te->expireTime.tv_usec < o->lastKnownTime.tv_usec))
			){
			/* this one has expired, take its link off the list */
			dl = dlist_getThisSlist(o->timerlist);
			if(dl != NULL){
				/* append to the private chain */
				dl->next = NULL;
				if(tail == NULL){
					head = dl;
				}else{
					tail->next = dl;
				}
				tail = dl;
			}
		}else{
			/* this timer is still in the future, so stop here */
			break;
		}
	}
	/* call every timer that is up on our private chain */
	dl = head;
	while(dl != NULL){
		/* grab what is needed before the link is freed */
		head = dl->next;
		te = (timerEntry *)dl->ent;
		free(dl);
		(*te->action)(te->arg1,te->arg2);
		free(te);
		dl = head;
	}
}
/*
 * Call the tick function of every entry on lazyClockTickList,
 * passing each entry's stored argument.
 */
void
__sendClockTick(distributor *o)
{
	auditTick *sub;

	dlist_reset(o->lazyClockTickList);
	for (sub = (auditTick *)dlist_get(o->lazyClockTickList);
	     sub != NULL;
	     sub = (auditTick *)dlist_get(o->lazyClockTickList)) {
		(*sub->tick)(sub->arg);
	}
}
/*
 * Deliver a start/stop event: every entry on startStopList whose
 * flags have a bit in common with `event` has its activate function
 * called with the stored argument and the event.
 */
void
__sendStartStopEvent(distributor *o,int event)
{
	StartStop *sub;

	dlist_reset(o->startStopList);
	for (sub = (StartStop *)dlist_get(o->startStopList);
	     sub != NULL;
	     sub = (StartStop *)dlist_get(o->startStopList)) {
		if (!(sub->flags & event))
			continue;
		(*sub->activate)(sub->arg, event);
	}
}
/*
 * Add, change or remove a start/stop subscription.
 *
 * Every existing entry with the same function and argument has its
 * flags replaced by `howmask`; when `howmask` is DIST_DONT_CALL_ME
 * the entry is also pulled from the list and freed.  If no existing
 * entry matched, a new one is allocated and appended.
 *
 * Returns LIB_STATUS_GOOD on success, LIB_STATUS_BAD when a new entry
 * could not be allocated or appended, and LIB_STATUS_BAD3 when the
 * list hands back a different entry than the one being removed.
 */
int
dist_startupChange(distributor *o,startStopFunc sf, void *arg, int howmask)
{
	StartStop *cur, *fresh;
	int matched = 0;

	dlist_reset(o->startStopList);
	for (cur = (StartStop *)dlist_get(o->startStopList);
	     cur != NULL;
	     cur = (StartStop *)dlist_get(o->startStopList)) {
		if (sf != cur->activate || arg != cur->arg)
			continue;
		matched = 1;
		cur->flags = howmask;
		if (howmask != DIST_DONT_CALL_ME)
			continue;
		/* caller wants out entirely, unlink and free the entry */
		if (dlist_getThis(o->startStopList) != (void *)cur) {
			printf("Serious error, TSNH!\n");
			return (LIB_STATUS_BAD3);
		}
		free(cur);
	}
	if (matched)
		return (LIB_STATUS_GOOD);

	/* nothing matched, so this is a new subscription */
	fresh = calloc(1, sizeof(StartStop));
	if (fresh == NULL) {
		/* no memory */
		return (LIB_STATUS_BAD);
	}
	fresh->activate = sf;
	fresh->flags = howmask;
	fresh->arg = arg;
	if (dlist_append(o->startStopList, (void *)fresh) != LIB_STATUS_GOOD) {
		free(fresh);
		return (LIB_STATUS_BAD);
	}
	return (LIB_STATUS_GOOD);
}
/*
 * The distributor main loop.
 *
 * Sends the DIST_CALL_ME_ON_START event, then loops while o->notdone
 * is set:
 *   - works out how long to wait from the first timer on the timer
 *     list, or from the idle clock tick when no timer is pending;
 *   - waits in poll() (or select() when USE_SELECT is defined);
 *   - fires expired timers;
 *   - calls the service function of every descriptor that reported
 *     events.  A service function that returns > 0 is put on a
 *     private list and the loop then polls without blocking, calling
 *     the first function on that list again (with an event mask of
 *     0) each time nothing else is ready, until it returns <= 0;
 *   - sends an idle clock tick when the wait timed out with no timer
 *     pending.
 * On exit the DIST_CALL_ME_ON_STOP event is sent.
 *
 * Returns LIB_STATUS_GOOD when the loop ends, LIB_STATUS_BAD if the
 * private list could not be created.
 */
int
dist_process(distributor *o)
{
	int i,ret,errOut;
	int clockTickFlag;
	int doingMultipleReads;
	timerEntry *te;
	fdEntry *fe;
	dlist_t *list1;
	int timeout;
#ifdef USE_SELECT
	struct timeval timeoutt;
	int cnt;
	fd_set readfds,writefds,exceptfds;
#endif

	list1 = dlist_create();
	if(list1 == NULL){
		printf("Error no memory\n");
		return(LIB_STATUS_BAD);
	}
	i = 0;
	clockTickFlag = 0;
	doingMultipleReads = 0;
	__sendStartStopEvent(o,DIST_CALL_ME_ON_START);
	while(o->notdone){
		/* the head of the timer list is the next one to expire */
		dlist_reset(o->timerlist);
		te = dlist_get(o->timerlist);
#ifdef USE_SELECT
		/* spin through and set up the fd sets */
		FD_ZERO(&readfds);
		FD_ZERO(&writefds);
		FD_ZERO(&exceptfds);
		cnt = 0;
		for(i=0;i<o->numfds;i++){
			if(o->fdlist[i].fd == -1){
				/* free slot, never hand -1 to FD_SET */
				continue;
			}
			if(o->fdlist[i].events & POLLIN){
				if(o->fdlist[i].fd > cnt){
					cnt = o->fdlist[i].fd;
				}
				FD_SET(o->fdlist[i].fd,&readfds);
			}
			if(o->fdlist[i].events & POLLOUT){
				if(o->fdlist[i].fd > cnt){
					cnt = o->fdlist[i].fd;
				}
				FD_SET(o->fdlist[i].fd,&writefds);
			}
		}
		/* select() wants the highest descriptor plus one.  The
		 * increment belongs after the loop; doing it once per
		 * slot inflates the count by the size of the array.
		 */
		cnt++;
#endif
		/* prepare timeout */
#ifndef MINIMIZE_TIME_CALLS
		gettimeofday(&o->lastKnownTime,(struct timezone *)NULL);
#endif
		if(te != NULL){
			/* a timer is running, wait for the delta (in ms) */
			clockTickFlag = 0;
			if(te->expireTime.tv_sec >= o->lastKnownTime.tv_sec){
				timeout = ((te->expireTime.tv_sec - o->lastKnownTime.tv_sec) * 1000);
				if(te->expireTime.tv_usec > o->lastKnownTime.tv_usec){
					timeout += ((te->expireTime.tv_usec - o->lastKnownTime.tv_usec)/1000);
				}else if(te->expireTime.tv_usec < o->lastKnownTime.tv_usec){
					/* borrow 1 second from the upper seconds */
					timeout += (((1000000 - o->lastKnownTime.tv_usec) + te->expireTime.tv_usec)/1000);
					/* subtract out the borrowed time above */
					timeout -= 1000;
				}
				/* Subtract off 10ms to compensate for the
				 * O/S rounding the wait upwards, which
				 * would otherwise make timers drift late.
				 * Only done when there is more than 11ms
				 * to wait.
				 */
				if(timeout > 11){
					timeout -= 10;
				}
			}else{
				/* expire time is already past */
				timeout = 0;
			}
			if(timeout <= 0){
				/* a timer is due, run it and start over */
				__check_time_out_list(o);
				continue;
			}
		}else{
			/* no timer pending, wait for one idle clock tick */
			clockTickFlag = 1;
			timeout = (o->idleClockTick.tv_sec * 1000) + o->idleClockTick.tv_usec/1000;
		}
#ifdef USE_SELECT
		if(doingMultipleReads){
			/* poll for fd only */
			timeoutt.tv_sec = 0;
			timeoutt.tv_usec = 0;
		}else{
			timeoutt.tv_sec = (timeout/1000);
			timeoutt.tv_usec = (timeout%1000) * 1000;
		}
		ret = select(cnt,&readfds,&writefds,&exceptfds,&timeoutt);
#else
		if(doingMultipleReads){
			/* non-blocking poll */
			ret = poll(o->fdlist,o->numused,0);
		}else{
			ret = poll(o->fdlist,o->numused,timeout);
		}
#endif
		/* save off the error from poll/select (kept for debugging) */
		errOut = errno;
		(void)errOut;
		/* we always check the timeout updating the time in the process */
		__check_time_out_list(o);
		if(ret > 0){
			/* ok we have something to do */
#ifdef USE_SELECT
			/* translate the select results into poll revents */
			for(i=0;i<o->numfds;i++){
				if(o->fdlist[i].fd == -1)
					continue;
				o->fdlist[i].revents = 0;
				if(FD_ISSET(o->fdlist[i].fd,&readfds)){
					o->fdlist[i].revents |= POLLIN;
				}
				/* test the descriptor, not the slot index */
				if(FD_ISSET(o->fdlist[i].fd,&writefds)){
					o->fdlist[i].revents |= POLLOUT;
				}
			}
#endif
			/* now rip through and call all the appropriate
			 * functions
			 */
			for(i=0;i<o->numfds;i++){
				if(o->fdlist[i].fd == -1){
					continue;
				}
				if(o->fdlist[i].revents == 0){
					continue;
				}
				/* got a live one */
				fe = (fdEntry *)HashedTbl_lookupKeyed(o->fdSubscribers,
								      o->fdlist[i].fd,
								      &o->fdlist[i].fd,
								      sizeof(int),
								      (void **)NULL);
				if(fe == NULL){
					/* Nobody is registered for this fd
					 * (should not happen): drop the slot
					 * and collapse the array.
					 */
					o->fdlist[i].fd = -1;
					o->numused--;
					if(i != o->numused){
						o->fdlist[i].fd = o->fdlist[o->numused].fd;
						o->fdlist[i].events = o->fdlist[o->numused].events;
						o->fdlist[i].revents = o->fdlist[o->numused].revents;
						o->fdlist[o->numused].fd = -1;
						o->fdlist[o->numused].events = 0;
						o->fdlist[o->numused].revents = 0;
					}
					continue;
				}
				if((*fe->fdNotify)(fe->arg,o->fdlist[i].fd,o->fdlist[i].revents) > 0){
					/* the handler wants to be called again */
					if(fe->onTheList == 0){
						if(dlist_append(list1,(void *)fe) < LIB_STATUS_GOOD){
							printf("Can't append??\n");
						}else{
							fe->onTheList = 1;
						}
					}
				}
				o->fdlist[i].revents = 0;
			}
		}else if((ret == 0) && (clockTickFlag) && (!doingMultipleReads)){
			__sendClockTick(o);
		}else if((ret == 0) && (doingMultipleReads)){
			/* Nothing new is ready, so give the first handler
			 * on the call-again list another turn.  The fd is
			 * taken from the entry itself; the loop index i is
			 * stale here and may be one past the end of fdlist.
			 */
			dlist_reset(list1);
			fe = (fdEntry *)dlist_get(list1);
			if((fe != NULL) &&
			   ((*fe->fdNotify)(fe->arg,fe->fdItServices,0) <= 0)){
				void *v;
				fe->onTheList = 0;
				v = dlist_getThis(list1);
				if(v != (void *)fe){
					printf("Help TSNH!\n");
				}
			}
		}
		if(dlist_getCnt(list1)){
			/* if we have some on the list then we need
			 * to arrange to just poll non-blocking
			 */
			doingMultipleReads = 1;
		}else{
			doingMultipleReads = 0;
		}
	}
	__sendStartStopEvent(o,DIST_CALL_ME_ON_STOP);
	dlist_destroy(list1);
	return(LIB_STATUS_GOOD);
}
/*
 * Offer message `msg` to each consumer on list `dl` in list order.
 * Delivery stops as soon as a consumer leaves msg->data or
 * msg->totData set to NULL.
 */
void
__distribute_msg_o_list(dlist_t *dl,messageEnvolope *msg)
{
	msgConsumer *sub;

	dlist_reset(dl);
	for (sub = (msgConsumer *)dlist_get(dl);
	     sub != NULL;
	     sub = (msgConsumer *)dlist_get(dl)) {
		(*sub->msgNotify)(sub->arg, msg);
		if (msg->data == NULL || msg->totData == NULL) {
			/* nothing left to hand on */
			return;
		}
	}
}
/*
 * Route a message to its subscribers.
 *
 * msg->distrib is set to the distributor.  The per-protocol hash for
 * msg->protocolId is looked up; inside it the list for msg->streamNo
 * is used, or failing that the list for DIST_STREAM_DEFAULT.  The
 * message is offered to that list first.  If msg->data and
 * msg->totData are both still non-NULL afterwards and the list used
 * was not already the default list, the message is offered to the
 * default list (o->msgSubZeroNoStrm) as well.
 *
 * Finally, if both pointers are still non-NULL and msg->takeOk is
 * set, msg->totData is freed.
 */
void
dist_sendmessage(distributor *o,messageEnvolope *msg)
{
	hashableHash *protoHash;
	hashableDlist *target = NULL;

	msg->distrib = (void *)o;

	protoHash = HashedTbl_lookupKeyed(o->msgSubscribers,
	    msg->protocolId,
	    &msg->protocolId,
	    sizeof(int),
	    (void **)NULL);
	if (protoHash != NULL) {
		/* a protocol specific table exists, try the exact stream */
		target = (hashableDlist *)HashedTbl_lookupKeyed(protoHash->hash,
		    msg->streamNo,
		    &msg->streamNo,
		    sizeof(int),
		    (void **)NULL);
		if (target == NULL) {
			/* no exact stream, try the default stream instead */
			int dflt = DIST_STREAM_DEFAULT;

			target = (hashableDlist *)HashedTbl_lookupKeyed(protoHash->hash,
			    dflt,
			    &dflt,
			    sizeof(int),
			    (void **)NULL);
		}
	}

	if (target != NULL) {
		__distribute_msg_o_list(target->list, msg);
	}

	/* still unconsumed and not yet shown to the default list? */
	if (target != o->msgSubZeroNoStrm &&
	    msg->data != NULL && msg->totData != NULL) {
		__distribute_msg_o_list(o->msgSubZeroNoStrm->list, msg);
	}

	/* still unconsumed and takeOk is set: release the buffer */
	if (msg->takeOk && msg->totData != NULL && msg->data != NULL) {
		free(msg->totData);
	}
}
/*
 * Subscribe to the idle clock tick: `af` will be called with `arg`
 * by __sendClockTick().
 *
 * Returns LIB_STATUS_BAD if memory could not be allocated, otherwise
 * the status of appending the entry to lazyClockTickList.  When the
 * append fails the entry is freed so that it does not leak.
 */
int
dist_wants_audit_tick(distributor *o,auditFunc af,void *arg)
{
	auditTick *at;
	int ret;

	at = calloc(1,sizeof(auditTick));
	if(at == NULL){
		return(LIB_STATUS_BAD);
	}
	at->arg = arg;
	at->tick = af;
	ret = dlist_append(o->lazyClockTickList,(void *)at);
	if(ret != LIB_STATUS_GOOD){
		/* not on the list, so nobody else will ever free it */
		free(at);
	}
	return(ret);
}
/*
 * Cancel an idle clock tick subscription: the first entry on
 * lazyClockTickList with the same function and argument is pulled
 * from the list and freed.
 *
 * Returns LIB_STATUS_GOOD if an entry was removed, LIB_STATUS_BAD if
 * none matched.
 */
int
dist_no_audit_tick(distributor *o,auditFunc af,void *arg)
{
	auditTick *sub;

	dlist_reset(o->lazyClockTickList);
	for (sub = (auditTick *)dlist_get(o->lazyClockTickList);
	     sub != NULL;
	     sub = (auditTick *)dlist_get(o->lazyClockTickList)) {
		if (sub->arg != arg || sub->tick != af)
			continue;
		/* this is the one, unlink and release it */
		dlist_getThis(o->lazyClockTickList);
		free(sub);
		return (LIB_STATUS_GOOD);
	}
	return (LIB_STATUS_BAD);
}
/*
 * Set the idle clock tick interval to the seconds and microseconds
 * held in `tm`.  dist_process() uses this interval as its wait time
 * when no timer is pending.
 */
void
dist_changelazyclock(distributor *o,struct timeval *tm)
{
	o->idleClockTick.tv_usec = tm->tv_usec;
	o->idleClockTick.tv_sec = tm->tv_sec;
}