This chapter describes some parts of the current sendmail X implementation. The implementation may change at any time; it is not part of the sendmail X specification. Only the external interfaces, i.e., those described in previous chapters, specify the sendmail X behavior and configuration. The usual disclaimer ``may be subject to changes'' applies: nobody should rely on the descriptions given here; they are intended for people who actually want to understand (and modify or maintain) the source code. Some of the descriptions might not be up to date (usually the source code is implemented or changed first, and the documentation update may lag a bit).
There are some parts of the sendmail X source code which should be explained before the individual modules and the libraries. Those are conventions that are common to at least two modules and which will be used in the following sections.
In Section 3.1.1.2 some remarks about identifiers have been made. In this section the structure of session and transaction identifiers is explained. These identifiers for the SMTP server consist (currently) of a leading `S', a 64 bit counter, and an 8 bit process index, i.e., the format is: "S%016qX%02X". Those identifiers are of course used in other modules too, especially the QMGR. For simplicity the identifiers for the delivery agents follow a similar scheme: a leading `C', an 8 bit process index, a running counter (32 bit), and a thread index (32 bit). Notice that only the SMTPS identifiers are supposed to be unique over a very long time period (it takes a while before a 64 bit counter wraps around). The SMTPC (DA) identifiers are unique for a shorter time (see Section 3.1.1.2 about the requirements), and they allow easy identification of delivery threads in SMTPC. The type for this identifier is sessta_id_T.
To uniquely identify recipients they are enumerated within a transaction, so their format is "%19s-%04X" where the first 19 characters are the SMTPS transaction id and the last four characters are the recipient index (this limits the number of recipients per transaction to 65535, which seems to be enough; see footnote 4.1). The type for this identifier is rcpt_id_T.
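The two identifier formats described above can be illustrated with a minimal sketch. The function names fmt_sessta_id() and fmt_rcpt_id() are hypothetical and not part of the sendmail X source; the portable PRIX64 macro is used here as a stand-in for the BSD-specific %qX conversion.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

#define SESSTA_ID_LEN 19                  /* 'S' + 16 hex digits + 2 hex digits */
#define RCPT_ID_LEN   (SESSTA_ID_LEN + 5) /* '-' + 4 hex digits */

/*
 * Format an SMTPS session/transaction id (hypothetical helper).
 * Returns 0 on success, -1 if the buffer is too small.
 */
int
fmt_sessta_id(char *buf, size_t size, uint64_t counter, uint8_t proc_idx)
{
    int n;

    assert(buf != NULL);
    n = snprintf(buf, size, "S%016" PRIX64 "%02X", counter,
                 (unsigned int) proc_idx);
    return (n == SESSTA_ID_LEN && (size_t) n < size) ? 0 : -1;
}

/*
 * Format a recipient id from a transaction id and a recipient index
 * (hypothetical helper). Returns 0 on success, -1 on error.
 */
int
fmt_rcpt_id(char *buf, size_t size, const char *ta_id, uint16_t rcpt_idx)
{
    int n;

    assert(buf != NULL && ta_id != NULL);
    n = snprintf(buf, size, "%19s-%04X", ta_id, (unsigned int) rcpt_idx);
    return (n == RCPT_ID_LEN && (size_t) n < size) ? 0 : -1;
}
```

A buffer of at least 20 bytes is needed for a transaction id and at least 25 bytes for a recipient id, including the terminating NUL in both cases.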
In some cases it is useful to have only one identifier in a structure and some field that denotes which identifier type is actually used, i.e., a superset of sessta_id_T and rcpt_id_T. This type is smtp_id_T.
Notice: under certain circumstances it might be worthwhile to store identifiers as binary data instead of printable strings, for example, if they are used for large indices that are stored in main memory. For SMTP server identifiers this shrinks the size from 20 bytes down to 12 (8 bytes for the counter, 4 for the process index, assuming a 32 bit system, i.e., the 8 bit process index will be stored in a 32 bit word). For the recipient identifiers the length will shrink from 25 bytes (probably stored in 28 bytes) down to 16 bytes. Whether it is worthwhile to trade memory storage for conversion overhead (and added complexity) remains to be seen. In the first version of sendmail X this will probably not be implemented. See also Section 3.13.6.1.1 about size estimates.
This section talks about the integration of asynchronous functions with the event thread library. Section 3.1.1.1 gives a description of asynchronous functions, stating that a result (callback) function RC is invoked with the current status and the result of an asynchronous function as parameters. However, the implementation of this is not as simple as it might seem. The event thread library (see Section 3.20.5 for a functional description and Section 4.3.8 for the implementation) uses a task context that is passed to worker threads which in turn execute the application function (passing it the entire task context).
Note: there are two different problems described here:
Case 2 can be turned into case 1 by splitting a function into two (``continuation''), i.e., when the result is required to continue then the first half of the function saves its state and stops execution, e.g., it goes back to the wait queue. The second half of the function can then either be executed via the callback, or it is activated and combines the requested result with the current state to achieve the final result. Note: this kind of programming is not particularly simple as explained in Section 3.16.4, but it avoids blocking as much as possible.
There are basically two approaches to this problem:
The advantage of solution 1 is that it requires less context switching (even if it is only a thread context). If the function RC is a callback that is directly invoked from the function that receives the result from an external source, then the result values should not be those that are returned to the worker manager (e.g., OK, WAITQ, RUNQ, DEL, see 3.20.5.1.1), unless we put an extra handler in between that knows about these return values and can handle them properly by performing the appropriate actions. Therefore this approach requires careful programming, see below for details.
Solution 2 does not mess around with the worker function and the task (event thread) context without telling the event thread system about it, hence it does not have the programming restriction mentioned above; it allows for a more normal programming style. However, there needs to be a way to match the condition that became true with the task that is waiting for it. Usually this is done via identifiers (see Section 3.1.1.2); however, the current implementation only allows for a single character to be sent over the internal notification pipe (see footnote 4.2). Hence it is complicated to find the right task: waking up one task after another to let them figure out themselves whether ``their'' condition is met can be expensive. One possible solution for this is to maintain a queue of results instead of writing the results directly into some structure. Elements in the result queue contain the necessary identifier to find the right task that is waiting for the condition/result.
An asynchronous call sequence looks like this:
The next step depends on which solution has been chosen:
If the task C is invoked via the callback from result_handler(), then we have to make C aware of this, either implicitly by ``knowing'' which parts of C are invoked by callbacks or explicitly by passing this information somehow, e.g., encoded in the task structure. We can either require that a callback has a different set of return values, i.e., only EVTHR_OK, or we need an ``in-between'' function that can interpret the result values for a manager thread and manipulate the task context accordingly. The latter seems like the most generic and clean solution. Notice, however, that we have to take care of the usual problem with accessing the task while it is outside of our control, i.e., if it has been returned to the event thread system earlier on, the well-known problems arise (see Section 3.20.5.1.5; additionally the task must know whether it placed itself earlier on into the wait queue to avoid doing it again).
In the previous section solution 3a for approach 1 states that the callback function may store the result in a data structure. If the caller of the asynchronous function waits for the result, then the access to the structure must be synchronized and the caller must be informed that the data is available (valid). Note: this is not a good solution to the problem of asynchronous functions (because the caller blocks while waiting without giving the event threads library a chance to switch to another thread; if too many functions are doing this the system may even deadlock), but just a simple approach until a better solution is found.
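The simple algorithm uses a mutex (to protect access to the shared data structure and the status variable), a condition variable (to signal completion), and a status variable which has three states: init (state 1, the initial value), wait (state 2, the caller is waiting for the result), and sent (state 3, the result is available).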
Note: due to the asynchronous operation, states 2 and 3 do not need to happen in that order. Moreover, state 2 may not be reached at all because the callee is done before the caller needs the result (this is the best case because it avoids waiting).
Caller:
    status = init;              /* initialize system */
    invoke callee               /* call asynchronous function */
    ...                         /* do something */
    lock;                       /* acquire mutex */
    if (status == init) {       /* is it still the initial value? */
        status = wait;          /* indicate that caller is waiting */
        while (status == wait)  /* wait until status changes */
            cond_wait           /* wait for signal from callee, this also unlocks the mutex */
    }
    unlock;
Callee:
    ...                         /* compute result */
    lock;                       /* acquire mutex */
    v = result;                 /* set result */
    notify = (status == wait);  /* is caller waiting? */
    status = sent;              /* result is available */
    if (notify)
        cond_signal             /* notify caller if it is waiting */
    unlock;                     /* done */
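The caller/callee algorithm above could be expressed with POSIX threads roughly as follows. This is only a sketch: the names async_res, async_res_put(), async_res_get(), and run_once() are made up for the illustration, and the callee is simulated by a thread that delivers the fixed value 42.

```c
#include <assert.h>
#include <pthread.h>

enum status { ST_INIT, ST_WAIT, ST_SENT };

struct async_res {
    pthread_mutex_t mutex;   /* protects status and v */
    pthread_cond_t  cond;    /* signals completion */
    enum status     status;
    int             v;       /* the result */
};

/* Callee side: store the result, signal only if the caller waits. */
void
async_res_put(struct async_res *r, int result)
{
    int notify;

    pthread_mutex_lock(&r->mutex);
    r->v = result;
    notify = (r->status == ST_WAIT);
    r->status = ST_SENT;
    if (notify)
        pthread_cond_signal(&r->cond);
    pthread_mutex_unlock(&r->mutex);
}

/* Caller side: block only if the result is not yet available. */
int
async_res_get(struct async_res *r)
{
    int v;

    pthread_mutex_lock(&r->mutex);
    if (r->status == ST_INIT) {
        r->status = ST_WAIT;
        while (r->status == ST_WAIT)
            pthread_cond_wait(&r->cond, &r->mutex);
    }
    assert(r->status == ST_SENT);
    v = r->v;
    pthread_mutex_unlock(&r->mutex);
    return v;
}

/* Simulated asynchronous function (assumption for this sketch). */
static void *
callee_thread(void *arg)
{
    async_res_put(arg, 42);
    return NULL;
}

/* Run one asynchronous call; returns the result or -1 on error. */
int
run_once(void)
{
    struct async_res r;
    pthread_t t;
    int v;

    pthread_mutex_init(&r.mutex, NULL);
    pthread_cond_init(&r.cond, NULL);
    r.status = ST_INIT;
    r.v = 0;
    if (pthread_create(&t, NULL, callee_thread, &r) != 0)
        return -1;
    v = async_res_get(&r);
    pthread_join(t, NULL);
    pthread_cond_destroy(&r.cond);
    pthread_mutex_destroy(&r.mutex);
    return v;
}
```

As in the pseudocode, the caller never waits if the callee has already stored its result, and the callee only signals if the caller is actually waiting.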
This can be extended if there are multiple function calls and hence multiple results to wait for. It would be ugly to use an individual status variable for each function, hence a status variable and a counter are used. If the counter is zero, then all results are available. The status variable simply indicates whether the caller is waiting for a result and hence the callee should use the condition variable to signal that the result is valid. Since there can be multiple callees, only the one for which the counter is zero and the status variable is wait must signal completion.
Caller:
    status = init;              /* initialize system */
    counter = 0;
    ...
    lock
    ++counter;
    invoke callee               /* call asynchronous function */
    unlock                      /* (maybe multiple times) */
    ...                         /* do something */
    lock                        /* acquire mutex */
    if (counter > 0) {          /* are there outstanding results? */
        status = wait;          /* indicate that caller is waiting */
        while (status == wait && counter > 0)  /* wait for results */
            cond_wait           /* wait for signal from callee */
    }
    unlock                      /* done */
Callee:
    ...                         /* compute result */
    lock                        /* acquire mutex */
    --counter;
    v = result;                 /* set result */
    /* is caller waiting and this is the last result? */
    if (status == wait && counter == 0) {
        status = sent;          /* results are available */
        cond_signal             /* notify caller */
    }
    unlock                      /* done */
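A possible pthread rendering of the counter based variant is sketched below. All names (multi_res, job, sum_async(), MAX_JOBS) are invented for this example, and the ``result'' of each callee is simply added to a sum so that the outcome can be checked.

```c
#include <assert.h>
#include <pthread.h>

#define MAX_JOBS 8

enum status { ST_INIT, ST_WAIT, ST_SENT };

struct multi_res {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    enum status     status;
    unsigned int    counter;  /* number of outstanding results */
    int             sum;      /* stands in for the result storage */
};

struct job {
    struct multi_res *res;
    int               value;
    pthread_t         tid;
};

/* Callee: deliver one result; only the last one signals the caller. */
static void *
callee(void *arg)
{
    struct job *j = arg;
    struct multi_res *r = j->res;

    pthread_mutex_lock(&r->mutex);
    assert(r->counter > 0);
    --r->counter;
    r->sum += j->value;
    if (r->status == ST_WAIT && r->counter == 0) {
        r->status = ST_SENT;
        pthread_cond_signal(&r->cond);
    }
    pthread_mutex_unlock(&r->mutex);
    return NULL;
}

/* Caller: start n callees (values 1..n), wait for all, return the sum. */
int
sum_async(int n)
{
    struct multi_res r;
    struct job jobs[MAX_JOBS];
    int i, started = 0, total;

    if (n < 0 || n > MAX_JOBS)
        return -1;
    pthread_mutex_init(&r.mutex, NULL);
    pthread_cond_init(&r.cond, NULL);
    r.status = ST_INIT;
    r.counter = 0;
    r.sum = 0;
    for (i = 0; i < n; i++) {
        jobs[i].res = &r;
        jobs[i].value = i + 1;
        pthread_mutex_lock(&r.mutex);
        ++r.counter;
        if (pthread_create(&jobs[i].tid, NULL, callee, &jobs[i]) != 0) {
            --r.counter;            /* callee was never started */
            pthread_mutex_unlock(&r.mutex);
            break;
        }
        pthread_mutex_unlock(&r.mutex);
        started++;
    }
    pthread_mutex_lock(&r.mutex);
    if (r.counter > 0) {
        r.status = ST_WAIT;
        while (r.status == ST_WAIT && r.counter > 0)
            pthread_cond_wait(&r.cond, &r.mutex);
    }
    total = r.sum;
    pthread_mutex_unlock(&r.mutex);
    for (i = 0; i < started; i++)
        pthread_join(jobs[i].tid, NULL);
    pthread_cond_destroy(&r.cond);
    pthread_mutex_destroy(&r.mutex);
    return total;
}
```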
Notes:
If the result value cannot have all possible values in its range, then two values can be designated as v_init and v_wait while all others can be considered as v_sent. This removes the need for a status variable because the result variable v itself is used for that purpose:
Caller:
    v = v_init;                 /* initialize system */
    invoke callee               /* call asynchronous function */
    ...                         /* do something */
    lock                        /* acquire mutex */
    if (v == v_init) {          /* is it still the initial value? */
        v = v_wait;             /* indicate that caller is waiting */
        while (v == v_wait)     /* wait until status changes */
            cond_wait           /* wait for signal from callee */
    }
    unlock
Callee:
    ...                         /* compute result */
    lock;                       /* acquire mutex */
    notify = (v == v_wait);     /* is caller waiting? */
    v = result;                 /* set result */
    if (notify)
        cond_signal             /* notify caller if it is waiting */
    unlock                      /* done */
Solution 2 described in Section 4.1.2.2 needs some form of synchronization too: as described in the previous Section 4.1.2.3, the callee may return a result before or after the caller wants to access it.
Using the first algorithm from the previous section with a slight modification like this:
Caller:
    status = init;              /* initialize system */
    invoke callee               /* call asynchronous function */
    ...                         /* do something */
    lock;                       /* acquire mutex */
    waiting = (status == init); /* is it still the initial value? */
    if (waiting)
        status = wait;          /* indicate that caller is waiting */
    unlock;
    if (waiting)
        put task into condqueue
Callee:
    ...                         /* compute result */
    lock;                       /* acquire mutex */
    v = result;                 /* set result */
    notify = (status == wait);  /* is caller waiting? */
    status = sent;              /* result is available */
    if (notify)                 /* if caller is waiting: */
        cond_signal             /* notify evthr system about condition */
    unlock;                     /* done */
causes a race condition: after unlocking the mutex in the caller but before putting the task into condqueue, the callee may signal the event thread library that the condition is fulfilled. To avoid this race condition, a condition variable and a mutex must be combined, as is done by pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex):
The pthread_cond_wait(3) function atomically blocks the current thread waiting on the condition variable specified by cond, and unblocks the mutex specified by mutex.
In the modified version the caller just instructs the event thread library to put the task into the condqueue:
    status = init;              /* initialize system */
    invoke callee               /* call asynchronous function */
    ...                         /* do something */
    lock;                       /* acquire mutex */
    waiting = (status == init); /* is it still the initial value? */
    if (waiting) {
        status = wait;          /* indicate that caller is waiting */
        return to event thread library
          with return code that requests to
          put task into condqueue
    }
and the library itself unlocks the mutex afterwards. A condition variable for event threads hence consists of a mutex and an identifier (see Section 3.1.1.2) to match caller and callee. Such a condition variable belongs to a task (just like a file descriptor on which a task might be waiting).
To deal with various kinds of errors, it is necessary to write functions such that they are ``reversible'', i.e., they either perform a transaction or they don't change the state (in an inconsistent manner). Unfortunately, it is fairly complicated to write all functions in such a way. For example, if multiple changes must be made each of which can fail, then either the previous state must be preserved such that it can be restored when something goes wrong, or the changes must be undone individually. For this to work properly, it is essential that the ``undo'' operation itself cannot fail. Hence an ``undo'' operation must not rely on resources that might become unavailable during processing; for example, if it requires memory, then that memory must be allocated before the whole operation is started, or at least while the individual changes are made, such that the ``undo'' operation does not need to allocate memory. This might not be achievable in some cases, however. For example, there are operations in sendmail X that require updates to two persistent databases, i.e., DEFEDB and IBDB. Both of these require disk space, which cannot be pre-allocated, nor can it be guaranteed that two independent disk write operations will succeed. If the first one fails, then we don't need to perform the second one. However, if the first one succeeds and the second one fails, then there is no guarantee that the first operation can be undone. Hence the operations must be performed in such a manner that even in the worst case mail will not be lost but at most delivered more than once.
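The idea of an ``undo'' operation that cannot fail can be illustrated with a small in-memory sketch. It uses two fixed-size stores and the invented names store, store_add(), store_undo_add(), and add_both(); it does not model the DEFEDB/IBDB case, where such a guarantee is not available.

```c
#include <assert.h>
#include <stddef.h>

#define STORE_CAP 4

struct store {
    int    items[STORE_CAP];
    size_t used;
};

/* Add an item; returns 0 on success, -1 if full (store unchanged). */
static int
store_add(struct store *s, int item)
{
    if (s->used >= STORE_CAP)
        return -1;
    s->items[s->used++] = item;
    return 0;
}

/* Undo the most recent add; needs no resources, hence cannot fail. */
static void
store_undo_add(struct store *s)
{
    assert(s->used > 0);
    --s->used;
}

/* Add the item to both stores or to neither of them. */
int
add_both(struct store *a, struct store *b, int item)
{
    if (store_add(a, item) != 0)
        return -1;              /* nothing has been changed yet */
    if (store_add(b, item) != 0) {
        store_undo_add(a);      /* roll back the first step */
        return -1;
    }
    return 0;
}
```

The undo step works here only because everything it needs (the slot and the counter) already exists before the transaction starts.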
Transaction based processing is also fairly complicated in case of asynchronous operations. If a function has to perform changes to a local state and creates a list of change requests which will be performed later on by some asynchronously running thread, then that list of change requests should offer a callback functionality which can be invoked with the status of the operation such that it can either perform the local changes (if that has not been done yet) or undo the local changes (if they have been performed earlier). An additional problem is that other functions may have performed changes in between. Hence it is not possible to save the old state and restore it in case of an error because that would erase other (valid) changes. Instead ``relative'' changes must be performed, e.g., incrementing or decrementing a counter.
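The difference between restoring a saved state and undoing a ``relative'' change can be sketched as follows. The structures and functions (stats, change_req, change_start(), change_finish(), undo_on_failure()) are hypothetical and only serve to show the callback pattern described above.

```c
#include <assert.h>
#include <stddef.h>

struct stats {
    int open_ta;    /* some counter shared by several functions */
};

/* Callback invoked with the status of the asynchronous operation. */
typedef void (*change_done_F)(struct stats *st, int delta, int ok);

struct change_req {
    struct stats  *st;
    int            delta;   /* the relative change that was applied */
    change_done_F  done;
};

/* Apply the local change right away and remember how to undo it. */
void
change_start(struct change_req *req, struct stats *st, int delta,
             change_done_F done)
{
    assert(req != NULL && st != NULL && done != NULL);
    st->open_ta += delta;
    req->st = st;
    req->delta = delta;
    req->done = done;
}

/* Called later by the asynchronous thread with the outcome. */
void
change_finish(struct change_req *req, int ok)
{
    req->done(req->st, req->delta, ok);
}

/* On failure apply the opposite relative change; keep other updates. */
void
undo_on_failure(struct stats *st, int delta, int ok)
{
    if (!ok)
        st->open_ta -= delta;
}
```

If two requests change the counter from 10 by +1 and +5 and the first one fails later, the relative undo yields 15; restoring the saved value 10 would have erased the second, valid change.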
As mentioned in Section 2.14, item 8, C has some inherent security problems because it does not provide boundary checks on buffers, arrays, or strings. Those checks must be implemented by the program itself. Here are some (obvious) hints when writing code for sendmail X (or other secure programs):
The sendmail X libraries provide communication buffers and replacements for strings, see Sections 3.16.11.1.1 and 3.16.7, which must be used as much as possible; exceptions should only be made for external libraries that do not support these data structures.
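As a generic illustration of an explicit boundary check (this is not the sendmail X string library; bounded_copy() is a hypothetical helper), a copy routine can refuse input that does not fit instead of truncating silently or overflowing the buffer:

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Copy src into dst, which has room for dstsize bytes including the
 * terminating NUL. Returns 0 on success; returns -1 if src does not
 * fit, in which case dst is set to the empty string (if possible).
 */
int
bounded_copy(char *dst, size_t dstsize, const char *src)
{
    size_t len;

    assert(dst != NULL && src != NULL);
    if (dstsize == 0)
        return -1;
    len = strlen(src);
    if (len >= dstsize) {
        dst[0] = '\0';
        return -1;
    }
    memcpy(dst, src, len + 1);  /* includes the terminating NUL */
    return 0;
}
```

The caller must check the return value; an error is reported rather than hidden by truncation.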
Order of implementation:
based on state threads
Milestone 3 is complete if a mail can be received by SMTPS, safely stored in a queue by QMGR, scheduled for delivery by a very simple scheduler in the QMGR, and then delivered by SMTPC.
Milestone 3 has been reached 2002-08-19.
Notice: this is prototype code. There are many unfinished places within the existing code not to mention all the parts that are completely missing.
Milestone 4 has been reached 2002-09-19.
Next steps:
As of 2004-01-01 sendmail X is running as MTA on the machine of the author. sendmail 8 is only used for mail submission.
This section describes the implementation of some libraries which are used by various sendmail X modules.
Queues, lists, et al. are taken from OpenBSD <sys/queue.h>.
Two hash table implementations are available: a conventional one with one key and a version with two keys. The latter is currently not used, it was intended for the DA database.
Classes can be used to check whether a key is in a set.
Classes can be implemented via a hash table: sendmail 8 allows up to 256 classes (there is a simple mapping of class names to single byte values). Each element of a class is stored in a hash table; the RHS is a bitmap that indicates to which classes the element belongs. Checking whether a word is in a class is done as follows: look up the word in the hash table; if it exists, test whether it is a member of the class we are looking for (which is just a bitnset() test). This kind of implementation uses one hash table for many classes.
Alternative implementations are lists if a class has only a few elements (linear search) or tree.
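The hash table approach with a class bitmap as value might look like the following sketch. It is not the sendmail 8 or sendmail X code: the table is a small fixed-size open-addressing table, and the names ctable, class_add(), and class_member() are assumptions made for the example.

```c
#include <assert.h>
#include <limits.h>
#include <stddef.h>
#include <string.h>

#define NCLASSES     256
#define TABSIZE      64     /* must be a power of two */
#define BITMAP_BYTES (NCLASSES / CHAR_BIT)

struct centry {
    const char    *word;                   /* NULL: slot is unused */
    unsigned char  classes[BITMAP_BYTES];  /* one bit per class */
};

struct ctable {
    struct centry slot[TABSIZE];           /* must start zeroed */
};

static size_t
hash_word(const char *w)
{
    size_t h = 5381;

    while (*w != '\0')
        h = h * 33 + (unsigned char) *w++;
    return h & (TABSIZE - 1);
}

/* Return the slot holding word, else a free slot, else NULL (full). */
static struct centry *
find_slot(struct ctable *t, const char *word)
{
    size_t i, h = hash_word(word);

    for (i = 0; i < TABSIZE; i++) {
        struct centry *e = &t->slot[(h + i) & (TABSIZE - 1)];

        if (e->word == NULL || strcmp(e->word, word) == 0)
            return e;
    }
    return NULL;
}

/* Add word to class cls; the caller keeps the word storage alive. */
int
class_add(struct ctable *t, const char *word, unsigned int cls)
{
    struct centry *e;

    assert(cls < NCLASSES);
    e = find_slot(t, word);
    if (e == NULL)
        return -1;
    if (e->word == NULL)
        e->word = word;
    e->classes[cls / CHAR_BIT] |= (unsigned char) (1u << (cls % CHAR_BIT));
    return 0;
}

/* Is word a member of class cls? Returns 1 or 0. */
int
class_member(struct ctable *t, const char *word, unsigned int cls)
{
    struct centry *e;

    assert(cls < NCLASSES);
    e = find_slot(t, word);
    if (e == NULL || e->word == NULL)
        return 0;
    return (e->classes[cls / CHAR_BIT] >> (cls % CHAR_BIT)) & 1;
}
```

One lookup answers membership questions for all classes at once, which is why a single table suffices for many classes.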
Various versions of balanced trees are available which are based on code that has been found on the internet. See include/sm/tree.h, include/sm/avl.h, and include/sm/bsd-tree.h. The latter is taken from OpenBSD <sys/tree.h>, it can be used to generate functions that operate on splay trees and red-black trees. That is, there aren't generic functions that operate on trees, but specific functions are generated that operate on one type. Of course it is possible to generate a generic version where a tree node contains the usual data, i.e., a key and a pointer to the value.
A restricted size cache is implemented using a hash table (for access) and a linked list which is kept in most-recently used order (MRU).
Note: if we don't need to dynamically delete and add entries from the RSC and we don't need the MRU feature, then we can probably use a (binary) hash table which keeps track of the number of entries in it.
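The interplay of a size limit and the MRU list can be shown with a deliberately simplified sketch. Unlike the RSC, it finds entries by a linear scan of the list instead of a hash table, it stores int values, and all names (cache, cache_add(), cache_lookup(), CACHE_LIMIT) are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define CACHE_LIMIT 2

struct entry {
    const char   *key;
    int           value;
    struct entry *prev, *next;      /* MRU linkage */
};

struct cache {                      /* must start zeroed */
    struct entry  pool[CACHE_LIMIT];
    struct entry *head, *tail;      /* head: most recently used */
    size_t        used;
};

static void
unlink_entry(struct cache *c, struct entry *e)
{
    if (e->prev != NULL) e->prev->next = e->next; else c->head = e->next;
    if (e->next != NULL) e->next->prev = e->prev; else c->tail = e->prev;
}

static void
push_front(struct cache *c, struct entry *e)
{
    e->prev = NULL;
    e->next = c->head;
    if (c->head != NULL) c->head->prev = e; else c->tail = e;
    c->head = e;
}

/* Look up key; a hit moves the entry to the front of the MRU list. */
int *
cache_lookup(struct cache *c, const char *key)
{
    struct entry *e;

    for (e = c->head; e != NULL; e = e->next) {
        if (strcmp(e->key, key) == 0) {
            unlink_entry(c, e);
            push_front(c, e);
            return &e->value;
        }
    }
    return NULL;
}

/* Add or update key; evict the least recently used entry if full. */
void
cache_add(struct cache *c, const char *key, int value)
{
    struct entry *e;
    int *pv = cache_lookup(c, key);

    if (pv != NULL) {
        *pv = value;
        return;
    }
    if (c->used < CACHE_LIMIT) {
        e = &c->pool[c->used++];
    } else {
        e = c->tail;                /* least recently used entry */
        assert(e != NULL);
        unlink_entry(c, e);
    }
    e->key = key;
    e->value = value;
    push_front(c, e);
}
```

With a limit of two, adding "a" and "b", looking up "a", and then adding "c" evicts "b", because "a" was used more recently.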
The structure definitions given below are hidden from the application.
typedef struct rsc_S rsc_T, *rsc_P;
struct rsc_entry_S
{
    CIRCLEQ_ENTRY(rsc_entry_S) rsce_link;   /* MRU linkage */
    const char      *rsce_key;      /* lookup key */
    unsigned int     rsce_len;      /* key len */
    void            *rsce_value;    /* corresponding value */
};
struct rsc_S
{
    bht_P            rsc_table;     /* table with key, rsc_entry pairs */
    unsigned int     rsc_limit;     /* max # of entries */
    unsigned int     rsc_used;      /* current # of entries */
    rsc_create_F     rsc_create;    /* constructor */
    rsc_delete_F     rsc_delete;    /* destructor */
    CIRCLEQ_HEAD(, rsc_entry_S) rsc_link;   /* MRU linkage */
    void            *rsc_ctx;       /* application context */
};
The following functions (and function types) are provided:
typedef void *(*rsc_create_F) (const char *_key, unsigned int _len,
            void *_value, void *_ctx);
typedef sm_ret_T (*rsc_delete_F) (void *_value, void *_ctx);
typedef void (rsc_walk_F)(const char *_key, const void *_value, const void *_ctx);
extern rsc_P rsc_create(sm_rpool_P _rpool, unsigned int _limit, unsigned int _htsize,
            rsc_create_F _create, rsc_delete_F _delete, void *_ctx);
extern void rsc_free(rsc_P _rsc);
extern void rsc_walk(rsc_P _rsc, rsc_walk_F *_f);
extern sm_ret_T rsc_add(rsc_P _rsc, bool _delok, const char *_key,
            unsigned int _len, void *_value, void **_pvalue);
extern const void *rsc_lookup(rsc_P _cache, const char *_key, unsigned int _len);
extern sm_ret_T rsc_rm(rsc_P _rsc, const char *_key, unsigned int _len);
extern int rsc_usage(rsc_P _rsc);
extern void rsc_stats(rsc_P _rsc, char *_out, unsigned int _len);
If an RSC stores data of different types, it must be possible to distinguish between them. This is necessary for functions that ``walk'' through an RSC and perform operations on the data or just for generic functions, i.e., create and delete. As such, the RSC could store an additional type and pass it to the delete and create functions. However, this could also be handled by the application itself, i.e., it adds a unique identifier to the data that it stores in an RSC.
The API of a typed RSC implementation differs as follows:
Note that this causes problems with a unified API as described in Section 3.16.12.
We could use a fixed number of keys and set those that we are not interested in to NULL, or we could use a variable number of keys and specify a key and an index. How easy is the second solution to implement? The keys might be implemented as an array with some upper limit that is specified at creation time.
A possible approach to allow for any number of keys is to specify an array of keys (and corresponding lengths). During initialization the maximum number of keys is defined (and stored in the structure describing the DB). Access methods pass in a key and length and the index of the key, where zero is the primary key. When an element is entered, the primary key is used. What about the other keys? They must be specified too since the appropriate entries in the hash tables must be constructed. This seems to become ugly. Let's try a simpler approach first (one, two, three keys).
This might be implemented by having a (second) link in the entries which points to the next element with the same key. Check the hash table implementation.
The event thread library provides the basic framework for a worker based thread pool that is driven by I/O events and by wakeup times for tasks. See Section 3.20.5 for a functional description.
This library uses a general context that describes one event thread system and a per-task context. It uses two queues: a run queue of tasks that can be executed, and a wait queue of tasks that wait for some event. Each task is in exactly one of the queues at any given time.
The event thread context looks like this (2004-07-27):
struct sm_evthr_ctx_S
{
    pthread_cond_t   evthrc_cv;
    pthread_mutex_t  evthrc_waitqmut;
    pthread_mutex_t  evthrc_runqmut;
    int              evthrc_max;    /* max. number of threads */
    int              evthrc_min;    /* min. number of threads */
    int              evthrc_cur;    /* current number of threads */
    int              evthrc_idl;    /* idle threads */
    int              evthrc_stop;   /* stop threads */
    int              evthrc_maxfd;  /* maximum number of FDs */
    timeval_T        evthrc_time;   /* current time */
    sm_evthr_task_P *evthrc_fd2t;   /* array to map FDs to tasks */
    /* pipe between control thread and worker/signal threads */
    int              evthrc_pipe[2];
    CIRCLEQ_HEAD(, sm_evthr_task_S) evthrc_waitq;
    CIRCLEQ_HEAD(, sm_evthr_task_S) evthrc_runq;
};
The system maintains some number of worker threads between the specified minimum and maximum. Idle threads vanish after some timeout until the minimum number is reached.
An application function has this prototype:
typedef sm_ret_T (evthr_task_F)(sm_evthr_task_P);
That is, the function receives the per-task context, whose structure is listed next, as a parameter. Even though this is a violation of the abstraction principle, it allows for some functionality which would be awkward to achieve otherwise. For example, a user application can directly manipulate the next wakeup time. We could hide this behind a void pointer and provide some functions to manipulate the task context if we want to hide the implementation.
struct sm_evthr_task_S
{
    CIRCLEQ_ENTRY(sm_evthr_task_S) evthrt_next;
    /* protects evthrt_rqevf and evthrt_sleep */
    pthread_mutex_t  evthrt_mutex;
    int              evthrt_rqevf;  /* requested event flags; see below */
    int              evthrt_evocc;  /* events occurred; see below */
    int              evthrt_state;  /* current state; see below */
    int              evthrt_fd;     /* fd to watch */
    timeval_T        evthrt_sleep;  /* when to wake up */
    evthr_task_F    *evthrt_fct;    /* function to execute */
    void            *evthrt_actx;   /* application context */
    sm_evthr_ctx_P   evthrt_ctx;    /* evthr context */
    sm_evthr_nc_P    evthrt_nc;     /* network connection */
};
The first element links the task into a list, i.e., either the wait queue or the run queue. The field evthrt_rqevf (requested event flags) lists the events the task is waiting for:
| flag | meaning |
| EVT_EV_RD | task is waiting for fd to become ready for read |
| EVT_EV_WR | task is waiting for fd to become ready for write |
| EVT_EV_LI | task is waiting for fd to become ready for accept |
| EVT_EV_SL | task is waiting for timeout |
The field evthrt_evocc (events occurred) lists the events that activated the task:
| flag | meaning |
| EVT_EV_RD_Y | task is ready for read |
| EVT_EV_WR_Y | task is ready for write |
| EVT_EV_LI_Y | task received a new connection |
| EVT_EV_SL_Y | task has been woken up (after timeout) |
The field evthrt_state (current state) consists of some internal state flags, e.g., in which queue the task is and what it wants to do next.
The event thread library provides these functions:
extern sm_ret_T evthr_init(sm_evthr_ctx_P *_pctx, int _minthr, int _maxthr, int _maxfd);
extern sm_ret_T evthr_task_new(sm_evthr_ctx_P _ctx,
sm_evthr_task_P *_task,
int _ev,
int _fd,
timeval_T *_sleept,
evthr_task_F *_fct,
void *_taskctx);
extern sm_ret_T evthr_loop(sm_evthr_ctx_P _ctx);
extern sm_ret_T evthr_waitq_app(sm_evthr_task_P _task);
extern sm_ret_T evthr_en_wr(sm_evthr_task_P _task);
extern sm_ret_T evthr_time(sm_evthr_ctx_P _ctx, timeval_T *_ct);
extern sm_ret_T evthr_new_sl(sm_evthr_task_P _task, timeval_T _slpt, bool _change);
An application first initializes the library (evthr_init()), then it creates at least one task (evthr_task_new()), and thereafter calls evthr_loop() which in turn monitors the desired events (I/O, timeout) and invokes the callback functions. Those callback functions return a result which indicates what should happen with the task:
| return value | meaning |
| EVTHR_OK | do nothing, task has been taken care of |
| EVTHR_WAITQ | put in wait queue |
| EVTHR_RUNQ | put in run queue |
| EVTHR_SLPQ | sleep for a while |
| EVTHR_DEL | delete task |
| EVTHR_TERM | terminate event thread loop |
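How a worker might act on these return values can be sketched as follows. This is not the event thread library code: the numeric values of the EVTHR_ constants, the task structure, the treatment of EVTHR_SLPQ as a wait queue entry with a wakeup time, and the functions run_task() and demo() are all assumptions made for this illustration.

```c
#include <assert.h>
#include <stddef.h>

/* Names from the table above; the numeric values are assumptions. */
enum evthr_ret {
    EVTHR_OK, EVTHR_WAITQ, EVTHR_RUNQ, EVTHR_SLPQ, EVTHR_DEL, EVTHR_TERM
};

enum task_place { IN_NONE, IN_WAITQ, IN_RUNQ, DELETED };

struct task;
typedef enum evthr_ret (*task_fn)(struct task *);

struct task {                   /* hypothetical, much simplified */
    task_fn         fct;        /* function to execute */
    enum task_place place;      /* where the task currently is */
    int             runs;       /* how often fct has been called */
};

/*
 * Run the task function once and act on its return value.
 * Returns 1 if the event loop should terminate, 0 otherwise.
 */
int
run_task(struct task *t)
{
    assert(t != NULL && t->fct != NULL);
    t->place = IN_NONE;         /* owned by the worker while running */
    switch (t->fct(t)) {
    case EVTHR_OK:              /* task has been taken care of */
        break;
    case EVTHR_WAITQ:
    case EVTHR_SLPQ:            /* assumed: waits for its wakeup time */
        t->place = IN_WAITQ;
        break;
    case EVTHR_RUNQ:
        t->place = IN_RUNQ;
        break;
    case EVTHR_DEL:
        t->place = DELETED;
        break;
    case EVTHR_TERM:
        return 1;
    }
    return 0;
}

/* Demo task: asks to be run again twice, then asks to be deleted. */
static enum evthr_ret
demo_fct(struct task *t)
{
    return (++t->runs < 3) ? EVTHR_RUNQ : EVTHR_DEL;
}

/* Drive the demo task until it leaves the run queue. */
int
demo(void)
{
    struct task t = { demo_fct, IN_RUNQ, 0 };

    while (t.place == IN_RUNQ) {
        if (run_task(&t))
            break;
    }
    return t.runs;
}
```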
Section 3.20.5 describes some problems that need to be solved:
Question: what about callback functions for signals? Currently the usual signals terminate the system.
sendmail X uses RCBs for communication between the various modules (see Section 3.16.11.1.1). This section describes some functions that simplify handling of RCB based communication for modules which use the event thread library (see Section 4.3.8).
This library uses the following structure:
struct rcbcom_ctx_S
{
    sm_evthr_task_P  rcbcom_tsk;    /* event thread task */
    sm_rcb_P         rcbcom_rdrcb;  /* RCB for communication (rd) */
    SIMPLEQ_HEAD(, sm_rcbe_S) rcbcom_wrrcbl;    /* RCB list (wr) */
    pthread_mutex_t  rcbcom_wrmutex;
};
The first entry is a pointer to the task that is responsible for the communication. The second entry is the RCB in which data is received. The third is a list of RCBs for data that must be sent out; this list is protected by the mutex that is the last element of the structure.
The following functions are provided by the library:
sm_ret_T sm_rcbcom_open(rcbcom_ctx_P rcbcom_ctx): create an RCB communication context.
sm_ret_T sm_rcbcom_close(rcbcom_ctx_P rcbcom_ctx): close an RCB communication context.
sm_ret_T sm_rcbe_new_enc(sm_rcbe_P *prcbe, int minsz): create an entry for the write RCB list and open it for encoding.
sm_rcbcom_prerep(rcbcom_ctx_P rcbcom_ctx, sm_evthr_task_P tsk, sm_rcbe_P *prcbe): prepare a reply to a module: close the read RCB after decoding, open it for encoding, create a new RCB for writing (using sm_rcbe_new_enc()), and put the task back into the wait queue (unless tsk is NULL). Notice: after calling this function (with tsk not NULL), the task is not under control of the caller anymore, hence all manipulation of its state must be done via the functions provided by the event thread library.
sm_rcbcom_endrep(rcbcom_ctx_P rcbcom_ctx, sm_evthr_task_P tsk, bool notified, sm_rcbe_P rcbe): close the write RCB, append it to the RCB list in the context, and, if notified is not set, inform the task about the new write request (using evthr_en_wr()).
sm_rcbcom2mod(sm_evthr_task_P tsk, rcbcom_ctx_P rcbcom_ctx): send the first element of the RCB write list to the file descriptor specified in the task. This function should be called when the event thread library invokes the callback for the task and denotes that the file descriptor is ready for writing, i.e., it will be called from an event thread task that checks for I/O activity on that file descriptor. Since the function does not conform to the function specification for a task, it is called by a wrapper that extracts the RCB communication context from the task (probably indirectly). The function will only send the first element of the list; if the list is empty afterwards, it will disable the write request for this task, otherwise the event thread library will invoke the callback again when the file descriptor is ready for writing. This offers a chance for an optimization: check whether another RCB can be written. Question: is there a call that can determine the free buffer size for the file descriptor such that the next write operation does not block?
An asynchronous DNS resolver has been implemented (libdns/) according to the specification of Section 3.16.13.
A DNS request has the following structure:
struct dns_req_S
{
    sm_cstr_P        dnsreq_query;  /* the query itself */
    time_T           dnsreq_start;  /* request start time */
/*  unsigned int     dnsreq_tries;   * retries */
    dns_type_T       dnsreq_type;   /* query type */
    unsigned short   dnsreq_flags;  /* currently: is in list? */
    sm_str_P         dnsreq_key;    /* key for hash table: query + type */
    dns_callback_F  *dnsreq_fct;    /* callback into application */
    void            *dnsreq_ctx;    /* context for application callback */
    TAILQ_ENTRY(dns_req_S) dnsreq_link;     /* next entry */
};
A DNS result consists of a list of entries with the following elements:
typedef union
{
    ipv4_T           dnsresu_a;
    sm_cstr_P        dnsresu_name;  /* name from DNS */
} dns_resu_T;
struct dns_res_S
{
    sm_cstr_P        dnsres_query;      /* original query */
    sm_ret_T         dnsres_ret;        /* error code */
    dns_type_T       dnsres_qtype;      /* original query type */
    unsigned int     dnsres_entries;    /* number of entries */
    unsigned int     dnsres_maxentries; /* max. number of entries */
    dns_resl_T       dnsres_hd;         /* head of list of mx entries */
};
struct dns_rese_S
{
    dns_type_T       dnsrese_type;      /* result type */
    unsigned int     dnsrese_ttl;       /* TTL from DNS */
    unsigned short   dnsrese_pref;      /* preference from DNS */
    unsigned short   dnsrese_weight;    /* for internal randomization */
    sm_cstr_P        dnsrese_name;      /* RR name */
    TAILQ_ENTRY(dns_rese_S) dnsrese_link;   /* next entry */
    dns_resu_T       dnsrese_val;
};
A DNS manager context contains the following elements:
struct dns_mgr_ctx_S
{
    unsigned int     dnsmgr_flags;
    dns_rql_T        dnsmgr_req_hd;     /* list of requests */
    dns_req_P        dnsmgr_req_cur;    /* current request */
    /* hash table to store requests */
    bht_P            dnsmgr_req_ht;
#if SM_USE_PTHREADS
    pthread_mutex_t  dnsmgr_mutex;      /* for the entire context? */
    sm_evthr_task_P  dnsmgr_tsk;        /* XXX Just one? */
    sm_evthr_task_P  dnsmgr_cleanup;
#endif /* SM_USE_PTHREADS */
};
and a DNS task looks like this:
struct dns_tsk_S
{
    /* XXX int or statethreads socket */
    int              dnstsk_fd;         /* socket */
    dns_mgr_ctx_P    dnstsk_mgr;        /* DNS manager */
    uint             dnstsk_flags;      /* operating flags */
    uint             dnstsk_timeouts;   /* queries that timed out */
    sockaddr_in_T    dnstsk_sin;        /* socket description */
    sm_str_P         dnstsk_rd;         /* read buffer */
    sm_str_P         dnstsk_wr;         /* write buffer */
};
The DNS library offers functions to create and delete a DNS manager context:
sm_ret_T dns_mgr_ctx_new(uint flags, dns_mgr_ctx_P *pdns_mgr_ctx);
sm_ret_T dns_mgr_ctx_del(dns_mgr_ctx_P dns_mgr_ctx);
and similar functions for DNS tasks:
sm_ret_T dns_tsk_new(dns_mgr_ctx_P dns_mgr_ctx, uint flags, ipv4_T ipv4,
dns_tsk_P *pdns_tsk);
sm_ret_T dns_tsk_del(dns_tsk_P dns_tsk);
sm_ret_T dns_tsk_start(dns_mgr_ctx_P dns_mgr_ctx, dns_tsk_P dns_tsk,
sm_evthr_ctx_P evthr_ctx);
An application can make a DNS request using the following function:
sm_ret_T dns_req_add(dns_mgr_ctx_P dns_mgr_ctx, sm_cstr_P query,
dns_type_T type, dns_callback_F *fct, void *ctx);
Internal functions of the library are:
sm_ret_T dns_comm_tsk(sm_evthr_task_P tsk);
sm_ret_T dns_tsk_cleanup(sm_evthr_task_P tsk);
sm_ret_T dns_tsk_rd(sm_evthr_task_P tsk);
sm_ret_T dns_tsk_wr(sm_evthr_task_P tsk);
void dns_req_del(void *value, void *ctx);
sm_ret_T dns_receive(dns_tsk_P dns_tsk);
sm_ret_T dns_send(dns_tsk_P dns_tsk);
sm_ret_T dns_decode(sm_str_P ans, uchar *query, int qlen, dns_type_T *ptype,
dns_res_P dns_res);
dns_comm_tsk() is a function that is called whenever I/O activity is possible; it invokes the read and write functions dns_tsk_rd() and dns_tsk_wr(), respectively. dns_tsk_cleanup() is a cleanup task that deals with requests which didn't receive an answer within a certain amount of time.
The following steps are necessary for initializing and starting the DNS resolver (when using the event threads system):
/* Initialize DNS resolver */
ret = dns_rslv_new(random);
/* Create DNS manager context */
ret = dns_mgr_ctx_new(0, &dns_mgr_ctx);
/* Create one DNS resolver task */
ret = dns_tsk_new(dns_mgr_ctx, 0, ipv4, &dns_tsk);
/* Start DNS tasks (resolver and cleanup) */
ret = dns_tsk_start(dns_mgr_ctx, dns_tsk, evthr_ctx);
dns_req_add() creates a DNS request and adds it to the list maintained in the DNS manager context (dnsmgr_req_hd), unless such a request is already queued, which is checked via the hash table dnsmgr_req_ht, to which the request is always added. The key for the hash table is the DNS query string and the DNS query type concatenated (if the query string has a trailing dot it will be removed). Whenever a DNS request is added to the list the write event is triggered for dns_tsk_wr(), which will take the current request (pointed to by dnsmgr_req_cur), form a DNS query and send it to a DNS server using dns_send(). Notice: dns_tsk_wr() does not remove the request from the list; this is done by either the cleanup task or the read function. To indicate whether a request is in the list or merely in the hash table, the field dnsreq_flags is used.
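The key construction described above can be sketched as follows. This is only an illustration under stated assumptions: the function name dns_key_build(), the ":" separator, and the decimal representation of the type are hypothetical; the text only says that query string and query type are concatenated and that a trailing dot is removed.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Build a hash key from a query string and a numeric query type.
 * The key layout "<query>:<type>" is an assumption for illustration.
 * Returns the key length, or -1 if the buffer is too small.
 */
static int
dns_key_build(const char *query, unsigned int type, char *key, size_t keylen)
{
	size_t qlen;
	int n;

	assert(query != NULL && key != NULL);
	qlen = strlen(query);

	/* Strip one trailing dot so "example.org." and "example.org" match. */
	if (qlen > 0 && query[qlen - 1] == '.')
		--qlen;

	n = snprintf(key, keylen, "%.*s:%u", (int) qlen, query, type);
	if (n < 0 || (size_t) n >= keylen)
		return -1;
	return n;
}
```

With such a key, two requests for the same name and type map to the same hash table entry regardless of whether the caller supplied the trailing dot.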
The hash table is used by the function dns_tsk_rd(), which receives replies from a DNS server, to identify all requests for a DNS query (it uses dns_receive() and dns_decode()). The function removes all those requests from the hash table and the DNS manager list, and creates a local list out of them. Then it walks through that list and invokes the callback functions specified in the requests with the DNS result and the application-specific context.
The cleanup task dns_tsk_cleanup() uses the DNS manager list of requests (from the first element up to the current one), and checks whether they have timed out (based on the time the request was made, dnsreq_start). If this is the case then the request is removed from the list and appended to a local list. Moreover, all requests in the hash table for the same query are moved into the local list too - this needs improvement. As soon as a request is found that has not timed out (or the current element is reached), the search stops (this currently doesn't allow for individual timeouts). Thereafter, the callbacks for the requests in the local list are invoked with a DNS result that contains an error code (timeout).
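The stop-at-first-live-request behaviour of the cleanup sweep can be illustrated with a simplified sketch. The struct req and the function req_sweep() are hypothetical stand-ins (an array instead of the TAILQ, a flag instead of moving entries to a local list), and the current element is assumed to be excluded from the sweep.

```c
#include <assert.h>
#include <stddef.h>
#include <time.h>

/* Hypothetical, simplified request: only the start time matters here. */
struct req {
	time_t start;		/* time the request was made */
	int timed_out;		/* set by the sweep */
};

/*
 * Walk requests in list order up to (not including) index cur and
 * mark those that are at least `timeout` seconds old.  The walk stops
 * at the first request that has not timed out, as described in the text.
 * Returns the number of requests marked.
 */
static size_t
req_sweep(struct req *reqs, size_t cur, time_t now, time_t timeout)
{
	size_t i;

	assert(reqs != NULL || cur == 0);
	for (i = 0; i < cur; i++) {
		if (now - reqs[i].start < timeout)
			break;	/* first request still within its timeout */
		reqs[i].timed_out = 1;
	}
	return i;
}
```

Because the walk stops early, an old request that sits behind a newer one is not examined in this pass, which is why a single global timeout is effectively assumed.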
There is currently a simple implementation for logging in libmta/log.c. It closely follows the ISC logging module API mentioned in Section 3.16.16.1.
The current implementation has a slight problem with logfile rotation. The programs use stdout/stderr for logging; they do not open a named logfile themselves, since this is done by the MCP for them. Hence there is no way for them to reopen a logfile; instead the sm_log_reopen() function rewinds the file using ftruncate(2) and fseek(3).
Alternatively the name of a logfile can be passed to each module such that it can open the file itself and hence the filename is available for the reopen function. The filename could be stored in the logging context and hence the reopen function could act accordingly, i.e., use sm_io_open() if a filename exists and otherwise the method described above.
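A minimal sketch of the rewind approach, assuming a POSIX system: log_rewind() is a hypothetical name and is not the actual sm_log_reopen() code, which works on the sendmail X I/O layer rather than on stdio streams.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <unistd.h>

/*
 * Rewind an already-open log stream: flush pending output, truncate
 * the underlying file to length 0 and reset the stream position.
 * Returns 0 on success, -1 on error.
 */
static int
log_rewind(FILE *fp)
{
	assert(fp != NULL);
	if (fflush(fp) != 0)
		return -1;
	if (ftruncate(fileno(fp), 0) != 0)
		return -1;
	if (fseek(fp, 0L, SEEK_SET) != 0)
		return -1;
	return 0;
}
```

Note that this discards the old log content instead of preserving it under a new name, so whoever rotates the logfile has to copy it away first.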
The first prototype of SMTPS is based on state threads (see Section 3.20.3.1).
This prototype uses a set of threads which is limited by an upper and lower number. If threads are idle for some time and there are more than a specified number of idle threads available, they terminate. If not enough threads are available, new ones are created up to a specified maximum.
Remark (placed here so it doesn't get lost): there is a restricted number (60000) of possible open connections to one port. Could that limit the throughput we are trying to achieve or is such a high number of connections unfeasible?
We do not want a separate connection between QMGR and SMTPS for each thread in SMTPS, hence we need to associate the data from QMGR with the right thread in SMTPS. One approach is to have a receiver thread in SMTPS which communicates with the QMGR. It receives all data from the QMGR and identifies the context (session/transaction) to which the data belongs. This needs some list of all contexts, e.g., an AVL tree, or, if the number of entries is small enough, a linear list. Question: is there some better method, i.e., instead of searching some structure have direct access to the right thread (see also Section 3.1.1.2)? There might be some optimization possible since each SMTPS has only a limited number of threads, so we could have an array of that size and encode an index into that array into the RCB, e.g., use another ID type that is passed around (like a context pointer). The receiver thread then adds that data to the context and notifies the thread.
There is one thread to read from the communication channel, but multiple tasks can actually write to it; writing is sequentialized by a mutex. In a conventional thread model, we would just select on I/O activities for that channel and on notifications when a new RCB is added to the list to send to the QMGR (like it is done in the QMGR); however, in state threads I/O activity is controlled by the internal scheduler. Since there are multiple threads, it might be necessary to control how far ahead the writers can be of the readers (to avoid starvation and unfairness). However, this should be self-adjusting since threads wait for replies to requests they have sent before they send out new ones (by default; in some cases a few requests may be outstanding from one thread). If too many threads send data, then the capacity of the communication channel and the way requests are handled by the QMGR should avoid starvation and guarantee fairness.
As explained above there is one thread that takes care of the communication between the module and the QMGR. This thread uses the following structure as context:
struct s2q_ctx_S
{
int s2q_status; /* status */
st_netfd_t s2q_fd; /* fd for communication */
int s2q_smtps_id; /* smtps id */
st_mutex_t s2q_wr_mutex; /* mutex for write */
unsigned int s2q_maxrcbs; /* max. # of outstanding requests */
unsigned int s2q_currcbs; /* current # of outstanding requests */
sessta_id_P *s2q_sids; /* array of session ids */
smtps_sess_P *s2q_sess; /* array of session ctx */
};
For initialization and termination of the communication task the following two functions are provided:
sm_ret_T sm_s2q_init(s2q_ctx_P s2q_ctx, int smtps_id, unsigned int maxrcbs);
sm_ret_T sm_s2q_stop(s2q_ctx_P s2q_ctx);
The initialization function connects to the QMGR and stores the file descriptor for communication in s2q_fd. It allocates two arrays for session IDs and session contexts which are used to find the SMTPS session for an incoming RCB, and it sends the initial ``A new SMTPS has been started'' to the QMGR. Finally sm_s2q_init() starts a thread that executes the function void *sm_rcb_from_srv(void *arg) which receives the s2q context as parameter. This function receives an RCB from QMGR and notifies the thread that is associated with the task via a condition variable; the thread can be found using the s2q_sids array.
Data can be sent to the QMGR using one of the functions sm_s2q_*() for new session ID, close session ID, new transaction ID, new recipient, close transaction ID, and discard transaction ID.
The function sm_w4q2s_reply() is used to wait for a reply from QMGR. It waits on a condition variable (which is stored in the SMTPS session context) which is signalled by sm_rcb_from_qmgr().
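How the two parallel arrays can be used to find the session for an incoming RCB is sketched below. This is an assumption-laden illustration: sess_find() is a hypothetical helper, ids are treated as plain C strings, and a NULL id marks a free slot; the real code uses sessta_id_P and smtps_sess_P.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Look up a session context by its id using two parallel arrays
 * (ids[i] belongs to ctxs[i]); a NULL id marks an unused slot.
 * Returns the matching context, or NULL if the id is unknown.
 */
static void *
sess_find(const char *const *ids, void *const *ctxs, size_t n, const char *id)
{
	size_t i;

	assert(ids != NULL && ctxs != NULL && id != NULL);
	for (i = 0; i < n; i++) {
		if (ids[i] != NULL && strcmp(ids[i], id) == 0)
			return ctxs[i];
	}
	return NULL;
}
```

A linear search is adequate as long as the number of threads per SMTPS is small; with an index encoded in the RCB the search could be replaced by a direct array access.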
Initially the SMTP server sends the QMGR its id and the maximum number of threads it is going to create.
| RT_S2Q_NID | id of new SMTPS |
| RT_S2Q_ID | id of SMTPS |
| RT_S2Q_CID | close SMTPS (id) |
| RT_S2Q_STAT | status |
| RT_S2Q_MAXTHRDS | max number of threads |
| RT_S2Q_NSEID | new session id |
| RT_S2Q_SEID | session id |
| RT_S2Q_CSEID | close session id |
| RT_S2Q_CLTIP4 | client IPv4 address |
| RT_S2Q_CLTIP6 | client IPv6 address |
| RT_S2Q_CLTPORT | client port |
| RT_S2Q_NTAID | new transaction id |
| RT_S2Q_TAID | transaction id |
| RT_S2Q_CTAID | close transaction id |
| RT_S2Q_DTAID | discard transaction id |
| RT_S2Q_MAIL | mail from |
| RT_S2Q_RCPT_IDX | rcpt idx |
| RT_S2Q_RCPT | rcpt to |
| RT_S2Q_CDBID | cdb id |
The common reply format from QMGR to SMTPS consists of the SMTPS id (which is only transmitted for paranoia), a session or transaction id, a status code and an optional status text:
| RT_Q2S_ID | SMTPS id |
| RT_Q2S_SEID/RT_Q2S_TAID | session/transaction id |
| RT_Q2S_STAT | status (ok, reject, more detailed?) |
| RT_Q2S_STATT | status text |
The function sm_rcb_from_srv() uses the session/transaction id to find the correct thread to which the rest of the RCB will be given.
| RT_Q2S_ID | id of SMTPS |
| RT_Q2S_STAT | status for session/transaction/... |
| RT_Q2S_STATV | status value (text follows) |
| RT_Q2S_STATT | status text |
| RT_Q2S_SEID | session id |
| RT_Q2S_TAID | transaction id |
| RT_Q2S_RCPT_IDX | rcpt idx |
| RT_Q2S_CDBID | cdb id |
| RT_Q2S_THRDS | slow down |
| RT_Q2S_STOP | stop reception (use slow = 0?) |
| RT_Q2S_DOWN | shut down |
The SMTP server uses the AR as map lookup server to avoid blocking calls in the state-threads application. While the anti-spam logic etc. is implemented in SMTPS, the map lookups are performed by SMAR. Hence SMTPS only sends minimal information to SMAR, e.g., the sender or recipient address, and asks for lookups in some maps with certain features, e.g., look up the full address, the domain part, or the address without details (``+detail'').
| RT_S2A_TAID | transaction id |
| RT_S2A_MAIL | mail from |
| RT_S2A_RCPT_IDX | rcpt idx |
| RT_S2A_RCPT | rcpt to (printable address) |
| RT_S2A_LTYPE | lookup type |
| RT_S2A_LFLAGS | lookup flags |
To simplify the SMTP server code, the reply format for SMAR is basically the same as for QMGR:
| RT_A2S_ID | id of SMTPS |
| RT_A2S_TAID | transaction id |
| RT_A2S_STAT | status (ok, reject, more detailed?) |
| RT_A2S_STATT | status message |
| RT_A2S_MAIL_ST | mail status |
| RT_A2S_RCPT_IDX | rcpt index |
| RT_A2S_RCPT_ST | rcpt status |
Values for lookup types are:
| LT_LOCAL_ADDR | is local address? |
| LT_RCPT_OK | is address ok as a recipient? |
| LT_MAIL_OK | is address ok as a sender? |
Values for lookup flags are:
| LF_DOMAIN | try domain |
| LF_FULL | try full address |
| LF_LOCAL | try localpart |
| LF_NODETAIL | try without detail |
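The effect of the lookup flags can be sketched by deriving the candidate map keys from an address. Everything beyond the flag names is an assumption: the numeric flag values, the order in which keys are tried, and the helper lookup_keys() are hypothetical, and SMAR may construct its keys differently.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Flag values are assumptions; only the names come from the text. */
#define LF_DOMAIN	0x01u
#define LF_FULL		0x02u
#define LF_LOCAL	0x04u
#define LF_NODETAIL	0x08u

#define KEY_MAX		256
#define KEYS_MAX	4

/*
 * Fill keys[] with the lookup keys selected by flags for an address
 * of the form local[+detail]@domain.  Returns the number of keys,
 * or -1 if the address has no '@' or is too long.
 */
static int
lookup_keys(const char *addr, unsigned int flags, char keys[KEYS_MAX][KEY_MAX])
{
	const char *at, *plus;
	int n = 0;

	assert(addr != NULL);
	at = strrchr(addr, '@');
	if (at == NULL || strlen(addr) >= KEY_MAX)
		return -1;
	if (flags & LF_FULL)
		snprintf(keys[n++], KEY_MAX, "%s", addr);
	if (flags & LF_NODETAIL) {
		/* drop "+detail" from the local part, if present */
		plus = memchr(addr, '+', (size_t) (at - addr));
		if (plus != NULL)
			snprintf(keys[n++], KEY_MAX, "%.*s%s",
			    (int) (plus - addr), addr, at);
	}
	if (flags & LF_LOCAL)
		snprintf(keys[n++], KEY_MAX, "%.*s", (int) (at - addr), addr);
	if (flags & LF_DOMAIN)
		snprintf(keys[n++], KEY_MAX, "%s", at + 1);
	return n;
}
```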
As explained elsewhere (2.8.2) it is possible to specify multiple delivery classes and multiple delivery agents that implement delivery classes. The former are referenced by the address resolver when selecting a mailer. The latter are selected by the scheduler after it receives a mailer to use.
Every delivery agent has an index and a list of delivery classes it implements. There is also a list of delivery classes (which are referenced by some id, most likely a numeric index into an array). This list is maintained by SMAR, each DA, and QMGR (and must obviously be kept in sync if numeric indices are used instead of names). QMGR keeps for each delivery class a list of delivery agents that implement the class, which can be used by the scheduler to select a DA that will perform a delivery attempt.
Note: As described in Section 3.8.3.1 item 6, the first version of sendmail X does not need to implement the full set of this; all delivery agents implement the same delivery classes, hence they can be selected freely without any restriction.
The first prototype of SMTPC is based on state threads (see Section 3.20.3.1).
It follows a thread model similar to that used for the SMTP server daemon (see Section 4.4).
See Section 3.4.5.1 for a description.
As usual, a protocol header is sent first. Moreover, the next entry in each RCB is the identifier of the SMTPC to which the QMGR wants to talk: RT_Q2C_ID.
The rest of the RCB is described below for each function.
Notice: for status codes an additional text field might follow, which currently isn't specified here.
RT_Q2C_DCID: delivery class id.
More data to follow, e.g., requirements about the session.
For the transaction data see below (item 7).
RT_C2Q_SESTAT: session status: either SMTP status code or an error code, e.g., connection refused etc.
The recipient data might be repeated to list multiple recipients. Notice: we may run into a size limit of RCBs here; do we need something like a continuation RCB?
See Section 3.9.2.1 about the status information that is sent from an SMTP client to QMGR.
RT_C2Q_TASTAT: transaction status: this is the overall status of a transaction. It has two parts: an error code and an error state, the latter describes in which state of the SMTP dialogue (or internal state) the error occurred. An optional error message can be sent (RT_C2Q_STATT). If recipient statuses are sent then the record type RT_C2Q_TARSTAT is used instead. A recipient status consists of three items:
RT_Q2C_CSEID: close session id
This value can be pretty much ignored for all practical purposes, except if we want to see whether the server behaves properly and still responds.
RT_C2Q_SESTAT: session status (or do we want to use a different record type? Might be useful to distinguish to avoid confusion)
The main SMTPC context structure looks like this:
struct sc_ctx_S
{
unsigned int sc_max_thrds; /* Max number of threads */
unsigned int sc_wait_thrds; /* # of threads waiting to accept */
unsigned int sc_busy_thrds; /* # of threads processing request */
unsigned int sc_rqst_count; /* Total # of processed requests */
uint32_t sc_status; /* SMTPC status */
sm_str_P sc_hostname; /* SMTPC hostname */
sc_t_ctx_P *sc_scts; /* array of sct's */
};
The last element of that structure is an array of SMTPC thread contexts (up to sc_max_thrds):
struct sc_t_ctx_S
{
sc_ctx_P sct_sc_ctx; /* pointer back to sc_ctx */
unsigned int sct_thr_id; /* thread id (debugging) */
unsigned int sct_status;
st_cond_t sct_cond_rd; /* received data from QMGR */
sc_sess_P sct_sess; /* current session */
};
The condition variable denotes when data from the QMGR is received for this particular thread. The last element is a pointer to the SMTPC session:
struct sc_sess_S
{
sc_t_ctx_P scse_sct_ctx; /* pointer to thread context */
sm_file_T *scse_fp; /* file to use (SMTP) */
sm_str_P scse_rd; /* smtp read buffer */
sm_str_P scse_wr; /* smtp write buffer */
sm_str_P scse_str; /* str for general use */
sm_rpool_P scse_rpool;
unsigned int scse_cap; /* server capabilities */
unsigned int scse_flags;
unsigned int scse_state;
struct in_addr *scse_client; /* XXX use a generic struct! */
sc_ta_P scse_ta; /* current transaction */
sessta_id_T scse_id;
sm_rcb_P scse_rcb; /* rcb for communication with QMGR */
SOCK_IN_T scse_rmt_addr; /* Remote address */
st_netfd_t scse_rmt_fd; /* fd */
};
The SMTPC transaction structure looks as follows:
struct sc_ta_S
{
sc_sess_P scta_sess; /* pointer to session */
sm_rpool_P scta_rpool;
sc_mail_P scta_mail; /* mail from */
sc_rcpts_P scta_rcpts; /* rcpts */
sc_rcpt_P scta_rcpt_p; /* current rcpt for reply */
unsigned int scta_rcpts_rcvd; /* # of recipients replies received */
unsigned int scta_rcpts_tot; /* number of recipients total */
unsigned int scta_rcpts_snt; /* number of recipients sent */
unsigned int scta_rcpts_ok; /* number of recipients ok */
unsigned int scta_rcpts_lmtp; /* #LMTP rcpts still to collect */
unsigned int scta_state;
smtp_status_T scta_status; /* SMTP status code (if applicable) */
sessta_id_T scta_id; /* transaction id */
sm_str_P scta_cdb_id; /* CDB id */
};
In the main() function SMTPC calls several initialization functions, one of which (sc_init(sc_ctx)) initializes the SMTPC context and allocates the array of SMTPC thread contexts. Then it starts the minimum number of threads (using start_threads(sc_ctx)) and the main thread takes care of signals afterwards. The threads run the function sc_hdl_requests() which receives the SMTPC context as parameter. This function looks for a free entry in the SMTPC thread context array, and allocates a new thread context which it assigns to that entry. It also allocates a new SMTPC session context. Thereafter it sets its status to SC_T_FREE and the first thread that is called informs the QMGR communication thread that SMTPC is ready to process tasks. The main part of the function processes a loop:
while (WAIT_THREADS(sc_ctx) <= max_wait_threads) { ... }
i.e., the thread stays active as long as the number of waiting threads does not exceed the allowed maximum. If the condition is false, there are too many waiting threads, so the thread cleans up after itself and terminates. Inside the loop the thread waits on its condition variable: sc_t_ctx->sct_cond_rd. If that wait times out, the current session (if one is open) will be terminated. If the QMGR actually has a task for this thread, then it first checks whether another thread should be started:
if (WAIT_THREADS(sc_ctx) < min_wait_threads &&
TOTAL_THREADS(sc_ctx) < MAX_THREADS(sc_ctx))
{ /* start another thread */ }
and then handles the current session: handle_session(sc_t_ctx). This function handles one SMTP client session. The state of the session is recorded in sc_sess->scse_state and can take one of the following values:
| SCSE_ST_NONE | no session active |
| SCSE_ST_NEW | new session |
| SCSE_ST_CONNECTED | connection succeeded |
| SCSE_ST_GREETED | received greeting |
| SCSE_ST_OPEN | connection open |
| SCSE_ST_CLOSED | close session |
Based on this state the function opens a session if that hadn't happened yet and performs one transaction according to the data from the QMGR. Depending on a flag in sc_sess->scse_flags the session is optionally closed afterwards.
As usual there is one thread that takes care of the communication between the module and the QMGR. This thread uses the following structure as context:
struct c2q_ctx_S
{
sc_ctx_P c2q_sc_ctx; /* pointer back to SMTPC context */
unsigned int c2q_status; /* status */
st_netfd_t c2q_fd; /* fd for communication */
unsigned int c2q_sc_id; /* smtpc id */
st_cond_t c2q_cond_rd; /* cond.var for read */
st_cond_t c2q_cond_wr; /* cond.var for write */
unsigned int c2q_maxses; /* max. # of open sessions */
sc_sess_P *c2q_sess; /* array of session ctx */
};
For initialization and termination the following two functions are provided:
sm_ret_T sm_c2q_init(sc_ctx_P sc_ctx, c2q_ctx_P c2q_ctx, unsigned int sc_idx, unsigned int maxses);
sm_ret_T sm_c2q_stop(c2q_ctx_P c2q_ctx);
The initialization function starts a thread that executes the function void *sc_rcb_from_qmgr(void *arg) which receives the c2q context as parameter. This function receives an RCB from QMGR and notifies a thread that is associated with the task or finds a free SMTPC thread if it is a new task. To maintain the former information one array for session contexts c2q_sess is allocated; its size is maxses which is set to MAX_THREADS(sc_ctx) by the caller. This allows the communication module to find the correct session context based on the session (or transaction) identifier sent by the QMGR in its requests if the request refers to an open session. To find a free SMTPC thread, the array sc_scts in the SMTPC context is searched for a NULL entry.
Status information can be sent back to the QMGR using the function sm_ret_T sc_c2q(sc_t_ctx_P sc_t_ctx, uint32_t whichstatus, sm_ret_T ret, c2q_ctx_P c2q_ctx).
The SMTP client functionality is fairly restricted right now, but the system implements full pipelining (in contrast to sendmail 8 which uses MAIL as synchronization point). As usual, the SMTP client is also able to speak LMTP.
To open and close an SMTP session two functions are provided: sm_ret_T sc_sess_open(sc_t_ctx_P sc_t_ctx) and sm_ret_T sc_sess_close(sc_t_ctx_P sc_t_ctx). The function sm_ret_T sc_one_ta(sc_t_ctx_P sc_t_ctx) performs one SMTP transaction. As can be seen from the prototypes, the only parameter passed to these functions is the SMTPC thread context which contains (directly or indirectly) pointers to the current SMTPC session and transaction.
As shown in Section 4.6.2.1, the SMTPC session context contains three strings (see 3.16.7) that are used for the SMTP dialog and related operations.
Since the content database stores the mail in SMTP format, it can be sent out directly without any interaction. Similar to the SMTP server side, this function accesses the file buffer directly to avoid too much copying.
Just some items to take into consideration for the implementation of the queue manager. These are written down here so they don't get lost...
Problem here: what about disk I/O? For example: calling fsync() for the logfile may cause the queue manager to block. If the thread implementation doesn't schedule another thread while one is blocked on disk I/O, then the entire process will hang and the queue manager will not respond to other requests.
If this actually happens (fairly likely on some OSs with user-land pthread implementation), and it causes a problem (performance), then it might be necessary to create another process that actually performs disk I/O on behalf of the QMGR.
How about a flow diagram? Some architectural overview would be nice.
The QMGR should not have to deal with many connections. SMTPS and SMTPC are multi-threaded themselves; we may have some SMTPS/SMTPC processes. However, it won't be so many that we have a problem with the number of connections to monitor, i.e., poll() should be sufficient.
Which threading model should we choose? Just a few worker threads that will go back to work whenever they encounter a blocking action? See Section 3.20 for discussion.
Do we need priority queues or can we serve all jobs FIFO?
The QMGR is based on the event threads library described in Section 4.3.8.
Currently access to tasks is controlled via the mutexes that control the queues: if a task is taken out of a queue, it is under the sole control of the thread that did it, no other thread can (should) access the task. Unless we change this access model, no mutex is necessary for individual tasks.
The queue manager has several data structures that can be concurrently accessed from different threads. Hence the access must be protected by mutexes unless there are other means which prevent conflicts. Some data structures can be rather large, e.g., the various DBs and caches. Locking them for an extended time may cause lock contention. Some data structures and operations on them may allow locking only a single element, others may require locking the entire structure. Examples of the latter are adding and removing elements, which in most cases require locking of the entire structure.
In some cases there might be ways around locking contention. For example, to delete items from a DB (or cache) the item might be just marked ``Delete'' instead of actually deleting it. This only requires locking of a single entry, not the entire DB. Those ``Delete'' entries can be removed in a single sweep later on (or during normal ``walk through'' operations), or they can be simply reclaimed for use. Question: what is more efficient? That is, if the DB is large and a walk through all elements is required to free a few then that might take too long, and we shouldn't hold a lock too long. We could gather ``Delete'' elements in a queue, then we don't have to walk through the entire DB. However, then the position of the elements must be fixed such that we can directly access and delete them, or at least lookup prior to deletion must be fast. If the DB internally may rearrange the location of entries then we can't keep a pointer to them. Question: will this ever happen? Some DB versions may do this, how about the ones we use? In some cases, some of the algorithms may require that DB elements don't move, but in most cases the elements just contain pointers to the data which isn't moved and hence can be accessed even if the DB rearranges its internal data structures.
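The mark-then-reclaim idea with a separate queue of ``Delete'' entries could look roughly like the following sketch. It is not taken from the QMGR source: the table layout, the names tab_mark_deleted() and tab_reclaim(), and the fixed table size are hypothetical, entries are assumed not to move, and all locking is omitted to keep the example short.

```c
#include <assert.h>
#include <stddef.h>

#define TAB_SIZE 8

struct entry {
	int used;	/* slot holds data */
	int deleted;	/* marked "Delete", not yet reclaimed */
	int value;
};

struct table {
	struct entry e[TAB_SIZE];
	size_t delq[TAB_SIZE];	/* indices of entries marked "Delete" */
	size_t ndel;		/* number of queued indices */
};

/* Mark one entry; only this entry (and the queue) would need a lock. */
static void
tab_mark_deleted(struct table *t, size_t i)
{
	assert(t != NULL && i < TAB_SIZE);
	if (t->e[i].used && !t->e[i].deleted) {
		t->e[i].deleted = 1;
		t->delq[t->ndel++] = i;	/* each entry is queued at most once */
	}
}

/* Free all marked entries without walking the entire table. */
static size_t
tab_reclaim(struct table *t)
{
	size_t n;

	assert(t != NULL);
	n = t->ndel;
	while (t->ndel > 0) {
		size_t i = t->delq[--t->ndel];

		t->e[i].used = 0;
		t->e[i].deleted = 0;
	}
	return n;
}
```

The queue stores indices rather than pointers, which only works because the slots of this table never move; that is exactly the constraint raised in the text.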
If a system locks various items then there is a potential for deadlocks. One way to prevent this is a locking hierarchy, i.e., items are always locked in the same order. We probably need to define a locking order. It's currently unclear how this can be done such that access is still efficient without too much locking contention. See also Section 4.7.2 for possible ways around locking contention.
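One common way to impose a locking order on items of the same kind is to order them by address. The sketch below shows that technique for two POSIX mutexes; it is an assumption for illustration only (lock_pair() and unlock_pair() are hypothetical names), since no locking order has been defined for the QMGR yet.

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>

/*
 * Lock two distinct mutexes in a fixed global order (by address), so
 * that two threads locking the same pair can never deadlock each other.
 */
static void
lock_pair(pthread_mutex_t *a, pthread_mutex_t *b)
{
	assert(a != NULL && b != NULL && a != b);
	if ((uintptr_t) a > (uintptr_t) b) {
		pthread_mutex_t *tmp = a;

		a = b;
		b = tmp;
	}
	pthread_mutex_lock(a);
	pthread_mutex_lock(b);
}

/* The unlock order does not matter for deadlock avoidance. */
static void
unlock_pair(pthread_mutex_t *a, pthread_mutex_t *b)
{
	pthread_mutex_unlock(a);
	pthread_mutex_unlock(b);
}
```

For locks of different kinds (e.g., a DB lock and an entry lock) a fixed order by type would be used instead of the address.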
The main context for QMGR looks like this: (2004-04-14)
struct qmgr_ctx_S
{
sm_magic_T sm_magic;
pthread_mutex_t qmgr_mutex;
unsigned int qmgr_status; /* see below, QMGR_ST_* */
time_T qmgr_st_time;
/* Resource flags */
uint32_t qmgr_rflags; /* see QMGR_RFL_* */
/* Overall value to indicate resource usage 0:free 100:overloaded */
unsigned int qmgr_total_usage;
/* Status flags */
uint32_t qmgr_sflags; /* see QMGR_SFL_* */
sm_str_P qmgr_hostname;
sm_str_P qmgr_pm_addr; /* <postmaster@hostname> */
/* info about connections? */
fs_ctx_P qmgr_fs_ctx;
cdb_fsctx_P qmgr_cdb_fsctx;
unsigned long qmgr_cdb_kbfree;
edb_fsctx_P qmgr_edb_fsctx;
unsigned long qmgr_edb_kbfree;
unsigned long qmgr_ibdb_kbfree;
/* SMTPS */
id_count_T qmgr_idc; /* last used SMTP id counter */
int qmgr_sslfd; /* listen fd */
int qmgr_ssnfd; /* number of used fds */
uint32_t qmgr_ssused; /* bitmask for used elements */
qss_ctx_P qmgr_ssctx[MAX_SMTPS_FD];
ssocc_ctx_P qmgr_ssocc_ctx;
occ_ctx_P qmgr_occ_ctx;
/* SMTPC */
int qmgr_sclfd; /* listen fd */
int qmgr_scnfd; /* number of used fds */
uint8_t qmgr_scused; /* bitmask for used elements */
qsc_ctx_P qmgr_scctx[MAX_SMTPC_FD];
sm_evthr_ctx_P qmgr_ev_ctx; /* event thread context */
iqdb_P qmgr_iqdb; /* rsc for incoming edb */
ibdb_ctx_P qmgr_ibdb; /* backup for incoming edb */
sm_evthr_task_P qmgr_icommit; /* task for ibdbc commits */
qss_opta_P qmgr_optas; /* open transactions (commit) */
sm_evthr_task_P qmgr_sched; /* scheduling task */
aq_ctx_P qmgr_aq; /* active envelope db */
edb_ctx_P qmgr_edb; /* deferred envelope db */
edbc_ctx_P qmgr_edbc; /* cache for envelope db */
sm_evthr_task_P qmgr_tsk_cleanup; /* task for cleanup */
qcleanup_ctx_P qmgr_cleanup_ctx;
sm_maps_P qmgr_maps; /* map system context */
/* AR */
sm_evthr_task_P qmgr_ar_tsk; /* address resolver task */
int qmgr_ar_fd; /* communication fd */
qar_ctx_P qmgr_ar_ctx;
sm_rcbh_T qmgr_rcbh; /* head for RCB list */
unsigned int qmgr_rcbn; /* number of entries in RCB list */
/* currently protected by qmgr_mutex */
qmgr_conf_T qmgr_conf;
sm_log_ctx_P qmgr_lctx;
sm_logconfig_P qmgr_lcfg;
uint8_t qmgr_usage[QMGR_RFL_LAST_I + 1];
uint8_t qmgr_lower[QMGR_RFL_LAST_I + 1];
uint8_t qmgr_upper[QMGR_RFL_LAST_I + 1];
};
There are task contexts for QMGR/SMTPS (2004-04-15):
struct qss_ctx_S
{
sm_magic_T sm_magic;
rcbcom_ctx_T qss_com;
qmgr_ctx_P qss_qmgr_ctx; /* pointer back to main ctx */
int qss_id; /* SMTPS id */
uint8_t qss_bit; /* bit for qmgr_ssctx */
qss_status_T qss_status; /* status of SMTPS */
unsigned int qss_max_thrs; /* upper limit for threads */
unsigned int qss_max_cur_thrs; /* current limit for threads */
unsigned int qss_cur_session; /* current # of sessions */
};
and QMGR/SMTPC (2004-04-15):
struct qsc_ctx_S
{
sm_magic_T sm_magic;
rcbcom_ctx_T qsc_com;
qmgr_ctx_P qsc_qmgr_ctx; /* pointer back to main ctx */
int qsc_id; /* SMTPC id */
uint8_t qsc_bit; /* bit for qmgr_ssctx */
dadb_ctx_P qsc_dadb_ctx; /* pointer to DA DB context */
/* split this in status and flags? */
qsc_status_T qsc_status; /* status of SMTPC */
uint32_t qsc_id_cnt;
};
Both refer to a generic communication structure:
struct qcom_ctx_S
{
qmgr_ctx_P qcom_qmgr_ctx; /* pointer back to main ctx */
sm_evthr_task_P qcom_tsk; /* pointer to evthr task */
sm_rcb_P qcom_rdrcb; /* rcb for rd */
SIMPLEQ_HEAD(, sm_rcbl_S) qcom_wrrcbl; /* rcb list for wr */
pthread_mutex_t qcom_wrmutex; /* protect qss_wrrcb */
};
The QMGR holds also the necessary data for SMTPS sessions (2004-04-15):
struct qss_sess_S
{
sessta_id_T qsses_id;
time_T qsses_st_time;
sm_rpool_P qsess_rpool;
struct in_addr qsess_client; /* XXX use a generic struct! */
};
and transactions (2004-04-15):
struct qss_ta_S
{
sm_rpool_P qssta_rpool;
time_T qssta_st_time;
qss_mail_P qssta_mail; /* mail from */
qss_rcpts_T qssta_rcpts; /* rcpts */
unsigned int qssta_rcpts_tot; /* total number of recipients */
unsigned int qssta_flags;
sessta_id_T qssta_id;
cdb_id_P qssta_cdb_id;
size_t qssta_msg_size; /* KB */
qss_ctx_P qssta_ssctx; /* pointer back to SMTPS ctx */
pthread_mutex_t qssta_mutex;
};
The open transaction context (from SMTPS) stores information about outstanding transactions, i.e., those transactions in SMTPS that have ended the data transmission, but have not yet been confirmed by the QMGR. This data structure (fixed size queue) is used for group commits to notify the threads in the SMTPS servers that hold the open transactions.
struct qss_opta_S
{
unsigned int qot_max; /* allocated size */
unsigned int qot_cur; /* currently used (basically last-first) */
unsigned int qot_first; /* first index to read */
unsigned int qot_last; /* last index to read (first to write) */
pthread_mutex_t qot_mutex;
qss_ta_P *qot_tas; /* array of open transactions */
};
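The fixed size queue can be operated as a ring buffer. The following sketch mirrors the fields of qss_opta_S under hypothetical names (struct opta, opta_add(), opta_get()) and leaves out the mutex; it is meant to show how the indices could interact, not how the QMGR code is actually written.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-in for qss_opta_S (no mutex, generic pointers). */
struct opta {
	unsigned int max;	/* allocated size */
	unsigned int cur;	/* currently used */
	unsigned int first;	/* first index to read */
	unsigned int last;	/* first index to write */
	void **tas;		/* array of open transactions */
};

/* Append a transaction; returns 0 on success, -1 if the queue is full. */
static int
opta_add(struct opta *q, void *ta)
{
	assert(q != NULL && q->max > 0);
	if (q->cur >= q->max)
		return -1;
	q->tas[q->last] = ta;
	q->last = (q->last + 1) % q->max;
	q->cur++;
	return 0;
}

/* Remove the oldest transaction; returns NULL if the queue is empty. */
static void *
opta_get(struct opta *q)
{
	void *ta;

	assert(q != NULL && q->max > 0);
	if (q->cur == 0)
		return NULL;
	ta = q->tas[q->first];
	q->first = (q->first + 1) % q->max;
	q->cur--;
	return ta;
}
```

A group commit would then drain the queue with repeated opta_get() calls and notify the owner of each returned transaction.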
Other structures that the QMGR currently uses are
All envelope DBs (INCEDB: ibdb and iqdb, ACTEDB, DEFEDB, EDB) have their own mutexes in their context structures.
IQDB contains references to qss_sess_T, qss_ta_T, and qss_rcpts_T.
The recipient structure in AQ uses these flags:
| AQR_FL_IQDB | from IQDB |
| AQR_FL_DEFEDB | from DEFEDB |
| AQR_FL_SENT2AR | Sent to AR |
| AQR_FL_RCVD4AR | Received from AR |
| AQR_FL_RDY4DLVRY | Ready for delivery |
| AQR_FL_SCHED | Scheduled for delivery, is going to be sent to DA |
| AQR_FL_WAIT4UPD | Waiting for status update, must not be touched by scheduler |
| AQR_FL_TO | Too long in AQ |
| AQR_FL_TEMP | temporary failure |
| AQR_FL_PERM | permanent failure |
| AQR_FL_ARF | failure from SMAR |
| AQR_FL_DAF | failure from DA |
| AQR_FL_MEMAR | memory allocation for aqr_addrs failed, use fallback |
| AQR_FL_ARINCOMPL | addr resolution incomplete |
| AQR_FL_ARF_ADD | rcpt with SMAR failure added to delivery list |
| AQR_FL_TO_ADD | rcpt with timeout added to delivery list |
| AQR_FL_IS_BNC | this is a bounce |
| AQR_FL_IS_DBNC | double bounce |
| AQR_FL_DSN_PERM | perm error |
| AQR_FL_DSN_TMT | timeout |
| AQR_FL_DSN_GEN | bounce has been generated |
| AQR_FL_CNT_UPD | rcpt counters have been updated, i.e., aq_upd_ta_rcpt_cnts() has been called |
| AQR_FL_STAT_UPD | rcpt status (aqr_status) has been updated individually |
Section 2.4.3.3 explains how transaction and recipient data flows through the various DBs in QMGR. This section tries to tie the various steps to functions in QMGR (which are explained in Section 4.7.5)
The main() function of the QMGR is very simple (Notice: in almost all example code error checking etc has been removed for simplicity).
ret = sm_qmgr_init0(qmgr_ctx); /* basic initialization */
ret = sm_qmgr_rdcf(qmgr_ctx); /* read configuration */
ret = sm_qmgr_init(qmgr_ctx); /* initialization after configuration */
ret = sm_qmgr_start(qmgr_ctx); /* start all components */
ret = sm_qmgr_loop(qmgr_ctx); /* start event threads loop */
ret = sm_qmgr_stop(qmgr_ctx); /* stop all components */
where all functions do what is obvious from their name.
The main loop sm_qmgr_loop() simply calls evthr_loop(qmgr_ctx->qmgr_ev_ctx).
sm_qmgr_init0() performs basic initialization, sm_qmgr_rdcf() reads the configuration (currently (2004-02-13) only command line parameters), and sm_qmgr_init() initializes various QMGR data structures.
sm_qmgr_start() starts various tasks:
ret = sm_qm_stli(qmgr_ctx);
ret = sm_qm_stcommit(qmgr_ctx, now);
ret = sm_qm_stsched(qmgr_ctx, now);
sm_qm_stli() starts two (event thread) tasks listening for connections from SMTPS and SMTPC using the functions sm_qm_smtpsli() and sm_qm_smtpcli(). sm_qm_stcommit() starts the periodic commit task and sm_qm_stsched() starts the scheduling task.
The two listener tasks sm_qm_smtpsli() and sm_qm_smtpcli() do basically the same: wait for a new connection from the respective service (SMTPS/SMTPC), ``register'' it in the QMGR context, and start one task sm_qmgr_smtpX(sm_evthr_task_P tsk) that takes care of the communication with the SMTPX process. Notes:
The communication tasks sm_qmgr_smtpX() dispatch a read function sm_smtpX2qmgr() or a write function sm_qmgr2smtpX() to deal with the communication request. Those functions use the read RCB qsX_rdrcb to read (sequentially) data from SMTPS/SMTPC and a list of write RCBs qsX_wrrcbl to write data back to those modules. Access to the latter is protected by a mutex, and RCBs are appended to the list by various functions. The communication tasks are activated via read/write availability, where the write availability is additionally triggered by functions that put something into the list of write RCBs (otherwise the task would be activated most of the time without actually having anything to do).
The read functions sm_smtpX2qmgr() receive an RCB qsX_ctx->qsX_rdrcb from the module and then call the function sm_qsX_react() to decode the RCB and act accordingly. Those functions may return different values to determine what should happen next with the task. If it is an error, the task terminates (which might be overkill). Other values are QMGR_R_WAITQ (translated to EVTHR_WAITQ), QMGR_R_ASYNC (translated to EVTHR_OK), and EVTHR_DEL, which causes the task to terminate; any other value is returned directly to the event threads library. QMGR_R_ASYNC means that the task has already been returned to the event thread system (waitq), see Section 3.20.5.1.3.
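The translation of return values described above could look roughly like the following sketch. The numeric values are invented for the example, and two points are assumptions rather than facts taken from the source: errors are represented as negative values, and terminating the task on error is expressed by returning EVTHR_DEL. The function name demo_react2evthr() is hypothetical.

```c
/* Hypothetical numeric values; only the names come from the text. */
enum { EVTHR_OK = 0, EVTHR_WAITQ = 1, EVTHR_DEL = 2 };
enum { QMGR_R_WAITQ = 100, QMGR_R_ASYNC = 101 };

/*
 * Map the result of a react function to a value for the event threads
 * library (hypothetical helper, not part of the real QMGR).
 */
static int demo_react2evthr(int ret)
{
    if (ret < 0)          /* assumption: errors are negative */
        return EVTHR_DEL; /* assumption: terminate the task */

    switch (ret) {
    case QMGR_R_WAITQ:
        return EVTHR_WAITQ; /* put the task back into the wait queue */
    case QMGR_R_ASYNC:
        return EVTHR_OK;    /* task has already been returned to waitq */
    default:
        return ret;         /* e.g., EVTHR_DEL: passed through unchanged */
    }
}
```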
The write function sm_qmgr2mod() locks the mutex qsX_wrmutex, then checks whether the list qsX_wrrcbl of RCBs is empty. If it is, then the task returns and turns off the WRITE request. Otherwise it sends the first element to the respective module using sm_rcb_snd(), removes that element, and, if the list is empty thereafter, turns off the WRITE request when it returns. Notice: it currently does not go through the entire list trying to write it all. This is done to prevent the thread from blocking; it is assumed that a single RCB can be sent without blocking. This assumption might be wrong, in which case the thread blocks (and hopefully another one runs); this might be prevented by requiring enough space in the communication buffer (which can be set via setsockopt() for sockets).
The commit task sm_qm_stcommit() is responsible for group commits. It checks the list of open transactions qmgr_ctx->qmgr_optas and, if it is not empty, calls q_ibdb_commit(qmgr_ctx), which in turn commits the current INCEDB and then notifies all outstanding transactions of this fact. This is done by going through the list and adding an RCB with the commit information to the list of RCBs qss_wrrcbl for the task qss_ta->qssta_ssctx that handles the transaction qss_ta.
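The "send one RCB per activation" behavior of the write function can be sketched as follows. All names (demo_rcb_T, demo_wr_ctx_T, demo_snd(), demo_wr_append(), demo_wr_one()) are hypothetical; demo_snd() merely counts calls and stands in for sm_rcb_snd(), whose real signature is not shown here. The return value of demo_wr_one() tells the caller whether the WRITE request should stay enabled.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for an RCB queued for writing. */
typedef struct demo_rcb_S {
    struct demo_rcb_S *next;
    const char *payload;
} demo_rcb_T;

/* Hypothetical write context: a mutex protected list of RCBs. */
typedef struct {
    pthread_mutex_t wrmutex; /* protects head/tail */
    demo_rcb_T *head;        /* first RCB to send */
    demo_rcb_T *tail;        /* last RCB in the list */
    int sent;                /* number of RCBs "sent" (demo only) */
} demo_wr_ctx_T;

/* Stand-in for sm_rcb_snd(): does no I/O, only counts the call. */
static int demo_snd(demo_wr_ctx_T *ctx, const demo_rcb_T *rcb)
{
    (void)rcb;
    ctx->sent++;
    return 0;
}

/* Append an RCB; a real caller would also request WRITE for the task. */
static void demo_wr_append(demo_wr_ctx_T *ctx, demo_rcb_T *rcb)
{
    pthread_mutex_lock(&ctx->wrmutex);
    rcb->next = NULL;
    if (ctx->tail != NULL)
        ctx->tail->next = rcb;
    else
        ctx->head = rcb;
    ctx->tail = rcb;
    pthread_mutex_unlock(&ctx->wrmutex);
}

/*
 * Send at most one RCB.  Returns true if more RCBs are pending (keep the
 * WRITE request), false if the list is (now) empty (turn WRITE off).
 */
static bool demo_wr_one(demo_wr_ctx_T *ctx)
{
    bool keep_write = false;

    pthread_mutex_lock(&ctx->wrmutex);
    demo_rcb_T *first = ctx->head;
    if (first != NULL) {
        (void)demo_snd(ctx, first); /* error handling omitted */
        ctx->head = first->next;    /* remove the element just sent */
        if (ctx->head == NULL)
            ctx->tail = NULL;
        keep_write = (ctx->head != NULL);
    }
    pthread_mutex_unlock(&ctx->wrmutex);
    return keep_write;
}
```

In this sketch the send happens while the mutex is held, which mirrors the description above; whether the real code releases the mutex before sending is not stated in the text.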
The scheduling function sm_qm_stsched() is supposed to implement the core of the QMGR.
A recipient goes through the following stages:
The function sm_qs2c_task(qsc_ctx_P qsc_ctx, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, sm_rcbe_P rcbe, sessta_id_P da_sess_id, sessta_id_P da_ta_id) creates one session with one transaction for SMTPC. The protocol is as follows:
| RT_Q2C_ID | SMTPC identifier |
| RT_Q2C_DCID | delivery class identifier |
| RT_Q2C_ONESEID | Session id, only one transaction (hack) |
| RT_Q2C_SRVIP4 | IPv4 address of server (hack) |
| RT_Q2C_NTAID | New transaction id |
| RT_Q2C_MAIL | Mail from address |
| RT_Q2C_CDBID | CDB identifier |
| RT_Q2C_RCPT_IDX | recipient index |
| RT_Q2C_RCPT | recipient address |
Additional recipients can be added via sm_qs2c_add_rcpt(qsc_ctx_P qsc_ctx, aq_rcpt_P aq_rcpt, sm_rcbe_P rcbe) which just adds the recipient index and address to the RCB.
If the transaction denotes a bounce message, only one recipient can be sent, and instead of the record tag RT_Q2C_NTAID either RT_Q2C_NTAIDB (bounce) or RT_Q2C_NTAIDDB (double bounce) is used. Additionally an entire error text is sent using RT_Q2C_B_MSG (bounce message) as record tag. Currently this does not include the headers. It should be something like:
Hi! This is the sendmail X MTA. I'm sorry to inform you that a mail from you could not be delivered. See below for details.
listing recipient address, delivery host, and delivery message for each failed recipient.
The main QMGR context contains three arrays which store the lower and upper thresholds for various resources and the current usage. A single scalar contains the overall resource usage.
uint8_t qmgr_usage[QMGR_RFL_LAST_I + 1];
uint8_t qmgr_lower[QMGR_RFL_LAST_I + 1];
uint8_t qmgr_upper[QMGR_RFL_LAST_I + 1];
/* Overall value to indicate resource usage 0:free 100:overloaded */
unsigned int qmgr_total_usage;
To store the amount of free disk space, two data structures are used: one to store the amount of available disk space per partition (see also Section 3.4.10.13.1):
struct filesys_S {
dev_t fs_dev; /* unique device id */
unsigned long fs_kbfree; /* KB free */
unsigned long fs_blksize; /* block size, in bytes */
time_T fs_lastupdate; /* last time fs_kbfree was updated */
const char *fs_path; /* some path in the FS */
};
and one which contains an array of those individual structures:
struct fs_ctx_S {
#if SM_USE_PTHREADS
pthread_mutex_t fsc_mutex;
#endif /* SM_USE_PTHREADS */
int fsc_cur_entries; /* cur. number of entries in fsc_sys*/
int fsc_max_entries; /* max. number of entries in fsc_sys*/
filesys_P fsc_sys; /* array of filesys_T */
};
The function qm_comp_resource(qmgr_ctx_P qmgr_ctx, thr_lock_T locktype) computes a value that is a measure for the overall resource usage: qmgr_total_usage. Moreover, the function also invokes functions that return the amount of free disk space for a DB that is stored on disk: cdb_fs_getfree(), edb_fs_getfree(), and ibdb_fs_getfree(). Each of these functions receives a pointer to a variable of type fs_ctx_T and a pointer to an integer variable which will contain the amount of available disk space after a successful return. The functions themselves check the last update timestamp to avoid invoking system functions too often. Since each DB operation tries to keep track of changes in the amount of disk space, this should return a reasonable estimate of the actual value.
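The idea of checking the last update timestamp before asking the operating system can be sketched as follows. This is not the actual code of the *_fs_getfree() functions: the structure demo_fs_T only mirrors some fields of filesys_S, the refresh interval DEMO_FS_UPD_INTERVAL is an invented value, and the use of POSIX statvfs() is an assumption about how the free space could be obtained.

```c
#include <sys/statvfs.h>
#include <time.h>

/* Assumed refresh interval in seconds (hypothetical value). */
#define DEMO_FS_UPD_INTERVAL 10

/* Hypothetical, reduced variant of struct filesys_S. */
typedef struct {
    const char *path;      /* some path in the FS */
    unsigned long kbfree;  /* KB free (cached) */
    unsigned long blksize; /* block size, in bytes */
    time_t lastupdate;     /* last time kbfree was updated, 0: never */
} demo_fs_T;

/*
 * Store the (possibly cached) number of free KB in *pkbfree.
 * statvfs() is only invoked if the cached value is too old.
 * Returns 0 on success, -1 if statvfs() fails.
 */
static int demo_fs_getfree(demo_fs_T *fs, time_t now, unsigned long *pkbfree)
{
    if (fs->lastupdate == 0 ||
        now - fs->lastupdate >= DEMO_FS_UPD_INTERVAL) {
        struct statvfs sv;

        if (statvfs(fs->path, &sv) != 0)
            return -1;
        fs->blksize = sv.f_frsize;
        /* blocks available to unprivileged users, converted to KB */
        fs->kbfree = (unsigned long)
            (((unsigned long long)sv.f_bavail * sv.f_frsize) / 1024);
        fs->lastupdate = now;
    }
    *pkbfree = fs->kbfree;
    return 0;
}
```

Between two refreshes a DB operation that allocates or frees disk space would adjust the cached kbfree value itself, which is why the cached value remains a reasonable estimate.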
The function q2s_throttle(qss_ctx_P qss_ctx, sm_evthr_task_P tsk, unsigned int nthreads) informs one SMTP server (referenced by qss_ctx) about the new maximum number of threads it should allow.
The generic function qs_control(qss_ctx_P qss_ctx, int direction, unsigned int use, unsigned int resource) checks the new usage of a resource and based on the input parameter direction decides whether to (un)throttle one SMTP server. qs_control() has the following behavior: throttle the system iff
else unthrottle the system iff
The specific function qs_unthrottle(qss_ctx_P qss_ctx) checks whether one SMTP server can be unthrottled based on the current resource usage. It is called by sm_smtps_wakeup() which is scheduled by sm_qss_wakeup(qmgr_ctx_P qmgr_ctx, thr_lock_T locktype) as a sleep() task. sm_qss_wakeup() in turn is invoked from qm_resource() when all resources are available (again).
The function qs_comp_control(qss_ctx_P qss_ctx, bool unthrottle) is invoked from sm_qss_react(). It will only check whether the address resolver (SMAR) is available and accordingly call qs_control().
The requirements for updating the recipient status after a delivery attempt has been made are described in Section 2.4.3.4. Section 3.4.16 describes the functionality, which distinguishes several reasons that require updating the status of a recipient:
Before examining these cases, a short note about updating the various queues: entries in IQDB are removed immediately if the recipient was in that queue (this can be done because the recipient is safely stored in DEFEDB or IBDB). To update DEFEDB and IBDB more complicated measures are taken: a request is queued that the status must be changed (this may also mean removal of an entry from the respective DB) while the function goes through all the recipients of the transaction. DEFEDB provides functions to do this: edb_ta_rm_req() and edb_rcpt_rm_req() which are described in Section 4.10.4. See Section 4.10.2 about the implementation of updating IBDB based on a list of change requests. However, if a recipient was successfully delivered on the first attempt, IBDB can be updated directly: even if further operations fail (which are related to updating the recipient status), the recipient can be removed without any negative consequences.
As explained in Section 3.4.16.1 it is necessary to preserve the order of updates for recipients and transactions when those changes are committed to DEFEDB.
To update the status for some (failed) recipients (case 2a) the function
qm_fr_sc_rcpts(qmgr_ctx_P qmgr_ctx, sessta_id_T da_ta_id, sm_rcb_P rcb, unsigned int err_st)
is used, the RCB contains the recipient status from a DA. This function simply takes the data out of the RCB and updates the recipient status in the active queue. For this it invokes
aq_rcpt_status(aq_ctx, da_ta_id, idx, rcpt_status, err_st, errmsg),
which updates the field aqr_status_new that is later on used for aq_upd_ta_rcpt_cnts() (see 4.7.5.7.6), which requires the previous and the new status of a recipient to determine which recipient counters to change in the transaction context.
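Why both the previous and the new status are needed can be illustrated with a small sketch. Everything in it is hypothetical: the status values, the counter names, and the rule that a recipient no longer counts as "left" once it is delivered or has failed permanently. It is not the implementation of aq_upd_ta_rcpt_cnts(); it only shows that a counter for the old status must be decremented and a counter for the new status incremented.

```c
/* Hypothetical recipient status values. */
typedef enum {
    DEMO_ST_NONE = 0, /* no delivery attempt yet */
    DEMO_ST_OK,       /* delivered */
    DEMO_ST_TEMP,     /* temporary failure */
    DEMO_ST_PERM      /* permanent failure */
} demo_status_T;

/* Hypothetical per-transaction recipient counters. */
typedef struct {
    unsigned int rcpts_left; /* recipients still to be taken care of */
    unsigned int rcpts_temp; /* recipients with temporary failures */
    unsigned int rcpts_perm; /* recipients with permanent failures */
} demo_ta_T;

/*
 * Adjust the counters for one recipient whose status changes from
 * old_st to new_st.  Assumption: DEMO_ST_OK and DEMO_ST_PERM are final,
 * i.e., they never occur as old_st of a real change.
 */
static void demo_upd_ta_rcpt_cnts(demo_ta_T *ta, demo_status_T old_st,
                                  demo_status_T new_st)
{
    if (old_st == new_st)
        return; /* nothing changed, counters stay as they are */

    if (old_st == DEMO_ST_TEMP)
        ta->rcpts_temp--; /* recipient leaves the "temp" group */
    if (new_st == DEMO_ST_TEMP)
        ta->rcpts_temp++;
    if (new_st == DEMO_ST_PERM)
        ta->rcpts_perm++;
    if (new_st == DEMO_ST_OK || new_st == DEMO_ST_PERM)
        ta->rcpts_left--; /* final status reached */
}
```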
To update the status for an entire transaction (case 2b) from a DA the function
qda_update_ta_stat(qmgr_ctx_P qmgr_ctx, sessta_id_T da_ta_id, sm_ret_T status, unsigned int err_st, dadb_ctx_P dadb_ctx, dadb_entry_P dadb_entry, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, thr_lock_T locktype)
is called. This function walks through all recipients of a transaction and updates the various DBs and counters based on the individual