Apache Kafka Nedir ve .Net Core ile Kafka Üzerinden Nasıl Haberleşme Sağlanır

Özdecan GÜRCAN
7 min readJan 24, 2021

--

Apache Kafka

Microservice yapıların da event-driven architecture mimarilerin de message queue yapıları oldukça kullanışlı olmaktadır. Ana uygulama yükünü azaltmakta ve fire and forget yapılarında güzel çözümler sunmaktadır.

Apache Kafka ise bu yapılardan biridir. Open source olarak geliştirilen Apache Kafka distrubuted, scalable ve yüksek performans sunan bir pub/sub message broker teknolojisidir.

Amazon, Netflix, Spotify gibi şirketlerin kullandığı bu teknoloji de amaç yüksek hacimli verileri hızlı bir şekilde depolamak ve işleyebilmektir.

Kafka Mimarisi

Kafka bir veya birden çok sunucu üzerinde bir cluster oluşturarak çalışır. Kafka üzerinde bulunan her bir kayıt key-value ve timestamp bilgileri kullanılarak “topic” adı verilen kategoriler içerisinde saklanır. Kafka içinde kendine ait bir jargon barındırır. Bunları başlıklar halinde inceleyebiliriz.

Message

Kafka mesaj tabanlı bir teknolojidir. İşlemler mesajlar ile yapılır. Bu mesajlar key-value ikilisi ile oluşur. Serialize edilebilen her şeyi message olarak kullanabiliriz.

Topic

Kafka’da mesajlar topic’lere iletilir. Birbiri ile bağlantı olan mesajlar aynı topic içerisinde muhafaza edilir. Örnek olarak siparişler bir topic üzerinde saklanırken, ürün güncellemeleri farklı bir topic üzerinde tutulur.

Partition

Kafka birden fazla node olarak sanal/fiziksel makineler üzerinde çalışabilir. Performans amaçlı olarak topic’ler farklı parçalardan oluşabilir ve makineler üzerinde farklı farklı partition’lara ayrılırlar. Bunu HDD’de bulunan partition’a benzetebiliriz.

Partition

Offset

Bir topic partition’ı içinde bulunan message’lar ulaşma anına göre sıralı olarak saklanır. Bu sıra offset değeri ile belirlenir. Bir consumer’ın hangi mesajda kaldığı yine bu offset değeri ile belirlenir. Herhangi bir olumsuzluk durumunda consumer’ın tekrardan başlaması gerekirse bu offset değeri ile kaldığı yerden çalışmaya devam eder. Bir mesaj consumer tarafından okunduktan sonra topic içerisinden silinmez ve bu sayede farklı bir consumer eski bir offset’den başlayarak mesajları işleyebilir.

Producer

Topic’lere oluşturulan mesajları gönderen uygulamalar producer olarak adlandırılır. Oluşturulan mesajların hangi topic’e iletileceği uygulama tarafından belirlenir.

Consumer

Topic’lere iletilen mesajları okuyan uygulamalara consumer adı verilir. Consumer uygulamaların hangi topiclerden mesaj okuyacağını belirlemek için ilgili topiclere subscribe olurlar. Sadece kendilerine atanmış partitionlarda mesajlara ulaşıp okuyabilirler.

Kafka’yı Docker ile Çalıştırma

Apache Zookeper

Kafka kurulumunu yaparken ihtiyacımız olan bir diğer serviste Zookeeper’dır. Zookeeper distributed uygulamalar geliştirmeye yarayan bir koordinasyon servisidir. Multiple instance içeren distribute sistemleri configure etmeye olanak sağlar.

Yukarıda bahsettiğimiz gibi Kafka distributed bir sistem üzerinde çalışmaktadır. Kafka configuration bilgilerini saklamak için Zookeeper’ı kullanmaktadır. Bu yüzden kafka kurulumu öncesi Zookeeper’ın kurulumu zorunludur.

O zaman Docker üzerinde yapmamız gereken ilk işlem aşağıdaki komutu kullanarak Zookeeper image’ını indirip container’ı ayağa kaldırmaktır.

docker run -d --name zookeeper -p 2181:2181 jplock/zookeeper:latest

