使用java代码操作rabbitmq时,首先需要一个有创建用户等权限的管理员账号,需要在rabbitmq的后台管理页面手动创建这个账号,系统推荐的这几个tag可以让账号有rabbitmq后台管理页面的访问权限
图一
管理账号创建完成后就可以在代码中操作新增编辑mq账号及vhost等等了,点击rabbitmq后台管理页面左下角的HTTP API(见上文图一左下角)可以查看所有API接口
以下代码中,rabbitmqUsername 为管理员账号的用户名,rabbitmqPassword为管理员账号的密码,rabbitmqUrl为rabbitmq服务器接口地址(例:http://127.0.0.1:15672/api/)
1. 新增用户或修改用户密码
1.1 API:PUT /api/users/{name}
1.2 代码示例
以下代码中,yourUsername为新增账号的用户名,yourPassword为新增账号的密码,guest为新增账号的tag,可以自定义,也可以使用rabbitmq提供的tag(见上文图一),该接口也可以用来修改已有账号的密码
//add user String enc = new String( Base64.encodeBase64((rabbitmqUsername + ":" + rabbitmqPassword).getBytes() ) ); HttpPut putCriaUsuario = new HttpPut( rabbitmqUrl+"users/"+yourUserName ); // RabbitMQ requires a user with create permission, create it mannually first putCriaUsuario.addHeader( "Authorization", "Basic " + enc ); putCriaUsuario.addHeader( "content-type", "application/json" ); putCriaUsuario.setEntity( new StringEntity( "{\"password\":\""+yourPassword+"\",\"tags\":\"guest\"}" ) ); CloseableHttpClient client = HttpClients.createDefault(); client.execute( putCriaUsuario );
2. 新增vhost
2.1 API:PUT /api/vhosts/{name}
2.2 代码示例
以下代码中 yourVhost 为新增vhost的名称,guest为自定义的tag
// 管理员账号用户名密码 String enc = new String( Base64.encodeBase64((rabbitmqUsername + ":" + rabbitmqPassword).getBytes() ) ); //add vhost HttpPut putVhost = new HttpPut( rabbitmqUrl+"vhosts/"+yourVhost ); putVhost.addHeader( "Authorization", "Basic " + enc ); putVhost.addHeader( "content-type", "application/json" ); putVhost.setEntity( new StringEntity( "{\"tags\":\"guest\"}" ) ); CloseableHttpClient putVhostClient = HttpClients.createDefault(); putVhostClient.execute( putVhost );
3. mq用户绑定vhost并设置权限
3.1 API:PUT /api/permissions/{vhost}/{user}
3.2 代码示例
以下代码中,yourVhost 与yourUsername为绑定的mq用户与vhost的名称,代码示例中该用户对该vhost只开启了read权限,如果需要开启全部的configure(配置),write(写入),read(读取)权限,参数需要写成:
"{\"configure\":\".*\",\"write\":\".*\",\"read\":\".*\"}"
//管理员账号用户名密码 String enc = new String( Base64.encodeBase64((rabbitmqUsername + ":" + rabbitmqPassword).getBytes() ) ); //add permissions and bind user&vhost HttpPut putPermissions = new HttpPut( rabbitmqUrl+"permissions/"+yourVhost+"/"+yourUsername); putPermissions.addHeader( "Authorization", "Basic " + enc ); putPermissions.addHeader( "content-type", "application/json" ); putPermissions.setEntity( new StringEntity( "{\"configure\":\"\",\"write\":\"\",\"read\":\".*\"}" ) ); CloseableHttpClient putPermissionsClient = HttpClients.createDefault(); putPermissionsClient.execute( putPermissions );
4. 动态创建exchange交换机和queue队列,并绑定到指定的vhost(虚拟主机)
//add exchange, queue and bind vhost RabbitModuleInfo rabbitModuleInfo = new RabbitModuleInfo(); rabbitModuleInfo.setVhost(vhost); RabbitModuleInfo.Queue queue = new RabbitModuleInfo.Queue(); Map<String, Object> arguments = new HashMap<>(); //消息过期时间 arguments.put("x-message-ttl",3600000); queue.setName(queueName); queue.setArguments(arguments); rabbitModuleInfo.setQueue(queue); RabbitModuleInfo.Exchange exchange = new RabbitModuleInfo.Exchange(); exchange.setName(exchangeName); rabbitModuleInfo.setExchange(exchange); rabbitModuleInfo.setRoutingKey(queueName); rabbitModuleInitializer.declareRabbitModule(rabbitModuleInfo);
/** * RabbitMQ队列初始化器 */ public class RabbitModuleInitializer{ private AmqpAdmin amqpAdmin; private RealtimePushProducer realtimePushProducer; public RabbitModuleInitializer(AmqpAdmin amqpAdmin,RealtimePushProducer realtimePushProducer) { this.amqpAdmin = amqpAdmin; this.realtimePushProducer = realtimePushProducer; } /** * RabbitMQ 根据配置动态创建和绑定队列、交换机 */ public void declareRabbitModule(RabbitModuleInfo rabbitModuleInfo) { configParamValidate(rabbitModuleInfo); // 队列 Queue queue = convertQueue(rabbitModuleInfo.getQueue()); // 交换机 Exchange exchange = convertExchange(rabbitModuleInfo.getExchange()); // 绑定关系 String routingKey = rabbitModuleInfo.getRoutingKey(); String queueName = rabbitModuleInfo.getQueue().getName(); String exchangeName = rabbitModuleInfo.getExchange().getName(); Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null); realtimePushProducer.bindVhostExchangeQueue(rabbitModuleInfo.getVhost(),exchange,queue,binding); } /** * RabbitMQ动态配置参数校验 * * @param rabbitModuleInfo */ public void configParamValidate(RabbitModuleInfo rabbitModuleInfo) { String routingKey = rabbitModuleInfo.getRoutingKey(); Assert.isTrue(StrUtil.isNotBlank(routingKey), "RoutingKey 未配置"); Assert.isTrue(rabbitModuleInfo.getExchange() != null, "routingKey:{}未配置exchange", routingKey); Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getExchange().getName()), "routingKey:{}未配置exchange的name属性", routingKey); Assert.isTrue(rabbitModuleInfo.getQueue() != null, "routingKey:{}未配置queue", routingKey); Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getQueue().getName()), "routingKey:{}未配置exchange的name属性", routingKey); } /** * 转换生成RabbitMQ队列 * * @param queue * @return */ public Queue convertQueue(RabbitModuleInfo.Queue queue) { Map<String, Object> arguments = queue.getArguments(); // 转换ttl的类型为long if (arguments != null && arguments.containsKey("x-message-ttl")) { arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl"))); } // 
是否需要绑定死信队列 String deadLetterExchange = queue.getDeadLetterExchange(); String deadLetterRoutingKey = queue.getDeadLetterRoutingKey(); if (StrUtil.isNotBlank(deadLetterExchange) && StrUtil.isNotBlank(deadLetterRoutingKey)) { if (arguments == null) { arguments = new HashMap<>(4); } arguments.put("x-dead-letter-exchange", deadLetterExchange); arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey); } return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments); } /** * 转换生成RabbitMQ交换机 * * @param exchangeInfo * @return */ public Exchange convertExchange(RabbitModuleInfo.Exchange exchangeInfo) { AbstractExchange exchange = null; RabbitExchangeTypeEnum exchangeType = exchangeInfo.getType(); String exchangeName = exchangeInfo.getName(); boolean isDurable = exchangeInfo.isDurable(); boolean isAutoDelete = exchangeInfo.isAutoDelete(); Map<String, Object> arguments = exchangeInfo.getArguments(); switch (exchangeType) { case DIRECT:// 直连交换机 exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments); break; case TOPIC: // 主题交换机 exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments); break; case FANOUT: //扇形交换机 exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments); break; case HEADERS: // 头交换机 exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments); break; } return exchange; } }
/** * 根据配置动态创建和绑定队列、交换机 * @param vhost * @param exchange * @param queue * @param binding */ @Override public void bindVhostExchangeQueue(String vhost, Exchange exchange, Queue queue, Binding binding) { ConnectionFactory factory = queueConfig.pushConnectionFactory(rabbitProperties,vhost); RabbitAdmin rabbitAdmin = new RabbitAdmin(factory); log.debug("bind vhost={},exchange={},queue={}",vhost,exchange.getName(),queue.getName()); // 创建队列 rabbitAdmin.declareQueue(queue); // 创建交换机 rabbitAdmin.declareExchange(exchange); // 队列 绑定 交换机 rabbitAdmin.declareBinding(binding); }
/**
 * Creates a connection factory that targets the given vhost.
 *
 * <p>Host, port, username and password are read from {@code rabbitProperties};
 * a new factory is created on every call.
 *
 * @param rabbitProperties RabbitMQ connection settings
 * @param vhost            virtual host the factory connects to
 * @return a new connection factory for {@code vhost}
 */
public ConnectionFactory pushConnectionFactory(RabbitProperties rabbitProperties, String vhost) {
    CachingConnectionFactory factory = new CachingConnectionFactory();

    // Target virtual host
    factory.setVirtualHost(vhost);

    // Broker address
    factory.setHost(rabbitProperties.getHost());
    factory.setPort(rabbitProperties.getPort());

    // Credentials
    factory.setUsername(rabbitProperties.getUsername());
    factory.setPassword(rabbitProperties.getPassword());

    return factory;
}