rss2/backend/cmd/topics/main.go

383 lines
8 KiB
Go

package main
import (
"context"
"log"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/rss2/backend/internal/workers"
)
// Package-level state shared by the functions of this worker.
var (
	// logger is assigned in init and writes to stdout.
	logger *log.Logger

	// dbPool is assigned in main from workers.GetPool and used by every
	// database helper in this file.
	dbPool *pgxpool.Pool

	// sleepSec is the pause, in seconds, used by the main loop. loadConfig
	// overrides it from TOPICS_SLEEP.
	sleepSec = 10

	// batchSz is the maximum number of pending news rows fetched per cycle.
	// loadConfig overrides it from TOPICS_BATCH.
	batchSz = 500
)
// Topic is one row of the topics table as used for keyword matching.
type Topic struct {
	// ID is the topics.id value.
	ID int64
	// Weight multiplies the number of matched keywords to produce a score.
	Weight int
	// Keywords are the substrings searched for in lower-cased news text.
	Keywords []string
}
// Country is one row of the paises table plus the keywords used to detect it.
type Country struct {
	// ID is the paises.id value.
	ID int64
	// Name is the paises.nombre value.
	Name string
	// Keywords are the substrings searched for in lower-cased news text.
	Keywords []string
}
// init creates the package logger: stdout, "[TOPICS] " prefix, standard
// date/time flags.
func init() {
	const prefix = "[TOPICS] "
	logger = log.New(os.Stdout, prefix, log.LstdFlags)
}
// loadConfig reads the worker settings from the environment.
//
// TOPICS_SLEEP sets sleepSec and TOPICS_BATCH sets batchSz. A variable that is
// unset or not an integer yields the default (see getEnvInt). Non-positive
// values are rejected and replaced by the default as well: a sleep of zero
// would make the main loop spin without pausing, a batch size of zero would
// never fetch anything, and a negative one is not a valid LIMIT.
func loadConfig() {
	const (
		defaultSleepSec = 10
		defaultBatchSz  = 500
	)

	sleepSec = getEnvInt("TOPICS_SLEEP", defaultSleepSec)
	if sleepSec <= 0 {
		logger.Printf("Invalid TOPICS_SLEEP=%d, using %d", sleepSec, defaultSleepSec)
		sleepSec = defaultSleepSec
	}

	batchSz = getEnvInt("TOPICS_BATCH", defaultBatchSz)
	if batchSz <= 0 {
		logger.Printf("Invalid TOPICS_BATCH=%d, using %d", batchSz, defaultBatchSz)
		batchSz = defaultBatchSz
	}
}
// getEnvInt returns the environment variable key parsed as a base-10 int.
// defaultValue is returned when the variable is unset, empty, or cannot be
// parsed by strconv.Atoi.
func getEnvInt(key string, defaultValue int) int {
	raw := os.Getenv(key)
	if raw == "" {
		return defaultValue
	}
	parsed, err := strconv.Atoi(raw)
	if err != nil {
		return defaultValue
	}
	return parsed
}
// ensureSchema creates the tables and column this worker relies on if they do
// not exist yet: the topics table, the news_topics link table, and the
// noticias.topics_processed flag.
//
// The statements run one by one in the order listed; the first failure is
// returned and the remaining statements are not executed.
func ensureSchema(ctx context.Context) error {
	// topics comes first because news_topics declares a foreign key to it.
	statements := []string{
		`
CREATE TABLE IF NOT EXISTS topics (
id SERIAL PRIMARY KEY,
slug VARCHAR(50) UNIQUE NOT NULL,
name VARCHAR(100) NOT NULL,
weight INTEGER DEFAULT 1,
keywords TEXT,
group_name VARCHAR(50)
);
`,
		`
CREATE TABLE IF NOT EXISTS news_topics (
noticia_id VARCHAR(32) REFERENCES noticias(id) ON DELETE CASCADE,
topic_id INTEGER REFERENCES topics(id) ON DELETE CASCADE,
score INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (noticia_id, topic_id)
);
`,
		`
ALTER TABLE noticias ADD COLUMN IF NOT EXISTS topics_processed BOOLEAN DEFAULT FALSE;
`,
	}

	for _, stmt := range statements {
		if _, execErr := dbPool.Exec(ctx, stmt); execErr != nil {
			return execErr
		}
	}
	return nil
}
// loadTopics reads every row of the topics table.
//
// The keywords column is a comma-separated list; each entry is trimmed and
// lower-cased. Empty entries (from an empty column or a stray comma) are
// dropped, because an empty keyword is a substring of every text and would
// make the topic match every news item. A NULL keywords column yields a topic
// without keywords. A NULL weight is read as 1, the column default declared in
// ensureSchema.
//
// Any scan or iteration error is returned and no partial list is produced, so
// the caller never works with a silently truncated topic set.
func loadTopics(ctx context.Context) ([]Topic, error) {
	rows, err := dbPool.Query(ctx, "SELECT id, weight, keywords FROM topics")
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var topics []Topic
	for rows.Next() {
		var (
			t      Topic
			weight *int
			kwStr  *string
		)
		// Returning instead of skipping: a row that cannot be decoded must
		// not be dropped without the caller noticing.
		if err := rows.Scan(&t.ID, &weight, &kwStr); err != nil {
			return nil, err
		}

		t.Weight = 1
		if weight != nil {
			t.Weight = *weight
		}

		if kwStr != nil {
			for _, kw := range strings.Split(*kwStr, ",") {
				kw = strings.ToLower(strings.TrimSpace(kw))
				if kw == "" {
					continue
				}
				t.Keywords = append(t.Keywords, kw)
			}
		}
		topics = append(topics, t)
	}
	// Next returning false can mean "no more rows" or "read failed".
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return topics, nil
}
// loadCountries reads every row of the paises table and attaches the keywords
// used to detect each country in news text.
//
// A country's keywords are its lower-cased, trimmed name followed by any
// aliases hard-coded below for that name. Empty keywords are dropped (an empty
// string is a substring of every text) and duplicates are removed so that one
// mention is not counted twice, e.g. "china" is both the lower-cased name and
// an alias of "China".
//
// Any scan or iteration error is returned and no partial list is produced.
func loadCountries(ctx context.Context) ([]Country, error) {
	rows, err := dbPool.Query(ctx, "SELECT id, nombre FROM paises")
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	// Extra detection terms keyed by the exact country name stored in paises.
	aliases := map[string][]string{
		"Estados Unidos": {"eeuu", "ee.uu.", "usa", "estadounidense", "washington"},
		"Rusia":          {"ruso", "rusa", "moscú", "kremlin"},
		"China":          {"chino", "china", "pekin", "beijing"},
		"Ucrania":        {"ucraniano", "kiev", "kyiv"},
		"Israel":         {"israelí", "tel aviv", "jerusalén"},
		"España":         {"español", "madrid"},
		"Reino Unido":    {"uk", "londres", "británico"},
		"Francia":        {"francés", "parís"},
		"Alemania":       {"alemán", "berlín"},
		"Palestina":      {"palestino", "gaza", "cisjordania"},
		"Irán":           {"iraní", "teherán"},
	}

	var countries []Country
	for rows.Next() {
		var c Country
		// Returning instead of skipping: a row that cannot be decoded must
		// not be dropped without the caller noticing.
		if err := rows.Scan(&c.ID, &c.Name); err != nil {
			return nil, err
		}

		name := strings.TrimSpace(c.Name)
		candidates := append([]string{strings.ToLower(name)}, aliases[name]...)
		seen := make(map[string]struct{}, len(candidates))
		for _, kw := range candidates {
			if kw == "" {
				continue
			}
			if _, dup := seen[kw]; dup {
				continue
			}
			seen[kw] = struct{}{}
			c.Keywords = append(c.Keywords, kw)
		}
		countries = append(countries, c)
	}
	// Next returning false can mean "no more rows" or "read failed".
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return countries, nil
}
// NewsItem is the subset of a noticias row needed for topic and country
// detection.
type NewsItem struct {
	// ID is the noticias.id value.
	ID string
	// Titulo is the noticias.titulo value; nil when the column is NULL.
	Titulo *string
	// Resumen is the noticias.resumen value; nil when the column is NULL.
	Resumen *string
}
// fetchPendingNews returns up to limit rows of noticias whose topics_processed
// flag is FALSE, newest first (ORDER BY fecha DESC).
//
// Any scan or iteration error is returned and no partial list is produced, so
// a failed read is reported instead of looking like a short or empty backlog.
func fetchPendingNews(ctx context.Context, limit int) ([]NewsItem, error) {
	rows, err := dbPool.Query(ctx, `
SELECT id, titulo, resumen
FROM noticias
WHERE topics_processed = FALSE
ORDER BY fecha DESC
LIMIT $1
`, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var items []NewsItem
	for rows.Next() {
		var n NewsItem
		// Returning instead of skipping: a row that cannot be decoded must
		// not be dropped without the caller noticing.
		if err := rows.Scan(&n.ID, &n.Titulo, &n.Resumen); err != nil {
			return nil, err
		}
		items = append(items, n)
	}
	// Next returning false can mean "no more rows" or "read failed".
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}
// findTopics returns one entry for every topic that has at least one keyword
// occurring in text.
//
// Matching is a case-insensitive substring test: text is lower-cased here and
// keywords are expected to be lower-case already. Score is the topic weight
// multiplied by the number of distinct keywords found (not the number of
// occurrences). Empty keywords are ignored because the empty string is a
// substring of every text and would make the topic match everything.
//
// The result is nil when nothing matches.
func findTopics(text string, topics []Topic) []struct {
	TopicID int64
	Score   int
} {
	// Alias for the anonymous result element type required by the signature.
	type match = struct {
		TopicID int64
		Score   int
	}

	text = strings.ToLower(text)

	var matches []match
	for _, topic := range topics {
		count := 0
		for _, kw := range topic.Keywords {
			if kw != "" && strings.Contains(text, kw) {
				count++
			}
		}
		if count > 0 {
			matches = append(matches, match{TopicID: topic.ID, Score: topic.Weight * count})
		}
	}
	return matches
}
// findBestCountry returns the ID of the country with the most keywords
// occurring in text, or nil when no country keyword occurs at all.
//
// Matching is a case-insensitive substring test: text is lower-cased here and
// keywords are expected to be lower-case already. On a tie the country that
// appears first in countries wins. Empty keywords are ignored because the
// empty string is a substring of every text and would make the country match
// everything.
func findBestCountry(text string, countries []Country) *int64 {
	text = strings.ToLower(text)

	var (
		bestID    int64
		bestCount int
	)
	for _, c := range countries {
		count := 0
		for _, kw := range c.Keywords {
			if kw != "" && strings.Contains(text, kw) {
				count++
			}
		}
		// Strictly greater keeps the earliest country on equal counts.
		if count > bestCount {
			bestCount, bestID = count, c.ID
		}
	}

	if bestCount == 0 {
		return nil
	}
	return &bestID
}
// processBatch classifies one batch of pending news items.
//
// It fetches up to batchSz pending rows, builds each item's text from its
// title and summary, upserts one news_topics row per matching topic, sets
// noticias.pais_id to the best-matching country (if any), and finally flags
// the items as topics_processed.
//
// Write errors for a single item are logged and do not abort the batch, but
// that item is NOT flagged as processed, so it is picked up again by a later
// batch instead of silently losing its topics or country. Both writes are
// safe to repeat (upsert / plain UPDATE).
//
// It returns the number of items fetched. An error is returned when fetching
// fails, when ctx is cancelled between items, or when the final flag update
// fails.
func processBatch(ctx context.Context, topics []Topic, countries []Country) (int, error) {
	items, err := fetchPendingNews(ctx, batchSz)
	if err != nil {
		return 0, err
	}
	if len(items) == 0 {
		return 0, nil
	}

	processedIDs := make([]string, 0, len(items))
	for _, item := range items {
		// Stop promptly on cancellation rather than issuing writes that are
		// bound to fail; nothing has been flagged yet, so the batch is retried.
		if err := ctx.Err(); err != nil {
			return 0, err
		}

		var text string
		if item.Titulo != nil {
			text += *item.Titulo
		}
		if item.Resumen != nil {
			text += " " + *item.Resumen
		}

		written := true

		// Topic relations.
		for _, m := range findTopics(text, topics) {
			_, err := dbPool.Exec(ctx, `
INSERT INTO news_topics (noticia_id, topic_id, score)
VALUES ($1, $2, $3)
ON CONFLICT (noticia_id, topic_id) DO UPDATE SET score = EXCLUDED.score
`, item.ID, m.TopicID, m.Score)
			if err != nil {
				logger.Printf("Error inserting topic: %v", err)
				written = false
			}
		}

		// Best-matching country.
		if countryID := findBestCountry(text, countries); countryID != nil {
			_, err := dbPool.Exec(ctx, `
UPDATE noticias SET pais_id = $1 WHERE id = $2
`, *countryID, item.ID)
			if err != nil {
				logger.Printf("Error updating country: %v", err)
				written = false
			}
		}

		if written {
			processedIDs = append(processedIDs, item.ID)
		}
	}

	if pending := len(items) - len(processedIDs); pending > 0 {
		logger.Printf("Leaving %d news items pending after write errors", pending)
	}

	// Flag only the items whose writes all succeeded.
	if len(processedIDs) > 0 {
		_, err := dbPool.Exec(ctx, `
UPDATE noticias SET topics_processed = TRUE WHERE id = ANY($1)
`, processedIDs)
		if err != nil {
			return 0, err
		}
	}
	return len(items), nil
}
// main runs the topics worker until SIGINT or SIGTERM is received.
//
// Startup: load configuration, connect to the database, and make sure the
// schema exists (a schema error is logged but does not stop the worker).
//
// Loop: wait sleepSec seconds, reload topics and countries, and process one
// batch. When no topics exist or the batch was not full, wait another sleepSec
// seconds before the next cycle. Errors are logged and the cycle is retried
// after the regular wait.
//
// Shutdown: the signal cancels ctx, which interrupts any wait and any
// in-flight database call. main then returns normally so the deferred
// workers.Close runs; calling os.Exit from a signal goroutine would skip it.
func main() {
	loadConfig()
	logger.Println("Starting Topics Worker")

	cfg := workers.LoadDBConfig()
	if err := workers.Connect(cfg); err != nil {
		logger.Fatalf("Failed to connect to database: %v", err)
	}
	dbPool = workers.GetPool()
	defer workers.Close()

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	// Releasing the handler restores default signal behaviour, so a second
	// signal during shutdown terminates the process immediately.
	defer stop()

	// Ensure schema
	if err := ensureSchema(ctx); err != nil {
		logger.Printf("Error ensuring schema: %v", err)
	}

	logger.Printf("Config: sleep=%ds, batch=%d", sleepSec, batchSz)

	interval := time.Duration(sleepSec) * time.Second
	// pause blocks for one interval and reports whether the worker should
	// keep running; it returns false as soon as shutdown is requested.
	pause := func() bool {
		select {
		case <-ctx.Done():
			return false
		case <-time.After(interval):
			return true
		}
	}

	for pause() {
		topics, err := loadTopics(ctx)
		if err != nil {
			logger.Printf("Error loading topics: %v", err)
			continue
		}
		if len(topics) == 0 {
			logger.Println("No topics found in DB")
			if !pause() {
				break
			}
			continue
		}

		countries, err := loadCountries(ctx)
		if err != nil {
			logger.Printf("Error loading countries: %v", err)
			continue
		}

		count, err := processBatch(ctx, topics, countries)
		if err != nil {
			logger.Printf("Error processing batch: %v", err)
			continue
		}
		if count > 0 {
			logger.Printf("Processed %d news items", count)
		}
		// A short batch means the backlog is drained: back off once more.
		if count < batchSz && !pause() {
			break
		}
	}

	logger.Println("Shutting down...")
}