Message Queue
Classes
- AWSReader
Options : { type : 'aws', topic : , queue : , region : , credentials : }
- Message
The Message class abstracts away the transport's acknowledge and no-acknowledge operations, and handles unpacking of routing information.
- MessageProcessor
Creates a message processor. It supplies auto-acknowledge logic and default routing to other queues.
options:
autoAck: ['onReceipt'|'afterProcess'|'none']
If 'onReceipt', Message.ack() will be called immediately on receipt of the message.
If 'afterProcess', Message.ack() will be called immediately after processing of the message.
If 'none', Message.ack() will not be called, and the processor must handle it.
ackOnError: [true|false]
Only used if autoAck === 'afterProcess'.
If true, Message.ack() will be called on process error;
otherwise, Message.noAck() will be called on process error.
- NsqReader
Options : { type : 'nsq', nsqd : [ ..nsqOptions... ] topic : channel : maxInFlight : processor : <function( msg, reader )> |
- PubSubReader
Options : { type : 'pubsub', topic : channel : projectId : , keyFilename : , subscriptionOptions : { } // see https://cloud.google.com/nodejs/docs/reference/pubsub/0.16.x/global#CreateSubscriptionRequest }
- Reader
The base class for a Reader. This will manage processing a message via the processor function or object. Normally options.processor will be an object that extends MessageProcessor. Subclasses should override onStart() to connect to their transport and onStop() to disconnect.
Functions
- routeFor(conditionRouteModel, type, object)
Given a condition route model and a type, the model's rules will be evaluated to find a particular route for that type.
- createWriter()
Options nsqd : [ ...nsqd.... ]
- routeTargets()
A Route has the form:
route := <topic_name> | <route_array> |
route_array := [ , , ...]
topic_name :=
topic = { topic : <topic_name>, options :
routeTargets() will return an array containing topic objects, or null if there are no routes.
- createRouteMsg()
Creates a RouteMsg if either the options or the route is not null. Otherwise the message is returned.
AWSReader
Options :
{
type : 'aws',
topic : ,
queue : ,
region : ,
credentials :
}
Message
The Message class abstracts away the transport's acknowledge and no-acknowledge operations, and handles unpacking of routing information.
Kind: global class
message.rawMsg()
Returns the raw native message that the transport delivered.
Kind: instance method of Message
message.json()
Returns a JSON representation of the data.
Kind: instance method of Message
message.raw()
Returns the raw message data, often the same as json()
Kind: instance method of Message
message.getType()
Returns the type of the Message being routed, or json().type if this is not a route message.
Kind: instance method of Message
message.getMessage()
Returns the Message being routed or json() if this is not a route message
Kind: instance method of Message
message.isRouteMessage()
Returns true if the message type is ROUTE_MSG_TYPE
Kind: instance method of Message
message.getRouteOptions()
If this is a Route Message, this will return any optional options associated with the message
Kind: instance method of Message
message.getNextRoutes()
Returns an array of the next routes to deliver the data on or null if none exist or this is not a route message
Kind: instance method of Message
message.getReceived()
The time the message was created.
Kind: instance method of Message
message.getTimestamp()
If possible, subclasses should override this.
Kind: instance method of Message
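The route-aware accessors above can be sketched as follows. This is a minimal illustration, not the library's implementation: the class name SketchMessage, the value given to ROUTE_MSG_TYPE, and the envelope fields msg, routes, and options are all assumptions made for the example.

```javascript
// Assumed value; the real ROUTE_MSG_TYPE constant is not given in this document.
const ROUTE_MSG_TYPE = 'route';

// Hypothetical stand-in for Message, wrapping an already-parsed payload.
class SketchMessage {
  constructor(data) {
    this.data = data;
  }

  json() {
    return this.data;
  }

  isRouteMessage() {
    return this.data.type === ROUTE_MSG_TYPE;
  }

  // The wrapped payload for a route message, otherwise the payload itself.
  getMessage() {
    return this.isRouteMessage() ? this.data.msg : this.data;
  }

  // Type of the routed payload, or json().type for a plain message.
  getType() {
    return this.getMessage().type;
  }

  // Options only exist on route messages (assumed field name: options).
  getRouteOptions() {
    return this.isRouteMessage() ? this.data.options || null : null;
  }

  // Next routes, or null when there are none or this is not a route message.
  getNextRoutes() {
    if (!this.isRouteMessage()) return null;
    const routes = this.data.routes;
    return Array.isArray(routes) && routes.length > 0 ? routes : null;
  }
}

const plain = new SketchMessage({ type: 'order', id: 1 });
const routed = new SketchMessage({
  type: ROUTE_MSG_TYPE,
  msg: { type: 'order', id: 2 },
  routes: ['billing'],
  options: { priority: 'high' },
});
```

Here plain behaves like an ordinary message, while routed exposes the wrapped payload through getMessage() and the remaining hops through getNextRoutes().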
MessageProcessor
Creates a message processor. It supplies auto-acknowledge logic
and default routing to other queues.
options:
autoAck: ['onReceipt'|'afterProcess'|'none']
If 'onReceipt', Message.ack() will be called immediately on receipt of the message
If 'afterProcess', Message.ack() will be called immediately after processing of the message
If 'none', Message.ack() will not be called, and the processor must handle it.
ackOnError: [true|false]
Only used if autoAck === 'afterProcess'
If true, Message.ack() will be called on process error
else, Message.noAck() will be called on process error
Kind: global class
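The autoAck and ackOnError rules above can be sketched as a small helper. This is only an illustration under stated assumptions: handleWithAutoAck and stubMessage are hypothetical names, the message is assumed to expose ack() and noAck(), and the real MessageProcessor may order or guard these calls differently.

```javascript
// Hypothetical helper mirroring the documented autoAck / ackOnError rules.
async function handleWithAutoAck(msg, processFn, options) {
  const { autoAck, ackOnError } = options;

  // 'onReceipt': acknowledge before any processing happens.
  if (autoAck === 'onReceipt') msg.ack();

  let result;
  try {
    result = await processFn(msg);
  } catch (err) {
    // ackOnError is only consulted when autoAck is 'afterProcess'.
    if (autoAck === 'afterProcess') {
      if (ackOnError) msg.ack();
      else msg.noAck();
    }
    throw err;
  }

  // 'afterProcess': acknowledge once processing succeeded.
  if (autoAck === 'afterProcess') msg.ack();

  // 'none': the processor is responsible for calling ack() itself.
  return { processed: msg, result };
}

// Hypothetical stub that records which acknowledge calls were made.
function stubMessage() {
  const calls = [];
  return {
    calls,
    ack() { calls.push('ack'); },
    noAck() { calls.push('noAck'); },
  };
}
```

With autoAck set to 'none', neither branch fires, so a processor using that mode has to acknowledge the message on its own.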
messageProcessor.onMessage(msg) ⇒ Promise
Invoked when the reader has a new message to process.
Kind: instance method of MessageProcessor
Returns: Promise - a Promise that will resolve with an object
{ processed : msg, result :
Param | Type | Description |
---|---|---|
msg | Message | a Message object |
messageProcessor.route()
This routes the message to the writers as specified by the Message object and forward. Currently, this is fire and forget. If forward is false, no routing occurs. If forward is null, message.getMessage() is routed to the route specified in the message. If forward is an object, that object is routed as specified in the message (in lieu of message.getMessage()).
Kind: instance method of MessageProcessor
messageProcessor.process(message, options) ⇒
This method should operate on the message and return a promise. It is invoked from onMessage(), which routes based on the returned value. The message is a protocol-specific Message object. Use Message.getMessage() to get the actual data to process.
Kind: instance method of MessageProcessor
Returns: a Promise that completes with either null, false, or a replacement message to
forward. If the promise resolves to false, all forward routing is stopped. If it resolves to null,
the current message is forwarded. If it resolves to an object, that
object is forwarded in place of the current message. To send messages to other routes,
call this.writer.route() or this.writer.publish().
Param | Type | Description |
---|---|---|
message | Message | the Message object. |
options | object | the route options, if any were present. This is message.getRouteOptions() |
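The three possible results of process() (false, null, or a replacement object) can be sketched as a small decision function. The name resolveForward and the shape of its return value are hypothetical, and treating undefined like null is an assumption of this sketch rather than documented behavior.

```javascript
// Hypothetical helper: decide what to forward from the value process() resolved with.
function resolveForward(currentData, processResult) {
  // false stops all forward routing.
  if (processResult === false) {
    return { forward: false };
  }
  // null forwards the current message unchanged.
  // Assumption: undefined is treated the same way as null.
  if (processResult === null || processResult === undefined) {
    return { forward: true, data: currentData };
  }
  // Any other value replaces the current message.
  return { forward: true, data: processResult };
}

const current = { type: 'order', id: 7 };
const stopped = resolveForward(current, false);
const unchanged = resolveForward(current, null);
const replaced = resolveForward(current, { type: 'invoice', id: 7 });
```

Keeping the decision in one place makes it easier to see that only an explicit false suppresses routing.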
NsqReader
Options :
{
type : 'nsq',
nsqd : [ ..nsqOptions... ]
topic :
channel :
maxInFlight :
processor : <function( msg, reader )> |
}
PubSubReader
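Options :
{
type : 'pubsub',
topic :
channel :
projectId : ,
keyFilename : ,
subscriptionOptions : { } // see https://cloud.google.com/nodejs/docs/reference/pubsub/0.16.x/global#CreateSubscriptionRequest
}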
Reader
The base class for a Reader. This will manage processing a message via the processor function or object. Normally options.processor will be an object that extends MessageProcessor. Subclasses should override onStart() to connect to their transport and onStop() to disconnect.
Kind: global class
- Reader
- .onMessage
- .onExit
- .start()
- .onStart() ⇒ Promise.<this>
- .stop()
- .onStop()
- .onProcessStarted()
- .onProcessComplete()
reader.onMessage
Routes the message to the processor
Kind: instance property of Reader
reader.onExit
This is used to manage a graceful exit. Do not invoke.
Kind: instance property of Reader
reader.start()
Call this to start the reader. Returns a Promise that resolves when the reader has been initialized.
Kind: instance method of Reader
reader.onStart() ⇒ Promise.<this>
Returns a Promise indicating when the reader has been initialized. Subclasses should override this method. This should not be called directly. Use start().
Kind: instance method of Reader
reader.stop()
Stops the reader from receiving and processing messages from its connection
Kind: instance method of Reader
reader.onStop()
Called to let subclasses shut down their connections. This should not be called directly. Use stop().
Kind: instance method of Reader
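The split between start()/stop() and the overridable onStart()/onStop() hooks can be sketched as below. SketchReader is a stand-in written for this example and is not the library's Reader; InMemoryReader and its connected flag are likewise hypothetical, standing in for a real transport connection.

```javascript
// Stand-in base class: callers use start()/stop(), subclasses override the hooks.
class SketchReader {
  constructor(options) {
    this.options = options;
  }

  // Public entry point; delegates to the subclass hook and resolves with this.
  async start() {
    await this.onStart();
    return this;
  }

  // Public entry point; delegates to the subclass hook.
  async stop() {
    await this.onStop();
  }

  // Hooks meant to be overridden, not called directly.
  async onStart() {
    return this;
  }

  async onStop() {}
}

// Hypothetical subclass that "connects" by flipping a flag.
class InMemoryReader extends SketchReader {
  async onStart() {
    this.connected = true; // a real subclass would open its transport here
    return this;
  }

  async onStop() {
    this.connected = false; // a real subclass would close its transport here
  }
}
```

A caller would write `await new InMemoryReader({ topic: 'demo' }).start()` and never invoke onStart() itself.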
reader.onProcessStarted()
Called at the beginning of onMessage() to increment the number of active processing threads.
Kind: instance method of Reader
reader.onProcessComplete()
Called at the end of onMessage() to decrement the number of active processing threads. If this reaches zero and the system is exiting, the exit promise is resolved.
Kind: instance method of Reader
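The counting behind onProcessStarted() and onProcessComplete() can be sketched as follows. InFlightTracker, beginExit(), and the field names are hypothetical; the sketch only shows how an exit promise can wait for the active count to drain to zero, which may differ from how Reader implements it.

```javascript
// Hypothetical tracker for messages currently being processed.
class InFlightTracker {
  constructor() {
    this.active = 0;
    this.exiting = false;
    this.exitPromise = new Promise((resolve) => {
      this.resolveExit = resolve;
    });
  }

  // Called when processing of a message begins.
  onProcessStarted() {
    this.active += 1;
  }

  // Called when processing ends; resolves the exit promise once drained.
  onProcessComplete() {
    this.active -= 1;
    if (this.active === 0 && this.exiting) {
      this.resolveExit();
    }
  }

  // Begin a graceful exit; the promise resolves when nothing is in flight.
  beginExit() {
    this.exiting = true;
    if (this.active === 0) {
      this.resolveExit();
    }
    return this.exitPromise;
  }
}
```

If beginExit() is called while one message is still being handled, the returned promise stays pending until the matching onProcessComplete() call.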
routeFor(conditionRouteModel, type, object)
Given a condition route model and a type, the model's rules will be evaluated to find a particular route for that type.
Kind: global function
Param | Description |
---|---|
conditionRouteModel | the model to inspect |
type | |
object | |
createWriter()
Options nsqd : [ ...nsqd.... ]
routeTargets()
A Route has the form:
route := <topic_name> | <route_array> |
routeTargets() will return an array containing topic objects, or null if there are no routes.
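One way to picture the normalization that routeTargets() describes is the sketch below. It is not the library's code: toTopicObjects is a hypothetical name, route entries are assumed to be either topic-name strings or objects that already have a topic field, and filling options with null for bare names is an assumption of this example.

```javascript
// Hypothetical normalizer: turn a route into an array of topic objects, or null.
function toTopicObjects(route) {
  if (route === null || route === undefined) return null;

  // A single entry is treated like a one-element route array.
  const entries = Array.isArray(route) ? route : [route];

  const topics = entries.map((entry) =>
    typeof entry === 'string'
      ? { topic: entry, options: null } // assumed default for bare topic names
      : entry // assumed to already be a topic object
  );

  // An empty route array means there are no routes.
  return topics.length > 0 ? topics : null;
}

const single = toTopicObjects('billing');
const mixed = toTopicObjects(['billing', { topic: 'audit', options: { delay: 5 } }]);
const none = toTopicObjects([]);
```

Returning null instead of an empty array matches the documented contract and lets callers test for "no routes" with a single check.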
createRouteMsg()
Creates a RouteMsg if either the options or the route is not null. Otherwise the message is returned.
Kind: global function