RabbitMQ五种模式(ROUTING模式)

Send.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Send {
public static String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/zhang");
factory.setUsername("zhang");
factory.setPassword("zhang");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String message = "message";
channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
channel.close();
connection.close();
}
}

Recv.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Recv {
public static String QUEUE_NAME = "test_queue_direct_1";
public static String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/zhang");
factory.setUsername("zhang");
factory.setPassword("zhang");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(
QUEUE_NAME, false, false,
false, null
);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
Thread.sleep(2 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}

Recv2.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Recv2 {
public static String QUEUE_NAME = "test_queue_direct_2";
public static String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/zhang");
factory.setUsername("zhang");
factory.setPassword("zhang");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(
QUEUE_NAME, false, false,
false, null
);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
Thread.sleep(2 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}

build.gradle

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
plugins {
id 'java'
}

version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
mavenCentral()
}

dependencies {
compile group: 'com.rabbitmq', name: 'amqp-client', version: '5.3.0'
testCompile group: 'junit', name: 'junit', version: '4.12'
}

rabbit-mq源码地址​github.com