// Command discovery polls fuentes_url rows, probes each URL for RSS/Atom
// feeds, and either inserts feeds directly (when the source is pre-approved
// with a category and country) or queues them in feeds_pending for review.
package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"syscall"
	"time"
	"unicode/utf8"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/mmcdole/gofeed"

	"github.com/rss2/backend/internal/workers"
)

var (
	logger    *log.Logger
	dbPool    *pgxpool.Pool
	sleepSec  = 900 // seconds between discovery cycles (15 minutes)
	batchSize = 10  // max URL sources processed per cycle
)

// Shared HTTP clients so keep-alive connections are reused across requests
// instead of building a new client (and connection pool) on every call.
var (
	discoveryClient = &http.Client{Timeout: 15 * time.Second}
	feedClient      = &http.Client{Timeout: 30 * time.Second}
)

const userAgent = "Mozilla/5.0 (compatible; RSS2Bot/1.0)"

// URLSource mirrors one row of fuentes_url pending feed discovery.
type URLSource struct {
	ID          int64
	Nombre      string
	URL         string
	CategoriaID *int64 // nil until a reviewer assigns a category
	PaisID      *int64 // nil until a reviewer assigns a country
	Idioma      *string
}

func init() {
	logger = log.New(os.Stdout, "[DISCOVERY] ", log.LstdFlags)
}

// loadConfig reads worker tuning knobs from the environment.
func loadConfig() {
	sleepSec = getEnvInt("DISCOVERY_INTERVAL", 900)
	batchSize = getEnvInt("DISCOVERY_BATCH", 10)
}

// getEnvInt returns the integer value of the environment variable key, or
// defaultValue when the variable is unset or not a valid integer.
func getEnvInt(key string, defaultValue int) int {
	if value := os.Getenv(key); value != "" {
		if intVal, err := strconv.Atoi(value); err == nil {
			return intVal
		}
	}
	return defaultValue
}

// truncate shortens s to at most max bytes without splitting a UTF-8 rune.
func truncate(s string, max int) string {
	if len(s) <= max {
		return s
	}
	for max > 0 && !utf8.RuneStart(s[max]) {
		max--
	}
	return s[:max]
}

// getPendingURLs fetches up to batchSize active sources, prioritizing
// never-checked URLs, then previous errors, then no_feeds, oldest first.
func getPendingURLs(ctx context.Context) ([]URLSource, error) {
	rows, err := dbPool.Query(ctx, `
		SELECT id, nombre, url, categoria_id, pais_id, idioma
		FROM fuentes_url
		WHERE active = TRUE
		ORDER BY
			CASE
				WHEN last_check IS NULL THEN 1
				WHEN last_status = 'error' THEN 2
				WHEN last_status = 'no_feeds' THEN 3
				ELSE 4
			END,
			last_check ASC NULLS FIRST
		LIMIT $1
	`, batchSize)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var sources []URLSource
	for rows.Next() {
		var s URLSource
		if err := rows.Scan(&s.ID, &s.Nombre, &s.URL, &s.CategoriaID, &s.PaisID, &s.Idioma); err != nil {
			// Skip unreadable rows, but leave a trace instead of dropping silently.
			logger.Printf("Error scanning fuentes_url row: %v", err)
			continue
		}
		sources = append(sources, s)
	}
	// Previously unchecked: a mid-iteration failure looked like a short batch.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return sources, nil
}

// updateURLStatus records the outcome of a discovery attempt on a source.
func updateURLStatus(ctx context.Context, urlID int64, status, message string, httpCode int) error {
	_, err := dbPool.Exec(ctx, `
		UPDATE fuentes_url
		SET last_check = NOW(), last_status = $1, status_message = $2, last_http_code = $3
		WHERE id = $4
	`, status, message, httpCode, urlID)
	return err
}

// setURLStatus is updateURLStatus with the error logged instead of returned,
// for call sites that can do nothing else with a status-write failure.
func setURLStatus(ctx context.Context, urlID int64, status, message string, httpCode int) {
	if err := updateURLStatus(ctx, urlID, status, message, httpCode); err != nil {
		logger.Printf("Error updating status for URL %d: %v", urlID, err)
	}
}

