Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 42 additions & 25 deletions analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/nbd-wtf/go-nostr"
"github.com/relaytools/feedbuilder/internal/relayurl"
)

type Event struct {
Expand Down Expand Up @@ -143,12 +144,12 @@ func analyzeCmd(args []string) {
pk := strings.ToLower(ev.PubKey)
for _, tag := range ev.Tags {
if len(tag) >= 2 && tag[0] == "r" {
url := normalizeURL(tag[1])
if url == "" {
u, err := relayurl.New(tag[1])
if err != nil {
continue
}
host := urlToHost(url)
if exHosts.has(host) {
url := u.String()
if exHosts.has(u.Host()) {
continue
}
// If the URL points to an inbox endpoint, skip it and prefer a different URL for outbox
Expand Down Expand Up @@ -215,8 +216,12 @@ func analyzeCmd(args []string) {

// Collect all unique relay URLs we want to check
allRelays := set{}
for url := range writeMap {
allRelays.add(normalizeURL(url))
for rawURL := range writeMap {
u, err := relayurl.New(rawURL)
if err != nil {
continue
}
allRelays.add(u.String())
}

monitorData := fetchNIP66MonitorData(monitorRelayList, allRelays, time.Duration(*monitorTimeout)*time.Second)
Expand All @@ -236,16 +241,25 @@ func analyzeCmd(args []string) {
// Filter writePairs to only include online relays
var filteredPairs []string
onlineRelays := set{}
for url, info := range monitorData {
for rawURL, info := range monitorData {
if info.Status == "online" {
onlineRelays.add(normalizeURL(url))
u, err := relayurl.New(rawURL)
if err != nil {
continue
}
onlineRelays.add(u.String())
}
}

for _, pair := range writePairs {
fields := strings.Fields(pair)
if len(fields) >= 2 {
relayURL := normalizeURL(strings.Join(fields[1:], " "))
raw := strings.Join(fields[1:], " ")
u, err := relayurl.New(raw)
if err != nil {
continue
}
relayURL := u.String()
if onlineRelays.has(relayURL) {
filteredPairs = append(filteredPairs, pair)
}
Expand Down Expand Up @@ -367,9 +381,9 @@ func fetchNIP66MonitorData(monitorRelays []string, targetRelays set, timeout tim
result := make(map[string]*RelayMonitorInfo)

// Initialize all target relays as unknown
for url := range targetRelays {
result[url] = &RelayMonitorInfo{
URL: url,
for relay := range targetRelays {
result[relay] = &RelayMonitorInfo{
URL: relay,
Status: "unknown",
}
}
Expand All @@ -381,8 +395,8 @@ func fetchNIP66MonitorData(monitorRelays []string, targetRelays set, timeout tim
// Convert target relays to slice for filter
// NIP-66 requires normalized URLs with trailing slashes in d-tags
var dTags []string
for url := range targetRelays {
normalized := url
for relay := range targetRelays {
normalized := relay
if !strings.HasSuffix(normalized, "/") {
normalized += "/"
}
Expand Down Expand Up @@ -474,8 +488,11 @@ func parseNIP66Event(event *nostr.Event, result map[string]*RelayMonitorInfo, mo
// NIP-66 d-tags have trailing slashes, but our stored URLs don't
for _, tag := range event.Tags {
if len(tag) >= 2 && tag[0] == "d" {
// normalizeURL removes the trailing slash to match our stored URLs
relayURL = normalizeURL(tag[1])
u, err := relayurl.New(tag[1])
if err != nil {
continue
}
relayURL = u.String()
break
}
}
Expand Down Expand Up @@ -596,8 +613,8 @@ func writeMonitorReport(path string, data map[string]*RelayMonitorInfo) error {

// Sort by URL
var urls []string
for url := range data {
urls = append(urls, url)
for relayURL := range data {
urls = append(urls, relayURL)
}
sort.Strings(urls)

Expand All @@ -606,8 +623,8 @@ func writeMonitorReport(path string, data map[string]*RelayMonitorInfo) error {
fmt.Fprintln(w, "# Format: URL | Status | RTT-Open | RTT-Read | RTT-Write | Monitors | Last-Checked")
fmt.Fprintln(w, "")

for _, url := range urls {
info := data[url]
for _, relayURL := range urls {
info := data[relayURL]
lastChecked := "never"
if info.LastChecked > 0 {
lastChecked = time.Unix(info.LastChecked, 0).Format(time.RFC3339)
Expand Down Expand Up @@ -640,20 +657,20 @@ func uniqueByHost(relayMap map[string]set) []string {
have := set{}
var out []string
var urls []string
for url := range relayMap {
urls = append(urls, url)
for relay := range relayMap {
urls = append(urls, relay)
}
sort.Strings(urls)
for _, url := range urls {
h := urlToHost(url)
for _, relay := range urls {
h := urlToHost(relay)
if h == "" {
continue
}
if have.has(h) {
continue
}
have.add(h)
out = append(out, url)
out = append(out, relay)
}
return out
}
Expand Down
64 changes: 43 additions & 21 deletions collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

nostr "github.com/nbd-wtf/go-nostr"
"github.com/relaytools/feedbuilder/internal/relayurl"
)

// eventLine represents a relay list event for serialized JSONL writes
Expand Down Expand Up @@ -61,13 +62,33 @@ func collectCmd(args []string) {
userPubkeyPath := filepath.Join(dataDirectory, "user_pubkey.txt")
followSetsDir := filepath.Join(dataDirectory, "follow_sets")

relays := splitCSV(*relaysCSV)
if len(relays) == 0 {
relaysRaw := splitCSV(*relaysCSV)
if len(relaysRaw) == 0 {
fmt.Fprintln(os.Stderr, "no relays provided")
os.Exit(1)
}
followRelayURL := *followRelay
if followRelayURL == "" {
relays := make([]relayurl.RelayURL, 0, len(relaysRaw))
for _, raw := range relaysRaw {
r, err := relayurl.New(raw)
if err != nil {
fmt.Fprintf(os.Stderr, "warning: skipping invalid relay URL %q: %v\n", raw, err)
continue
}
relays = append(relays, r)
}
if len(relays) == 0 {
fmt.Fprintln(os.Stderr, "no valid relays provided")
os.Exit(1)
}
var followRelayURL relayurl.RelayURL
if *followRelay != "" {
r, err := relayurl.New(*followRelay)
if err != nil {
fmt.Fprintf(os.Stderr, "invalid follow-relay URL: %v\n", err)
os.Exit(1)
}
followRelayURL = r
} else {
followRelayURL = relays[0]
}

Expand All @@ -78,7 +99,7 @@ func collectCmd(args []string) {
fmt.Println("\n==> Step 1: Fetching your relay list (kind 10002)")
fmt.Printf(" Connecting to %s...\n", followRelayURL)

userRelays, err := fetchUserRelayList(ctx, followRelayURL, *pubkey, timeout)
userRelays, err := fetchUserRelayList(ctx, followRelayURL.String(), *pubkey, timeout)
if err != nil {
fmt.Fprintf(os.Stderr, "warning: failed to get your relay list from %s: %v\n", followRelayURL, err)
// Continue anyway - not critical
Expand All @@ -96,7 +117,7 @@ func collectCmd(args []string) {
fmt.Println("\n==> Step 2: Fetching your follow list (kind 3)")
fmt.Printf(" Connecting to %s...\n", followRelayURL)

follows, err := fetchFollows(ctx, followRelayURL, *pubkey, timeout)
follows, err := fetchFollows(ctx, followRelayURL.String(), *pubkey, timeout)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to get follows from %s: %v\n", followRelayURL, err)
os.Exit(1)
Expand All @@ -111,7 +132,7 @@ func collectCmd(args []string) {
if err := os.MkdirAll(followSetsDir, 0o755); err != nil {
fmt.Fprintf(os.Stderr, "warning: failed to create follow_sets directory: %v\n", err)
} else {
followSets, err := fetchAndSaveFollowSets(ctx, followRelayURL, *pubkey, timeout, followSetsDir)
followSets, err := fetchAndSaveFollowSets(ctx, followRelayURL.String(), *pubkey, timeout, followSetsDir)
if err != nil {
fmt.Fprintf(os.Stderr, "warning: failed to get follow sets from %s: %v\n", followRelayURL, err)
} else {
Expand Down Expand Up @@ -216,18 +237,18 @@ func collectCmd(args []string) {
semaphore := make(chan struct{}, *parallel)
var wg sync.WaitGroup

for _, relayURL := range relays {
for _, relay := range relays {
semaphore <- struct{}{}
wg.Add(1)
go func(url string) {
go func(r relayurl.RelayURL) {
defer wg.Done()
defer func() { <-semaphore }()

if err := fetchAllBatches(ctx, url, batches, timeout, eventChan, progress); err != nil {
if err := fetchAllBatches(ctx, r, batches, timeout, eventChan, progress); err != nil {
// Log errors but continue with other relays
fmt.Fprintf(os.Stderr, " ⚠ Error from %s: %v\n", url, err)
fmt.Fprintf(os.Stderr, " ⚠ Error from %s: %v\n", r, err)
}
}(relayURL)
}(relay)
}

wg.Wait()
Expand Down Expand Up @@ -301,11 +322,12 @@ func fetchUserRelayList(ctx context.Context, relayURL, pubkey string, timeout ti
// Extract relay URLs from r-tags
for _, tag := range event.Tags {
if len(tag) >= 2 && tag[0] == "r" {
relayURL := strings.TrimSpace(tag[1])
// Only include valid relay URLs (no query params, etc)
if isValidRelayURL(relayURL) {
relays = append(relays, relayURL)
raw := strings.TrimSpace(tag[1])
u, err := relayurl.New(raw)
if err != nil {
continue
}
relays = append(relays, u.String())
}
}
}
Expand Down Expand Up @@ -587,24 +609,24 @@ func sanitizeFilename(s string) string {
}

// fetchAllBatches opens one connection to a relay and processes all batches sequentially
func fetchAllBatches(ctx context.Context, relayURL string, batches [][]string, timeout time.Duration,
func fetchAllBatches(ctx context.Context, relay relayurl.RelayURL, batches [][]string, timeout time.Duration,
out chan<- eventLine, progress *progressTracker) error {

// Connect once to the relay
connectCtx, connectCancel := context.WithTimeout(ctx, timeout)
defer connectCancel()

relay, err := nostr.RelayConnect(connectCtx, relayURL)
relayConn, err := nostr.RelayConnect(connectCtx, relay.String())
if err != nil {
return fmt.Errorf("relay connect: %w", err)
}
defer relay.Close()
defer relayConn.Close()

// Process each batch with a new subscription on the same connection
for batchIdx, authors := range batches {
if err := fetchBatch(ctx, relay, relayURL, authors, batchIdx, timeout, out); err != nil {
if err := fetchBatch(ctx, relayConn, relay.String(), authors, batchIdx, timeout, out); err != nil {
// Log error but continue with next batch
fmt.Fprintf(os.Stderr, " ⚠ Error from %s batch %d: %v\n", relayURL, batchIdx+1, err)
fmt.Fprintf(os.Stderr, " ⚠ Error from %s batch %d: %v\n", relay, batchIdx+1, err)
}
progress.batchesDone.Add(1)
}
Expand Down
38 changes: 25 additions & 13 deletions gen_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"path/filepath"
"sort"
"strings"

"github.com/relaytools/feedbuilder/internal/relayurl"
)

type streamConfig struct {
Expand Down Expand Up @@ -106,7 +108,9 @@ func greedySelectAndAssignN(relayAuthors map[string][]string, replicas int) ([]s
assigned[r] = uniqueSorted(assigned[r])
}
for i := range selected {
selected[i] = normalizeURL(selected[i])
if u, err := relayurl.New(selected[i]); err == nil {
selected[i] = u.String()
}
}
return selected, assigned
}
Expand Down Expand Up @@ -152,12 +156,13 @@ func genRouterCmd(args []string) {
continue
}
pk := strings.ToLower(fields[0])
rurl := normalizeURL(strings.Join(fields[1:], " "))
if _, ok := followsSet[pk]; !ok {
rurlRaw := strings.Join(fields[1:], " ")
u, err := relayurl.New(rurlRaw)
if err != nil {
continue
}
// Skip invalid relay URLs
if !isValidRelayURL(rurl) {
rurl := u.String()
if _, ok := followsSet[pk]; !ok {
continue
}
relayAuthors[rurl] = append(relayAuthors[rurl], pk)
Expand All @@ -177,7 +182,9 @@ func genRouterCmd(args []string) {
var streams []streamConfig
// Create per-relay down streams for selected relays with their assigned authors
for _, relay := range selected {
relay = normalizeURL(relay)
if u, err := relayurl.New(relay); err == nil {
relay = u.String()
}
auths := assigned[relay]
if len(auths) == 0 {
continue
Expand Down Expand Up @@ -257,11 +264,13 @@ func genRouterCmd(args []string) {

// Load user's relay list from file and filter out invalid URLs
userRelaysRaw := readLinesIfExists(userRelayListFile)
var userRelays []string
for _, relay := range userRelaysRaw {
if isValidRelayURL(relay) {
userRelays = append(userRelays, relay)
userRelays := make([]string, 0, len(userRelaysRaw))
for _, relayLine := range userRelaysRaw {
u, err := relayurl.New(relayLine)
if err != nil {
continue
}
userRelays = append(userRelays, u.String())
}
if len(userRelays) == 0 {
fmt.Fprintf(os.Stderr, "warning: no user relay list found at %s, skipping notification streams\n", userRelayListFile)
Expand All @@ -271,7 +280,6 @@ func genRouterCmd(args []string) {

// Add stream for notifications mentioning user (inbox)
for _, relay := range userRelays {
relay = normalizeURL(relay)
name := fmt.Sprintf("notifs_inbox_%s", safeName(relay))
streams = append(streams, streamConfig{
Name: name,
Expand Down Expand Up @@ -300,7 +308,9 @@ func readLinesMust(path string) []string {
os.Exit(1)
}
for i := range lines {
lines[i] = normalizeURL(lines[i])
if u, err := relayurl.New(lines[i]); err == nil {
lines[i] = u.String()
}
}
return lines
}
Expand All @@ -311,7 +321,9 @@ func readLinesIfExists(path string) []string {
return nil
}
for i := range lines {
lines[i] = normalizeURL(lines[i])
if u, err := relayurl.New(lines[i]); err == nil {
lines[i] = u.String()
}
}
return lines
}
Expand Down
Loading