Skip to content

Conversation

@WJSGDBZ
Copy link
Contributor

@WJSGDBZ WJSGDBZ commented Oct 16, 2025

What problem does this PR solve?

Add support for hashkey and sortkey scan filters

What is changed and how does it work?

  1. in original logic generate_next_bytes function has two problem
    a. The input buff (i.e. hashkey) can be either ‘str’ or ‘bytearray’. If it's a str, in-place modification like buff[pos] += 1 won't work since strings are immutable.
    b. The pos variable was initialized to a fixed index (len(buff) - 1), which is counterintuitive and could lead to an infinite loop.
    def generate_next_bytes(cls, buff):
    pos = len(buff) - 1
    found = False
    while pos >= 0:
    if ord(buff[pos]) != 0xFF:
    buff[pos] += 1
    found = True
    break
    if found:
    return buff
    else:
    return buff + chr(0)

Checklist

Tests
  • Manual test (add detailed scripts or steps below)
    the test script as below
from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor

from pypegasus.pgclient import *
from pypegasus.rrdb.ttypes import filter_type

@inlineCallbacks
def test_prefix_match_with_boundaries_full_scan():
    print("Start test_prefix_match_with_boundaries_full_scan")

    c = Pegasus(['127.0.0.1:34601', '127.0.0.1:34602', '127.0.0.1:34603'], 'full_scan_test')

    try:
        suc = yield c.init()
        if not suc:
            print("Failed to connect to Pegasus server")
            reactor.stop()
            return
    except Exception as e:
        print(f"Init failed: {e}")
        reactor.stop()
        return

    hkey = 'hkey1'
    for i in range(110):
        skey = f'skey{i}'
        yield c.set(hkey, skey, str(i), 0)
    
    yield c.set(hkey, 'aa', 'aa', 0)
    yield c.set(hkey, 'b', 'b', 0)
    yield c.set(hkey, 'z', 'z', 0)

    hkey = 'hkey2'
    for i in range(110):
        skey = f'skey{i}'
        yield c.set(hkey, skey, str(i), 0)
    
    yield c.set(hkey, 'aa', 'aa', 0)
    yield c.set(hkey, 'b', 'b', 0)
    yield c.set(hkey, 'z', 'z', 0)

    test_cases = [
        {
            'skey_filter_type':filter_type.FT_MATCH_PREFIX,
            'skey_pattern': b'skey9',
            'hkey_filter_type':filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': 'hkey1',
            'batch_size': 2,
            'expected': ['skey9', 'skey90', 'skey91', 'skey92', 'skey93', 'skey94', 'skey95', 'skey96', 'skey97', 'skey98', 'skey99']
        },
        {
            'skey_filter_type':filter_type.FT_MATCH_PREFIX,
            'skey_pattern': b'',
            'hkey_filter_type':filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'',
            'batch_size': 2,
            'expected': 2*['aa', 'b', 'z'] + 2*[f'skey{i}' for i in range(0, 110)]
        },
        {
            'skey_filter_type':filter_type.FT_MATCH_PREFIX,
            'skey_pattern': b'skey',
            'hkey_filter_type':filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': 'hkey2',
            'batch_size': 2,
            'expected': [f'skey{i}' for i in range(0, 110)]
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_PREFIX,
            'skey_pattern': b'skey10',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['skey10', 'skey100', 'skey101', 'skey102', 'skey103', 'skey104', 'skey105', 'skey106', 'skey107', 'skey108', 'skey109']
        },
        # 后缀测试
        {
            'skey_filter_type': filter_type.FT_MATCH_POSTFIX,
            'skey_pattern': b'9',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['skey9', 'skey19', 'skey29', 'skey39', 'skey49', 'skey59', 'skey69', 'skey79', 'skey89', 'skey99', 'skey109']
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_POSTFIX,
            'skey_pattern': b'z',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['z']
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_POSTFIX,
            'skey_pattern': b'',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['aa', 'b', 'z'] + [f'skey{i}' for i in range(0, 110)]
        },
        # 任意匹配测试
        {
            'skey_filter_type': filter_type.FT_MATCH_ANYWHERE,
            'skey_pattern': b'9',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['skey9', 'skey19', 'skey29', 'skey39', 'skey49', 'skey59', 'skey69', 'skey79', 'skey89', 'skey90', 'skey91', 'skey92', 'skey93', 'skey94', 'skey95', 'skey96', 'skey97', 'skey98', 'skey99', 'skey109']
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_ANYWHERE,
            'skey_pattern': b'key',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': [f'skey{i}' for i in range(110)]
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_ANYWHERE,
            'skey_pattern': b'99',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['skey99']
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_ANYWHERE,
            'skey_pattern': b'xyz',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': []  
        }
    ]

    for idx, case in enumerate(test_cases):
        print(f"\nRunning test case {idx + 1}...")

        o = ScanOptions()
        o.sortkey_filter_type = case['skey_filter_type']
        o.sortkey_filter_pattern = case['skey_pattern']
        o.batch_size = case['batch_size']
        o.hashkey_filter_type = case['hkey_filter_type']
        o.hashkey_filter_pattern = case['hkey_pattern']

        ss = c.get_unordered_scanners(3, o)
        results = []
        for s in ss:
            while True:
                try:
                    ret = yield s.get_next()
                    if not ret:
                        break
                    results.append(ret)
                except Exception as e:
                    print(f"Exception during scan: {e}")
                    break

            s.close()

        actual_keys = [k[1] for k, _ in results]
        expected_keys = case['expected']
        assert len(actual_keys) == len(expected_keys) and set(actual_keys) == set(expected_keys), \
            f"Test case {idx + 1} failed: Expected {expected_keys} \n got {actual_keys}"

        print(f"✅ Test case {idx + 1} passed: {len(actual_keys)} keys matched")


