Merhabalar,
Notion.so üzerinde oldukça ciddi emek harcayarak kendim için tuttuğum notları biraz toparlayıp burada yayınlamanın kamu yararı için oldukça faydalı olacağını düşündüm. Umarım yanılmamışımdır.
Çok iddialı konuşup beklentiyi de yükseltmek istemiyorum ama RabbitMQ için Türkçe olarak en kapsamlı kaynağı okuyor olabilirsiniz.🙂
Eğer Exchange tipleri ile ilgili örneklere açıklama satırları eklediysen uğraştırma bizi bir çatal alıp kaçalım, bir ara inceleriz diyen arkadaşlar için doğrudan erişebilecekleri GitHub linki. ** 🍴**
Eğer notion kullanan arkadaşlar varsa kopyalamaları için ve oradan devam etmeleri için köprüden önceki son çıkış burası.🚗
Bu yazıda, Medium’un tekdüzeliğinden biraz olsun arınmak için fazlaca emoji kullandım umarım Kobra Murat’ın evine benzememiştir. 😅
📌RabbitMQ nedir?
RabbitMQ, Erlang dili ile açık kaynak olarak geliştirilen, popüler ve hızlı bir mesaj-kuyruk yapısıdır. Advanced Message Queuing Protocol (AMQP) protokolünü kullanarak uygulamalar ve sunucular arası veri alışverişini sağlar.
Bu uygulamalar arası iletişim sağlanırken, aynı teknolojileri kullanma zorunluluğu yoktur. Yani mesajı yayınlayan uygulama(Publisher) Python diliyle, mesajı dinleyen uygulama(consumer) ise C# diliyle yazılmış bir .Net teknolojisi olabilir.
💊message broker: mesaj kuyruk sistemlerine verilen genel bir isimdir.
RabbitMQ’ya alternatif diğer message broker’lar;
🔹 Azure Queue Storage
🔹 Azure Service Bus
🔹 Kafka
🔹 MSMQ
📌Neden ve nerelerde RabbitMQ kullanmalıyız?
RabbitMQ ya da diğer message broker sistemlerini kullanmak için birçok neden olabilir, bu nedenlerden en ön plana çıkanı tabii ki kullanıcıyı gereksiz bekleyişlerden kurtarmak ve bu sayede kullanıcı deneyimini arttırmak.
Biraz daha açacak olursak eğer; çok uzun süren bir PDF işlemi ya da kullanıcının sonucunu çok da önemsemediği geri dönüş mailleri gibi senaryolarda message brokerları kullanmak oldukça mantıklı olacaktır.
Özetleyecek olursak aşağıdaki senaryolarda RabbitMQ kullanılabilir;
🔹 Son kullanıcıyı etkilemeyen arka plan işlemlerinde
🔹 Farklı platformlardaki uygulamalar arasında asenkron veri transferi işlemlerinde
🔹 Anlık işlemlerin çok önemli olmadığı işlemlerde (Örneğin; siparişiniz yola çıktı maili)
🔹 Veri tutarlılığının önemli olduğu işlemlerde (Örneğin; birçok pazaryerinden gelen siparişler sonucu stok miktarının tutarlılığı)
🔹 İşlem hacminin büyük olduğu sunucuyu yoran işlemlerde
Örnekleri arttırmak pek tabii mümkün.
📌RabbitMQ nasıl çalışır?
RabbitMQ Publisher ve Consumer mantığıyla çalışır. Yani mesajı üretip kutuya( queue ) atan ve kutudaki mesajları okuyan olmak üzere iki temel yapı üzerine inşa edilmiştir. Bir de senaryomuza göre bu üretilen mesajların hangi kutuya atılması gerektiğinden sorumlu olan exchange yapımız mevcut.
💡Bazı terimleri inceleyecek olursak;
🔹Producer/Publisher(yayıncı): Queue'ya mesajı gönderen yapıya verilen isimdir.
🔹Exchange(yönlendirici*):* Routing, yani kuyruğa iletilen mesajı yönlendiren yapı.
🔹Queue(kuyruk*):* First-in-first-out(FIFO) ilk giren ilk çıkar mantığıyla çalışan kuyruk yapısı.
🔹Consumer(tüketici*):* Queue'yu dinleyerek ilgili mesajları teslim alan yapıya verilen isimdir.
Genel hatlarıyla iş akışı aşağıdaki gibidir;
Bir yayıncı mesajı yayınlar, yönlendirici ilgili mesajı ilgili kuyruğa iletir ve dinleyiciler ilgili kuyruktan dinleme işlemini gerçekleştirir.
💊 Producer/Publisher illa ki Exchange kullanmak zorunda değildir. Doğrudan Queue’ya mesajı iletebilir.
💊 Consumer birden fazla kuyruğu dinleyebildiği gibi bir kuyruğu da birden fazla consumer dinleyebilir.
Olası bir senaryo üzerinden inceleyecek olursak eğer;
1️⃣ Kullanıcı PDF oluşturma isteğini iletir.
2️⃣ Producer ilgili PDF oluşturma isteğini Message Broker’a(RabbitMQ) iletir.
3️⃣ RabbitMQ ilgili isteği, Exchange ile ya da doğrudan kuyruğa ekler.
4️⃣ Consumer kuyruktaki PDF oluşturma isteğini alır ve PDF’i oluşturur.
📌 RabbitMQ’yu Docker Üzerinde Nasıl Kurabiliriz?
RabbitMQ ile ilgili çalışabileceğimiz bir çok farklı ortam yaratmak mümkün, bu link üzerinden bir çok kurulum ortamı için bilgi edinilebilir.
🔹 CloudAMQP kullanarak; https://customer.cloudamqp.com/signup linki üzerinden hesap açarak, maksimum 100 kuyruk ve 28 gün mesaj saklama limitleri dahilinde kullanmak mümkün.
🔹 Docker’a kurulum yaparak; https://hub.docker.com/_/rabbitmq linki üzerinden gerekli talimatları takip ederek.
🔹 İşletim sistemine kurulum yaparak; https://www.rabbitmq.com/install-windows.html üzerinde ilgili talimatları takip ederek kullanmak mümkün.
💡RabbitMQ’nun Docker Üzerinde Ayaklandırılması
Doğrudan aşağıdaki komut ile RabbitMQ’yu ayaklandırabiliriz.
docker run -d --name myRabbitMQ -p 5672:5672 -p 15672:15672 rabbitmq:3.8.14-management
💊İkinci port yönetim paneli için.
💊Eğer management tagını eklemezsek yönetim panelini eklemez. Sadece RabbitMQ’yu kurmuş oluruz.
Her şeyin sorunsuzca tamamlandığından emin olmak istersek eğer; http://localhost:15672/ adresine gidip açılan ekranda kullanıcı adı ve şifre kısma guest yazarak oturum açabiliriz.
Sisteme giriş yaptıktan sonra bizi, kullanıcılarımızı yönetebileceğimiz, kuyruklarımızı, kanallarımızı vs. gözlemleyebileceğimiz basit bir ara yüz karşılıyor. Devam etmeden önce biraz kurcalamanızı öneririm.
📌Exchange Nedir ve Çeşitleri Nelerdir?
En basit tabiriyle yönlendirici diyebiliriz, yani Publisher’dan gelen mesajı ya da mesajları alıp Queue’ya yönlendiren yapı olarak tanımlamak mümkün.
💊Exchange Type ile sadece Publisher ilgilenir. Consumer, Queue ile ilgilenir.
Exchange tiplerine ve örneklerine geçmeden önce temel hatlarıyla kullanacağımız yapı aşağıdaki gibi olacak. Ayrıca tüm örneklerde kullanacağımız tek nuget paketi RabbitMQ.Clientolacak.
İlk oluşturması biraz sıkıcı ve zaman alıcı olsa da ileride hatırlamakta zorlandığımız exchange tip olduğunda bulması ve anlaşılabilir olması benim için önemliydi. Bundan sebep her birini bağımsız Consumer ve Publisher konsol uygulamaları olarak oluşturmak istedim.
Evet çok fazla kod tekrarı var kabul ama burada harika bir yapı kurmaya çalışmıyoruz, RabbitMQ’yu öğrenmeye çalışıyoruz :).
Yukarıdaki görselde anlatmaya çalıştığım gibi klasörleri ve ilgili konsol uygulamalarını oluşturduktan sonra ihtiyacımız olan nuget paketini kurduysak eğer Exchange türlerine geçmeden, öncelikle temel hatlarıyla bir kuyruk okuma-yazma işlemi gerçekleştirelim.
Sonrasındaysa Exchange routing yapılandırmalarını tek tek inceleyerek her biri için basit örnekler yapalım.
💊 RabbitMQ’da Exchange kullanmak zorunda değiliz, doğrudan Queue’ya mesajlarımızı gönderebilir ve Consumer(lar)’ın dinlemesini sağlayabiliriz.
1️⃣Exchange Kullanmadan Doğrudan Queue Kullanım Örneği
Exchange kullanmadan basit bir RabbitMQ örneği yapalım. Daha önceden suda beklettiğimiz WithoutExchange.Publisher uygulamamıza geliyoruz ve içeriğini aşağıdaki gibi kodluyoruz.
static void Main(string[] args) | |
{ | |
//Öncelikle RabbitMQ'ya bağlanıyoruz. | |
var factory = new ConnectionFactory(); | |
factory.Uri = new Uri("amqp://guest:guest@localhost:5672"); | |
//Bağlantı açıyoruz | |
using var connection = factory.CreateConnection(); | |
//Yukarıda oluşturduğumuz bağlantı üzerinden bir kanal oluşturuyoruz. | |
IModel channel = connection.CreateModel(); | |
/* Oluşturduğumuz kanal üzerinden bir kuyruk yaratıyoruz. QueueDeclare'nin parametreleri sırasıyla; | |
* queue -> Kuyruğumuzun adı | |
* durable(dayanıklı) -> Eğer true yaparsak kuyruklarımız fiziksel,false yaparsak hafızada saklanır. | |
* exclusive(özel) -> True sadece bu kanal, false farklı kanallar üzerinden de erişim demek. Kuyruğumuza Consumer uygulaması üzerinden, yani farklı bir kanal üzerinden erişeceğimiz için false yapmalıyız. | |
*autoDelete ->Kuyrukta yer alan veri consumera ulaştığında silinmesi belirtilir. */ | |
channel.QueueDeclare(queue: "logs", durable: true, exclusive: false, autoDelete: false); | |
Enumerable.Range(1, 50).ToList().ForEach(x => | |
{ | |
//Mesajımızı oluşturup, byte array'e çeviriyoruz. | |
string logMessage = $"LogID -> {x}"; | |
var logMessageBody = Encoding.UTF8.GetBytes(logMessage); | |
/*Mesajlarımızı doğrudan kuyruğa ekleyeceğimiz için yani bir exchange routingi kullanmadığımız için string.Empty ile boş gönderiyoruz.Daha sonra kuyruğumuzun adını yazıyoruz.Propertyleriyle ilgilenmeyeceğimiz için null deyip ardından mesajımızı parametre olarak ekliyoruz.*/ | |
channel.BasicPublish(string.Empty, "logs", null, logMessageBody); | |
Console.WriteLine($"Log kuyruğa gönderilmiştir: {logMessage}"); | |
}); | |
Console.ReadKey(); | |
} |
Burada neler yaptık. Öncelikle bağlantımızı oluşturup bu bağlantı üzerinden bir kanal oluşturduk. Hemen akabinde QueueDeclare metodunu kullanarak herhangi bir yönlendirme(exchange) kullanmadan doğrudan kuyruğumuzu oluşturduk.
💡Tabi kuyruğumuzu oluştururken belirlememiz gereken bazı parametreler vardı bunlardan biraz bahsedelim;
🔹Queue: Kuyruğumuzun adı. Herhangi bir exchange kullanamadığımız için bu kuyruk adı önemli. Consumer tarafında kuyruğumuza doğrudan ismiyle erişeceğiz.
🔹Durable(dayanıklı): Kelime anlamı dayanıklı olan bu parametre sayesinde kuyruğumuzun nerede saklanacağını belirliyoruz. Eğer true yaparsak kuyruğumuz fiziksel, false yaparsak hafızada saklanır.
🔹Exclusive(özel): Kelime anlamı özel olan bu parametre ile kuyruğumuza farklı kanallar üzerinden erişilip erişilemeyeceğini belirliyoruz. True sadece bu kanal, false farklı kanallar üzerinden de erişilebilir demek. Kuyruğumuza Consumer uygulaması üzerinden, yani farklı bir kanal üzerinden erişeceğimiz için false yapmalıyız.
🔹AutoDelete: Kuyrukta yer alan veri Consumer’a ulaştığında silinip silinmemesi belirtilir.
İşlenmeye hazır elli adet mesajımızın oluştuğunu ara yüzümüzden kontrol edelim.
Şimdi gelelim bu oluşturduğumuz mesajların dinleneceği Consumer tarafına. Yapı yine oldukça basit. Öncesinde WithoutExchange.Consumer ’e kodlarımızı yazalım sonrasında da üzerine konuşalım.
static void Main(string[] args) | |
{ | |
//Öncelikle RabbitMQ'ya bağlanıyoruz. | |
var factory = new ConnectionFactory(); | |
factory.Uri = new Uri("amqp://guest:guest@localhost:5672"); | |
//Bağlantı açıyoruz | |
using var connection = factory.CreateConnection(); | |
//Yukarıda oluşturduğumuz bağlantı üzerinden bir kanal oluşturuyoruz. | |
IModel channel = connection.CreateModel(); | |
/*Eğer Cunsumer'ı ayağa kaldırdığımızda aşağıdaki kuyruk gibi bir kuyruğumuz | |
yoksa hata alırız. Publisher'ın aynı isimde kuyruğu oluşturduğuna eminsek | |
yorum satırı yapabiliriz. Diğer bir önemli not: Parametreleri | |
değiştirmemeliyiz.Yani Publisher tarafında aynı isimde bir kuyruk varsa ve | |
burada da her ihtimale karşı hata alamamak adına aynı isimde kuyruk | |
oluşturuyorsak parametreleri aynı olmalı aksi halde yine hata alırız. */ | |
channel.QueueDeclare(queue: "logs", durable: true, exclusive: false, autoDelete: false); | |
/*Yukarıdaki kodlar zaten publisher kısmında yazdığımız kodlarla birebir aynı. | |
Standart bağlantı-kanal-kuyruk açma kodları. Şimdi consumer ile ilgili | |
kodlarımızı yazıyoruz.*/ | |
/*prefetchSize -> Mesaj boyutunun bizim için önemli olmadığını 0 vererek | |
ifade ediyoruz. | |
prefetchCount-> Dağıtım adetini ifade eder. | |
global-> false -> prefectCount kadar gönder örneğin 5'er 5'er | |
true -> prefectCount'ı paylaştır örneğin 6 mesajımız ve 2 | |
consumer'ımız olsun 3'er 3'er ver demek ya da örneğin 6 | |
consumer varsa 1-1 ver demek*/ | |
channel.BasicQos(prefetchSize:0, prefetchCount:1, global:false); | |
//İlgili kanal üzerinden mesajlarımızı tüketecek consumer'ı oluşturuyoruz. | |
var consumer = new EventingBasicConsumer(channel); | |
/*Kanal üzerinden consumer'ın hangi kuyruğu dinleyeceğini tanımlıyoruz. | |
autoAck -> Kuyruğun silinip silinmeme kararı, eğer true dersek -> Mesajın | |
doğru işlenip işlenmediğine bakılmaksızın sil demek Eğer false dersek -> | |
Sen silme ben haber vereceğim sileceğin zaman*/ | |
channel.BasicConsume(queue:"logs", autoAck:false, consumer:consumer); | |
consumer.Received += (object sender, BasicDeliverEventArgs e) => | |
{ | |
var message = Encoding.UTF8.GetString(e.Body.ToArray()); | |
Thread.Sleep(1500); | |
Console.WriteLine("Gelen Mesaj: " + message); | |
/*Mesajların doğru bir şekilde işlendiğini ve kuyruktan silineceğini | |
bildiriyoruz.multiple -> işlenmiş ama rabbitmq'dan silinmemiş | |
mesajları da silsin istersek true demeliyiz.Biz sadece kendi | |
mesajımızla ilgileneceğimiz için false diyoruz.*/ | |
channel.BasicAck(deliveryTag:e.DeliveryTag, multiple:false); | |
}; | |
Console.ReadKey(); | |
} |
Yapılan tüm işlemleri yorum satırı olarak yazdım fakat yine de Consumer tarafında neler yaptık üstünden geçelim.
Öncelikle standart Publisher tarafında yaptığımız gibi, RabbitMQ’ya bağlandık. Bağlantımızı açtıktan sonra bu bağlantı üzerinden bir kanal oluşturduk.
Daha sonra bu kanal üzerinden her ihtimale karşı Publisher tarafında oluşturduğumuz kuyruğu, birebir aynı özellikleriyle tekrar oluşturduk. Tabi burası tercih meselesi eğer publisher tarafında kanalımızın oluştuğuna yüzde yüz eminsek Consumer tarafında tekrar oluşturmaya gerek yok.
Hemen akabinde de oluşturduğumuz kanal ile EventingBasicConsumer nesnesinden faydalanarak Consumer'ımızı yarattık.
Daha sonra BasicConsume metodunu kullanarak Consumer'ın hangi kuyruğu dinleyeceğini belirttik. Burada autoAck önemli.
🔹AutoAck (Auto Acknowledgement) : Mesajın Consumer’a ulaştıktan sonra onaylanıp onaylanmama, haliyle silinip silinmeme durumunu belirlediğimiz parametre. Özetle;
Eğer true dersek -> Mesajın doğru işlenip işlenmediğine bakılmaksızın sil.
Eğer false dersek -> Sen silme ben haber vereceğim sileceğin zaman
Consumer’ı çalıştırdığımızda tüm logs mesajlarımızın işlendiğini görebiliriz.
2️⃣Fanout Exchange Tanım ve Örneği
En sade exchange tipimizdir. Publisher tarafından gönderilen mesajları alıp route key tanımlaması gibi ayrımlara ihtiyaç duymadan, Fanout Exchange ’e bind olan bütün kuyruklara aynı ve eşit bir şekilde iletilmesini sağlayan yapıdır.
Consumer‘ın tek yapması gereken kuyruktaki mesajları okuyup işlemektir.
Bu yapıyı radyo yayını yapan bir frekansa ya da Youtube üzerinden yayın yapan bir Youtuber’a benzetmek mümkün. Tüm dinleyiciler ilgili kanala eriştiğinde aynı mesajları alır.
Az çok kafamızda bir şeyler oluştuysa eğer kodlarımızı yazmaya başlayabiliriz.
Bir önceki örneğimizde yaptığımız işlemlere oldukça benzer bir yapı olacak, bundan sebep bir önceki kodlarımızın aynısını alıp daha önceden oluşturduğumuz FanoutExchange_Publisher'a yapıştırıyorum.
static void Main(string[] args) | |
{ | |
var factory = new ConnectionFactory(); | |
factory.Uri = new Uri("amqp://guest:guest@localhost:5672"); | |
using var connection = factory.CreateConnection(); | |
IModel channel = connection.CreateModel(); | |
/*Artık bir QueueDeclare'a gerek kalmadı. Biz yayınımızı yapalım, dinlemek | |
isteyen Consumer kuyruğunu oluşturup dinlesin.*/ | |
//channel.QueueDeclare(queue: "logs", durable: true, exclusive: false, autoDelete: false); | |
Enumerable.Range(1, 20).ToList().ForEach(x => | |
{ | |
string logMessage = $"LogID -> {x}"; | |
var logMessageBody = Encoding.UTF8.GetBytes(logMessage); | |
/* Fanout tipli exchange'lerimiz mesajlarımız herhangi bir routing key'imiz | |
olmadığı için ikinci parametremizi boş bırakıyoruz (string.Empty).Bir önceki | |
örneğimizde ilk parametre boş ikinci parametre yani queue logs'tu */ | |
channel.BasicPublish("logs-fanout", string.Empty, null, logMessageBody); | |
Console.WriteLine($"Log kuyruğa gönderilmiştir: {logMessage}"); | |
}); | |
Console.ReadKey(); | |
} |
Yukarıdaki kodları biraz açacak olursak; bir önceki WithoutExchange.Publisher'den çok farklı değil.
Önceden mesajlarımızı herhangi bir Exchange kullanmayıp doğrudan Queue’ya gönderdiğimiz için channel.BasicPublish() metodunda Exchange tipimizi boş geçip, doğrudan kuyruğumuzun adını yazıyorduk. Şimdi ise tam tersini yapıyoruz.
Artık QueueDeclare metodu oluşturmamıza gerek kalmadı. Biz sadece Exchange'imizi tanımlıyoruz(logs-fanout). İsteyen Consumer kuyruğu oluşturup dinlemeye başlayabiliyor.
💊Fanout kullanınca illa Consumer tarafında oluşturulacak diye bir şey söz konusu değil, senaryomuza göre burada da oluşturmak pek ala mümkün.
Eğer her şey tamamsa uygulamamızı çalıştırıp RabbitMQ panelini açalım. Gördüğünüz üzere biz tip olarak Fanout kullanarak yayın yapmaya başladık. İsteyen Consumer Queue'sunu oluşturup, dinleyebilir ve işlemlerini gerçekleştirebilir.
Yani Consumer’lar kuyruk oluşturup bizim kanalımıza abone olurlarsa, biz onların posta kutularına mektuplarımızı atabiliriz.
Gördüğünüz gibi henüz bir bind olan bir consumer yok. Hadi şimdi FanoutExchange_Publisher ’a gidip consumer tarafını kodlayalım.
static void Main(string[] args) | |
{ | |
var factory = new ConnectionFactory(); | |
factory.Uri = new Uri("amqp://guest:guest@localhost:5672"); | |
using var connection = factory.CreateConnection(); | |
IModel channel = connection.CreateModel(); | |
/*RabbitMQ'nun bizim için rastgele bir isimde geçici kuyruk oluşturmasını | |
sağlıyoruz.Bu bir zorunluluk değil, bu Consumer'ı birden çok kez | |
çalıştırıp aynı mesajları aldığını göstermek için oluşturdum.*/ | |
#region Eğer kalıcı bir kuyruk oluşturmak isteseydik | |
//var randomQueueName = "log-database-save"; //channel.QueueDeclare().QueueName; | |
//channel.QueueDeclare(randomQueueName, true, false, true, null); | |
#endregion | |
var randomQueueName = channel.QueueDeclare().QueueName; | |
//Yukarıda tanımladığımız kuyruğu exchange'imizle eşleştiriyoruz. | |
channel.QueueBind(queue: randomQueueName, exchange: "logs-fanout", routingKey: string.Empty, arguments: null); | |
channel.BasicQos(prefetchSize:0, prefetchCount:1, global:false); | |
var consumer = new EventingBasicConsumer(channel); | |
//Yukarıda oluşturulan randomQueueName'i burada kullanıyoruz. | |
channel.BasicConsume(queue:randomQueueName,autoAck:false,consumer:consumer); | |
consumer.Received += (object sender, BasicDeliverEventArgs e) => | |
{ | |
var message = Encoding.UTF8.GetString(e.Body.ToArray()); | |
Thread.Sleep(1500); | |
Console.WriteLine("Gelen Mesaj: " + message); | |
channel.BasicAck(deliveryTag:e.DeliveryTag, multiple:false); | |
}; | |
Console.ReadKey(); | |
} |
İlk Consumer örneğimizde yaptığımız işlemlerin çoğu burada da aynıydı bundan sebep aynı kısımlara yorum satırı eklemeyip değişiklik olan kısımlara açıklama satırları ekledim.
Bir önceki örneğimizden tek farkı oluşturduğumuz kuyruk ile Exchange’imizi Bind ediyoruz yani bağlıyoruz.
Şimdi kodladığımız Consumer’ın çalışıp çalışmadığını ve birden fazla Consumer’a aynı mesajın gidip gitmediğini doğrulayalım.
💊Bunun için iki adet terminal açabileceğiniz gibi yan yana hizalı görmek için VS Code’un terminali de kullanabilirsiniz. Tek yapmamız gereken terminalde Consumer’ın yoluna gidip dotnet run komutunu çalıştırmak
Burada dikkat edilmesi gereken husus önce Consumer’ın çalıştırılmasıdır. Eğer önce Publisher’ı çalıştırırsak mesajları iletilecek bir Queue olmadığı için boşa/boşluğa gider ve Consumer(lar)’ı çalıştırdığımızda okuyacak herhangi bir mesaj bulamaz.
Mesajlarımız okunurken biz o sıra RabbitMQ’nun paneline gidip, gerçekten de rastgele isimlerle kuyruklarımızın oluşup oluşmadığını ve Fanout Exchange’imize Bind olup olmadıklarını doğrulayabiliriz.
Terminal üzerinde çalışan Consumer’larımızı durduğumuz da kuyrukların ve bind’ların otomatik silindiğini görebilirsiniz.
3️⃣Direct Exchange Tanım ve Örneği
Fanout’tan farklı olarak şimdi mesajlarımıza bir yol haritası çizeceğiz yani routing key kullanmaya başlayacağız.
Direct Exchange'te işleyiş şu şekilde; Publisher tarafında mesajımız gönderilirken içerisinde bir routing_key barındırıyor Direct Exchange bu key'e bakıyor ve kuyruğumuzla eşleştiriyor. Consumer yine aynı key üzerinden gidip ilgili kuyruk ile eşleşip mesajları okuyabiliyor.
Örneğimizi şöyle yapalım, Error , Warning , Info tipinde üç adet hata türümüz olsun. Gerçek hayatta API ya da farklı kanallardan gelir bu hatalar ama biz rastgele üretelim. Yine gerçek hatta belki Error tipinde olanlar bir yere yazılıp, Info ve Warning tipinde olanlar bir dashboard'tan izlenebilir. Biz istediğimiz türe göre konsola yazdıralım.
Bir önceki Fanout örneğinde kuyruk deklare etme işini Consumer tarafına bırakmıştık. Burada ise Publisher tarafında oluşturup, yine bind etme işlemini de burada gerçekleştirelim.
Yine daha önceden oluşturduğumuz DirectExchange.Publisher'a geçip bir önceki örneğimiz olan FanoutExchange.Publisher'ı birebir kopyalıyoruz ve aşağıdaki gibi revize ediyoruz.
static void Main(string[] args) | |
{ | |
var factory = new ConnectionFactory(); | |
factory.Uri = new Uri("amqp://guest:guest@localhost:5672"); | |
using var connection = factory.CreateConnection(); | |
IModel channel = connection.CreateModel(); | |
//Exchange adımızı ve tipimizi aşağıdaki gibi düzenliyoruz. | |
channel.ExchangeDeclare(exchange: "logs-direct", type: ExchangeType.Direct, durable: true); | |
/*Bir önceki Fanout örneğinde kuyruk deklare etme işini Consumer tarafına | |
bırakmıştık.Burada ise Publisher tarafında oluşturup, yine bind etme | |
işlemini de burada gerçekleştirdik. En aşağıda -LogTypes- adında bir | |
enum tanımladık ve içerisine 3 tip hata ekledik. Bu 3 hata arasında dönüp | |
3 adet kuyruk ve route key oluşturup bind işlemini gerçekleştiriyoruz.*/ | |
Enum.GetNames(typeof(LogTypes)).ToList().ForEach(x => | |
{ | |
var routeKey = $"route-{x}"; | |
var queueName = $"direct-queue-{x}"; | |
//Kuyruğumuzu oluşturuyoruz. | |
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false); | |
//Oluşturduğumuz kuyruk ile exchange'imizi routeKey tanımlaması yaparak bind ediyoruz. | |
channel.QueueBind(queue: queueName, exchange: "logs-direct", routeKey, null); | |
}); | |
Enumerable.Range(1, 20).ToList().ForEach(x => | |
{ | |
/*Senaryo gereği rastgele tiplerden ve mesajlardan oluşan toplam | |
20 adet mesaj üretiyoruz.*/ | |
LogTypes logType = (LogTypes)new Random().Next(1, 4); | |
string logMessage = $"Log Type -> {logType}"; | |
var routeKey = $"route-{logType}"; | |
//Mesajımızı oluşturup, byte array'e çeviriyoruz. | |
var logMessageBody = Encoding.UTF8.GetBytes(logMessage); | |
/*İlk iki örneğimizde RoutingKey'i boş geçiyorduk artık yukarıda | |
oluşan routeKey'i kullanıyoruz*/ | |
channel.BasicPublish("logs-direct", routeKey, null, logMessageBody); | |
Console.WriteLine($"Log kuyruğa gönderilmiştir: {logMessage}"); | |
}); | |
Console.ReadKey(); | |
} | |
//Senaryo gereği oluşturduğumuz enum | |
enum LogTypes { Error = 1, Warning = 2, Info = 3 } |
Gerekli bilgileri yorum satırı olarak ekledim. Kabaca bahsedecek olursam eğer gerçek hayat senaryosuna benzetmek için rastgele üç adet hata tipi ile oluşan mesajlar ve route key’ler ürettik. Bu sayede Consumer tarafında route key yardımı ile isteyen istediği log türünü dinleyebilecek.
Uygulamamızı çalıştırdığımızda aşağıdaki gibi bir konsol görünümü oluşmalı.
RabbitMQ panelinde ise şu şekilde bir görünüm oluşmuş olmalı; üç adet Queue, bir adet log-direct adında ve direct tipinde exchange. Bu oluşan Exchange’imize tıkladığımızda ise oluşan üç kuyruğun bind edilmiş olması gerekir.
Şimdi Consumer tarafını oluşturalım. Yine FanoutExchange.Consumer ’i DirectExchange.Consumer ’a kopyalayıp aşağıdaki gibi revize ediyoruz.
static void Main(string[] args) | |
{ | |
var factory = new ConnectionFactory(); | |
factory.Uri = new Uri("amqp://guest:guest@localhost:5672"); | |
using var connection = factory.CreateConnection(); | |
IModel channel = connection.CreateModel(); | |
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); | |
var consumer = new EventingBasicConsumer(channel); | |
//Kullanıcının dinlemek istediği tipi belirlemesini sağlıyoruz. | |
Console.WriteLine("Dinlemek istediğiniz log tipini(Error,Warning,Info) belirtiniz:"); | |
var selectedType = Console.ReadLine(); | |
var queueName = $"direct-queue-{selectedType}"; | |
//Yukarıda oluşturulan queueName'i burada kullanıyoruz. | |
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); | |
consumer.Received += (object sender, BasicDeliverEventArgs e) => | |
{ | |
var message = Encoding.UTF8.GetString(e.Body.ToArray()); | |
Thread.Sleep(1000); | |
Console.WriteLine("Gelen Mesaj: " + message); | |
channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false); | |
}; | |
Console.ReadKey(); | |
} |
Yapılan işlem oldukça basit. Her şey zaten Publisher tarafında oluştuğu için, Consumer tarafında tek yapmamız gereken dinlemek istediğimiz kuyruğun adını belirlemek. Fanout’tan farklı olarak ne yapmış olduk mesajlarımızı route key’ler ile ayırmış olduk. Bu sayede isteyen istediğini kanalı/kuyruğu dinleyebiliyor.
4️⃣Topic Exchange Tanım ve Örneği
Topic Exchange çok daha detaylı routing yapmak istediğimiz zaman kullanılan tiptir. Direct Exchange'ten tek farkı routing key tanımlarken wildcard'lar ile beraber belirli pattern'ler kullanabiliyor olmamız.
💊 Wildcard: Türkçedeki karşılığı Özel/Yıldızlı karakterler diye bilinir ve yer tutucular(placeholder) olarak kullanılır.
Route Key’i Publisher tarafında oluştururken aralara sadece nokta koyarak ayırırız → a.b.c.d
Route Key’i Consumer tarafında çağırırken pattern içerisinde bizim için fark etmeyen bir alan ya da alanlar varsa * (yıldız) kullanarak geçebileceğimiz gibi sadece başı ve sonuyla ilgileneceksek eğer # işareti kullanarak ilgilenmeyeceğimizi belirtebiliriz.
🔹 a.*.*.d → route key’in başında a ve sonunda d olanları dinlemek için
🔹 *.*.c.* → route key’in üçüncü sırasında c olanları dinlemek için.
🔹 a.# → route key’in başında a olan ve gerisiyle ilgilenmediğim kuyrukları dinlemek için
🔹 #.d → yukarıdakinin tam tersi başıyla ortasıyla ilgilenmeyip sonuyla ilgilendiğimiz senaryolar
💡Biraz somutlaştırılmış örnek verecek olursa eğer;
Publisher tarafında dünyadaki anlık doğum oranlarının raporlarının excel ve pdf formatında aşağıdaki gibi bir routing key pattern’i kullanılarak yayınlandığını varsayalım;
**avrupa.fransa.paris.pdf
avrupa.fransa.paris.excel
avrupa.almanya.berlin.pdf
avrupa.almanya.munih.pdf
asya.cin.pekin.pdf
asya.cin.pekin.excel**
Consumer tarafında çağırırken;
🔹 #.pdf → başında ve ortasında ne olduğu önemli değil tüm şehirlerin pdf’leri getir.
🔹 *.*.*.pdf → yukarıdaki ile aynı işlemi yapar.
🔹 avrupa.# → ortasında ve sonunda ne olduğu beni ilgilendirmez, Avrupa’nın tüm şehirlerinin hem pdf hem de excel’lerini getir.
🔹 *.almanya.*.pdf → Almanya’nın tüm şehirlerinin doğum raporlarının pdf halini getir.
🔹 asya.*.*.excel → Asya kıtasındaki tüm ülkelere ait tüm şehirlerin excel raporlarını getir.
Şimdi kodlama tarafına geçelim. Bir önceki örneğimizin kopyasını alıp üzerinde düzenleme yaparak gideceğim. Bir önceki örneğimizden farklı olarak Queue oluşturma işlemini bu sefer Consumer tarafına bırakacağız.
static void Main(string[] args) | |
{ | |
var factory = new ConnectionFactory(); | |
factory.Uri = new Uri("amqp://guest:guest@localhost:5672"); | |
using var connection = factory.CreateConnection(); | |
IModel channel = connection.CreateModel(); | |
//Exchange adımızı ve tipimizi aşağıdaki gibi düzenliyoruz. | |
channel.ExchangeDeclare(exchange: "logs-topic", type: ExchangeType.Topic, durable: true); | |
Enumerable.Range(1, 20).ToList().ForEach(x => | |
{ | |
/*Topic senaryo gereği 3 çeşit rastgele tiplerden ve mesajlardan oluşan | |
toplam 20 adet mesaj üretiyoruz.*/ | |
Random rnm = new Random(); | |
LogTypes log1 = (LogTypes)rnm.Next(1, 4); | |
LogTypes log2 = (LogTypes)rnm.Next(1, 4); | |
LogTypes log3 = (LogTypes)rnm.Next(1, 4); | |
//Mesajımızı ve RouteKey'imizi Topic senaryosuna göre güncelliyoruz. | |
var routeKey = $"{log1}.{log2}.{log3}"; | |
string logMessage = $"Log Types -> {log1}-{log2}-{log3}"; | |
var logMessageBody = Encoding.UTF8.GetBytes(logMessage); | |
/*Route Key'i logs-topic olarak güncelliyoruz. */ | |
channel.BasicPublish("logs-topic", routeKey, null, logMessageBody); | |
Console.WriteLine($"Log kuyruğa gönderilmiştir: {logMessage}"); | |
}); | |
Console.ReadKey(); | |
} | |
//Senaryo gereği oluşturduğumuz enum | |
enum LogTypes { Error = 1, Warning = 2, Info = 3 } |
Yine her zamanki gibi tüm detayları açıklama satırları olarak ekledim. Bizim için aşağıdaki gibi rastgele üçlü serilerden oluşan hata tipleri oluştursun Publisher tarafı.
Şimdi Consumer tarafında örneğin; ortasında Info olanları ya da başında Warning olan ortasında ve sonunda ne olduğunun önemli olmadığı gibi örekler belirleyerek hata tiplerini ekrana yazdıralım.
Yine bir önceki örneğimizi kopyalayıp aşağıdaki gibi revize ettim. Değiştirdiğim yerlerin başına mutlaka açıklama satırı ekledim.
static void Main(string[] args) | |
{ | |
var factory = new ConnectionFactory(); | |
factory.Uri = new Uri("amqp://guest:guest@localhost:5672"); | |
using var connection = factory.CreateConnection(); | |
IModel channel = connection.CreateModel(); | |
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); | |
var consumer = new EventingBasicConsumer(channel); | |
Console.WriteLine("Route Key'in ortasında Error olan kuyruklar dinleniyor."); | |
//Random isimli bir kuyruk tanımlıyoruz. | |
var queueName = channel.QueueDeclare().QueueName; | |
//Başı ve sonu önemli değil ortasında error yazanlar gelsin istiyoruz. | |
var routeKey = "*.Error.*"; | |
//var routeKey = "Info.*.Error"; Başı Info sonu Error olanlar. | |
//var routeKey = "#.Info"; Başın ne olduğu önemli değil sonu Info olanlar. | |
//Bind işlemini gerçekleştiriyoruz. | |
channel.QueueBind(queue: queueName, exchange: "logs-topic", routingKey: routeKey); | |
//Yukarıda oluşturulan queueName'i burada kullanıyoruz. | |
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); | |
consumer.Received += (object sender, BasicDeliverEventArgs e) => | |
{ | |
var message = Encoding.UTF8.GetString(e.Body.ToArray()); | |
Thread.Sleep(1000); | |
Console.WriteLine("Gelen Mesaj: " + message); | |
channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false); | |
}; | |
Console.ReadKey(); | |
} |
Şimdi kuyruk mekanizmamızın çalıştığını doğrulayalım. Burada unutulmaması gerekir ki Queue oluşturma işlemi Consumer tarafında bu yüzden önce Consumer tarafını ayaklandırıp ardından Publisher’ı çalıştırmak gerekir. Önce Publisher çalıştırılırsa mesajlar boşa gider.
💣Yukarıda da görüleceği üzere önce Consumer tarafını ardından da Publisher tarafını çalıştırıyorum.
Consumer tarafında route key’lerin başıyla ve sonuyla ilgilenmeyip sadece ortasında Error olanlarla ilgileneceğimi belirttiğim için sadece ortasında Error olanlar listeleniyor.
Consumer’da rastgele isimle oluşturduğumuz Queue Name ve bind işlemi aşağıdaki gibi RabbitMQ panelinde gözlemlenebilir.
5️⃣Header Exchange Tanım ve Örneği
Header Exchange diğerlerinden biraz farklı, diğerleri genelde birbirine benziyordu. Route key'den bahsettik ve ufak tefek detaylandırmalar yaparak ilerledik. Bu sefer header kavramıyla tanışacağız.
💊Header Exchange en az kullanılan exchange tipidir.
Header Exchange tipinde eşleştirmelerimiz routing key ile değil, Publisher tarafından mesajla birlikte gönderilen header ile olacak.
Header’da gönderilen değerin x-match key’ine verilen all veya any değerine göre eşleşenler kuyruğa iletilecek.
🔹 any→ değerlerden herhangi birinin eşleşmesi şartı
🔹 all → tüm değerlerin eşleşmesi şartı
Biraz havada kalmış olabilir yine küçük bir örnek ile kafamızı biraz daha berraklaştıralım. Yine her zamanki gibi daha önce oluşturduğumuz HeaderExchange_Publisher'a geliyorum ve içeriğimi aşağıdaki gibi düzenliyorum.
static void Main(string[] args) | |
{ | |
var factory = new ConnectionFactory(); | |
factory.Uri = new Uri("amqp://guest:guest@localhost:5672"); | |
using var connection = factory.CreateConnection(); | |
IModel channel = connection.CreateModel(); | |
//Exchange adımızı ve tipimizi aşağıdaki gibi düzenliyoruz. | |
channel.ExchangeDeclare(exchange: "logs-header", type: ExchangeType.Headers, durable: true); | |
/*Mesajımızı iletirken headerda göndermek üzere bir key-value obje oluşturalım.*/ | |
Dictionary<string, object> header = new Dictionary<string, object>(); | |
header.Add("logLevel", "Error"); | |
header.Add("reportFormat", "pdf"); | |
//Oluşturulan header'ı, aşağıdaki gibi Headers'a eşitleyelim | |
var properties = channel.CreateBasicProperties(); | |
properties.Headers = header; | |
//Örnek bir rapor oluştuğunu varsayalım. | |
var message = Encoding.UTF8.GetBytes("----Kritik_Hata_Raporu.pdf----"); | |
/*Exchange tipimizi belirtiyoruz, route key kullanamdığımız için boş | |
geçiyoruz. Mesajımızın header'ını doldurduğumuz için yukarıda | |
tanımladığımız propertiyi ve son olarak da mesajımızı ekliyoruz. */ | |
channel.BasicPublish(exchange:"logs-header",routingKey:string.Empty, basicProperties:properties, body : message); | |
Console.WriteLine("Kritik_Hata_Raporu.pdf RabbitMQ'ya gönderilmiştir."); | |
Console.ReadKey(); | |
} |
Açıklama satırlarında neler yaptığımızı aktarmaya çalıştım.
Kabaca özetleyecek olursak artık mesajımızı routing-key ile değil header ile gönderdik. Mesajımıza header ekleyebilmek için de mesajın Property'lerinden Headers'a kendi oluşturduğumuz header'ı atadık.
Şimdi Consumer tarafında kuyruğumuzu oluşturalım ve Headers'tan
x-match yardımı ile dinleme işlemini gerçekleştirelim. HeaderExchange_Consumer'a geliyorum ve içeriğimi aşağıdaki gibi düzenliyorum.
static void Main(string[] args) | |
{ | |
var factory = new ConnectionFactory(); | |
factory.Uri = new Uri("amqp://guest:guest@localhost:5672"); | |
using var connection = factory.CreateConnection(); | |
IModel channel = connection.CreateModel(); | |
/*Yeniden declare etmek zorunda değiliz. Önce Consumer'ı ayağa kaldırırsak | |
hata almamak için ekledik */ | |
channel.ExchangeDeclare(exchange: "logs-header", type: ExchangeType.Headers, durable: true); | |
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); | |
var consumer = new EventingBasicConsumer(channel); | |
//Random isimli bir kuyruk tanımlıyoruz. | |
var queueName = channel.QueueDeclare().QueueName; | |
/* x-match zorunludur. | |
any→ değerlerden herhangi birinin eşleşmesi şartı | |
all → tüm değerlerin eşleşmesi şartı */ | |
Dictionary<string, object> header = new Dictionary<string, object>(); | |
header.Add("logLevel", "Error"); //Biz belirliyoruz | |
header.Add("reportFormat", "pdf"); //Biz belirliyoruz. | |
header.Add("x-match","all"); //Biz BELİRLEMİYORUZ. (any ya da all olarak karar veriyoruz.) | |
//header'ımızı da ekleyip bind işlemini gerçekleştiriyoruz. | |
channel.QueueBind(queue: queueName, exchange: "logs-header", routingKey: string.Empty, arguments:header); | |
//Yukarıda oluşturulan queueName'i burada kullanıyoruz. | |
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); | |
Console.WriteLine("Kuyruk dinleniyor..."); | |
consumer.Received += (object sender, BasicDeliverEventArgs e) => | |
{ | |
var message = Encoding.UTF8.GetString(e.Body.ToArray()); | |
Thread.Sleep(1000); | |
Console.WriteLine("Mail olarak gönderilen rapor : " + message); | |
channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false); | |
}; | |
Console.ReadKey(); | |
} |
Temel işlemler yine sabit kaldı. Yeni eklenen ya da değişen kısımlara açıklamaları ekledim.
Ne yaptık diyecek olursak eğer; İşimizi sağlama almak adına Publisher tarafında oluşturduğumuz Headers tipindeki Exchange'imizi Consumer tarafında yani burada tekrar tanımladım. Tanımlamasak da olur muydu? Olurdu ama önce Consumer'ı çalıştırsak patlardı.💣
Daha sonrasında beklediğimiz header’ı ayarladık ve Publisher’dakinden farklı olarak x-match key'ini kullanıp value olarak all diyerek birebir eşleşme olanları getirdik. İstersek any diyerek headers da ki herhangi bir değerle eşleşenleri de getirtebilirdik tercih ve kurgu meselesi.
Son olarak da QueueBind metodunun routingKey parametresini boş bırakıp argümanlara header'ımızı ekledik.
Kurgumuzun çalışıp çalışmadığını test edelim. Önce sol tarafta Consumer’ı çalıştırıp dinlemeye başlıyorum. Sonrasında da sağ tarafta Publisher’ı çalıştırıyorum.
RabbitMQ panelinin Exchanges sekmesinde Routing key'imizin boş olduğunu ve Arguments kısmının da header'ımızın dolu olduğunu gözlemleyebiliriz.
Son olarak hem genel bir tekrar hem de farklı bir örnek görmek açısından aşağıdaki beş dakikalık videoyu kesinlikle izlemenizi öneririm.
Bu yazımda aktaracaklarım bu kadar. Fazlasıyla uzun bir yazı oldu farkındayım. Ayrı ayrı seri halinde yapılabilirdi elbette ama tek bir yerde topluca erişilebilecek sağlam bir kaynak(kendimce) olsun istedim.
Umarım ufak da olsa bir faydam dokunmuştur.
📕Bu yazı serisindeki örneklerde Faith çakıroğlu’nun Udemy eğitiminden faydalanılmıştır.
📝Yazım hatalarındaki desteğinden dolayı Birol Gökkaya özel teşekkürler.
Top comments (0)