Message Queue(MQ, 메시지큐)
메시지 큐란? 프로세스 또는 프로그램의 인스턴스 간에 데이터를 서로 교환할때 사용하는 방식 중 하나이며, 미들웨어의에 속하며 메모리 기반(Queue) 형식으로 처리하기 때문에 처리속도가 빠르다.
하지만 카프카는 메모리 기반이 아닌 파일시스템 기반이며, 이를 통해 **영속성(Persistence)**을 얻는다. 영속성을 통해서 과거 소비했던 메시지를 미래에 메시지 발행 없이 다시 소비할 수 있다.
Kafka는 Publish(발행)-Subscribe(구독) 모델을 이용하여 구현된 분산형 MQ
우체국에서 우편을 보내는 것과 매우 유사
Publisher(발행자)가 받는 이의 이름(Topic)과 전달 내용(Message)을 적어서 우체국(Kafka-broker)에 전달한다. 그러면 우체국에서 Topic별로 분산하여 저장한다(Queue in Memory).
Subscriber(구독자)는 Message를 읽을 준비가 되면 우체국에게 나에게(Topic) 온 메시지를 달라고 요청을 한다. 그러면 Kafka의 Broker가 해당 Topic으로 발행된 메시지가 존재하면 해당 메시지를 전달한다.
만약 불행하게도 발행된 Message가 사라진다면 Publisher은 귀찮게 과거에 작성한 Message를 다시 작성해야하는 일이 발생하게 된다. Message의 안전?을 위해서 Kafka는 발행된 Message의 복사본(Replica)을 만들어 여러 우체국에 분산 저장하고 있다.
Reactor kafka에는 두가지 핵심 인터페이스가 있다.
kafka에 Message를 발행(publish)하는 reactor.kafka.sender.KafkaSener
카프카의 Message를 소비(consume)하는 reactor.kafka.reciver.KafkaReceiver
Publish(KafkaSener)
Kafka로 보내는 메시지는 KafkaSender 클래스를 이용해서 보낸다. Sender는 Thread-Safe하며, 여러 스레드에 공유해 처리량을 끌어 올릴 수 있다. KafkaSender는 kafka로 Message를 전송할 때 사용하는 KafkaProducer와 하나로 연결된다.
fun kafkaSender(): KafkaSender<String, CovidSearchLinkDetail> {
val kafkaProducerConfiguration: HashMap<String, Any> = hashMapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer<CovidSearchLinkDetail>()::class.java
)
val senderOption: SenderOptions<String, CovidSearchLinkDetail> =
SenderOptions.create(kafkaProducerConfiguration)
return KafkaSender.create(senderOption)
}
KafkaProducerConfig: Kafaka Topic(=Message) 를 Publish(=Send) 하는 설정 클래스
Subscribe(KafkaReceiver)
Reactive Kafka Recevier
KafkaReceiver<K, V>, ReceiverOptions<K, V>의 제네릭 타입은 Receiver로 소비할 Consumer Record의 Key, Value 타입이다. 그렇기 때문에 둘의 제네릭 타입은 동일해야 맞다.
[JAVA - Example]
public class KafkaConsumerConfig {
private final KafkaProperties kafkaProperties;
// `topicName`만 Consume하는 KafkaReceiver를 위한 카프카 설정을
// ReceiverOptions에 설정 후, #create(options)를 호출하면
// inbound message를 소비할 수 있는 KafkaReceiver Instance를 리턴한다.
// KafkaReceiver Instance가 생성되는 시점은 즉시가 아닌 Lazy Create로 처리된다.
// 생성되는 시점은 inbound flux가 subscribe를 시작하는 시점이다.
public KafkaReceiver<String, String> kafkaReceiver(String topicName) {
ReceiverOptions<String, String> options =
ReceiverOptions.<String, String>create(
kafkaProperties.buildConsumerProperties(null)
)
.subscription(Set.of(topicName))
.addAssignListener((consumer) -> log.info("onPartitionsAssigned {}", consumer))
.addRevokeListener((consumer) -> log.info("onPartitionsRevoked {}", consumer));
return KafkaReceiver.create(options);
}
}
@Slf4j
@RequiredArgsConstructor
@ReactiveKafkaListener(topic = "test")
public class TradeListener {
private final KafkaConsumerConfig kafkaConsumerConfig;
public void subscribe(String topicName) {
// #receive()를 호출함으로써 KafkaReceiver는 inbound kafka flux를
// Consume할 준비가 완료된다.
kafkaConsumerConfig.kafkaReceiver(topicName).receive()
.subscribe(stringStringReceiverRecord -> {
// subscribe하는 시점에 KafkaRecevier 인스턴스가 생성되고
// inbound Flux Message가 ReceiverRecord 타입으로 넘어온다.
log.info("Received Records => {}", stringStringReceiverRecord);
});
}
}
[Kotlin]
@Component
class KafkaConsumerConfig(val kafkaProperties: KafkaProperties) {
fun kafkaReceiver(topicName: String, groupId: String? = null, partitionNo: Int = 1)
: KafkaReceiver<String, CovidSearchLinkDetail> {
// 1. KafkaConsumer에게 제공할 Properties를 지정한다.
val kafkaConsumerConfig: Map<String, Any> = mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG to "test-consumer-group",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer<CovidSearchLinkDetail>(
CovidSearchLinkDetail::class.java
),
JsonDeserializer.VALUE_DEFAULT_TYPE to "com.example.demo.covid.CovidSearchLinkDetail",
JsonDeserializer.USE_TYPE_INFO_HEADERS to false,
JsonDeserializer.TRUSTED_PACKAGES to "*",
)
// 2. 위에서 정의한 Properties를 이용해서 receiverOptions 인스턴스 생성한다.
val receiverOptions = ReceiverOptions.create<String, CovidSearchLinkDetail>(kafkaConsumerConfig)
// 3. ReceiverOptions객체를 이용해서 KafkaReceiver 인스턴스를 생성한다.
return KafkaReceiver.create(receiverOptions)
}
}
Apache Kafka: Architecture, Real-Time CDC, and Python Integration
Apache Kafka is a distributed streaming platform that has gained significant popularity for its ability to handle high-throughput, fault-tolerant messaging among applications and systems. At its core, Kafka is designed to provide durable storage and stream processing capabilities, making it an ideal choice for building real-time streaming data pipelines and applications. This article will delve into the architecture of Kafka, its key components, and how to interact with Kafka using Python.
∘ Brokers
∘ Interacting with Kafka using Python
∘ Consuming Messages from Kafka
∘ Purpose of Dead Letter Queues
∘ Caching Strategies with Kafka
∘ 2.Kafka Streams State Stores
∘ 3.Interactive Queries in Kafka Streams
∘ Considerations for Caching with Kafka
· Example: Using Kafka Streams State Store for Caching
· Consuming Messages and Caching Results
∘ Partitioning in Apache Kafka
∘ Partitioning in Distributed File Systems
∘ Challenges and Considerations
∘ CDC Data from Kafka with Python
∘ Consuming CDC Data from Kafka with Python
∘ Using confluent-kafka-python
∘ Notes
https://miro.medium.com/v2/0*z5e9GMyQ9R-RFbJ2
The architecture of Apache Kafka is built around a few core concepts: producers, consumers, brokers, topics, partitions, and the ZooKeeper coordination system. Understanding these components is crucial to leveraging Kafka effectively.
Kafka’s architecture is inherently distributed. This design allows Kafka to be highly available and scalable. You can add more brokers to a Kafka cluster to increase its capacity and fault tolerance. Data is replicated across multiple brokers to prevent data loss in the case of a broker failure.
https://miro.medium.com/v2/0*Y60EovrnihBBsZ7B
https://miro.medium.com/v2/0*xUDOJWtIpo-Ax1Wh
Here are some key points about offsets in Kafka:
In summary, offsets are a fundamental concept in Kafka that enables efficient, ordered, and reliable message processing in distributed systems. They facilitate Kafka’s high-throughput capabilities while supporting consumer scalability and fault-tolerant design.
Python developers can interact with Kafka through the confluent-kafka-python library or the kafka-python library. Both provide comprehensive tools to produce and consume messages from a Kafka cluster.
Installation