// coconews/backend/cmd/discovery/main.go
package main
import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"syscall"
	"time"
	"unicode/utf8"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/mmcdole/gofeed"
	"github.com/rss2/backend/internal/workers"
)
// Package-level state shared across the worker.
var (
// logger writes all worker output to stdout with a "[DISCOVERY]" prefix (set in init).
logger *log.Logger
// pool holds a workers.Config. NOTE(review): unused in this file and the name
// suggests a connection pool rather than a config — confirm before removing.
pool *workers.Config
// dbPool is the shared PostgreSQL connection pool (assigned in main).
dbPool *pgxpool.Pool
// sleepSec is the discovery interval in seconds (overridable via DISCOVERY_INTERVAL).
sleepSec = 900 // 15 minutes
// batchSize is the max number of fuentes_url rows fetched per cycle (DISCOVERY_BATCH).
batchSize = 10
)
// URLSource is one row of the fuentes_url table: a site URL to probe for
// RSS/Atom feeds, plus optional pre-assigned classification.
type URLSource struct {
ID int64
Nombre string
URL string
CategoriaID *int64 // optional category; together with PaisID enables auto-approval
PaisID *int64 // optional country; together with CategoriaID enables auto-approval
Idioma *string // optional language used as fallback when the feed declares none
}
// init configures the package logger before main runs.
func init() {
logger = log.New(os.Stdout, "[DISCOVERY] ", log.LstdFlags)
}
// loadConfig refreshes the worker tunables from the environment, keeping the
// built-in defaults when a variable is unset or not a valid integer.
func loadConfig() {
	interval := getEnvInt("DISCOVERY_INTERVAL", 900)
	batch := getEnvInt("DISCOVERY_BATCH", 10)
	sleepSec = interval
	batchSize = batch
}
func getEnvInt(key string, defaultValue int) int {
if value := os.Getenv(key); value != "" {
if intVal, err := strconv.Atoi(value); err == nil {
return intVal
}
}
return defaultValue
}
// getPendingURLs fetches up to batchSize active fuentes_url rows to process,
// ordered so never-checked URLs come first, then previous errors, then
// "no_feeds", then everything else by oldest check time.
func getPendingURLs(ctx context.Context) ([]URLSource, error) {
	rows, err := dbPool.Query(ctx, `
SELECT id, nombre, url, categoria_id, pais_id, idioma
FROM fuentes_url
WHERE active = TRUE
ORDER BY
CASE
WHEN last_check IS NULL THEN 1
WHEN last_status = 'error' THEN 2
WHEN last_status = 'no_feeds' THEN 3
ELSE 4
END,
last_check ASC NULLS FIRST
LIMIT $1
`, batchSize)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var sources []URLSource
	for rows.Next() {
		var s URLSource
		// A scan failure was previously swallowed with `continue`, hiding
		// schema drift; surface it instead.
		if err := rows.Scan(&s.ID, &s.Nombre, &s.URL, &s.CategoriaID, &s.PaisID, &s.Idioma); err != nil {
			return nil, fmt.Errorf("scanning fuentes_url row: %w", err)
		}
		sources = append(sources, s)
	}
	// rows.Err() reports iteration errors (e.g. a dropped connection) that
	// rows.Next() signals only by returning false.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return sources, nil
}
// updateURLStatus records the outcome of the latest check for a fuentes_url
// row: timestamp, status keyword, human-readable message, and HTTP code.
func updateURLStatus(ctx context.Context, urlID int64, status, message string, httpCode int) error {
	const query = `
UPDATE fuentes_url
SET last_check = NOW(),
last_status = $1,
status_message = $2,
last_http_code = $3
WHERE id = $4
`
	_, err := dbPool.Exec(ctx, query, status, message, httpCode, urlID)
	return err
}
// discoverFeeds fetches pageURL and returns the feed URLs it represents.
// If the page itself parses as a feed with at least one item, the page URL is
// returned directly; otherwise the HTML is scanned for advertised feed links.
func discoverFeeds(pageURL string) ([]string, error) {
	client := &http.Client{
		Timeout: 15 * time.Second,
	}
	req, err := http.NewRequest("GET", pageURL, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("User-Agent", "Mozilla/5.0 (compatible; RSS2Bot/1.0)")
	req.Header.Set("Accept", "application/rss+xml, application/atom+xml, application/xml, text/xml, text/html")
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	// Previously any HTTP status was fed to the parser, so a 404/500 error
	// page was indistinguishable from "not a feed". Reject non-2xx up front.
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return nil, fmt.Errorf("fetching %s: unexpected status %d", pageURL, resp.StatusCode)
	}
	// Try to parse the body as a feed first.
	parser := gofeed.NewParser()
	feed, err := parser.Parse(resp.Body)
	if err == nil && feed != nil && len(feed.Items) > 0 {
		// The URL is itself a valid feed.
		return []string{pageURL}, nil
	}
	// Not a feed: fall back to scanning the HTML for feed <link> references.
	return findFeedLinksInHTML(pageURL)
}
// findFeedLinksInHTML scans the page at baseURL for advertised RSS/Atom links.
// Placeholder implementation: it always reports zero feeds. In production this
// should fetch the page and parse <link rel="alternate"> tags (e.g. goquery).
func findFeedLinksInHTML(baseURL string) ([]string, error) {
	feeds := make([]string, 0)
	return feeds, nil
}
// parseFeed downloads feedURL (30s timeout) and parses it as RSS/Atom.
// Non-2xx responses are rejected before parsing so an HTML error page is not
// reported as a feed-parse failure.
func parseFeed(feedURL string) (*gofeed.Feed, error) {
	client := &http.Client{
		Timeout: 30 * time.Second,
	}
	req, err := http.NewRequest("GET", feedURL, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("User-Agent", "Mozilla/5.0 (compatible; RSS2Bot/1.0)")
	req.Header.Set("Accept", "application/rss+xml, application/atom+xml, application/xml, text/xml")
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	// New: fail fast on HTTP errors instead of parsing the error body.
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return nil, fmt.Errorf("fetching %s: unexpected status %d", feedURL, resp.StatusCode)
	}
	parser := gofeed.NewParser()
	return parser.Parse(resp.Body)
}
// getFeedMetadata downloads and parses feedURL, returning its title (with a
// Spanish placeholder when empty), a description capped at 500 bytes, the
// declared language, and the number of entries.
func getFeedMetadata(feedURL string) (title, description, language string, entryCount int, err error) {
	feed, err := parseFeed(feedURL)
	if err != nil {
		return "", "", "", 0, err
	}
	title = feed.Title
	if title == "" {
		title = "Feed sin título"
	}
	description = feed.Description
	if len(description) > 500 {
		// description[:500] could split a multi-byte UTF-8 character (common
		// in Spanish text); back up to the previous rune boundary first.
		cut := 500
		for cut > 0 && !utf8.RuneStart(description[cut]) {
			cut--
		}
		description = description[:cut]
	}
	language = feed.Language
	entryCount = len(feed.Items)
	return title, description, language, entryCount, nil
}
// analyzeFeed guesses a country for a feed from keywords in its title and
// description. The category heuristic is not implemented, so category is
// always "". Simple heuristics — in production use ML or an API.
func analyzeFeed(title, url, description string) (country, category string) {
	combined := strings.ToLower(title) + " " + strings.ToLower(description)
	// An ordered slice, not a map: Go map iteration order is random, so the
	// original returned a nondeterministic country whenever the text matched
	// keywords of more than one entry.
	countries := []struct {
		name     string
		keywords []string
	}{
		{"España", []string{"españa", "español", "madrid", "barcelona"}},
		{"Argentina", []string{"argentino", "buenos aires"}},
		{"México", []string{"méxico", "mexicano", "cdmx", "ciudad de méxico"}},
		{"Colombia", []string{"colombiano", "bogotá"}},
		{"Chile", []string{"chileno", "santiago"}},
		{"Perú", []string{"peruano", "lima"}},
		{"EE.UU.", []string{"estados unidos", "washington", "trump", "biden"}},
		{"Reino Unido", []string{"reino unido", "londres", "uk"}},
		{"Francia", []string{"francia", "parís"}},
		{"Alemania", []string{"alemania", "berlín"}},
	}
	for _, c := range countries {
		for _, kw := range c.keywords {
			if strings.Contains(combined, kw) {
				return c.name, ""
			}
		}
	}
	return "", ""
}
// getCountryIDByName resolves a country name (case-insensitive) to its id in
// the paises table.
func getCountryIDByName(ctx context.Context, countryName string) (*int64, error) {
	const query = "SELECT id FROM paises WHERE LOWER(nombre) = LOWER($1)"
	var id int64
	if err := dbPool.QueryRow(ctx, query, countryName).Scan(&id); err != nil {
		return nil, err
	}
	return &id, nil
}
// getCategoryIDByName resolves a category name (case-insensitive) to its id
// in the categorias table.
func getCategoryIDByName(ctx context.Context, categoryName string) (*int64, error) {
	const query = "SELECT id FROM categorias WHERE LOWER(nombre) = LOWER($1)"
	var id int64
	if err := dbPool.QueryRow(ctx, query, categoryName).Scan(&id); err != nil {
		return nil, err
	}
	return &id, nil
}
// createPendingFeed upserts a discovered feed into feeds_pending for human
// review, resolving any detected country / suggested category names to ids.
// metadata keys used: title, description, language, entry_count,
// detected_country, suggested_category (all optional).
func createPendingFeed(ctx context.Context, fuenteURLID int64, feedURL string, metadata map[string]interface{}) error {
	// Use the comma-ok form like every other key: the original bare
	// assertion metadata["title"].(string) panicked on a missing/mistyped key.
	feedTitle := ""
	if t, ok := metadata["title"].(string); ok {
		feedTitle = t
	}
	if feedTitle == "" {
		feedTitle = "Feed sin título"
	}
	description := ""
	if d, ok := metadata["description"].(string); ok {
		description = d
	}
	language := ""
	if l, ok := metadata["language"].(string); ok {
		language = l
	}
	entryCount := 0
	if c, ok := metadata["entry_count"].(int); ok {
		entryCount = c
	}
	detectedCountry := ""
	if dc, ok := metadata["detected_country"].(string); ok {
		detectedCountry = dc
	}
	// Name→id lookups are best-effort: an unknown name simply leaves the id nil.
	var detectedCountryID *int64
	if detectedCountry != "" {
		if cid, err := getCountryIDByName(ctx, detectedCountry); err == nil {
			detectedCountryID = cid
		}
	}
	suggestedCategory := ""
	if sc, ok := metadata["suggested_category"].(string); ok {
		suggestedCategory = sc
	}
	var suggestedCategoryID *int64
	if suggestedCategory != "" {
		if caid, err := getCategoryIDByName(ctx, suggestedCategory); err == nil {
			suggestedCategoryID = caid
		}
	}
	_, err := dbPool.Exec(ctx, `
INSERT INTO feeds_pending (
fuente_url_id, feed_url, feed_title, feed_description,
feed_language, feed_type, entry_count,
detected_country_id, suggested_categoria_id,
discovered_at
)
VALUES ($1, $2, $3, $4, $5, 'rss', $6, $7, $8, NOW())
ON CONFLICT (feed_url) DO UPDATE
SET feed_title = EXCLUDED.feed_title,
discovered_at = NOW()
`, fuenteURLID, feedURL, feedTitle, description, language, entryCount, detectedCountryID, suggestedCategoryID)
	return err
}
// createFeedDirectly inserts a fully-classified feed straight into the feeds
// table. It returns (true, nil) when a new row was inserted and (false, nil)
// when the URL already existed (ON CONFLICT DO NOTHING).
func createFeedDirectly(ctx context.Context, feedURL string, fuenteURLID *int64, categoriaID, paisID *int64, idioma *string) (bool, error) {
	title, description, language, _, err := getFeedMetadata(feedURL)
	if err != nil {
		return false, err
	}
	// Fall back to the source's configured language when the feed declares none.
	if language == "" && idioma != nil {
		language = *idioma
	}
	var feedID int64
	err = dbPool.QueryRow(ctx, `
INSERT INTO feeds (nombre, descripcion, url, categoria_id, pais_id, idioma, fuente_url_id, activo)
VALUES ($1, $2, $3, $4, $5, $6, $7, TRUE)
ON CONFLICT (url) DO NOTHING
RETURNING id
`, title, description, feedURL, categoriaID, paisID, language, fuenteURLID).Scan(&feedID)
	if err != nil {
		// ON CONFLICT DO NOTHING produces zero rows, which pgx surfaces as
		// ErrNoRows. The original propagated that as a failure, so an
		// already-existing feed was miscounted as an error by the caller.
		if errors.Is(err, pgx.ErrNoRows) {
			return false, nil
		}
		return false, err
	}
	// A row came back from RETURNING, so the insert happened.
	return true, nil
}
// processURLSource runs one discovery pass for a single fuentes_url row:
// find feeds at the URL, fetch each feed's metadata, then either create the
// feed directly (when the source is pre-classified with both category and
// country) or queue it in feeds_pending for review. The row's status is
// updated with a summary at the end. Status-update failures are best-effort
// and intentionally ignored.
func processURLSource(ctx context.Context, source URLSource) {
	logger.Printf("Processing: %s (%s)", source.Nombre, source.URL)
	// Try to find feeds on this URL.
	feeds, err := discoverFeeds(source.URL)
	if err != nil {
		logger.Printf("Error discovering feeds: %v", err)
		// err.Error()[:200] panicked whenever the message was shorter than
		// 200 bytes (the common case); truncate safely instead.
		msg := err.Error()
		if len(msg) > 200 {
			msg = msg[:200]
		}
		updateURLStatus(ctx, source.ID, "error", msg, 0)
		return
	}
	if len(feeds) == 0 {
		logger.Printf("No feeds found for: %s", source.URL)
		updateURLStatus(ctx, source.ID, "no_feeds", "No feeds found", 200)
		return
	}
	logger.Printf("Found %d feeds for %s", len(feeds), source.URL)
	// Cap how many feeds we handle per source URL.
	maxFeeds := getEnvInt("MAX_FEEDS_PER_URL", 5)
	if len(feeds) > maxFeeds {
		feeds = feeds[:maxFeeds]
	}
	// A source with both category and country pre-assigned skips review.
	autoApprove := source.CategoriaID != nil && source.PaisID != nil
	created := 0
	pending := 0
	existing := 0
	errors := 0
	for _, feedURL := range feeds {
		// Get feed metadata.
		title, description, language, entryCount, err := getFeedMetadata(feedURL)
		if err != nil {
			logger.Printf("Error parsing feed %s: %v", feedURL, err)
			errors++
			continue
		}
		// Analyze for country/category suggestions.
		detectedCountry, suggestedCategory := analyzeFeed(title, feedURL, description)
		metadata := map[string]interface{}{
			"title":              title,
			"description":        description,
			"language":           language,
			"entry_count":        entryCount,
			"detected_country":   detectedCountry,
			"suggested_category": suggestedCategory,
		}
		if !autoApprove {
			// Create pending feed for review.
			if err := createPendingFeed(ctx, source.ID, feedURL, metadata); err != nil {
				logger.Printf("Error creating pending feed: %v", err)
				errors++
			} else {
				pending++
			}
		} else {
			// Create feed directly.
			createdFeed, err := createFeedDirectly(ctx, feedURL, &source.ID, source.CategoriaID, source.PaisID, source.Idioma)
			if err != nil {
				logger.Printf("Error creating feed: %v", err)
				errors++
			} else if createdFeed {
				created++
			} else {
				existing++
			}
		}
		time.Sleep(1 * time.Second) // Rate limiting between feeds.
	}
	// Summarize the pass into a status row for the source URL.
	var status string
	var message string
	if created > 0 || pending > 0 {
		status = "success"
		parts := []string{}
		if created > 0 {
			parts = append(parts, fmt.Sprintf("%d created", created))
		}
		if pending > 0 {
			parts = append(parts, fmt.Sprintf("%d pending", pending))
		}
		message = strings.Join(parts, ", ")
	} else if existing > 0 {
		status = "existing"
		message = fmt.Sprintf("%d already existed", existing)
	} else {
		status = "error"
		message = fmt.Sprintf("%d errors", errors)
	}
	updateURLStatus(ctx, source.ID, status, message, 200)
	logger.Printf("Processed %s: created=%d, pending=%d, existing=%d, errors=%d",
		source.URL, created, pending, existing, errors)
}
// main wires up config and the database, then runs the periodic discovery
// loop until SIGINT/SIGTERM, shutting down cleanly so deferred cleanup
// (workers.Close, ticker.Stop) actually executes.
func main() {
	loadConfig()
	logger.Println("Starting RSS Discovery Worker")
	cfg := workers.LoadDBConfig()
	if err := workers.Connect(cfg); err != nil {
		logger.Fatalf("Failed to connect to database: %v", err)
	}
	dbPool = workers.GetPool()
	defer workers.Close()
	logger.Println("Connected to PostgreSQL")
	// Cancel ctx on SIGINT/SIGTERM. The original handled signals in a
	// goroutine that called os.Exit(0), which skipped every defer — the DB
	// pool was never closed on shutdown.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
	logger.Printf("Config: interval=%ds, batch=%d", sleepSec, batchSize)
	ticker := time.NewTicker(time.Duration(sleepSec) * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			logger.Println("Shutting down...")
			return
		case <-ticker.C:
			sources, err := getPendingURLs(ctx)
			if err != nil {
				logger.Printf("Error fetching URLs: %v", err)
				continue
			}
			if len(sources) == 0 {
				logger.Println("No pending URLs to process")
				continue
			}
			logger.Printf("Processing %d sources", len(sources))
			for _, source := range sources {
				processURLSource(ctx, source)
				time.Sleep(2 * time.Second) // Pace sources to be polite.
			}
		}
	}
}