DEV Community

Pallat Anchaleechamaikorn
Pallat Anchaleechamaikorn

Posted on

2 1

เขียน Go ต่อ Kafka ตอนที่ 1

เวลาเราจะเขียน Go เพื่อไปทำงานกับ Kafka เราก็ต้องเริ่มจากการเลือก library กันก่อน ซึ่งผมจะลองยกตัวอย่างมาให้ดูสัก 3 ตัวครับ
https://githu

  1. https://github.com/confluentinc/confluent-kafka-go ตัวนี้จะถือว่าเป็น official library ก็ว่าได้ เพราะเจ้าของก็คือ confluent เอง ซึ่งก็คือ kafka ในเวอร์ชั่นเสียเงินนั่นเอง แต่ว่า lib ตัวนี้ไม่ค่อยเป็นที่นิยม เนื่องจากมันทำตัวเป็นแค่ wrapper เป็นทางผ่านไปเรียก librdkafka อีกที ซึ่งเป็น lib ตัวจริงที่เขียนด้วย c ทำให้เราจำเป็นต้องติดตั้ง sdk ที่เขียนด้วย c ตัวนี้ด้วยเสมอ
  2. https://github.com/IBM/sarama ตัวนี้เป็นที่นิยมมาก แต่ต้องดูให้ดีเพราะเจ้าของเดิมคือ Shopify ได้โอนให้ IBM เป็นคนดูแลต่อ แต่ตัว lib ตัวเก่าที่ Shopify ก็ยังมีอยู่ด้วย ให้ดูว่าตัวล่าสุดจะอยู่กับ IBM นะครับ
  3. https://github.com/segmentio/kafka-go ตัวนี้เป็นน้องใหม่ที่เข้ามาเสนอทางเลือกว่าตัวเองเขียนง่ายกว่าใครๆ

แต่ในที่นี้ผมจะขอเลือก sarama มาใช้เนื่องจากเราค่อนข้างใช้กันเยอะ และพอเราใช้กันเยอะ คนที่เข้าทีมมาทีหลังก็อาจจะไม่รู้ว่าทำไปจะต้องไปเหนื่อยหาตัวใหม่ด้วย เราก็ใช้ให้มันเหมือนๆกันไปแหล่ะ

ก็ด้วยความที่เราก็ใช้กันไปแหล่ะนั่นแหล่ะครับ ผมก็เลยเห็นว่า เราอาจจะยังขาดความเข้าใจมันอยู่บ้าง เลยหยิบตัวนี้มาอธิบายกันสักหน่อย

ก่อนจะไปเล่าเรื่อง lib เรามาทำความเข้าใจ Kafka กันคร่าวๆก่อน สิ่งที่เราต้องรู้ก่อนคือคำศัพท์ต่างๆที่เกี่ยวข้องกับ Kafka เช่น

  • Producer: คือตัวที่จะส่ง message เข้าไปใน Kafka broker
  • Broker: เอาง่ายๆเลยก็คือ Kafka server นั่นแหล่ะ คือเป็นตัวรับ และ ส่งต่อ message
  • Consumer: ตัวที่จะรับ message ไปใช้งาน
  • Topic: ก็คือ channel ที่จะใช้ส่ง message หากัน ให้นึกถึงช่องใน youtube ว่าคนทำคอนเทนต์ก็คือ producer เขาเปิดช่องมาช่องหนึ่งก็คือ Topic แล้วเราก็ไปดู คนดูก็คือ consumer
  • Partition: เวลาเราส่งข้อมูลเข้าไปใน topic แล้วคนรับข้อมูลไปใช้ แล้วข้อมูลมีเยอะมาก จะทำให้การดึง message ไปใช้ จะใช้เวลานาน ถ้าอยากจะลดเวลาลง ก็ต้องกระจ่าย message ให้มันแบ่งช่องกันอยู่ เวลา consumer มาดึงไป จะได้ช่วยกันได้หลายๆ consumer เราก็จะใช้เทคนิคในการแบ่ง partition ออกไป อยากเร็วกี่เท่าก็สร้าง partition ตามนั้นเลยเช่น อยากเร็วขึ้น 10 เท่า ก็ทำ 10 partition
  • Replica: ก็คือการทำสำเนาข้อมูลไว้สำรอง เผื่อกรณีที่ broker มีปัญหา จะได้มีสำรองไว้
  • ISR (In-sync replica): คือจำนวนของ replica ที่ active อยู่ในขณะนั้น

