들어가며
코인은 주식과는 다르게 24시간 거래가 가능합니다. 때문에 코인에 대한 기술적 분석을 하기 위해서는 24/7 발생되는 실시간 데이터를 수집하고 처리할 수 있어야 합니다. 카프카를 활용하여 실시간으로 코인 데이터를 수집하는 데이터 파이프라인 구축 사례를 공유합니다.
무엇이 필요할까?
어떤 코인 데이터를 어떤 방법으로 수집할 수 있을지 알아보자.
Upbit OPEN API
코인 데이터는 각 거래소에서 제공하는 OPEN API 를 통해 조회할 수 있다. 나는 코인 거래소 중 하나인 업비트에서 제공하는 API 를 사용하고자 한다.
업비트에서는 분(minute) 단위 캔들 데이터를 조회할 수 있는 API 를 제공하고 있다. (API 문서) 이 API 를 통해 각 코인의 분 단위 캔들 데이터를 최대 200개까지 조회할 수 있다.
OPEN API 의 특징
OPEN API 는 보통 초당 또는 분당 호출할 수 있는 요청이 제한되어 있다. 분 단위 캔들 데이터를 제공하는 업비트의 OPEN API 의 경우에는 초당 10회, 분당 600회로 제한되어 있다.
분 단위 별 캔들 데이터
기술적 분석의 가장 기본이 되는 데이터는 캔들 데이터이다. (위키피디아) 캔들 데이터는 특정 단위 시간 내 주가의 움직임을 캔들 스틱으로 표현한 것인데, 개인적으로는 1분, 5분, 15분, 60분, 240분 총 5개의 분 단위 캔들을 활용한다. 따라서, 5개의 분 단위 주기마다 전체 코인에 대한 캔들 데이터를 조회하여 저장해야 한다. 현재 코인 거래소 업비트의 원화 마켓에서 거래할 수 있는 코인은 총 118종이 있다.
1분봉 데이터에 대한 조회는 1분 주기로 118번의 API 요청이 발생하고, 5분봉 데이터 조회는 5분 주기로 118번의 API 호출이 발생한다. 5분째가 되면 1분봉 데이터와 5분봉 데이터를 모두 조회하는 시점이기 때문에 총 236번의 호출이 발생한다. 240분째가 되면 1분, 5분, 15분, 60분, 240분 캔들 데이터를 모두 조회하는 시점이 겹치기 때문에 590번의 API 호출이 발생하게 된다.
제약 사항
별도의 처리없이 1분 주기로 118종에 대한 조회를 한번에 해버리면 보통 1초 내외로 모든 요청이 이뤄지기 때문에 초당 10회 요청을 넘어버리게 된다. 따라서 한번에 모든 요청을 발생시키는 것이 아니라 초당 10회, 분당 600회에 맞춰서 OPEN API 를 호출하도록 트래픽을 분산할 필요가 있다.
트래픽 분산하기
트래픽을 분산하는데는 다양한 방법들이 있지만 이번에는 아파치 카프카를 활용하여 트래픽을 분산해보고자 했다. (토이 프로젝트에 카프카를 사용해보고 싶었기 때문에..)
각 분 단위 캔들 요청을 바로 OPEN API 를 호출하게 하는 것이 아니라, 각 요청을 먼저 카프카 메시지로 발행한다 (produce). 발행한 메시지는 컨슈머가 1초에 10개씩 컨슘할 수 있도록 구성한다 (consume).
카프카 설정
해당 메시지를 컨슘하기 위한 토픽은 1개, 파티션도 1개로 설정했다.
토픽을 분리하지 않은 이유는 5개의 조회 요청에 대해 동일한 로직을 수행할 것이기 때문에 굳이 분리하지 않았고, 초당 10개의 메시지만 컨슘할 것이기 때문에 파티션도 1개면 충분할 것이라고 생각했다.
Topic 설정
토픽의 보관 기간의 기본값은 7일로 설정되어 있다. 따라서 별도의 설정을 하지 않으면 7일간 메시지가 보관된다. 하지만 분 단위 캔들 데이터를 조회하는 경우에는 1분 이내로 모든 메시지들이 컨슘되기 때문에 7일간 메시지를 보관할 필요가 없다. 적당히 1시간으로 설정해두었다.
@Configuration
class KafkaTopicConfig(
@Value("\${crypto.kafka.topics.candle-minute}") private val candleMinuteTopic: String,
) {
@Bean
fun minuteCandleTopic(): NewTopic {
return TopicBuilder.name(candleMinuteTopic)
.configs(
mapOf(
TopicConfig.RETENTION_MS_CONFIG to "3600000" // 1 hour
)
)
.partitions(1)
.replicas(1)
.build()
}
}
Producer 설정
Producer 설정은 다음과 같다.
@Configuration
class KafkaProducerConfig {
private val bootstrapServers: String = "localhost:29092, localhost:39092, localhost:49092"
@Bean
fun kafkaAdmin(): KafkaAdmin {
val properties = mutableMapOf<String, Any>()
properties[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
return KafkaAdmin(properties)
}
@Bean
fun producerFactory(): ProducerFactory<String, String> {
val properties = mutableMapOf<String, Any>()
properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] =
"org.apache.kafka.common.serialization.StringSerializer"
properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] =
"org.springframework.kafka.support.serializer.JsonSerializer"
return DefaultKafkaProducerFactory(properties)
}
@Bean
fun kafkaTemplate(producerFactory: ProducerFactory<String, String>): KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory)
}
}
Consumer 설정
1초당 10개의 메시지를 컨슘하기 위한 설정은 MAX_POLL_RECORDS_CONFIG 와 idleBetweenPolls 이다.
MAX_POLL_RECORDS_CONFIG 는 컨슈머가 메시지를 poll 을 할 때 최대 가져올 메시지 개수를 제한한다. 기본값은 500 이다. idleBetweenPolls 는 각 poll 사이의 유휴 시간을 의미한다. 이 두 조건을 조합하여 초당 10건의 메시지를 컨슘할 수 있도록 설정했다.
Consumer 설정은 다음과 같다.
@Configuration
class KafkaConsumerConfig {
@Bean
fun consumerFactory(): ConsumerFactory<String, String> {
val properties = mutableMapOf<String, Any>()
properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:29092, localhost:39092, localhost:49092"
properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] =
"org.apache.kafka.common.serialization.StringDeserializer"
properties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] =
"org.springframework.kafka.support.serializer.JsonDeserializer"
properties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
properties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 10 // poll 할 때마다 최대 가져올 메시지 개수
return DefaultKafkaConsumerFactory(properties)
}
@Bean
fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<String, String>): ConcurrentKafkaListenerContainerFactory<String, String> {
val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
containerFactory.consumerFactory = consumerFactory
containerFactory.containerProperties.idleBetweenPolls = 1_000 // 각 poll 사이의 유휴시간
containerFactory.containerProperties.ackMode = ContainerProperties.AckMode.BATCH
return containerFactory
}
}
메시지 발행
어플리케이션에서 메시지가 발행되면 설정한 카프카 토픽에 메시지들이 쌓인다.
카프카 도커 파일
도커를 활용하여 카프카를 사용했다. 아래는 카프카를 도커에 올리기 위해 사용한 docker-compose 파일이다.
3대의 클러스터로 구성하게끔 했다.
version: "3"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.9
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
kafka-1:
image: confluentinc/cp-kafka:7.0.9
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092, PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT, PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
kafka-2:
image: confluentinc/cp-kafka:7.0.9
depends_on:
- zookeeper
ports:
- "39092:39092"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092, PLAINTEXT_HOST://localhost:39092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT, PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
kafka-3:
image: confluentinc/cp-kafka:7.0.9
depends_on:
- zookeeper
ports:
- "49092:49092"
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9092, PLAINTEXT_HOST://localhost:49092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT, PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
kafka-ui:
image: provectuslabs/kafka-ui
depends_on:
- zookeeper
- kafka-1
- kafka-2
- kafka-3
ports:
- "9000:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:9092, kafka-2:9092, kafka-3:9092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
레디스를 활용하여 데이터 저장
위와 같이 API 를 통해 가져온 데이터는 레디스에 저장하도록 구성했다. 아래 그림은 비트코인 (BTC) 에 대해 1분, 5분, 15분, 60분 데이터가 쌓여있는 모습이다.
1분 데이터를 보면 정상적으로 잘 쌓이는 것을 확인할 수 있다.
마무리하며
이번 글에서는 카프카를 활용하여 트래픽을 분산하고 TPS 를 관리하는 방법에 대해 알아보았다. 트래픽을 분산하는 측면에서는 확실히 장점이 있다고 생각한다. 또한 분 단위 캔들 데이터 조회 요청을 하는 부분과 "초당 10회 요청 제한"이라는 외부 조건을 분리하여 관리할 수도 있다. 만약 분 단위 캔들이 신규로 필요한 경우에는 (예: 3분봉 데이터가 추가로 필요) 해당 토픽으로 메시지만 추가로 발행하면 된다. 컨슘하는 쪽에서는 여전히 1초당 10개의 메시지만 컨슘하기 때문에 시스템은 무너지지 않는다.
다만 신규 분 단위의 캔들이 지속적으로 추가되는 경우에는 produce 속도가 consume 속도를 넘어설 수 있기 때문에 메시지 렉이 쌓이지 않도록 추가 작업이 필요하다. 예를 들어, 초당 10회 요청 제한은 각 IP 주소를 기준으로 하기 때문에 새로운 서버를 둘 수도 있다. 또는 분 단위 데이터는 한번의 API 요청에 최대 200개까지 조회할 수 있기 때문에 API 요청 빈도를 늦추고 조회하는 캔들 개수를 늘려서 처리할 수도 있다. 지금 당장은 1분에 최대 590번의 캔들 조회만 발생하기 때문에 (= 600번 이내) 별도의 처리는 할 필요가 없다.
TPS 를 관리하는 측면에서는 아직 잘 모르겠다. 왜냐하면 최대 가져오는 메시지 개수를 제한하기 위해 MAX_POLL_RECORDS 라는 설정을 활용했는데 이 설정의 경우에는 최대만 제한 하는 것이지 매번 확정적으로 원하는 값을 가져오진 않는다. 10개로 설정했다고 해도 그 보다 더 작은 개수가 polling 될 수도 있다.
또한 지금은 5개의 분 단위 데이터를 가져오는 과정에서 별도의 우선순위를 두지 않았는데 (예: 1분봉 데이터는 항상 가장 먼저 처리할 수 있도록) 카프카를 활용하면 이 부분을 해결하기 조금 어렵겠다는 생각을 했다. 이런 부분들 때문에 다음에는 레디스도 한번 활용해보는 것이 좋겠다는 생각을 하게 되었다.
'스프링 > 만들면서 배우는 실무 백엔드 개발' 카테고리의 다른 글
14. GitHub Action 에서 스프링 배치 실행하기 (0) | 2023.08.08 |
---|---|
13. SpringBatch 5 + Kotlin 적용하기 (0) | 2023.07.16 |
12. SpringBoot 에서 AOP 적용하기 (Kotlin) (0) | 2023.05.04 |
11. Jacoco + Gradle.kts 로 테스트 커버리지 확인하기 (feat. SonarQube) (0) | 2023.04.07 |
10. ArchUnit 으로 아키텍쳐 검사하기 (0) | 2023.04.05 |
댓글