54 changes: 49 additions & 5 deletions dfetch/project/metadata.py
@@ -2,6 +2,8 @@

import datetime
import os
from dataclasses import dataclass
from typing import Iterable, Optional

import yaml
from typing_extensions import TypedDict
@@ -15,6 +17,29 @@
"""


@dataclass
class FileInfo:
    """Information about a single fetched file."""

    path: str
    hash: str
    permissions: str  # octal

    def __repr__(self) -> str:
        escaped_path = self.path.replace("|", r"\|")
        return f"{escaped_path}|{self.hash}|{self.permissions}"

    @staticmethod
    def from_list(data: Iterable[str]) -> Iterable["FileInfo"]:
        """Create a list of FileInfo's from an iterable of serialized strings."""
        parsed = []
        for entry in data:
            path, hash_digest, permissions = (
                entry.split("|", maxsplit=3) + ["", "", ""]
            )[:3]
            parsed.append(FileInfo(path, hash_digest, permissions.zfill(3)))
        return parsed


class Options(TypedDict):  # pylint: disable=too-many-ancestors
    """Argument types for Metadata class construction."""

@@ -26,6 +51,7 @@ class Options(TypedDict):  # pylint: disable=too-many-ancestors
    destination: str
    hash: str
    patch: str
    files: Iterable[str]


class Metadata:
@@ -50,6 +76,9 @@ def __init__(self, kwargs: Options) -> None:
        self._destination: str = str(kwargs.get("destination", ""))
        self._hash: str = str(kwargs.get("hash", ""))
        self._patch: str = str(kwargs.get("patch", ""))
        self._files: Optional[Iterable[FileInfo]] = FileInfo.from_list(
            kwargs.get("files", [])
        )

    @classmethod
    def from_project_entry(cls, project: ProjectEntry) -> "Metadata":
@@ -63,6 +92,7 @@ def from_project_entry(cls, project: ProjectEntry) -> "Metadata":
"last_fetch": datetime.datetime(2000, 1, 1, 0, 0, 0),
"hash": "",
"patch": project.patch,
"files": [],
}
return cls(data)

@@ -73,12 +103,19 @@ def from_file(cls, path: str) -> "Metadata":
            data: Options = yaml.safe_load(metadata_file)["dfetch"]
        return cls(data)

    def fetched(self, version: Version, hash_: str = "", patch_: str = "") -> None:
    def fetched(
        self,
        version: Version,
        hash_: str = "",
        patch_: str = "",
        files: Optional[Iterable[FileInfo]] = None,
    ) -> None:
        """Update metadata."""
        self._last_fetch = datetime.datetime.now()
        self._version = version
        self._hash = hash_
        self._patch = patch_
        self._files = files

    @property
    def version(self) -> Version:
@@ -90,6 +127,11 @@ def branch(self) -> str:
"""Branch as stored in the metadata."""
return self._version.branch

    @property
    def files(self) -> Iterable[FileInfo]:
        """File info as stored in the metadata."""
        return self._files
Comment on lines +130 to +133
Korbit: Incorrect return type annotation for files property (category: Functionality)

What is the issue?
The files property returns Optional[Iterable[FileInfo]] but is annotated to return Iterable[FileInfo].

Why this matters
This type mismatch could cause runtime errors when consumers expect a non-None return value but receive None.

Suggested change:

@property
def files(self) -> Optional[Iterable[FileInfo]]:
    """File info as stored in the metadata."""
    return self._files

    @property
    def tag(self) -> str:
        """Tag as stored in the metadata."""
@@ -147,6 +189,7 @@ def __eq__(self, other: object) -> bool:
                other._version.revision == self._version.revision,
                other.hash == self.hash,
                other.patch == self.patch,
                other._files == self._files,
            ]
        )

@@ -156,14 +199,15 @@ def dump(self) -> None:
"dfetch": {
"remote_url": self.remote_url,
"branch": self._version.branch,
"revision": self._version.revision,
"last_fetch": self.last_fetch_string(),
"tag": self._version.tag,
"hash": self.hash,
"last_fetch": self.last_fetch_string(),
"patch": self.patch,
"revision": self._version.revision,
"tag": self._version.tag,
"files": [str(info) for info in self._files or []],
}
}

with open(self.path, "w+", encoding="utf-8") as metadata_file:
metadata_file.write(DONT_EDIT_WARNING)
yaml.dump(metadata, metadata_file)
yaml.dump(metadata, metadata_file, sort_keys=False)
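
To illustrate the path|hash|permissions record format serialized by FileInfo above, here is a minimal standalone round-trip sketch (it mirrors the dataclass from this diff rather than importing dfetch; the sample path and hash are made up):

from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class FileInfo:
    path: str
    hash: str
    permissions: str  # three octal digits, e.g. "644"

    def __repr__(self) -> str:
        # Pipes in the path are escaped so the record stays parseable.
        escaped_path = self.path.replace("|", r"\|")
        return f"{escaped_path}|{self.hash}|{self.permissions}"

    @staticmethod
    def from_list(data: Iterable[str]) -> List["FileInfo"]:
        parsed = []
        for entry in data:
            path, hash_digest, permissions = (
                entry.split("|", maxsplit=3) + ["", "", ""]
            )[:3]
            parsed.append(FileInfo(path, hash_digest, permissions.zfill(3)))
        return parsed


records = ["src/main.c|3f786850e387550fdab836ed7e6dc881de23001b|644"]
info = FileInfo.from_list(records)[0]
assert info.path == "src/main.c" and info.permissions == "644"
assert repr(info) == records[0]  # round-trips back to the stored string

Note that from_list does not unescape pipes, so the escaping in __repr__ is one-way: a path that actually contains | will not round-trip cleanly.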
81 changes: 69 additions & 12 deletions dfetch/project/vcs.py
@@ -4,7 +4,8 @@
import os
import pathlib
from abc import ABC, abstractmethod
from typing import List, Optional, Sequence, Tuple
from contextlib import suppress
from typing import Iterable, List, Optional, Sequence, Tuple

from halo import Halo
from patch_ng import fromfile
@@ -13,8 +14,13 @@
from dfetch.manifest.project import ProjectEntry
from dfetch.manifest.version import Version
from dfetch.project.abstract_check_reporter import AbstractCheckReporter
from dfetch.project.metadata import Metadata
from dfetch.util.util import hash_directory, safe_rm
from dfetch.project.metadata import FileInfo, Metadata
from dfetch.util.util import (
hash_directory,
hash_file_normalized,
recursive_listdir,
safe_rm,
)
from dfetch.util.versions import latest_tag_from_list

logger = get_logger(__name__)
@@ -111,7 +117,20 @@ def update(self, force: bool = False) -> None:

        if os.path.exists(self.local_path):
            logger.debug(f"Clearing destination {self.local_path}")
            safe_rm(self.local_path)

            metadata_files = None
            with suppress(TypeError):
                metadata_files = Metadata.from_file(self.__metadata.path).files
Comment on lines +121 to +122
Korbit: Unexplained Error Suppression (category: Readability)

What is the issue?
Silent error suppression without explaining why TypeError is expected or can be safely ignored.

Why this matters
Code maintainers will have to dig through the codebase to understand why this error is suppressed, making the code harder to understand and maintain.

Suggested change:

# Suppress TypeError when metadata file is invalid or has old format without 'files' field
with suppress(TypeError):
    metadata_files = Metadata.from_file(self.__metadata.path).files

            if metadata_files:
                for file in metadata_files:
                    full_path = os.path.join(self.local_path, file.path)
                    safe_rm(full_path)
                    parent_dir = os.path.dirname(full_path)
                    # remove parent if empty
                    if not os.listdir(parent_dir):
                        safe_rm(parent_dir)
            else:
                safe_rm(self.local_path)

        with Halo(
            text=f"Fetching {self.__project.name} {to_fetch}",
@@ -130,10 +149,34 @@
            else:
                logger.warning(f"Skipping non-existent patch {self.__project.patch}")

        if os.path.isfile(self.local_path):
            files_list = (
                FileInfo(
                    os.path.basename(self.local_path),
                    hash_file_normalized(os.path.join(self.local_path)).hexdigest(),
                    oct(os.stat(os.path.join(self.local_path)).st_mode)[-3:],
                ),
            )
Comment on lines +153 to +159
Korbit: Redundant file operations (category: Performance)

What is the issue?
Redundant os.path.join calls and file stat operations.

Why this matters
Multiple system calls to the same file waste I/O operations, which impacts performance, especially when dealing with many files.

Suggested change:

full_path = os.path.join(self.local_path)
stat_result = os.stat(full_path)
files_list = (
    FileInfo(
        os.path.basename(self.local_path),
        hash_file_normalized(full_path).hexdigest(),
        oct(stat_result.st_mode)[-3:],
    ),
)

        else:
            all_files = (
                file_path
                for file_path in recursive_listdir(self.local_path)
                if os.path.basename(file_path) != self.__metadata.FILENAME
            )
            files_list = (
                FileInfo(
                    os.path.relpath(file_path, self.local_path),
                    hash_file_normalized(file_path).hexdigest(),
                    oct(os.stat(file_path).st_mode)[-3:],
                )
                for file_path in all_files
            )

        self.__metadata.fetched(
            actually_fetched,
            hash_=hash_directory(self.local_path, skiplist=[self.__metadata.FILENAME]),
            patch_=applied_patch,
            files=files_list,
        )

logger.debug(f"Writing repo metadata to: {self.__metadata.path}")
@@ -289,23 +332,24 @@ def on_disk_version(self) -> Optional[Version]:
        )
        return None

    def _on_disk_hash(self) -> Optional[str]:
    def _on_disk_hash(self) -> Tuple[Iterable[FileInfo], Optional[str]]:
        """Get the hash of the project on disk.

        Returns:
            Tuple: The recorded file info and hash; empty and None if there is no on-disk version.
        """
        if not os.path.exists(self.__metadata.path):
            return None
            return [], None

        try:
            return Metadata.from_file(self.__metadata.path).hash
            metadata = Metadata.from_file(self.__metadata.path)
            return metadata.files, metadata.hash
        except TypeError:
            logger.warning(
                f"{pathlib.Path(self.__metadata.path).relative_to(os.getcwd()).as_posix()}"
                " is an invalid metadata file, not checking local hash!"
            )
            return None
            return [], None

    def _check_for_newer_version(self) -> Optional[Version]:
        """Check if a newer version is available on the given branch.
@@ -345,11 +389,24 @@ def _are_there_local_changes(self) -> bool:
            Bool: True if there are local changes, false if none were detected or no hash was found.
        """
        logger.debug(f"Checking if there were local changes in {self.local_path}")
        on_disk_hash = self._on_disk_hash()

        return bool(on_disk_hash) and on_disk_hash != hash_directory(
            self.local_path, skiplist=[self.__metadata.FILENAME]
        )
        file_info, on_disk_hash = self._on_disk_hash()

        if not file_info:
            return bool(on_disk_hash) and on_disk_hash != hash_directory(
                self.local_path, skiplist=[self.__metadata.FILENAME]
            )
Comment on lines +395 to +398
Korbit: Complex Boolean Logic (category: Readability)

What is the issue?
Complex boolean expression with unclear fallback logic mixing multiple conditions.

Why this matters
The nested conditions and boolean operations make it difficult to understand the flow and intention of the code at a glance.

Suggested change:

if not file_info:
    if not on_disk_hash:
        return False
    current_hash = hash_directory(self.local_path, skiplist=[self.__metadata.FILENAME])
    return on_disk_hash != current_hash

        for file in file_info:
            full_path = os.path.join(self.local_path, file.path)
            if hash_file_normalized(full_path).hexdigest() != file.hash:
Comment on lines +401 to +402
Korbit: Missing File Existence Check (category: Error Handling)

What is the issue?
The code doesn't check if the file exists before attempting to hash it, which could cause crashes.

Why this matters
If a file was deleted but still exists in the metadata, this will raise an unhandled exception when trying to hash a non-existent file.

Suggested change:

full_path = os.path.join(self.local_path, file.path)
if not os.path.exists(full_path):
    logger.debug(f"File {full_path} no longer exists!")
    return True
if hash_file_normalized(full_path).hexdigest() != file.hash:
logger.debug(f"The hash of {full_path} changed!")
return True
if oct(os.stat(full_path).st_mode)[-3:] != file.permissions:
logger.debug(f"The file permissions of {full_path} changed!")
return True

return False

    @abstractmethod
    def _fetch_impl(self, version: Version) -> Version:
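
Condensed into a standalone sketch, the per-file change detection in this hunk works as follows (has_local_changes is a hypothetical free function; it also folds in the existence check suggested by the review comment above):

import os
from typing import Iterable

from dfetch.project.metadata import FileInfo
from dfetch.util.util import hash_file_normalized


def has_local_changes(local_path: str, records: Iterable[FileInfo]) -> bool:
    """Return True if any tracked file changed since the last fetch."""
    for record in records:
        full_path = os.path.join(local_path, record.path)
        if not os.path.isfile(full_path):
            return True  # a tracked file was removed
        if hash_file_normalized(full_path).hexdigest() != record.hash:
            return True  # content changed (line endings ignored)
        if oct(os.stat(full_path).st_mode)[-3:] != record.permissions:
            return True  # permission bits changed
    return False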
35 changes: 35 additions & 0 deletions dfetch/util/util.py
@@ -3,6 +3,7 @@
import fnmatch
import hashlib
import os
import re
import shutil
import stat
from contextlib import contextmanager
@@ -104,6 +105,21 @@ def find_file(name: str, path: str = ".") -> List[str]:
    ]


def recursive_listdir(directory):
Korbit: Missing Type Hints (category: Readability)

What is the issue?
Missing type hints for both the parameter and return type in the recursive_listdir function.

Why this matters
Type hints help with code understanding, IDE support, and static type checking. Their absence makes it harder to understand what the function expects and returns without diving into the implementation.

Suggested change:

def recursive_listdir(directory: str) -> Generator[str, None, None]:
"""List all entries in the current directory."""
entries = os.listdir(directory)
Comment on lines +108 to +110
Korbit: Missing Error Handling in Directory Traversal (category: Error Handling)

What is the issue?
The function recursive_listdir() doesn't handle potential permission errors or broken symlinks when accessing directories.

Why this matters
If the function encounters a directory without read permissions or a broken symlink, it will raise an unhandled OSError/PermissionError, causing the entire directory traversal to fail.

Suggested change:

def recursive_listdir(directory):
    """List all entries in the current directory."""
    try:
        entries = os.listdir(directory)

        for entry in entries:
            full_path = os.path.join(directory, entry)

            try:
                if os.path.isdir(full_path):
                    yield from recursive_listdir(full_path)
                else:
                    yield full_path
            except (OSError, PermissionError):
                continue
    except (OSError, PermissionError):
        return

    for entry in entries:
        full_path = os.path.join(directory, entry)
Comment on lines +110 to +113
Korbit: Directory Traversal Vulnerability (category: Security)

What is the issue?
The recursive_listdir function is vulnerable to directory traversal attacks if the input directory path is not validated.

Why this matters
Without path validation, malicious input could potentially access files outside the intended directory tree through symbolic links or relative paths.

Suggested change:

directory = os.path.abspath(directory)
if not os.path.realpath(directory).startswith(os.path.realpath(safe_root)):
    raise ValueError("Access denied: Directory outside allowed path")

        if os.path.isdir(full_path):
            # If the entry is a directory, recurse into it
            yield from recursive_listdir(full_path)
        else:
            # If the entry is a file, yield its path
            yield full_path
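
For comparison, the same traversal can be written iteratively on top of os.walk (a sketch; followlinks=True mirrors os.path.isdir(), which follows symlinks, though the order of results may differ):

import os
from typing import Iterator


def recursive_listdir_walk(directory: str) -> Iterator[str]:
    """Yield every file path below directory, like recursive_listdir."""
    for root, _dirs, files in os.walk(directory, followlinks=True):
        for name in files:
            yield os.path.join(root, name)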


def hash_directory(path: str, skiplist: Optional[List[str]]) -> str:
    """Hash a directory with all its files."""
    digest = hashlib.md5()  # nosec
@@ -131,3 +147,22 @@ def hash_file(file_path: str, digest: HASH) -> HASH:
            buf = f_obj.read(1024 * 1024)

    return digest


def hash_file_normalized(file_path: str) -> "hashlib._Hash":
    """Hash a file's contents, normalizing line endings (CRLF/CR become LF) before hashing."""
    digest = hashlib.sha1(usedforsecurity=False)

    if os.path.isfile(file_path):
        normalize_re = re.compile(b"\r\n|\r")
Korbit: Undocumented Regex Pattern (category: Readability)

What is the issue?
Regular expression pattern is defined inside the function without explanation of what it matches.

Why this matters
Complex regex patterns without documentation or clear variable names make the code harder to understand and maintain.

Suggested change:

# Define at module level with a clear name
LINE_ENDING_PATTERN = re.compile(b"\r\n|\r")  # Matches Windows (CRLF) and old Mac (CR) line endings

        with open(file_path, "rb") as f_obj:
            buf = f_obj.read(1024 * 1024)
            while buf:
                normalized_buf = normalize_re.sub(b"\n", buf)
                digest.update(normalized_buf)  # nosec
                buf = f_obj.read(1024 * 1024)

    return digest
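
As a quick sanity check of the normalization (a sketch, assuming the module path dfetch.util.util): two files that differ only in line endings should produce identical digests.

import os
import tempfile

from dfetch.util.util import hash_file_normalized

with tempfile.TemporaryDirectory() as tmp:
    unix_file = os.path.join(tmp, "unix.txt")
    dos_file = os.path.join(tmp, "dos.txt")
    with open(unix_file, "wb") as f_obj:
        f_obj.write(b"line one\nline two\n")
    with open(dos_file, "wb") as f_obj:
        f_obj.write(b"line one\r\nline two\r\n")

    # CRLF is rewritten to LF before hashing, so both digests match.
    assert (
        hash_file_normalized(unix_file).hexdigest()
        == hash_file_normalized(dos_file).hexdigest()
    )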