19 July 2019
To support JMS specification Wildfly uses Apache ActiveMQ Artemis over active-mq subsystem. Last one provides mechanism to filtering out duplicate messages without application code changes.
To enable duplicate message detection you just need to set a special property on the message to a unique value
message.setStringProperty("_AMQ_DUPL_ID", uniqueId);
So, lets see how it works on practice and create simple Message Driven Bean to consume messages:
@JMSDestinationDefinition(
name = DuplicateJMSTestBean.DUPLICATE_QUEUE,
interfaceName = "javax.jms.Queue"
)
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = DuplicateJMSTestBean.DUPLICATE_QUEUE),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
})
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class DuplicateJMSTestBean implements MessageListener {
public final static String DUPLICATE_QUEUE = "java:global/jms/duplicateTestQueue";
@Override
public void onMessage(Message msg) {
System.out.println("Got new message.");
MessageStorage.messages.add(msg);
try {
Thread.sleep(5_000l);
} catch(Exception ignore) {}
System.out.println("Message successfully processed");
}
}
And simple JAX-RS endpoint to produce messages
@Path("/")
@Stateless
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class DuplicateTestEndpoint {
@Inject
private JMSContext context;
@Resource(lookup = DuplicateJMSTestBean.DUPLICATE_QUEUE)
private Queue queue;
@GET
@Path("/sendmessage")
public Response sendMessage(@QueryParam("duplicate-id") String duplicateId) {
try {
ObjectMessage message = context.createObjectMessage();
if (duplicateId == null) {
context.createProducer().send(queue, message);
} else {
message.setStringProperty("_AMQ_DUPL_ID", duplicateId);
context.createProducer().send(queue, message);
}
return Response.ok().entity("Message was sent. Recieved " + MessageStorage.messages.size() + " messagges: " + MessageStorage.messages).build();
} catch (Throwable e) {
return Response.ok().entity("Error: " + e).build();
}
}
}
Now in case we send message with the same _AMQ_DUPL_ID
without transaction by http://127.0.0.1:8080/jms-examples/sendmessage?duplicate-id=myuniqueid
we will get in logs:
WARN [org.apache.activemq.artemis.core.server] (Thread-448 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$5@e47887a)) AMQ222059: Duplicate message detected - message will not be routed. Message information:
CoreMessage[messageID=1505,durable=true,userID=3d27afde-a9fa-11e9-af5d-0242e352ec80,priority=4, timestamp=Fri Jul 19 10:53:04 EEST 2019,expiration=0, durable=true, address=jms.queue.jms-examples_jms-examples_jms-examples_java:global/jms/duplicateTestQueue,size=416,properties=TypedProperties[__AMQ_CID=38587489-a9fa-11e9-af5d-0242e352ec80,_AMQ_DUPL_ID=myuniqueid,_AMQ_ROUTING_TYPE=1]]@145077408
and message will NOT consume by consumer. In case you send message in transaction - you will get Exception
on commit.
Keep in mind, that to store IDs activemq uses circular fixed size cache
/subsystem=messaging-activemq/server=default:read-attribute(name=id-cache-size)
{
"outcome" => "success",
"result" => 20000
}
so, this value should be big enough to avoid rewriting. Also, you can configure persist cache or not (by default: true
)
/subsystem=messaging-activemq/server=default:write-attribute(name=persist-id-cache,value=false)
Source code available on GitHub