From c9bc445b67e8d5dfe15bc0ee7dfba8185b892f0e Mon Sep 17 00:00:00 2001 From: slaventius Date: Thu, 2 Feb 2023 12:38:34 +0300 Subject: [PATCH] * --- cmd/main.go | 66 ++++++++++++++++++++++++++++++++------------- internal/postman.go | 18 +++++++++++++ 2 files changed, 65 insertions(+), 19 deletions(-) create mode 100644 internal/postman.go diff --git a/cmd/main.go b/cmd/main.go index 5526e12..c3dac61 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -3,39 +3,44 @@ package main import ( "context" "log" + "net" "os" "os/signal" + "strconv" + "syscall" + server "test3k/authPostman/internal" "test3k/authPostman/internal/config" "github.com/segmentio/kafka-go" ) func main() { - r := kafka.NewReader(kafka.ReaderConfig{ - Topic: "registrations", - Brokers: []string{"localhost:9092"}, - GroupID: "consumer-group-id", - Partition: 0, - MinBytes: 10e3, // 10KB - MaxBytes: 10e6, // 10MB - }) - defer r.Close() + config := config.NewConfig() + ctx, _ := context.WithCancel(context.Background()) + srv := server.NewServer(ctx, config) // - // r.SetOffset(0) + signalChannel := make(chan os.Signal, 1) + signal.Notify(signalChannel, syscall.SIGINT) + signal.Notify(signalChannel, syscall.SIGTERM) + defer stop(signalChannel, srv) + + // Запуск сервера + go start(config, srv) + // for { - m, err := r.ReadMessage(context.Background()) - if err != nil { - break + select { + case <-signalChannel: + return + case <-ctx.Done(): + return } - - log.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) } } // Остановка сервера -func stop(signalChannel chan os.Signal, srv *server.AuthDBServer) { +func stop(signalChannel chan os.Signal, srv *server.AuthPostmanServer) { defer srv.GracefulStop() defer signal.Stop(signalChannel) @@ -43,9 +48,32 @@ func stop(signalChannel chan os.Signal, srv *server.AuthDBServer) { } // Запуск сервера -func start(config *config.Config, srv *server.AuthDBServer) { - // connStr := net.JoinHostPort("", strconv.Itoa(config.App.Port)) +func start(config *config.Config, srv *server.AuthPostmanServer) { + connStr := net.JoinHostPort(config.Kafka.Host, strconv.Itoa(config.Kafka.Port)) + + // + log.Printf("authPostmanServer starting (listening to %s)\n", connStr) // - log.Printf("authPostmanServer starting (%s)\n", connStr) + r := kafka.NewReader(kafka.ReaderConfig{ + Topic: "registrations", + Brokers: []string{connStr}, + GroupID: "consumer-group-id", + Partition: 0, + MinBytes: 10e3, // 10KB + MaxBytes: 10e6, // 10MB + }) + defer r.Close() + + // ... + r.SetOffset(0) + + for { + m, err := r.ReadMessage(context.Background()) + if err != nil { + break + } + + log.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) + } } diff --git a/internal/postman.go b/internal/postman.go new file mode 100644 index 0000000..f8f9f13 --- /dev/null +++ b/internal/postman.go @@ -0,0 +1,18 @@ +package postman + +import ( + "context" + "test3k/authPostman/internal/config" +) + +type AuthPostmanServer struct { +} + +func NewServer(ctx context.Context, config *config.Config) *AuthPostmanServer { + return &AuthPostmanServer{} +} + +func (s *AuthPostmanServer) GracefulStop() error { + // return s.db.Close() + return nil +}