Komut çalıştıktan sonra eğer Zookeeper image’ı bulunamazsa latest versiyonu Docker Hub üzerinden pull edilip ardından verdiğimiz config bilgileri ile container oluşacaktır. İşlemler tamamlandıktan sonra aşağıdaki komut ile ayakta olan container’ları görebiliriz.

docker ps
Apache Kafka

Apache Kafka’yı ayağa kaldırmak için aşağıdaki komutu kullanabiliriz. Zookeper IP olarak docker üzerinde çalışan container’ın adresini veriyoruz.

docker run -d --name kafka -p 9092:9092 -e ZOOKEEPER_IP=’hostip' ches/kafka:latest

Topic ve Partition Oluşturma

Docker üzerinde ayağa kaldırdığımız Kafka içerisine girdiğimizde bir takım hazır kodlar ile karşılaşacağız. Yeni bir topic oluşturmak için kafka-topics.sh, dahili consumer için kafka-console-consumer.sh ve producer içinde kafka-console.producer.sh adlı scriptleri kullanacağız.

Kafka Docker

Windows Powershell üzerinde 1 replica ve 1 partition’dan oluşan bir topic yaratmak için aşağıdaki komutu kullanabiliriz.

docker run --rm ches/kafka kafka-topics.sh --create --topic firstTopic --replication-factor 1 --partitions 1 --zookeeper HostIp:2181
Kafka Topic Create

Ardından işlem başarılı ise aşağıdaki komutla Kafka içinde yarattığımız tüm topicleri listeleyebiliriz.

docker run --rm ches/kafka kafka-topics.sh --list --zookeeper HostIp:2181

Producer ve Consumer Kullanımı

Şimdi yapacağımız örnek ise Kafka ile dahili gelen console-producer ile oluşturacağımız mesajları yukarıda oluşturduğumuz topic’e göndermek. Aşağıda bulunan komutu çalıştırdığımızda yazacağımız her satır oluşturduğumuz topic’e iletilecektir.

docker run --rm --interactive ches/kafka kafka-console-producer.sh --topic firstTopic --broker-list hostip:9092

Artık mesajları belirlediğimiz topic’e yolladık. Şimdi bu mesajları consume ederek okuyalım. Yine Kafka içinde gelen console-consumer ile bu işlemi yapabiliriz. Aşağıdaki komutu çalıştırdığımızda ilettiğimiz mesajlar console’a yansımaya başlayacaktır.

docker run --rm ches/kafka kafka-console-consumer.sh --topic firstTopic --from-beginning --bootstrap-server hostip:9092

Komut çalıştıktan sonra dikkat etmemiz gereken şeylerden biri de “--from-beginning” parametresi’dir. Bu parametre ile topic içinde bulunan mesajlar en başından itibaren iletilmeye başlayacak fakat bunu kullanmazsak sadece en yeni mesajı almış olacağız.

Topic Partition ve Consumer Group

Kafka’da mesajları aynı consumer tipinde birden fazla consumer’a dağıtımını sağlamak mümkündür. Bunun için mesajları load-balanced dağıtılması için kullandığımız topicleri n consumer kadar partition’a bölmemiz gerekir. Önceden yukarıda tanımladığımız topic tek bir partition’a sahipti. Şimdi de aşağıdaki komutu çalıştırarak 2 partition’a sahip bir topic oluşturalım.

docker run --rm ches/kafka kafka-topics.sh --create --topic commands-2 --replication-factor 1 --partitions 2 --zookeeper hostip:2181

Aynı tipte farklı consumer’lar aynı topic üzerinden mesajları okurken aynı consumer grup içine dahil edilmelidir. Bunun için aşağıdaki komutu çalıştırmamız gerekecektir.

docker run --rm ches/kafka kafka-console-consumer.sh --topic commands --from-beginning --bootstrap-server hostip:9092 --consumer-property group.id=testApp1

Consumer group sayesinde tüm consumer’lar aynı tip olacak ve mesajları sırası ile alacaktır. Aşağıda bu işlemi yaptıktan sonra console üzerinde consumer’ları takip ettiğimizde mesajlar iki consumer arasında paylaştırılmıştır.

Consumer Group

.Net Core ile Kafka Üzerinden Haberleşme

