19 июля 2019

Для поддержки JMS спецификация Wildfly использует Apache ActiveMQ Artemis в подсистеме active-mq. Последнее предоставляет механизм фильтрации повторяющихся сообщений без изменения кода приложения.

Чтобы включить обнаружение повторяющихся сообщений, вам просто нужно установить специальное свойтсво сообщения с уникальным значением.

message.setStringProperty("_AMQ_DUPL_ID", uniqueId);

Итак, давайте посмотрим, как это работает на практике и создадим простой Message Driven Bean для использования сообщений:

@JMSDestinationDefinition(
        name = DuplicateJMSTestBean.DUPLICATE_QUEUE,
        interfaceName = "javax.jms.Queue"
)
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = DuplicateJMSTestBean.DUPLICATE_QUEUE),
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
})
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class DuplicateJMSTestBean implements MessageListener {

    public final static String DUPLICATE_QUEUE = "java:global/jms/duplicateTestQueue";

    @Override
    public void onMessage(Message msg) {
        System.out.println("Got new message.");
        MessageStorage.messages.add(msg);
        try {
            Thread.sleep(5_000l);
        } catch(Exception ignore) {}
        System.out.println("Message  successfully processed");
    }
}

И простую конечную точку JAX-RS для создания сообщений

@Path("/")
@Stateless
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class DuplicateTestEndpoint {

    @Inject
    private JMSContext context;
    @Resource(lookup = DuplicateJMSTestBean.DUPLICATE_QUEUE)
    private Queue queue;

    @GET
    @Path("/sendmessage")
    public Response sendMessage(@QueryParam("duplicate-id") String duplicateId) {
        try {
            ObjectMessage message = context.createObjectMessage();
            if (duplicateId == null) {
                context.createProducer().send(queue, message);
            } else {
                message.setStringProperty("_AMQ_DUPL_ID", duplicateId);
                context.createProducer().send(queue, message);
            }
            return Response.ok().entity("Message was sent.  Recieved " + MessageStorage.messages.size() + " messagges: " + MessageStorage.messages).build();
        } catch (Throwable e) {
            return Response.ok().entity("Error: " + e).build();
        }
    }
}

Теперь в случае, если мы отправим сообщение с тем же _AMQ_DUPL_ID без транзакции по адресу http://127.0.0.1:8080/jms-examples/sendmessage?duplicate-id=myuniqueid, мы получим в логах:

WARN [org.apache.activemq.artemis.core.server] (Thread-448 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$5@e47887a)) AMQ222059: Duplicate message detected - message will not be routed. Message information:
CoreMessage[messageID=1505,durable=true,userID=3d27afde-a9fa-11e9-af5d-0242e352ec80,priority=4, timestamp=Fri Jul 19 10:53:04 EEST 2019,expiration=0, durable=true, address=jms.queue.jms-examples_jms-examples_jms-examples_java:global/jms/duplicateTestQueue,size=416,properties=TypedProperties[__AMQ_CID=38587489-a9fa-11e9-af5d-0242e352ec80,_AMQ_DUPL_ID=myuniqueid,_AMQ_ROUTING_TYPE=1]]@145077408

и сообщение НЕ будет потребляться потребителем. Если вы отправите сообщение в транзакции - вы получите Исключение при коммите.

Имейте в виду, что для хранения идентификаторов activemq, используют круговой кэш фиксированого размера.

/subsystem=messaging-activemq/server=default:read-attribute(name=id-cache-size)
{
    "outcome" => "success",
    "result" => 20000
}

Поэтому, это значение должно быть достаточно большими для того, чтобы избежать перезаписи. Также, вы можете сконфигурировать постоянный кэш или этого не делать (по умолчанию: true)

/subsystem=messaging-activemq/server=default:write-attribute(name=persist-id-cache,value=false)

Исходный код, доступен на GitHub