This chapter describes some parts of the current sendmail X implementation. Obviously the implementation may change at any time; it is not part of the sendmail X specification. Only the external interfaces, i.e., those described in previous chapters, specify the sendmail X behavior and configuration. The usual disclaimer ``may be subject to changes'' applies, i.e., nobody should rely on the descriptions given here; they are for people who actually want to understand (and modify or maintain) the source code. Some of the descriptions might not be up to date (usually the source code is implemented/changed first and updating the documentation may lag a bit).
Some parts of the sendmail X source code should be explained before the individual modules and the libraries are described. These are conventions that are common to at least two modules and that will be used in the following sections.
In Section 3.1.1.2 some remarks about identifiers have been made. In this section the structure of session and transaction identifiers is explained. These identifiers for the SMTP server consist (currently) of a leading `S', a 64 bit counter, and an 8 bit process index, i.e., the format is: "S%016qX%02X". Those identifiers are of course used in other modules too, especially the QMGR. For simplicity the identifiers for the delivery agents follow a similar scheme: a leading `C', an 8 bit process index, a running counter (32 bit), and a thread index (32 bit). Notice that only the SMTPS identifiers are supposed to be unique over a very long time period (it takes a while before a 64 bit counter wraps around). The SMTPC (DA) identifiers are unique for a shorter time (see Section 3.1.1.2 about the requirements) and they allow easy identification of delivery threads in SMTPC. The type for this identifier is sessta_id_T.
To identify recipients uniquely they are enumerated within a transaction, so their format is "%19s-%04X" where the first 19 characters are the SMTPS transaction id and the last four characters are the recipient index (this limits the number of recipients per transaction to 65535, which seems to be enough). The type for this identifier is rcpt_id_T.
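As an illustration (not the actual sendmail X code), the two formats could be produced roughly as follows; the helper names, the local typedefs, and the use of %llX instead of the BSD-specific %qX are assumptions made for this sketch:

#include <stdio.h>
#include <inttypes.h>

/* illustrative sketch only: sendmail X defines its own types and macros */
typedef char sessta_id_T[20];  /* 'S' + 16 hex digits + 2 hex digits + NUL */
typedef char rcpt_id_T[25];    /* transaction id + '-' + 4 hex digits + NUL */

static void
sessta_id_fmt(sessta_id_T id, uint64_t counter, unsigned int proc_idx)
{
    /* "S%016qX%02X" in the text; %llX is used here for portability */
    snprintf(id, sizeof(sessta_id_T), "S%016llX%02X",
        (unsigned long long) counter, proc_idx & 0xff);
}

static void
rcpt_id_fmt(rcpt_id_T id, const sessta_id_T ta_id, unsigned int rcpt_idx)
{
    /* "%19s-%04X": transaction id plus recipient index */
    snprintf(id, sizeof(rcpt_id_T), "%19s-%04X", ta_id, rcpt_idx & 0xffff);
}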
In some cases it is useful to have only one identifier in a structure and some field that denotes which identifier type is actually used, i.e., a superset of sessta_id_T and rcpt_id_T. This type is smtp_id_T.
Notice: under certain circumstances it might be worth storing identifiers as binary data instead of printable strings, for example if they are used for large indices that are stored in main memory. For SMTP server identifiers this shrinks the size from 20 bytes down to 12 (8 bytes for the counter, 4 for the process index, assuming that it's a 32 bit system, i.e., the 8 bit process index will be stored in a 32 bit word). For the recipient identifiers the length will shrink from 25 bytes (probably stored in 28 bytes) down to 16 bytes. Whether it is worth trading memory for conversion overhead (and added complexity) remains to be seen. In the first version of sendmail X this will probably not be implemented. See also Section 3.13.6.1.1 about size estimates.
This section talks about the integration of asynchronous functions with the event thread library. Section 3.1.1.1 gives a description of asynchronous functions, stating that a result (callback) function RC is invoked with the current status and the result of an asynchronous function as parameters. However, the implementation of this is not as simple as it might seem. The event thread library (see Section 3.20.5 for a functional description and Section 4.3.8 for the implementation) uses a task context that is passed to worker threads which in turn execute the application function (passing it the entire task context).
Note: there are two different problems described here:
Case 2 can be turned into case 1 by splitting a function into two (``continuation''), i.e., when the result is required to continue then the first half of the function saves its state and stops execution, e.g., it goes back to the wait queue. The second half of the function can then either be executed via the callback, or it is activated and combines the requested result with the current state to achieve the final result. Note: this kind of programming is not particularly simple as explained in Section 3.16.4, but it avoids blocking as much as possible.
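A minimal sketch of this ``continuation'' style is shown below, assuming the event thread conventions described later in this chapter (the EVTHR_* return codes and the application context pointer in the task structure); the state names, context type, and helper functions are made up for the example:

/* sketch: one logical operation split into two halves around an
 * asynchronous request; ST_START, ST_WAIT_RESULT, my_ctx_P and the
 * helper functions are illustrative only */
sm_ret_T
task_fct(sm_evthr_task_P tsk)
{
    my_ctx_P ctx = (my_ctx_P) tsk->evthrt_actx;

    switch (ctx->state)
    {
      case ST_START:
        ctx->state = ST_WAIT_RESULT;   /* save state ("continuation") */
        start_async_request(ctx);      /* result arrives later via callback */
        return EVTHR_WAITQ;            /* go back to the wait queue */

      case ST_WAIT_RESULT:
        /* second half: the callback has stored the result in ctx */
        combine_result(ctx);
        ctx->state = ST_START;
        return EVTHR_WAITQ;            /* wait for the next piece of work */
    }
    return EVTHR_DEL;                  /* unknown state: give up the task */
}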
There are basically two approaches to this problem:
The advantage of solution 1 is that it requires less context switching (even if it is only a thread context). If the function RC is a callback that is directly invoked from the function that receives the result from an external source, then the result values should not be those that are returned to the worker manager (e.g., OK, WAITQ, RUNQ, DEL, see 3.20.5.1.1), unless we put an extra handler in between that knows about these return values and can handle them properly by performing the appropriate actions. Therefore this approach requires careful programming, see below for details.
Solution 2 does not mess around with the worker function and the task (event thread) context without telling the event thread system about it, hence it does not have the programming restriction mentioned above; it allows for a more normal programming style. However, there needs to be a way to match the condition that became true with the task that is waiting for it. Usually this is done via identifiers (see Section 3.1.1.2); however, the current implementation only allows for a single character to be sent over the internal notification pipe. Hence it is complicated to find the right task - waking up one task after another to let each figure out whether ``its'' condition is met can be expensive. One possible solution for this is to maintain a queue of results instead of writing the results directly into some structure. Elements in the result queue contain the necessary identifier to find the right task that is waiting for the condition/result.
An asynchronous call sequence looks like this:
The next step depends on which solution has been chosen:
If the task C is invoked via the callback from result_handler() then we either have to make C aware of this (implicitly by ``knowing'' which parts of C are invoked by callbacks or explicitly by passing this information somehow, e.g., encoded in the task structure). We can either require that a callback has a different set of return values, i.e., only EVTHR_OK, or we need an ``in between'' function that can interpret the result values for a manager thread and manipulate the task context accordingly. The latter seems like the most generic and clean solution. Notice, however, that we have to take care of the usual problem with accessing the task while it is outside of our control, i.e., if it has been returned to the event thread system earlier on, the well-known problems arise (see Section 3.20.5.1.5; additionally the task must know whether it placed itself into the wait queue earlier on, to avoid doing it again).
In the previous section solution 3a for approach 1 states that the callback function may store the result in a data structure. If the caller of the asynchronous function waits for the result, then the access to the structure must be synchronized and the caller must be informed that the data is available (valid). Note: this is not a good solution to the problem of asynchronous functions (because the caller blocks while waiting without giving the event threads library a chance to switch to another thread; if too many functions are doing this the system may even deadlock), but just a simple approach until a better solution is found.
The simple algorithm uses a mutex (to protect access to the shared data structure and the status variable), a condition variable (to signal completion), and a status variable which has three states: init (the initial value), wait (the caller is waiting for the result), and sent (the result is available).
Note: due to the asynchronous operation, the wait and sent states do not need to be entered in that order. Moreover, the wait state may not be reached at all because the callee is done before the caller needs the result (this is the best case because it avoids waiting).
Caller:
status = init;              /* initialize system */
invoke callee               /* call asynchronous function */
...                         /* do something */
lock;                       /* acquire mutex */
if (status == init)         /* is it still the initial value? */
{
    status = wait;          /* indicate that caller is waiting */
    while (status == wait)  /* wait until status changes */
        cond_wait           /* wait for signal from callee, this also unlocks the mutex */
}
unlock;
Callee:
...                         /* compute result */
lock;                       /* acquire mutex */
v = result;                 /* set result */
notify = (status == wait);  /* is caller waiting? */
status = sent;              /* result is available */
if (notify)
    cond_signal             /* notify caller if it is waiting */
unlock;                     /* done */
This can be extended if there are multiple function calls and hence multiple results to wait for. It would be ugly to use an individual status variable for each function, hence a status variable and a counter are used. If the counter is zero, then all results are available. The status variable simply indicates whether the caller is waiting for a result and hence the callee should use the condition variable to signal that the result is valid. Since there can be multiple callees, only the one for which the counter is zero and the status variable is wait must signal completion.
Caller:
status = init;              /* initialize system */
counter = 0;
...
lock
++counter;
invoke callee               /* call asynchronous function */
unlock                      /* (maybe multiple times) */
...                         /* do something */
lock                        /* acquire mutex */
if (counter > 0)            /* are there outstanding results? */
{
    status = wait;          /* indicate that caller is waiting */
    while (status == wait && counter > 0)  /* wait for results */
        cond_wait           /* wait for signal from callee */
}
unlock                      /* done */
Callee:
...                         /* compute result */
lock                        /* acquire mutex */
--counter;
v = result;                 /* set result */
/* is caller waiting and this is the last result? */
if (status == wait && counter == 0)
{
    status = sent;          /* results are available */
    cond_signal             /* notify caller */
}
unlock                      /* done */
Notes:
If the result variable cannot take on all possible values of its range, then two values can be designated as v-init and v-wait while all others are considered as v-sent. This removes the need for a separate status variable because the result variable v itself is used for that purpose:
Caller:
v = v_init;                 /* initialize system */
invoke callee               /* call asynchronous function */
...                         /* do something */
lock                        /* acquire mutex */
if (v == v_init)            /* is it still the initial value? */
{
    v = v_wait;             /* indicate that caller is waiting */
    while (v == v_wait)     /* wait until status changes */
        cond_wait           /* wait for signal from callee */
}
unlock
Callee:
...                         /* compute result */
lock;                       /* acquire mutex */
notify = (v == v_wait);     /* is caller waiting? */
v = result;                 /* set result */
if (notify)
    cond_signal             /* notify caller if it is waiting */
unlock                      /* done */
Solution 2 described in Section 4.1.2.2 needs some form of synchronization too: as described in the previous Section 4.1.2.3 the callee may return a result before or after the caller wants to access it.
Using the first algorithm from the previous section with a slight modification like this:
Caller:
status = init;              /* initialize system */
invoke callee               /* call asynchronous function */
...                         /* do something */
lock;                       /* acquire mutex */
waiting = (status == init); /* is it still the initial value? */
if (waiting)
    status = wait;          /* indicate that caller is waiting */
unlock;
if (waiting)
    put task into condqueue
Callee:
...                         /* compute result */
lock;                       /* acquire mutex */
v = result;                 /* set result */
notify = (status == wait);  /* is caller waiting? */
status = sent;              /* result is available */
if (notify)                 /* if caller is waiting: */
    cond_signal             /* notify evthr system about condition */
unlock;                     /* done */
causes a race condition: after unlocking the mutex in the caller but before putting the task into the condqueue, the callee may signal the event thread library that the condition is fulfilled. To avoid this race condition, a condition variable and a mutex must be combined as is done by pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex):
The pthread_cond_wait(3) function atomically blocks the current thread waiting on the condition variable specified by cond, and unblocks the mutex specified by mutex.
In the modified version the caller just instructs the event thread library to put the task into the condqueue:
status = init;              /* initialize system */
invoke callee               /* call asynchronous function */
...                         /* do something */
lock;                       /* acquire mutex */
waiting = (status == init); /* is it still the initial value? */
if (waiting)
{
    status = wait;          /* indicate that caller is waiting */
    return to event thread library with return code that requests to put task into condqueue
}
and the library itself unlocks the mutex afterwards. A condition variable for event threads hence consists of a mutex and an identifier (see Section 3.1.1.2) to match caller and callee. Such a condition variable belongs to a task (just like a file descriptor on which a task might be waiting).
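A possible layout for such an event thread condition variable is shown below; this is an assumption for illustration, not the actual sendmail X type:

/* sketch: condition variable for event threads; field names are made up */
struct evthr_cond_S
{
    pthread_mutex_t  evc_mutex;  /* protects the shared result/status */
    sessta_id_T      evc_id;     /* identifier to match caller and callee */
};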
To deal with various kinds of errors, it is necessary to write functions such that they are ``reversible'', i.e., they either perform a transaction or they don't change the state (in an inconsistent manner). Unfortunately, it is fairly complicated to write all functions in such a way. For example, if multiple changes must be made, each of which can fail, then either the previous state must be preserved such that it can be restored when something goes wrong, or the changes must be undone individually. For this to work properly, it is essential that the ``undo'' operation itself cannot fail. Hence an ``undo'' operation must not rely on resources that might become unavailable during processing; for example, if it requires memory, then that memory must be allocated before the whole operation is started, or at least while the individual changes are made, such that the ``undo'' operation does not need to allocate memory. This might not be achievable in some cases, however. For example, there are operations in sendmail X that require updates to two persistent databases, i.e., DEFEDB and IBDB. Both of these require disk space which cannot be pre-allocated, nor can it be guaranteed that two independent disk write operations will succeed. If the first one fails, then we don't need to perform the second one. However, if the first one succeeds and the second one fails, then there is no guarantee that the first operation can be undone. Hence the operations must be performed in such a manner that even in the worst case mail will not be lost but at most delivered more than once.
Transaction based processing is also fairly complicated in the case of asynchronous operations. If a function has to perform changes to a local state and creates a list of change requests which will be performed later on by some asynchronously running thread, then that list of change requests should offer a callback functionality which can be invoked with the status of the operation such that it can either perform the local changes (if that has not been done yet) or undo the local changes (if they have been performed earlier). An additional problem is that other functions may have performed changes in between. Hence it is not possible to save the old state and restore it in case of an error because that would erase other (valid) changes. Instead ``relative'' changes must be performed, e.g., in/decreasing a counter.
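The following sketch illustrates both points: the undo record is allocated before the change is applied (so the undo step itself cannot fail), and the undo is relative (it subtracts the applied delta instead of restoring a saved absolute value). All names are illustrative, not part of the sendmail X API:

#include <stdlib.h>

struct undo_rec
{
    int *ur_target;  /* counter that has been changed */
    int  ur_delta;   /* relative change that was applied */
};

static void
undo_change(struct undo_rec *ur)
{
    /* must not fail: no allocation, no I/O; relative change only */
    *ur->ur_target -= ur->ur_delta;
}

static int
apply_change(int *counter, int delta, struct undo_rec **pundo)
{
    struct undo_rec *ur;

    ur = malloc(sizeof(*ur));  /* acquire undo resources up front */
    if (ur == NULL)
        return -1;             /* nothing has been changed yet */
    ur->ur_target = counter;
    ur->ur_delta = delta;
    *counter += delta;         /* perform the (relative) change */
    *pundo = ur;
    return 0;
}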
As mentioned in Section 2.14, item 8, C has some inherent security problems because it does not provide boundary checks on buffers, arrays, or strings. Those checks must be implemented by the program itself. Here are some (obvious) hints when writing code for sendmail X (or other secure programs):
The sendmail X libraries provide communication buffers and replacements for strings, see 3.16.11.1.1 and 3.16.7, which must be used as much as possible; exceptions should only be made for external libraries that do not support these data structures.
Order of implementation:
based on state threads
Milestone 3 is complete if a mail can be received by SMTPS, safely stored in a queue by QMGR, scheduled for delivery by a very simple scheduler in the QMGR, and then delivered by SMTPC.
Milestone 3 has been reached 2002-08-19.
Notice: this is prototype code. There are many unfinished places within the existing code, not to mention all the parts that are completely missing.
Milestone 4 has been reached 2002-09-19.
Next steps:
As of 2004-01-01 sendmail X is running as MTA on the machine of the author. sendmail 8 is only used for mail submission.
This section describes the implementation of some libraries which are used by various sendmail X modules.
Queues, lists, et al. are taken from OpenBSD <sys/queue.h>.
Two hash table implementations are available: a conventional one with one key and a version with two keys. The latter is currently not used; it was intended for the DA database.
Classes can be used to check whether a key is in a set.
Classes can be implemented via a hash table: sendmail 8 allows up to 256 classes (there is a simple mapping of class names to single byte values). Each element in a class is stored in a hash table; the RHS is a bitmap that indicates to which classes it belongs. Checking whether something is in a class is done as follows: look up the word in the hash table; if it exists, check whether it is a member of the class we are looking for (which is just a bitnset() test). This kind of implementation uses one hash table for many classes.
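A sketch of such a membership test is shown below; the hash table lookup call and the bitmap layout are assumptions made for illustration, not the sendmail 8 or sendmail X implementation:

#include <stdbool.h>

#define CLASS_BYTES 32  /* bitmap for up to 256 classes */
typedef unsigned char class_map_T[CLASS_BYTES];

static bool
in_class(bht_P table, const char *word, unsigned int len, unsigned int class_idx)
{
    class_map_T *map;

    /* hypothetical lookup call; the value stored in the table is a bitmap */
    map = bht_lookup(table, word, len);
    if (map == NULL)
        return false;  /* the word is not in any class */
    return ((*map)[class_idx / 8] & (1 << (class_idx % 8))) != 0;
}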
Alternative implementations are lists (linear search), if a class has only a few elements, or trees.
Various versions of balanced trees are available which are based on code that has been found on the internet. See include/sm/tree.h, include/sm/avl.h, and include/sm/bsd-tree.h. The latter is taken from OpenBSD <sys/tree.h>, it can be used to generate functions that operate on splay trees and red-black trees. That is, there aren't generic functions that operate on trees, but specific functions are generated that operate on one type. Of course it is possible to generate a generic version where a tree node contains the usual data, i.e., a key and a pointer to the value.
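For example, with the OpenBSD-style macros the functions for a specific node type could be generated roughly like this; the node type and key are illustrative:

#include <sys/tree.h>
#include <string.h>

struct node
{
    RB_ENTRY(node)  n_link;   /* red-black tree linkage */
    const char     *n_key;
    void           *n_value;
};

static int
node_cmp(struct node *a, struct node *b)
{
    return strcmp(a->n_key, b->n_key);
}

RB_HEAD(node_tree, node);
RB_GENERATE(node_tree, node, n_link, node_cmp)

/* usage:
 *   struct node_tree head = RB_INITIALIZER(&head);
 *   RB_INSERT(node_tree, &head, n);
 *   found = RB_FIND(node_tree, &head, &query);
 */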
A restricted size cache is implemented using a hash table (for access) and a linked list which is kept in most-recently used order (MRU).
Note: if we don't need to dynamically delete and add entries from the RSC and we don't need the MRU feature, then we can probably use a (binary) hash table which keeps track of the number of entries in it.
The structure definitions given below are hidden from the application.
typedef struct rsc_S rsc_T, *rsc_P;

struct rsc_entry_S
{
    CIRCLEQ_ENTRY(rsc_entry_S)  rsce_link;   /* MRU linkage */
    const char                 *rsce_key;    /* lookup key */
    unsigned int                rsce_len;    /* key len */
    void                       *rsce_value;  /* corresponding value */
};

struct rsc_S
{
    bht_P            rsc_table;   /* table with key, rsc_entry pairs */
    unsigned int     rsc_limit;   /* max # of entries */
    unsigned int     rsc_used;    /* current # of entries */
    rsc_create_F     rsc_create;  /* constructor */
    rsc_delete_F     rsc_delete;  /* destructor */
    CIRCLEQ_HEAD(, rsc_entry_S)  rsc_link;   /* MRU linkage */
    void            *rsc_ctx;     /* application context */
};
The following functions (and function types) are provided:
typedef void *(*rsc_create_F) (const char *_key, unsigned int _len, void *_value, void *_ctx);
typedef sm_ret_T (*rsc_delete_F) (void *_value, void *_ctx);
typedef void (rsc_walk_F)(const char *_key, const void *_value, const void *_ctx);

extern rsc_P       rsc_create(sm_rpool_P _rpool, unsigned int _limit, unsigned int _htsize, rsc_create_F _create, rsc_delete_F _delete, void *_ctx);
extern void        rsc_free(rsc_P _rsc);
extern void        rsc_walk(rsc_P _rsc, rsc_walk_F *_f);
extern sm_ret_T    rsc_add(rsc_P _rsc, bool _delok, const char *_key, unsigned int _len, void *_value, void **_pvalue);
extern const void *rsc_lookup(rsc_P _cache, const char *_key, unsigned int _len);
extern sm_ret_T    rsc_rm(rsc_P _rsc, const char *_key, unsigned int _len);
extern int         rsc_usage(rsc_P _rsc);
extern void        rsc_stats(rsc_P _rsc, char *_out, unsigned int _len);
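A hedged usage sketch based on these prototypes follows; the callbacks, passing NULL for the rpool, the meaning of the _delok flag, and the use of 0 as a success return value are assumptions:

/* constructor/destructor callbacks for values stored in the cache */
static void *
app_create(const char *key, unsigned int len, void *value, void *ctx)
{
    return value;   /* store the caller's value as-is */
}

static sm_ret_T
app_delete(void *value, void *ctx)
{
    return 0;       /* nothing to free in this sketch; assuming 0 == success */
}

static void
rsc_example(void *some_value)
{
    rsc_P cache;
    const void *v;

    cache = rsc_create(NULL /* rpool */, 128 /* limit */, 256 /* htsize */,
        app_create, app_delete, NULL /* application context */);
    if (cache == NULL)
        return;
    (void) rsc_add(cache, true, "key", 3, some_value, NULL);
    v = rsc_lookup(cache, "key", 3);
    (void) v;
    rsc_free(cache);
}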
If an RSC stores data of different types, it must be possible to distinguish between them. This is necessary for functions that ``walk'' through an RSC and perform operations on the data or just for generic functions, i.e., create and delete. As such, the RSC could store an additional type and pass it to the delete and create functions. However, this could also be handled by the application itself, i.e., it adds a unique identifier to the data that it stores in an RSC.
The API of a typed RSC implementation differs as follows:
Note that this causes problems with a unified API as described in Section 3.16.12.
We could use a fixed number of keys and set those that we are not interested in to NULL, or we could use a variable number of keys and specify a key and an index. How easy is the second solution to implement? The keys might be implemented as an array with some upper limit that is specified at creation time.
A possible approach to allow for any number of keys is to specify an array of keys (and corresponding lengths). During initialization the maximum number of keys is defined (and stored in the structure describing the DB). Access methods pass in a key and length and the index of the key, where zero is the primary key. When an element is entered, the primary key is used. What about the other keys? They must be specified too since the appropriate entries in the hash tables must be constructed. This seems to become ugly. Let's try a simpler approach first (one, two, three keys).
This might be implemented by having a (second) link in the entries which points to the next element with the same key. Check the hash table implementation.
The event thread library provides the basic framework for a worker based thread pool that is driven by I/O events and by wakeup times for tasks. See Section 3.20.5 for a functional description.
This library uses a general context that describes one event thread system and a per-task context. It uses two queues: a run queue of tasks that can be executed, and a wait queue of tasks that wait for some event. Each task is in exactly one of the queues at any time.
The event thread context looks like this (2004-07-27):
struct sm_evthr_ctx_S
{
    pthread_cond_t   evthrc_cv;
    pthread_mutex_t  evthrc_waitqmut;
    pthread_mutex_t  evthrc_runqmut;
    int              evthrc_max;     /* max. number of threads */
    int              evthrc_min;     /* min. number of threads */
    int              evthrc_cur;     /* current number of threads */
    int              evthrc_idl;     /* idle threads */
    int              evthrc_stop;    /* stop threads */
    int              evthrc_maxfd;   /* maximum number of FDs */
    timeval_T        evthrc_time;    /* current time */
    sm_evthr_task_P *evthrc_fd2t;    /* array to map FDs to tasks */

    /* pipe between control thread and worker/signal threads */
    int              evthrc_pipe[2];

    CIRCLEQ_HEAD(, sm_evthr_task_S) evthrc_waitq;
    CIRCLEQ_HEAD(, sm_evthr_task_S) evthrc_runq;
};
The system maintains some number of worker threads between the specified minimum and maximum. Idle threads vanish after some timeout until the minimum number is reached.
An application function has this prototype:
typedef sm_ret_T (evthr_task_F)(sm_evthr_task_P);
That is, the function receives the per-task context, whose structure is listed next, as a parameter. Even though this is a violation of the abstraction principle, it allows for some functionality which would be awkward to achieve otherwise. For example, a user application can directly manipulate the next wakeup time. We could hide this behind a void pointer and provide some functions to manipulate the task context if we want to hide the implementation.
struct sm_evthr_task_S
{
    CIRCLEQ_ENTRY(sm_evthr_task_S) evthrt_next;

    /* protects evthrt_rqevf and evthrt_sleep */
    pthread_mutex_t  evthrt_mutex;

    int              evthrt_rqevf;   /* requested event flags; see below */
    int              evthrt_evocc;   /* events occurred; see below */
    int              evthrt_state;   /* current state; see below */
    int              evthrt_fd;      /* fd to watch */
    timeval_T        evthrt_sleep;   /* when to wake up */
    evthr_task_F    *evthrt_fct;     /* function to execute */
    void            *evthrt_actx;    /* application context */
    sm_evthr_ctx_P   evthrt_ctx;     /* evthr context */
    sm_evthr_nc_P    evthrt_nc;      /* network connection */
};
The first element describes the linked list, i.e., either the wait queue or the run queue. The second element (requested event flags) lists the events the task is waiting for:
flag | meaning |
EVT_EV_RD | task is waiting for fd to become ready for read |
EVT_EV_WR | task is waiting for fd to become ready for write |
EVT_EV_LI | task is waiting for fd to become ready for accept |
EVT_EV_SL | task is waiting for timeout |
The third element (events occurred) lists the events that activated the task:
flag | meaning |
EVT_EV_RD_Y | task is ready for read |
EVT_EV_WR_Y | task is ready for write |
EVT_EV_LI_Y | task received a new connection |
EVT_EV_SL_Y | task has been woken up (after timeout) |
The fourth element (status) consists of some internal state flags, e.g., in which queue the task is and what it wants to do next.
The event thread library provides these functions:
extern sm_ret_T evthr_init(sm_evthr_ctx_P *_pctx, int _minthr, int _maxthr, int _maxfd);
extern sm_ret_T evthr_task_new(sm_evthr_ctx_P _ctx, sm_evthr_task_P *_task, int _ev, int _fd, timeval_T *_sleept, evthr_task_F *_fct, void *_taskctx);
extern sm_ret_T evthr_loop(sm_evthr_ctx_P _ctx);
extern sm_ret_T evthr_waitq_app(sm_evthr_task_P _task);
extern sm_ret_T evthr_en_wr(sm_evthr_task_P _task);
extern sm_ret_T evthr_time(sm_evthr_ctx_P _ctx, timeval_T *_ct);
extern sm_ret_T evthr_new_sl(sm_evthr_task_P _task, timeval_T _slpt, bool _change);
An application first initializes the library (evthr_init()), then it creates at least one task (evthr_task_new()), and thereafter calls evthr_loop() which in turn monitors the desired events (I/O, timeout) and invokes the callback functions. Those callback functions return a result which indicates what should happen with the task:
flag | meaning |
EVTHR_OK | do nothing, task has been taken care of |
EVTHR_WAITQ | put in wait queue |
EVTHR_RUNQ | put in run queue |
EVTHR_SLPQ | sleep for a while |
EVTHR_DEL | delete task |
EVTHR_TERM | terminate event thread loop |
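A usage sketch of the call sequence described before the table is shown below; the task function, passing -1 when no file descriptor is watched, the timeval-like fields of timeval_T, and the use of 0 as a success value are assumptions:

static sm_ret_T
tick_task(sm_evthr_task_P tsk)
{
    /* invoked when the requested event (here: the timeout) occurred */
    (void) tsk;
    return EVTHR_SLPQ;      /* sleep again until the next wakeup time */
}

static sm_ret_T
start_event_system(void)
{
    sm_evthr_ctx_P ctx;
    sm_evthr_task_P tsk;
    timeval_T sleept;
    sm_ret_T ret;

    ret = evthr_init(&ctx, 2 /* min threads */, 8 /* max threads */, 256 /* max fd */);
    if (ret != 0)           /* assuming 0 indicates success */
        return ret;
    sleept.tv_sec = 1;      /* assuming timeval_T has timeval-like fields */
    sleept.tv_usec = 0;
    ret = evthr_task_new(ctx, &tsk, EVT_EV_SL, -1 /* no fd */, &sleept,
        tick_task, NULL /* application context */);
    if (ret != 0)
        return ret;
    return evthr_loop(ctx); /* returns when a task returns EVTHR_TERM */
}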
Section 3.20.5 describes some problems that need to be solved:
Question: what about callback functions for signals? Currently the usual signals terminate the system.
sendmail X uses RCBs for communication between the various modules (see Section 3.16.11.1.1). This section describes some functions that simplify handling of RCB based communication for modules which use the event thread library (see Section 4.3.8).
This library uses the following structure:
struct rcbcom_ctx_S
{
    sm_evthr_task_P  rcbcom_tsk;     /* event thread task */
    sm_rcb_P         rcbcom_rdrcb;   /* RCB for communication (rd) */
    SIMPLEQ_HEAD(, sm_rcbe_S) rcbcom_wrrcbl;  /* RCB list (wr) */
    pthread_mutex_t  rcbcom_wrmutex;
};
The first entry is a pointer to the task that is responsible for the communication. The second entry is the RCB in which data is received. The third is a list of RCBs for data that must be sent out; this list is protected by the mutex that is the last element of the structure.
The following functions are provided by the library:
sm_ret_T sm_rcbcom_open(rcbcom_ctx_P rcbcom_ctx): create a RCB communication context.
sm_ret_T sm_rcbcom_close(rcbcom_ctx_P rcbcom_ctx): close a RCB communication context.
sm_ret_T sm_rcbe_new_enc(sm_rcbe_P *prcbe, int minsz): create an entry for the write RCB list and open it for encoding.
sm_rcbcom_prerep(rcbcom_ctx_P rcbcom_ctx, sm_evthr_task_P tsk, sm_rcbe_P *prcbe): prepare a reply to a module: close the read RCB after decoding, open it for encoding, create a new RCB for writing (using sm_rcbe_new_enc()), and put the task back into the wait queue (unless tsk is NULL). Notice: after calling this function (with tsk not NULL), the task is not under control of the caller anymore, hence all manipulation of its state must be done via the functions provided by the event thread library.
sm_rcbcom_endrep(rcbcom_ctx_P rcbcom_ctx, sm_evthr_task_P tsk, bool notified, sm_rcbe_P rcbe): close the write RCB, append it to the RCB list in the context, and if notified is not set inform the task about the new write request (using evthr_en_wr()).
sm_rcbcom2mod(sm_evthr_task_P tsk, rcbcom_ctx_P rcbcom_ctx): send the first element of the RCB write list to the file descriptor specified in the task. This function should be called when the event threads library invokes the callback for the task and indicates that the file descriptor is ready for writing, i.e., it will be called from an event thread task that checks for I/O activity on that file descriptor. Since the function does not conform to the function specification for a task, it is called by a wrapper that extracts the RCB communication context from the task (probably indirectly). The function will only send the first element of the list; if the list is empty afterwards, it will disable the write request for this task, otherwise the event thread library will invoke the callback again when the file descriptor is ready for writing. This offers a chance for an optimization: check whether another RCB can be written; question: is there a call that can determine the free buffer size for the file descriptor such that the next write operation does not block?
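A hedged sketch of a typical reply sequence using these functions; error handling, the use of 0 as a success value, and the actual record encoding are omitted or illustrative:

static sm_ret_T
send_reply(rcbcom_ctx_P com, sm_evthr_task_P tsk)
{
    sm_rcbe_P rcbe;
    sm_ret_T ret;

    /* close the read RCB, create a write RCB, put the task back
     * into the wait queue */
    ret = sm_rcbcom_prerep(com, tsk, &rcbe);
    if (ret != 0)           /* assuming 0 indicates success */
        return ret;

    /* ... encode the reply records into rcbe here ... */

    /* close the write RCB, append it to the write list, and trigger
     * a write event for the task via evthr_en_wr() */
    return sm_rcbcom_endrep(com, tsk, false /* not yet notified */, rcbe);
}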
An asynchronous DNS resolver has been implemented (libdns/) according to the specification of Section 3.16.13.
A DNS request has the following structure:
struct dns_req_S
{
    sm_cstr_P        dnsreq_query;   /* the query itself */
    time_T           dnsreq_start;   /* request start time */
/*  unsigned int     dnsreq_tries;    * retries */
    dns_type_T       dnsreq_type;    /* query type */
    unsigned short   dnsreq_flags;   /* currently: is in list? */
    sm_str_P         dnsreq_key;     /* key for hash table: query + type */
    dns_callback_F  *dnsreq_fct;     /* callback into application */
    void            *dnsreq_ctx;     /* context for application callback */
    TAILQ_ENTRY(dns_req_S) dnsreq_link;  /* next entry */
};
A DNS result consists of a list of entries with the following elements:
typedef union
{
    ipv4_T     dnsresu_a;
    sm_cstr_P  dnsresu_name;   /* name from DNS */
} dns_resu_T;

struct dns_res_S
{
    sm_cstr_P     dnsres_query;       /* original query */
    sm_ret_T      dnsres_ret;         /* error code */
    dns_type_T    dnsres_qtype;       /* original query type */
    unsigned int  dnsres_entries;     /* number of entries */
    unsigned int  dnsres_maxentries;  /* max. number of entries */
    dns_resl_T    dnsres_hd;          /* head of list of mx entries */
};

struct dns_rese_S
{
    dns_type_T      dnsrese_type;    /* result type */
    unsigned int    dnsrese_ttl;     /* TTL from DNS */
    unsigned short  dnsrese_pref;    /* preference from DNS */
    unsigned short  dnsrese_weight;  /* for internal randomization */
    sm_cstr_P       dnsrese_name;    /* RR name */
    TAILQ_ENTRY(dns_rese_S) dnsrese_link;  /* next entry */
    dns_resu_T      dnsrese_val;
};
A DNS manager context contains the following elements:
struct dns_mgr_ctx_S
{
    unsigned int  dnsmgr_flags;
    dns_rql_T     dnsmgr_req_hd;    /* list of requests */
    dns_req_P     dnsmgr_req_cur;   /* current request */

    /* hash table to store requests */
    bht_P         dnsmgr_req_ht;

#if SM_USE_PTHREADS
    pthread_mutex_t  dnsmgr_mutex;    /* for the entire context? */
    sm_evthr_task_P  dnsmgr_tsk;      /* XXX Just one? */
    sm_evthr_task_P  dnsmgr_cleanup;
#endif /* SM_USE_PTHREADS */
};
and a DNS task looks like this:
struct dns_tsk_S
{
    /* XXX int or statethreads socket */
    int             dnstsk_fd;        /* socket */
    dns_mgr_ctx_P   dnstsk_mgr;       /* DNS manager */
    uint            dnstsk_flags;     /* operating flags */
    uint            dnstsk_timeouts;  /* queries that timed out */
    sockaddr_in_T   dnstsk_sin;       /* socket description */
    sm_str_P        dnstsk_rd;        /* read buffer */
    sm_str_P        dnstsk_wr;        /* write buffer */
};
The DNS library offers functions to create and delete a DNS manager context:
sm_ret_T dns_mgr_ctx_new(uint flags, dns_mgr_ctx_P *pdns_mgr_ctx);
sm_ret_T dns_mgr_ctx_del(dns_mgr_ctx_P dns_mgr_ctx);
and similar functions for DNS tasks:
sm_ret_T dns_tsk_new(dns_mgr_ctx_P dns_mgr_ctx, uint flags, ipv4_T ipv4, dns_tsk_P *pdns_tsk);
sm_ret_T dns_tsk_del(dns_tsk_P dns_tsk);
sm_ret_T dns_tsk_start(dns_mgr_ctx_P dns_mgr_ctx, dns_tsk_P dns_tsk, sm_evthr_ctx_P evthr_ctx);
An application can make a DNS request using the following function:
sm_ret_T dns_req_add(dns_mgr_ctx_P dns_mgr_ctx, sm_cstr_P query, dns_type_T type, dns_callback_F *fct, void *ctx);
Internal functions of the library are:
sm_ret_T dns_comm_tsk(sm_evthr_task_P tsk);
sm_ret_T dns_tsk_cleanup(sm_evthr_task_P tsk);
sm_ret_T dns_tsk_rd(sm_evthr_task_P tsk);
sm_ret_T dns_tsk_wr(sm_evthr_task_P tsk);
void     dns_req_del(void *value, void *ctx);
sm_ret_T dns_receive(dns_tsk_P dns_tsk);
sm_ret_T dns_send(dns_tsk_P dns_tsk);
sm_ret_T dns_decode(sm_str_P ans, uchar *query, int qlen, dns_type_T *ptype, dns_res_P dns_res);
dns_comm_tsk() is a function that is called whenever I/O activity is possible; it invokes the read and write functions dns_tsk_rd() and dns_tsk_wr(), respectively. dns_tsk_cleanup() is a cleanup task that deals with requests which didn't receive an answer within a certain amount of time.
The following steps are necessary for initializing and starting the DNS resolver (when using the event threads system):
/* Initialize DNS resolver */
ret = dns_rslv_new(random);

/* Create DNS manager context */
ret = dns_mgr_ctx_new(0, &dns_mgr_ctx);

/* Create one DNS resolver task */
ret = dns_tsk_new(dns_mgr_ctx, 0, ipv4, &dns_tsk);

/* Start DNS tasks (resolver and cleanup) */
ret = dns_tsk_start(dns_mgr_ctx, dns_tsk, evthr_ctx);
dns_req_add() creates a DNS request and adds it to the list maintained in the DNS manager context (dnsmgr_req_hd), unless such a request is already queued, which is checked via the hash table dnsmgr_req_ht, to which the request is always added. The key for the hash table is the DNS query string and the DNS query type concatenated (if the query string has a trailing dot it is removed). Whenever a DNS request is added to the list the write event is triggered for dns_tsk_wr(), which will take the current request (pointed to by dnsmgr_req_cur), form a DNS query, and send it to a DNS server using dns_send(). Notice: the request will not be removed from the list; this is done by either the cleanup task or the read function. To indicate whether a request is in the list or merely in the hash table, the field dnsreq_flags is used.
The hash table is used by the function dns_tsk_rd(), which receives replies from a DNS server, to identify all requests for a DNS query (it uses dns_receive() and dns_decode()). The function removes all those requests from the hash table and the DNS manager list, and creates a local list out of them. Then it walks through that list and invokes the callback functions specified in the requests with the DNS result and the application-specific context.
The cleanup task dns_tsk_cleanup() uses the DNS manager list of requests (from the first element up to the current one) and checks whether they have timed out (based on the time the request was made, dnsreq_start). If this is the case then the request is removed from the list and appended to a local list. Moreover, all requests in the hash table for the same query are moved into the local list too - this needs improvement. As soon as a request is found that has not timed out (or the current element is reached), the search stops (this currently doesn't allow for individual timeouts). Thereafter, the callbacks for the requests in the local list are invoked with a DNS result that contains an error code (timeout).
There is currently a simple implementation for logging in libmta/log.c. It closely follows the ISC logging module API mentioned in Section 3.16.16.1.
The current implementation has a slight problem with logfile rotation. The programs use stdout/stderr for logging; they do not open a named logfile themselves, that is done by the MCP for them. Hence there is no way for them to reopen a logfile; instead the sm_log_reopen() function rewinds the file using ftruncate(2) and fseek(3).
Alternatively the name of a logfile can be passed to each module such that it can open the file itself and hence the filename is available for the reopen function. The filename could be stored in the logging context and hence the reopen function could act accordingly, i.e., use sm_io_open() if a filename exists and otherwise the method described above.
The first prototype of SMTPS is based on state threads (see Section 3.20.3.1).
This prototype uses a set of threads which is limited by an upper and lower number. If threads are idle for some time and there are more than a specified number of idle threads available, they terminate. If not enough threads are available, new ones are created up to a specified maximum.
Remark (placed here so it doesn't get lost):
there is a restricted number (approximately 60000) of possible open connections to one port. Could that limit the throughput we are trying to achieve, or is such a high number of connections unfeasible?
We do not want a separate connection between QMGR and SMTPS for each thread in SMTPS, hence we need to associate the data from QMGR with the right thread in SMTPS. One approach is to have a receiver thread in SMTPS which communicates with the QMGR. It receives all data from the QMGR and identifies the context (session/transaction) to which the data belongs. This needs some list of all contexts, e.g., an AVL tree, or, if the number of entries is small enough, a linear list. Question: is there some better method, i.e., instead of searching some structure have direct access to the right thread (see also Section 3.1.1.2)? There might be some optimization possible since each SMTPS has only a limited number of threads, so we could have an array of that size and encode an index into that array into the RCB, e.g., use another ID type that is passed around (like a context pointer). The receiver thread then adds the data to the context and notifies the thread.

There is one thread to read from the communication channel, but multiple tasks can actually write to it; writing is sequentialized by a mutex. In a conventional thread model, we would just select on I/O activities for that channel and on notifications when a new RCB is added to the list to send to the QMGR (as it is done in the QMGR); however, in state threads I/O activity is controlled by the internal scheduler. Since there are multiple threads, it might be necessary to control how far ahead the writers can be of the readers (to avoid starvation and unfairness). However, this should be self-adjusting since threads wait for replies to the requests they have sent before they send out new ones (by default; in some cases a few requests may be outstanding from one thread). If too many threads send data, then the capacity of the communication channel and the way requests are handled by the QMGR should avoid starvation and guarantee fairness.
As explained above there is one thread that takes care of the communication between the module and the QMGR. This thread uses the following structure as context:
struct s2q_ctx_S
{
    int            s2q_status;     /* status */
    st_netfd_t     s2q_fd;         /* fd for communication */
    int            s2q_smtps_id;   /* smtps id */
    st_mutex_t     s2q_wr_mutex;   /* mutex for write */
    unsigned int   s2q_maxrcbs;    /* max. # of outstanding requests */
    unsigned int   s2q_currcbs;    /* current # of outstanding requests */
    sessta_id_P   *s2q_sids;       /* array of session ids */
    smtps_sess_P  *s2q_sess;       /* array of session ctx */
};
For initialization and termination of the communication task the following two functions are provided:
sm_ret_T sm_s2q_init(s2q_ctx_P s2q_ctx, int smtps_id, unsigned int maxrcbs);
sm_ret_T sm_s2q_stop(s2q_ctx_P s2q_ctx);
The initialization function connects to the QMGR and stores the file descriptor for communication in s2q_fd. It allocates two arrays for session IDs and session contexts which are used to find the SMTPS session for an incoming RCB, and it sends the initial ``a new SMTPS has been started'' message to the QMGR. Finally sm_s2q_init() starts a thread that executes the function void *sm_rcb_from_srv(void *arg) which receives the s2q context as parameter. This function receives an RCB from the QMGR and notifies the thread that is associated with the task via a condition variable; the thread can be found using the s2q_sids array.
Data can be sent to the QMGR using one of the functions sm_s2q_*() for new session ID, close session ID, new transaction ID, new recipient, close transaction ID, and discard transaction ID.
The function sm_w4q2s_reply() is used to wait for a reply from QMGR. It waits on a condition variable (which is stored in the SMTPS session context) which is signalled by sm_rcb_from_qmgr().
Initially the SMTP server sends the QMGR its id and the maximum number of threads it is going to create.
RT_S2Q_NID | id of new SMTPS |
RT_S2Q_ID | id of SMTPS |
RT_S2Q_CID | close SMTPS (id) |
RT_S2Q_STAT | status |
RT_S2Q_MAXTHRDS | max number of threads |
RT_S2Q_NSEID | new session id |
RT_S2Q_SEID | session id |
RT_S2Q_CSEID | close session id |
RT_S2Q_CLTIP4 | client IPv4 address |
RT_S2Q_CLTIP6 | client IPv6 address |
RT_S2Q_CLTPORT | client port |
RT_S2Q_NTAID | new transaction id |
RT_S2Q_TAID | transaction id |
RT_S2Q_CTAID | close transaction id |
RT_S2Q_DTAID | discard transaction id |
RT_S2Q_MAIL | mail from |
RT_S2Q_RCPT_IDX | rcpt idx |
RT_S2Q_RCPT | rcpt to |
RT_S2Q_CDBID | cdb id |
The common reply format from QMGR to SMTPS consists of the SMTPS id (which is only transmitted for paranoia), a session or transaction id, a status code and an optional status text:
RT_Q2S_ID | SMTPS id |
RT_Q2S_SEID/RT_Q2S_TAID | session/transaction id |
RT_Q2S_STAT | status (ok, reject, more detailed?) |
RT_Q2S_STATT | status text |
The function sm_rcb_from_srv() uses the session/transaction id to find the correct thread to which the rest of the RCB will be given.
RT_Q2S_ID | id of SMTPS |
RT_Q2S_STAT | status for session/transaction/... |
RT_Q2S_STATV | status value (text follows) |
RT_Q2S_STATT | status text |
RT_Q2S_SEID | session id |
RT_Q2S_TAID | transaction id |
RT_Q2S_RCPT_IDX | rcpt idx |
RT_Q2S_CDBID | cdb id |
RT_Q2S_THRDS | slow down |
RT_Q2S_STOP | stop reception (use slow = 0?) |
RT_Q2S_DOWN | shut down |
The SMTP server uses the AR as a map lookup server to avoid blocking calls in the state-threads application. While the anti-spam logic etc. is implemented in SMTPS, the map lookups are performed by SMAR. Hence SMTPS only sends minimal information to SMAR, e.g., the sender or recipient address, and asks for lookups in some maps with certain features, e.g., looking up the full address, the domain part, or the address without details (``+detail'').
RT_S2A_TAID | transaction id |
RT_S2A_MAIL | mail from |
RT_S2A_RCPT_IDX | rcpt idx |
RT_S2A_RCPT | rcpt to (printable address) |
RT_S2A_LTYPE | lookup type |
RT_S2A_LFLAGS | lookup flags |
To simplify the SMTP server code, the reply format for SMAR is basically the same as for QMGR:
RT_A2S_ID | id of SMTPS |
RT_A2S_TAID | transaction id |
RT_A2S_STAT | status (ok, reject, more detailed?) |
RT_A2S_STATT | status message |
RT_A2S_MAIL_ST | mail status |
RT_A2S_RCPT_IDX | rcpt index |
RT_A2S_RCPT_ST | rcpt status |
Values for lookup types are:
LT_LOCAL_ADDR | is local address? |
LT_RCPT_OK | is address ok as a recipient? |
LT_MAIL_OK | is address ok as a sender? |
Values for lookup flags are:
LF_DOMAIN | try domain |
LF_FULL | try full address |
LF_LOCAL | try localpart |
LF_NODETAIL | try without detail |
As explained elsewhere (2.8.2) it is possible to specify multiple delivery classes and multiple delivery agents that implement delivery classes. The former are referenced by the address resolver when selecting a mailer. The latter are selected by the scheduler after it receives a mailer to use.
Every delivery agent has an index and a list of delivery classes it implements. There is also a list of delivery classes (which are referenced by some id, most likely a numeric index into an array). This list is maintained by SMAR, each DA, and QMGR (and must obviously be kept in sync if numeric indices are used instead of names). QMGR keeps for each delivery class a list of delivery agents that implement the class, which can be used by the scheduler to select a DA that will perform a delivery attempt.
Note: As described in Section 3.8.3.1 item 6, the first version of sendmail X does not need to implement the full set of this; all delivery agents implement the same delivery classes, hence they can be selected freely without any restriction.
The first prototype of SMTPC is based on state threads (see Section 3.20.3.1).
It follows a thread model similar to that used for the SMTP server daemon (see Section 4.4).
See Section 3.4.5.1 for a description.
As usual, a protocol header is sent first. Moreover, the next entry in each RCB is the identifier of the SMTPC to which the QMGR wants to talk: RT_Q2C_ID.
The rest of the RCB is described below for each function.
Notice: for status codes an additional text field might follow, which currently isn't specified here.
RT_Q2C_DCID: delivery class id.
More data to follow, e.g., requirements about the session.
For the transaction data see below (item 7).
RT_C2Q_SESTAT: session status: either SMTP status code or an error code, e.g., connection refused etc.
The recipient data might be repeated to list multiple recipients. Notice: we may run into a size limit of RCBs here; do we need something like a continuation RCB?
See Section 3.9.2.1 about the status information that is sent from an SMTP client to QMGR.
RT_C2Q_TASTAT: transaction status: this is the overall status of a transaction. It has two parts: an error code and an error state; the latter describes in which state of the SMTP dialogue (or internal state) the error occurred. An optional error message can be sent (RT_C2Q_STATT). If recipient statuses are sent then the record type RT_C2Q_TARSTAT is used instead. A recipient status consists of three items:
RT_Q2C_CSEID: close session id
This value can be pretty much ignored for all practical purposes, except if we want to see whether the server behaves properly and still responds.
RT_C2Q_SESTAT: session status (or do we want to use a different record type? Might be useful to distinguish to avoid confusion)
The main SMTPC context structure looks like this:
struct sc_ctx_S
{
    unsigned int  sc_max_thrds;   /* Max number of threads */
    unsigned int  sc_wait_thrds;  /* # of threads waiting to accept */
    unsigned int  sc_busy_thrds;  /* # of threads processing request */
    unsigned int  sc_rqst_count;  /* Total # of processed requests */
    uint32_t      sc_status;      /* SMTPC status */
    sm_str_P      sc_hostname;    /* SMTPC hostname */
    sc_t_ctx_P   *sc_scts;        /* array of sct's */
};
The last element of that structure is an array of SMTPC thread contexts (up to sc_max_thrds):
struct sc_t_ctx_S
{
    sc_ctx_P      sct_sc_ctx;    /* pointer back to sc_ctx */
    unsigned int  sct_thr_id;    /* thread id (debugging) */
    unsigned int  sct_status;
    st_cond_t     sct_cond_rd;   /* received data from QMGR */
    sc_sess_P     sct_sess;      /* current session */
};
The condition variable denotes when data from the QMGR is received for this particular thread. The last element is a pointer to the SMTPC session:
struct sc_sess_S
{
    sc_t_ctx_P      scse_sct_ctx;   /* pointer to thread context */
    sm_file_T      *scse_fp;        /* file to use (SMTP) */
    sm_str_P        scse_rd;        /* smtp read buffer */
    sm_str_P        scse_wr;        /* smtp write buffer */
    sm_str_P        scse_str;       /* str for general use */
    sm_rpool_P      scse_rpool;
    unsigned int    scse_cap;       /* server capabilities */
    unsigned int    scse_flags;
    unsigned int    scse_state;
    struct in_addr *scse_client;    /* XXX use a generic struct! */
    sc_ta_P         scse_ta;        /* current transaction */
    sessta_id_T     scse_id;
    sm_rcb_P        scse_rcb;       /* rcb for communication with QMGR */
    SOCK_IN_T       scse_rmt_addr;  /* Remote address */
    st_netfd_t      scse_rmt_fd;    /* fd */
};
The SMTPC transaction structure looks as follows:
struct sc_ta_S
{
    sc_sess_P      scta_sess;        /* pointer to session */
    sm_rpool_P     scta_rpool;
    sc_mail_P      scta_mail;        /* mail from */
    sc_rcpts_P     scta_rcpts;       /* rcpts */
    sc_rcpt_P      scta_rcpt_p;      /* current rcpt for reply */
    unsigned int   scta_rcpts_rcvd;  /* # of recipients replies received */
    unsigned int   scta_rcpts_tot;   /* number of recipients total */
    unsigned int   scta_rcpts_snt;   /* number of recipients sent */
    unsigned int   scta_rcpts_ok;    /* number of recipients ok */
    unsigned int   scta_rcpts_lmtp;  /* #LMTP rcpts still to collect */
    unsigned int   scta_state;
    smtp_status_T  scta_status;      /* SMTP status code (if applicable) */
    sessta_id_T    scta_id;          /* transaction id */
    sm_str_P       scta_cdb_id;      /* CDB id */
};
In the main() function SMTPC calls several initialization functions, one of which (sc_init(sc_ctx)) initializes the SMTPC context and allocates the array of SMTPC thread contexts. Then it starts the minimum number of threads (using start_threads(sc_ctx)) and the main thread takes care of signals afterwards. The threads run the function sc_hdl_requests() which receives the SMTPC context as parameter. This function looks for a free entry in the SMTPC thread context array and allocates a new thread context which it assigns to that entry. It also allocates a new SMTPC session context. Thereafter it sets its status to SC_T_FREE, and the first thread that is started informs the QMGR communication thread that SMTPC is ready to process tasks. The main part of the function processes a loop:
while (WAIT_THREADS(sc_ctx) <= max_wait_threads)
{
    ...
}
i.e., the thread stays active as long as the number of waiting threads does not exceed the allowed maximum. This takes care of too many waiting threads by simply terminating them when the condition becomes false, in which case the thread cleans up after itself and terminates. Inside the loop the thread waits on its condition variable: sc_t_ctx->sct_cond_rd. If that wait times out, the current session (if one is open) will be terminated. If the QMGR actually has a task for this thread, then it first checks whether another thread should be started:
if (WAIT_THREADS(sc_ctx) < min_wait_threads &&
    TOTAL_THREADS(sc_ctx) < MAX_THREADS(sc_ctx))
{
    /* start another thread */
}
and then handles the current session: handle_session(sc_t_ctx). This function handles one SMTP client session. The state of the session is recorded in sc_sess->scse_state and can take one of the following values:
SCSE_ST_NONE | no session active |
SCSE_ST_NEW | new session |
SCSE_ST_CONNECTED | connection succeeded |
SCSE_ST_GREETED | received greeting |
SCSE_ST_OPEN | connection open |
SCSE_ST_CLOSED | close session |
Based on this state the function opens a session if that hadn't happened yet and performs one transaction according to the data from the QMGR. Depending on a flag in sc_sess->scse_flags the session is optionally closed afterwards.
As usual there is one thread that takes care of the communication between the module and the QMGR. This thread uses the following structure as context:
struct c2q_ctx_S
{
    sc_ctx_P      c2q_sc_ctx;   /* pointer back to SMTPC context */
    unsigned int  c2q_status;   /* status */
    st_netfd_t    c2q_fd;       /* fd for communication */
    unsigned int  c2q_sc_id;    /* smtpc id */
    st_cond_t     c2q_cond_rd;  /* cond.var for read */
    st_cond_t     c2q_cond_wr;  /* cond.var for write */
    unsigned int  c2q_maxses;   /* max. # of open sessions */
    sc_sess_P    *c2q_sess;     /* array of session ctx */
};
For initialization and termination the following two functions are provided:
sm_ret_T sm_c2q_init(sc_ctx_P sc_ctx, c2q_ctx_P c2q_ctx, unsigned int sc_idx, unsigned int maxses);
sm_ret_T sm_c2q_stop(c2q_ctx_P c2q_ctx);
The initialization function starts a thread that executes the function void *sc_rcb_from_qmgr(void *arg) which receives the c2q context as parameter. This function receives an RCB from QMGR and notifies a thread that is associated with the task or finds a free SMTPC thread if it is a new task. To maintain the former information one array for session contexts c2q_sess is allocated; its size is maxses which is set to MAX_THREADS(sc_ctx) by the caller. This allows the communication module to find the correct session context based on the session (or transaction) identifier sent by the QMGR in its requests if the request refers to an open session. To find a free SMTPC thread, the array sc_scts in the SMTPC context is searched for a NULL entry.
Status information can be sent back to the QMGR using the function sm_ret_T sc_c2q(sc_t_ctx_P sc_t_ctx, uint32_t whichstatus, sm_ret_T ret, c2q_ctx_P c2q_ctx).
The SMTP client functionality is fairly restricted right now, but the system implements full pipelining (in contrast to sendmail 8 which uses MAIL as synchronization point). As usual, the SMTP client is also able to speak LMTP.
To open and close an SMTP session two functions are provided: sm_ret_T sc_sess_open(sc_t_ctx_P sc_t_ctx) and sm_ret_T sc_sess_close(sc_t_ctx_P sc_t_ctx). The function sm_ret_T sc_one_ta(sc_t_ctx_P sc_t_ctx) performs one SMTP transaction. As can be seen from the prototypes, the only parameter passed to these functions is the SMTPC thread context which contains (directly or indirectly) pointers to the current SMTPC session and transaction.
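A hedged sketch of how one delivery attempt might use these functions; the error check, the use of 0 as a success value, and the comments are assumptions:

static sm_ret_T
deliver_one(sc_t_ctx_P sc_t_ctx)
{
    sm_ret_T ret;

    ret = sc_sess_open(sc_t_ctx);    /* open the SMTP session */
    if (ret != 0)                    /* assuming 0 indicates success */
        return ret;
    ret = sc_one_ta(sc_t_ctx);       /* perform one SMTP transaction */
    (void) sc_sess_close(sc_t_ctx);  /* session may also be kept open,
                                      * depending on scse_flags */
    return ret;
}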
As shown in Section 4.6.2.1, the SMTPC session context contains three strings (see 3.16.7) that are used for the SMTP dialog and related operations.
Since the content database stores the mail in SMTP format, it can be sent out directly without any interaction. Similar to the SMTP server side, this function accesses the file buffer directly to avoid too much copying.
Just some items to take into consideration for the implementation of the queue manager. These are written down here so they don't get lost...
Problem here: what about disk I/O? For example: calling fsync() for the logfile may cause the queue manager to block. If the thread implementation doesn't schedule another thread while one is blocked on disk I/O, then the entire process will hang and the queue manager will not respond to other requests.
If this actually happens (fairly likely on some OSs with user-land pthread implementation), and it causes a problem (performance), then it might be necessary to create another process that actually performs disk I/O on behalf of the QMGR.
How about a flow diagram? Some architectural overview would be nice.
The QMGR should not have to deal with many connections. SMTPS and SMTPC are multi-threaded themselves; we may have some SMTPS/SMTPC processes. However, it won't be so many that we have a problem with the number of connections to monitor, i.e., poll() should be sufficient.
Which threading model should we choose? Just a few worker threads that will go back to work whenever they encounter a blocking action? See Section 3.20 for discussion.
Do we need priority queues or can we serve all jobs FIFO?
The QMGR is based on the event threads library described in Section 4.3.8.
Currently access to tasks is controlled via the mutexes that control the queues: if a task is taken out of a queue, it is under the sole control of the thread that removed it; no other thread can (should) access the task. Unless we change this access model, no mutex is necessary for individual tasks.
The queue manager has several data structures that can be concurrently accessed from different threads. Hence the access must be protected by mutexes unless there are other means which prevent conflicts. Some data structures can be rather large, e.g., the various DBs and caches. Locking them for an extended time may cause lock contention. Some data structures and operations on them may allow locking only a single element; others may require locking the entire structure. Examples of the latter are adding and removing elements, which in most cases require locking of the entire structure.
In some cases there might be ways around locking contention. For example, to delete items from a DB (or cache) the item might just be marked ``Delete'' instead of actually being deleted. This only requires locking of a single entry, not the entire DB. Those ``Delete'' entries can be removed in a single sweep later on (or during normal ``walk through'' operations), or they can simply be reclaimed for use. Question: what is more efficient? That is, if the DB is large and a walk through all elements is required to free a few, then that might take too long, and we shouldn't hold a lock too long. We could gather ``Delete'' elements in a queue; then we don't have to walk through the entire DB. However, then the position of the elements must be fixed such that we can directly access and delete them, or at least lookup prior to deletion must be fast. If the DB internally may rearrange the location of entries then we can't keep a pointer to them. Question: will this ever happen? Some DB versions may do this, how about the ones we use? In some cases, some of the algorithms may require that DB elements don't move, but in most cases the elements just contain pointers to the data, which isn't moved and hence can be accessed even if the DB rearranges its internal data structures.
If a system locks various items then there is a potential for deadlocks. One way to prevent this is a locking hierarchy, i.e., items are always locked in the same order. We probably need to define a locking order. It's currently unclear how this can be done such that access is still efficient without too much locking contention. See also Section 4.7.2 for possible ways around locking contention.
The main context for QMGR looks like this: (2004-04-14)
struct qmgr_ctx_S {
    sm_magic_T        sm_magic;
    pthread_mutex_t   qmgr_mutex;
    unsigned int      qmgr_status;       /* see below, QMGR_ST_* */
    time_T            qmgr_st_time;

    /* Resource flags */
    uint32_t          qmgr_rflags;       /* see QMGR_RFL_* */
    /* Overall value to indicate resource usage 0:free 100:overloaded */
    unsigned int      qmgr_total_usage;
    /* Status flags */
    uint32_t          qmgr_sflags;       /* see QMGR_SFL_* */

    sm_str_P          qmgr_hostname;
    sm_str_P          qmgr_pm_addr;      /* <postmaster@hostname> */

    /* info about connections? */
    fs_ctx_P          qmgr_fs_ctx;
    cdb_fsctx_P       qmgr_cdb_fsctx;
    unsigned long     qmgr_cdb_kbfree;
    edb_fsctx_P       qmgr_edb_fsctx;
    unsigned long     qmgr_edb_kbfree;
    unsigned long     qmgr_ibdb_kbfree;

    /* SMTPS */
    id_count_T        qmgr_idc;          /* last used SMTP id counter */
    int               qmgr_sslfd;        /* listen fd */
    int               qmgr_ssnfd;        /* number of used fds */
    uint32_t          qmgr_ssused;       /* bitmask for used elements */
    qss_ctx_P         qmgr_ssctx[MAX_SMTPS_FD];
    ssocc_ctx_P       qmgr_ssocc_ctx;
    occ_ctx_P         qmgr_occ_ctx;

    /* SMTPC */
    int               qmgr_sclfd;        /* listen fd */
    int               qmgr_scnfd;        /* number of used fds */
    uint8_t           qmgr_scused;       /* bitmask for used elements */
    qsc_ctx_P         qmgr_scctx[MAX_SMTPC_FD];

    sm_evthr_ctx_P    qmgr_ev_ctx;       /* event thread context */
    iqdb_P            qmgr_iqdb;         /* rsc for incoming edb */
    ibdb_ctx_P        qmgr_ibdb;         /* backup for incoming edb */
    sm_evthr_task_P   qmgr_icommit;      /* task for ibdbc commits */
    qss_opta_P        qmgr_optas;        /* open transactions (commit) */
    sm_evthr_task_P   qmgr_sched;        /* scheduling task */
    aq_ctx_P          qmgr_aq;           /* active envelope db */
    edb_ctx_P         qmgr_edb;          /* deferred envelope db */
    edbc_ctx_P        qmgr_edbc;         /* cache for envelope db */
    sm_evthr_task_P   qmgr_tsk_cleanup;  /* task for cleanup */
    qcleanup_ctx_P    qmgr_cleanup_ctx;
    sm_maps_P         qmgr_maps;         /* map system context */

    /* AR */
    sm_evthr_task_P   qmgr_ar_tsk;       /* address resolver task */
    int               qmgr_ar_fd;        /* communication fd */
    qar_ctx_P         qmgr_ar_ctx;

    sm_rcbh_T         qmgr_rcbh;         /* head for RCB list */
    unsigned int      qmgr_rcbn;         /* number of entries in RCB list */

    /* currently protected by qmgr_mutex */
    qmgr_conf_T       qmgr_conf;
    sm_log_ctx_P      qmgr_lctx;
    sm_logconfig_P    qmgr_lcfg;
    uint8_t           qmgr_usage[QMGR_RFL_LAST_I + 1];
    uint8_t           qmgr_lower[QMGR_RFL_LAST_I + 1];
    uint8_t           qmgr_upper[QMGR_RFL_LAST_I + 1];
};
There are task contexts for QMGR/SMTPS (2004-04-15):
struct qss_ctx_S {
    sm_magic_T     sm_magic;
    rcbcom_ctx_T   qss_com;
    qmgr_ctx_P     qss_qmgr_ctx;      /* pointer back to main ctx */
    int            qss_id;            /* SMTPS id */
    uint8_t        qss_bit;           /* bit for qmgr_ssctx */
    qss_status_T   qss_status;        /* status of SMTPS */
    unsigned int   qss_max_thrs;      /* upper limit for threads */
    unsigned int   qss_max_cur_thrs;  /* current limit for threads */
    unsigned int   qss_cur_session;   /* current # of sessions */
};
and QMGR/SMTPC (2004-04-15):
struct qsc_ctx_S {
    sm_magic_T     sm_magic;
    rcbcom_ctx_T   qsc_com;
    qmgr_ctx_P     qsc_qmgr_ctx;  /* pointer back to main ctx */
    int            qsc_id;        /* SMTPC id */
    uint8_t        qsc_bit;       /* bit for qmgr_ssctx */
    dadb_ctx_P     qsc_dadb_ctx;  /* pointer to DA DB context */
    /* split this in status and flags? */
    qsc_status_T   qsc_status;    /* status of SMTPC */
    uint32_t       qsc_id_cnt;
};
Both refer to a generic communication structure:
struct qcom_ctx_S {
    qmgr_ctx_P                  qcom_qmgr_ctx;  /* pointer back to main ctx */
    sm_evthr_task_P             qcom_tsk;       /* pointer to evthr task */
    sm_rcb_P                    qcom_rdrcb;     /* rcb for rd */
    SIMPLEQ_HEAD(, sm_rcbl_S)   qcom_wrrcbl;    /* rcb list for wr */
    pthread_mutex_t             qcom_wrmutex;   /* protect qss_wrrcb */
};
The QMGR holds also the necessary data for SMTPS sessions (2004-04-15):
struct qss_sess_S {
    sessta_id_T      qsses_id;
    time_T           qsses_st_time;
    sm_rpool_P       qsess_rpool;
    struct in_addr   qsess_client;   /* XXX use a generic struct! */
};
and transactions (2004-04-15):
struct qss_ta_S {
    sm_rpool_P        qssta_rpool;
    time_T            qssta_st_time;
    qss_mail_P        qssta_mail;       /* mail from */
    qss_rcpts_T       qssta_rcpts;      /* rcpts */
    unsigned int      qssta_rcpts_tot;  /* total number of recipients */
    unsigned int      qssta_flags;
    sessta_id_T       qssta_id;
    cdb_id_P          qssta_cdb_id;
    size_t            qssta_msg_size;   /* KB */
    qss_ctx_P         qssta_ssctx;      /* pointer back to SMTPS ctx */
    pthread_mutex_t   qssta_mutex;
};
The open transaction context (from SMTPS) stores information about outstanding transactions, i.e., those transactions in SMTPS that have ended the data transmission, but have not yet been confirmed by the QMGR. This data structure (fixed size queue) is used for group commits to notify the threads in the SMTPS servers that hold the open transactions.
struct qss_opta_S {
    unsigned int      qot_max;    /* allocated size */
    unsigned int      qot_cur;    /* currently used (basically last-first) */
    unsigned int      qot_first;  /* first index to read */
    unsigned int      qot_last;   /* last index to read (first to write) */
    pthread_mutex_t   qot_mutex;
    qss_ta_P         *qot_tas;    /* array of open transactions */
};
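A minimal sketch of how such a fixed-size queue could be used for group commits follows; the actual function names in the QMGR source differ, and SM_FULL and notify_smtps_task() are placeholders:

sm_ret_T
qss_opta_add(qss_opta_P opta, qss_ta_P ta)
{
    sm_ret_T ret;

    pthread_mutex_lock(&opta->qot_mutex);
    if (opta->qot_cur >= opta->qot_max)
        ret = SM_FULL;                     /* caller must commit/retry first */
    else
    {
        opta->qot_tas[opta->qot_last] = ta;
        opta->qot_last = (opta->qot_last + 1) % opta->qot_max;
        opta->qot_cur++;
        ret = SM_SUCCESS;
    }
    pthread_mutex_unlock(&opta->qot_mutex);
    return ret;
}

/* Drain all entries after a group commit and notify the waiting SMTPS tasks. */
void
qss_opta_drain(qss_opta_P opta)
{
    qss_ta_P ta;

    pthread_mutex_lock(&opta->qot_mutex);
    while (opta->qot_cur > 0)
    {
        ta = opta->qot_tas[opta->qot_first];
        opta->qot_first = (opta->qot_first + 1) % opta->qot_max;
        opta->qot_cur--;
        notify_smtps_task(ta);   /* hypothetical: append commit RCB for this ta */
    }
    pthread_mutex_unlock(&opta->qot_mutex);
}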
Other structures that the QMGR currently uses are
All envelope DBs (INCEDB: ibdb and iqdb, ACTEDB, DEFEDB, EDB) have their own mutexes in their context structures.
IQDB contains references to qss_sess_T, qss_ta_T, and qss_rcpts_T.
The recipient structure in AQ uses these flags:
AQR_FL_IQDB | from IQDB |
AQR_FL_DEFEDB | from DEFEDB |
AQR_FL_SENT2AR | Sent to AR |
AQR_FL_RCVD4AR | Received from AR |
AQR_FL_RDY4DLVRY | Ready for delivery |
AQR_FL_SCHED | Scheduled for delivery, is going to be sent to DA |
AQR_FL_WAIT4UPD | Waiting for status update, must not be touched by scheduler |
AQR_FL_TO | Too long in AQ |
AQR_FL_TEMP | temporary failure |
AQR_FL_PERM | permanent failure |
AQR_FL_ARF | failure from SMAR |
AQR_FL_DAF | failure from DA |
AQR_FL_MEMAR | memory allocation for aqr_addrs failed, use fallback |
AQR_FL_ARINCOMPL | addr resolution incomplete |
AQR_FL_ARF_ADD | rcpt with SMAR failure added to delivery list |
AQR_FL_TO_ADD | rcpt with timeout added to delivery list |
AQR_FL_IS_BNC | this is a bounce |
AQR_FL_IS_DBNC | double bounce |
AQR_FL_DSN_PERM | perm error |
AQR_FL_DSN_TMT | timeout |
AQR_FL_DSN_GEN | bounce has been generated |
AQR_FL_CNT_UPD | rcpt counters have been updated, i.e., aq_upd_ta_rcpt_cnts() has been called |
AQR_FL_STAT_UPD | rcpt status (aqr_status) has been updated individually |
Section 2.4.3.3 explains how transaction and recipient data flows through the various DBs in QMGR. This section tries to tie the various steps to the functions in QMGR (which are explained in Section 4.7.5).
The main() function of the QMGR is very simple (Notice: in almost all example code error checking etc has been removed for simplicity).
ret = sm_qmgr_init0(qmgr_ctx);  /* basic initialization */
ret = sm_qmgr_rdcf(qmgr_ctx);   /* read configuration */
ret = sm_qmgr_init(qmgr_ctx);   /* initialization after configuration */
ret = sm_qmgr_start(qmgr_ctx);  /* start all components */
ret = sm_qmgr_loop(qmgr_ctx);   /* start event threads loop */
ret = sm_qmgr_stop(qmgr_ctx);   /* stop all components */
where all functions do what is obvious from their name.
The main loop sm_qmgr_loop() simply calls evthr_loop(qmgr_ctx->qmgr_ev_ctx).
sm_qmgr_init0() performs basic initialization, sm_qmgr_rdcf() reads the configuration (currently (2004-02-13) only command line parameters), and sm_qmgr_init() initializes various QMGR data structures.
sm_qmgr_start() starts various tasks:
ret = sm_qm_stli(qmgr_ctx);
ret = sm_qm_stcommit(qmgr_ctx, now);
ret = sm_qm_stsched(qmgr_ctx, now);
sm_qm_stli() starts two (event thread) tasks listening for connections from SMTPS and SMTPC using the functions sm_qm_smtpsli() and sm_qm_smtpcli(). sm_qm_stcommit() starts the periodic commit task and sm_qm_stsched() starts the scheduling task.
The two listener tasks sm_qm_smtpsli() and sm_qm_smtpcli() do basically the same: wait for a new connection from the respective service (SMTPS/SMTPC), ``register'' it in the QMGR context, and start one task sm_qmgr_smtpX(sm_evthr_task_P tsk) that takes care of the communication with the SMTPX process. Notes:
The communication tasks sm_qmgr_smtpX() dispatch a read function sm_smtpX2qmgr() or a write function sm_qmgr2smtpX() to deal with the communication request. Those functions use the read RCB qsX_rdrcb to read (sequentially) data from SMTPS/SMTPC and a list of write RCBs qsX_wrrcbl to write data back to those modules. Access to the latter is protected by a mutex, and RCBs are appended to the list by various functions. The communication tasks are activated via read/write availability, where the write availability is additionally triggered by functions that put something into the list of write RCBs (otherwise the task would be activated most of the time without actually having anything to do).
The read functions sm_smtpX2qmgr() receive an RCB qsX_ctx->qsX_rdrcb from the module and then call the function sm_qsX_react() to decode the RCB and act accordingly. Those functions may return different values to determine what should happen next with the task. If it is an error, the task terminates (which might be overkill); other values are QMGR_R_WAITQ (translated to EVTHR_WAITQ), QMGR_R_ASYNC (translated to EVTHR_OK), and EVTHR_DEL, which causes the task to terminate; any other values are directly returned to the event threads library. QMGR_R_ASYNC means that the task has already been returned to the event thread system (waitq), see Section 3.20.5.1.3.
The write function sm_qmgr2mod() locks the mutex qsX_wrmutex, then checks whether the list qsX_wrrcbl of RCBs is empty. If it is, the task returns and turns off the WRITE request. Otherwise it sends the first element to the respective module using sm_rcb_snd(), removes that element, and if the list is empty thereafter turns off WRITE when it returns. Notice: it currently does not go through the entire list trying to write it all. This is done to prevent the thread from blocking; it is assumed that a single RCB can be sent without blocking. This assumption might be wrong, in which case the thread blocks (and hopefully another one runs); that could be prevented by requiring enough space in the communication buffer (which can be set via setsockopt() for sockets).
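A condensed sketch of this logic follows; the return code EVTHR_NO_WR, the list entry field name rcbl_link, and the sm_rcb_snd() calling convention are assumptions here, and a BSD-style SIMPLEQ API is presumed:

sm_ret_T
qmgr2mod_wr(qcom_ctx_P qcom_ctx, int fd)
{
    sm_rcbl_P rcbl;
    sm_ret_T  ret;

    ret = SM_SUCCESS;
    pthread_mutex_lock(&qcom_ctx->qcom_wrmutex);
    rcbl = SIMPLEQ_FIRST(&qcom_ctx->qcom_wrrcbl);
    if (rcbl == NULL)
        ret = EVTHR_NO_WR;          /* nothing to do: turn off WRITE request */
    else
    {
        /* send only one RCB to avoid blocking on a full socket buffer */
        ret = sm_rcb_snd(fd, rcbl);
        SIMPLEQ_REMOVE_HEAD(&qcom_ctx->qcom_wrrcbl, rcbl_link);
        if (SIMPLEQ_EMPTY(&qcom_ctx->qcom_wrrcbl))
            ret = EVTHR_NO_WR;      /* list drained: turn off WRITE request */
    }
    pthread_mutex_unlock(&qcom_ctx->qcom_wrmutex);
    return ret;
}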
The commit task sm_qm_stcommit() is responsible for group commits. It checks the list of open transactions qmgr_ctx->qmgr_optas and if it isn't empty calls q_ibdb_commit(qmgr_ctx), which in turn commits the current INCEDB and then notifies all outstanding transactions of this fact. This is done by going through the list and adding an RCB with the commit information to the list of RCBs qss_wrrcbl for the task qss_ta->qssta_ssctx that handles the transaction qss_ta.
The scheduling function sm_qm_stsched() is supposed to implement the core of the QMGR.
A recipient goes through the following stages:
The function sm_qs2c_task(qsc_ctx_P qsc_ctx, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, sm_rcbe_P rcbe, sessta_id_P da_sess_id, sessta_id_P da_ta_id) creates one session with one transaction for SMTPC. The protocol is as follows:
RT_Q2C_ID | SMTPC identifier |
RT_Q2C_DCID | delivery class identifier |
RT_Q2C_ONESEID | Session id, only one transaction (hack) |
RT_Q2C_SRVIP4 | IPv4 address of server (hack) |
RT_Q2C_NTAID | New transaction id |
RT_Q2C_MAIL | Mail from address |
RT_Q2C_CDBID | CDB identifier |
RT_Q2C_RCPT_IDX | recipient index |
RT_Q2C_RCPT | recipient address |
Additional recipients can be added via sm_qs2c_add_rcpt(qsc_ctx_P qsc_ctx, aq_rcpt_P aq_rcpt, sm_rcbe_P rcbe) which just adds the recipient index and address to the RCB.
If the transaction denotes a bounce message, only one recipient can be sent, and instead of the record tag RT_Q2C_NTAID either RT_Q2C_NTAIDB (bounce) or RT_Q2C_NTAIDDB (double bounce) is used. Additionally an entire error text is sent using RT_Q2C_B_MSG (bounce message) as record tag. Currently this does not include the headers. It should be something like:
Hi! This is the sendmail X MTA. I'm sorry to inform you that a mail from you could not be delivered. See below for details.
listing recipient address, delivery host, and delivery message for each failed recipient.
The main QMGR context contains three arrays which store the lower and upper thresholds for various resources and the current usage. A single scalar contains the overall resource usage.
uint8_t qmgr_usage[QMGR_RFL_LAST_I + 1];
uint8_t qmgr_lower[QMGR_RFL_LAST_I + 1];
uint8_t qmgr_upper[QMGR_RFL_LAST_I + 1];

/* Overall value to indicate resource usage 0:free 100:overloaded */
unsigned int qmgr_total_usage;
To store the amount of free disk space, two data structures are used: one to store the amount of available disk space per partition (see also Section 3.4.10.13.1):
struct filesys_S {
    dev_t           fs_dev;         /* unique device id */
    unsigned long   fs_kbfree;      /* KB free */
    unsigned long   fs_blksize;     /* block size, in bytes */
    time_T          fs_lastupdate;  /* last time fs_kbfree was updated */
    const char     *fs_path;        /* some path in the FS */
};
and one which contains an array of those individual structures:
struct fs_ctx_S {
#if SM_USE_PTHREADS
    pthread_mutex_t   fsc_mutex;
#endif /* SM_USE_PTHREADS */
    int               fsc_cur_entries;  /* cur. number of entries in fsc_sys */
    int               fsc_max_entries;  /* max. number of entries in fsc_sys */
    filesys_P         fsc_sys;          /* array of filesys_T */
};
The function qm_comp_resource(qmgr_ctx_P qmgr_ctx, thr_lock_T locktype) computes a value that is a measure for the overall resource usage: qmgr_total_usage. Moreover, the function also invokes functions that return the amount of free disk space for a DB that is stored on disk: cdb_fs_getfree(), edb_fs_getfree(), and ibdb_fs_getfree(). Each of these functions receives a pointer to a variable of type fs_ctx_T and a pointer to an integer variable which will contain the amount of available disk space after a successful return. The functions themselves check the last update timestamp to avoid invoking system functions too often. Since each DB operation tries to keep track of disk space changes, this should return a reasonable estimate of the actual value.
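A sketch of the timestamp-guarded free-space check could look like this; fs_getfree(), the update interval, and the use of statvfs() are assumptions, not the actual implementation:

#include <sys/statvfs.h>
#include <time.h>

#define FS_UPD_INTERVAL 30   /* example: refresh at most every 30 seconds */

int
fs_getfree(struct filesys_S *fs, unsigned long *pkbfree)
{
    struct statvfs sv;
    time_t now;

    now = time(NULL);
    if (now - fs->fs_lastupdate >= FS_UPD_INTERVAL)
    {
        if (statvfs(fs->fs_path, &sv) != 0)
            return -1;                        /* caller treats this as an error */
        fs->fs_kbfree = (unsigned long)
            ((double) sv.f_bavail * sv.f_frsize / 1024.0);
        fs->fs_lastupdate = now;
    }
    *pkbfree = fs->fs_kbfree;
    return 0;
}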
The function q2s_throttle(qss_ctx_P qss_ctx, sm_evthr_task_P tsk, unsigned int nthreads) informs one SMTP server (referenced by qss_ctx) about the new maximum number of threads it should allow.
The generic function qs_control(qss_ctx_P qss_ctx, int direction, unsigned int use, unsigned int resource) checks the new usage of a resource and based on the input parameter direction decides whether to (un)throttle one SMTP server. qs_control() has the following behavior: throttle the system iff
else unthrottle the system iff
The specific function qs_unthrottle(qss_ctx_P qss_ctx) checks whether one SMTP server can be unthrottled based on the current resource usage. It is called by sm_smtps_wakeup() which is scheduled by sm_qss_wakeup(qmgr_ctx_P qmgr_ctx, thr_lock_T locktype) as a sleep() task. sm_qss_wakeup() in turn is invoked from qm_resource() when all resources are available (again).
The function qs_comp_control(qss_ctx_P qss_ctx, bool unthrottle) is invoked from sm_qss_react(). It will only check whether the address resolver (SMAR) is available and accordingly call qs_control().
The requirements for updating the recipient status after a delivery attempt has been made are described in Section 2.4.3.4. Section 3.4.16 describes the functionality, which distinguishes several reasons that require updating the status of a recipient:
Before examining these cases, a short note about updating the various queues: entries in IQDB are removed immediately if the recipient was in that queue (this can be done because the recipient is safely stored in DEFEDB or IBDB). To update DEFEDB and IBDB more complicated measures are taken: a request indicating that the status must be changed (which may also mean removal of an entry from the respective DB) is queued while the function goes through all the recipients of the transaction. DEFEDB provides functions to do this: edb_ta_rm_req() and edb_rcpt_rm_req(), which are described in Section 4.10.4. See Section 4.10.2 about the implementation of updating IBDB based on a list of change requests. However, if a recipient was successfully delivered on the first attempt, IBDB can be updated directly: even if further operations fail (which are related to updating the recipient status), the recipient can be removed without any negative consequences.
As explained in Section 3.4.16.1 it is necessary to preserve the order of updates for recipients and transactions when those changes are committed to DEFEDB.
To update the status for some (failed) recipients (case 2a) the function
qm_fr_sc_rcpts(qmgr_ctx_P qmgr_ctx, sessta_id_T da_ta_id, sm_rcb_P rcb, unsigned int err_st)
is used, the RCB contains the recipient status from a DA. This function simply takes the data out of the RCB and updates the recipient status in the active queue. For this it invokes
aq_rcpt_status(aq_ctx, da_ta_id, idx, rcpt_status, err_st, errmsg),
which updates the field aqr_status_new that is later on used by aq_upd_ta_rcpt_cnts() (see 4.7.5.7.6), which requires the previous and the new status of a recipient to determine which recipient counters to change in the transaction context.
To update the status for an entire transaction (case 2b) from a DA the function
qda_update_ta_stat(qmgr_ctx_P qmgr_ctx, sessta_id_T da_ta_id, sm_ret_T status, unsigned int err_st, dadb_ctx_P dadb_ctx, dadb_entry_P dadb_entry, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, thr_lock_T locktype)
is called. This function walks through all recipients of a transaction and updates the various DBs and counters based on the individual recipient status (which may be different from the overall transaction status). See Section 2.4.3.4 for a high-level description.
The function qda_update_ta_stat() simply invokes
qda_upd_ta_stat(qmgr_ctx, da_ta_id, status, err_st, dadb_ctx, dadb_entry, aq_ta, aq_rcpt, &edb_req_hd, &ibdb_req_hd, locktype)
(see 4.7.5.7.4) and then writes the changes for DEFEDB and IBDB to disk (unless the result is an error).
The function
qda_upd_ta_rcpt_stat(qmgr_ctx_P qmgr_ctx, sessta_id_T da_ta_id, sm_ret_T status, unsigned int err_st, dadb_ctx_P dadb_ctx, dadb_entry_P dadb_entry, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, edb_req_hd_P edb_req_hd, ibdb_req_hd_P ibdb_req_hd, thr_lock_T locktype)
can be used to update an entire transaction, i.e., all recipients of that transaction, or just an individual recipient. These two cases are distinguished by specifying exactly one of the DA transaction identifier da_ta_id (i.e., the id must be valid: the first character must not be '0') and the recipient aq_rcpt (i.e., it must not be NULL).
This function is also used in other places to update the status of a single recipient, e.g., for failures from the address resolver (called from the scheduler when it comes across such a recipient). qda_upd_ta_rcpt_stat() invokes
q_upd_rcpt_stat(qmgr_ctx, ss_ta_id, status, err_st, aq_ta, aq_rcpt, edb_req_hd, ibdb_req_hd, &iqdb_rcpts_done, THR_NO_LOCK).
(see 4.7.5.7.5) for all recipients that need to be updated. Afterwards it checks whether iqdb_rcpts_done is greater than zero in which case the function qda_upd_iqdb(qmgr_ctx, iqdb_rcpts_done, ss_ta_id, cdb_id, ibdb_req_hd) is invoked, see 4.7.5.7.10.
If there are no deliverable recipients in AQ anymore for the current transaction or it is required to update the transaction, then the function performs the following steps: first check whether there are no recipients at all, i.e., aq_ta->aqt_rcpts_left is zero, which means that the transaction and the data file (CDB) must be removed. If that's not the case but the transaction needs to be updated in DEFEDB, then a request is appended to the DEFEDB request list, the flag AQ_TA_FL_DEFEDB is set4.3, and the flags AQ_TA_FL_EDB_UPD_C and AQ_TA_FL_EDB_UPD_R are cleared. A transaction needs to be updated if at least one of the following conditions holds:
Note: when a RCPT is updated in DEFEDB, then the TA is in DEFEDB or the counters change (the counters do not change iff the RCPT status does not change; if the RCPT status does not change, then the only reason the RCPT is written to DEFEDB is because it was there earlier and hence the TA was there too - that is a pre-requirement of the algorithm: a RCPT is only in DEFEDB iff its TA is there too).
Hence the condition can be simplified.
Note: this is a side-effect of the current scheduler which keeps recipients in AQ until a delivery attempt is complete. If the scheduler changes to include pre-empting then the update logic must be modified to take care of that, i.e., the requirement - a RCPT is only in DEFEDB iff its TA is there too - does not necessarily hold anymore.
This can be expressed as:
Without the simplification it is:
If aq_ta->aqt_rcpts_left is zero and the transaction is in DEFEDB, then a remove request is appended to the request list.
If there are no more recipients in AQ for the TA (aq_ta->aqt_rcpts == 0), then the TA is removed from AQ.
If aq_ta->aqt_rcpts_left is zero and the CDB identifier is set (which must be the case), then the entry is removed from the CDB.
Finally, if the DA TA identifier is valid and the DA context is not NULL, then the session is closed (which can be done because the scheduler is currently a hack that only uses one transaction per session).
The function
q_upd_rcpt_stat(qmgr_ctx_P qmgr_ctx, sessta_id_T ss_ta_id, sm_ret_T status, unsigned int err_st, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, edb_req_hd_P edb_req_hd, ibdb_req_hd_P ibdb_req_hd, unsigned int *piqdb_rcpts_done, thr_lock_T locktype)
in turn updates the status for one recipient. If the recipient is in IQDB and it won't be retried, i.e.,
(rcpt_status == SM_SUCCESS || smtp_reply_type(rcpt_status) == SMTP_RTYPE_FAIL || !AQR_MORE_DESTS(aq_rcpt) || AQR_DEFER(aq_rcpt))
then it is immediately removed from IQDB. Next the recipient counters in the transaction are updated:
aq_upd_ta_rcpt_cnts(aq_ta, aq_rcpt->aqr_status, rcpt_status)
(see 4.7.5.7.6).
Then one of two functions is called:
if the recipient has been delivered or is a double bounce that can't be delivered (and hence will be dropped on the floor); see 4.7.5.7.7.
for temporary or permanent errors; see 4.7.5.7.8.
Afterwards it is checked whether there can be no more retries for that recipient, in which case it is removed from AQ4.4; otherwise the next destination host will be tried and the flags AQR_FL_SCHED, AQR_FL_WAIT4UPD, AQR_FL_STAT_NEW, and AQR_FL_ERRST_UPD are cleared.
The counters in the transaction are updated via
aq_upd_ta_rcpt_cnts(aq_ta, oldstatus, newstatus)
This function sets the flag AQ_TA_FL_EDB_UPD_C if a counter has been changed.
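The internal logic is not shown in this document; a much-simplified guess at what such a counter update could look like follows (the real function handles more counters and statuses, and SMTP_RTYPE_TMP is an assumed classification value):

void
aq_upd_ta_rcpt_cnts(aq_ta_P aq_ta, smtp_status_T oldstatus, smtp_status_T newstatus)
{
    if (oldstatus == newstatus)
        return;                              /* nothing changed, no flag set */

    /* remove the recipient from the counter that matched its old status */
    if (smtp_reply_type(oldstatus) == SMTP_RTYPE_TMP)
        aq_ta->aqt_rcpts_temp--;
    else if (smtp_reply_type(oldstatus) == SMTP_RTYPE_FAIL)
        aq_ta->aqt_rcpts_perm--;

    /* and add it to the counter that matches the new status */
    if (smtp_reply_type(newstatus) == SMTP_RTYPE_TMP)
        aq_ta->aqt_rcpts_temp++;
    else if (smtp_reply_type(newstatus) == SMTP_RTYPE_FAIL)
        aq_ta->aqt_rcpts_perm++;

    aq_ta->aqt_flags |= AQ_TA_FL_EDB_UPD_C;  /* a counter changed */
}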
Case 1 (from 4.7.5.7.5): q_upd_rcpt_ok() is responsible for removing a recipient from the queues in which it is stored.
q_upd_rcpt_ok(qmgr_ctx_P qmgr_ctx, sessta_id_T ss_ta_id, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, ibdb_rcpt_P ibdb_rcpt, rcpt_id_T rcpt_id, edb_req_hd_P edb_req_hd)
In case of a double bounce it decrements the number of recipients left and logs the problem (dropped a double bounce). Then it removes the recipient from IBDB if it is stored there (directly, without going via the request queue), or from DEFEDB by appending the remove operation to the request queue. Finally, if the recipient is a (double) bounce, the function qda_upd_dsn() is called to remove the recipients for which the DSN has been generated; see 4.7.5.7.9.
Case 2 (from 4.7.5.7.5): q_upd_rcpt_fail() examines rcpt_status to decide whether it is a temporary error or a permanent failure. In the former case the time in the queue is checked: if it exceeds a limit and there are no more destination hosts to try or the recipient must be deferred (e.g., address resolver failure), then two flags are set: AQR_FL_PERM and AQR_FL_DSN_TMT. In the latter case the flags AQR_FL_PERM and AQR_FL_DSN_PERM are set.
If the recipient can't be delivered and is not a double bounce itself then qm_bounce_add(qmgr_ctx, aq_ta, aq_rcpt, errmsg) is called to create a bounce message for this recipient; see 4.7.5.8.1.
If there are no more destinations to try or the recipient must be deferred (because of an address resolver problem or because it is too long in AQ), or a bounce message has been generated4.5, then the number of tries is incremented, the next time to try is computed if necessary (i.e., recipient has only a temporary failure or it has a permanent failure but no bounce because generation of bounce recipient failed), and a request to update the recipient status in DEFEDB is appended to the request list. If that was successful and the recipient is a (double) bounce then qda_upd_dsn(qmgr_ctx, aq_ta, aq_rcpt, ss_ta_id, edb_req_hd) is called to remove the recipients for which this was a bounce (see 4.7.5.7.9).
If the recipient must be retried, i.e., it is not a permanent failure, then it is added to the EDB cache: edbc_add(qmgr_ctx->qmgr_edbc, rcpt_id, aq_rcpt->aqr_next_try, false).
If the recipient was in IQDB then a status update is appended to the request list for IBDB using the function ibdb_rcpt_app().
Finally q_upd_rcpt_fail() returns a flag value that indicates either an error or whether some actions (in this case: activate the address resolver) need to be performed by the caller.
qda_upd_dsn(qmgr_ctx_P qmgr_ctx, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, sessta_id_T ss_ta_id, edb_req_hd_P edb_req_hd)
is responsible for removing the recipients for which the DSN has been generated, which is done by going through its list (aq_rcpt->aqr_dsns[]) and appending remove requests to the DEFEDB change queue. It also updates the number of recipients left if necessary, i.e., if the DSN was for more than one recipient, and resets the used data structures.
qda_upd_iqdb(qmgr_ctx_P qmgr_ctx, unsigned int iqdb_rcpts_done, sessta_id_T ss_ta_id, cdb_id_P cdb_id, ibdb_req_hd_P ibdb_req_hd) updates IQDB status for one transaction; if all recipients have been delivered then it removes the transaction from IQDB using iqdb_trans_rm(), adds a request to remove it from IBDB via ibdb_ta_app() and removes it from the internal DB using qss_ta_free().
The current implementation of sendmail X does not support DSN per RFC 1894, but it creates non-delivery reports in a ``free'' format; see also 4.7.5.5.
If a bounce message is generated the function
qm_bounce_add(qmgr_ctx_P qmgr_ctx, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, sm_str_P errmsg)
is used. See Section 4.10.3 about the data structures that are relevant here (AQ transaction and recipient), and Section 4.7.3 about the flags (esp. those containing DSN or BNC in the name).
To generate a bounce, a new recipient is created (the ``bounce recipient'') using the function qm_bounce_new() unless the transaction already has a bounce recipient (that hasn't been scheduled yet). This recipient has an array aqr_dsns which contains the indices of the recipients for which this recipient contains the bounce message. Whether a transaction already has a (double) bounce recipient is recorded in the transaction (see 4.10.3): aqt_bounce_idx and aqt_dbl_bounce_idx. These can be reused to add more recipients to a bounce recipient (instead of sending one DSN per bounce).
The function
qm_bounce_new(qmgr_ctx_P qmgr_ctx, aq_ta_P aq_ta, bool dbl_bounce, aq_rcpt_P *aq_rcpt_bounce)
creates a new bounce recipient. It uses aq_ta->aqt_nxt_idx as the index for the bounce recipient (after checking it against the maximum value: currently the index is only 16 bits) and stores the value in aqt_bounce_idx or aqt_dbl_bounce_idx, respectively. aq_rcpt_add() is used to add a new recipient to AQ, then an array of size aq_ta->aqt_rcpts_tot is created to hold the indices of those recipients for which this will be a bounce. This array is in general too big; some optimization can be applied later on. qm_bounce_new() then fills in the data for the recipient and sends it to the address resolver using qmgr_rcpt2ar(). It also increases the number of recipients for this transaction (aqt_rcpts_tot and aqt_rcpts). This may create an inconsistent state since the bounce recipient is only in AQ, not in a persistent DB (DEFEDB), see 4.7.5.8.3.
A bounce recipient is not written to a persistent DB when it is generated, but the failed recipients are written to DEFEDB. Only when a delivery attempt for a bounce message fails is the bounce recipient written to DEFEDB, at which point the recipients for which it is a bounce are removed by qda_upd_dsn(), see 4.7.5.7.9. Hence if the system crashes before the bounce is delivered (or at least tried and then written to the queue), the bounce will be lost. However, the original recipients that caused the bounce are in DEFEDB, and hence the bounce message can be reconstructed.
Alternatively the bounce recipient could be written to one of the persistent queues and the original recipients could be removed; this would reduce the on-disk storage. However, it requires that the RCB to store the bounce recipient is fairly large because it contains the complete error text for each failed recipient and a large list of recipient indices. Theoretically it might also be possible to store the error text in IBDB, but that most likely requires changes to the storage format, which does not make much sense because bounces should occur infrequently. Moreover, recovery of IBDB would become more complicated. Additionally, the failed recipient might not be in IBDB but in DEFEDB, making it even harder to correctly reconstruct the bounce data because it can be spread out over various places.
This is a place where optimizations are certainly possible, but it is currently not important enough (it is more important to implement the full sendmail X.0 system instead of optimizing bounces).
As explained in Section 2.4.6.3 there are 8 states with respect to delayed DSNs which means 3 bits are required for a recipient to denote those states.
The complexity of updating recipients in case 2(b)i is very high. That can cause programming errors and maybe inconsistencies in AQ or DEFEDB. Question: is there a simpler approach to the problem?
Here is one proposal: write the dDSN to DEFEDB and immediately change the state to 2(b)ii, as the dDSN will be safely stored. Note: if updating DEFEDB fails then the recipient for which this is a dDSN is not written to DEFEDB either, hence this approach should be safe. Moreover, it is significantly simpler, and causes only a bit of extra disk I/O because the recipient is written to DEFEDB anyway. This approach causes the handling of bounces and dDSNs to be different, which however should not matter much as the approach explained above is definitely different from bounce handling. However, this approach causes another problem: dDSNs will be sent individually; see Section 2.4.6.
Why are dDSNs and bounces (``failure'' DSNs: fDSN) so different? A fDSN means that the original recipients will be removed while they will be retried for a dDSN. So for a fDSN DEFEDB must be changed anyway (recipients must be removed), but for a dDSN it could (theoretically) be avoided.
The address resolver sends resolved addresses to QMGR which are used in turn by the scheduler for delivery attempts. The basic protocol returns a DA that must be used for delivery and a number of addresses to try.
A significantly more complicated part is alias expansion (see also Section 3.4.15.1).
The address resolver is called smar for sendmail address resolver since ar is already used. SMAR is based on the event threads library described in Section 4.3.8.
The description below is based on the implementation from 2003-06-26 and hence not up to date.
The AR uses a very simple mailertable which must be in a strict form: a domain part of an e-mail address, then one or more whitespace characters, followed by the IPv4 address (in square brackets) of the host to which the mail should be sent or the hostname itself. If an entry is not found in the mailertable or the RHS is a hostname, DNS lookups (for MX and A records) are performed (see Section 4.3.10 for the DNS library).
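For illustration, two entries in that form might look like this (hypothetical domains and addresses):

example.org     [192.0.2.25]
example.net     relay.example.net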
The main context for the AR has currently the following elements:
struct smar_ctx_S {
    sm_magic_T        sm_magic;
    pthread_mutex_t   smar_mutex;
    int               smar_status;          /* see below, SMAR_ST_* */
    sm_evthr_ctx_P    smar_ev_ctx;          /* event thread context */
    int               smar_qfd;             /* fd for communication with QMGR */
    sm_evthr_task_P   smar_qmgr_tsk;        /* QMGR communication task */
    rcbcom_ctx_T      smar_qmgr_com;        /* QMGR communication context */
    sm_log_ctx_P      smar_lctx;
    ipv4_T            smar_nameserveripv4;
    unsigned int      smar_dns_flags;
    dns_tsk_P         smar_dns_tsk;
    dns_mgr_ctx_P     smar_dns_mgr_ctx;
    bht_P             smar_mt;              /* "mailertable" (XXX HACK) */
};
To store addresses sent by the QMGR to the AR for resolving, the following structure is used:
struct smar_rcpt_S {
    sm_str_P       arr_rcpt_pa;   /* printable addr */
    rcpt_id_T      arr_rcpt_id;   /* rcpt id */
    sm_str_P       arr_domain_pa;
    unsigned int   arr_flags;     /* status of address resolving */
    unsigned int   arr_da;        /* DA */
    int            arr_n_mx;      /* total number of MX records */
    int            arr_c_mx;      /* current number of MX records */
    int            arr_n_a;       /* total number of A records */
    smar_dns_T    *arr_res;       /* array of results */
    ipv4_T         arr_ip4;       /* single A record */
    sm_rcbe_P      arr_rcbe;      /* RCB to write back result */
    smar_ctx_P     arr_smar_ctx;  /* pointer back to SMAR context */
};
arr_n_mx stores the total number of MX records after the initial query for an MX record comes back. arr_c_mx keeps track of the current number of answers for those MX records, i.e., when both variables have the same value then all outstanding requests have been answered and the complete result can be returned to the QMGR.
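A small sketch of that completion check; smar_rcpt_complete() is a hypothetical helper, the real check is done inline in the DNS callback:

#include <stdbool.h>

static bool
smar_rcpt_complete(smar_rcpt_P smar_rcpt)
{
    /* all MX answers are in (and hence all per-MX A queries answered) */
    return smar_rcpt->arr_n_mx > 0
           && smar_rcpt->arr_c_mx >= smar_rcpt->arr_n_mx;
}

/* in the DNS callback, after one answer has been stored: */
    if (smar_rcpt_complete(smar_rcpt))
    {
        smar_rcpt_re_all(smar_rcpt);   /* return complete result to QMGR */
        smar_rcpt_free(smar_rcpt);
    }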
The array of results has the following type:
struct smar_dns_S {
    unsigned int     ardns_ttl;   /* TTL from DNS */
    unsigned short   ardns_pref;  /* preference from DNS */
    sm_cstr_P        ardns_name;  /* name from DNS */
    int              ardns_n_a;   /* number of A records */
    ipv4_T          *ardns_a;     /* pointer to list of A records */
};
The address resolver receives recipient addresses from QMGR, creates a recipient structure using
smar_rcpt_new(smar_ctx, &smar_rcpt)
fills in the necessary data, and invokes
smar_rcpt_rslv(smar_ctx, smar_rcpt)
which first checks the ``mailertable'' and if it can't find a matching entry, then it will invoke the DNS resolver (see Section 4.3.10):
dns_req_add(dns_mgr_ctx, q, T_MX, smar_rcpt_cb, smar_rcpt)
The callback function
smar_rcpt_cb(dns_res_P dns_res, void *ctx)
locks smar_ctx and analyzes dns_res as follows.
If the result is DNSR_NOTFOUND and the lookup was for an MX record then it will simply try to find A records. If it is another error then a general error handling section will be invoked.
First it will be checked whether the query was for an A record in which case arr_c_mx is incremented. Then the error type (temporary or permanent) is checked and a flag in smar_rcpt is set in the former case. An error is returned to the caller iff the query was for an MX record or the query was for an A record and all records have been received and there was a temporary error. If that is not the case, but all open requests are answered, then the results are returned to the caller using smar_rcpt_re_all(smar_rcpt) and thereafter the rcpt structure is freed using smar_rcpt_free(smar_rcpt).
If the result was not an error then two cases have to be taken care of:
Expanding aliases makes the address resolver significantly more complex. Unfortunately the current implementation does not allow for a simple layer around the current recipient resolver. This is due to the asynchronous nature of the DNS library which requires the use of callback functions. As explained in the previous section, the callback function checks whether all results arrived in which case it will put the data into an RCB and send it back to QMGR.
Question: how to implement owner-alias and alias-request? Problem: bounces go to owner-alias (see also Section 2.6.7). Does this mean a transaction should be ``cloned'' or should the transaction context be extended? What if mail is sent to two lists and two "normal" rcpts?
mail from:<sender>
rcpt to:<list1>
rcpt to:<list2>
rcpt to:<rcpt1>
rcpt to:<rcpt2>
Usually a bounce goes to sender. However, if mail to listX fails, the function that creates bounces needs to handle this differently. Here's a list of possible ways to handle owner-aliases and the problems with them.
Problems:
Problems:
Problems:
It might be possible to add another counter: aqt_clones, which counts the number of cloned transactions. A cloned transaction contains a link to the original transaction (just the TA id). If the number of recipients aqt_rcpts_left is decreased to zero, then it is checked whether the TA is cloned, in which case the clone counter in the original TA is decreased. A CDB entry is removed if the two counters aqt_rcpts_left and aqt_clones in an original TA are both zero.
Overall, proposal 3 seems like the ``cleanest'' solution even though it currently (2004-06-30) has the biggest impact on the implementation, i.e., it requires a lot of changes.
Note: this is based on an old document and might not be up to date; it is listed here only as a starting point for the sendmail X protocol.
The protocol consists of simple data strings which are tagged with a single character. The first element of each exchange is a 32 bit integer which denotes the total length of the string to follow, including the command and maybe NUL characters. Note: since we control both sides, we can rely on this format. If the data is incorrect, we can simply terminate the connection and log an error.
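A sketch of reading one framed exchange could look as follows; milter_read_pkt() is a hypothetical helper and partial-read handling is omitted:

#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>

ssize_t
milter_read_pkt(int fd, char **pbuf)
{
    uint32_t len;
    char *buf;

    /* 32 bit length in network byte order precedes the data */
    if (read(fd, &len, sizeof(len)) != (ssize_t) sizeof(len))
        return -1;
    len = ntohl(len);
    buf = malloc(len);
    if (buf == NULL)
        return -1;
    if (read(fd, buf, len) != (ssize_t) len)
    {
        free(buf);
        return -1;                  /* protocol error: drop the connection */
    }
    *pbuf = buf;                    /* buf[0] is the command character */
    return (ssize_t) len;
}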
The characters for specifying the data are:
MTA to Milter (upper case char):
char | meaning | parameters |
A | Abort | - |
B | Body chunk | buffer NUL |
C | Connection information | hostname NUL family port sockinfo |
D | Define macro | Where4.6 ( name NUL value NUL ) * |
E | final body chunk (End) | buffer NUL |
H | HELO | heloname NUL |
L | Header | field NUL value NUL |
M | MAIL from | sender NUL ( arg NUL )* |
N | End of headers (EOH) | - |
O | Option negotiation | version fflags pflags |
Q | QUIT (once per connection) | - |
R | RCPT to | recipient NUL ( arg NUL )* |
The buffers that transfer body chunks can contain NUL characters, hence the length parameter is important. For fflags and pflags see 4.9.1.2.
version, fflags, and pflags are 32 bit integers (network format). family is one byte:
family | data |
U | unknown; SOCKADDR = NULL |
4 | AF_INET |
6 | AF_INET6 |
L | AF_UNIX |
port is a 16 bit integer in network format (currently assumed to be a u_short; uint16_t is needed to assure this); it is empty for U.
family | sockinfo |
U | empty |
4 | string (inet_aton/ntoa) |
6 | string (inet_pton/ntop) (optional IPv6: address prefix) |
L | path |
Unless U is used, the string includes the trailing NUL.
Milter to MTA (lower case and punctuation char, except for negotiation):
char | meaning | parameters |
O | Option negotiation | version fflags pflags |
+ | add recipient | rcpt NUL |
- | remove recipient | rcpt NUL |
a | accept | - |
b | replace body (chunk) | buffer NUL |
c | continue | - |
d | discard | - |
h | add header | field NUL value NUL |
m | modify header | index (32 bit) field NUL replacement NUL |
p | progress | - |
q | quarantine | reason NUL |
r | reject | - |
t | tempfail | - |
y | reply code | rcode SPACE xcode SPACE message NUL |
Macros and their values can precede Connect, HELO, MAIL, and RCPT. The first character (Where) after the command denotes the command character to which those macros belong (C, H, M, R). Implementation note: the entire "string" is stored in the context (array of pointers), and searched in smfi_getsymval().
The default macros expected to be passed from MTA to milter per-command are (this can be changed using the Milter.macros.* options):
Where | Macros |
C | daemon_name, if_name, if_addr, j, _ |
H | tls_version, cipher, cipher_bits, cert_subject, cert_issuer |
M | i, auth_type, auth_authen, auth_ssf, auth_author, mail_mailer, mail_host, mail_addr |
R | rcpt_mailer, rcpt_host, rcpt_addr |
All macros stay in effect from the point they are received until the end of the connection with the exception of rcpt_* which should only be set during the call to xxfi_envrcpt().
Note: there should be a better way to deal with differences in the available actions and extra commands: capabilities and requirements should be announced and then each side decides on its own whether those are sufficient and fulfilled, respectively.
Here are the protocol exchange types:
RT_S2M_NID | uint32 | id of SMTPS, new connection |
RT_S2M_ID | uint32 | id of SMTPS |
RT_S2M_CID | uint32 | id of SMTPS, closing (shutting down) |
RT_S2M_MAXTHRDS | uint32 | max number of threads |
RT_S2M_NSEID | str | new session id |
RT_S2M_EHLO | str | ehlo string |
RT_S2M_SEID | str | session id |
RT_S2M_CSEID | str | close session id |
RT_S2M_CLTIP4 | uint32 | client IPv4 address |
RT_S2M_NTAID | str | new transaction id |
RT_S2M_TAID | str | transaction id |
RT_S2M_MAIL | str | MAIL from |
RT_S2M_RCPT_IDX | uint32 | rcpt idx |
RT_S2M_RCPT | str | RCPT to |
RT_S2M_DATA | str | data |
RT_S2M_BODY | str | body chunk |
RT_S2M_DOT | str | final data chunk |
RT_S2M_ARG | str | argument (for MAIL, RCPT) |
RT_S2M_MACW | uint32 | macro: Where (see 4.9.1.1) |
RT_S2M_MACN | str | macro name |
RT_S2M_MACV | str | macro value |
RT_S2M_PROT | uint32 | SMTPS - PMILTER protocol version |
RT_S2M_CAP | uint32 | SMTPS capabilities |
RT_S2M_REQ | uint32 | SMTPS requirements |
Initial setup: SMTPS sends libmilter its id (RT_S2M_NID) and the maximum number of threads (RT_S2M_MAXTHRDS) it is going to create. The latter value can be used by libmilter to size internal tables if necessary.
ToDo: option negotiation: must be better than what is done in sm8 (see 4.9.1.2), see 4.9.2.1 for possible solutions.
For the first test implementation it is: the SMTP server sends RT_S2M_PROT, RT_S2M_CAP, RT_S2M_REQ, RT_S2M_MAXTHRDS. The milter responds with RT_M2S_ID, RT_M2S_PROT, RT_M2S_CAP, RT_M2S_REQ.
The rest follows ESMTP (all protocol exchanges include RT_S2M_ID as first element).
Replies use the following record types:
RT_M2S_ID | uint32 | id of SMTPS |
RT_M2S_SEID | str | session id |
RT_M2S_RCODE | uint32 | SMTP reply code |
RT_M2S_PROT | uint32 | SMTPS - PMILTER protocol version |
RT_M2S_CAP | uint32 | PMILTER capabilities |
RT_M2S_REQ | uint32 | PMILTER requirements |
Each reply must contain the SMTP server id RT_M2S_ID and the session id RT_M2S_SEID.
The possible reply codes are:
250 | accept |
350 | continue |
450 | temporary error |
550 | permanent error |
As specified in the sendmail 8 libmilter documentation ``accept'' means that the entire session or transaction will be accepted and no further commands should be sent to the milter. ``continue'' means the milter wants session/transaction to continue, while the other two have the obvious meaning.
If an SMTP server shuts down, it notifies libmilter by sending RT_S2M_CID.
Basic idea:
protocol version: major/minor, major must match, minor can be different.
Announce capabilities and requirements (both sides). libmilter: a milter can register a callback that decides whether the capabilities (of libmilter and MTA: libmilter can be dynamically linked, or some source code is linked against an old version) are sufficient and give its OK (or an error and hence abort).
The most obvious difference is that a policy milter cannot change the body of a mail nor can it add or replace headers. It also cannot delete or add recipients.
Another difference is that sendmail X does not send headers individually to the policy milter, it just sends the mail (in chunks). It is probably best to provide a library function that can parse headers which can be called by a milter if necessary. It would maintain a state (how far it has read in the buffer). A small problem may arise if the headers are larger than the first buffer, because then it might need to reassemble a header line (if it crosses a buffer boundary, which would be very likely if the headers actually exceed the buffer size).
IQDB is currently implemented as typed RSC, see Section 4.3.5.1. IQDB simply contains references to qss_sess_T, qss_ta_T, and qss_rcpts_T, see Section 4.7.3, it does not have its own data structures, it merely allows for a way to access the data via a key (SMTPS session, transaction, recipient identifier).
The API of IBDB is described in Section 3.13.4.3.
The current implementation requires more functions than described there. This is an outcome of the requirement to update the various queues safely and in a transaction-based manner. To achieve this, a list of change requests can be maintained; the elements of this list have the following structure:
struct ibdb_req_S {
    unsigned int   ibdb_req_type;
    int            ibdb_req_status;
    sessta_id_T    ibdb_req_ss_ta_id;  /* SMTPS transaction id */
    sm_str_P       ibdb_req_addr_pa;   /* MAIL/RCPT address */
    cdb_id_P       ibdb_req_cdb_id;
    unsigned int   ibdb_req_nrcpts;
    rcpt_idx_T     ibdb_req_rcpt_idx;  /* RCPT index */
    SIMPLEQ_ENTRY(ibdb_req_S)   ibdb_req_link;
};
The functions to deal with the request lists are:
sm_ret_T ibdb_rcpt_app(ibdb_ctx_P ibdb_ctx, ibdb_rcpt_P ibdb_rcpt, ibdb_req_hd_P ibdb_req_hd, int status) appends a recipient change request.
sm_ret_T ibdb_ta_app(ibdb_ctx_P ibdb_ctx, ibdb_ta_P ibdb_ta, ibdb_req_hd_P ibdb_req_hd, int status) appends a transaction change request.
sm_ret_T ibdb_req_cancel(ibdb_ctx_P ibdb_ctx, ibdb_req_hd_P ibdb_req_hd) cancels all the requests in the list.
sm_ret_T ibdb_wr_status(ibdb_ctx_P ibdb_ctx, ibdb_req_hd_P ibdb_req_hd) performs the status updates as specified by the request list.
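A sketch of the intended usage pattern follows; IBDB_REQ_DONE, sm_is_err(), the type name ibdb_req_hd_T, and the SIMPLEQ_INIT() initialization are assumptions here, not the actual API details:

sm_ret_T
example_ibdb_update(ibdb_ctx_P ibdb_ctx, ibdb_ta_P ibdb_ta, ibdb_rcpt_P ibdb_rcpt)
{
    ibdb_req_hd_T req_hd;
    sm_ret_T ret;

    SIMPLEQ_INIT(&req_hd);

    /* collect the change requests for this transaction... */
    ret = ibdb_rcpt_app(ibdb_ctx, ibdb_rcpt, &req_hd, IBDB_REQ_DONE);
    if (!sm_is_err(ret))
        ret = ibdb_ta_app(ibdb_ctx, ibdb_ta, &req_hd, IBDB_REQ_DONE);

    /* ... then commit them in one step, or cancel them on error */
    if (!sm_is_err(ret))
        ret = ibdb_wr_status(ibdb_ctx, &req_hd);
    else
        (void) ibdb_req_cancel(ibdb_ctx, &req_hd);
    return ret;
}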
The active queue currently uses the following structure as context:
struct aq_ctx_S {
    pthread_mutex_t   aq_mutex;    /* only one mutex for now */
    unsigned int      aq_limit;    /* maximum number of entries */
    unsigned int      aq_entries;  /* current number of entries */
    unsigned int      aq_t_da;     /* entries being delivered */
    aq_tas_T          aq_tas;
    aq_rcpts_T        aq_rcpts;
};
For now we just use lists of aq_ta/aq_dta/aq_rcpt structures. Of course we need better access methods later on. Currently we will use FIFO as the only scheduling strategy.
A recipient context consists of the following elements:
struct aq_rcpt_S {
    sessta_id_T     aqr_ss_ta_id;    /* ta id in SMTPS */
    sessta_id_T     aqr_da_ta_id;    /* ta id in DA */
    sm_str_P        aqr_rcpt_pa;     /* printable addr */
    smtp_status_T   aqr_status;      /* status */
    smtp_status_T   aqr_status_new;  /* new status */
    unsigned int    aqr_err_st;      /* state which caused error */
    unsigned int    aqr_flags;       /* flags */
    rcpt_idx_T      aqr_rcpt_idx;    /* rcpt idx */
    unsigned int    aqr_tries;       /* # of delivery attempts */
    unsigned int    aqr_da_idx;      /* DA idx (kind of DA) */
    int             aqr_qsc_id;      /* SMTPC id (actually selected DA) */

    /*
    **  HACK! Need list of addresses. Do we only need IP addresses
    **  or do we need more (MX records, TTLs)? We need at least some
    **  kind of ordering, i.e., the priority. This is needed for the
    **  scheduler (if a domain has several MX records with the same
    **  priority, we can deliver to any of those, there's no order
    **  between them). Moreover, if we store this data in DEFEDB,
    **  we also need TTLs.
    */

    /*
    **  Number of entries in address array.
    **  Should this be "int" instead and denote the maximum index,
    **  where -1 means: no entries?
    **  Currently the check for "is there another entry" is
    **  (aqr_addr_cur < aqr_addr_max - 1)
    **  i.e., valid entries are 0 to aqr_addr_max - 1.
    */

    unsigned int    aqr_addr_max;
    unsigned int    aqr_addr_cur;    /* cur idx in address array */
    aq_raddr_T     *aqr_addrs;       /* array of addresses */
    ipv4_T          aqr_addr_fail;   /* failed address (XXX Hack) */
    aq_raddr_T      aqr_addr_mf;     /* address storage to use if memory allocation failed */
    time_T          aqr_entered;     /* entered into AQ */
    time_T          aqr_st_time;     /* start time (rcvd) */
    time_T          aqr_last_try;    /* last time scheduled */
    time_T          aqr_next_try;    /* next time to try (after it has been stored in DEFEDB) */
    sm_str_P        aqr_msg;         /* error message if delivery failed */

    /*
    **  Bounce recipient: stores list of recipient indices for which
    **  this is a bounce message.
    **  Note: if this is stored in DEFEDB, then the array doesn't need
    **  to be saved provided that the recipients are removed in the
    **  same (DB) transaction because the bounce recipient contains
    **  all necessary data for the DSN. If, however, the recipients
    **  are not removed "simultaneously", then it is a bit harder to
    **  get consistency because it isn't obvious for which recipients
    **  this bounce has been created. That data is only indirectly
    **  available through aqr_bounce_idx (see below).
    */

    sm_str_P        aqr_dsn_msg;
    unsigned int    aqr_dsn_rcpts;
    unsigned int    aqr_dsn_rcpts_max;
    rcpt_idx_T     *aqr_dsns;        /* array of rcpt indices */

    /*
    **  rcpt idx for bounce: stores the rcpt_idx (> 0) if a bounce
    **  for this recipient is generated and being delivered.
    **  This is used as "semaphore" to avoid multiple bounces for
    **  the same recipient (needs to be stored in DEFEDB).
    */

    rcpt_idx_T      aqr_bounce_idx;

    /* linked list for AQ, currently this is the way to access all rcpts */
    TAILQ_ENTRY(aq_rcpt_S)   aqr_db_link;   /* links */

    /*
    **  Linked lists for:
    **  - SMTPS transaction:
    **    to find all recipients for the original transaction
    **    (to find out whether they can be delivered in the same
    **    transaction, i.e., same DA, + MX piggybacking)
    **  - DA transaction:
    **    to find the recipients that belong to one delivery attempt
    **    and update their status
    **  Link to ta:
    **    to update the recipient counter(s).
    */

    sm_ring_T       aqr_ss_link;
    sm_ring_T       aqr_da_link;
    aq_ta_P         aqr_ss_ta;       /* transaction */
};
This structure contains linked lists for:
A reference to the transaction aqr_ss_ta is used to make it easier to update the recipient counter(s).
A transaction context has these elements:
struct aq_ta_S {
    /* XXX other times? */
    time_T         aqt_st_time;      /* start time (received) */
    aq_mail_P      aqt_mail;         /* mail from */

    /* XXX only in aq_da_ta */
    unsigned int   aqt_rcpts;        /* number of recipients in AQ */
    unsigned int   aqt_rcpts_ar;     /* rcpts to receive from AR */
    unsigned int   aqt_rcpts_arf;    /* # of entries with SMAR failure */

    /* Number of recipients in DEFEDB */
    unsigned int   aqt_rcpts_tot;    /* total number of recipients */
    unsigned int   aqt_rcpts_left;   /* rcpts still to deliver */
    unsigned int   aqt_rcpts_temp;   /* rcpts temp failed */
    unsigned int   aqt_rcpts_perm;   /* rcpts perm failed */
    unsigned int   aqt_rcpts_tried;  /* rcpts already tried */

    rcpt_idx_T     aqt_nxt_idx;      /* next recipient index */
    unsigned int   aqt_state;
    unsigned int   aqt_flags;

    /*
    **  rcpt idx for (double) bounce; when a bounce is needed a recipient
    **  struct is created, its rcpt_idx is this bounce_idx.
    **  It should be aqt_rcpts_tot (+1) when it is created; afterwards
    **  aqt_rcpts_tot is increased of course.
    */

    rcpt_idx_T     aqt_bounce_idx;
    rcpt_idx_T     aqt_dbl_bounce_idx;

    sessta_id_T    aqt_ss_ta_id;     /* ta id in SMTPS */
    cdb_id_P       aqt_cdb_id;
    TAILQ_ENTRY(aq_ta_S)   aqt_ta_l; /* links */
    /* XXX add list of recipients? that makes lookups easier... see above */
};
The field aqt_ta_l links all transactions in the active queue together.
As can be seen, there are many counters in the transaction context:
The number of recipients that encountered a temporary failure aqt_rcpts_temp does not reflect those recipients that are in DEFEDB. When a recipient is written to DEFEDB, the status flag indicating a temporary failure will not be saved. Hence when a recipient is tried again, the previous temporary failure is mostly ignored except for bookkeeping.
The current implementation (2003-01-01) of the main queue uses Berkeley DB 4.1 (proposal 2 from Section 3.13.6.1).
The main context looks like this:
typedef SIMPLEQ_HEAD(, edb_req_S)   edb_req_hd_T, *edb_req_hd_P;

struct edb_ctx_S {
    pthread_mutex_t   edb_mutex;      /* only one mutex for now */
    unsigned int      edb_entries;    /* current number of entries */
    edb_req_hd_T      edb_reql_wr;    /* request list (wr) */
    edb_req_hd_T      edb_reql_pool;
    DB_ENV           *edb_bdbenv;
    DB               *edb_bdb;        /* Berkeley DB */
};
A change request has the following structure:
struct edb_req_S {
    unsigned int   edb_req_type;
    smtp_id_T      edb_req_id;
    sm_rcb_P       edb_rcb;
    SIMPLEQ_ENTRY(edb_req_S)   edb_req_link;
};
The context maintains two lists of requests: a pool of free request list entries (edb_reql_pool) for reuse, and a list of change requests (edb_reql_wr) that are committed to disk when edb_wr_status(edb_ctx_P edb_ctx) is called.
A request itself contains a type (TA or RCPT), an identifier (which is used as key), and an RCB that stores the appropriate context in encoded form as defined by the RCB format. The type can actually be more than just transaction or recipient; it can also denote that the entry matching the identifier should be removed from the DB.
The data structures for transactions (mail sender) and recipients are shared with the active queue, see Section 4.10.3.
See Section 3.13.6 for the DEFEDB API, here's the current implementation:
Two functions are available to open and close a DEFEDB:
The functions which take a parameter edb_req_hd_P edb_req_hd will use that argument as the head of the request list unless it is NULL, in which case the write request list edb_reql_wr of the context will be used.
To retrieve an entry from the DEFEDB one function is provided:
To decode the RCB retrieved via edb_rd_req() and fill out an active queue context of the correct type the following two functions are available:
To read through a DEFEDB these four functions are provided:
To remove a transaction or a recipient from the DB (directly) use:
To add a request to remove a transaction or a recipient from the DB use:
and commit that request later on with edb_wr_status().
Internal functions to manage a request entry or list are:
This section describes the implementation of various programs.
How to efficiently perform IBDB cleanup?
Try to minimize the amount of data to clean up. This can be done by performing rollovers at an appropriate moment, i.e., when the number of outstanding transactions and recipients is zero. This is probably only possible for low-volume sites. If those two values are zero, then all preceding files can be removed.
Read an IBDB file and create a new one that has only the open transactions/recipients in there? Leave ``holes'' in the sequence, e.g., use 0x1-0xf and leave 0 free for ``cleaning'', i.e., read 0x1-0xf and then write all the open transactions into 0. Problem: what to do with repeated passes?
How about different names (extensions) instead?
It might be possible to ignore logfiles that are older than the transactional timeout. Those logfiles can only contain data about transactions that have either been completed or have timed out. Neither of these is of interest for the reconstruction of the queue. Does this mean a very simple cleanup process is possible which simply removes old logfiles? This minimizes the amount of effort during runtime at the expense of disk space and startup time after an unclean shutdown. For the first sendmail X version this might be a ``good enough'' solution.
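A minimal sketch of that ``good enough'' cleanup; ibdb_file_cleanup() and the timeout value are assumptions, not part of the actual implementation:

#include <sys/stat.h>
#include <time.h>
#include <unistd.h>

#define IBDB_TA_TIMEOUT (4 * 60 * 60)   /* example value: 4 hours */

int
ibdb_file_cleanup(const char *path)
{
    struct stat st;

    if (stat(path, &st) != 0)
        return -1;
    if (time(NULL) - st.st_mtime > IBDB_TA_TIMEOUT)
        return unlink(path);   /* can only contain finished or timed-out ta */
    return 0;                  /* still too new, keep it */
}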
Section 3.11.1 posed the question: how accurate must the output be? Here is a closer look at the problem: using Berkeley DB for the implementation of the deferred queue poses some problems because there is no ``read-only'' access to the content, at least not in the current implementation: the function to open a DB environment offers no read-only option. Obviously it is a bad idea to open the environment read/write just to read the queue: the program should not make any modifications to the DB; sendmail X is designed such that there is a single writer for it only, all locking is done inside QMGR, nothing relies on locking within Berkeley DB (this is done to avoid the DB corruption problems that may occur otherwise, especially if some other program crashes while accessing the DB). However, there is a trick that gives us read-only access: open the DB directly without opening the environment. In this case we can make the DB group readable, which should be used to avoid any type of corruption. Even though this looks like a nice solution to the main problem, it opens up another one: the DB may not contain up-to-date information. Berkeley DB keeps a log file (if used in transaction mode) and hence the DB itself might have old (and even inconsistent) data.
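A sketch of that trick with the Berkeley DB 4.1 API follows; the file name and the use of DB_UNKNOWN are placeholders, and no DB_ENV is opened so no locking or logging subsystem is touched:

#include <db.h>

int
defedb_open_rdonly(DB **pdb)
{
    DB *dbp;
    int ret;

    ret = db_create(&dbp, NULL, 0);
    if (ret != 0)
        return ret;
    /* BDB 4.1: DB->open(db, txnid, file, database, type, flags, mode) */
    ret = dbp->open(dbp, NULL, "defedb.db", NULL, DB_UNKNOWN, DB_RDONLY, 0);
    if (ret != 0)
    {
        (void) dbp->close(dbp, 0);
        return ret;
    }
    *pdb = dbp;
    return 0;
}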
An alternative approach is to query QMGR about the content of the deferred queue. This allows us to keep our single writer (and reader), but it causes other problems, e.g., locking the DB for quite some time and hence causing performance problems.
Note: a fairly accurate state of the mail queue content can be gathered from CDB and the logfile: the CDB file names are the SMTP server transaction ids which can be looked up in the logfile to find the sender and recipients. To find out which recipients have not yet been delivered, the logfile must be scanned for that data. This can be time consuming but it is less intrusive.
As requested in Section 1.1.3.5.1 there are many test programs which can as usual be invoked by make check.
Some of them are included in the directories for which they perform tests, e.g., smar and libdns, but most of them are in the directories checks, check2, and chkmts. There are several ``check'' directories because of some length restrictions on AIX (2048 bytes line length for some utilities used by the autoconf tools).
The test programs range from very simple ones that test a few library functions (``unit tests''), e.g., string handling, to those which test (aspects of) the entire MTS.
This section describes some of the problems that have been encountered in the implementation of sendmail X. There are two different kinds of problems:
Implementing sendmail X as a few modules that are required to run under (almost) all circumstances turned out to be significantly more complicated than expected.
The most complicated problem is dealing with out-of-memory errors. Note: proper recovery from this is still an open problem. This is made even worse by the fact that some OSs allow memory to be ``overcommitted''. As soon as overcommitted memory is actually used and the demand cannot be satisfied, the OS ``randomly'' kills a process to allow the entire system to make progress. Even though the result of every memory allocation call in sendmail X is carefully checked and error handlers are invoked if the call returns NULL, these efforts may be useless on such OSs.
Dealing with memory shortage is also severely hampered by the non-availability of a (standard) function to actually determine the current memory usage of a process. The function getrusage(2) returns a structure that has some fields which can contain information about the memory usage, but some OSs do not put valid data into those fields, and there is only one field (ru_maxrss) that is related to memory usage4.7. The memory allocation functions could try to keep track of the amount of used memory themselves, but that either requires that the free() function receives an additional argument which denotes the size of the memory segment, or the memory allocation module must keep track of that information itself, which costs memory and programming effort. Moreover, due to internal fragmentation the value might not be accurate anyway.
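The following illustrates the latter approach (the allocation module keeping track itself) by storing the requested size in a small header in front of each block. This is only a sketch, not the sendmail X memory allocation module; a real implementation would have to protect the counter in a multi-threaded program and pad the header to the strictest alignment:

    #include <stdlib.h>

    static size_t mem_in_use = 0;   /* total bytes currently allocated */

    void *
    sm_malloc_tracked(size_t size)
    {
        size_t *p;

        /* store the requested size in front of the returned block */
        p = malloc(size + sizeof(size_t));
        if (p == NULL)
            return NULL;            /* caller invokes its error handler */
        *p = size;
        mem_in_use += size;         /* not thread-safe without a mutex */
        return p + 1;
    }

    void
    sm_free_tracked(void *ptr)
    {
        size_t *p;

        if (ptr == NULL)
            return;
        p = (size_t *) ptr - 1;
        mem_in_use -= *p;
        free(p);
    }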
Another problem is transaction-based processing, see also Section 4.1.3. It turns out to be non-trivial to implement this: a function must either successfully complete all of its operations or none of them, especially if multiple functions are involved in a sequence. This means that those functions need some kind of ``undo'' counterparts which can be invoked in case of errors.
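A common way to express this in C is the pattern sketched below, where each successfully completed step is undone in reverse order if a later step fails; the step_*() and undo_*() functions are placeholders, not actual sendmail X functions:

    /* placeholders for the individual operations and their undo counterparts */
    extern int step_one(void), step_two(void), step_three(void);
    extern void undo_one(void), undo_two(void);

    static int
    do_transaction(void)
    {
        int ret;

        if ((ret = step_one()) != 0)
            goto fail0;
        if ((ret = step_two()) != 0)
            goto fail1;
        if ((ret = step_three()) != 0)
            goto fail2;
        return 0;           /* all operations completed */

      fail2:
        undo_two();         /* roll back in reverse order */
      fail1:
        undo_one();
      fail0:
        return ret;         /* no (visible) changes have been made */
    }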
The communication between the various modules of sendmail X turns out to be a problem due to the latency involved. The most significant example of this is the interaction of the scheduler and the delivery agents. The scheduler needs to keep track of the number of open (outgoing) connections to implement the slow start algorithm and to obey limits set by the user. However, some tests show that the actual number of open connections to a client and the data that the scheduler has can differ significantly. For example, in some tests the scheduler reached an upper limit of 256 open connections while the sink program listed only 2 open connections and SMTPC itself had some (low) number of busy threads, e.g., 4 to 16. There are two different problems:
One reason for these disagreements is that the messages are very small, hence it takes a very short time to deliver them (in a lab environment). This can be demonstrated by using larger message sizes (the default is less than 1KB): for a message size mix of 32KB and 64KB the number of concurrent connections reaches the limit in the sink.
About item 2: Here is what seems to happen: as long as the concurrency limit (in qmgr) is not reached, the scheduler sends tasks to SMTPC; it stops only when the limit is reached or no more entries are in AQ (for scheduling). The scheduler holds a lock on AQ, hence no updates from SMTPC can be applied. Only when the scheduler releases the lock can results from SMTPC be read. The scheduler is activated as soon as a result comes back from SMTPC; it then immediately schedules more entries (just one, because the limit is reached) and the ping-pong (one result, one entry scheduled) continues.