@inlineCallbacks
def test_prefix_match_with_boundaries():
    print("Start test_prefix_match_with_boundaries")

    c = Pegasus(['127.0.0.1:34601', '127.0.0.1:34602', '127.0.0.1:34603'], 'scan_test')

    try:
        suc = yield c.init()
        if not suc:
            print("Failed to connect to Pegasus server")
            reactor.stop()
            return
    except Exception as e:
        print(f"Init failed: {e}")
        reactor.stop()
        return

    hkey = 'hkey1'

    # 写入测试数据:skey0 ~ skey109
    for i in range(110):
        skey = f'skey{i}'
        yield c.set(hkey, skey, str(i), 0)
    
    yield c.set(hkey, 'aa', 'aa', 0)
    yield c.set(hkey, 'b', 'b', 0)
    yield c.set(hkey, 'z', 'z', 0)

    test_cases = [
        {
            'pattern': b'',
            'start': b'aa',
            'stop': b'b',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': ['aa', 'b']
        },
        {
            'pattern': b'skey9',
            'start': b'skey50',
            'stop': b'skey99',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': ['skey9', 'skey90', 'skey91', 'skey92', 'skey93', 'skey94', 'skey95', 'skey96', 'skey97', 'skey98', 'skey99']
        },
        {
            'pattern': b'skey9',
            'start': b'skey50',
            'stop': b'skey99',
            'start_inclusive': False,
            'stop_inclusive': False,
            'batch_size': 10,
            'expected': ['skey9', 'skey90', 'skey91', 'skey92', 'skey93', 'skey94', 'skey95', 'skey96', 'skey97', 'skey98']
        },
        {
            'pattern': b'skey9',
            'start': b'skey9',
            'stop': b'skey99',
            'start_inclusive': True,
            'stop_inclusive': False,
            'batch_size': 50,
            'expected': ['skey9'] + [f'skey{i}' for i in range(90, 99)]
        },
        {
            'pattern': b'skey9',
            'start': b'skey9',
            'stop': b'skey99',
            'start_inclusive': False,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': [f'skey{i}' for i in range(90, 100)]
        },
        {
            'pattern': b'skey',
            'start': b'skey9',
            'stop': b'skey99',
            'start_inclusive': False,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': [f'skey{i}' for i in range(90, 100)]
        },
        {
            'pattern': b'skey9',
            'start': b'skey9',
            'stop': b'skey99',
            'start_inclusive': False,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': [f'skey{i}' for i in range(90, 100)]
        },
        {
            'pattern': b'skey9',
            'start': b'skey100',
            'stop': b'skey105',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 10,
            'expected': []
        },
        {
            'pattern': b'skey9',
            'start': b'skey9',
            'stop': b'skey9',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': ['skey9']
        },
        {
            'pattern': b'skey9',
            'start': b'skey91',
            'stop': b'skey9',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': []
        },
        {
            'pattern': b'',
            'start': b'skey91',
            'stop': b'',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': ['z']+[f'skey{i}' for i in range(91, 100)]
        },
        {
            'pattern': b'skey',
            'start': b'skey88',
            'stop': b'skey98',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': ['skey9'] + [f'skey{i}' for i in range(88, 99)]
        },
        {
            'pattern': b'skey',
            'start': b'skey88',
            'stop': b'skey98',
            'start_inclusive': False,
            'stop_inclusive': False,
            'batch_size': 1,
            'expected': ['skey9'] + [f'skey{i}' for i in range(89, 98)]
        },
        {
            'pattern': b'skey1',
            'start': b'aa',
            'stop': b'',
            'start_inclusive': True,
            'stop_inclusive': False,
            'batch_size': 1,
            'expected': ['skey1', 'skey10', 'skey100', 'skey101', 'skey102', 'skey103', 'skey104', 
                         'skey105', 'skey106', 'skey107', 'skey108', 'skey109', 'skey11', 'skey12', 'skey13', 
                         'skey14', 'skey15', 'skey16', 'skey17', 'skey18', 'skey19']
        }
    ]

    for idx, case in enumerate(test_cases):
        print(f"\nRunning test case {idx + 1}...")

        o = ScanOptions()
        o.sortkey_filter_type = filter_type.FT_MATCH_PREFIX
        o.sortkey_filter_pattern = case['pattern']
        o.start_inclusive = case['start_inclusive']
        o.stop_inclusive = case['stop_inclusive']
        o.batch_size = case['batch_size']

        s = c.get_scanner(hkey, case['start'], case['stop'], o)

        results = []
        while True:
            try:
                ret = yield s.get_next()
                if not ret:
                    break
                results.append(ret)
            except Exception as e:
                print(f"Exception during scan: {e}")
                break

        s.close()

        actual_keys = [k[1] for k, _ in results]
        expected_keys = case['expected']

        assert set(actual_keys) == set(expected_keys), \
            f"Test case {idx + 1} failed: \n Expected {expected_keys} \n got {actual_keys}"

        print(f"✅ Test case {idx + 1} passed: {len(actual_keys)} keys matched")

    print("All test cases passed!")

