@ -3,39 +3,44 @@ package main
import (
import (
"context"
"context"
"log"
"log"
"net"
"os"
"os"
"os/signal"
"os/signal"
"strconv"
"syscall"
server "test3k/authPostman/internal"
"test3k/authPostman/internal/config"
"test3k/authPostman/internal/config"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go"
)
)
func main ( ) {
func main ( ) {
r := kafka . NewReader ( kafka . ReaderConfig {
config := config . NewConfig ( )
Topic : "registrations" ,
ctx , _ := context . WithCancel ( context . Background ( ) )
Brokers : [ ] string { "localhost:9092" } ,
srv := server . NewServer ( ctx , config )
GroupID : "consumer-group-id" ,
Partition : 0 ,
MinBytes : 10e3 , // 10KB
MaxBytes : 10e6 , // 10MB
} )
defer r . Close ( )
//
//
// r.SetOffset(0)
signalChannel := make ( chan os . Signal , 1 )
signal . Notify ( signalChannel , syscall . SIGINT )
signal . Notify ( signalChannel , syscall . SIGTERM )
defer stop ( signalChannel , srv )
// Запуск сервера
go start ( config , srv )
//
for {
for {
m , err := r . ReadMessage ( context . Background ( ) )
select {
if err != nil {
case <- signalChannel :
break
return
case <- ctx . Done ( ) :
return
}
}
log . Printf ( "message at offset %d: %s = %s\n" , m . Offset , string ( m . Key ) , string ( m . Value ) )
}
}
}
}
// Остановка сервера
// Остановка сервера
func stop ( signalChannel chan os . Signal , srv * server . AuthDB Server ) {
func stop ( signalChannel chan os . Signal , srv * server . AuthPostman Server ) {
defer srv . GracefulStop ( )
defer srv . GracefulStop ( )
defer signal . Stop ( signalChannel )
defer signal . Stop ( signalChannel )
@ -43,9 +48,32 @@ func stop(signalChannel chan os.Signal, srv *server.AuthDBServer) {
}
}
// Запуск сервера
// Запуск сервера
func start ( config * config . Config , srv * server . AuthDBServer ) {
func start ( config * config . Config , srv * server . AuthPostmanServer ) {
// connStr := net.JoinHostPort("", strconv.Itoa(config.App.Port))
connStr := net . JoinHostPort ( config . Kafka . Host , strconv . Itoa ( config . Kafka . Port ) )
//
log . Printf ( "authPostmanServer starting (listening to %s)\n" , connStr )
//
//
log . Printf ( "authPostmanServer starting (%s)\n" , connStr )
r := kafka . NewReader ( kafka . ReaderConfig {
Topic : "registrations" ,
Brokers : [ ] string { connStr } ,
GroupID : "consumer-group-id" ,
Partition : 0 ,
MinBytes : 10e3 , // 10KB
MaxBytes : 10e6 , // 10MB
} )
defer r . Close ( )
// ...
r . SetOffset ( 0 )
for {
m , err := r . ReadMessage ( context . Background ( ) )
if err != nil {
break
}
log . Printf ( "message at offset %d: %s = %s\n" , m . Offset , string ( m . Key ) , string ( m . Value ) )
}
}
}