From b80138a4d656234141ec891f56e59d6cdf67a5fc Mon Sep 17 00:00:00 2001 From: Michele Campus Date: Tue, 31 Dec 2024 18:45:00 +0100 Subject: [PATCH 1/5] implementation of IPV6 parsing --- src/lib.rs | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 50 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 166ddf7..8b49611 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -232,7 +232,8 @@ impl PcapVTab { if data.len() >= 14 { let ethertype = u16::from_be_bytes([data[12], data[13]]); debug_print!("Ethertype: 0x{:04x}", ethertype); - + + // IPv4 Parsing if ethertype == 0x0800 && data.len() >= 34 { let ip_header_len = (data[14] & 0x0f) * 4; debug_print!("IP header length: {}", ip_header_len); @@ -275,8 +276,54 @@ impl PcapVTab { payload = data[payload_start..].to_vec(); } - } else if ethertype == 0x86DD { - protocol = String::from("IPv6"); + } + // IPv6 Parsing + else if ethertype == 0x86DD && data.len() >= 54 { + src_ip = format!("{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", + u16::from_be_bytes([data[22], data[23]]), + u16::from_be_bytes([data[24], data[25]]), + u16::from_be_bytes([data[26], data[27]]), + u16::from_be_bytes([data[28], data[29]]), + u16::from_be_bytes([data[30], data[31]]), + u16::from_be_bytes([data[32], data[33]]), + u16::from_be_bytes([data[34], data[35]]), + u16::from_be_bytes([data[36], data[37]])); + dst_ip = format!("{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", + u16::from_be_bytes([data[38], data[39]]), + u16::from_be_bytes([data[40], data[41]]), + u16::from_be_bytes([data[42], data[43]]), + u16::from_be_bytes([data[44], data[45]]), + u16::from_be_bytes([data[46], data[47]]), + u16::from_be_bytes([data[48], data[49]]), + u16::from_be_bytes([data[50], data[51]]), + u16::from_be_bytes([data[52], data[53]])); + + let next_header = data[20]; + debug_print!("Next Header: {}", next_header); + + let transport_header_start = 54; // IPv6 header is 40 bytes + + match next_header { + 6 => { + protocol = String::from("TCP"); + if data.len() >= transport_header_start + 4 { + src_port = u16::from_be_bytes([data[transport_header_start], data[transport_header_start + 1]]); + dst_port = u16::from_be_bytes([data[transport_header_start + 2], data[transport_header_start + 3]]); + } + }, + 17 => { + protocol = String::from("UDP"); + if data.len() >= transport_header_start + 4 { + src_port = u16::from_be_bytes([data[transport_header_start], data[transport_header_start + 1]]); + dst_port = u16::from_be_bytes([data[transport_header_start + 2], data[transport_header_start + 3]]); + } + }, + _ => protocol = format!("IPv6({})", next_header), + } + + if data.len() > transport_header_start { + payload = data[transport_header_start..].to_vec(); + } } } From 142cb3a1b46cb78d8eaa7ff5a6aebdf2328dc15b Mon Sep 17 00:00:00 2001 From: Michele Campus Date: Fri, 3 Jan 2025 00:04:41 +0100 Subject: [PATCH 2/5] Fix IPV6 parser --- src/lib.rs | 185 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 105 insertions(+), 80 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 8b49611..5b61341 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -219,6 +219,8 @@ impl VTab for PcapVTab { } impl PcapVTab { + // parse_packet + // Return the source IP, destination IP, source port, destination port, protocol, payload fn parse_packet(data: &[u8]) -> Result<(String, String, u16, u16, String, Vec), Box> { let mut src_ip = String::from("0.0.0.0"); let mut dst_ip = String::from("0.0.0.0"); @@ -229,107 +231,130 @@ impl PcapVTab { debug_print!("Parsing packet of length: {}", data.len()); + // Check if we have enough data to parse the Ethernet header if data.len() >= 14 { + let ethertype = u16::from_be_bytes([data[12], data[13]]); debug_print!("Ethertype: 0x{:04x}", ethertype); - // IPv4 Parsing + // Check if the packet is IPv4 and we have enough data to parse the IP header if ethertype == 0x0800 && data.len() >= 34 { + let ip_header_len = (data[14] & 0x0f) * 4; debug_print!("IP header length: {}", ip_header_len); - + src_ip = format!("{}.{}.{}.{}", data[26], data[27], data[28], data[29]); dst_ip = format!("{}.{}.{}.{}", data[30], data[31], data[32], data[33]); - + let ip_protocol = data[23]; debug_print!("IP Protocol: {}", ip_protocol); - + let transport_header_start = 14 + ip_header_len as usize; - + + // Check if we have enough data to parse the transport header match ip_protocol { - 6 => { - protocol = String::from("TCP"); - if data.len() >= transport_header_start + 4 { - src_port = u16::from_be_bytes([data[transport_header_start], data[transport_header_start + 1]]); - dst_port = u16::from_be_bytes([data[transport_header_start + 2], data[transport_header_start + 3]]); - } - }, - 17 => { - protocol = String::from("UDP"); - if data.len() >= transport_header_start + 4 { - src_port = u16::from_be_bytes([data[transport_header_start], data[transport_header_start + 1]]); - dst_port = u16::from_be_bytes([data[transport_header_start + 2], data[transport_header_start + 3]]); - } - }, - _ => protocol = format!("IP({})", ip_protocol), - } - - let payload_start = transport_header_start + match ip_protocol { - 6 => 20, - 17 => 8, - _ => 0, - }; - - if data.len() > payload_start { - payload = data[payload_start..].to_vec(); - } - + 6 => { + protocol = String::from("TCP"); + if data.len() >= transport_header_start + 4 { + src_port = u16::from_be_bytes([data[transport_header_start], data[transport_header_start + 1]]); + debug_print!("TCP Source Port: {}", src_port); + dst_port = u16::from_be_bytes([data[transport_header_start + 2], data[transport_header_start + 3]]); + debug_print!("TCP Destination Port: {}", dst_port); + } + }, + 17 => { + protocol = String::from("UDP"); + if data.len() >= transport_header_start + 4 { + src_port = u16::from_be_bytes([data[transport_header_start], data[transport_header_start + 1]]); + debug_print!("UDP Source Port: {}", src_port); + dst_port = u16::from_be_bytes([data[transport_header_start + 2], data[transport_header_start + 3]]); + debug_print!("UDP Destination Port: {}", dst_port); + } + }, + _ => protocol = format!("IP({})", ip_protocol), } - // IPv6 Parsing - else if ethertype == 0x86DD && data.len() >= 54 { - src_ip = format!("{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", - u16::from_be_bytes([data[22], data[23]]), - u16::from_be_bytes([data[24], data[25]]), - u16::from_be_bytes([data[26], data[27]]), - u16::from_be_bytes([data[28], data[29]]), - u16::from_be_bytes([data[30], data[31]]), - u16::from_be_bytes([data[32], data[33]]), - u16::from_be_bytes([data[34], data[35]]), - u16::from_be_bytes([data[36], data[37]])); - dst_ip = format!("{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", - u16::from_be_bytes([data[38], data[39]]), - u16::from_be_bytes([data[40], data[41]]), - u16::from_be_bytes([data[42], data[43]]), - u16::from_be_bytes([data[44], data[45]]), - u16::from_be_bytes([data[46], data[47]]), - u16::from_be_bytes([data[48], data[49]]), - u16::from_be_bytes([data[50], data[51]]), - u16::from_be_bytes([data[52], data[53]])); - - let next_header = data[20]; - debug_print!("Next Header: {}", next_header); - - let transport_header_start = 54; // IPv6 header is 40 bytes - - match next_header { - 6 => { - protocol = String::from("TCP"); - if data.len() >= transport_header_start + 4 { - src_port = u16::from_be_bytes([data[transport_header_start], data[transport_header_start + 1]]); - dst_port = u16::from_be_bytes([data[transport_header_start + 2], data[transport_header_start + 3]]); - } - }, - 17 => { - protocol = String::from("UDP"); - if data.len() >= transport_header_start + 4 { - src_port = u16::from_be_bytes([data[transport_header_start], data[transport_header_start + 1]]); - dst_port = u16::from_be_bytes([data[transport_header_start + 2], data[transport_header_start + 3]]); - } - }, - _ => protocol = format!("IPv6({})", next_header), - } + + // Calculate the start of the payload + let payload_start = transport_header_start + match ip_protocol { + 6 => 20, // TCP: 20 bytes + 17 => 8, // UDP: 8 bytes + _ => 0, + }; + + // Copy the payload data + if data.len() > payload_start { + payload = data[payload_start..].to_vec(); + } + + // Check if the packet is IPv6 and we have enough data to parse the IP header + } + else if ethertype == 0x86DD && data.len() >= 54 { + + let ip_header_len = 54; + debug_print!("IPv6 header length: {}", ip_header_len); + + src_ip = format!("{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", + u16::from_be_bytes([data[22], data[23]]), + u16::from_be_bytes([data[24], data[25]]), + u16::from_be_bytes([data[26], data[27]]), + u16::from_be_bytes([data[28], data[29]]), + u16::from_be_bytes([data[30], data[31]]), + u16::from_be_bytes([data[32], data[33]]), + u16::from_be_bytes([data[34], data[35]]), + u16::from_be_bytes([data[36], data[37]])); + dst_ip = format!("{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", + u16::from_be_bytes([data[38], data[39]]), + u16::from_be_bytes([data[40], data[41]]), + u16::from_be_bytes([data[42], data[43]]), + u16::from_be_bytes([data[44], data[45]]), + u16::from_be_bytes([data[46], data[47]]), + u16::from_be_bytes([data[48], data[49]]), + u16::from_be_bytes([data[50], data[51]]), + u16::from_be_bytes([data[52], data[53]])); - if data.len() > transport_header_start { - payload = data[transport_header_start..].to_vec(); - } + let ip_protocol = data[20]; + debug_print!("IP protocol: {}", ip_protocol); + + let transport_header_start = ip_header_len; // IPv6 header is fixed size 54 bytes + + // Check if we have enough data to parse the transport header + match ip_protocol { + 6 => { + protocol = String::from("TCP"); + if data.len() >= transport_header_start + 4 { + src_port = u16::from_be_bytes([data[transport_header_start], data[transport_header_start + 1]]); + dst_port = u16::from_be_bytes([data[transport_header_start + 2], data[transport_header_start + 3]]); + } + }, + 17 => { + protocol = String::from("UDP"); + if data.len() >= transport_header_start + 4 { + src_port = u16::from_be_bytes([data[transport_header_start], data[transport_header_start + 1]]); + dst_port = u16::from_be_bytes([data[transport_header_start + 2], data[transport_header_start + 3]]); + } + }, + _ => protocol = format!("IPv6({})", ip_protocol), + } + + // Calculate the start of the payload + let payload_start = transport_header_start + match ip_protocol { + 6 => 20, // TCP: 20 bytes + 17 => 8, // UDP: 8 bytes + _ => 0, + }; + + // Copy the payload data + if data.len() > payload_start { + payload = data[payload_start..].to_vec(); + } } } debug_print!("Parsed packet: {}:{} -> {}:{} ({})", - src_ip, src_port, dst_ip, dst_port, protocol); - + src_ip, src_port, dst_ip, dst_port, protocol); + Ok((src_ip, dst_ip, src_port, dst_port, protocol, payload)) } } From 7436cca77dabf8dc2ca603c9c80934d6093eb386 Mon Sep 17 00:00:00 2001 From: kYroL01 Date: Sun, 5 Jan 2025 13:50:44 -0600 Subject: [PATCH 3/5] Improve code - removed code duplication --- src/lib.rs | 148 ++++++++++++++++++++++------------------------------- 1 file changed, 61 insertions(+), 87 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5b61341..390bb09 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -219,8 +219,10 @@ impl VTab for PcapVTab { } impl PcapVTab { - // parse_packet - // Return the source IP, destination IP, source port, destination port, protocol, payload + /* + function parse_packet + Return the source IP, destination IP, source port, destination port, protocol, payload + */ fn parse_packet(data: &[u8]) -> Result<(String, String, u16, u16, String, Vec), Box> { let mut src_ip = String::from("0.0.0.0"); let mut dst_ip = String::from("0.0.0.0"); @@ -228,33 +230,66 @@ impl PcapVTab { let mut dst_port = 0; let mut protocol = String::from("UNKNOWN"); let mut payload = Vec::new(); - + debug_print!("Parsing packet of length: {}", data.len()); - - // Check if we have enough data to parse the Ethernet header + + // Parse the Ethernet header if data.len() >= 14 { - let ethertype = u16::from_be_bytes([data[12], data[13]]); debug_print!("Ethertype: 0x{:04x}", ethertype); - - // Check if the packet is IPv4 and we have enough data to parse the IP header + + let mut transport_header_start = 0; + let mut ip_protocol = 0; + + // Parse the IP header if ethertype == 0x0800 && data.len() >= 34 { - + let ip_header_len = (data[14] & 0x0f) * 4; debug_print!("IP header length: {}", ip_header_len); - + src_ip = format!("{}.{}.{}.{}", data[26], data[27], data[28], data[29]); dst_ip = format!("{}.{}.{}.{}", data[30], data[31], data[32], data[33]); - - let ip_protocol = data[23]; + + ip_protocol = data[23]; debug_print!("IP Protocol: {}", ip_protocol); + + transport_header_start = 14 + ip_header_len as usize; + } + // Parse the IPv6 header + else if ethertype == 0x86DD && data.len() >= 54 { + + let ip_header_len = 54; + debug_print!("IPv6 header length: {}", ip_header_len); + + src_ip = format!("{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", + u16::from_be_bytes([data[22], data[23]]), + u16::from_be_bytes([data[24], data[25]]), + u16::from_be_bytes([data[26], data[27]]), + u16::from_be_bytes([data[28], data[29]]), + u16::from_be_bytes([data[30], data[31]]), + u16::from_be_bytes([data[32], data[33]]), + u16::from_be_bytes([data[34], data[35]]), + u16::from_be_bytes([data[36], data[37]])); + dst_ip = format!("{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", + u16::from_be_bytes([data[38], data[39]]), + u16::from_be_bytes([data[40], data[41]]), + u16::from_be_bytes([data[42], data[43]]), + u16::from_be_bytes([data[44], data[45]]), + u16::from_be_bytes([data[46], data[47]]), + u16::from_be_bytes([data[48], data[49]]), + u16::from_be_bytes([data[50], data[51]]), + u16::from_be_bytes([data[52], data[53]])); + + ip_protocol = data[20]; + debug_print!("IP protocol: {}", ip_protocol); + + transport_header_start = ip_header_len as usize; + } - let transport_header_start = 14 + ip_header_len as usize; - - // Check if we have enough data to parse the transport header - match ip_protocol { + // Parse the transport header + match ip_protocol { 6 => { protocol = String::from("TCP"); if data.len() >= transport_header_start + 4 { @@ -276,85 +311,24 @@ impl PcapVTab { _ => protocol = format!("IP({})", ip_protocol), } - // Calculate the start of the payload + // Parse the payload let payload_start = transport_header_start + match ip_protocol { - 6 => 20, // TCP: 20 bytes - 17 => 8, // UDP: 8 bytes - _ => 0, + 6 => 20, + 17 => 8, + _ => 0, }; - // Copy the payload data - if data.len() > payload_start { - payload = data[payload_start..].to_vec(); - } - - // Check if the packet is IPv6 and we have enough data to parse the IP header - } - else if ethertype == 0x86DD && data.len() >= 54 { - - let ip_header_len = 54; - debug_print!("IPv6 header length: {}", ip_header_len); - - src_ip = format!("{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", - u16::from_be_bytes([data[22], data[23]]), - u16::from_be_bytes([data[24], data[25]]), - u16::from_be_bytes([data[26], data[27]]), - u16::from_be_bytes([data[28], data[29]]), - u16::from_be_bytes([data[30], data[31]]), - u16::from_be_bytes([data[32], data[33]]), - u16::from_be_bytes([data[34], data[35]]), - u16::from_be_bytes([data[36], data[37]])); - dst_ip = format!("{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", - u16::from_be_bytes([data[38], data[39]]), - u16::from_be_bytes([data[40], data[41]]), - u16::from_be_bytes([data[42], data[43]]), - u16::from_be_bytes([data[44], data[45]]), - u16::from_be_bytes([data[46], data[47]]), - u16::from_be_bytes([data[48], data[49]]), - u16::from_be_bytes([data[50], data[51]]), - u16::from_be_bytes([data[52], data[53]])); - - let ip_protocol = data[20]; - debug_print!("IP protocol: {}", ip_protocol); - - let transport_header_start = ip_header_len; // IPv6 header is fixed size 54 bytes - - // Check if we have enough data to parse the transport header - match ip_protocol { - 6 => { - protocol = String::from("TCP"); - if data.len() >= transport_header_start + 4 { - src_port = u16::from_be_bytes([data[transport_header_start], data[transport_header_start + 1]]); - dst_port = u16::from_be_bytes([data[transport_header_start + 2], data[transport_header_start + 3]]); - } - }, - 17 => { - protocol = String::from("UDP"); - if data.len() >= transport_header_start + 4 { - src_port = u16::from_be_bytes([data[transport_header_start], data[transport_header_start + 1]]); - dst_port = u16::from_be_bytes([data[transport_header_start + 2], data[transport_header_start + 3]]); - } - }, - _ => protocol = format!("IPv6({})", ip_protocol), - } - - // Calculate the start of the payload - let payload_start = transport_header_start + match ip_protocol { - 6 => 20, // TCP: 20 bytes - 17 => 8, // UDP: 8 bytes - _ => 0, - }; - - // Copy the payload data + // Copy the payload if data.len() > payload_start { payload = data[payload_start..].to_vec(); } - } } - + + // Print the parsed packet debug_print!("Parsed packet: {}:{} -> {}:{} ({})", - src_ip, src_port, dst_ip, dst_port, protocol); - + src_ip, src_port, dst_ip, dst_port, protocol); + + // Return the parsed packet Ok((src_ip, dst_ip, src_port, dst_port, protocol, payload)) } } From d5063d2817ea0bc182912c35dabd5ca5a3648b89 Mon Sep 17 00:00:00 2001 From: kYroL01 Date: Fri, 10 Jan 2025 06:08:30 -0600 Subject: [PATCH 4/5] skip metadata from pcap (thanks to giuseppe) --- src/lib.rs | 393 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 225 insertions(+), 168 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 390bb09..3696595 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,19 +6,19 @@ extern crate pcap_parser; use std::mem::ManuallyDrop; use duckdb::{ - core::{DataChunkHandle, LogicalTypeHandle, LogicalTypeId, Inserter}, + core::{DataChunkHandle, Inserter, LogicalTypeHandle, LogicalTypeId}, vtab::{BindInfo, Free, FunctionInfo, InitInfo, VTab}, Connection, Result, }; use duckdb_loadable_macros::duckdb_entrypoint_c_api; use libduckdb_sys as ffi; -use pcap_parser::*; use pcap_parser::traits::PcapReaderIterator; +use pcap_parser::*; use std::{ error::Error, ffi::{c_char, CStr, CString}, fs::File, - io::{Read,Cursor}, + io::{Cursor, Read}, }; macro_rules! debug_print { @@ -63,12 +63,18 @@ impl VTab for PcapVTab { type BindData = PcapBindData; unsafe fn bind(bind: &BindInfo, data: *mut PcapBindData) -> Result<(), Box> { - bind.add_result_column("timestamp", LogicalTypeHandle::from(LogicalTypeId::Timestamp)); + bind.add_result_column( + "timestamp", + LogicalTypeHandle::from(LogicalTypeId::Timestamp), + ); bind.add_result_column("src_ip", LogicalTypeHandle::from(LogicalTypeId::Varchar)); bind.add_result_column("dst_ip", LogicalTypeHandle::from(LogicalTypeId::Varchar)); bind.add_result_column("src_port", LogicalTypeHandle::from(LogicalTypeId::Integer)); bind.add_result_column("dst_port", LogicalTypeHandle::from(LogicalTypeId::Integer)); - bind.add_result_column("protocol", LogicalTypeHandle::from(LogicalTypeId::Varchar)); + bind.add_result_column( + "L4 protocol", + LogicalTypeHandle::from(LogicalTypeId::Varchar), + ); bind.add_result_column("length", LogicalTypeHandle::from(LogicalTypeId::Integer)); bind.add_result_column("payload", LogicalTypeHandle::from(LogicalTypeId::Varchar)); @@ -79,136 +85,166 @@ impl VTab for PcapVTab { Ok(()) } + // Initialize the VTab unsafe fn init(info: &InitInfo, data: *mut PcapInitData) -> Result<(), Box> { - let bind_data = info.get_bind_data::(); - let filepath = unsafe { CStr::from_ptr((*bind_data).filepath).to_str()? }; - - debug_print!("Opening file: {}", filepath); - - let reader: Box = if filepath.starts_with("http://") || filepath.starts_with("https://") { - debug_print!("Using HTTP reader for {}", filepath); - - // Create a channel to receive the response - let (tx, rx) = std::sync::mpsc::channel(); - - let request = ehttp::Request::get(filepath); - ehttp::fetch(request, move |result: ehttp::Result| { - tx.send(result).unwrap(); - }); - - // Wait for the response - let response = rx.recv()?.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - Box::new(Cursor::new(response.bytes)) - } else { - debug_print!("Using file reader for {}", filepath); - Box::new(File::open(filepath)?) - }; - - unsafe { - (*data).reader = Some(ManuallyDrop::new( - LegacyPcapReader::new(65536, reader).expect("PcapReader") - )); - (*data).done = false; - } - Ok(()) + let bind_data = info.get_bind_data::(); + let filepath = unsafe { CStr::from_ptr((*bind_data).filepath).to_str()? }; + + debug_print!("Opening file: {}", filepath); + + let reader: Box = + if filepath.starts_with("http://") || filepath.starts_with("https://") { + debug_print!("Using HTTP reader for {}", filepath); + + // Create a channel to receive the response + let (tx, rx) = std::sync::mpsc::channel(); + + let request = ehttp::Request::get(filepath); + ehttp::fetch(request, move |result: ehttp::Result| { + tx.send(result).unwrap(); + }); + + // Wait for the response + let response = rx + .recv()? + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + Box::new(Cursor::new(response.bytes)) + } else { + debug_print!("Using file reader for {}", filepath); + Box::new(File::open(filepath)?) + }; + + unsafe { + (*data).reader = Some(ManuallyDrop::new( + LegacyPcapReader::new(65536, reader).expect("PcapReader"), + )); + (*data).done = false; + } + Ok(()) } - unsafe fn func(func: &FunctionInfo, output: &mut DataChunkHandle) -> Result<(), Box> { + unsafe fn func( + func: &FunctionInfo, + output: &mut DataChunkHandle, + ) -> Result<(), Box> { let init_data = func.get_init_data::(); - + unsafe { if (*init_data).done { output.set_len(0); return Ok(()); } } - - let reader = unsafe { (*init_data).reader.as_mut() }.unwrap(); + let mut count = 0; - let mut next_result = reader.next(); - - while let Err(PcapError::Incomplete(_)) = next_result { - unsafe { (*init_data).reader.as_mut() }.unwrap().refill()?; - next_result = unsafe { (*init_data).reader.as_mut() }.unwrap().next(); - } - - match next_result { - Ok((offset, block)) => { - let (timestamp, length_str, src_ip, dst_ip, src_port, dst_port, protocol, payload) = match block { - PcapBlockOwned::Legacy(packet) => { - let parsed = Self::parse_packet(&packet.data)?; - let (src_ip, dst_ip, src_port, dst_port, protocol, payload) = parsed; - - let timestamp_micros = packet.ts_sec as i64 * 1_000_000 + packet.ts_usec as i64; - - (timestamp_micros, packet.origlen.to_string(), - src_ip, dst_ip, src_port, dst_port, - protocol, payload) - }, - PcapBlockOwned::LegacyHeader(_) => { - (0, "0".to_string(), "0.0.0.0".to_string(), "0.0.0.0".to_string(), - 0, 0, "UNKNOWN".to_string(), Vec::new()) - }, - _ => { - (0, "0".to_string(), "0.0.0.0".to_string(), "0.0.0.0".to_string(), - 0, 0, "UNKNOWN".to_string(), Vec::new()) + + // Read packets from the pcap file + 'read_loop: loop { + let next_result = unsafe { (*init_data).reader.as_mut().unwrap().next() }; + + // Handle the next packet + let (offset, block) = match next_result { + Ok(result) => result, + Err(PcapError::Incomplete(_)) => { + unsafe { + (*init_data).reader.as_mut().unwrap().refill()?; } - }; + continue 'read_loop; + } + Err(PcapError::Eof) => { + unsafe { + (*init_data).done = true; + } + output.set_len(count); + return Ok(()); + } + Err(e) => return Err(Box::new(e)), + }; - debug_print!("Processing packet: timestamp={}, src={}:{}, dst={}:{}, proto={}, len={}", - timestamp, src_ip, src_port, dst_ip, dst_port, protocol, length_str); - - output.flat_vector(0).as_mut_slice::()[0] = timestamp as i64; - output.flat_vector(1).insert(count, CString::new(src_ip)?); - output.flat_vector(2).insert(count, CString::new(dst_ip)?); - output.flat_vector(3).as_mut_slice::()[0] = src_port as i32; - output.flat_vector(4).as_mut_slice::()[0] = dst_port as i32; - output.flat_vector(5).insert(count, CString::new(protocol)?); - output.flat_vector(6).as_mut_slice::()[0] = length_str.parse::().unwrap(); - - let payload_str = if !payload.is_empty() { - if let Ok(utf8_str) = std::str::from_utf8(&payload) { - if utf8_str.chars().all(|c| c.is_ascii_graphic() || c.is_ascii_whitespace()) { - format!("{}", utf8_str) - } else { - let hex_str: Vec = payload.iter() - .take(32) - .map(|b| format!("{:02x}", b)) - .collect(); - format!("{}{}", hex_str.join(" "), - if payload.len() > 32 { " ..." } else { "" }) - } - } else { - let hex_str: Vec = payload.iter() - .take(32) - .map(|b| format!("{:02x}", b)) - .collect(); - format!("{}{}", hex_str.join(" "), - if payload.len() > 32 { " ..." } else { "" }) - } - } else { - "empty".to_string() - }; - output.flat_vector(7).insert(count, CString::new(payload_str)?); - - /* - let hex: String = payload.iter() - .map(|b| format!("{:02x}", b)) - .collect(); - output.flat_vector(7).insert(count, CString::new(hex)?); - */ - - count += 1; - unsafe { (*init_data).reader.as_mut() }.unwrap().consume(offset); - }, - Err(PcapError::Eof) => { - unsafe { (*init_data).done = true; } - output.set_len(count); - return Ok(()); - }, - Err(e) => return Err(Box::new(e)), + // Process the block + match block { + // Handle the packet + PcapBlockOwned::Legacy(packet) => { + let parsed = Self::parse_packet(&packet.data)?; + let (src_ip, dst_ip, src_port, dst_port, l4_protocol, payload) = parsed; + let timestamp_micros = packet.ts_sec as i64 * 1_000_000 + packet.ts_usec as i64; + + debug_print!( + "Processing packet: timestamp={}, src={}:{}, dst={}:{}, proto={}, len={}", + timestamp_micros, + src_ip, + src_port, + dst_ip, + dst_port, + l4_protocol, + packet.origlen + ); + + output.flat_vector(0).as_mut_slice::()[0] = timestamp_micros; + output.flat_vector(1).insert(count, CString::new(src_ip)?); + output.flat_vector(2).insert(count, CString::new(dst_ip)?); + output.flat_vector(3).as_mut_slice::()[0] = src_port as i32; + output.flat_vector(4).as_mut_slice::()[0] = dst_port as i32; + output + .flat_vector(5) + .insert(count, CString::new(l4_protocol)?); + output.flat_vector(6).as_mut_slice::()[0] = packet.origlen as i32; + + let payload_str = if !payload.is_empty() { + if let Ok(utf8_str) = std::str::from_utf8(&payload) { + if utf8_str + .chars() + .all(|c| c.is_ascii_graphic() || c.is_ascii_whitespace()) + { + format!("{}", utf8_str) + } else { + let hex_str: Vec = payload + .iter() + .take(32) + .map(|b| format!("{:02x}", b)) + .collect(); + format!( + "{}{}", + hex_str.join(" "), + if payload.len() > 32 { " ..." } else { "" } + ) + } + } else { + let hex_str: Vec = payload + .iter() + .take(32) + .map(|b| format!("{:02x}", b)) + .collect(); + format!( + "{}{}", + hex_str.join(" "), + if payload.len() > 32 { " ..." } else { "" } + ) + } + } else { + "empty".to_string() + }; + output + .flat_vector(7) + .insert(count, CString::new(payload_str)?); + + count += 1; + + unsafe { + (*init_data).reader.as_mut().unwrap().consume(offset); + } + break 'read_loop; + } + // Skip non-packet blocks + PcapBlockOwned::LegacyHeader(_) | _ => { + unsafe { + (*init_data).reader.as_mut().unwrap().consume(offset); + } + continue 'read_loop; + } + } } - + // Set the number of rows in the output output.set_len(count); Ok(()) } @@ -221,49 +257,48 @@ impl VTab for PcapVTab { impl PcapVTab { /* function parse_packet - Return the source IP, destination IP, source port, destination port, protocol, payload + Return the source IP, destination IP, source port, destination port, L4 protocol, payload */ - fn parse_packet(data: &[u8]) -> Result<(String, String, u16, u16, String, Vec), Box> { + fn parse_packet( + data: &[u8], + ) -> Result<(String, String, u16, u16, String, Vec), Box> { let mut src_ip = String::from("0.0.0.0"); let mut dst_ip = String::from("0.0.0.0"); let mut src_port = 0; let mut dst_port = 0; - let mut protocol = String::from("UNKNOWN"); + let mut l4_protocol = String::from("UNKNOWN"); let mut payload = Vec::new(); - + debug_print!("Parsing packet of length: {}", data.len()); - + // Parse the Ethernet header if data.len() >= 14 { let ethertype = u16::from_be_bytes([data[12], data[13]]); debug_print!("Ethertype: 0x{:04x}", ethertype); - + let mut transport_header_start = 0; let mut ip_protocol = 0; - + // Parse the IP header if ethertype == 0x0800 && data.len() >= 34 { - let ip_header_len = (data[14] & 0x0f) * 4; debug_print!("IP header length: {}", ip_header_len); - - src_ip = format!("{}.{}.{}.{}", - data[26], data[27], data[28], data[29]); - dst_ip = format!("{}.{}.{}.{}", - data[30], data[31], data[32], data[33]); - + + src_ip = format!("{}.{}.{}.{}", data[26], data[27], data[28], data[29]); + dst_ip = format!("{}.{}.{}.{}", data[30], data[31], data[32], data[33]); + ip_protocol = data[23]; debug_print!("IP Protocol: {}", ip_protocol); - + transport_header_start = 14 + ip_header_len as usize; - } + } // Parse the IPv6 header else if ethertype == 0x86DD && data.len() >= 54 { - let ip_header_len = 54; debug_print!("IPv6 header length: {}", ip_header_len); - - src_ip = format!("{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", + + src_ip = format!( + "{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", u16::from_be_bytes([data[22], data[23]]), u16::from_be_bytes([data[24], data[25]]), u16::from_be_bytes([data[26], data[27]]), @@ -271,8 +306,10 @@ impl PcapVTab { u16::from_be_bytes([data[30], data[31]]), u16::from_be_bytes([data[32], data[33]]), u16::from_be_bytes([data[34], data[35]]), - u16::from_be_bytes([data[36], data[37]])); - dst_ip = format!("{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", + u16::from_be_bytes([data[36], data[37]]) + ); + dst_ip = format!( + "{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", u16::from_be_bytes([data[38], data[39]]), u16::from_be_bytes([data[40], data[41]]), u16::from_be_bytes([data[42], data[43]]), @@ -280,56 +317,76 @@ impl PcapVTab { u16::from_be_bytes([data[46], data[47]]), u16::from_be_bytes([data[48], data[49]]), u16::from_be_bytes([data[50], data[51]]), - u16::from_be_bytes([data[52], data[53]])); - + u16::from_be_bytes([data[52], data[53]]) + ); + ip_protocol = data[20]; debug_print!("IP protocol: {}", ip_protocol); - + transport_header_start = ip_header_len as usize; } - + // Parse the transport header match ip_protocol { 6 => { - protocol = String::from("TCP"); + l4_protocol = String::from("TCP"); if data.len() >= transport_header_start + 4 { - src_port = u16::from_be_bytes([data[transport_header_start], data[transport_header_start + 1]]); + src_port = u16::from_be_bytes([ + data[transport_header_start], + data[transport_header_start + 1], + ]); debug_print!("TCP Source Port: {}", src_port); - dst_port = u16::from_be_bytes([data[transport_header_start + 2], data[transport_header_start + 3]]); + dst_port = u16::from_be_bytes([ + data[transport_header_start + 2], + data[transport_header_start + 3], + ]); debug_print!("TCP Destination Port: {}", dst_port); } - }, + } 17 => { - protocol = String::from("UDP"); + l4_protocol = String::from("UDP"); if data.len() >= transport_header_start + 4 { - src_port = u16::from_be_bytes([data[transport_header_start], data[transport_header_start + 1]]); + src_port = u16::from_be_bytes([ + data[transport_header_start], + data[transport_header_start + 1], + ]); debug_print!("UDP Source Port: {}", src_port); - dst_port = u16::from_be_bytes([data[transport_header_start + 2], data[transport_header_start + 3]]); + dst_port = u16::from_be_bytes([ + data[transport_header_start + 2], + data[transport_header_start + 3], + ]); debug_print!("UDP Destination Port: {}", dst_port); } - }, - _ => protocol = format!("IP({})", ip_protocol), + } + _ => l4_protocol = format!("IP({})", ip_protocol), } - + // Parse the payload - let payload_start = transport_header_start + match ip_protocol { - 6 => 20, - 17 => 8, - _ => 0, - }; - + let payload_start = transport_header_start + + match ip_protocol { + 6 => 20, + 17 => 8, + _ => 0, + }; + // Copy the payload if data.len() > payload_start { payload = data[payload_start..].to_vec(); } } - + // Print the parsed packet - debug_print!("Parsed packet: {}:{} -> {}:{} ({})", - src_ip, src_port, dst_ip, dst_port, protocol); - - // Return the parsed packet - Ok((src_ip, dst_ip, src_port, dst_port, protocol, payload)) + debug_print!( + "Parsed packet: {}:{} -> {}:{} ({})", + src_ip, + src_port, + dst_ip, + dst_port, + l4_protocol + ); + + // Return the parsed packet + Ok((src_ip, dst_ip, src_port, dst_port, l4_protocol, payload)) } } From d165a364b6cc0c9b4465db316f2c30566a204bb7 Mon Sep 17 00:00:00 2001 From: kYroL01 Date: Wed, 29 Jan 2025 18:12:59 +0100 Subject: [PATCH 5/5] Add TEST for cargo test --- src/lib.rs | 109 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 3696595..30d935d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -396,3 +396,112 @@ pub unsafe fn extension_entrypoint(con: Connection) -> Result<(), Box .expect("Failed to register pcap_reader function"); Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + // Helper function to create a simple IPv4 packet + fn create_ipv4_tcp_packet() -> Vec { + let packet = vec![ + // Ethernet header (14 bytes) + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Destination MAC + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Source MAC + 0x08, 0x00, // IPv4 EtherType + // IPv4 header (20 bytes) + 0x45, 0x00, // Version & IHL, DSCP & ECN + 0x00, 0x28, // Total Length + 0x00, 0x00, 0x00, 0x00, // ID, Flags, Fragment Offset + 0x40, 0x06, // TTL, Protocol (6 = TCP) + 0x00, 0x00, // Header Checksum + 192, 168, 1, 100, // Source IP (192.168.1.100) + 10, 0, 0, 1, // Destination IP (10.0.0.1) + // TCP header (20 bytes) + 0x12, 0x34, // Source Port (4660) + 0x45, 0x67, // Destination Port (17767) + 0x00, 0x00, 0x00, 0x00, // Sequence Number + 0x00, 0x00, 0x00, 0x00, // Acknowledgment Number + 0x50, 0x00, // Data Offset & Flags + 0x00, 0x00, // Window Size + 0x00, 0x00, // Checksum + 0x00, 0x00, // Urgent Pointer + // Payload + 0x48, 0x65, 0x6c, 0x6c, 0x6f, // "Hello" in ASCII + ]; + packet + } + + // Helper function to create a simple IPv6 packet + fn create_ipv6_udp_packet() -> Vec { + let packet = vec![ + // Ethernet header (14 bytes) + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Destination MAC + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Source MAC + 0x86, 0xDD, // IPv6 EtherType + // IPv6 header (40 bytes) + 0x60, 0x00, 0x00, 0x00, // Version, Traffic Class, Flow Label + 0x00, 0x08, // Payload Length + 17, 0x40, // Next Header (17 = UDP), Hop Limit + 0x20, 0x01, 0x0d, 0xb8, // Source IP (2001:db8::1) + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x20, 0x01, + 0x0d, 0xb8, // Destination IP (2001:db8::2) + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, + // UDP header (8 bytes) + 0x89, 0x13, // Source Port (35091) + 0x12, 0x34, // Destination Port (4660) + 0x00, 0x08, // Length + 0x00, 0x00, // Checksum + // Payload + 0x54, 0x65, 0x73, 0x74, // "Test" in ASCII + ]; + packet + } + + #[test] + fn test_parse_ipv4_tcp_packet() { + let packet = create_ipv4_tcp_packet(); + let result = PcapVTab::parse_packet(&packet).unwrap(); + + assert_eq!(result.0, "192.168.1.100"); // Source IP + assert_eq!(result.1, "10.0.0.1"); // Destination IP + assert_eq!(result.2, 4660); // Source Port + assert_eq!(result.3, 17767); // Destination Port + assert_eq!(result.4, "TCP"); // Protocol + assert_eq!(result.5, b"Hello"); // Payload + } + + #[test] + fn test_parse_ipv6_udp_packet() { + let packet = create_ipv6_udp_packet(); + let result = PcapVTab::parse_packet(&packet).unwrap(); + + assert_eq!(result.0, "2001:db8:0:0:0:0:0:1"); // Source IP + assert_eq!(result.1, "2001:db8:0:0:0:0:0:2"); // Destination IP + assert_eq!(result.2, 35091); // Source Port + assert_eq!(result.3, 4660); // Destination Port + assert_eq!(result.4, "UDP"); // Protocol + assert_eq!(result.5, b"Test"); // Payload + } + + #[test] + fn test_parse_small_packet() { + let packet = vec![0; 10]; // Packet too small to contain headers + let result = PcapVTab::parse_packet(&packet).unwrap(); + + assert_eq!(result.0, "0.0.0.0"); // Default IP + assert_eq!(result.1, "0.0.0.0"); // Default IP + assert_eq!(result.2, 0); // Default Port + assert_eq!(result.3, 0); // Default Port + assert_eq!(result.4, "UNKNOWN"); // Default Protocol + assert!(result.5.is_empty()); // Empty Payload + } + + #[test] + fn test_parse_unknown_protocol() { + let mut packet = create_ipv4_tcp_packet(); + packet[23] = 100; // Change protocol number to unknown value + let result = PcapVTab::parse_packet(&packet).unwrap(); + + assert_eq!(result.4, "IP(100)"); // Unknown protocol + } +}