Only in libmilter.org: docs diff -u libmilter.org/engine.c libmilter/engine.c --- libmilter.org/engine.c 2002-07-29 18:40:47.000000000 +0200 +++ libmilter/engine.c 2002-11-13 12:35:05.000000000 +0100 @@ -87,6 +87,8 @@ static char **dec_argv __P((char *, size_t)); static int dec_arg2 __P((char *, size_t, char **, char **)); +static int socket_data_ready __P((socket_t)); + /* states */ #define ST_NONE (-1) #define ST_INIT 0 /* initial state */ @@ -181,7 +183,7 @@ ** ctx -- context structure ** ** Returns: -** MI_FAILURE/MI_SUCCESS +** MI_FAILURE/MI_SUCCESS/MI_CONTINUE */ int mi_engine(ctx) @@ -190,7 +192,7 @@ size_t len; int i; socket_t sd; - int ret = MI_SUCCESS; + int ret = MI_CONTINUE; int ncmds = sizeof(cmds) / sizeof(cmdfct); int curstate = ST_INIT; int newstate; @@ -207,7 +209,9 @@ arg.a_ctx = ctx; sd = ctx->ctx_sd; fi_abort = ctx->ctx_smfi->xxfi_abort; - mi_clr_macros(ctx, 0); + curstate = ctx->ctx_state; + if (curstate == ST_INIT) + mi_clr_macros(ctx, 0); fix_stm(ctx); r = _SMFIS_NONE; do @@ -313,7 +317,10 @@ free(buf); buf = NULL; } - continue; + if (socket_data_ready(sd)) + continue; + else + break; } } arg.a_len = len; @@ -363,9 +370,18 @@ ret = MI_FAILURE; break; } - } while (!bitset(CT_END, cmds[i].cm_todo)); - if (ret != MI_SUCCESS) + if (bitset(CT_END, cmds[i].cm_todo)) + ret = MI_SUCCESS; + + if (ret == MI_CONTINUE && !socket_data_ready(sd)) + break; + + } while (ret == MI_CONTINUE); + + ctx->ctx_state = curstate; + + if (ret == MI_FAILURE) { /* call abort only if in a mail transaction */ if (fi_abort != NULL && call_abort) @@ -373,11 +389,13 @@ } /* close must always be called */ - if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL) - (void) (*fi_close)(ctx); + if (ret != MI_CONTINUE) { + if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL) + (void) (*fi_close)(ctx); + } if (r != _SMFIS_KEEP && buf != NULL) free(buf); - mi_clr_macros(ctx, 0); + /* mi_clr_macros(ctx, 0); */ return ret; } /* @@ -1150,3 +1168,45 @@ /* are we in the correct state? It must be "End of Message". */ return ctx->ctx_state == ST_ENDM; } + +/* +** SOCKET_DATA_READY +** +** Parameters: +** sd -- socket_t +** +** Returns: +** +*/ +static int socket_data_ready (sd) + socket_t sd; +{ + fd_set readset, excset; + struct timeval timeout; + + if (sd >= FD_SETSIZE) + return 0; + + FD_ZERO(&readset); + FD_SET(sd, &readset); + FD_ZERO(&excset); + FD_SET(sd, &excset); + + timeout.tv_sec = 0; + timeout.tv_usec = 0; + + while (1) + { + int n; + + n = select(sd + 1, &readset, NULL, &excset, &timeout); + if (n < 0) { + if (errno == EINTR) + continue; + return 0; + } + return (n > 0 ? 1 : 0); + } + return 0; +} + diff -u libmilter.org/handler.c libmilter/handler.c --- libmilter.org/handler.c 2002-04-29 17:06:48.000000000 +0200 +++ libmilter/handler.c 2002-12-12 14:26:41.219117000 +0100 @@ -13,36 +13,546 @@ #include "libmilter.h" +/* work data structure - linked list */ +struct work_t { + SMFICTX_PTR w_ctx; /* session context */ + int w_state; /* work state */ + long w_id; /* work id */ + struct work_t *next; /* next work in the list */ +}; + +typedef struct work_t WORK_T; + +/* data */ +static WORK_T *mi_work = NULL; /* head of linked list */ +static smutex_t w_mutex = PTHREAD_MUTEX_INITIALIZER; /* linked list mutex */ +static pthread_cond_t w_cond = PTHREAD_COND_INITIALIZER; /* conditional variable */ + +static pthread_t tmux_id = -1; + +/* prototypes */ +static void *mi_worker __P((void *)); +static void *mi_multiplexer __P((void *)); +static void mi_dispose_context __P((SMFICTX_PTR)); +static int mi_work_lock __P((void)); +static int mi_work_unlock __P((void)); +static int mi_work_add __P((WORK_T *)); +static int mi_work_del __P((WORK_T *)); + +/* +** +** +** +*/ +static int N_Threads = 0; /* max number of available threads */ +static int A_Threads = 0; /* current number of available threads */ + +static smutex_t N_Threads_Mutex = PTHREAD_MUTEX_INITIALIZER; + +static int threads_avail __P((int)); +static int threads_total __P((int)); + +/* states of work being processed by multiplexer */ + +#define ST_WORK_NONE -1 +#define ST_WORK_INIT 0 /* initial state */ +#define ST_WORK_ENGINE 1 /* work being processed by mi_engine */ +#define ST_WORK_WAIT_1 2 /* waiting to be catched by multiplexer */ +#define ST_WORK_MUX 3 /* caught - waiting for fd ready */ +#define ST_WORK_WAIT_2 4 /* data ready - waiting for a worker */ +#define ST_WORK_OUT 5 /* end of session */ + + +#define THR_WAIT 30 /* time to wait for new work - 30 secs */ +#define MIN_THREADS 2 /* minimum number of threads to keep around */ +#define MIN_IDLE 1 /* minimum number of idle threads */ +#define MUX_WAIT 200000 /* time to wait for file descriptor ready usecs */ + +static int dbg = 10; + /* ** HANDLE_SESSION -- Handle a connected session in its own context +** Called when new connection arrives +** Handles first engine state and then creates new work +** structure and adds it to work linked list ** -** Parameters: -** ctx -- context structure +** Parameters: +** ctx -- context structure ** -** Returns: -** MI_SUCCESS/MI_FAILURE +** Returns: +** MI_SUCCESS/MI_FAILURE */ int mi_handle_session(ctx) SMFICTX_PTR ctx; { - int ret; + int ret = MI_CONTINUE; + char *name = "mi_handle_session"; + WORK_T *w = NULL; + + if (tmux_id == -1) + { + int r; + if ((r = thread_create(&tmux_id, mi_multiplexer, (void *) NULL)) != 0) + { + smi_log(SMI_LOG_ERR, "%s: thread_create() failed: %d, %s", name, r, sm_errstring(r)); + tmux_id = -1; + return MI_FAILURE; + } + } if (ctx == NULL) return MI_FAILURE; + + dbg = ctx->ctx_dbg; + ctx->ctx_id = (sthread_t) sthread_get_id(); + /* + ** assume initial state = 0 - in fact ST_INIT defined + ** at engine.c * - shall move definitions from engine.c + ** to libmilter.h + */ + ctx->ctx_state = 0; + /* - ** detach so resources are free when the thread returns - ** if we ever "wait" for threads, this call must be removed + **detach so resources are free when the thread returns + **if we ever "wait" for threads, this call must be removed */ if (pthread_detach(ctx->ctx_id) != 0) ret = MI_FAILURE; + else + { + ret = mi_engine(ctx); + + if (ret == MI_CONTINUE) + { + if ((w = (WORK_T *) malloc(sizeof(WORK_T))) == NULL) + { + smi_log(SMI_LOG_ERR,"%s : malloc error : %s", name, sm_errstring(errno)); + ret = MI_FAILURE; + } + else + { + memset(w, 0, sizeof(WORK_T)); + w->w_state = ST_WORK_WAIT_1; + w->w_ctx = ctx; + (void ) mi_work_lock(); + if (mi_work_add(w) == MI_SUCCESS) + ctx->ctx_id = w->w_id; + else + { + smi_log(SMI_LOG_ERR,"%s : mi_work_add error : %s", + name, sm_errstring(errno)); + free(w); + w = NULL; + ret = MI_FAILURE; + } + (void ) mi_work_unlock(); + } + } + } + + if (ret != MI_CONTINUE) + { + mi_dispose_context(ctx); + ctx = NULL; + if (w != NULL) + free(w); + } + + return ret; +} + +/* +** MI_MULTIPLEXER - multiplexer +** waits ready state on all sockets and dispatch +** work to workers threads +** +** Parameters : +** arg - dummy argument (it's a thread...) +** +** Returns : +** dummy NULL pointer : (it's a thread...) +*/ + +void * +mi_multiplexer(arg) + void *arg; +{ + int n, nfds; + WORK_T *w; + fd_set readset, excset; + struct timeval timeout; + char *name = "mi_multiplexer_thread"; + int r; + + sthread_t t_id = sthread_get_id(); + + time_t ti, now; + ti = now = time(NULL); + + if (dbg > 4) + smi_log(SMI_LOG_DEBUG, "Entering mi_multiplexer..."); + + /* + **detach so resources are free when the thread returns + **if we ever "wait" for threads, this call must be removed + */ + if ((r = pthread_detach(t_id)) != 0) + smi_log(SMI_LOG_ERR,"%s : thread_dettach() failed: %d, %s", + name, r, sm_errstring(r)); + + while (1) + { + if (time(NULL) - now > 120) + { + time_t t = time(NULL); + now = t - (t % 120); + if (dbg > 5) + smi_log(SMI_LOG_DEBUG, "%s : nb : %6ld [%d %d]", + name, now - ti, N_Threads, A_Threads); + } + + /* setup file descriptor to select */ + + nfds = 0; + FD_ZERO(&readset); + FD_ZERO(&excset); + + (void) mi_work_lock(); + for (w = mi_work; w != NULL; w = w->next) + { + if (w->w_state == ST_WORK_WAIT_1 || w->w_state == ST_WORK_MUX) + { + if (w->w_ctx->ctx_sd < FD_SETSIZE) + { + w->w_state = ST_WORK_MUX; + FD_SET(w->w_ctx->ctx_sd, &readset); + FD_SET(w->w_ctx->ctx_sd, &excset); + if (w->w_ctx->ctx_sd > nfds) + nfds = w->w_ctx->ctx_sd; + } + else + { + /* log it : can't handle sd > FD_SETSIZE (1024) */ + smi_log(SMI_LOG_ERR, "%s : ctx_sd [%d] greater than FD_SETSIZE", + name, w->w_ctx->ctx_sd); + } + } + } + (void) mi_work_unlock(); + + timeout.tv_sec = 0; + timeout.tv_usec = MUX_WAIT; + + /* if #fd > 0 then select else sleep */ + if (0 && nfds == 0) + timeout.tv_sec = 1; + + if (nfds > 0) + n = select(nfds + 1, &readset, NULL, &excset, &timeout); + else + n = select(0, NULL, NULL, NULL, &timeout); + + /* interrupted system call */ + if (n < 0 && errno == EINTR) + continue; + + /* error condition */ + if (n < 0) + { + smi_log(SMI_LOG_ERR, + "%s : select failed: %d, %s", + name, errno, sm_errstring (errno)); + continue; + } + + /* timeout */ + if (n == 0) + continue; + + (void) mi_work_lock(); + for (w = mi_work; w != NULL; w = w->next) + { + /* check only works in the state ST_WORK_MUX ??? */ + if (w->w_state != ST_WORK_WAIT_2) + continue; + + if ((r = thread_create (&t_id, mi_worker, (void *) w)) != 0) + { + smi_log(SMI_LOG_ERR, "%s: thread_create() failed: %d, %s", + name, r, sm_errstring (r)); + /* do some other thing... help ??? */ + } + } + (void) mi_work_unlock(); + + /* read ready or exception */ + if (n > 0) + { + int avail = avail_threads(0); + int total = total_threads(0); + + do + { + smi_log(SMI_LOG_ERR, "WORKERS : total=[%d], avail=[%d]", total, avail); + + (void) mi_work_lock(); + /* distribute work */ + for (w = mi_work; w != NULL; w = w->next) + { + /* check only works in the state ST_WORK_MUX ??? */ + if (w->w_state != ST_WORK_MUX) + continue; + + /* it shall surely be on this state if fd is ready */ + if (FD_ISSET(w->w_ctx->ctx_sd, &readset) || + FD_ISSET(w->w_ctx->ctx_sd, &excset)) + { + /* is there available idle threads */ + if (A_Threads > 0) + { + /* Yes ! Let's signal one of them */ + w->w_state = ST_WORK_WAIT_2; + if ((r = pthread_cond_signal(&w_cond)) != 0) + { + smi_log(SMI_LOG_ERR, "%s: pthread_cond_signal() failed: %d, %s", + name, r, sm_errstring (r)); + /* do something other */ + } + } + else + { + /* No ! Let's launch a new worker */ + int ret; + pthread_t t_id; + + w->w_state = ST_WORK_WAIT_2; + if ((r = thread_create (&t_id, mi_worker, (void *) w)) != 0) + { + smi_log(SMI_LOG_ERR, "%s: thread_create() failed: %d, %s", + name, r, sm_errstring (r)); + /* do some other thing... help ??? */ + } + } + break; + } + } + (void) mi_work_unlock(); + } + while (w != NULL); + + } + } +} + + +/* +** MI_WORKER - worker thread +** handles work distributed by the multiplexer +** +** Parameters : +** arg - pointer to a WORK_T structure - the first +** work to handle +** +** Returns : +** dummy NULL pointer : (it's a thread...) +*/ +void * +mi_worker(arg) + void *arg; +{ + WORK_T *w = (WORK_T *) arg; + SMFICTX_PTR ctx = NULL; + char *name; + int r; + + sthread_t t_id = sthread_get_id(); + + /* + **detach so resources are free when the thread returns + **if we ever "wait" for threads, this call must be removed + */ + if ((r = pthread_detach(t_id)) != 0) + smi_log(SMI_LOG_ERR,"%s : thread_dettach() failed: %d, %s", + name, r, sm_errstring(r)); + + if (w == NULL) + { + (void) mi_work_lock(); + for (w = mi_work; w != NULL; w = w->next) + { + if (w->w_state == ST_WORK_WAIT_2) + { + w->w_state = ST_WORK_ENGINE; + break; + } + } + (void) mi_work_unlock(); + if (w == NULL) + return NULL; + } else + { + (void) mi_work_lock(); + if (w->w_state != ST_WORK_WAIT_2) + { + (void) mi_work_unlock(); + return NULL; + } + w->w_state = ST_WORK_ENGINE; + (void) mi_work_unlock(); + } + + ctx = w->w_ctx; + if (ctx == NULL) + return NULL; + + total_threads(1); + + if (dbg > 4) + smi_log(SMI_LOG_DEBUG, "Entering mi_worker... id=[%d] st=[%d]", + ctx->ctx_id, ctx->ctx_state); + + do + { + int avail, total; + int ret = MI_CONTINUE; + + if (dbg > 4) + smi_log(SMI_LOG_DEBUG, "Will enter mi_engine... t_id=[%ld] id=[%d] st=[%d]", + (long) t_id, ctx->ctx_id, ctx->ctx_state); + ret = mi_engine(ctx); + + /* are we at the end of the session ? */ + if (ret == MI_CONTINUE) + { + /* No */ + (void) mi_work_lock(); + w->w_state = ST_WORK_WAIT_1; + (void) mi_work_unlock(); + } + else + { + /* Yes */ + mi_dispose_context(ctx); + w->w_ctx = ctx = NULL; + (void) mi_work_lock(); + (void) mi_work_del(w); + (void) mi_work_unlock(); + } + + do + { + struct timespec timeout; + int r; + + (void) smutex_lock (&N_Threads_Mutex); + /* + ** Thread shall exit if ther's more than + ** one available idle worker + */ + if (N_Threads > MIN_THREADS && A_Threads > MIN_IDLE) + { + if (dbg > 4) + smi_log(SMI_LOG_DEBUG, + "Exiting mi_worker... A t_id=[%d] - avail=[%ld] - total=[%d]", + (long) t_id, A_Threads, N_Threads); + if (N_Threads > 0) + N_Threads--; + (void) smutex_unlock (&N_Threads_Mutex); + return NULL; + } + (void) smutex_unlock (&N_Threads_Mutex); + + /* wait for notification */ + timeout.tv_sec = time (NULL) + THR_WAIT; + timeout.tv_nsec = 0; + + w = NULL; + ctx = NULL; + + (void) mi_work_lock (); + + A_Threads++; + r = pthread_cond_timedwait (&w_cond, &w_mutex, &timeout); + A_Threads--; + + if (r == 0) + { + /* r = 0 : ther's work available to handle */ + for (w = mi_work; w != NULL; w = w->next) + { + if (w->w_state == ST_WORK_WAIT_2) + { + ctx = w->w_ctx; + if (ctx != NULL) + { + w->w_state = ST_WORK_ENGINE; + } + else + { + w = NULL; + /* do some syslog and delete this record */ + smi_log(SMI_LOG_ERR, " ctx NULL"); + continue; + } + break; + } + } + } + + (void) mi_work_unlock(); + + /* got a work */ + if (ctx != NULL && w != NULL) + break; + + /* timeout - let's wait again */ + if (r == ETIMEDOUT) + continue; + + /* error condition */ + if (r != 0) + { + smi_log (SMI_LOG_ERR, + "%ld: pthread_cond_timedwait(Work) failed: %d, %s", + (long) t_id, r, sm_errstring (r)); + + total_threads(-1); + + return NULL; + } + } while (1); + + } while (1); + + return NULL; +} + + +/* +** DISPOSE_CONTEXT - free context allocated memory +** and close socket, if open +** +** Parameters : +** ctx -- context structure +** +** Returns : +** none +*/ +static void +mi_dispose_context(ctx) + SMFICTX_PTR ctx; +{ + if (ctx == NULL) + return; + if (ValidSocket(ctx->ctx_sd)) { (void) closesocket(ctx->ctx_sd); @@ -61,6 +571,201 @@ } mi_clr_macros(ctx, 0); free(ctx); - ctx = NULL; - return ret; } + +/* +** WORK_LOCK - locks work linked list +** +** Parameters : +** name - name of calling function +** +** Returns : +** MI_FAILURE/MI_SUCCESS +*/ +static int +mi_work_lock() +{ + char *name = "mi_work_lock"; + + if (!smutex_lock (&w_mutex)) + { + smi_log(SMI_LOG_ERR,"%s : mutex_lock(w_mutex) failed", name); + return MI_FAILURE; + } + return MI_SUCCESS; +} + +/* +** WORK_UNLOCK - unlocks work linked list +** +** Parameters : +** name - name of calling function +** +** Returns : +** MI_FAILURE/MI_SUCCESS +*/ +static int +mi_work_unlock() +{ + char *name = "mi_work_unlock"; + + if (!smutex_unlock (&w_mutex)) + { + smi_log(SMI_LOG_ERR,"%s : mutex_unlock(w_mutex) failed", name); + return MI_FAILURE; + } + + return MI_SUCCESS; +} + +/* +** WORK_ADD - adds a new work at the end of +** the work linked list +** +** Parameters : +** w - work to add +** +** Returns : +** MI_FAILURE/MI_SUCCESS +*/ +static int +mi_work_add(w) + WORK_T*w; +{ + int r; + static long id = 0; + + if (w == NULL) + return MI_FAILURE; + + w->w_id = ++id; + w->next = NULL; + + if (mi_work == NULL) + { + mi_work = w; + } + else + { + WORK_T *p = mi_work; + + while (p->next != NULL) + p = p->next; + p->next = w; + } + + return MI_SUCCESS; +} + +/* +** WORK_DEL - deletes a work from the work linked list +** +** Parameters : +** w - work to delete +** +** Returns : +** MI_FAILURE/MI_SUCCESS +*/ +static int +mi_work_del(w) + WORK_T *w; +{ + WORK_T *p, *prev; + int res = MI_FAILURE; + + if (mi_work == NULL) + { + /* work list is empty */ + return MI_FAILURE; + } + + p = mi_work; + prev = NULL; + + while (p != NULL) + { + if (p == w) + { + res = MI_SUCCESS; + if (prev == NULL) + mi_work = p->next; + else + prev->next = p->next; + if (p != NULL) + { + if (p->w_ctx != NULL) + { + /* + ** log... + res = MI_FAILURE; + */ + } + free (p); + } + break; + } + prev = p; + p = p->next; + } + + return res; +} + +/* +** AVAIL_THREADS -- tell how many workers are available +** +** Parameters: +** n - number to add or delete to total +** +** Returns: +** >= 0- number of threads (before adding "n") +** <0- error +** +*/ +int +avail_threads (n) + int n; +{ + int avail = 0; + + (void) smutex_lock (&N_Threads_Mutex); + + avail = A_Threads; + if (n >= 0 || (A_Threads + n) >= 0) + A_Threads += n; + + (void ) smutex_unlock (&N_Threads_Mutex); + + return avail; +} + +/* +** TOTAL_THREADS -- tell how many workers are present +** +** Parameters: +** n - number to add or delete to total +** name - name of calling procedure +** log - optional string to log +** +** Returns: +** >= 0- number of threads (before adding "n") +** <0- error +** +*/ +int +total_threads (n) + int n; +{ + int total; + + (void ) smutex_lock (&N_Threads_Mutex); + + total = N_Threads; + if (n >= 0 || (N_Threads + n) >= 0) + N_Threads += n; + + (void ) smutex_unlock (&N_Threads_Mutex); + + return total; +} + diff -u libmilter.org/signal.c libmilter/signal.c --- libmilter.org/signal.c 2002-03-23 01:55:19.000000000 +0100 +++ libmilter/signal.c 2002-10-25 11:30:46.000000000 +0200 @@ -107,6 +107,8 @@ if (sigwait(&set, &sig) != 0) #endif /* defined(SOLARIS) || defined(__svr5__) */ { + if (errno == EINTR) + continue; smi_log(SMI_LOG_ERR, "%s: sigwait returned error: %d", (char *)name, errno);