package kafka

import (
	"context"
	"fmt"
	"time"

	"git.slaventius.ru/test3k/authPostman/internal/config"

	"github.com/segmentio/kafka-go"
)

type KafkaReader struct {
	ctx    context.Context
	config *config.Config
	reader *kafka.Reader
	first  string
	topic  string
}

func NewReader(ctx context.Context, config *config.Config, topic string, address ...string) *KafkaReader {
	return &KafkaReader{
		ctx:    ctx,
		config: config,
		reader: kafka.NewReader(kafka.ReaderConfig{
			Topic:   topic,
			Brokers: address,
			GroupID: fmt.Sprintf("consumer-group-%s", topic),
			// Partition: 0,    // TODO
			MinBytes:       10e3, // 10KB
			MaxBytes:       10e6, // 10MB
			ReadBackoffMax: time.Millisecond * 100,
		}),
		first: address[0],
		topic: topic,
	}
}

func (s *KafkaReader) Close() error {
	return s.reader.Close()
}

func (s *KafkaReader) ReadMessage() (kafka.Message, error) {
	return s.reader.ReadMessage(s.ctx)
}