centos安装
`1、安装erlang 以root身份执行下面命令 yum install erlang 2、安装 rabbitmq-server 打开RabbitMQ的下载页面,http://www.rabbitmq.com/download.html ,选择对应平台的二进制发行包下载;目前使用的是CentOS ,属于与RHEL/Fedora相兼容的版本,下载针对RHEL的二进制版本(Binary)即可: 本例中RabbitMQ的版本是3.5.1,下载得到文件rabbitmq-server-3.5.1-1.noarch.rpm 命令如下: wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.1/rabbitmq-server-3.5.1-1.noarch.rpm
安装RabbitMQ Server
rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
yum install rabbitmq-server-3.5.1-1.noarch.rpm
3、启动RabbitMQ
配置为守护进程随系统自动启动,root权限下执行:
chkconfig rabbitmq-server on
启动rabbitMQ服务
/sbin/service rabbitmq-server start 如果报如下异常:
Starting rabbitmq-server (via systemctl): Job for rabbitmq-server.service failed. See 'systemctl status rabbitmq-server.service' and 'journalctl -xn' for details. [FAILED] 尝试下面的操作: 禁用 SELinux ,修改 /etc/selinux/config SELINUX=disabled 修改后重启系统
4、安装Web管理界面插件 终端输入:
rabbitmq-plugins enable rabbitmq_management 安装成功后会显示如下内容
The following plugins have been enabled: mochiweb webmachine rabbitmq_web_dispatch amqp_client rabbitmq_management_agent rabbitmq_management Plugin configuration has changed. Restart RabbitMQ for changes to take effect. 5、登录Web管理界面 安装好插件并开启服务后,可以浏览器输入xxxx:15672,账号密码全输入guest即可登录。
rabbitmq的web管理界面无法使用guest用户登录
这里需要注意下,从3.3.1版本开始,RabbitMQ默认不允许远程ip登录,即只能使用localhost登录。如果希望远程登录,请添加用户权限,方法见我另一篇文章设置RabbitMQ远程ip登录。
rabbitmq例子
消费者,生产者都继承自该类
public abstract class EndPoint {
protected Channel channel;
protected Connection connection;
protected String endPointName;
public EndPoint(String endPointName) throws IOException{
//queue名称
this.endPointName = endPointName;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.2.223.71");
//注意这里的port 为5672
factory.setPort(5672);
factory.setUsername("oyth");
factory.setPassword("oyth");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(endPointName, false, false, false, null);
}
public void close() throws IOException{
this.channel.close();
this.connection.close();
}
生产者
public class Producer extends EndPoint {
public Producer(String endPointName) throws IOException {
super(endPointName);
}
public void sendMessage(Serializable object) throws IOException{
channel.basicPublish("",endPointName,null, SerializationUtils.serialize(object));
}
消费者
public class QueueConsumer extends EndPoint implements Runnable,Consumer {
public QueueConsumer(String endPointName) throws IOException{
super(endPointName);
}
@Override
public void handleConsumeOk(String consumerTag) {
System.out.println("Consumer "+consumerTag +" registered");
}
@Override
public void handleCancelOk(String s) {
}
@Override
public void handleCancel(String s) throws IOException {
}
@Override
public void handleShutdownSignal(String s, ShutdownSignalException e) {
}
@Override
public void handleRecoverOk(String s) {
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
Map map = (HashMap) SerializationUtils.deserialize(body);
System.out.println("Message Number "+ map.get("message number") + " received.");
}
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p/>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
try {
//start consuming messages. Auto acknowledge messages.
channel.basicConsume(endPointName, true,this);
} catch (IOException e) {
e.printStackTrace();
}
}
}
运行代码
public class TestMq {
public TestMq() throws Exception{
QueueConsumer queueConsumer = new QueueConsumer("queue");
Thread thread = new Thread(queueConsumer);
thread.start();
Producer producer = new Producer("queue");
for (int i =0;i<100;i++){
HashMap msg = new HashMap();
msg.put("message number",i);
producer.sendMessage(msg);
System.out.println("Message Number "+ i +" sent.");
}
}
public static void main(String[] args) throws Exception{
new TestMq();
}
}