Messaging e Filas
O que é um Message-Driven Bean (MDB)?
- Um bean que permite processar mensagens assincronamente
- Age como "Java Message Service (JMS) message listener"
- Como um listener de eventos mas recebe mensagens JMS
- Um JMS provider (sistema de mensagem) deve ser provido pelo servidor J2EE para tratar
das mensagens
- Quem pode enviar mensagens?
- Qualquer aplicação mesmo que não use J2EE
- Qualquer componente J2EE (componente de aplicação, EJB, componente Web)
- Qualquer aplicação JMS
- O modelo de comunicação
- Cada mensagem é enviada para uma fila específica
- Clientes receptores extraem mensagens das filas estabelecidas para armazenar suas
mensagens
- Filas retêm as mensagens até a mensagem ser consumida ou até que caduque
Point-to-point messaging
- Cada mensagem tem um único consumidor
- Não há dependência de timing entre produtor e consumidor
- O outro lado pode nem estar executando quando um lado produz ou consome
- O receptor reconhece o processamento bem sucedido de uma mensagem

Publish-subscribe messaging
- Cada mensagem pode ter vários consumidores
- Publishers e subscribers têm dependência de timing
- Um cliente que se cadastra para um tópico só pode consumir mensagens publicadas depois
da criação do "subscription" (assinatura)

Consumo de mensagens
- Messaging é inerentemente assincrono
- Produtor e consumidor estão operando assincronamente
- Porém, do ponto de vista do consumidor, há duas formas de receber mensagens e as
palavras "sincronamente" e "assincronamente" são usadas
- Pede uma mensagem e bloquea até a mensagem chegar
- Cadastra um listener cujo método onMessage será chamado quando uma mensagem chegar
Como MDBs são diferentes de Entity beans ou Session beans?
- Clientes não acessam MDBs através de interfaces
- MDBs são possuem a classe do bean
- De várias formas, um MDB parece um stateless session bean
- Instâncias de MDBs não podem reter estado
- Todas as instâncias são equivalentes
- O container pode usar um pool de MDBs
- Um MDB pode processar mensagens de vários clientes
- Algum estado pode ser mantido pelo MDB (conexão JMS, conexão de banco de dados, etc.)
mas não pode ser específico aos clientes
- Clientes não localizam MDBs para depois chamar métodos
- Um cliente envia mensagens para o destino da mensagem e JMS se encarrega de ativar o
listener
- O destino de um MDBs é atribuido durante o deployment
- Características de MDBs
- Executam quando recebem uma mensagem
- São invocadas assincronamente
- Normalmente têm curto tempo de vida
- Não representam dados compartilhados num BD mas podem prover acesso a esses dados
- Podem usar transações
- Se a transação sofrer rollback, a mensagem será reprocessada
- São stateless
Quando usar MDBs
- Só se pode usar messaging assíncrono quando o chamador não precisa de uma resposta
imediata
- Um método que retorna algo não é candidato para messaging
- Um método que lança uma exceção da aplicação não é candidato para messaging
- Supondo que essas condições se apliquem, eis alguns motivos possíveis:
- Para melhorar a percepção que o cliente tem do tempo de resposta (blocking demora mais
do que assincronia)
- Porém, lembre que a carga total será maior devido ao overhead de menssaging
- Quando a lógica de negócio pede que a resposta seja entregue de forma indireta e não
como resposta direta de um método
- Para realizar operações lentas
- Para realizar operações em sistemas que poderão estar fora do ar num certo momento
- Não há necessidade de todos os subsistemas estarem no ar ao mesmo tempo
- Importante ao cruzar domínios administrativos (integrar aplicações de várias
empresas)
- Quando a ordem de processamento não importa
- Quando a ordem importa, blocking é necessário
- Para realizar bom balanceamento de carga
- Devido ao enfileiramento, o processamento da carga pode ser espaçado no tempo
- Para realizar acoplamento frouxo entre componentes
- Mas lembre que interfaces normais de java podem desacoplar bem
- Para integrar aplicações que exponham interfaces baseadas em mensagens
- Para criar threads de forma indireta
- EJB não permite que você crie threads diretamente
- Uma grande desvantagem
- É muito mais difícil depurar código assíncrono do que código síncrono
Um Exemplo
O problema
- Vamos discutir uma aplicação que informa o melhor preço e disponib ilidade de um item
ao pesquisar vários fornecedores
- Chamamos a aplicação de Price Buster e vamos ver várias implementações
- MDBs vão permitir fazer processamento paralelo de 1 pedido
A necessidade de processamento concorrente
- A aplicação recebe um nome ou número de produto e pesquisa o preço e disponibilidade
em vários fornecedores e formata o resultado
- A implementação ideal pesquisa muitos fornecedores no menor tempo possível
- Se demorar 15 segundos para pesquisar um fornecedor, então pesquisar 10 fornecedores
sequencialmente vai demorar 150 segundos, tempo demais
- Devemos portanto usar paralelismo
EJB e o processamento concorrente
- Ver arquitetura típica na figura abaixo
- Podemos usar multithreading para implementar a concorrência
- Podemos adicionar concorrência das seguintes formas
- O módulo Web cria vários threads na JVM que roda os servlets, um thread por fornecedor
- O servlet pode esperar um tempo limite ou esperar que todos os threads terminem
- O problema é que isso deixa o processamento de threads com o desenvolvedor que deveria
estar se concentrando em business logic
- O componente EJB de pesquisa pode criar vários threads
- A especificação EJB proibe isso!
- Motivo: atrapalharia o container que gerencia os threads
- Implementa o componente de pesquisa como MDB
- Várias mensagens são enviadas em paralelo (os threads não bloqueam esperando
resposta)
- Os fornecedores processarão as mensagens em paralelo
- A melhor solução é 3 (messaging)
Concorrência num ambiente EJB usando MDBs
- Na implementação que discutiremos, os componentes EJB são MDBs
- São invocados na recepção de uma mensagem
- Não há exposição de seus métodos via interface
- Ver figura

