AMQP Publisher/Subscriber

The publisher/subscriber subsystem in Orb can be configured to use either an AMQP message broker or an in-memory implementation. (The in-memory implementation should only be used during development and for demos.)

The AMQP URL is specified by the startup parameter mq-url. If this parameter is not set then the in-memory implementation is used.

Publisher and Subscriber

A subscriber is a handler for a message posted to a queue. On startup, each Orb instance subscribes to various queues. The queues are global to the domain, i.e. each Orb instance subscribes to the same queues.

A publisher posts messages to the queues. The posted message is handled by one (and only one) of the subscribers in one of the Orb instances. If the handler replies with an ack, then the message is considered to be processed.

It is up to the AMQP implementation to direct messages to subscribers using a load-balancing algorithm.

../../_images/mq-pubsub.svg
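The delivery model described above (each message handled by exactly one subscriber, which replies with an ack) can be illustrated with a minimal in-memory sketch. This is not Orb's implementation: the class ToyQueue, the handler names, and the round-robin choice of subscriber are all assumptions made for illustration, since the actual load-balancing algorithm is left to the AMQP implementation.

```python
class ToyQueue:
    """Hypothetical in-memory queue: each message goes to exactly one subscriber."""

    def __init__(self):
        self.handlers = []
        self.published = 0

    def subscribe(self, handler):
        self.handlers.append(handler)

    def publish(self, message):
        # Round-robin is an assumption; a real broker chooses its own strategy.
        handler = self.handlers[self.published % len(self.handlers)]
        self.published += 1
        return handler(message)  # True means ack, False means nack


handled = []


def make_handler(instance):
    """Build a handler that records which instance processed which message."""
    def handle(message):
        handled.append((instance, message))
        return True  # ack: the message is considered processed
    return handle


queue = ToyQueue()
for name in ("orb-1", "orb-2"):  # two instances subscribe to the same queue
    queue.subscribe(make_handler(name))

acks = [queue.publish(m) for m in ("m1", "m2", "m3")]
```

Each message appears once in `handled`, regardless of how many instances are subscribed.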

Message Redelivery

When a message is published to a queue, one of the subscribers in the Orb domain handles the message. If a processing error occurs (for example, the DB is temporarily unavailable) then the handler replies with a nack. In this case the message is sent to the orb.redelivery queue so that it may be retried at a later time.

The orb.redelivery queue is configured as the dead-letter-queue for all queues in Orb. When a message is rejected (nacked) by a subscriber, it is automatically sent to the orb.redelivery queue. The first time a message is rejected, the redelivery handler immediately redelivers the message to the original destination queue. If the message is rejected again, an expiration header value is set on the message, and the message is posted to the orb.wait queue. The expiration value is calculated by a backoff algorithm using the following parameters:

  1. mq-redelivery-initial-interval

  2. mq-redelivery-multiplier

  3. mq-redelivery-max-interval

The backoff algorithm increases the expiration with each redelivery attempt. For example, if the initial interval is set to 2s and the multiplier is set to 1.5 then the expiration is set to 3s. The next time a redelivery of the message occurs, the expiration will be set to 4.5s. The expiration time is capped by the parameter mq-redelivery-max-interval.
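The arithmetic in the example above is consistent with an exponential backoff of the form initial interval × multiplier^attempt, capped at the maximum interval. The sketch below assumes that formula and assumes that attempts are counted from 1; both are inferred from the worked example rather than taken from Orb's code, and the function name is hypothetical.

```python
def redelivery_expiration(attempt, initial_interval, multiplier, max_interval):
    """Return the expiration in seconds for a backoff attempt (counted from 1).

    Assumed formula: initial_interval * multiplier ** attempt, capped at
    max_interval. The three arguments stand in for the parameters
    mq-redelivery-initial-interval, mq-redelivery-multiplier and
    mq-redelivery-max-interval.
    """
    return min(initial_interval * multiplier ** attempt, max_interval)


# Reproduces the example: initial interval 2s, multiplier 1.5.
first = redelivery_expiration(1, 2.0, 1.5, 30.0)   # 3.0
second = redelivery_expiration(2, 2.0, 1.5, 30.0)  # 4.5
capped = redelivery_expiration(3, 2.0, 1.5, 5.0)   # 6.75 is capped to 5.0
```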

The orb.wait queue has no subscribers, so the message sits there until it expires. The orb.redelivery queue is also configured as the dead-letter-queue for the orb.wait queue, so when the message expires it is automatically sent back to the orb.redelivery queue and the redelivery handler processes the message again.

The redelivery handler looks at the reason field in the message header. If reason is set to ‘expired’ then the message is posted to the original destination queue, otherwise (if reason is ‘rejected’) it is posted to the orb.wait queue with an even greater expiration value. This process repeats until the maximum number of redelivery attempts has been reached, at which point redelivery for the message is aborted.

../../_images/mq-pubsub-redeliver.svg
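The routing decision made by the redelivery handler can be sketched as a small function. This is an illustration under stated assumptions, not Orb's code: the function name, the `redeliveries` counter (how Orb tracks the attempt count is not described here), and the order in which the checks are made are all hypothetical.

```python
def route_redelivery(reason, redeliveries, max_attempts):
    """Decide where the redelivery handler sends a message.

    reason       -- 'rejected' or 'expired', as found in the message header
    redeliveries -- how many times this message was already redelivered
                    (hypothetical counter)
    max_attempts -- maximum number of redelivery attempts
    """
    if redeliveries >= max_attempts:
        return "abort"                # give up on this message
    if reason == "expired":
        return "original"             # wait time is over: retry the handler
    if reason == "rejected":
        # First rejection: redeliver immediately. Later rejections: back off.
        return "original" if redeliveries == 0 else "wait"
    raise ValueError(f"unexpected reason: {reason!r}")


decisions = [
    route_redelivery("rejected", 0, 5),  # first nack
    route_redelivery("rejected", 1, 5),  # nacked again
    route_redelivery("expired", 1, 5),   # expired in orb.wait
    route_redelivery("rejected", 5, 5),  # attempts exhausted
]
```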

Publisher Pool

The publisher publishes messages over an AMQP channel to an AMQP server. A single publisher channel publishes requests synchronously, so (for performance reasons) a pool of channels is used to allow requests to be published concurrently. The channels in a pool are opened at startup over a single connection and are reused for the lifetime of the server, since there is a performance penalty for creating and closing channels. The size of the channel pool is specified by the parameter mq-publisher-channel-pool-size. If the value of this parameter is zero, a publisher pool is not used and a channel is opened and closed for each publish request.

An AMQP server may have a limit to the number of channels that can be opened for a single connection. This limit is specified by parameter mq-max-connection-channels. If the value of mq-publisher-channel-pool-size is greater than the value of mq-max-connection-channels then multiple publisher pools are created (each with its own dedicated connection) and the requests are balanced across the pools.
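One way to picture how a large channel pool could be spread across connections is shown below. How Orb actually divides the channels among pools is not stated above, so the "fill each connection up to the limit" strategy and the function name are assumptions.

```python
def plan_publisher_pools(pool_size, max_connection_channels):
    """Return the number of channels in each publisher pool.

    Each list entry represents one pool with its own dedicated connection.
    An empty list means no pool is used (a channel per publish request).
    """
    if max_connection_channels <= 0:
        raise ValueError("max_connection_channels must be positive")
    pools = []
    remaining = pool_size
    while remaining > 0:
        # Assumption: fill each connection up to its channel limit.
        size = min(remaining, max_connection_channels)
        pools.append(size)
        remaining -= size
    return pools


single = plan_publisher_pools(3, 4)    # fits on one connection
multiple = plan_publisher_pools(10, 4)  # needs three connections
no_pool = plan_publisher_pools(0, 4)    # pooling disabled
```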

Subscriber Pool

Each AMQP subscription is handled synchronously. If the handler takes a long time, subsequent messages in the queue must wait until the previous message has been processed. A subscriber pool may be configured for a given queue so that multiple subscribers concurrently process messages from the same queue. This setting is available through the following parameters:

  1. op-queue-pool

  2. mq-observer-pool
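The effect of a subscriber pool can be sketched with standard-library threads: several workers drain the same queue, so a slow handler only blocks its own worker. This is a simplified stand-in for AMQP subscribers, and the function name and structure are hypothetical.

```python
import queue
import threading


def run_subscriber_pool(messages, handler, pool_size):
    """Process all messages with pool_size concurrent workers; return results."""
    pending = queue.Queue()
    for message in messages:
        pending.put(message)

    results = []
    lock = threading.Lock()

    def worker():
        while True:
            try:
                message = pending.get_nowait()
            except queue.Empty:
                return  # nothing left to process
            result = handler(message)
            with lock:  # protect the shared results list
                results.append(result)

    threads = [threading.Thread(target=worker) for _ in range(pool_size)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results


# Completion order may vary between runs, so sort before comparing.
doubled = sorted(run_subscriber_pool(range(10), lambda m: m * 2, pool_size=3))
```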

Typically, all subscriber channels are created on the same AMQP connection. However, an AMQP server may limit the number of channels that can be opened on a single connection; this limit is specified by the parameter mq-max-connection-channels. If the size of the subscriber pool reaches this limit, a new connection is automatically opened for any new subscriber channel.

Queues

When an Orb instance starts up, it creates a number of queues (if they aren’t already created) and subscribes to receive messages from them. The queues are described below:

orb.activity.outbox

ActivityPub activities posted to the outbox are published to the orb.activity.outbox queue which is consumed by the outbox handler.

orb.activity.inbox

ActivityPub activities posted to the inbox are published to the orb.activity.inbox queue which is consumed by the inbox handler.

orb.operation

Sidetree DID operations posted to the operations endpoint are published to the orb.operation queue which is consumed by the operation queue handler.

orb.anchor_linkset

The witness proof handler publishes anchor linksets to the orb.anchor_linkset queue which is consumed by the anchor linkset handler.

orb.anchor

The anchor linkset handler publishes messages to the orb.anchor queue which is consumed by the observer.

orb.did

DID messages are published to the orb.did queue which is consumed by the observer.

orb.redelivery

A message that has been NACK’ed is published to the orb.redelivery queue so that it may be redelivered.

orb.wait

A message that has been NACK’ed and has a backoff time is published to the orb.wait queue. The message sits in this queue for the duration of the specified backoff time, then it is automatically sent to the orb.redelivery queue for redelivery.