diff --git a/tgrag/experiments/egonets/get_topk.py b/tgrag/experiments/egonets/get_topk.py
new file mode 100755
index 00000000..fc4a9f99
--- /dev/null
+++ b/tgrag/experiments/egonets/get_topk.py
@@ -0,0 +1,167 @@
+from __future__ import annotations
+
+import argparse
+import csv
+import gzip
+import heapq
+import sys
+from pathlib import Path
+from typing import Iterable
+
+from tgrag.utils.matching import flip_if_needed
+
+DEFAULT_CRAWL = 'CC-MAIN-2025-05'
+DEFAULT_PC1_FILE = Path('../../../data/dqr/domain_pc1.csv')
+
+
+def parse_args(argv: list[str]) -> argparse.Namespace:
+    p = argparse.ArgumentParser(description=__doc__)
+    p.add_argument(
+        'scratch',
+        type=Path,
+        help='Scratch directory that contains crawl-data/<crawl>/output1/vertices.csv.gz',
+    )
+    p.add_argument(
+        'k',
+        nargs='?',
+        type=int,
+        default=50,
+        help='K for top/bottom selection (default: 50)',
+    )
+    p.add_argument(
+        '--crawl',
+        type=str,
+        default=DEFAULT_CRAWL,
+        help=f'Common Crawl name (default: {DEFAULT_CRAWL})',
+    )
+    p.add_argument(
+        '--pc1-file',
+        type=Path,
+        default=DEFAULT_PC1_FILE,
+        help=f'CSV with columns including domain,pc1 (default: {DEFAULT_PC1_FILE})',
+    )
+    return p.parse_args(argv)
+
+
+def norm_domain(raw: str) -> str:
+    return flip_if_needed(raw.strip().lower())
+
+
+def load_pc1_map(path: Path) -> dict[str, float]:
+    pc1_map: dict[str, float] = {}
+    with path.open(newline='') as f:
+        reader = csv.DictReader(f)
+        if not reader.fieldnames:
+            raise ValueError(f'PC1 file has no header: {path}')
+
+        # Be slightly defensive about column naming.
+        domain_key = 'domain'
+        pc1_key = 'pc1'
+        if domain_key not in reader.fieldnames or pc1_key not in reader.fieldnames:
+            raise ValueError(
+                f'PC1 file must include columns {domain_key!r} and {pc1_key!r}. '
+                f'Found: {reader.fieldnames}'
+            )
+
+        for row in reader:
+            dom = norm_domain(row[domain_key])
+            try:
+                pc1 = float(row[pc1_key])
+            except (TypeError, ValueError):
+                continue
+            pc1_map[dom] = pc1
+    return pc1_map
+
+
+def iter_vertices_rows(vertices_gz: Path) -> tuple[list[str], Iterable[list[str]]]:
+    """Returns (header, row_iter).
Each row is a list[str] from csv.reader.""" + f = gzip.open(vertices_gz, 'rt', newline='') + reader = csv.reader(f) + + try: + header = next(reader) + except StopIteration as e: + f.close() + raise ValueError(f'Vertices file is empty: {vertices_gz}') from e + + def _rows() -> Iterable[list[str]]: + try: + for row in reader: + yield row + finally: + f.close() + + return header, _rows() + + +def main(argv: list[str]) -> int: + args = parse_args(argv) + + if args.k <= 0: + raise ValueError(f'k must be positive; got {args.k}') + + vertices_gz = ( + args.scratch / 'crawl-data' / args.crawl / 'output1' / 'vertices.csv.gz' + ) + if not vertices_gz.exists(): + raise FileNotFoundError(f'Vertices file not found: {vertices_gz}') + + if not args.pc1_file.exists(): + raise FileNotFoundError(f'PC1 file not found: {args.pc1_file}') + + pc1_map = load_pc1_map(args.pc1_file) + + header, rows_iter = iter_vertices_rows(vertices_gz) + + best_row_by_domain: dict[str, tuple[float, list[str]]] = {} + + for row in rows_iter: + if not row: + continue + raw_dom = row[0] + dom = norm_domain(raw_dom) + pc1 = pc1_map.get(dom) + if pc1 is None: + continue + best_row_by_domain.setdefault(dom, (pc1, row)) + + top_heap: list[tuple[float, str, list[str]]] = [] + bottom_heap: list[tuple[float, str, list[str]]] = [] # stores (-pc1, dom, row) + k = args.k + + for dom, (pc1, row) in best_row_by_domain.items(): + top_item = (pc1, dom, row) + if len(top_heap) < k: + heapq.heappush(top_heap, top_item) + elif pc1 > top_heap[0][0]: + heapq.heapreplace(top_heap, top_item) + + neg_item = (-pc1, dom, row) + if len(bottom_heap) < k: + heapq.heappush(bottom_heap, neg_item) + elif -pc1 > bottom_heap[0][0]: + heapq.heapreplace(bottom_heap, neg_item) + + topk = sorted(top_heap, key=lambda x: x[0], reverse=True) + bottomk = sorted( + bottom_heap, key=lambda x: x[0] + ) # most negative first => lowest pc1 + + out = csv.writer(sys.stdout, lineterminator='\n') + + sys.stdout.write('### TOP K ###\n') + out.writerow(['domain', 'pc1', *header[1:]]) + for pc1, dom, row in topk: + out.writerow([dom, f'{pc1}', *row[1:]]) + + sys.stdout.write('\n### BOTTOM K ###\n') + out.writerow(['domain', 'pc1', *header[1:]]) + for neg_pc1, dom, row in bottomk: + pc1 = -neg_pc1 + out.writerow([dom, f'{pc1}', *row[1:]]) + + return 0 + + +if __name__ == '__main__': + raise SystemExit(main(sys.argv[1:])) diff --git a/tgrag/experiments/egonets/topk_by_credibility.csv b/tgrag/experiments/egonets/topk_by_credibility.csv new file mode 100644 index 00000000..6b878e4b --- /dev/null +++ b/tgrag/experiments/egonets/topk_by_credibility.csv @@ -0,0 +1,105 @@ +### TOP K ### +domain,pc1,ts,in_deg,out_deg +apnews.com,0.998049167729267,20250127,66304,0 +charitynavigator.org,0.985751566494374,20250127,9924,1073 +rollcall.com,0.982850843623789,20250127,5,0 +smithsonianmag.com,0.971184072481041,20250127,29646,22687 +climateactiontracker.org,0.970824619355302,20250127,2452,89 +stripes.com,0.970705909096091,20250127,5099,537 +drugs.com,0.965142662952843,20250127,6,1 +africacheck.org,0.964973906244947,20250127,860,0 +usaspending.gov,0.964973906244947,20250127,1625,2 +cbo.gov,0.964973906244947,20250127,6742,181 +nature.com,0.964534054690512,20250127,0,27 +cdc.gov,0.964445730133301,20250127,187973,4270 +leadstories.com,0.963826303031933,20250127,627,7045 +climatefeedback.org,0.963420694296498,20250127,348,544 +sandiegouniontribune.com,0.961102639657948,20250127,3,10 +indystar.com,0.957207994613488,20250127,9,0 +nasa.gov,0.956315766887341,20250127,41128,1506 
+hopkinsmedicine.org,0.956072309009315,20250127,30696,1364 +armytimes.com,0.955899260798174,20250127,2432,1180 +skepticalinquirer.org,0.95553129695965,20250127,590,0 +afp.com,0.954600577620557,20250127,4495,116 +cnet.com,0.951854977853014,20250127,5,0 +militarytimes.com,0.951497998550873,20250127,4682,2194 +nationmaster.com,0.950775844278038,20250127,2152,229 +prri.org,0.950775844278038,20250127,1515,536 +truthorfiction.com,0.950775844278038,20250127,0,10 +bringmethenews.com,0.950669426837722,20250127,875,503 +upi.com,0.950555959253076,20250127,0,5 +americanscientist.org,0.950370235542603,20250127,1976,0 +airspacemag.com,0.950370235542603,20250127,1121,0 +tennessean.com,0.949767015915909,20250127,4,0 +newsnationnow.com,0.946742643739216,20250127,3296,0 +arstechnica.com,0.946362498302995,20250127,30882,0 +verafiles.org,0.944724772225377,20250127,229,1258 +biomedcentral.com,0.944319163489942,20250127,6964,0 +aaas.org,0.944319163489942,20250127,4024,0 +apa.org,0.944319163489942,20250127,42446,0 +nejm.org,0.944319163489942,20250127,12,0 +medpagetoday.com,0.944057016812522,20250127,6,0 +voanews.com,0.943723767994693,20250127,16465,1759 +edge.org,0.942677265108602,20250127,13,22 +skeptoid.com,0.942677265108602,20250127,765,1140 +afr.com,0.941320926225632,20250127,3,32 +syracuse.com,0.940444469429823,20250127,5,0 +newscientist.com,0.935582630250769,20250127,23636,0 +sciencebasedmedicine.org,0.935582630250769,20250127,2385,1477 +csmonitor.com,0.935094190129455,20250127,15276,3302 +eagletribune.com,0.934656389724824,20250127,1059,0 +medscape.com,0.934094911736869,20250127,12063,0 +patch.com,0.933491002134259,20250127,5,0 + +### BOTTOM K ### +domain,pc1,ts,in_deg,out_deg +naturalhealth365.com,0.105014708866534,20250127,685,1054 +theforbiddenknowledge.com,0.103489056918317,20250127,321,4 +sonsoflibertymedia.com,0.103216003511302,20250127,287,0 +nvic.org,0.0936019688779002,20250127,889,588 +jesus-is-savior.com,0.0928594665531297,20250127,559,1151 +dcdirtylaundry.com,0.0916982813557337,20250127,150,12 +usareally.com,0.0916982813557337,20250127,34,2 +creation.com,0.0914843426995647,20250127,3,5 +cfact.org,0.0914843426995647,20250127,501,4 +fabiosa.com,0.0905862324182262,20250127,124,0 +trump.news,0.0903891710036623,20250127,336,527 +wakingtimes.com,0.0901725391626428,20250127,1095,3691 +majalla.com,0.0901558716143552,20250127,178,26 +worldnewspolitics.com,0.0901558716143552,20250127,16,1 +freedomadvocates.org,0.0886084846223804,20250127,70,82 +shoebat.com,0.0886084846223804,20250127,466,1468 +americanjournalreview.com,0.0878375859873984,20250127,19,0 +welovetrump.com,0.0871406069110147,20250127,2,2 +explainlife.com,0.0866873538647112,20250127,18,0 +truthuncensored.net,0.0853217706757043,20250127,57,0 +learntherisk.org,0.0772812008277641,20250127,141,0 +pdmj.org,0.075993329502495,20250127,62,0 +gaia.com,0.075993329502495,20250127,116,0 +nowtheendbegins.com,0.0731174714253107,20250127,542,554 +consumerwellness.org,0.0731174714253107,20250127,65,12 +counterthink.com,0.0723314494829261,20250127,50,5 +newstarget.com,0.0723314494829261,20250127,1462,7656 +climate.news,0.0723314494829261,20250127,1736,3780 +awarenessact.com,0.0705539230341695,20250127,224,1669 +chlorellafactor.com,0.0622998357836465,20250127,14,14 +alternativenews.com,0.0622998357836465,20250127,648,0 +biodefense.com,0.0622998357836465,20250127,13,37 +darkjournalist.com,0.0603583657276983,20250127,50,0 +skeptiko.com,0.0603583657276983,20250127,172,259 +biggovernment.news,0.060130193404361,20250127,238,696 
+bugout.news,0.0594239791741224,20250127,62,574
+campusinsanity.com,0.0594239791741224,20250127,84,634
+ecology.news,0.0594239791741224,20250127,62,183
+conspiracydailyupdate.com,0.0575930709941221,20250127,38,0
+geoengineeringwatch.org,0.057482507650514,20250127,926,2176
+humansarefree.com,0.0572211982440859,20250127,7,0
+beforeitsnews.com,0.0560036536327054,20250127,3716,1385
+infowars.com,0.0458475179096454,20250127,2,3
+prisonplanet.com,0.0447515223913456,20250127,1690,0
+stormfront.org,0.0447515223913456,20250127,585,4053
+rense.com,0.0447515223913456,20250127,2825,632
+pandemic.news,0.0438171358377697,20250127,447,601
+davidicke.com,0.0416143549077332,20250127,1334,0
+worldtruth.tv,0.017643643892146,20250127,348,0
+naturalnews.com,0.0,20250127,8025,8636
diff --git a/tgrag/experiments/subnetworks/README.MD b/tgrag/experiments/subnetworks/README.MD
new file mode 100644
index 00000000..f77263cb
--- /dev/null
+++ b/tgrag/experiments/subnetworks/README.MD
@@ -0,0 +1,9 @@
+# Subnetwork analysis
+
+Attempting to discern the structural & temporal characteristics of credible vs. non-credible subnetworks in our data.
+
+- Goal: analyse non-credible and credible subnets across months and monitor their features
+  - Ideally, also looking at other signals (MBFC, semi-supervised signals)
+  - Currently 2-hop; could we go further? Is there any incentive to?
+
+> **Ego Nets:** from [the literature](https://dl.acm.org/doi/pdf/10.1145/2392622.2392624), an ego network is a social network formed of an individual (ego) and the people with whom the ego is in contact (alters).
diff --git a/tgrag/experiments/subnetworks/build_seeds.py b/tgrag/experiments/subnetworks/build_seeds.py
new file mode 100644
index 00000000..17526448
--- /dev/null
+++ b/tgrag/experiments/subnetworks/build_seeds.py
@@ -0,0 +1,91 @@
+import argparse
+import gzip
+import os
+from typing import Dict
+
+# import preprocess_edges from your construct_networks.py
+from construct_networks import preprocess_edges
+
+from tgrag.utils.data_loading import load_dqr
+from tgrag.utils.matching import flip_if_needed, reverse_domain
+
+
+def ensure_edges_by_src(edges_csv_gz: str, edges_by_src_gz: str) -> None:
+    """Create edges_by_src.tsv.gz (src<TAB>dst, sorted by src) if it does not exist."""
+    if os.path.exists(edges_by_src_gz):
+        print(f'[INFO] Reusing existing {edges_by_src_gz}')
+        return
+
+    print(
+        f'[INFO] Building edges_by_src.tsv.gz from {edges_csv_gz} → {edges_by_src_gz}'
+    )
+    env = os.environ.copy()
+    env['LC_ALL'] = 'C'
+    preprocess_edges(edges_csv_gz, edges_by_src_gz, env)
+
+
+def main() -> None:
+    ap = argparse.ArgumentParser(description='Build graph-native seeds from DQR.')
+    ap.add_argument(
+        '--edges_csv_gz',
+        required=True,
+        help='Path to edges.csv.gz (src,dst,ts with header).',
+    )
+    ap.add_argument(
+        '--edges_by_src_gz',
+        required=True,
+        help='Path to edges_by_src.tsv.gz (src<TAB>dst, sorted by src). '
+        'Will be created if it does not exist.',
+    )
+    ap.add_argument(
+        '--dqr_csv',
+        required=True,
+        help="Path to domain_ratings.csv (must contain 'domain,pc1,...').",
+    )
+    ap.add_argument(
+        '--out_tsv',
+        required=True,
+        help='Output seeds TSV: graph_node_id<TAB>pc1_root.',
+    )
+    args = ap.parse_args()
+
+    ensure_edges_by_src(args.edges_csv_gz, args.edges_by_src_gz)
+
+    print(f'[INFO] loading DQR ratings from {args.dqr_csv} ...')
+    dqr = load_dqr(args.dqr_csv)
+
+    print(f'[INFO] scanning graph nodes in {args.edges_by_src_gz} and matching...')
+    matched: Dict[str, float] = {}
+    total_seen = 0
+
+    with gzip.open(args.edges_by_src_gz, 'rt') as f:
+        for line in f:
+            if not line:
+                continue
+            total_seen += 1
+            src = line.split('\t', 1)[0].strip().lower()
+
+            candidates = {  # TODO change this to just flip_if_needed.
+                # just need to test.
+                flip_if_needed(src),
+                flip_if_needed(reverse_domain(src)),
+                src,
+            }
+            for cand in candidates:
+                pc1 = dqr.get(cand)
+                if pc1 is not None:
+                    matched[src] = pc1
+                    break
+
+    print(f'[INFO] scanned {total_seen} edges (src nodes)')
+    print(f'[INFO] matched {len(matched)} graph-native roots')
+
+    os.makedirs(os.path.dirname(args.out_tsv), exist_ok=True)
+    print(f'[INFO] writing seeds → {args.out_tsv}')
+    with open(args.out_tsv, 'w') as out:
+        for node, pc1 in sorted(matched.items()):
+            out.write(f'{node}\t{pc1}\n')
+
+
+if __name__ == '__main__':
+    main()
diff --git a/tgrag/experiments/subnetworks/construct_networks.py b/tgrag/experiments/subnetworks/construct_networks.py
new file mode 100644
index 00000000..061deede
--- /dev/null
+++ b/tgrag/experiments/subnetworks/construct_networks.py
@@ -0,0 +1,298 @@
+#!/usr/bin/env python3
+import argparse
+import os
+import subprocess
+import tempfile
+from typing import Dict, List, Optional
+
+from tgrag.utils.data_loading import shlex_quote
+
+
+def sort_file_inplace_tsv(file_path: str, key_fields: str = '-k1,1') -> None:
+    """Enforce LC_ALL=C byte-order sorting on a TSV file."""
+    if not os.path.exists(file_path):
+        return
+
+    tmp = file_path + '.sorted'
+    env = os.environ.copy()
+    env['LC_ALL'] = 'C'
+
+    key_parts = key_fields.split()
+
+    cmd = ['sort', '-t', '\t'] + key_parts + [file_path]
+    with open(tmp, 'wb') as out_f:
+        subprocess.check_call(cmd, env=env, stdout=out_f)
+
+    os.replace(tmp, file_path)
+
+
+def assert_sorted_tsv(file_path: str, key_fields: str = '-k1,1') -> None:
+    """Validate that a TSV file is sorted lexicographically by the given keys."""
+    if not os.path.exists(file_path):
+        return
+
+    env = os.environ.copy()
+    env['LC_ALL'] = 'C'
+    key_parts = key_fields.split()
+
+    cmd = ['sort', '-c', '-t', '\t'] + key_parts + [file_path]
+    subprocess.check_call(cmd, env=env)
+
+
+def run_checked(cmd: List[str], env: Optional[Dict[str, str]] = None) -> None:
+    print(f"[DEBUG] Running: {' '.join(shlex_quote(c) for c in cmd)}")
+    result = subprocess.run(cmd, env=env)
+    if result.returncode != 0:
+        raise subprocess.CalledProcessError(result.returncode, cmd)
+
+
+def _atomic_write_from_pipeline(
+    cmds: List[List[str]], out_path: str, env: Optional[Dict[str, str]] = None
+) -> None:
+    os.makedirs(os.path.dirname(out_path), exist_ok=True)
+
+    with tempfile.NamedTemporaryFile(
+        prefix='.tmp_', suffix='.gz', dir=os.path.dirname(out_path), delete=False
+    ) as tmp:
+        tmp_path = tmp.name
+
+    procs = []
+    prev_stdout = None
+    try:
+        for i, argv in enumerate(cmds):
+            print(
+                f"[DEBUG] Pipeline stage {i}: {' '.join(shlex_quote(c) for c in argv)}"
+            )
+            p = subprocess.Popen(
+                argv,
+                stdin=prev_stdout,
+                stdout=(subprocess.PIPE if i < len(cmds) - 1 else open(tmp_path, 'wb')),
+                env=env,
+            )
+            if prev_stdout is not None:
+                prev_stdout.close()
+            prev_stdout = p.stdout
+            procs.append(p)
+
+        retcode = 0
+        for p in procs[::-1]:
+            p.wait()
+            if p.returncode != 0:
+                retcode = p.returncode
+        if retcode != 0:
+            raise subprocess.CalledProcessError(
+                retcode, ' | '.join(' '.join(c) for c in cmds)
+            )
+
+        os.replace(tmp_path, out_path)
+
+    except Exception:
+        if os.path.exists(tmp_path):
+            os.remove(tmp_path)
+        raise
+
+
+def preprocess_edges(
+    edges_csv_gz: str, out_edges_tsv_gz: str, env: Dict[str, str]
+) -> None:
+    print(f'[INFO] Preprocessing edges → {out_edges_tsv_gz}')
+    cmds = [
+        ['gzip', '-cd', '--', edges_csv_gz],
+        ['awk', '-F,', 'NR>1 {print $1 "\t" $2}'],
+        ['sort', '-t', '\t', '-k1,1'],
+        ['gzip', '-c'],
+    ]
+    _atomic_write_from_pipeline(cmds, out_edges_tsv_gz, env=env)
+
+
+def make_frontier0(seeds_tsv: str, frontier0_tsv: str, env: Dict[str, str]) -> None:
+    print(f'[INFO] Building depth-0 frontier → {frontier0_tsv}')
+    bash_script = f"""
+set -euo pipefail
+awk -F $'\\t' '{{print $1"\\t"$1"\\t0\\t"$2}}' {shlex_quote(seeds_tsv)} |
+  sort -t $'\\t' -k1,1 -k2,2 > {shlex_quote(frontier0_tsv)}
+"""
+    run_checked(['/bin/bash', '-lc', bash_script], env)
+
+    # verify + enforce sort
+    sort_file_inplace_tsv(frontier0_tsv, '-k1,1 -k2,2')
+    assert_sorted_tsv(frontier0_tsv, '-k1,1 -k2,2')
+
+
+def bfs_step(
+    depth: int,
+    edges_by_src_tsv_gz: str,
+    frontier_tsv: str,
+    next_frontier_tsv: str,
+    out_edges_tsv_gz: str,
+    env: Dict[str, str],
+) -> None:
+    print(f'[INFO] BFS depth {depth} → {depth+1}')
+
+    # frontier must be sorted by node
+    assert_sorted_tsv(frontier_tsv, '-k1,1')
+
+    bash_script = f"""
+set -euo pipefail
+
+gzip -cd -- {shlex_quote(edges_by_src_tsv_gz)} |
+  join -t $'\\t' -1 1 -2 1 - {shlex_quote(frontier_tsv)} |
+  tee >(
+    awk -F $'\\t' '{{ printf "%s\\t%s\\t%d\\t%s\\n", $2, $3, $4+1, $5 }}' |
+      sort -t $'\\t' -k1,1 -k2,2 -u > {shlex_quote(next_frontier_tsv)}
+  ) |
+  awk -F $'\\t' '{{ printf "%s\\t%s\\t%s\\t%d\\t%d\\t%s\\n", $3, $1, $2, $4, $4+1, $5 }}' |
+  gzip -c
+"""
+
+    _atomic_write_from_pipeline(
+        [['/bin/bash', '-lc', bash_script]], out_edges_tsv_gz, env
+    )
+
+    # enforce sorting on next frontier
+    sort_file_inplace_tsv(next_frontier_tsv, '-k1,1 -k2,2')
+    assert_sorted_tsv(next_frontier_tsv, '-k1,1 -k2,2')
+
+
+def merge_edges(
+    edge_files_gz: List[str], out_merged_gz: str, env: Dict[str, str]
+) -> None:
+    print(f'[INFO] Merging edges → {out_merged_gz}')
+    cmds = [
+        ['gzip', '-cd', '--', *edge_files_gz],
+        ['sort', '-t', '\t', '-k1,1', '-k2,2', '-k3,3', '-k4,4n', '-k5,5n'],
+        ['gzip', '-c'],
+    ]
+    _atomic_write_from_pipeline(cmds, out_merged_gz, env)
+
+
+def merge_nodes(
+    frontier_files: List[str], out_nodes_tsv: str, env: Dict[str, str]
+) -> None:
+    """Merge frontier_* files (node,root,depth,pc1) into:
+    node<TAB>root<TAB>min_depth<TAB>pc1_root.
+
+    Uses streaming and an external sort.
+    """
+    print(f'[INFO] Merging node frontiers → {out_nodes_tsv}')
+    tmp = out_nodes_tsv + '.tmp'
+
+    awk_first_per_key = r"""
+    BEGIN { FS="\t"; OFS="\t"; prev="" }
+    {
+      key = $2 FS $1;  # root \t node
+      if (key != prev) {
+        print $1, $2, $3, $4;  # node, root, depth(min), pc1
+        prev = key;
+      }
+    }
+    """
+
+    bash_script = f"""
+set -euo pipefail
+
+# Sort by (root, node, depth) so first row per (root,node) has min depth
+cat {" ".join(shlex_quote(f) for f in frontier_files)} |
+  LC_ALL=C sort -T {shlex_quote(os.path.dirname(out_nodes_tsv))} -S 50% -t $'\\t' -k2,2 -k1,1 -k3,3n |
+  awk '{awk_first_per_key}' > {shlex_quote(tmp)}
+"""
+    run_checked(['/bin/bash', '-lc', bash_script], env)
+
+    os.replace(tmp, out_nodes_tsv)
+
+    sort_file_inplace_tsv(out_nodes_tsv, '-k2,2 -k1,1')
+    assert_sorted_tsv(out_nodes_tsv, '-k2,2 -k1,1')
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--edges', required=True)
+    parser.add_argument('--ratings', required=True)
+    parser.add_argument('--outdir', required=True)
+    args = parser.parse_args()
+
+    scratch = os.environ.get('SCRATCH', None)
+    if scratch is None:
+        raise RuntimeError('SCRATCH environment variable not set.')
+
+    outdir = args.outdir
+    os.makedirs(outdir, exist_ok=True)
+    print(f'[INFO] Output directory: {outdir}')
+
+    env = os.environ.copy()
+    env['LC_ALL'] = 'C'
+
+    edges_by_src = os.path.join(outdir, 'edges_by_src.tsv.gz')
+    seeds_tsv = os.path.join(outdir, 'seeds_graph_native.tsv')
+    frontier0 = os.path.join(outdir, 'frontier_depth0.tsv')
+    frontier1 = os.path.join(outdir, 'frontier_depth1.tsv')
+    frontier2 = os.path.join(outdir, 'frontier_depth2.tsv')
+    edges_d0 = os.path.join(outdir, 'edges_depth0.tsv.gz')
+    edges_d1 = os.path.join(outdir, 'edges_depth1.tsv.gz')
+    merged_edges = os.path.join(outdir, 'subgraph_edges.tsv.gz')
+    merged_nodes = os.path.join(outdir, 'subgraph_nodes.tsv')
+
+    if not os.path.exists(edges_by_src):
+        preprocess_edges(args.edges, edges_by_src, env)
+    else:
+        print('[INFO] Reusing edges_by_src')
+
+    if not os.path.exists(seeds_tsv):
+        raise RuntimeError(
+            f'Missing {seeds_tsv} — must run build_seeds.py first.'
+ ) + + print(f'[INFO] Using seeds from {seeds_tsv}') + sort_file_inplace_tsv(seeds_tsv, '-k1,1') + assert_sorted_tsv(seeds_tsv, '-k1,1') + + # Build frontier 0 + if not os.path.exists(frontier0): + make_frontier0(seeds_tsv, frontier0, env) + else: + print('[INFO] Reusing frontier0') + assert_sorted_tsv(frontier0, '-k1,1 -k2,2') + + # BFS levels + bfs_step(0, edges_by_src, frontier0, frontier1, edges_d0, env) + bfs_step(1, edges_by_src, frontier1, frontier2, edges_d1, env) + + # Merge results + merge_edges([edges_d0, edges_d1], merged_edges, env) + merge_nodes([frontier0, frontier1, frontier2], merged_nodes, env) + + print('[INFO] Finished merging global subnetworks.') + + egos_dir = os.path.join(outdir, 'egos') + os.makedirs(egos_dir, exist_ok=True) + print(f'[INFO] Splitting subnetworks per ego → {egos_dir}') + + nodes_split_script = f""" + awk -F $'\\t' ' + NR>1 {{ + node=$1; root=$2; + out=sprintf("{egos_dir}/%s.nodes", root); + print >> out; + }} + ' {shlex_quote(merged_nodes)} + """ + run_checked(['/bin/bash', '-lc', nodes_split_script], env) + + edges_split_script = f""" + gzip -cd -- {shlex_quote(merged_edges)} | + awk -F $'\\t' ' + {{ + root=$1; + out=sprintf("{egos_dir}/%s.edges", root); + print >> out; + }} + ' + """ + run_checked(['/bin/bash', '-lc', edges_split_script], env) + + print(f'[INFO] Per-ego subnetworks in: {egos_dir}') + print('[INFO] Example: ls -lh egos/*.nodes') + + +if __name__ == '__main__': + main() diff --git a/tgrag/utils/data_loading.py b/tgrag/utils/data_loading.py index f2729de5..21a3b0ef 100644 --- a/tgrag/utils/data_loading.py +++ b/tgrag/utils/data_loading.py @@ -1,4 +1,6 @@ +import csv import gzip +import shlex import subprocess from datetime import date from pathlib import Path @@ -19,6 +21,15 @@ def iso_week_to_timestamp(iso_week_str: str) -> str: return monday_date.strftime('%Y%m%d') +def load_dqr(path: str) -> Dict[str, float]: + dqr = {} + with open(path, newline='') as f: + r = csv.DictReader(f) + for row in r: + dqr[row['domain'].strip().lower()] = float(row['pc1']) + return dqr + + def run(cmd: List[str], check: bool = True) -> subprocess.CompletedProcess[str]: p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if check and p.returncode != 0: @@ -91,6 +102,10 @@ def read_edge_file(path: str, id_to_domain: Dict[int, str]) -> Set[Tuple[str, st return result +def shlex_quote(s: str) -> str: + return shlex.quote(s) + + def open_file(path: str) -> Callable[..., IO]: return gzip.open if path.endswith('.gz') else open