.Net Core üzerinde gerçekleştireceğimiz senaryo çoğu sistemde kullanılan email gönderimi üzerine olacak. Sistem üzerinden iletilecek mailleri kuyruğa ekleyerek mail yollayan sistemimize yollaması gereken mailleri ileteceğiz.

İlk olarak bir blank solution oluşturup içine iki adet console projesi oluşturalım. Bu projelerden biri EmailProducer olacak diğeri ise EmailConsumer. Biri mailleri kuyruğa yollayacak diğeri ise bunları consume ederek işleme alacak.

Proje Yapısı

Şimdi her iki projeye de nuget üzerinden Confluent.Kafka client kütüphanesini ekleyelim. Ayrıntılı bilgi için aşağıdaki github sayfasına bakabilirsiniz.

Email Producer

Kafka Producer

Projeyi Kafka mantığının oturması için oldukça basit yazmaya çalıştım. Öncelikle static async bir metod oluşturarak Kafka’ya ileteceğimiz mesajları yollayacak olan metodu yazıyoruz. ProducerConfig ile docker üzerinde çalışan Kafka container’ın adresini bildiriyoruz. Ardından ProducerBuilder aracılığı ile bir producer oluşturuyoruz. Ben burada sürekli veri girebilmek için işlemlerimi bir while döngüsüne aldım ve kurduğum senaryo için verileri aldım ve aşağıda oluşturduğum Email sınıfına verilerimi ekleyerek JsonConvert ile verimi serialize ettim.

Email Class

Artık verimi hazırladığıma göre bu veriyi Kafka’ya yollayarak kuyruğa alabilirim. Kodun başında yarattığım producer nesnesine ProduceAsync metodu ile mesajımı hangi topic içine yollayacağımı ardından da bir Message objesi oluşturarak yollayacağım json objesini parametre olarak verdim. Burada ProduceAsync kullanmamın sebebi ise produce request’in sonucunu almak istemem. Eşzamanlı senaryolarda, örneğin web request’lerin handling edilmesi için bu yöntemi kullanmak yararlı olacaktır. Client burada Kafka brokerları ile iletişimi optimize ederek, request’leri uygun şekilde gruplayarak yönetecektir. Son olarak produce işlemi bittikten sonra sonucu ekrana yazdırıyoruz.

Email Producer Console

Sonucu ekrana yazdırdığımız gönderilen verimizi ve bu verinin hangi topic de hangi partition’a ve hangi offset içine yazıldığını gördük.

Email Consumer

Kafka’ya message’ları ilettikten sonra şimdi sıra bunları consume etmeye geldi.

Email Consumer

EmailConsumer metoduna ConsumerConfig ile ayarları yaptık. GroupId ile consumer grubunu tanımladık. Ardından Kafka adresimizi belirledik. AutoOffsetReset property ile de programı başlattığımızda consumption işleminin belirlediğimiz topic’de bulunan en eski mesaj ile başlayacağını bildirdik.

Ardından ConsumerBuilder ile bir consumer nesnesi yarattık. Consumer nesnesine subscribe olması gereken topic ismini bildirdik. Ardından bir while döngüsü ile Consume işleminin sürekli çalışmasını sağladık ve sonucu ekrarna yazdırdık. Artık yazdığımız program sayesinde belirlediğimiz topic’e gelen her mesaj consume edilecek ve işleme alınacaktır.

Email Consumer Console

Projeyi çalıştırdığımız emailTopic içine yazdığımız message’ları consume ederek ekrana yazdırdık. Mesajımız da yer alan emailTopic [[0]] @0 ise bize mesajın topic-partition-offset bilgilerini vermektedir.

Kafka günümüzde real-time streaming uygulamalar için oldukça performanslı ve hızlı bir çözüm sunmaktadır. Hız ve scaleable bir message-broker ihtiyacı duyacak bir uygulamamız varsa Kafka en iyi seçenekler arasında yer alabilir diyebilirim.

Burada elimden geldiğince basit bir şekilde Kafka’nın dünyasını anlatmaya çalıştım. Kafka burada anlattığımdan daha geniş bir doğaya ve özelliklere sahip oldukça popüler bir message-broker sistemidir. İlerleyen zamanlarda bu konu hakkında daha fazla bilgi edinme hedefim var ama şimdilik bu kadar.

Umarım keyifli ve yol gösterici bir makale olmuştur.

--

--