package kafka
import (
"context"
"net"
"strconv"
"time"
logger "git.slaventius.ru/test3k/umate/pkg/logger"
"github.com/segmentio/kafka-go"
)
type KafkaWriter struct {
ctx context . Context
writer * kafka . Writer
logger * logger . Logger
first string
topic string
}
func ( s * KafkaWriter ) fetchTopics ( ) ( map [ string ] bool , error ) {
conn , err := kafka . Dial ( "tcp" , s . first )
if err != nil {
return nil , err
}
defer conn . Close ( )
//
partitions , erp := conn . ReadPartitions ( )
if erp != nil {
return nil , erp
}
//
topics := make ( map [ string ] bool )
for _ , p := range partitions {
topics [ p . Topic ] = true
}
return topics , nil
}
func ( s * KafkaWriter ) createTopic ( ) error {
conn , err := kafka . Dial ( "tcp" , s . first )
if err != nil {
return err
}
defer conn . Close ( )
// Определим ведущего реплики раздела
controller , era := conn . Controller ( )
if era != nil {
return era
}
//
controllerConn , eru := kafka . Dial ( "tcp" , net . JoinHostPort ( controller . Host , strconv . Itoa ( controller . Port ) ) )
if eru != nil {
return eru
}
defer controllerConn . Close ( )
//
topicConfigs := [ ] kafka . TopicConfig {
{
Topic : s . topic ,
NumPartitions : 1 ,
ReplicationFactor : 1 ,
} ,
}
return controllerConn . CreateTopics ( topicConfigs ... )
}
func ( s * KafkaWriter ) checkTopic ( ) error {
topics , err := s . fetchTopics ( )
if err != nil {
return err
}
// Если топика нет, то создадим
if _ , ok := topics [ s . topic ] ; ! ok {
era := s . createTopic ( )
if era != nil {
return era
}
s . logger . Printf ( "create topic %q\n" , s . topic )
return era
}
return nil
}
func NewWriter ( ctx context . Context , logger * logger . Logger , topic string , address ... string ) * KafkaWriter {
s := & KafkaWriter {
ctx : ctx ,
writer : & kafka . Writer {
Topic : topic ,
// Balancer: &MyBalancer{},
Balancer : & kafka . LeastBytes { } ,
// Balancer: &kafka.Murmur2Balancer{},
WriteBackoffMax : time . Millisecond * 100 ,
BatchTimeout : time . Millisecond * 100 ,
Addr : kafka . TCP ( address ... ) ,
} ,
logger : logger ,
first : address [ 0 ] ,
topic : topic ,
}
// Проверим и при необходимости создадим топик
era := s . checkTopic ( )
if era != nil {
logger . Fatal ( era )
}
return s
}
func ( s * KafkaWriter ) Close ( ) error {
return s . writer . Close ( )
}
func ( s * KafkaWriter ) WriteMessage ( key [ ] byte , value [ ] byte ) error {
return s . writer . WriteMessages ( s . ctx , kafka . Message {
Key : key ,
Value : value ,
} )
}