Introduction
Spring Data MongoDB의 맞춤식 커서를 활용해 MongoDB를 infinite(무한한) data stream으로 사용하는 방법에 대해서 알아보겠습니다.
Tailable Cursors
쿼리를 실행할 때, db driver는 커서를 열어 일치하는 documents를 제공합니다. 기본적으로, MongoDB는 클라이언트가 모든 결과를 읽었을 때 자동으로 커서를 닫습니다. 따라서 변환하면 finite(유한한) data stream이 생성됩니다.
그러나 계속 열린상태인 맞춤식 커서(tailable cursor)가 있는 제한 크기 컬렉션(capped collections)을 사용하여 무한 데이터 스트림을 만들 수 있습니다. 클라이언트가 처음에 반환된 데이터를 모두 소비한 후에도, 이 방법은 채팅 메시지 또는 주식 업데이트와 같은 이벤트 스트림을 처리하는 응용 프로그램에 유용합니다.
Spring Data MongoDB 프로젝트는 반응형 데이터베이스 기능을 활용할 수 있도록 지원합니다.
MongoDB Capped Collection(제한 컬렉션) 이란
제한 컬렉션은 삽입 순서에 따라 문서를 삽입하고 검색하는 높은 처리 작업을 지원하는 고정 크기 컬렉션으로, 순환 버퍼와 유사한 방식으로 작동합니다. 컬렉션이 할당 된 공간을 채우면 컬렉션에서 가장 오래된 문서를 덮어 써 새 문서를위한 공간을 만듭니다. 즉 쉽게말해, 일정 크기 안에서만 데이터를 사용하고, 더 사용할 공간이 없으면 제일 처음 생성한 데이터에 덮어쓰게 됩니다.
주의: 삭제가 불가능, document를 이동하는 식의 갱신은 불가능
장점: 입력속도가 빠르다, 새로운 데이터가 입력될 때 자동으로 옛날데이터 삭제
Setup
언급된 기능에 대해서 효과적으로 설명하기 위해 로그 카운터 애플리케이션을 만들어 보겠습니다. 모든 로그를 수집하고 유지하는 로그 수집기가 MongoDB 제한 컬렉션에 위치한다고 가정하겠습니다.
먼저 간단한 Log entity 를 사용합니다.
@Document
public class Log {
private @Id String id;
private String service;
private LogLevel level;
private String message;
}
다음으로, MongoDB 제한 컬렉션에 로그를 저장합니다. 제한 컬렉션은 삽입 순서에 따라 문서를 삽입하고 검색하는 고정 크기의 컬렉션입니다. MongoOperations.createCollection
으로 생성할 수 있습니다.
db.createCollection(COLLECTION_NAME, new CreateCollectionOptions()
.capped(true)
.sizeInBytes(1024)
.maxDocuments(5));
제한 컬렉션의 경우, sizeInBytes
속성을 정리해야 합니다. 또한 maxDocuments는 컬렉션이 가질 수 있는 최대 documents 수를 지정합니다. 이때 지정한 값에 도달하게 되면, 이전 문서가 컬렉션에서 제거됩니다.
셋째로, 적절한 Spring Boot starter dependency를 추가합니다.
Gradle
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-data-mongodb-reactive', version: '2.4.0'
Maven
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
<version>2.4.0</version>
</dependency>
Reactive Tailable Cursors
우리는 명령형(imperative)과 반응형(reactive) MongoDB API를 모두 사용하여 Tailable Cursors를 사용할 수 있습니다. 명령형과 반응형 중에서는 반응형 변형(reactive variant)을 사용하는 것이 좋습니다.
reactive한 접근 방식을 사용하여 WARN level 로그 카운터를 구현해 보겠습니다. ReactiveMongoOperations.tail
메서드를 사용하여 무한 스트림 쿼리를 생성 할 수 있습니다.
새 documents가 제한 컬렉션에 도착하고 필터 쿼리와 일치함에 따라, tailable cursor는 열린 상태로 유지되고 data (엔티티 Flux)를 내 보냅니다.
private Disposable subscription;
public WarnLogsCounter(ReactiveMongoOperations template) {
Flux<Log> stream = template.tail(
query(where("level").is(LogLevel.WARN)),
Log.class);
subscription = stream.subscribe(logEntity ->
counter.incrementAndGet()
);
}
WARN level 로그를 가진 새로운 document가 컬렉션에서 지속된다면, subscriber(lambda 식)가 카운터를 증가시킵니다.
마지막으로 우리는 구독(subscription)을 취소하고 stream을 닫아야 합니다.
public void close() {
this.subscription.dispose();
}
또한 우리는 여기서 두가지를 더 유의해야 합니다.
- tailable cursor가 죽어버리는 경우
- 쿼리가 처음에 일치하는 것을 리턴하지 못할 때, tailable cursor가 유효하지 않는 경우
즉, 새로운 지속되는 documents가 필터 쿼리와 일치하더라도 subscriber는 해당 documents를 수신 할 수 없습니다. 이것은 MongoDB tailable cursor의 알려진 제한 사항입니다. tailable cursor를 만들기 전에 capped collection에 일치하는 documents가 있는지 확인해야 합니다.
Tailable Cursors with a Reactive Repository
Spring Data 프로젝트는 reactive 버전을 포함하여 다양한 데이터 저장소에 대한 저장소 추상화를 제공합니다. MongoDB도 예외는 아닙니다. 자세한 내용은 MongoDB를 사용한 Spring Data Reactive Repositories 를 보시면 좋을 것 같습니다. 또한 MongoDB reactive repository는 쿼리 method에 @Tailable 어노테이션을 달아 무한 스트림을 지원합니다. Flux 나 여러 요소를 내보낼 수있는 다른 reactive types을 반환하는 그 어떤 repository method에 어노테이션을 달 수 있습니다.
public interface LogsRepository extends ReactiveCrudRepository<Log, String> {
@Tailable
Flux<Log> findByLevel(LogLevel level);
}
이 tailable repository 방법을 사용하여 INFO 로그를 계산해보겠습니다.
private Disposable subscription;
public InfoLogsCounter(LogsRepository repository) {
Flux<Log> stream = repository.findByLevel(LogLevel.INFO);
this.subscription = stream.subscribe(logEntity ->
counter.incrementAndGet()
);
}
마찬가지로 WarnLogsCounter
의 경우, 구독(subscription)을 삭제하여 스트림을 닫아야합니다.
public void close() {
this.subscription.dispose();
}
Tailable Cursors with a MessageListener
그럼에도 불구하고 reactive API를 사용할 수 없다면, Spring의 메시징 개념을 활용할 수 있습니다.
먼저 전송된 SubscriptionRequest
객체를 처리 할 MessageListenerContainer
를 만들어야합니다. Syncronous(동기식) MongoDB 드라이버는 제한 컬렉션에서 새로운 documents를 수신하는 long-running하고 blocking한 task을 생성합니다.
Spring Data MongoDB는 TailableCursorRequest
이라는 Task 인스턴스를 생성하고 실행 할 수 있는 default implementabtion이 제공됩니다.
private String collectionName;
private MessageListenerContainer container;
private AtomicInteger counter = new AtomicInteger();
public ErrorLogsCounter(MongoTemplate mongoTemplate,
String collectionName) {
this.collectionName = collectionName;
this.container = new DefaultMessageListenerContainer(mongoTemplate);
container.start();
TailableCursorRequest<Log> request = getTailableCursorRequest();
container.register(request, Log.class);
}
private TailableCursorRequest<Log> getTailableCursorRequest() {
MessageListener<Document, Log> listener = message ->
counter.incrementAndGet();
return TailableCursorRequest.builder()
.collection(collectionName)
.filter(query(where("level").is(LogLevel.ERROR)))
.publishTo(listener)
.build();
}
TailableCursorRequest
는 ERROR level 로그 만 필터링하는 쿼리를 생성합니다. 일치하는 각 documents는 카운터를 증가시키는 MessageListener
에 게시(published)됩니다.
여전히 초기 쿼리가 일부 결과를 반환하는지 확인해야합니다. 그렇지 않으면 tailable cursor가 즉시 닫힙니다.
추가로, container가 더 이상 필요하지 않게 되면 멈추는 것을 잊지 말아야 합니다.
public void close() {
container.stop();
}
Conclusion
Tailable Cursor를 사용하는 MongoDB 제한 컬렉션은 DB에서 data를 지속적으로 수신하는 데 도움이됩니다. 명시적으로 닫힐 때까지 결과를 계속 제공하는 쿼리를 실행할 수 있습니다. Spring Data MongoDB는 tailable cursors를 활용하여 blocking 및 reactive 방법을 하는 것을 제공합니다.
전체 예제의 소스 코드는 GitHub에서 사용할 수 있습니다.
출처:
본 포스팅은 https://www.baeldung.com/spring-data-mongodb-tailable-cursors 에 설명된 내용을 해석하여 작성하였습니다.
'개발 > DB' 카테고리의 다른 글
CAP 정리와 PACELC 정리 (0) | 2022.08.10 |
---|---|
[Redis] Redis Cluster의 기초 (0) | 2022.06.21 |
[mongoDB] 3. CRUD Operations - Create, Read (0) | 2020.08.21 |
[mongoDB] 2. Databases, Collections, Documents (0) | 2020.08.20 |
[mongoDB] 1. 시작하기 (소개, SQL vs NoSQL, 특징, 설치) (0) | 2020.08.19 |
댓글