if __name__ == "__main__":
    tests = [
        test_prefix_match_with_boundaries,
        test_prefix_match_with_boundaries_full_scan
    ]

    deferreds = [defer.maybeDeferred(test) for test in tests]

    d = defer.gatherResults(deferreds, consumeErrors=False)

    d.addBoth(lambda _: reactor.stop())

    reactor.run()

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds support for hashkey and sortkey scan filters to the Python Pegasus client. The changes enable filtering of scan results based on pattern matching (prefix, postfix, anywhere) for both hash keys and sort keys during scanning operations.

  • Added filter configuration fields to ScanOptions class for hashkey and sortkey filtering
  • Fixed issues in the generate_next_bytes function to handle different input types and avoid infinite loops
  • Enhanced scanner functionality to properly set filter parameters in scan requests

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.

File Description
python-client/pypegasus/utils/tools.py Added filter configuration fields to ScanOptions and utility function for byte value handling
python-client/pypegasus/pgclient.py Enhanced scanning logic with filter support, fixed generate_next_bytes function, and improved key generation
python-client/pypegasus/base/ttypes.py Added raw() method to blob class for accessing underlying data

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

@WJSGDBZ WJSGDBZ force-pushed the scan_filter_supported branch from f1f3481 to a228dbc Compare November 3, 2025 02:29
Comment on lines 55 to 60
if isinstance(data,str):
self._is_str = True
data = data.encode('UTF-8')
else:
self._is_str = False
self.data = data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if isinstance(data,str):
self._is_str = True
data = data.encode('UTF-8')
else:
self._is_str = False
self.data = data
if isinstance(data,str):
self._is_str = True
self.data = data.encode('UTF-8')
else:
self._is_str = False
self.data = data

Comment on lines +606 to +607
if sort_key_len >= 0xFFFF:
raise ValueError("sort_key length must be less than 65535")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is currently no restriction on the length of the sort key. The limit on the hash key exists because the hash key length is stored in only two bytes.

Suggested change
if sort_key_len >= 0xFFFF:
raise ValueError("sort_key length must be less than 65535")

return arr
else:
return buff + chr(0)
return bytes(arr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it converted to bytes here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants