From e6ef6b21866d60e42966c6a3e4e8c65ebd6b20c7 Mon Sep 17 00:00:00 2001 From: slaventius Date: Thu, 2 Feb 2023 22:48:17 +0300 Subject: [PATCH] * --- internal/transport/grpc/grpc.go | 4 ++-- internal/transport/kafka/kafka_writer.go | 8 +++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go index 4aae2f1..664ae71 100644 --- a/internal/transport/grpc/grpc.go +++ b/internal/transport/grpc/grpc.go @@ -113,7 +113,7 @@ func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRe } // - log.Printf("send code %s to %s ...", user.msg.Code, user.msg.Email) + log.Printf("publication code %s to %s ...", user.msg.Code, user.msg.Email) // err := s.kafkaWriter.WriteMessage([]byte(user.Login), value) @@ -124,7 +124,7 @@ func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRe } // - log.Printf("send code %s to %s completed", user.msg.Code, user.msg.Email) + log.Printf("publication code %s to %s completed", user.msg.Code, user.msg.Email) return &api.RegistrationResponse{ Code: user.msg.Code, diff --git a/internal/transport/kafka/kafka_writer.go b/internal/transport/kafka/kafka_writer.go index 83a307a..0359f77 100644 --- a/internal/transport/kafka/kafka_writer.go +++ b/internal/transport/kafka/kafka_writer.go @@ -5,6 +5,7 @@ import ( "log" "net" "strconv" + "time" "github.com/segmentio/kafka-go" ) @@ -20,9 +21,10 @@ func NewWriter(ctx context.Context, topic string, address ...string) *KafkaWrite s := &KafkaWriter{ ctx: ctx, writer: &kafka.Writer{ - Topic: topic, - Balancer: &kafka.LeastBytes{}, - Addr: kafka.TCP(address...), + Topic: topic, + Balancer: &kafka.LeastBytes{}, + WriteBackoffMax: time.Millisecond * 100, + Addr: kafka.TCP(address...), }, first: address[0], topic: topic,