일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- java
- 시큐리티
- 페이징
- Security
- OAuth
- oauth2
- load balancing
- clean code
- g1
- JPA
- Producer
- 스프링
- assertj
- apache
- GC
- JWT
- gdg
- 페이스북
- 스프링부트
- RabbitMQ
- spring boot
- 권한
- jvm
- 스프링 부트
- tomcat
- 클린코드
- Spring
- Refactoring
- 비동기
- 리팩토링
- Today
- Total
허원철의 개발 블로그
Spring Boot - RabbitMQ 본문
이번 글은 저번 메시지 큐(Message Queue) 훑어보기에 이어 Spring Boot 에서 RabbitMQ를 활용한 예제에 대한 글입니다.
1. 어떻게 RabbitMQ를 접하게 되었는가..?
2. 왜 RabbitMQ인가?
3. 설치 과정
4. 개념 정리
5. 예제
1. 어떻게 RabbitMQ를 접하게 되었는가..?
이번에 사내 프로젝트를 진행하면서, 많은 데이터 처리가 웹서버만으로 힘들기 때문에 여러 방법을 구상 중 이였습니다.
앞단에 로드밸런싱을 해볼까?
서버사이드에 분산은 되겠지만, DB가 버텨줄지 의문이였습니다. 기존 시스템 또한 DB가 말썽이였기 때문입니다. 또한 DB 사이드에 session을 무한정 늘릴 수도 없습니다.
그래서 선택하게 된 것이 메시지큐입니다. 이미 몇 년전부터 많은 기업에서 사용 중 이였고, DB 과부하는 충분히 커버되리라 생각되어 검토 후 사용해보기로 했습니다.
2. 왜 RabbitMQ인가?
* RabbitMQ에서 플러그인 추가만으로 모니터링이 가능하다.
* 빠른 편에 속한다.
* 다양한 언어 지원하다.
JAVA만이 아닌 다른 언어로도 시스템이 돌고 있는 회사 시스템에 적용하기 위해서 RabbitMQ를 선택하게 되었습니다.
3. 설치 과정
① 다운로드 URL
https://www.rabbitmq.com/download.html
Default User : guest/guest
② 모니터링 플러그인 추가(rabbitmq cmd )
rabbitmq-plugins enable rabbitmq-management
③ rabbitmq 서비스 재시작
이로써 http://localhost:15672으로 모니터링이 가능합니다.
4. 개념 정리
* RabbitMQ 기본개념
1) Message
: 일정 구조를 지닙니다.
- header
① message id
② timestamp
③ contenttype
- body
① data
2) Exchange
: Producer가 메시지를 보낼 때, AMQP로 교환하는 과정을 나타냅니다.
종류
① Direct(default)
- Routing Key와 Exchange이 같을 때 입니다.
② Topic
- Routing Key와 Exchange이 일정 패턴이 동일할 때 입니다.
③ Fanout
- 무관하게 넣습니다.
3) Queue
: Message가 쌓이는 구성요소. 즉, Consumer가 Message를 받는 구성요소를 나타냅니다.
- Name : Queue 이름
- Durable : 대기열을 유지할지를 정할 플래그
- Exclusive : 선언된 연결에 의해서만 사용할지 정할 플래그
- Auto-Delete : 더 이상 사용되지 않는 큐를 삭제할지 정할 플래그
4) Binding
: 메시지를 주고 받을 수 있게 도와주는 교환기
5) Connection
: 실질적인 TCP 연결
6) Channel
: 가상연결, 메시지에 대한 작업 단위
* Spring RabbitMQ
1) ConnectionFactory : RabbitMQ를 연결하고 관리하는 인터페이스
- CachingConnectionFactory : Spring에서 제공하는 ConnectionFactory 객체
- SingleConnectionFactory : Spring에서 제공하는 Test ConnectionFactory 객체
2) SSL 적용
- RabbitConnectionFactoryBean 이용하여 가능합니다.
3) Send Message
- ...Send() : 메소드로 보낼 수 있습니다.
- v1.4.+ : MessagingMessageConverter 로 Coverter 추가가 가능합니다.( ex) Jackson2MessageConverter )
- v.1.4.2 : batchRabbitTemplate로 배치형태의 메시지 Send 가능합니다.
- v1.6.+ : Validate User Id
4) Receive Message
1) Polling Consumer
: 한번에 하나의 Message를 가져옵니다.
- receive...()
2) Asynchronous Consumer
: 비동기적으로 요청시 메시지를 수신하는 수신기를 등록합니다.
- SimpleRabbitListenerContainerFactory로 커스텀 수신이 가능합니다.
(Default Consumer = min : 3 / max : 10)
① MesageListener 를 상속받는 객체 생성 및 등록합니다.
② 실행될 메소드에 @RabbitListener 를 추가합니다
※ @Configuration에 @EableRabbit 추가해줘야 합니다.
5. 예졔
① Gradle 설정
② Config 설정
③ Producer 설정
④ Consumer 설정
① Gradle 설정
1 2 3 4 | dependencies { compile('org.springframework.boot:spring-boot-starter-amqp') testCompile('org.springframework.boot:spring-boot-starter-test') } | cs |
② Config 설정
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | @Configuration @EnableRabbit public class RabbitMQConfig { public static final String QUEUE_NAME = "queue"; private static final String EXCHANGE = QUEUE_NAME + "-exchange"; @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); template.setRoutingKey(QUEUE_NAME); template.setMessageConverter(jsonMessageConverter()); return template; } @Bean public SimpleMessageListenerContainer container() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory()); container.setQueueNames(QUEUE_NAME); // container.setMessageListener(baseMesage()); container.setMessageConverter(jsonMessageConverter()); return container; } @Bean public Queue queue() { return new Queue(QUEUE_NAME, false); } @Bean public TopicExchange exchange() { return new TopicExchange(EXCHANGE); } @Bean public Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME); } @Bean public Jackson2JsonMessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } @Bean public BaseMesage baseMesage() { return new BaseMesage(); } @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setHost("localhost"); factory.setUsername("rabbitmq"); factory.setPassword("rabbitmq"); return factory; } } | cs |
③ Producer 설정
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | @Component public class SendSchedler { @Autowired private RabbitTemplate rabbitTemplate; private static final Logger logger = Logger.getLogger(SendSchedler.class); @Scheduled(cron = "0/3 * * * * *") public void onSend() { logger.info("Sending message... Start"); StopWatch stopWatch = new StopWatch(); stopWatch.start(); IntStream.range(1, 15000) .parallel() .forEach(val -> { rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_NAME, new Base(val, "Hello, RabbitMQ! 1")); }); stopWatch.stop(); logger.info(stopWatch.toString()); logger.info("Sending message... End"); } } | cs |
④ Consumer 설정
1 2 3 4 5 6 7 8 9 10 | @Component public class BaseJsonMessage { private static final Logger logger = Logger.getLogger(BaseJsonMessage.class); @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME) public void onMessage(Base base) { logger.info("Received < " + base.toString() + " >"); } } | cs |
참고
'web' 카테고리의 다른 글
Spring Boot - jar로 Deploy(배포)하기 (405) | 2017.01.19 |
---|---|
Spring Boot - Apache proxy를 이용한 로드밸런싱 (437) | 2017.01.17 |
메시지 큐(Message Queue) 훑어보기 (441) | 2017.01.08 |
Spring Boot - Exception Handler (405) | 2017.01.03 |
책 '서블릿 컨테이너의 이해' 후기 (419) | 2016.12.29 |