package authDB

import (
	"context"
	"encoding/json"
	"errors"

	"net"
	"strconv"
	"sync"
	"time"

	kafka "git.slaventius.ru/test3k/authDB/internal/transport/kafka"

	"git.slaventius.ru/test3k/authDB/internal/config"
	api "git.slaventius.ru/test3k/umate/pkg/api"
	apiKafka "git.slaventius.ru/test3k/umate/pkg/kafka"
	logger "git.slaventius.ru/test3k/umate/pkg/logger"
)

// user is the in-memory record kept for one login in AuthDBServer.users.
type user struct {
	// ID is the numeric identifier returned to clients by Login and Confirmation.
	ID        int32
	// Login is the account name; it is also the map key the record is stored under.
	Login     string
	// Password is stored exactly as received in the registration request and is
	// compared verbatim by Login.
	Password  string
	// Confirmed becomes true once Confirmation is called with this record's code.
	Confirmed bool
	// Time is set to 15 minutes after registration; until that instant the same
	// login cannot be registered again.
	Time      time.Time
	// MessageRegistration is embedded; Registration fills its Code and Email
	// fields and publishes it to Kafka as JSON.
	apiKafka.MessageRegistration
}

// AuthDBServer keeps registered users in memory and serves the Login,
// Registration and Confirmation calls against that table. It embeds
// api.UnimplementedAuthDBServer.
type AuthDBServer struct {
	// mu is held by Login, Registration and Confirmation for their whole
	// duration; it guards users and id.
	mu          sync.Mutex
	// users maps a login to its record.
	users       map[string]*user
	// kafkaWriter publishes registration messages to apiKafka.TopicRegistrations
	// and is closed by GracefulStop.
	kafkaWriter *kafka.KafkaWriter
	// logger receives progress and error messages from Registration.
	logger      *logger.Logger
	api.UnimplementedAuthDBServer
	// ctx is the context handed to NewServer; no method shown in this file reads it.
	ctx context.Context
	// id is the most recently assigned user ID; Registration advances it when
	// a previously unknown login is registered.
	id  int32
}

// NewServer builds an AuthDBServer with an empty user table, a logger tagged
// "test3k:authDBService" (configured with the Sentry DSN from config) and a
// Kafka writer for the registrations topic at the host/port taken from config.
// The supplied ctx is passed to the Kafka writer and kept on the server.
func NewServer(ctx context.Context, config *config.Config) *AuthDBServer {
	svcLogger := logger.NewLogger("test3k:authDBService", config.Sentry.DSN)
	brokerAddr := net.JoinHostPort(config.Kafka.Host, strconv.Itoa(config.Kafka.Port))

	// mu and id are left at their zero values: an unlocked mutex and 0.
	srv := &AuthDBServer{
		users:  make(map[string]*user),
		logger: svcLogger,
		ctx:    ctx,
	}
	srv.kafkaWriter = kafka.NewWriter(ctx, svcLogger, apiKafka.TopicRegistrations, brokerAddr)

	return srv
}

// GracefulStop closes the Kafka writer and returns whatever error Close
// reports. It does not touch the user table.
func (s *AuthDBServer) GracefulStop() error {
	return s.kafkaWriter.Close()
}

// Login checks the supplied credentials against the in-memory user table and
// returns the user's ID on success.
//
// It fails with "login unknown" when no record exists for the login, with
// "login unconfirmed" when the record has not been confirmed yet, and with
// "password incorrect" when the stored password differs from the request's.
func (s *AuthDBServer) Login(ctx context.Context, req *api.LoginRequest) (*api.LoginResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	account, found := s.users[req.GetLogin()]

	// Cases are evaluated top to bottom, so account is only dereferenced
	// after the lookup is known to have succeeded.
	switch {
	case !found:
		return nil, errors.New("login unknown")
	case !account.Confirmed:
		return nil, errors.New("login unconfirmed")
	case account.Password != req.Password:
		return nil, errors.New("password incorrect")
	}

	resp := &api.LoginResponse{ID: account.ID}

	return resp, nil
}

// Registration creates (or re-creates) an unconfirmed record for the requested
// login and publishes its confirmation code and e-mail to Kafka as JSON, keyed
// by the login.
//
// A login that was registered less than 15 minutes ago is rejected with
// "login already registered". After that window the login may be registered
// again: the record is replaced but keeps the ID it was originally given.
// A previously unknown login receives the next sequential ID.
//
// The record is stored, and the ID counter advanced, only after the Kafka
// write succeeds. If marshalling or the write fails, the error is returned and
// the server state is left unchanged.
//
// On success the response carries the generated code and the e-mail.
func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRequest) (*api.RegistrationResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	login := req.GetLogin()
	now := time.Now()

	// A new login takes the next sequential ID. s.id itself is advanced only
	// once the record is actually stored, so a failed attempt does not consume
	// an ID.
	id := s.id + 1

	existing, known := s.users[login]
	if known {
		if now.Before(existing.Time) {
			return nil, errors.New("login already registered")
		}

		// Re-registration after the window elapsed must keep this login's own
		// ID; the counter's current value belongs to whichever login was
		// created last and would produce duplicate IDs.
		//
		// NOTE(review): this path also replaces a record that is already
		// Confirmed — confirm that is intended.
		id = existing.ID
	}

	record := &user{
		ID:        id,
		Login:     login,
		Password:  req.GetPassword(),
		Confirmed: false,
		Time:      now.Add(15 * time.Minute),
		MessageRegistration: apiKafka.MessageRegistration{
			// NOTE(review): the code is derived from the clock, so it is
			// neither guaranteed unique nor unpredictable.
			Code:  strconv.Itoa(now.Nanosecond()),
			Email: req.GetEmail(),
		},
	}

	value, err := json.Marshal(record.MessageRegistration)
	if err != nil {
		return nil, err
	}

	s.logger.Printf("publication code %s to %s ...", record.MessageRegistration.Code, record.MessageRegistration.Email)

	if writeErr := s.kafkaWriter.WriteMessage([]byte(login), value); writeErr != nil {
		s.logger.Error(writeErr)

		return nil, writeErr
	}

	// Commit state only after the message has been published.
	if !known {
		s.id = id
	}
	s.users[login] = record

	s.logger.Printf("publication code %s to %s completed", record.MessageRegistration.Code, record.MessageRegistration.Email)

	return &api.RegistrationResponse{
		Code:  record.MessageRegistration.Code,
		Email: record.MessageRegistration.Email,
	}, nil
}

// Confirmation marks the user whose registration code equals the request's
// code as confirmed and returns that user's ID.
//
// The user table is scanned in map iteration order and the first record with
// a matching code decides the outcome: "already confirmed" if that record is
// already confirmed, success otherwise. If no record matches, the call fails
// with "code unknown".
func (s *AuthDBServer) Confirmation(ctx context.Context, req *api.ConfirmationRequest) (*api.ConfirmationResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	wanted := req.GetCode()

	for _, candidate := range s.users {
		if candidate.Code != wanted {
			continue
		}

		if candidate.Confirmed {
			return nil, errors.New("already confirmed")
		}

		candidate.Confirmed = true

		return &api.ConfirmationResponse{ID: candidate.ID}, nil
	}

	return nil, errors.New("code unknown")
}