- O fluxo é descrito aqui
- O usuário chama o servlet pela Web fornecendo os parâmetros da pesquisa (1)
- O servlet chama um método do componente de pesquisa EJB
- O componente de pesquisa constroi 3 mensagens JMS, uma para cada fornecedor a ser
pesquisado e os coloca na fila de pedidos (RequestQ) (2)
- O componente EJB espera respostas na fila ResponseQ
- A chegada de mensagens na RequestQ gatilha a invocação dos MDBs
- Um MDB é chamado por mensagem e todos iniciam processamento ao mesmo tempo (3)
- As MDBs devolvem as respostas através de mensagens JMS e as colocam na ResponseQ (4)
- Devemos ter as respostas em mais ou menos 15 segundos (digamos)
- O componente de pesquisa EJB recebe as mensagens (5), e devolve resultados ao servlet
inicial que encaminha o resultado para a view apropriada (6)
Considerações de design
- Em uso multiusuário, haverá muitos MDBs mandando mensagens para a ResponseQ e deve
haver uma forma de mapear mensagens para o componente EJB de pesquisa apropriado. Formas
de fazer isso:
- Usar uma fila temporária para cada pedido
- O componente EJB cria uma fila só para ele e passa seu nome nas mensagens
- Usar um pool de filas de resposta
- Como antes mas, com um pouco de trabalho de gestão de pool, permite evitar o overhead
de criar as filas
- Usar a funcionalidade de "selector" do JMS que permite escolher apenas
mensagens desejadas da fila de resposta
- O selector permite que um cliente selecione as mensagens de interesse usando headers de
mensagens
- Só serão recebidas mensagens que casarem os headers e propriedades de mensagens
- O componente de pesquisa EJB pode criar uma chave (key) para cada fornecedor e passá-la
nas mensagens
- Em seguida, o componente de pesquisa criar um selector com a chave e espera mensagens
- O código abaixo mostra como fazer isso
- A desvantagem é que o selector pode ser lento se houver muitas mensagens na fila
Código para o componente de pesquisa
/* Step 1: Create a unique key, use some class, say
UniqueKeyCreater. **/
String uniqueKey = UniqueKeyCreater.getKey();
String retailers [] = new String []{"A", "B", "C"};
/* Step 2: Put three messages in the REQUESTQ, one for each retailer.
Define a header called "KEY" and set its value to uniqueKey
Note: The retailer MDBs also have to set the same header/value
in the response messages. **/
QueueSender queueSender = queueSession.createSender(requestQueue);
Message message = queueSession.createTextMessage();
message.setStringProperty("KEY",uniqueKey);
for(int i = 0; i < retailers.length; i++) {
message.setText("Retailer:" + retailers[i] + ",Item:" + itemName);
queueSender.send(message);
}
/* Step 3: Now wait for messages in the response queue. Get only those
messages that have the header KEY with value set to uniqueKey.
Use a JMS selector for this purpose. The while loop will break if
all
three responses arrive in RESPONSEQ or 30 seconds are over. Even if
all three response messages do not arrive in 30 seconds, the code
will be out of while loop, ensuring a guaranteed response time of 30
seconds. **/
String selector = "KEY = '" + uniqueKey + "'";
QueueReceiver queueReceiver =
queueSession.createReceiver(responseQueue, selector);
long startTime = System.currentTimeMillis();
int messagesExpected = retailers.length;
long waitTime = 30000; // 30 seconds
Vector responses = new Vector(retailers.length);
while(messagesExpected > 0 && waitTime > 0 ) {
Message rcvdMsg = queueReceiver.receive(waitTime);
//Check if we got a msg, if not then break.
if (rcvdMsg == null){
//Wait time expired.
break;
}
responses.add(rcvdMsg);
messagesExpected--;
waitTime = 30000 - (System.currentTimeMillis() - startTime);
}
- A melhor solução depende de testes de desempenho feitos com seu servidor de
aplicação
- Na solução que adotamos, podemos ter um pico muito alto de atividade já que tudo
ocorre em paralelo sem restrição
- Uma solução é usar um pool de ResponseQ e limitar o paralelismo por aí
Tempo de resposta garantido
- Como garantir tempo de resposta, mesmo que alguns fornecedores não respondam ou demorem
a responder?
- O componente EJB pode esperar respostas por um tempo máximo usando o método
queueReceiver.receive(long waitimeout)
- Ver código acima
Bibliografia