// discoverFeeds returns the feed URLs reachable from pageURL. If pageURL
// itself parses as a feed it is returned directly; otherwise the HTML is
// scanned for feed links.
func discoverFeeds(pageURL string) ([]string, error) {
	req, err := http.NewRequest("GET", pageURL, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("User-Agent", userAgent)
	req.Header.Set("Accept", "application/rss+xml, application/atom+xml, application/xml, text/xml, text/html")

	resp, err := discoveryClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// A 4xx/5xx page was previously mis-reported as "no_feeds" with HTTP 200;
	// surface it as an error so the retry prioritization sees it.
	if resp.StatusCode >= 400 {
		return nil, fmt.Errorf("HTTP %d fetching %s", resp.StatusCode, pageURL)
	}

	// Try to parse the response as a feed first.
	parser := gofeed.NewParser()
	feed, err := parser.Parse(resp.Body)
	if err == nil && feed != nil && len(feed.Items) > 0 {
		// The URL itself is a valid feed.
		return []string{pageURL}, nil
	}

	// If not a feed, try to find feeds in HTML.
	return findFeedLinksInHTML(pageURL)
}

// findFeedLinksInHTML scans an HTML page for RSS/Atom alternate links.
// Placeholder: returns no links for now. In production, parse the HTML
// (e.g. with goquery) and collect <link rel="alternate"> feed URLs.
func findFeedLinksInHTML(baseURL string) ([]string, error) {
	return []string{}, nil
}

// parseFeed downloads feedURL and parses it as RSS/Atom.
func parseFeed(feedURL string) (*gofeed.Feed, error) {
	req, err := http.NewRequest("GET", feedURL, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("User-Agent", userAgent)
	req.Header.Set("Accept", "application/rss+xml, application/atom+xml, application/xml, text/xml")

	resp, err := feedClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	parser := gofeed.NewParser()
	return parser.Parse(resp.Body)
}

// getFeedMetadata fetches feedURL and extracts its display metadata.
func getFeedMetadata(feedURL string) (title, description, language string, entryCount int, err error) {
	feed, err := parseFeed(feedURL)
	if err != nil {
		return "", "", "", 0, err
	}

	title = feed.Title
	if title == "" {
		title = "Feed sin título"
	}
	// Truncate on a rune boundary: a raw byte slice at 500 could split a
	// multi-byte character and store invalid UTF-8.
	description = truncate(feed.Description, 500)
	language = feed.Language
	entryCount = len(feed.Items)
	return title, description, language, entryCount, nil
}

// analyzeFeed applies keyword heuristics to guess the feed's country.
// Category detection is not implemented yet, so the second return is always "".
func analyzeFeed(title, url, description string) (country, category string) {
	combined := strings.ToLower(title) + " " + strings.ToLower(description)

	// Keyword -> country heuristics; in production use ML or an API.
	countries := map[string][]string{
		"España":      {"españa", "español", "madrid", "barcelona"},
		"Argentina":   {"argentino", "buenos aires"},
		"México":      {"méxico", "mexicano", "cdmx", "ciudad de méxico"},
		"Colombia":    {"colombiano", "bogotá"},
		"Chile":       {"chileno", "santiago"},
		"Perú":        {"peruano", "lima"},
		"EE.UU.":      {"estados unidos", "washington", "trump", "biden"},
		"Reino Unido": {"reino unido", "londres", "uk"},
		"Francia":     {"francia", "parís"},
		"Alemania":    {"alemania", "berlín"},
	}

	for name, keywords := range countries {
		for _, kw := range keywords {
			if strings.Contains(combined, kw) {
				return name, ""
			}
		}
	}
	return "", ""
}

// getCountryIDByName resolves a country name (case-insensitive) to its id.
func getCountryIDByName(ctx context.Context, countryName string) (*int64, error) {
	var id int64
	err := dbPool.QueryRow(ctx, "SELECT id FROM paises WHERE LOWER(nombre) = LOWER($1)", countryName).Scan(&id)
	if err != nil {
		return nil, err
	}
	return &id, nil
}

// getCategoryIDByName resolves a category name (case-insensitive) to its id.
func getCategoryIDByName(ctx context.Context, categoryName string) (*int64, error) {
	var id int64
	err := dbPool.QueryRow(ctx, "SELECT id FROM categorias WHERE LOWER(nombre) = LOWER($1)", categoryName).Scan(&id)
	if err != nil {
		return nil, err
	}
	return &id, nil
}

// createPendingFeed upserts a discovered feed into feeds_pending for manual
// review. Recognized metadata keys: title, description, language,
// entry_count, detected_country, suggested_category.
func createPendingFeed(ctx context.Context, fuenteURLID int64, feedURL string, metadata map[string]interface{}) error {
	// Read metadata defensively: the previous unchecked type assertion on
	// "title" panicked when the key was missing or mistyped.
	feedTitle, _ := metadata["title"].(string)
	if feedTitle == "" {
		feedTitle = "Feed sin título"
	}
	description, _ := metadata["description"].(string)
	language, _ := metadata["language"].(string)
	entryCount, _ := metadata["entry_count"].(int)

	// Resolution failures are intentionally best-effort: a feed with an
	// unresolvable country/category is still queued, just without the hint.
	var detectedCountryID *int64
	if dc, ok := metadata["detected_country"].(string); ok && dc != "" {
		if cid, err := getCountryIDByName(ctx, dc); err == nil {
			detectedCountryID = cid
		}
	}
	var suggestedCategoryID *int64
	if sc, ok := metadata["suggested_category"].(string); ok && sc != "" {
		if caid, err := getCategoryIDByName(ctx, sc); err == nil {
			suggestedCategoryID = caid
		}
	}

	_, err := dbPool.Exec(ctx, `
		INSERT INTO feeds_pending (
			fuente_url_id, feed_url, feed_title, feed_description, feed_language,
			feed_type, entry_count, detected_country_id, suggested_categoria_id, discovered_at
		) VALUES ($1, $2, $3, $4, $5, 'rss', $6, $7, $8, NOW())
		ON CONFLICT (feed_url) DO UPDATE SET feed_title = EXCLUDED.feed_title, discovered_at = NOW()
	`, fuenteURLID, feedURL, feedTitle, description, language, entryCount, detectedCountryID, suggestedCategoryID)
	return err
}

// createFeedDirectly inserts an auto-approved feed. It returns true when a
// new row was created and false (with nil error) when the URL already existed.
func createFeedDirectly(ctx context.Context, feedURL string, fuenteURLID *int64, categoriaID, paisID *int64, idioma *string) (bool, error) {
	title, description, language, _, err := getFeedMetadata(feedURL)
	if err != nil {
		return false, err
	}
	if language == "" && idioma != nil {
		language = *idioma
	}

	// ON CONFLICT DO NOTHING yields zero rows for a duplicate URL, which
	// QueryRow.Scan reported as ErrNoRows — so pre-existing feeds were
	// counted as errors and the "already existed" path was unreachable.
	// Use Query and check whether a row actually came back.
	rows, err := dbPool.Query(ctx, `
		INSERT INTO feeds (nombre, descripcion, url, categoria_id, pais_id, idioma, fuente_url_id, activo)
		VALUES ($1, $2, $3, $4, $5, $6, $7, TRUE)
		ON CONFLICT (url) DO NOTHING
		RETURNING id
	`, title, description, feedURL, categoriaID, paisID, language, fuenteURLID)
	if err != nil {
		return false, err
	}
	defer rows.Close()
	created := rows.Next()
	return created, rows.Err()
}

// processURLSource runs discovery for one source: it finds feeds, gathers
// metadata, and either creates feeds directly (when the source already has a
// category and country) or queues them for review, then records the outcome.
func processURLSource(ctx context.Context, source URLSource) {
	logger.Printf("Processing: %s (%s)", source.Nombre, source.URL)

	feeds, err := discoverFeeds(source.URL)
	if err != nil {
		logger.Printf("Error discovering feeds: %v", err)
		// truncate, not err.Error()[:200] — slicing panicked on short messages.
		setURLStatus(ctx, source.ID, "error", truncate(err.Error(), 200), 0)
		return
	}
	if len(feeds) == 0 {
		logger.Printf("No feeds found for: %s", source.URL)
		setURLStatus(ctx, source.ID, "no_feeds", "No feeds found", 200)
		return
	}
	logger.Printf("Found %d feeds for %s", len(feeds), source.URL)

	maxFeeds := getEnvInt("MAX_FEEDS_PER_URL", 5)
	if len(feeds) > maxFeeds {
		feeds = feeds[:maxFeeds]
	}

	// A source with both category and country pre-assigned is auto-approved.
	autoApprove := source.CategoriaID != nil && source.PaisID != nil
	created, pending, existing, errors := 0, 0, 0, 0

	for _, feedURL := range feeds {
		title, description, language, entryCount, err := getFeedMetadata(feedURL)
		if err != nil {
			logger.Printf("Error parsing feed %s: %v", feedURL, err)
			errors++
			continue
		}

		// Analyze for country/category hints.
		detectedCountry, suggestedCategory := analyzeFeed(title, feedURL, description)
		metadata := map[string]interface{}{
			"title":              title,
			"description":        description,
			"language":           language,
			"entry_count":        entryCount,
			"detected_country":   detectedCountry,
			"suggested_category": suggestedCategory,
		}

		if !autoApprove {
			// Queue for manual review.
			if err := createPendingFeed(ctx, source.ID, feedURL, metadata); err != nil {
				logger.Printf("Error creating pending feed: %v", err)
				errors++
			} else {
				pending++
			}
		} else {
			// Create the feed directly.
			createdFeed, err := createFeedDirectly(ctx, feedURL, &source.ID, source.CategoriaID, source.PaisID, source.Idioma)
			if err != nil {
				logger.Printf("Error creating feed: %v", err)
				errors++
			} else if createdFeed {
				created++
			} else {
				existing++
			}
		}
		time.Sleep(1 * time.Second) // rate limiting between feeds
	}

	// Collapse the per-feed tallies into one status row.
	var status, message string
	switch {
	case created > 0 || pending > 0:
		status = "success"
		var parts []string
		if created > 0 {
			parts = append(parts, fmt.Sprintf("%d created", created))
		}
		if pending > 0 {
			parts = append(parts, fmt.Sprintf("%d pending", pending))
		}
		message = strings.Join(parts, ", ")
	case existing > 0:
		status = "existing"
		message = fmt.Sprintf("%d already existed", existing)
	default:
		status = "error"
		message = fmt.Sprintf("%d errors", errors)
	}
	setURLStatus(ctx, source.ID, status, message, 200)
	logger.Printf("Processed %s: created=%d, pending=%d, existing=%d, errors=%d",
		source.URL, created, pending, existing, errors)
}

func main() {
	loadConfig()
	logger.Println("Starting RSS Discovery Worker")

	cfg := workers.LoadDBConfig()
	if err := workers.Connect(cfg); err != nil {
		logger.Fatalf("Failed to connect to database: %v", err)
	}
	dbPool = workers.GetPool()
	defer workers.Close()
	logger.Println("Connected to PostgreSQL")

	// Cancel the context on SIGINT/SIGTERM so shutdown unwinds main and the
	// deferred workers.Close runs. The previous handler called os.Exit(0)
	// from a goroutine, which skipped every deferred cleanup.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	logger.Printf("Config: interval=%ds, batch=%d", sleepSec, batchSize)

	ticker := time.NewTicker(time.Duration(sleepSec) * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			logger.Println("Shutting down...")
			return
		case <-ticker.C:
			sources, err := getPendingURLs(ctx)
			if err != nil {
				logger.Printf("Error fetching URLs: %v", err)
				continue
			}
			if len(sources) == 0 {
				logger.Println("No pending URLs to process")
				continue
			}
			logger.Printf("Processing %d sources", len(sources))
			for _, source := range sources {
				// Stop mid-batch promptly on shutdown.
				if ctx.Err() != nil {
					break
				}
				processURLSource(ctx, source)
				time.Sleep(2 * time.Second)
			}
		}
	}
}