The mq plugin for inbound message queueing and message replay

updated Thu Mar 21 2024 by Robert van Engelen
 
The inbound message queueing plugin queues messages that must not be discarded under the WS-RM protocol's NoDiscard behavior. Messages that arrive out of sequence according to the WS-RM protocol, and that are to be handled by a single thread or a thread pool, are queued for later replay and service operation invocation. If an unlimited number of threads is available, the simplest way to implement WS-RM NoDiscard behavior is to start a thread for each inbound message and let the thread block in soap_wsrm_check_and_wait() or soap_wsrm_check_send_empty_response_and_wait(). However, that approach is inefficient with HTTP keep-alive, because a blocked operation prevents the next messages on the same keep-alive socket from being processed. This plugin is designed to keep processing messages on an HTTP keep-alive socket even when operations block.

Server-Side Queueing of One-Way Messages

To queue one-way messages for internal replay, use the message queueing plugin to queue the inbound messages received on a single socket and then replay them all in the order in which they were received from the socket:

#include "mq.h"
int main()
{
  struct soap *soap = soap_new1(SOAP_IO_KEEPALIVE);
  soap_register_plugin(soap, soap_mq);
  ...
  // initializations, port bind etc.
  ...
  while (soap_valid_socket(soap_accept(soap)))
  {
    // queue all messages on this socket (socket is HTTP keep alive)
    // for each message received, we immediately respond with HTTP 202 Accepted
    struct ms_queue *queue = soap_mq_queue(soap);
    struct ms_msg *msg;
    while (soap_mq_get(soap, queue))
      soap_send_empty_response(soap, 202); // 202 Accepted
    // we now internally replay all messages to invoke services
    // services are assumed to NOT send a response message back
    // i.e. one-way operations
    for (msg = soap_mq_begin(queue); msg; msg = soap_mq_next(msg))
      soap_serve(&msg->soap);
    // delete all queued messages, this also calls the following functions on each queued msg context:
    //   soap_destroy(&msg->soap);
    //   soap_end(&msg->soap);
    //   soap_done(&msg->soap);
    soap_mq_del(queue, NULL);
    // delete the queue (allocated in current context)
    soap_destroy(soap);
    soap_end(soap);
  }
  ...
  // finalize
  ...
  soap_free(soap);
}

Alternatively, call soap_mq_del(queue, msg) right after soap_serve(&msg->soap) to delete each message as soon as it has been processed. Calling soap_mq_next(msg) for the next loop iteration remains valid after the deletion.
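
The serve-then-delete loop can be pictured with a small self-contained model. This is only a sketch and not the plugin's implementation: all toy_ names are hypothetical, and lazy deletion (marking a node as deleted and releasing its memory only when the whole queue is freed) is an assumed technique chosen here because it is one simple way to keep a "next" call valid on a message that was just deleted.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for a queued message (not a gSOAP type). */
struct toy_msg {
  int id;
  int deleted;              /* set by toy_del(); the node itself is kept */
  struct toy_msg *next;
};

/* Hypothetical stand-in for a message queue (not a gSOAP type). */
struct toy_queue {
  struct toy_msg *head, *tail;  /* all nodes ever added, in arrival order */
};

/* Append a message; returns 0 on success, -1 if out of memory. */
int toy_put(struct toy_queue *q, int id)
{
  struct toy_msg *m = (struct toy_msg*)malloc(sizeof(*m));
  if (!m)
    return -1;
  m->id = id;
  m->deleted = 0;
  m->next = NULL;
  if (q->tail)
    q->tail->next = m;
  else
    q->head = m;
  q->tail = m;
  return 0;
}

/* Skip over nodes that were marked as deleted. */
static struct toy_msg *toy_skip(struct toy_msg *m)
{
  while (m && m->deleted)
    m = m->next;
  return m;
}

struct toy_msg *toy_begin(struct toy_queue *q)
{
  return toy_skip(q->head);
}

/* Safe to call on a deleted message: its node and link still exist. */
struct toy_msg *toy_next(struct toy_msg *m)
{
  assert(m != NULL);
  return toy_skip(m->next);
}

/* Lazy deletion: mark only, memory is released by toy_free(). */
void toy_del(struct toy_queue *q, struct toy_msg *m)
{
  (void)q;
  assert(m != NULL);
  m->deleted = 1;
}

/* Release every node, deleted or not. */
void toy_free(struct toy_queue *q)
{
  struct toy_msg *m = q->head;
  while (m)
  {
    struct toy_msg *n = m->next;
    free(m);
    m = n;
  }
  q->head = q->tail = NULL;
}

/* Serve each message and delete it right away; returns the number served. */
int toy_serve_all(struct toy_queue *q)
{
  int served = 0;
  struct toy_msg *m;
  for (m = toy_begin(q); m; m = toy_next(m))
  {
    served++;        /* stands in for serving the message */
    toy_del(q, m);   /* delete immediately after processing */
  }
  return served;
}
```

In this model, calling toy_serve_all() on a queue filled with toy_put() visits every message exactly once, after which toy_begin() returns NULL and toy_free() releases the nodes.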

WS-RM Server-Side Message Queueing for NoDiscard Behavior with Callback Services

When messages are controlled by the WS-ReliableMessaging protocol, WS-RM messages that were received out of order can be kept in a queue until the order is restored and the queued messages can be dispatched. This behavior is desirable with WS-RM NoDiscard. To implement this approach, we use one inbound message queue for each accepted socket, and each socket is processed by its own thread.

#include "wsaapi.h"
#include "wsrmapi.h"
#include "mq.h"
#include "threads.h"
void *process_request(void *tsoap); // forward declaration, defined below
int main()
{
  struct soap *soap = soap_new1(SOAP_IO_KEEPALIVE);
  soap_register_plugin(soap, soap_wsa);
  soap_register_plugin(soap, soap_wsrm);
  soap_register_plugin(soap, soap_mq);
  ...
  // initializations, port bind etc.
  ...
  while (soap_valid_socket(soap_accept(soap)))
  {
    THREAD_TYPE tid;
    struct soap *tsoap = soap_copy(soap);
    if (!tsoap)
      soap_closesock(soap);
    else
      while (THREAD_CREATE(&tid, (void*(*)(void*))process_request, (void*)tsoap))
        sleep(1); // thread creation failed, wait and retry
  }
  ...
  // finalize
  ...
  soap_free(soap);
}
void *process_request(void *tsoap)
{
  struct soap *soap = (struct soap*)tsoap;
  struct ms_queue *queue = soap_mq_queue(soap);
  struct ms_msg *msg;
  struct soap ctx;
  THREAD_DETACH(THREAD_ID); // the thread is never joined, so detach it
  while ((msg = soap_mq_get(soap, queue)) != NULL)
  {
    // parse the message headers, if NoDiscard then keep message in queue to retry later
    // copy the context, since we want to preserve the original to retry later
    soap_copy_context(&ctx, &msg->soap);
    if (soap_begin_serve(&ctx))
    {
      soap_send_fault(&ctx); // send fault, close socket
      soap_mq_del(queue, msg); // delete message from queue
    }
    else if (!ctx.header || !ctx.header->wsrm__Sequence)
    {
      // this is not a WS-RM message, so serve immediately
      soap_serve(&msg->soap); // service operations
      soap_mq_del(queue, msg); // delete message from queue
    }
    else if (!soap_wsrm_check(&ctx))
    {
      // check is OK, process this WS-RM message now
      soap_serve(&msg->soap); // service operations SHOULD NOT call soap_wsrm_check()
      soap_mq_del(queue, msg); // delete message from queue
    }
    else if (ctx.error != SOAP_STOP)
    {
      // check failed with an error other than SOAP_STOP
      soap_send_fault(&ctx); // send fault, close socket
      soap_mq_del(queue, msg); // delete message from queue
    }
    // otherwise (SOAP_STOP) the message stays in the queue to retry later
    soap_destroy(&ctx);
    soap_end(&ctx);
    soap_done(&ctx);
  }
  // as long as the queue is not empty and WS-RM sequence(s) not terminated, keep trying
  while ((msg = soap_mq_begin(queue)) != NULL)
  {
    // process queued WS-RM messages
    for (; msg != NULL; msg = soap_mq_next(msg))
    {
      // try next message in queue
      soap_copy_context(&ctx, &msg->soap);
      if (!soap_begin_serve(&ctx) && !soap_wsrm_check(&ctx))
      {
        // check is OK, process message
        soap_serve(&msg->soap);
        soap_mq_del(queue, msg);
      }
      else if (ctx.error != SOAP_STOP)
        soap_mq_del(queue, msg); // error other than SOAP_STOP, drop the message
      soap_destroy(&ctx);
      soap_end(&ctx);
      soap_done(&ctx);
    }
    sleep(1); // wait briefly before the next pass over the queue
  }
  // release the thread's context copy, which also deletes the queue allocated in it
  soap_destroy(soap);
  soap_end(soap);
  soap_free(soap);
  return NULL;
}

The first loop runs over the messages received on the same keep-alive socket. Non-WS-RM messages, and WS-RM messages for which the WS-RM check succeeds, are processed immediately and their service operations are dispatched. Because this check is done in the server dispatch loop as shown, WS-RM-based service operations SHOULD NOT call soap_wsrm_check() again. WS-RM messages that cannot be processed yet because they are out of sequence order remain in the queue.

The second loop runs over the queued messages and retries dispatching their service operations in WS-RM message order, as required by WS-RM NoDiscard sequence behavior. The loop runs until the queue is empty or the WS-RM sequences are closed or terminated.
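
The interplay of the two loops can be illustrated with a small self-contained simulation that needs no gSOAP headers. It is a sketch under simplifying assumptions: the sim_ names are hypothetical, a message is reduced to its WS-RM message number, and the order check is modeled as "the number equals the next expected number", which stands in for the real WS-RM check rather than reproducing it.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a received WS-RM message. */
struct sim_msg {
  unsigned num;   /* message number within the sequence, starting at 1 */
  int queued;     /* 1 while the message waits in the queue */
};

/* Simplified order check: deliverable only if it is the next expected one. */
static int sim_check(const struct sim_msg *m, unsigned expected)
{
  return m->num == expected;
}

/* Process n messages given in arrival order. The delivery order is written
   to out (room for n entries). Returns the number of retry passes made by
   the second loop. */
size_t sim_replay(struct sim_msg *msgs, size_t n, unsigned *out)
{
  unsigned expected = 1;
  size_t delivered = 0, passes = 0, i;
  assert(msgs != NULL && out != NULL);
  /* first loop: handle each message as it arrives on the socket */
  for (i = 0; i < n; i++)
  {
    if (sim_check(&msgs[i], expected))
    {
      out[delivered++] = msgs[i].num;  /* in order: serve immediately */
      expected++;
    }
    else
      msgs[i].queued = 1;              /* out of order: keep for later */
  }
  /* second loop: retry the queued messages until none remain */
  while (delivered < n)
  {
    size_t before = delivered;
    for (i = 0; i < n; i++)
    {
      if (msgs[i].queued && sim_check(&msgs[i], expected))
      {
        msgs[i].queued = 0;            /* remove from the queue */
        out[delivered++] = msgs[i].num;
        expected++;
      }
    }
    passes++;
    if (delivered == before)
      break;  /* a gap was never filled; a real server would keep waiting
                 or drop the messages once the sequence is terminated */
  }
  return passes;
}
```

With the arrival order 3, 1, 4, 2, the first loop serves messages 1 and 2 and queues 3 and 4, and a single pass of the second loop then delivers 3 and 4, so the delivery order is 1, 2, 3, 4.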