[박혜웅] RabbitMQ Tutorial for Java

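The tutorials on the RabbitMQ home page are written primarily in Python, and only two of them have Java examples. The examples also duplicate code among themselves, and they take their input as command-line arguments, which is inconvenient when testing from an IDE such as Eclipse.
So I converted the Python code to Java and modified it slightly so that it is easy to test in Eclipse. (To run it, you need to download the client from the RabbitMQ Client Download page.)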

I recommend reading the tutorials on the RabbitMQ home page once before going through the material below.

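<Brief overview of the tutorials>
For convenience, the Producer (the program that puts messages into a queue) is written as P, and the Consumer (the program that takes messages out of a queue) as C. In a web service, P would probably be the web application (front-end) and C a daemon running in the background (back-end). If you are writing a daemon in Java on Linux, see this post.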

When a message is delivered to queues according to a rule (an Exchange), that rule is written as X, and the argument that binds the rule to a queue (the Routing Key) is written as Key.

Every figure in the tutorial shows a single P, but in practice there will be several web servers, so think of P as many producers.

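As always, the first example is the "Hello World" project.
Because there is only one C, the work (message processing) is not distributed, so this setup is rarely used in practice.
Still, splitting the logic into P and C and letting C handle the long-running work makes responses faster from the user's point of view.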


Messages are read from the queue in order and handed to C1, C2, ... in turn.
So C1 and C2 receive different messages but do the same kind of work.
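
The alternating hand-off described above can be pictured with a small in-memory sketch. It does not use the RabbitMQ client at all; the class name `RoundRobinSketch` and the method `dispatch` are hypothetical names introduced only for illustration, and the modulo rule is a simplified model of what the broker does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model only: not RabbitMQ code, just the round-robin idea.
final class RoundRobinSketch {

    /** Gives message i to consumer (i mod consumerCount), preserving queue order. */
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            inboxes.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumerCount).add(messages.get(i));
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> queue = Arrays.asList("task0", "task1", "task2", "task3", "task4");
        List<List<String>> inboxes = dispatch(queue, 2);
        System.out.println("C1 = " + inboxes.get(0)); // C1 = [task0, task2, task4]
        System.out.println("C2 = " + inboxes.get(1)); // C2 = [task1, task3]
    }
}
```

Each consumer sees a different subset of the messages, but both run the same processing code.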


X (fanout) inserts the same message into every bound queue.
So C1 and C2 both receive the same message but do different work.
You can also think of it as splitting the work that C did in the Hello World example between C1 and C2.
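
As a rough picture of the fanout rule, the following in-memory sketch copies every published message into all bound queues. It is not the RabbitMQ API; `FanoutSketch`, `bind`, `publish`, and `messagesIn` are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only: a fanout exchange ignores the routing key
// and copies each message to every queue bound to it.
final class FanoutSketch {
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to the exchange (creating it if needed). */
    void bind(String queue) {
        queues.putIfAbsent(queue, new ArrayList<>());
    }

    /** Copies the message body into every bound queue. */
    void publish(String body) {
        for (List<String> q : queues.values()) {
            q.add(body);
        }
    }

    List<String> messagesIn(String queue) {
        return queues.get(queue);
    }

    public static void main(String[] args) {
        FanoutSketch x = new FanoutSketch();
        x.bind("Q1");
        x.bind("Q2");
        x.publish("fanout1");
        x.publish("fanout2");
        System.out.println("Q1 = " + x.messagesIn("Q1")); // Q1 = [fanout1, fanout2]
        System.out.println("Q2 = " + x.messagesIn("Q2")); // Q2 = [fanout1, fanout2]
    }
}
```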


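When C binds a queue to X (direct), it specifies a Key; a message is inserted into a queue only when the message's Key is identical to that binding Key (exact match). In the tutorial's figure, a message whose Key is error is inserted into both queues, while a message whose Key is info or warning is inserted only into the second queue. Messages with any other Key are not inserted into any queue; in other words, they are discarded.
In this setup C1 handles only serious errors (error), while C2 handles every severity (info, warning, error).
So C1 and C2 receive different sets of messages and also do different work.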
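
The exact-match rule can be sketched in memory as a lookup from binding Key to queues. This is a simplified model under that assumption, not the broker's implementation; `DirectExchangeSketch` and its methods are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative model only: a direct exchange delivers a message to the
// queues whose binding key equals the message's routing key exactly.
final class DirectExchangeSketch {
    private final Map<String, Set<String>> queuesByKey = new HashMap<>();
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    void bind(String queue, String bindingKey) {
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
        queuesByKey.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queue);
    }

    /** Returns how many queues received the message; 0 means it was discarded. */
    int publish(String routingKey, String body) {
        Set<String> targets = queuesByKey.getOrDefault(routingKey, Collections.<String>emptySet());
        for (String q : targets) {
            queues.get(q).add(body);
        }
        return targets.size();
    }

    List<String> messagesIn(String queue) {
        return queues.get(queue);
    }

    public static void main(String[] args) {
        DirectExchangeSketch x = new DirectExchangeSketch();
        x.bind("Q1", "error");
        x.bind("Q2", "info");
        x.bind("Q2", "warning");
        x.bind("Q2", "error");

        System.out.println(x.publish("error", "disk failure")); // 2
        System.out.println(x.publish("info", "start server"));  // 1
        System.out.println(x.publish("debug", "ignored"));      // 0
    }
}
```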


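This is similar to the Routing example above, except that Keys are compared by pattern matching rather than exact matching.
The Key is split on . (period) into words (tokens); within a binding pattern, * stands for exactly one word and # stands for zero or more words. This is only my personal opinion, but using the common wildcards ? and * instead of * and # would have been more intuitive.
Note that in the tutorial's figure a message from P with the Key "lazy" is also delivered to Q2. Because # can match zero words, the binding lazy.# matches lazy on its own: the . (period) is only a word separator and is not itself part of what X compares.
So C1 and C2 receive different sets of messages and also do different work.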
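
To make the `*` and `#` rules concrete, here is a minimal word-by-word matcher. It is a sketch of the matching rule as described above, not RabbitMQ's actual implementation, and `TopicMatchSketch` and `matches` are hypothetical names. The patterns and Keys in `main` are the ones used in the Topics code further down.

```java
// Illustrative model only: matches a topic binding pattern against a routing key.
final class TopicMatchSketch {

    static boolean matches(String pattern, String routingKey) {
        // The period is only a separator; comparison happens word by word.
        return match(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        // '*' matches exactly one word; anything else must be an identical word.
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] keys = { "quick.orange.rabbit", "lazy.brown.fox",
                "quick.orange.male.rabbit", "lazy.orange.male.rabbit", "lazy" };
        for (String key : keys) {
            boolean q1 = matches("*.orange.*", key);
            boolean q2 = matches("lazy.#", key) || matches("*.*.rabbit", key);
            System.out.println(key + " -> Q1=" + q1 + ", Q2=" + q2);
        }
    }
}
```

With these bindings, "lazy" reaches Q2 through lazy.#, while "quick.orange.male.rabbit" reaches neither queue because it has four words and none of the patterns that could apply contains #.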


The last tutorial uses RPC (Remote Procedure Call); I did not convert its source to Java.

<Java code>

RabbitmqClient.java
The routines for connecting to and disconnecting from the RabbitMQ server are collected in a common class.
package com.bagesoft.test.rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitmqClient {
    public static String HOST = "192.168.0.105";// RabbitMQ Server.
    private Connection connection = null;
    private Channel channel = null;

    // Opens a connection to HOST and returns a channel on it.
    public Channel getChannel() throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(RabbitmqClient.HOST);
        this.connection = factory.newConnection();
        this.channel = connection.createChannel();

        return this.channel;
    }

    // Closes the channel first, then the underlying connection.
    public void close() throws IOException {
        this.channel.close();
        this.connection.close();
    }
}


Message.java
For convenience, a message class bundles the Key and the actual data together.
package com.bagesoft.test.rabbitmq;

public class Message {
    public String exchange = "";
    public String routingKey = "";
    public String body = "";

    // Uses the default exchange (""), where the routing key is the queue name.
    public Message(String routingKey, String body) {
        this.routingKey = routingKey;
        this.body = body;
    }

    public Message(String exchange, String routingKey, String body) {
        this.exchange = exchange;
        this.routingKey = routingKey;
        this.body = body;
    }

    public String toString() {
        if (exchange.length() > 0) {
            return String.format("Exchange='%s', Key='%s', '%s'", exchange,
                    routingKey, body);
        } else {
            return String.format("Key='%s', '%s'", routingKey, body);
        }
    }
}



1. Hello World
Send.java
package com.bagesoft.test.rabbitmq.helloworld;

import java.util.ArrayList;
import java.util.List;

import com.bagesoft.test.rabbitmq.Message;
import com.bagesoft.test.rabbitmq.RabbitmqClient;
import com.rabbitmq.client.Channel;

public class Send {
    public final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws java.io.IOException {
        RabbitmqClient client = new RabbitmqClient();
        Channel channel = client.getChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // create messages.
        List<Message> messages = new ArrayList<Message>();
        for (int i = 0; i < 10; i++) {
            messages.add(new Message(QUEUE_NAME, "Hello World! " + i));
        }

        System.out.println("ready to send.");
        for (Message m : messages) {
            // default exchange: the routing key is the queue name.
            channel.basicPublish(m.exchange, m.routingKey, null,
                    m.body.getBytes());
            System.out.println(" [x] Sent " + m.toString());
        }

        client.close();
    }
}

Recv.java
package com.bagesoft.test.rabbitmq.helloworld;

import com.bagesoft.test.rabbitmq.RabbitmqClient;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {
    public final static String QUEUE_NAME = Send.QUEUE_NAME;

    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException {

        RabbitmqClient client = new RabbitmqClient();
        Channel channel = client.getChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        // autoAck = true: messages are acknowledged as soon as they are delivered.
        channel.basicConsume(QUEUE_NAME, true, consumer);

        System.out.println("ready to receive.");
        while (true) {
            // blocks until the next message arrives.
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}

2. Work Queues (Round-Robin)
NewTask.java
package com.bagesoft.test.rabbitmq.roundrobin;

import java.util.ArrayList;
import java.util.List;

import com.bagesoft.test.rabbitmq.Message;
import com.bagesoft.test.rabbitmq.RabbitmqClient;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {
    public static final String QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws java.io.IOException {

        RabbitmqClient client = new RabbitmqClient();
        Channel channel = client.getChannel();

        boolean durable = true;// for durability
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

        // create messages. task i carries i dots, i.e. i seconds of work.
        List<Message> messages = new ArrayList<Message>();
        for (int i = 0; i < 10; i++) {
            messages.add(new Message(QUEUE_NAME, "task" + i
                    + repeatChar(".", i)));
        }

        System.out.println("ready to send.");
        for (Message m : messages) {
            // for durability //MessageProperties.PERSISTENT_TEXT_PLAIN
            channel.basicPublish(m.exchange, m.routingKey,
                    MessageProperties.PERSISTENT_TEXT_PLAIN, m.body.getBytes());

            System.out.println(" [x] Sent " + m.toString());
        }

        client.close();
    }

    // Returns ch repeated 'repeat' times.
    private static String repeatChar(String ch, int repeat) {
        StringBuilder s = new StringBuilder();
        for (int i = 0; i < repeat; i++) {
            s.append(ch);
        }
        return s.toString();
    }
}

Worker.java
package com.bagesoft.test.rabbitmq.roundrobin;

import com.bagesoft.test.rabbitmq.RabbitmqClient;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Worker {
    private static final String QUEUE_NAME = NewTask.QUEUE_NAME;

    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException {

        RabbitmqClient client = new RabbitmqClient();
        Channel channel = client.getChannel();

        boolean durable = true;// for durability
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        int prefetchCount = 1;// for fair dispatch
        channel.basicQos(prefetchCount);

        QueueingConsumer consumer = new QueueingConsumer(channel);

        boolean autoAck = false;// for redelivering when consuming failed.
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);

        System.out.println("ready to receive.");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '" + message + "'");
            doWork(message);
            System.out.println(" [x] Done");

            // acknowledge only after the work has finished.
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

    // Simulates work: sleeps one second for every '.' in the task.
    private static void doWork(String task) throws InterruptedException {
        for (char ch : task.toCharArray()) {
            if (ch == '.')
                Thread.sleep(1000);
        }
    }
}
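
The Worker above calls basicQos with prefetchCount = 1 "for fair dispatch". As a rough illustration of why that matters, the sketch below compares plain round-robin with a simplified model of fair dispatch, in which each task goes to whichever worker becomes idle first. It is a simulation under that assumption, not RabbitMQ code; `DispatchSketch`, its method names, and the task durations are hypothetical.

```java
import java.util.Arrays;

// Illustrative model only: compares total finish time of two dispatch policies.
final class DispatchSketch {

    /** Round-robin: task i goes to worker (i mod workers), however busy that worker is. */
    static int roundRobinFinishTime(int[] durations, int workers) {
        int[] busyUntil = new int[workers];
        for (int i = 0; i < durations.length; i++) {
            busyUntil[i % workers] += durations[i];
        }
        return Arrays.stream(busyUntil).max().orElse(0);
    }

    /** Fair dispatch: each task goes to the worker that becomes idle first. */
    static int fairFinishTime(int[] durations, int workers) {
        int[] busyUntil = new int[workers];
        for (int d : durations) {
            int idle = 0;
            for (int w = 1; w < workers; w++) {
                if (busyUntil[w] < busyUntil[idle]) {
                    idle = w; // ties go to the lowest-numbered worker
                }
            }
            busyUntil[idle] += d;
        }
        return Arrays.stream(busyUntil).max().orElse(0);
    }

    public static void main(String[] args) {
        // Alternating heavy (5 s) and light (1 s) tasks, two workers.
        int[] durations = { 5, 1, 5, 1, 5, 1 };
        System.out.println("round-robin: " + roundRobinFinishTime(durations, 2) + "s"); // round-robin: 15s
        System.out.println("fair:        " + fairFinishTime(durations, 2) + "s");        // fair:        11s
    }
}
```

With round-robin, one worker receives every heavy task while the other sits idle; in the fair model the idle worker picks up the next task, so the batch finishes sooner.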

3. Publish/Subscribe (FanOut)
EmitLog.java
package com.bagesoft.test.rabbitmq.fanout;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.bagesoft.test.rabbitmq.Message;
import com.bagesoft.test.rabbitmq.RabbitmqClient;
import com.rabbitmq.client.Channel;

public class EmitLog {
    public static String EXCHANGE = "logs";// exchange name
    public static String EXCHANGE_TYPE = "fanout";// exchange type

    public static void main(String[] args) throws IOException {
        RabbitmqClient client = new RabbitmqClient();
        Channel channel = client.getChannel();

        channel.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE);

        // create messages. a fanout exchange ignores the routing key.
        List<Message> messages = new ArrayList<Message>();
        for (int i = 1; i < 11; i++) {
            messages.add(new Message(EXCHANGE, "", "fanout" + i));
        }

        System.out.println("ready to send.");
        for (Message m : messages) {
            channel.basicPublish(m.exchange, m.routingKey, null,
                    m.body.getBytes());

            System.out.println(" [x] Sent " + m.toString());

            // send one message per second.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        client.close();
    }
}

ReceiveLog.java
package com.bagesoft.test.rabbitmq.fanout;

import java.io.IOException;

import com.bagesoft.test.rabbitmq.RabbitmqClient;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

// EmitLog here is the fanout EmitLog in this package (exchange "logs"),
// so it must not be imported from the direct package.
public class ReceiveLog {
    public static void main(String[] args) throws IOException,
            InterruptedException {
        RabbitmqClient client = new RabbitmqClient();
        Channel channel = client.getChannel();

        // bind exchange to random queue.
        channel.exchangeDeclare(EmitLog.EXCHANGE, EmitLog.EXCHANGE_TYPE);
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue, EmitLog.EXCHANGE, "");

        QueueingConsumer consumer = new QueueingConsumer(channel);

        boolean autoAck = false;// for redelivering when consuming failed.
        channel.basicConsume(queue, autoAck, consumer);

        System.out.println("ready to receive.");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '" + message + "'");

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}


4. Routing (Direct, Exact Matching)
EmitLog.java
package com.bagesoft.test.rabbitmq.direct;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.bagesoft.test.rabbitmq.Message;
import com.bagesoft.test.rabbitmq.RabbitmqClient;
import com.rabbitmq.client.Channel;

public class EmitLog {
    public static String EXCHANGE = "direct_logs";// exchange name
    public static String EXCHANGE_TYPE = "direct";// exchange type

    public enum SEVERITY {
        INFO, WARNING, ERROR
    }

    public static void main(String[] args) throws IOException {
        RabbitmqClient client = new RabbitmqClient();
        Channel channel = client.getChannel();

        channel.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE);

        // create messages.
        List<Message> messages = new ArrayList<Message>();
        messages.add(new Message(EXCHANGE, SEVERITY.INFO.name(), "start server"));
        messages.add(new Message(EXCHANGE, SEVERITY.WARNING.name(),
                "disk space left only 10%"));
        messages.add(new Message(EXCHANGE, SEVERITY.INFO.name(), "stop server"));

        // send message with routing key (=severity)
        System.out.println("ready to send.");
        for (Message m : messages) {
            channel.basicPublish(m.exchange, m.routingKey, null,
                    m.body.getBytes());

            System.out.println(" [x] Sent " + m.toString());
        }

        client.close();
    }
}

ReceiveLog1.java
package com.bagesoft.test.rabbitmq.direct;

import java.io.IOException;

import com.bagesoft.test.rabbitmq.RabbitmqClient;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLog1 {
    public static void main(String[] args) throws IOException,
            InterruptedException {
        RabbitmqClient client = new RabbitmqClient();
        Channel channel = client.getChannel();

        // bind exchange to random queue.
        channel.exchangeDeclare(EmitLog.EXCHANGE, EmitLog.EXCHANGE_TYPE);
        String queue = channel.queueDeclare().getQueue();

        // bind for each routing key (=severity). C1 receives ERROR only.
        channel.queueBind(queue, EmitLog.EXCHANGE, "ERROR");

        QueueingConsumer consumer = new QueueingConsumer(channel);

        boolean autoAck = false;// for redelivering when consuming failed.
        channel.basicConsume(queue, autoAck, consumer);

        System.out.println("ready to receive.");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '"
                    + delivery.getEnvelope().getRoutingKey() + "' '" + message
                    + "'");

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

ReceiveLog2.java
package com.bagesoft.test.rabbitmq.direct;

import java.io.IOException;

import com.bagesoft.test.rabbitmq.RabbitmqClient;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLog2 {
    public static void main(String[] args) throws IOException,
            InterruptedException {
        RabbitmqClient client = new RabbitmqClient();
        Channel channel = client.getChannel();

        // bind exchange to random queue.
        channel.exchangeDeclare(EmitLog.EXCHANGE, EmitLog.EXCHANGE_TYPE);
        String queue = channel.queueDeclare().getQueue();

        // bind for each routing key (=severity). C2 receives every severity.
        channel.queueBind(queue, EmitLog.EXCHANGE, "INFO");
        channel.queueBind(queue, EmitLog.EXCHANGE, "WARNING");
        channel.queueBind(queue, EmitLog.EXCHANGE, "ERROR");

        QueueingConsumer consumer = new QueueingConsumer(channel);

        boolean autoAck = false;// for redelivering when consuming failed.
        channel.basicConsume(queue, autoAck, consumer);

        System.out.println("ready to receive.");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '"
                    + delivery.getEnvelope().getRoutingKey() + "' '" + message
                    + "'");

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}


5. Topics (Pattern Matching)
EmitLog.java
package com.bagesoft.test.rabbitmq.topic;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.bagesoft.test.rabbitmq.Message;
import com.bagesoft.test.rabbitmq.RabbitmqClient;
import com.rabbitmq.client.Channel;

public class EmitLog {
    public static String EXCHANGE = "topic_logs";// exchange name
    public static String EXCHANGE_TYPE = "topic";// exchange type

    public static void main(String[] args) throws IOException {
        RabbitmqClient client = new RabbitmqClient();
        Channel channel = client.getChannel();

        channel.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE);

        // create messages. The exchange must be given explicitly; the
        // two-argument constructor would publish to the default exchange ("")
        // and the messages would never reach topic_logs.
        List<Message> messages = new ArrayList<Message>();
        messages.add(new Message(EXCHANGE, "quick.orange.rabbit", "m1"));
        messages.add(new Message(EXCHANGE, "lazy.orange.elephant", "m2"));
        messages.add(new Message(EXCHANGE, "quick.orange.fox", "m3"));
        messages.add(new Message(EXCHANGE, "lazy.brown.fox", "m4"));
        messages.add(new Message(EXCHANGE, "lazy.pink.rabbit", "m5"));
        messages.add(new Message(EXCHANGE, "quick.brown.fox", "m6"));
        messages.add(new Message(EXCHANGE, "quick.orange.male.rabbit", "m7"));
        messages.add(new Message(EXCHANGE, "lazy.orange.male.rabbit", "m8"));
        messages.add(new Message(EXCHANGE, "lazy", "m9"));

        System.out.println("ready to send.");
        for (Message m : messages) {
            // send message with routing key
            channel.basicPublish(m.exchange, m.routingKey, null,
                    m.body.getBytes());

            System.out.println(" [x] Sent " + m.toString());
        }

        client.close();
    }

}

ReceiveLog1.java
package com.bagesoft.test.rabbitmq.topic;

import java.io.IOException;

import com.bagesoft.test.rabbitmq.RabbitmqClient;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLog1 {
    public static void main(String[] args) throws IOException,
            InterruptedException {
        RabbitmqClient client = new RabbitmqClient();
        Channel channel = client.getChannel();

        // bind exchange to random queue.
        channel.exchangeDeclare(EmitLog.EXCHANGE, EmitLog.EXCHANGE_TYPE);
        String queue = channel.queueDeclare().getQueue();

        // bind with a pattern: any three-word key whose middle word is "orange".
        String routingKey = "*.orange.*";
        channel.queueBind(queue, EmitLog.EXCHANGE, routingKey);

        QueueingConsumer consumer = new QueueingConsumer(channel);

        boolean autoAck = false;// for redelivering when consuming failed.
        channel.basicConsume(queue, autoAck, consumer);

        System.out.println("ready to receive. routingKey=" + routingKey);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '"
                    + delivery.getEnvelope().getRoutingKey() + "' '" + message
                    + "'");

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

ReceiveLog2.java
package com.bagesoft.test.rabbitmq.topic;

import java.io.IOException;

import com.bagesoft.test.rabbitmq.RabbitmqClient;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLog2 {
    public static void main(String[] args) throws IOException,
            InterruptedException {
        RabbitmqClient client = new RabbitmqClient();
        Channel channel = client.getChannel();

        // bind exchange to random queue.
        channel.exchangeDeclare(EmitLog.EXCHANGE, EmitLog.EXCHANGE_TYPE);
        String queue = channel.queueDeclare().getQueue();

        // bind with two patterns: keys starting with "lazy" (any length),
        // and three-word keys ending in "rabbit".
        String routingKey1 = "lazy.#";
        String routingKey2 = "*.*.rabbit";
        channel.queueBind(queue, EmitLog.EXCHANGE, routingKey1);
        channel.queueBind(queue, EmitLog.EXCHANGE, routingKey2);

        QueueingConsumer consumer = new QueueingConsumer(channel);

        boolean autoAck = false;// for redelivering when consuming failed.
        channel.basicConsume(queue, autoAck, consumer);

        System.out.println("ready to receive. routingKey=" + routingKey1);
        System.out.println("ready to receive. routingKey=" + routingKey2);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '"
                    + delivery.getEnvelope().getRoutingKey() + "' '" + message
                    + "'");

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}