@ -21,6 +21,7 @@ type AuthPostmanServer struct {
kafkaReader * kafka . KafkaReader
kafkaReader * kafka . KafkaReader
logger * logger . Logger
logger * logger . Logger
config * config . Config
config * config . Config
msgs map [ int64 ] bool
}
}
func NewServer ( ctx context . Context , config * config . Config ) * AuthPostmanServer {
func NewServer ( ctx context . Context , config * config . Config ) * AuthPostmanServer {
@ -32,6 +33,7 @@ func NewServer(ctx context.Context, config *config.Config) *AuthPostmanServer {
kafkaReader : kafka . NewReader ( ctx , config , api . TopicRegistrations , address ... ) ,
kafkaReader : kafka . NewReader ( ctx , config , api . TopicRegistrations , address ... ) ,
logger : logger ,
logger : logger ,
config : config ,
config : config ,
msgs : make ( map [ int64 ] bool ) ,
}
}
}
}
@ -57,8 +59,17 @@ func (s *AuthPostmanServer) ReadMessage(offset int64) error {
return erk
return erk
}
}
// Проверим пришедшее сообщение, не является ли оно повтором
if _ , ok := s . msgs [ msg . ID ] ; ok {
s . logger . Printf ( "the message #%d is a duplicate" , msg . ID )
continue
} else {
s . msgs [ msg . ID ] = true
}
//
//
s . logger . Printf ( "send code %s to %s ..." , msg . Code , msg . Email )
s . logger . Printf ( "send code %s to %s (message #%d) ..." , msg . Code , msg . Email , msg . ID )
//
//
text := fmt . Sprintf ( "Confirmation code %v" , msg . Code )
text := fmt . Sprintf ( "Confirmation code %v" , msg . Code )
@ -73,7 +84,7 @@ func (s *AuthPostmanServer) ReadMessage(offset int64) error {
if ers != nil {
if ers != nil {
s . logger . Print ( ers )
s . logger . Print ( ers )
} else {
} else {
s . logger . Printf ( "send code %s to %s completed" , msg . Code , msg . Email )
s . logger . Printf ( "send code %s to %s (message #%d) completed" , msg . Code , msg . Email , msg . ID )
}
}
}
}
}
}