diff --git a/.gitignore b/.gitignore index 3872603b..1ff911c5 100644 --- a/.gitignore +++ b/.gitignore @@ -35,4 +35,41 @@ logs/scdbserver.log *.DS_Store # ignore bazelrc for remote cache -.remote.bazelrc \ No newline at end of file +.remote.bazelrc +# 本地bazel配置覆盖文件 +.bazelrc.user + +# scdb-tutorial 配置和日志文件 +examples/scdb-tutorial/scdb/conf/config.yml +examples/scdb-tutorial/scdb/conf/config_local.yml +examples/scdb-tutorial/scdb/conf/config_hive.yml +examples/scdb-tutorial/logs/ +examples/scdb-tutorial/hive/logs/ + +# 本地配置文件(包含密码等敏感信息) +examples/scdb-tutorial/engine/*/conf/gflags_local.conf + +# scdb-tutorial 数据库文件 +examples/scdb-tutorial/scdb/*.db + +# 根目录编译生成的可执行文件 +/scdbclient +/scqltool +/brokerctl + +# 根目录日志文件 +/logs/ + +# 本地临时测试文件和脚本 +/create_*.sql +/test_*.sh +/setup_local_env.py + +# tutorial 本地配置和脚本文件 +examples/scdb-tutorial/client/users_local.json +examples/scdb-tutorial/configure_local.py +examples/scdb-tutorial/init_database_local.sh +examples/scdb-tutorial/start_all.sh +examples/scdb-tutorial/start_all_hive.sh +examples/scdb-tutorial/stop_all.sh +examples/scdb-tutorial/stop_all_hive.sh diff --git a/engine/datasource/datasource.proto b/engine/datasource/datasource.proto index 855e8332..5a0983ec 100644 --- a/engine/datasource/datasource.proto +++ b/engine/datasource/datasource.proto @@ -27,6 +27,7 @@ enum DataSourceKind { ARROWSQL = 5; GRPC = 6; DATAPROXY = 7; + HIVE = 8; } message DataSource { @@ -38,4 +39,4 @@ message DataSource { // concrete data source connection string // It is comprehend to related data source adaptor. 
string connection_str = 4; -}; \ No newline at end of file +}; diff --git a/engine/datasource/datasource_adaptor_mgr.cc b/engine/datasource/datasource_adaptor_mgr.cc index 418cdc7f..efce1455 100644 --- a/engine/datasource/datasource_adaptor_mgr.cc +++ b/engine/datasource/datasource_adaptor_mgr.cc @@ -75,8 +75,10 @@ void DatasourceAdaptorMgr::RegisterBuiltinAdaptorFactories() { {DataSourceKind::DATAPROXY, std::make_shared()}); factory_maps_.insert( {DataSourceKind::CSVDB, std::make_shared()}); - factory_maps_.insert( - {DataSourceKind::ARROWSQL, std::make_shared()}); + auto arrow_sql_adaptor_factory = std::make_shared(); + factory_maps_.insert({DataSourceKind::ARROWSQL, arrow_sql_adaptor_factory}); + // Hive uses Arrow Flight SQL protocol for better performance and native columnar support + factory_maps_.insert({DataSourceKind::HIVE, arrow_sql_adaptor_factory}); } -} // namespace scql::engine \ No newline at end of file +} // namespace scql::engine diff --git a/examples/scdb-tutorial/client/users_local.json b/examples/scdb-tutorial/client/users_local.json new file mode 100644 index 00000000..debc0d82 --- /dev/null +++ b/examples/scdb-tutorial/client/users_local.json @@ -0,0 +1,14 @@ +{ + "alice": { + "UserName": "alice", + "Password": "some_password" + }, + "bob": { + "UserName": "bob", + "Password": "another_password" + }, + "root": { + "UserName": "root", + "Password": "root" + } +} diff --git a/examples/scdb-tutorial/configure_local.py b/examples/scdb-tutorial/configure_local.py new file mode 100644 index 00000000..82bd2c58 --- /dev/null +++ b/examples/scdb-tutorial/configure_local.py @@ -0,0 +1,86 @@ +import os +import sys +import glob + +# Configuration +WORK_DIR = os.getcwd() +if not WORK_DIR.endswith("examples/scdb-tutorial"): + print("Error: Please run this script from 'examples/scdb-tutorial' directory") + sys.exit(1) + +# User input for MySQL password +mysql_password = input("Please enter your local MySQL root password: ").strip() +if not mysql_password: + 
print("Error: Password cannot be empty") + sys.exit(1) + +# Paths to config files +alice_conf = "engine/alice/conf/gflags.conf" +bob_conf = "engine/bob/conf/gflags.conf" +scdb_conf = "scdb/conf/config.yml" +scdb_host = "scdb/conf/config.yml" + +def update_file(filepath, replacements): + with open(filepath, 'r') as f: + content = f.read() + + for old, new in replacements.items(): + content = content.replace(old, new) + + with open(filepath, 'w') as f: + f.write(content) + print(f"Updated {filepath}") + +# 1. Update Alice Config +# We need to read from current file which might have random password from setup.sh +# But it's easier to read the file and replace the whole connection string regex, +# or just simpler: re-read .template if available? +# Let's try to be robust and read the template if exists, else read the file. +# However, setup.sh modifies files in place if they don't end in .template (it generates them from .template) +# So we can re-generate from .template + +def process_template(template_path, output_path, replacements): + if not os.path.exists(template_path): + print(f"Warning: Template {template_path} not found, skipping.") + return + with open(template_path, 'r') as f: + content = f.read() + + for old, new in replacements.items(): + content = content.replace(old, new) + + with open(output_path, 'w') as f: + f.write(content) + print(f"Generated {output_path} from template") + +# Replacements for Alice +alice_replacements = { + "__MYSQL_ROOT_PASSWD__": mysql_password, + "/home/admin/engine/conf": f"{WORK_DIR}/engine/alice/conf", + "host=mysql": "host=127.0.0.1", + # Fix setup.sh's randomness if we are running on top of it + # But since we use template, we don't care about previous random password +} +process_template("engine/alice/conf/gflags.conf.template", alice_conf, alice_replacements) + +# Replacements for Bob +bob_replacements = { + "__MYSQL_ROOT_PASSWD__": mysql_password, + "/home/admin/engine/conf": f"{WORK_DIR}/engine/bob/conf", + 
"host=mysql": "host=127.0.0.1", + "--listen_port=8003": "--listen_port=8004", # Change Port! +} +process_template("engine/bob/conf/gflags.conf.template", bob_conf, bob_replacements) + +# Replacements for SCDB +scdb_replacements = { + "__MYSQL_ROOT_PASSWD__": mysql_password, + "mysql:3306": "127.0.0.1:3306", +} +process_template("scdb/conf/config.yml.template", scdb_conf, scdb_replacements) + +print("\nConfiguration files updated successfully!") +print(f"Alice DB Port: 8003 (default)") +print(f"Bob DB Port: 8004 (modified)") +print(f"SCDB Port: 8080 (default)") + diff --git a/examples/scdb-tutorial/engine/alice/conf/gflags_hive.conf b/examples/scdb-tutorial/engine/alice/conf/gflags_hive.conf new file mode 100644 index 00000000..af153fb1 --- /dev/null +++ b/examples/scdb-tutorial/engine/alice/conf/gflags_hive.conf @@ -0,0 +1,20 @@ +--listen_port=8003 +--datasource_router=embed +--enable_driver_authorization=false +--server_enable_ssl=false +--driver_enable_ssl_as_client=false +--peer_engine_enable_ssl_as_client=false +# Hive configuration using Arrow Flight SQL protocol +# Connection string format: grpc+tcp://: or grpc+tcp://:@: +# Note: Requires Arrow Flight SQL server running (e.g., Hive with Arrow Flight support, Spark Thrift Server with Arrow, etc.) 
+--embed_router_conf={"datasources":[{"id":"ds001","name":"hive db","kind":"HIVE","connection_str":"grpc://localhost:8815"}],"rules":[{"db":"*","table":"*","datasource_id":"ds001"}]} +# Arrow Flight SQL TLS configuration (optional) +--arrow_client_disable_server_verification=true +# --arrow_cert_pem_path=/path/to/ca.pem +# --arrow_client_key_pem_path=/path/to/client-key.pem +# --arrow_client_cert_pem_path=/path/to/client-cert.pem +# party authentication flags (disabled for testing) +--enable_self_auth=false +--enable_peer_auth=false +# --private_key_pem_path=./examples/scdb-tutorial/engine/alice/conf/ed25519key.pem +# --authorized_profile_path=./examples/scdb-tutorial/engine/alice/conf/authorized_profile.json diff --git a/examples/scdb-tutorial/engine/bob/conf/gflags_hive.conf b/examples/scdb-tutorial/engine/bob/conf/gflags_hive.conf new file mode 100644 index 00000000..25614fa1 --- /dev/null +++ b/examples/scdb-tutorial/engine/bob/conf/gflags_hive.conf @@ -0,0 +1,20 @@ +--listen_port=8004 +--datasource_router=embed +--enable_driver_authorization=false +--server_enable_ssl=false +--driver_enable_ssl_as_client=false +--peer_engine_enable_ssl_as_client=false +# Hive configuration using Arrow Flight SQL protocol +# Connection string format: grpc+tcp://<host>:<port> or grpc+tcp://<user>:<password>@<host>:<port> +# Note: Requires Arrow Flight SQL server running (e.g., Hive with Arrow Flight support, Spark Thrift Server with Arrow, etc.) 
+--embed_router_conf={"datasources":[{"id":"ds001","name":"hive db","kind":"HIVE","connection_str":"grpc://localhost:8816"}],"rules":[{"db":"*","table":"*","datasource_id":"ds001"}]} +# Arrow Flight SQL TLS configuration (optional) +--arrow_client_disable_server_verification=true +# --arrow_cert_pem_path=/path/to/ca.pem +# --arrow_client_key_pem_path=/path/to/client-key.pem +# --arrow_client_cert_pem_path=/path/to/client-cert.pem +# party authentication flags (disabled for testing) +--enable_self_auth=false +--enable_peer_auth=false +# --private_key_pem_path=./examples/scdb-tutorial/engine/bob/conf/ed25519key.pem +# --authorized_profile_path=./examples/scdb-tutorial/engine/bob/conf/authorized_profile.json diff --git a/examples/scdb-tutorial/hive/arrow_flight_server.py b/examples/scdb-tutorial/hive/arrow_flight_server.py new file mode 100644 index 00000000..6bf18d49 --- /dev/null +++ b/examples/scdb-tutorial/hive/arrow_flight_server.py @@ -0,0 +1,348 @@ +#!/usr/bin/env python3 +""" +Arrow Flight SQL 测试服务器 +用于模拟 Hive 后端,支持 SCQL 联合查询测试 + +此服务器实现了 Arrow Flight SQL 协议的核心功能,包括: +- GetFlightInfo: 处理 SQL 查询请求 (解析 CommandStatementQuery protobuf) +- DoGet: 返回查询结果 + +使用方法: + # 启动 Alice 服务器 (端口 8815) + python3 arrow_flight_server.py --party alice --port 8815 + + # 启动 Bob 服务器 (端口 8816) + python3 arrow_flight_server.py --party bob --port 8816 +""" + +import argparse +import pyarrow as pa +import pyarrow.flight as flight +import duckdb + + +def parse_flight_sql_command(data: bytes) -> str: + """ + 解析 Arrow Flight SQL 的 CommandStatementQuery protobuf 消息 + + CommandStatementQuery 的 protobuf 定义大致是: + message CommandStatementQuery { + string query = 1; + string transaction_id = 2; + } + + 在 wire format 中: + - Field 1 (query): tag = 0x0a (field 1, wire type 2 = length-delimited) + - 然后是 varint 长度 + - 然后是 UTF-8 编码的字符串 + """ + if not data: + return "" + + try: + # 检查是否是 google.protobuf.Any 包装 + # Any 的格式是: field 1 = type_url, field 2 = value + # type_url 通常以 "type.googleapis.com/" 开头 + 
if b"type.googleapis.com" in data: + # 跳过 Any 包装,查找内部的 CommandStatementQuery + # 查找 field 2 (value) 的开始位置 + idx = 0 + while idx < len(data): + if data[idx] == 0x12: # field 2, wire type 2 + idx += 1 + # 读取 varint 长度 + length, varint_size = _read_varint(data, idx) + idx += varint_size + # 提取内部消息 + inner_data = data[idx:idx+length] + # 递归解析内部消息 + return parse_flight_sql_command(inner_data) + idx += 1 + + # 尝试直接解析 CommandStatementQuery + idx = 0 + while idx < len(data): + tag = data[idx] + idx += 1 + + if tag == 0x0a: # field 1 (query), wire type 2 (length-delimited) + length, varint_size = _read_varint(data, idx) + idx += varint_size + query_bytes = data[idx:idx+length] + return query_bytes.decode("utf-8") + elif (tag & 0x07) == 2: # 其他 length-delimited 字段,跳过 + length, varint_size = _read_varint(data, idx) + idx += varint_size + length + elif (tag & 0x07) == 0: # varint 字段,跳过 + _, varint_size = _read_varint(data, idx) + idx += varint_size + else: + # 未知的 wire type,跳过 + break + + # 如果解析失败,尝试直接作为字符串解码 + return data.decode("utf-8", errors="replace") + + except Exception as e: + print(f"[警告] 解析 protobuf 失败: {e}") + # 回退到直接解码 + return data.decode("utf-8", errors="replace") + + +def _read_varint(data: bytes, start: int) -> tuple: + """读取 protobuf varint,返回 (value, bytes_consumed)""" + result = 0 + shift = 0 + idx = start + while idx < len(data): + byte = data[idx] + result |= (byte & 0x7f) << shift + idx += 1 + if (byte & 0x80) == 0: + break + shift += 7 + return result, idx - start + + +class FlightSqlServer(flight.FlightServerBase): + """ + Arrow Flight SQL 服务器实现 + + 支持 SCQL 引擎通过 FlightSqlClient 发送的请求 + """ + + def __init__(self, host="0.0.0.0", port=8815, party="alice"): + location = f"grpc://0.0.0.0:{port}" + super().__init__(location) + self.party = party + self._port = port + self._host = host + self.conn = duckdb.connect(":memory:") + self._queries = {} # ticket_id -> query + self._ticket_counter = 0 + self._init_data() + print(f"[{party}] Arrow Flight SQL 服务器启动在端口 
{port}") + + def _init_data(self): + """初始化测试数据""" + # 创建 default schema 以兼容 SCQL 的 db.table 格式 + self.conn.execute("CREATE SCHEMA IF NOT EXISTS \"default\"") + + if self.party == "alice": + # Alice 的用户信用数据 + self.conn.execute(""" + CREATE TABLE "default".user_credit ( + ID VARCHAR PRIMARY KEY, + credit_rank INTEGER, + income INTEGER, + age INTEGER + ) + """) + self.conn.execute(""" + INSERT INTO "default".user_credit VALUES + ('id0001', 6, 100000, 20), + ('id0002', 5, 90000, 19), + ('id0003', 6, 89700, 32), + ('id0005', 6, 607000, 30), + ('id0006', 5, 30070, 25), + ('id0007', 6, 12070, 28), + ('id0008', 6, 200800, 50), + ('id0009', 6, 607000, 30), + ('id0010', 5, 30070, 25), + ('id0011', 5, 12070, 28), + ('id0012', 6, 200800, 50), + ('id0013', 5, 30070, 25), + ('id0014', 5, 12070, 28), + ('id0015', 6, 200800, 18), + ('id0016', 5, 30070, 26), + ('id0017', 5, 12070, 27), + ('id0018', 6, 200800, 16), + ('id0019', 6, 30070, 25), + ('id0020', 5, 12070, 28) + """) + print(f"[{self.party}] 初始化 user_credit 表 (19 行)") + + elif self.party == "bob": + # Bob 的用户统计数据 + self.conn.execute(""" + CREATE TABLE "default".user_stats ( + ID VARCHAR PRIMARY KEY, + order_amount INTEGER, + is_active INTEGER + ) + """) + self.conn.execute(""" + INSERT INTO "default".user_stats VALUES + ('id0001', 5000, 1), + ('id0002', 3000, 1), + ('id0003', 8000, 0), + ('id0005', 12000, 1), + ('id0006', 1500, 1), + ('id0007', 2500, 0), + ('id0008', 9500, 1), + ('id0009', 7000, 1), + ('id0010', 500, 0), + ('id0011', 3500, 1), + ('id0012', 15000, 1), + ('id0013', 2000, 0), + ('id0014', 4500, 1), + ('id0015', 6500, 1), + ('id0016', 1000, 0), + ('id0017', 8500, 1), + ('id0018', 11000, 1), + ('id0019', 3200, 1), + ('id0020', 7500, 0) + """) + print(f"[{self.party}] 初始化 user_stats 表 (19 行)") + + def _preprocess_query(self, query: str) -> str: + """预处理 SQL 查询,将 default.table 转换为 "default".table""" + import re + # 匹配 default.tablename 并替换为 "default".tablename + query = re.sub(r'\bdefault\.(\w+)', 
r'"default".\1', query, flags=re.IGNORECASE) + return query + + def _generate_ticket(self, query: str) -> bytes: + """生成唯一的 ticket ID""" + self._ticket_counter += 1 + ticket_id = f"{self.party}_{self._ticket_counter}" + self._queries[ticket_id] = query + return ticket_id.encode("utf-8") + + def get_flight_info(self, context, descriptor): + """ + 处理 GetFlightInfo 请求 + + Arrow Flight SQL 客户端通过此方法发送 SQL 查询。 + 命令被编码在 descriptor.command 中,格式为 CommandStatementQuery protobuf。 + """ + # 从 descriptor 中提取 SQL 查询 + if descriptor.descriptor_type == flight.DescriptorType.CMD: + query = parse_flight_sql_command(descriptor.command) + elif descriptor.descriptor_type == flight.DescriptorType.PATH: + # 表名查询 + table_name = "/".join(p.decode() if isinstance(p, bytes) else p for p in descriptor.path) + query = f"SELECT * FROM {table_name}" + else: + raise flight.FlightUnavailableError("Unsupported descriptor type") + + # 预处理查询:将 default.table_name 转换为 "default".table_name + query = self._preprocess_query(query) + + print(f"[{self.party}] GetFlightInfo - Query: {query[:100]}...") + + # 执行查询获取 schema + try: + result = self.conn.execute(query).fetch_arrow_table() + schema = result.schema + num_rows = result.num_rows + + # 保存查询以供 DoGet 使用 + ticket_bytes = self._generate_ticket(query) + ticket = flight.Ticket(ticket_bytes) + + # 创建 endpoint + location = flight.Location.for_grpc_tcp("localhost", self._port) + endpoint = flight.FlightEndpoint(ticket, [location]) + + info = flight.FlightInfo( + schema, + descriptor, + [endpoint], + num_rows, + -1 # 未知的字节数 + ) + + print(f"[{self.party}] FlightInfo created - rows: {num_rows}, columns: {len(schema)}") + return info + + except Exception as e: + print(f"[{self.party}] 查询错误: {e}") + raise flight.FlightServerError(f"Query execution failed: {e}") + + def do_get(self, context, ticket): + """ + 处理 DoGet 请求,返回查询结果 + + ticket 包含查询 ID 或直接是 SQL 查询 + """ + ticket_data = ticket.ticket.decode("utf-8") + + # 检查是否是保存的 ticket ID + if ticket_data in self._queries: 
+ query = self._queries[ticket_data] + # 清理已使用的 ticket + del self._queries[ticket_data] + else: + # 直接使用 ticket 作为查询 + query = ticket_data + + # 预处理查询 + query = self._preprocess_query(query) + + print(f"[{self.party}] DoGet - Query: {query[:100]}...") + + try: + result = self.conn.execute(query).fetch_arrow_table() + print(f"[{self.party}] 返回 {result.num_rows} 行, {result.num_columns} 列") + return flight.RecordBatchStream(result) + except Exception as e: + print(f"[{self.party}] 查询错误: {e}") + raise flight.FlightServerError(f"Query execution failed: {e}") + + def list_flights(self, context, criteria): + """列出可用的表""" + tables = self.conn.execute("SHOW TABLES").fetchall() + for table in tables: + table_name = table[0] + descriptor = flight.FlightDescriptor.for_path(table_name) + schema = self.conn.execute(f"SELECT * FROM {table_name} LIMIT 0").fetch_arrow_table().schema + yield flight.FlightInfo( + schema, + descriptor, + [], + -1, + -1 + ) + + def do_action(self, context, action): + """处理 Action 请求""" + action_type = action.type + print(f"[{self.party}] Action: {action_type}") + + if action_type == "healthcheck": + yield flight.Result(b"ok") + else: + # Flight SQL 使用各种 action,这里返回空结果 + yield flight.Result(b"") + + def list_actions(self, context): + """列出支持的 actions""" + return [ + ("healthcheck", "Health check"), + ] + + +def main(): + parser = argparse.ArgumentParser(description="Arrow Flight SQL 测试服务器") + parser.add_argument("--party", type=str, default="alice", choices=["alice", "bob"], + help="参与方名称 (alice 或 bob)") + parser.add_argument("--port", type=int, default=8815, + help="服务端口") + parser.add_argument("--host", type=str, default="0.0.0.0", + help="监听地址") + args = parser.parse_args() + + server = FlightSqlServer(host=args.host, port=args.port, party=args.party) + print(f"Arrow Flight SQL 服务器 [{args.party}] 正在运行...") + print(f"连接地址: grpc://localhost:{args.port}") + print("按 Ctrl+C 停止服务器") + + try: + server.serve() + except KeyboardInterrupt: + 
print(f"\n[{args.party}] 服务器已停止") + + +if __name__ == "__main__": + main() diff --git a/examples/scdb-tutorial/hive/drop_alice_table.sql b/examples/scdb-tutorial/hive/drop_alice_table.sql new file mode 100644 index 00000000..2346260c --- /dev/null +++ b/examples/scdb-tutorial/hive/drop_alice_table.sql @@ -0,0 +1,3 @@ +-- Alice 删除自己的表 +DROP TABLE IF EXISTS hive_test.user_credit; + diff --git a/examples/scdb-tutorial/hive/drop_bob_table.sql b/examples/scdb-tutorial/hive/drop_bob_table.sql new file mode 100644 index 00000000..6d3ada3a --- /dev/null +++ b/examples/scdb-tutorial/hive/drop_bob_table.sql @@ -0,0 +1,3 @@ +-- Bob 删除自己的表 +DROP TABLE IF EXISTS hive_test.user_stats; + diff --git a/examples/scdb-tutorial/hive/drop_tables.sql b/examples/scdb-tutorial/hive/drop_tables.sql new file mode 100644 index 00000000..4068926a --- /dev/null +++ b/examples/scdb-tutorial/hive/drop_tables.sql @@ -0,0 +1,4 @@ +-- 删除旧表 (root 运行) +DROP TABLE IF EXISTS hive_test.user_credit; +DROP TABLE IF EXISTS hive_test.user_stats; + diff --git a/examples/scdb-tutorial/hive/federated_query.sql b/examples/scdb-tutorial/hive/federated_query.sql new file mode 100644 index 00000000..2f18ecb1 --- /dev/null +++ b/examples/scdb-tutorial/hive/federated_query.sql @@ -0,0 +1,4 @@ +-- 联合查询测试 (可以使用 alice 或 bob 用户运行) +-- 查询: 查找年龄 >= 20 且活跃用户的信用和消费信息 +SELECT a.ID, a.credit_rank, a.income, b.order_amount, b.is_active FROM hive_test.user_credit a JOIN hive_test.user_stats b ON a.ID = b.ID WHERE a.age >= 20 AND b.is_active = 1; + diff --git a/examples/scdb-tutorial/hive/grant_ccl_alice.sql b/examples/scdb-tutorial/hive/grant_ccl_alice.sql new file mode 100644 index 00000000..c5856711 --- /dev/null +++ b/examples/scdb-tutorial/hive/grant_ccl_alice.sql @@ -0,0 +1,5 @@ +-- Alice 授权 CCL (alice 用户运行) +-- 允许 Alice 自己查看 user_credit 表的所有列 +GRANT SELECT PLAINTEXT(ID, credit_rank, income, age) ON hive_test.user_credit TO alice; +-- 允许 Bob 查看 user_credit 表的所有列 +GRANT SELECT PLAINTEXT(ID, credit_rank, income, age) ON 
hive_test.user_credit TO bob; diff --git a/examples/scdb-tutorial/hive/grant_ccl_bob.sql b/examples/scdb-tutorial/hive/grant_ccl_bob.sql new file mode 100644 index 00000000..5b35aaf6 --- /dev/null +++ b/examples/scdb-tutorial/hive/grant_ccl_bob.sql @@ -0,0 +1,5 @@ +-- Bob 授权 CCL (bob 用户运行) +-- 允许 Bob 自己查看 user_stats 表的所有列 +GRANT SELECT PLAINTEXT(ID, order_amount, is_active) ON hive_test.user_stats TO bob; +-- 允许 Alice 查看 user_stats 表的所有列 +GRANT SELECT PLAINTEXT(ID, order_amount, is_active) ON hive_test.user_stats TO alice; diff --git a/examples/scdb-tutorial/hive/grant_privileges.sql b/examples/scdb-tutorial/hive/grant_privileges.sql new file mode 100644 index 00000000..ad449d7f --- /dev/null +++ b/examples/scdb-tutorial/hive/grant_privileges.sql @@ -0,0 +1,6 @@ +-- 授权 (root 运行) +GRANT ALL ON hive_test.* TO alice; +GRANT ALL ON hive_test.* TO bob; +SHOW GRANTS FOR alice; +SHOW GRANTS FOR bob; + diff --git a/examples/scdb-tutorial/hive/initdb/alice_init.hql b/examples/scdb-tutorial/hive/initdb/alice_init.hql new file mode 100644 index 00000000..2885da0f --- /dev/null +++ b/examples/scdb-tutorial/hive/initdb/alice_init.hql @@ -0,0 +1,35 @@ +-- Hive initialization script for Alice +-- Run this in Hive/beeline: beeline -u jdbc:hive2://localhost:10000 -f alice_init.hql + +CREATE DATABASE IF NOT EXISTS alice; +USE alice; + +DROP TABLE IF EXISTS user_credit; +CREATE TABLE user_credit ( + ID STRING, + credit_rank INT, + income INT, + age INT +) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE; + +INSERT INTO user_credit VALUES + ('id0001', 6, 100000, 20), + ('id0002', 5, 90000, 19), + ('id0003', 6, 89700, 32), + ('id0005', 6, 607000, 30), + ('id0006', 5, 30070, 25), + ('id0007', 6, 12070, 28), + ('id0008', 6, 200800, 50), + ('id0009', 6, 607000, 30), + ('id0010', 5, 30070, 25), + ('id0011', 5, 12070, 28), + ('id0012', 6, 200800, 50), + ('id0013', 5, 30070, 25), + ('id0014', 5, 12070, 28), + ('id0015', 6, 200800, 18), + ('id0016', 5, 30070, 26), + ('id0017', 
5, 12070, 27), + ('id0018', 6, 200800, 16), + ('id0019', 6, 30070, 25), + ('id0020', 5, 12070, 28); + diff --git a/examples/scdb-tutorial/hive/initdb/bob_init.hql b/examples/scdb-tutorial/hive/initdb/bob_init.hql new file mode 100644 index 00000000..bf3b0f54 --- /dev/null +++ b/examples/scdb-tutorial/hive/initdb/bob_init.hql @@ -0,0 +1,34 @@ +-- Hive initialization script for Bob +-- Run this in Hive/beeline: beeline -u jdbc:hive2://localhost:10000 -f bob_init.hql + +CREATE DATABASE IF NOT EXISTS bob; +USE bob; + +DROP TABLE IF EXISTS user_stats; +CREATE TABLE user_stats ( + ID STRING, + order_amount INT, + is_active INT +) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE; + +INSERT INTO user_stats VALUES + ('id0001', 5000, 1), + ('id0002', 3000, 1), + ('id0003', 8000, 0), + ('id0005', 12000, 1), + ('id0006', 1500, 1), + ('id0007', 2500, 0), + ('id0008', 9500, 1), + ('id0009', 7000, 1), + ('id0010', 500, 0), + ('id0011', 3500, 1), + ('id0012', 15000, 1), + ('id0013', 2000, 0), + ('id0014', 4500, 1), + ('id0015', 6500, 1), + ('id0016', 1000, 0), + ('id0017', 8500, 1), + ('id0018', 11000, 1), + ('id0019', 3200, 1), + ('id0020', 7500, 0); + diff --git a/examples/scdb-tutorial/hive/setup_alice.sql b/examples/scdb-tutorial/hive/setup_alice.sql new file mode 100644 index 00000000..967288bb --- /dev/null +++ b/examples/scdb-tutorial/hive/setup_alice.sql @@ -0,0 +1,5 @@ +-- Alice 创建自己的表 (使用 alice 用户运行) +-- REF_TABLE 需要使用 db.table 格式 +CREATE DATABASE IF NOT EXISTS hive_test; +CREATE TABLE hive_test.user_credit (ID STRING, credit_rank INT, income INT, age INT) REF_TABLE=default.user_credit DB_TYPE='hive'; +DESCRIBE hive_test.user_credit; diff --git a/examples/scdb-tutorial/hive/setup_bob.sql b/examples/scdb-tutorial/hive/setup_bob.sql new file mode 100644 index 00000000..aef700eb --- /dev/null +++ b/examples/scdb-tutorial/hive/setup_bob.sql @@ -0,0 +1,5
@@ +-- Bob 创建自己的表 (使用 bob 用户运行) +-- REF_TABLE 需要使用 db.table 格式 +CREATE DATABASE IF NOT EXISTS hive_test; +CREATE TABLE hive_test.user_stats (ID STRING, order_amount INT, is_active INT) REF_TABLE=default.user_stats DB_TYPE='hive'; +DESCRIBE hive_test.user_stats; diff --git a/examples/scdb-tutorial/hive/setup_endpoints.sql b/examples/scdb-tutorial/hive/setup_endpoints.sql new file mode 100644 index 00000000..fd421d73 --- /dev/null +++ b/examples/scdb-tutorial/hive/setup_endpoints.sql @@ -0,0 +1,4 @@ +-- 设置引擎端点 (各用户自己运行) +-- Alice 设置引擎端点 +ALTER USER alice WITH ENDPOINT 'localhost:8003'; + diff --git a/examples/scdb-tutorial/hive/setup_endpoints_bob.sql b/examples/scdb-tutorial/hive/setup_endpoints_bob.sql new file mode 100644 index 00000000..7122560f --- /dev/null +++ b/examples/scdb-tutorial/hive/setup_endpoints_bob.sql @@ -0,0 +1,4 @@ +-- 设置引擎端点 (各用户自己运行) +-- Bob 设置引擎端点 +ALTER USER bob WITH ENDPOINT 'localhost:8004'; + diff --git a/examples/scdb-tutorial/hive/setup_hive_test.sql b/examples/scdb-tutorial/hive/setup_hive_test.sql new file mode 100644 index 00000000..6dbd986a --- /dev/null +++ b/examples/scdb-tutorial/hive/setup_hive_test.sql @@ -0,0 +1,21 @@ +-- SCQL Hive 测试设置脚本 (使用 root 用户运行) + +-- 1. 创建用户 (需要指定 PARTY_CODE) +CREATE USER alice PARTY_CODE "alice" IDENTIFIED BY 'alice123'; +CREATE USER bob PARTY_CODE "bob" IDENTIFIED BY 'bob123'; + +-- 2. 创建数据库 +CREATE DATABASE IF NOT EXISTS hive_test; + +-- 3. 创建 Alice 的表 (user_credit) +CREATE TABLE hive_test.user_credit (ID STRING, credit_rank INT, income INT, age INT) REF_TABLE=user_credit DB_TYPE='hive' OWNER=alice; + +-- 4. 创建 Bob 的表 (user_stats) +CREATE TABLE hive_test.user_stats (ID STRING, order_amount INT, is_active INT) REF_TABLE=user_stats DB_TYPE='hive' OWNER=bob; + +-- 5. 显示表 +SHOW TABLES FROM hive_test; + +-- 6. 
描述表 +DESCRIBE hive_test.user_credit; +DESCRIBE hive_test.user_stats; diff --git a/examples/scdb-tutorial/hive/setup_users.sql b/examples/scdb-tutorial/hive/setup_users.sql new file mode 100644 index 00000000..b7073738 --- /dev/null +++ b/examples/scdb-tutorial/hive/setup_users.sql @@ -0,0 +1,6 @@ +-- 创建用户 (root 运行) +CREATE USER alice PARTY_CODE "alice" IDENTIFIED BY 'alice123'; +CREATE USER bob PARTY_CODE "bob" IDENTIFIED BY 'bob123'; +CREATE DATABASE IF NOT EXISTS hive_test; +SHOW DATABASES; + diff --git a/examples/scdb-tutorial/hive/start_arrow_servers.sh b/examples/scdb-tutorial/hive/start_arrow_servers.sh new file mode 100755 index 00000000..ab99ca58 --- /dev/null +++ b/examples/scdb-tutorial/hive/start_arrow_servers.sh @@ -0,0 +1,36 @@ +#!/bin/bash +# 启动 Alice 和 Bob 的 Arrow Flight SQL 测试服务器 + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +echo "=== 启动 Arrow Flight SQL 测试服务器 ===" + +# 停止之前可能运行的服务器 +pkill -f "arrow_flight_server.py" 2>/dev/null +sleep 1 + +# 启动 Alice 服务器 (端口 8815) +echo "启动 Alice Arrow Flight SQL 服务器 (端口 8815)..." +python3 "$SCRIPT_DIR/arrow_flight_server.py" --party alice --port 8815 & +ALICE_PID=$! +echo "Alice PID: $ALICE_PID" + +# 启动 Bob 服务器 (端口 8816) +echo "启动 Bob Arrow Flight SQL 服务器 (端口 8816)..." +python3 "$SCRIPT_DIR/arrow_flight_server.py" --party bob --port 8816 & +BOB_PID=$! 
+echo "Bob PID: $BOB_PID" + +sleep 2 + +echo "" +echo "=== Arrow Flight SQL 服务器已启动 ===" +echo "Alice: grpc://localhost:8815" +echo "Bob: grpc://localhost:8816" +echo "" +echo "停止服务器: pkill -f arrow_flight_server.py" +echo "" + +# 等待任意进程结束 +wait + diff --git a/examples/scdb-tutorial/hive/test_federated_query.py b/examples/scdb-tutorial/hive/test_federated_query.py new file mode 100644 index 00000000..fce919e6 --- /dev/null +++ b/examples/scdb-tutorial/hive/test_federated_query.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +""" +SCQL 联合查询测试脚本 +测试使用 Hive 后端的联合查询功能 +""" + +import requests +import json +import time + +SCDB_URL = "http://localhost:8080" +ROOT_PASSWORD = "p6>14%h:u2&79k83" # 从日志中获取 + +def execute_sql(sql, user="root", password=ROOT_PASSWORD): + """执行 SCQL 查询""" + url = f"{SCDB_URL}/public/submit_query" + payload = { + "user": {"user": {"account_system_type": "NATIVE_USER", "native_user": {"name": user, "password": password}}}, + "query": sql + } + try: + response = requests.post(url, json=payload, timeout=60) + return response.json() + except Exception as e: + return {"error": str(e)} + +def fetch_result(session_id, user="root", password=ROOT_PASSWORD): + """获取查询结果""" + url = f"{SCDB_URL}/public/fetch_result" + payload = { + "user": {"user": {"account_system_type": "NATIVE_USER", "native_user": {"name": user, "password": password}}}, + "session_id": session_id + } + try: + response = requests.post(url, json=payload, timeout=60) + return response.json() + except Exception as e: + return {"error": str(e)} + +def setup_parties(): + """设置参与方""" + print("=== 设置参与方 ===") + + # 创建 Alice + result = execute_sql("CREATE USER alice IDENTIFIED BY 'alice123'") + print(f"创建 Alice 用户: {result}") + + # 创建 Bob + result = execute_sql("CREATE USER bob IDENTIFIED BY 'bob123'") + print(f"创建 Bob 用户: {result}") + + # 创建数据库 + result = execute_sql("CREATE DATABASE IF NOT EXISTS hive_test") + print(f"创建数据库: {result}") + +def setup_tables(): + """设置表元数据""" + print("\n=== 设置表元数据 
===") + + # Alice 的表 + result = execute_sql(""" + CREATE TABLE hive_test.user_credit ( + ID STRING, + credit_rank INT, + income INT, + age INT + ) REF_TABLE=user_credit DB_TYPE='hive' OWNER='alice' PARTY='alice' + """) + print(f"创建 Alice 表: {result}") + + # Bob 的表 + result = execute_sql(""" + CREATE TABLE hive_test.user_stats ( + ID STRING, + order_amount INT, + is_active INT + ) REF_TABLE=user_stats DB_TYPE='hive' OWNER='bob' PARTY='bob' + """) + print(f"创建 Bob 表: {result}") + +def grant_permissions(): + """授权""" + print("\n=== 授权 ===") + + # Alice 授权给 Bob + result = execute_sql("GRANT SELECT ON hive_test.user_credit TO bob", user="alice", password="alice123") + print(f"Alice 授权给 Bob: {result}") + + # Bob 授权给 Alice + result = execute_sql("GRANT SELECT ON hive_test.user_stats TO alice", user="bob", password="bob123") + print(f"Bob 授权给 Alice: {result}") + +def run_federated_query(): + """运行联合查询""" + print("\n=== 运行联合查询 ===") + + query = """ + SELECT + a.ID, + a.credit_rank, + a.income, + b.order_amount, + b.is_active + FROM hive_test.user_credit a + JOIN hive_test.user_stats b ON a.ID = b.ID + WHERE a.age >= 20 AND b.is_active = 1 + LIMIT 10 + """ + + print(f"查询: {query}") + result = execute_sql(query) + print(f"提交查询结果: {json.dumps(result, indent=2)}") + + if "session_id" in result: + print("\n等待查询结果...") + time.sleep(5) + + fetch = fetch_result(result["session_id"]) + print(f"查询结果: {json.dumps(fetch, indent=2)}") + +def test_basic_connectivity(): + """测试基本连接""" + print("=== 测试 SCDB 连接 ===") + try: + response = requests.get(f"{SCDB_URL}/public/submit_query", timeout=5) + print(f"SCDB 服务器状态: 运行中 (HTTP {response.status_code})") + return True + except Exception as e: + print(f"SCDB 服务器连接失败: {e}") + return False + +def main(): + print("=" * 60) + print("SCQL 联合查询测试 - Hive 后端") + print("=" * 60) + + if not test_basic_connectivity(): + print("请先启动 SCDB 服务器") + return + + # 简单测试 - 检查 SCDB API + print("\n=== 测试 SCDB API ===") + result = execute_sql("SHOW DATABASES") + 
print(f"SHOW DATABASES: {json.dumps(result, indent=2)}") + +if __name__ == "__main__": + main() + diff --git a/examples/scdb-tutorial/hive/test_query.sql b/examples/scdb-tutorial/hive/test_query.sql new file mode 100644 index 00000000..89a5e885 --- /dev/null +++ b/examples/scdb-tutorial/hive/test_query.sql @@ -0,0 +1,3 @@ +-- 测试 SCQL 基本功能 +SHOW DATABASES; + diff --git a/examples/scdb-tutorial/hive/users_hive.json b/examples/scdb-tutorial/hive/users_hive.json new file mode 100644 index 00000000..3117102e --- /dev/null +++ b/examples/scdb-tutorial/hive/users_hive.json @@ -0,0 +1,14 @@ +{ + "alice": { + "UserName": "alice", + "Password": "alice123" + }, + "bob": { + "UserName": "bob", + "Password": "bob123" + }, + "root": { + "UserName": "root", + "Password": "812357]4gJ9}.>Vx" + } +} diff --git a/examples/scdb-tutorial/init_database_local.sh b/examples/scdb-tutorial/init_database_local.sh new file mode 100644 index 00000000..08d261ae --- /dev/null +++ b/examples/scdb-tutorial/init_database_local.sh @@ -0,0 +1,22 @@ +#!/bin/bash +set -e + +# Ask for password +read -s -p "Enter MySQL root password (leave empty if none): " MYSQL_PASS + +CMD="mysql -u root" +if [ ! -z "$MYSQL_PASS" ]; then + CMD="mysql -u root -p$MYSQL_PASS" +fi + +echo "Initializing Alice Database..." +$CMD < mysql/initdb/alice_init.sql + +echo "Initializing Bob Database..." +$CMD < mysql/initdb/bob_init.sql + +echo "Initializing SCDB Database..." +$CMD < mysql/initdb/scdb_init.sql + +echo "Database initialization complete." 
+ diff --git a/examples/scdb-tutorial/scdb/conf/config_hive.yml b/examples/scdb-tutorial/scdb/conf/config_hive.yml new file mode 100644 index 00000000..6fbfaafc --- /dev/null +++ b/examples/scdb-tutorial/scdb/conf/config_hive.yml @@ -0,0 +1,28 @@ +scdb_host: localhost:8080 +port: 8080 +protocol: http +query_result_callback_timeout: 3m +session_expire_time: 3m +session_expire_check_time: 1m +log_level: debug +storage: + type: sqlite + conn_str: "./examples/scdb-tutorial/scdb/scdb_hive.db" + max_idle_conns: 10 + max_open_conns: 100 + conn_max_idle_time: 2m + conn_max_lifetime: 5m +engine: + timeout: 120s + protocol: http + content_type: application/json + spu: | + { + "protocol": "SEMI2K", + "field": "FM64" + } +party_auth: + method: none + enable_timestamp_check: false + validity_period: 1m + diff --git a/examples/scdb-tutorial/start_all.sh b/examples/scdb-tutorial/start_all.sh new file mode 100755 index 00000000..fefeb013 --- /dev/null +++ b/examples/scdb-tutorial/start_all.sh @@ -0,0 +1,65 @@ +#!/bin/bash +# SCQL 本地启动脚本 + +PROJECT_ROOT="/root/autodl-tmp/scql" +TUTORIAL_DIR="/root/autodl-tmp/scql/examples/scdb-tutorial" +BIN_DIR="$PROJECT_ROOT/bin" + +echo "=========================================" +echo "启动 SCQL 服务" +echo "=========================================" + +# 检查二进制文件 +if [ ! -f "$BIN_DIR/scqlengine" ] || [ ! -f "$BIN_DIR/scdbserver" ]; then + echo "错误: 找不到编译好的二进制文件" + echo "请先运行: make binary" + exit 1 +fi + +# 创建日志目录 +mkdir -p "$TUTORIAL_DIR/logs" + +# 启动 Alice Engine +echo "启动 Alice Engine (端口 8003)..." +nohup "$BIN_DIR/scqlengine" \ + --flagfile="$TUTORIAL_DIR/engine/alice/conf/gflags_local.conf" \ + > "$TUTORIAL_DIR/logs/alice_engine.log" 2>&1 & +ALICE_PID=$! +echo "Alice Engine PID: $ALICE_PID" + +# 启动 Bob Engine +echo "启动 Bob Engine (端口 8004)..." +nohup "$BIN_DIR/scqlengine" \ + --flagfile="$TUTORIAL_DIR/engine/bob/conf/gflags_local.conf" \ + > "$TUTORIAL_DIR/logs/bob_engine.log" 2>&1 & +BOB_PID=$! 
+echo "Bob Engine PID: $BOB_PID" + +# 等待 Engines 启动 +sleep 2 + +# 启动 SCDB Server +echo "启动 SCDB Server (端口 8080)..." +nohup "$BIN_DIR/scdbserver" \ + -config="$TUTORIAL_DIR/scdb/conf/config_local.yml" \ + > "$TUTORIAL_DIR/logs/scdb_server.log" 2>&1 & +SCDB_PID=$! +echo "SCDB Server PID: $SCDB_PID" + +echo "" +echo "=========================================" +echo "所有服务已启动!" +echo "=========================================" +echo "Alice Engine: http://localhost:8003 (PID: $ALICE_PID)" +echo "Bob Engine: http://localhost:8004 (PID: $BOB_PID)" +echo "SCDB Server: http://localhost:8080 (PID: $SCDB_PID)" +echo "" +echo "日志文件位置: $TUTORIAL_DIR/logs/" +echo "" +echo "停止服务: kill $ALICE_PID $BOB_PID $SCDB_PID" +echo "或运行: bash $TUTORIAL_DIR/stop_all.sh" +echo "" +echo "查看日志:" +echo " tail -f $TUTORIAL_DIR/logs/alice_engine.log" +echo " tail -f $TUTORIAL_DIR/logs/bob_engine.log" +echo " tail -f $TUTORIAL_DIR/logs/scdb_server.log" diff --git a/examples/scdb-tutorial/start_all_hive.sh b/examples/scdb-tutorial/start_all_hive.sh new file mode 100755 index 00000000..c329f460 --- /dev/null +++ b/examples/scdb-tutorial/start_all_hive.sh @@ -0,0 +1,109 @@ +#!/bin/bash +# SCQL 本地启动脚本 - Hive 后端版本 +# 使用 Arrow Flight SQL 服务器模拟 Hive 数据源 + +PROJECT_ROOT="/root/autodl-tmp/scql" +TUTORIAL_DIR="/root/autodl-tmp/scql/examples/scdb-tutorial" +HIVE_DIR="$TUTORIAL_DIR/hive" +BIN_DIR="$PROJECT_ROOT/bin" + +echo "=========================================" +echo "启动 SCQL 服务 (Hive 后端)" +echo "=========================================" + +# 检查二进制文件 +if [ ! -f "$BIN_DIR/scqlengine" ] || [ ! -f "$BIN_DIR/scdbserver" ]; then + echo "错误: 找不到编译好的二进制文件" + echo "请先运行: make binary" + exit 1 +fi + +# 检查 Python 依赖 +if ! 
python3 -c "import pyarrow.flight, duckdb" 2>/dev/null; then + echo "警告: 缺少 Python 依赖,请安装:" + echo " pip install pyarrow duckdb" + echo "" +fi + +# 创建日志目录 +mkdir -p "$TUTORIAL_DIR/logs" + +# 停止可能已运行的 Arrow Flight 服务器 +pkill -f "arrow_flight_server.py" 2>/dev/null || true +sleep 1 + +# 启动 Arrow Flight SQL 服务器 (模拟 Hive) +echo "启动 Arrow Flight SQL 服务器 (模拟 Hive)..." + +echo " 启动 Alice Arrow Flight (端口 8815)..." +nohup python3 "$HIVE_DIR/arrow_flight_server.py" --party alice --port 8815 \ + > "$TUTORIAL_DIR/logs/alice_flight.log" 2>&1 & +ALICE_FLIGHT_PID=$! + +echo " 启动 Bob Arrow Flight (端口 8816)..." +nohup python3 "$HIVE_DIR/arrow_flight_server.py" --party bob --port 8816 \ + > "$TUTORIAL_DIR/logs/bob_flight.log" 2>&1 & +BOB_FLIGHT_PID=$! + +# 等待 Arrow Flight 服务器启动 +sleep 2 + +# 禁用代理(避免 gRPC 连接被代理拦截) +unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY + +# 启动 Alice Engine (使用 Hive 配置) +echo "启动 Alice Engine (端口 8003, Hive 后端)..." +nohup "$BIN_DIR/scqlengine" \ + --flagfile="$TUTORIAL_DIR/engine/alice/conf/gflags_hive.conf" \ + > "$TUTORIAL_DIR/logs/alice_engine.log" 2>&1 & +ALICE_PID=$! +echo "Alice Engine PID: $ALICE_PID" + +# 启动 Bob Engine (使用 Hive 配置) +echo "启动 Bob Engine (端口 8004, Hive 后端)..." +nohup "$BIN_DIR/scqlengine" \ + --flagfile="$TUTORIAL_DIR/engine/bob/conf/gflags_hive.conf" \ + > "$TUTORIAL_DIR/logs/bob_engine.log" 2>&1 & +BOB_PID=$! +echo "Bob Engine PID: $BOB_PID" + +# 等待 Engines 启动 +sleep 2 + +# 删除旧的 SQLite 数据库(如果存在),以便重新初始化 +rm -f "$TUTORIAL_DIR/scdb/scdb_hive.db" + +# 启动 SCDB Server(设置 root 密码为 "root") +echo "启动 SCDB Server (端口 8080)..." +export SCQL_ROOT_PASSWORD="root" +nohup "$BIN_DIR/scdbserver" \ + -config="$TUTORIAL_DIR/scdb/conf/config_hive.yml" \ + > "$TUTORIAL_DIR/logs/scdb_server.log" 2>&1 & +SCDB_PID=$! +echo "SCDB Server PID: $SCDB_PID" + +echo "" +echo "=========================================" +echo "所有服务已启动 (Hive 后端)!" 
+echo "=========================================" +echo "" +echo "Arrow Flight SQL (模拟 Hive):" +echo " Alice: grpc://localhost:8815 (PID: $ALICE_FLIGHT_PID)" +echo " Bob: grpc://localhost:8816 (PID: $BOB_FLIGHT_PID)" +echo "" +echo "SCQL 服务:" +echo " Alice Engine: http://localhost:8003 (PID: $ALICE_PID)" +echo " Bob Engine: http://localhost:8004 (PID: $BOB_PID)" +echo " SCDB Server: http://localhost:8080 (PID: $SCDB_PID)" +echo "" +echo "日志文件位置: $TUTORIAL_DIR/logs/" +echo " - alice_flight.log (Arrow Flight)" +echo " - bob_flight.log (Arrow Flight)" +echo " - alice_engine.log (SCQL Engine)" +echo " - bob_engine.log (SCQL Engine)" +echo " - scdb_server.log (SCDB Server)" +echo "" +echo "停止服务: bash $TUTORIAL_DIR/stop_all_hive.sh" +echo "" +echo "运行测试: bash $PROJECT_ROOT/test_privacy_hive.sh" + diff --git a/examples/scdb-tutorial/stop_all.sh b/examples/scdb-tutorial/stop_all.sh new file mode 100755 index 00000000..a935a22c --- /dev/null +++ b/examples/scdb-tutorial/stop_all.sh @@ -0,0 +1,11 @@ +#!/bin/bash +# SCQL 停止脚本 + +echo "停止 SCQL 服务..." + +# 查找并停止进程 +pkill -f "scqlengine.*alice" && echo "✓ Stopped Alice Engine" +pkill -f "scqlengine.*bob" && echo "✓ Stopped Bob Engine" +pkill -f "scdbserver" && echo "✓ Stopped SCDB Server" + +echo "所有服务已停止" diff --git a/examples/scdb-tutorial/stop_all_hive.sh b/examples/scdb-tutorial/stop_all_hive.sh new file mode 100755 index 00000000..f9520634 --- /dev/null +++ b/examples/scdb-tutorial/stop_all_hive.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# SCQL 停止脚本 - Hive 后端版本 + +echo "停止 SCQL 服务 (Hive 后端)..." 
+ +# 停止 Arrow Flight SQL 服务器 +pkill -f "arrow_flight_server.py.*alice" && echo "✓ Stopped Alice Arrow Flight" +pkill -f "arrow_flight_server.py.*bob" && echo "✓ Stopped Bob Arrow Flight" + +# 停止 SCQL 服务 +pkill -f "scqlengine.*alice" && echo "✓ Stopped Alice Engine" +pkill -f "scqlengine.*bob" && echo "✓ Stopped Bob Engine" +pkill -f "scdbserver" && echo "✓ Stopped SCDB Server" + +echo "" +echo "所有服务已停止 (Hive 后端)" + diff --git a/pkg/parser/format/format_dialect.go b/pkg/parser/format/format_dialect.go index 76b0d777..749f58b7 100644 --- a/pkg/parser/format/format_dialect.go +++ b/pkg/parser/format/format_dialect.go @@ -28,6 +28,7 @@ var ( _ Dialect = &PostgresDialect{} _ Dialect = &CVSDBDialect{} _ Dialect = &OdpsDialect{} + _ Dialect = &HiveDialect{} ) type Dialect interface { @@ -370,3 +371,64 @@ func (d *OdpsDialect) ConvertCastTypeToString(asType byte, flen int, decimal int func (d *OdpsDialect) NeedParenthesesForCmpOperand() bool { return true } + +// HiveDialect Hive SQL dialect +type HiveDialect struct { + MySQLDialect + funcNameMap map[string]string +} + +func NewHiveDialect() Dialect { + return &HiveDialect{ + funcNameMap: map[string]string{ + "ifnull": "nvl", + "truncate": "trunc", + "now": "current_timestamp", + "curdate": "current_date", + }, + } +} + +func (d *HiveDialect) SkipSchemaInColName() bool { + return true +} + +func (d *HiveDialect) GetSpecialFuncName(originName string) string { + if res, ok := d.funcNameMap[originName]; ok { + return res + } + return originName +} + +func (d *HiveDialect) ConvertCastTypeToString(asType byte, flen int, decimal int, flag uint) (keyword string, plainWord string, err error) { + switch asType { + case mysql.TypeVarString, mysql.TypeVarchar: + keyword = "STRING" + case mysql.TypeNewDecimal: + keyword = "DECIMAL" + if flen > 0 && decimal > 0 { + plainWord = fmt.Sprintf("(%d, %d)", flen, decimal) + } else if flen > 0 { + plainWord = fmt.Sprintf("(%d)", flen) + } + case mysql.TypeLonglong: + if flag&mysql.UnsignedFlag 
!= 0 { + err = fmt.Errorf("unsupported cast as data type %+v", asType) + return + } + keyword = "BIGINT" + case mysql.TypeDouble, mysql.TypeFloat: + keyword = "DOUBLE" + case mysql.TypeDate: + keyword = "DATE" + case mysql.TypeDatetime: + keyword = "TIMESTAMP" + default: + return d.MySQLDialect.ConvertCastTypeToString(asType, flen, decimal, flag) + } + return +} + +func (d *HiveDialect) NeedParenthesesForCmpOperand() bool { + return true +} diff --git a/pkg/planner/core/database_dialect.go b/pkg/planner/core/database_dialect.go index bcedf16a..15695169 100644 --- a/pkg/planner/core/database_dialect.go +++ b/pkg/planner/core/database_dialect.go @@ -30,6 +30,7 @@ const ( DBTypePostgres DBTypeCSVDB DBTypeODPS + DBTypeHive ) var dbTypeMap = map[string]DBType{ @@ -38,6 +39,7 @@ var dbTypeMap = map[string]DBType{ "postgresql": DBTypePostgres, "csvdb": DBTypeCSVDB, "odps": DBTypeODPS, + "hive": DBTypeHive, } var dbTypeNameMap = map[DBType]string{ @@ -47,6 +49,7 @@ var dbTypeNameMap = map[DBType]string{ DBTypePostgres: "postgresql", DBTypeCSVDB: "csvdb", DBTypeODPS: "odps", + DBTypeHive: "hive", } func (t DBType) String() string { @@ -68,6 +71,7 @@ var ( _ Dialect = &PostgresDialect{} _ Dialect = &CVSDBDialect{} _ Dialect = &OdpsDialect{} + _ Dialect = &HiveDialect{} ) var ( @@ -78,6 +82,7 @@ var ( DBTypePostgres: NewPostgresDialect(), DBTypeSQLite: NewMySQLDialect(), DBTypeODPS: NewOdpsDialect(), + DBTypeHive: NewHiveDialect(), } ) @@ -176,3 +181,25 @@ func (d *OdpsDialect) GetFormatDialect() format.Dialect { func (d *OdpsDialect) SupportAnyValue() bool { return false } + +type HiveDialect struct { + MySQLDialect +} + +func NewHiveDialect() *HiveDialect { + return &HiveDialect{ + MySQLDialect{flags: format.RestoreStringSingleQuotes | format.RestoreKeyWordLowercase, formatDialect: format.NewHiveDialect()}, + } +} + +func (d *HiveDialect) GetRestoreFlags() format.RestoreFlags { + return d.flags +} + +func (d *HiveDialect) GetFormatDialect() format.Dialect { + return 
d.formatDialect +} + +func (d *HiveDialect) SupportAnyValue() bool { + return false +} diff --git a/pkg/planner/core/logicalplan_to_stmt_test.go b/pkg/planner/core/logicalplan_to_stmt_test.go index 54c125d3..5ca7b019 100644 --- a/pkg/planner/core/logicalplan_to_stmt_test.go +++ b/pkg/planner/core/logicalplan_to_stmt_test.go @@ -37,6 +37,8 @@ import ( ) var _ = Suite(&testRunSQLSuite{}) + +// Note: Hive is not included in testBackEnds for now, add test cases to runsql_in.json before enabling var testBackEnds = []string{MySQL, Postgres, ODPS, CSV} const ( @@ -44,6 +46,7 @@ const ( Postgres = "POSTGRESQL" ODPS = "ODPS" CSV = "CSVDB" + Hive = "HIVE" ) type testRunSQLSuite struct { @@ -62,9 +65,11 @@ type TestCaseSqlString struct { RewrittenSqlMysql string `json:"rewritten_sql_mysql"` RewrittenSqlPg string `json:"rewritten_sql_pg"` RewrittenSqlCSV string `json:"rewritten_sql_csv"` + RewrittenSqlHive string `json:"rewritten_sql_hive"` SkipOdpsTest bool `json:"skip_odps_test"` SkipPgTest bool `json:"skip_pg_test"` SkipCSVTest bool `json:"skip_csv_test"` + SkipHiveTest bool `json:"skip_hive_test"` // default; if RewrittenSql set, all back ends use this sql as default RewrittenSql string `json:"rewritten_sql"` } @@ -161,6 +166,11 @@ func GetExpectSQL(backEnd string, testCase TestCaseSqlString) string { return testCase.RewrittenSqlCSV } } + if backEnd == Hive { + if testCase.RewrittenSqlHive != "" { + return testCase.RewrittenSqlHive + } + } return testCase.RewrittenSql } @@ -174,6 +184,8 @@ func SkipTestFor(backEnd string, testCase TestCaseSqlString) bool { return testCase.SkipOdpsTest case CSV: return testCase.SkipCSVTest + case Hive: + return testCase.SkipHiveTest } return false }