เอาคร่าวๆเท่านี้ก่อนนะครับ
ทีนี้เราก็จะมาดู code ตัวอย่างที่ทาง sarama ทำไว้ให้ดูที่หน้า package โดยเราจะไปเริ่มที่ตัวอย่าง SyncProducer กันก่อนเลย

producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
    log.Fatalln(err)
}
defer func() {
    if err := producer.Close(); err != nil {
        log.Fatalln(err)
    }
}()

msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
    log.Printf("FAILED to send message: %s\n", err)
} else {
    log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
}
Enter fullscreen mode Exit fullscreen mode

ปกติโค้ดตัวอย่างของ sarama เวลาเรา copy ลงมาแปะไว้ในเครื่องเรา มันจะใช้ไม่ได้สักตัวเลยนะครับ เพราะมันเขียนผิดบ้าง ไม่ใส่ sarama. บ้าง เราก็ต้องมานั่งแก้ๆกันก่อนครับ เช่น

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
    log.Fatalln(err)
}
defer func() {
    if err := producer.Close(); err != nil {
        log.Fatalln(err)
    }
}()

msg := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("testing 123")}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
    log.Printf("FAILED to send message: %s\n", err)
} else {
    log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
}
Enter fullscreen mode Exit fullscreen mode

พอแก้เสร็จแล้วเราก็มาลองอ่านโค้ดกันดูว่าตัวอย่างมันเขียนอะไรไว้บ้าง เริ่มจาก

sarama.NewSyncProducer ตัวนี้คือการสร้าง producer instant โดยมันต้องการ parameter 2 ตัว ตัวแรกคือ []string ที่เราจะต้องระบุลงไปว่าเรามี broker อยู่กี่ตัว ให้ใส่ลงไปให้หมดเช่นถ้ามี 3 ตัวก็อาจจะใส่

[]string{"localhost:9092","localhost:9093","localhost:9094"}
Enter fullscreen mode Exit fullscreen mode

ส่วน parameter ตัวที่สองคือ config ซึ่งถ้าใส่เป็น nil มันก็จะไปใช้ default config ให้เอง

ทีนี้ตัว producer เมื่อใช้เสร็จก็ต้อง close เราก็เลยต้องทำ defer เอาไว้เลยตามตัวอย่าง จากนั้น

msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}

นี่ก็เป็นการสร้าง message instant ด้วยการระบุ topic ชื่อ my_topic และ value คือตัว message นั่นเอง
กรณีที่เราไม่เคยสร้าง topic ไว้ใน kafka มาก่อน ถ้าเรา produce เข้าไปเลย มันจะสร้าง topic ง่ายๆขึ้นมาให้ โดยจะไม่ได้แบ่ง partition และไม่มี replica ด้วย

สุดท้ายเราก็ส่ง message เข้าไปด้วยบรรทัดนี้

partition, offset, err := producer.SendMessage(msg)

เสร็จแล้วมันจะคืนค่ามาว่า message ที่ส่งเข้าไปนั้นไปลงที่ partition เลขอะไร และ message นั้นอยู่ในลำดับ(offset) ที่เท่าไร และมี error หรือไม่

เดี๋ยวคราวหน้าเราจะมาต่อกันที่ consumer นะครับ

Image of Docusign

Bring your solution into Docusign. Reach over 1.6M customers.

Docusign is now extensible. Overcome challenges with disconnected products and inaccessible data by bringing your solutions into Docusign and publishing to 1.6M customers in the App Center.

Learn more

Top comments (0)

Image of Docusign

🛠️ Bring your solution into Docusign. Reach over 1.6M customers.

Docusign is now extensible. Overcome challenges with disconnected products and inaccessible data by bringing your solutions into Docusign and publishing to 1.6M customers in the App Center.

Learn more