diff --git a/.gitignore b/.gitignore index 2ce4545e3c..268aa1b984 100644 --- a/.gitignore +++ b/.gitignore @@ -44,6 +44,7 @@ qodana* /.claude /.gemini /.playwright-mcp +/.opencode/plans .kotlin .jreleaser multipass-client-certificate diff --git a/src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsBuilder.java b/src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsBuilder.java new file mode 100644 index 0000000000..337e1fb16f --- /dev/null +++ b/src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsBuilder.java @@ -0,0 +1,410 @@ +package network.crypta.client.async; + +import java.io.DataInputStream; +import java.io.IOException; +import network.crypta.client.FetchContext; +import network.crypta.client.FetchException; +import network.crypta.client.FetchException.FetchExceptionMode; +import network.crypta.client.Metadata; +import network.crypta.keys.CHKBlock; +import network.crypta.node.KeysFetchingLocally; +import network.crypta.support.io.StorageFormatException; + +/** + * Builds splitfile segment storage objects for new and resumed fetches. + * + *
<p>
This stateless helper centralizes the offset calculations and segment instantiation performed + * during splitfile storage setup. Callers assemble either a {@link SegmentsBuildContext} for the + * fresh-download path or a {@link SplitFileFetcherSegmentsLoadParams} bundle for the resume path + * and then invoke the corresponding initializer. Each initializer walks the segment list, computes + * per-segment offsets, validates counts against expected totals, and wires cross-segment helpers so + * that the surrounding {@link SplitFileFetcherStorage} can schedule work without duplicating layout + * math. + * + *
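<p>At a glance, the two entry points ({@code ctx} and {@code params} here are illustrative + * locals, prepared as described below): + * <pre>{@code + * // Fresh download: walk in-memory metadata, build segments, register keys. + * SplitFileFetcherSegmentsInit fresh = SplitFileFetcherSegmentsBuilder.initSegmentsAndKeys(ctx); + * // Resume: rehydrate the same layout from the persisted settings stream. + * SplitFileFetcherSegmentsInit resumed = + *     SplitFileFetcherSegmentsBuilder.initSegmentsFromStream(params); + * }</pre> + *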
<p>
The builder does not retain mutable state between calls; thread safety therefore depends on + * the provided storage, arrays, and key listeners. Inputs are expected to be consistent with the + * splitfile metadata and persisted layout, and the methods enforce this with validation and + * assertions rather than defensive copying. + * + *
<p>
+ * + * @see SplitFileFetcherStorage + * @see SplitFileFetcherSegmentsLoadParams + */ +final class SplitFileFetcherSegmentsBuilder { + /** Hidden utility constructor for a static-only helper. */ + private SplitFileFetcherSegmentsBuilder() {} + + /** + * Mutable context bag for building segments from in-memory metadata. + * + *
<p>
Callers populate the fields before invoking {@link + * #initSegmentsAndKeys(SegmentsBuildContext)}. The instance is consumed as-is, so all arrays, + * offsets, and sizing metadata must already describe a coherent splitfile layout. + * + *
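<p>A minimal sketch of the fresh-download call site; {@code storage}, {@code metadata}, and the + * other locals are assumed to exist (compare the real wiring in {@link SplitFileFetcherStorage}): + * <pre>{@code + * SegmentsBuildContext ctx = new SegmentsBuildContext(); + * ctx.parent = storage; // owning SplitFileFetcherStorage + * ctx.segments = new SplitFileFetcherSegmentStorage[segmentKeys.length]; + * ctx.segmentKeys = segmentKeys; + * ctx.metadata = metadata; + * ctx.crossCheckBlocks = crossCheckBlocks; + * // ... policy, sizing, and layout fields are filled the same way ... + * SplitFileFetcherSegmentsInit init = SplitFileFetcherSegmentsBuilder.initSegmentsAndKeys(ctx); + * }</pre> + *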
<p>
The default constructor performs no validation and leaves all fields unset. Callers are + * responsible for filling each field before handing the instance to the builder. + * + * @see #initSegmentsAndKeys(SegmentsBuildContext) + */ + static final class SegmentsBuildContext { + /** Creates an empty context to be populated before initialization. */ + SegmentsBuildContext() {} + + /** Owning storage used for offsets, logging, and key registration. */ + SplitFileFetcherStorage parent; + + /** Target array that receives constructed segment storage instances. */ + SplitFileFetcherSegmentStorage[] segments; + + /** Segment key metadata aligned with the segment array. */ + SplitFileSegmentKeys[] segmentKeys; + + /** Splitfile metadata used to allocate cross-segment helpers. */ + Metadata metadata; + + /** Number of cross-check blocks per segment; used in sizing and offsets. */ + int crossCheckBlocks; + + /** Expected data blocks per segment as reported by metadata. */ + int blocksPerSegment; + + /** Expected check blocks per segment as reported by metadata. */ + int checkBlocksPerSegment; + + /** Original fetch context supplying policy limits for segment sizing. */ + FetchContext origFetchContext; + + /** Salting helper used when registering segment keys with the listener. */ + KeySalter salt; + + /** Optional helper for marking keys as being fetched locally. */ + KeysFetchingLocally keysFetching; + + /** Accumulated sizes for keys and status persisted in the storage layout. */ + SplitFileFetcherStorageLayout.AccumulatedSizes acc; + + /** Total stored data length in bytes for data blocks (excluding truncation). */ + long storedBlocksLength; + + /** Total stored cross-check length in bytes when truncation completes. */ + long storedCrossCheckBlocksLength; + + /** Whether the persisted layout uses truncation to mark completion. */ + boolean completeViaTruncation; + + /** Whether the storage is persistent and should write resume metadata. */ + boolean persistent; + + /** Whether a single splitfile crypto key is present for stored key lengths. */ + boolean hasSplitfileSingleCryptoKey; + + /** Length in bytes of the checksum appended to stored metadata. */ + int checksumLength; + } + + /** + * Initializes segment storage and registers keys for a new splitfile fetch. + * + *
<p>
The method walks each {@link SplitFileSegmentKeys} entry, calculates offsets for stored + * segment data and metadata, validates per-segment limits from the original fetch context, and + * then creates {@link SplitFileFetcherSegmentStorage} instances with the computed values. It also + * registers each segment key with the parent key listener so that later scheduling can rely on + * fully populated Bloom filters. + * + *
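<p>To make the offset arithmetic concrete, assume the usual 32768-byte CHK block and a segment + * with 128 data blocks and 3 cross-check blocks (illustrative numbers): + * <pre>{@code + * // completeViaTruncation == false: cross-check blocks sit inline after the data blocks, + * // so only the data cursor advances. + * dataOffset += (128 + 3) * 32768L; + * // completeViaTruncation == true: cross-check blocks trail the whole file instead, + * // so the two cursors advance independently. + * dataOffset += 128 * 32768L; + * crossCheckBlocksOffset += 3 * 32768L; + * }</pre> + *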
<p>
The caller must provide a fully-populated {@link SegmentsBuildContext}. The method assumes + * the segment arrays and metadata are consistent; violations surface through assertions or a + * {@link FetchException}. This initializer is not idempotent because it allocates storage helpers + * and registers keys each time it runs. + * + * @param ctx container holding metadata, arrays, offsets, and policy settings; must describe a + * consistent splitfile layout and remain non-null during the call. + * @return newly built segment and cross-segment storage helpers for the fetcher to use. + * @throws FetchException when segment sizes exceed fetch policy limits or metadata is invalid. + */ + static SplitFileFetcherSegmentsInit initSegmentsAndKeys(SegmentsBuildContext ctx) + throws FetchException { + long dataOffset = 0; + long crossCheckBlocksOffset = ctx.storedBlocksLength; // Only used if completeViaTruncation + long segmentKeysOffset = ctx.parent.offsetKeyList; + long segmentStatusOffset = ctx.parent.offsetSegmentStatus; + + for (int i = 0; i < ctx.segments.length; i++) { + SplitFileSegmentKeys keys = ctx.segmentKeys[i]; + final int dataBlocks = keys.getDataBlocks() - ctx.crossCheckBlocks; + final int checkBlocks = keys.getCheckBlocks(); + validateBlocksPerSegmentLimit(ctx.origFetchContext, dataBlocks, checkBlocks); + SplitFileFetcherSegmentStorage.InitParams p = new SplitFileFetcherSegmentStorage.InitParams(); + p.parent = ctx.parent; + p.segNumber = i; + p.dataBlocks = dataBlocks; + p.checkBlocks = checkBlocks; + p.crossCheckBlocks = ctx.crossCheckBlocks; + p.segmentDataOffset = dataOffset; + p.segmentCrossCheckDataOffset = ctx.completeViaTruncation ? crossCheckBlocksOffset : -1; + p.segmentKeysOffset = segmentKeysOffset; + p.segmentStatusOffset = segmentStatusOffset; + p.writeRetries = ctx.parent.maxRetries != -1; + p.keys = keys; + p.keysFetching = ctx.keysFetching; + ctx.segments[i] = new SplitFileFetcherSegmentStorage(p); + dataOffset += (long) dataBlocks * CHKBlock.DATA_LENGTH; + if (!ctx.completeViaTruncation) { + dataOffset += (long) ctx.crossCheckBlocks * CHKBlock.DATA_LENGTH; + } else { + crossCheckBlocksOffset += (long) ctx.crossCheckBlocks * CHKBlock.DATA_LENGTH; + } + segmentKeysOffset += + SplitFileFetcherSegmentStorage.storedKeysLength( + dataBlocks + ctx.crossCheckBlocks, + checkBlocks, + ctx.hasSplitfileSingleCryptoKey, + ctx.checksumLength); + segmentStatusOffset += + SplitFileFetcherSegmentStorage.paddedStoredSegmentStatusLength( + dataBlocks, + checkBlocks, + ctx.crossCheckBlocks, + ctx.parent.maxRetries != -1, + ctx.checksumLength, + ctx.persistent); + for (int j = 0; j < (dataBlocks + ctx.crossCheckBlocks + checkBlocks); j++) { + ctx.parent.keyListener.addKey(keys.getKey(j, null, false).getNodeKey(false), i, ctx.salt); + } + debugSegmentOffsets(ctx.parent, i, ctx.segments[i]); + } + assert (dataOffset == ctx.storedBlocksLength); + assert !ctx.completeViaTruncation + || (crossCheckBlocksOffset == ctx.storedCrossCheckBlocksLength + ctx.storedBlocksLength); + assert (segmentKeysOffset + == ctx.storedBlocksLength + ctx.storedCrossCheckBlocksLength + ctx.acc.storedKeysLength()); + assert (segmentStatusOffset + == ctx.storedBlocksLength + + ctx.storedCrossCheckBlocksLength + + ctx.acc.storedKeysLength() + + ctx.acc.storedSegmentStatusLength()); + + // Lie about the required number of blocks. See the original inline comment for rationale. 
+ int totalCrossCheckBlocks = ctx.segmentKeys.length * ctx.crossCheckBlocks; + ctx.parent.fetcher.setSplitfileBlocks( + ctx.acc.splitfileDataBlocks() + totalCrossCheckBlocks, ctx.acc.splitfileCheckBlocks()); + + ctx.parent.keyListener.finishedSetup(); + + SplitFileFetcherCrossSegmentStorage[] crossSegments = + SplitFileFetcherCrossSegmentAllocator.createCrossSegments( + ctx.parent, + ctx.metadata, + ctx.crossCheckBlocks, + ctx.blocksPerSegment, + ctx.segments, + ctx.parent.fecCodec); + return new SplitFileFetcherSegmentsInit(ctx.segments, crossSegments, null); + } + + /** + * Reconstructs segment storage from a persisted settings stream. + * + *
<p>
This method reads per-segment metadata from the supplied {@link DataInputStream}, computes + * the layout offsets expected by the persisted storage format, and instantiates each {@link + * SplitFileFetcherSegmentStorage} in the provided array. After all segments are loaded, it + * validates the accumulated block totals and reads any cross-segment metadata that follows in the + * stream, returning the resulting storage bundle. + * + *
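<p>A sketch of the resume call site, mirroring the constructor in {@link SplitFileFetcherStorage} + * (locals are illustrative): + * <pre>{@code + * SplitFileFetcherSegmentsInit init = + *     SplitFileFetcherSegmentsBuilder.initSegmentsFromStream( + *         new SplitFileFetcherSegmentsLoadParams( + *             storage, totalData, totalCheck, totalCross, dis, completeViaTruncation, + *             keysFetching, segments, checksumLength, hasSingleCryptoKey, + *             offsetKeyList, offsetSegmentStatus, rafLength)); + * DataInputStream rest = init.remainingStream; // key listener state follows the segments + * }</pre> + *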
<p>
The {@link SplitFileFetcherSegmentsLoadParams} input must reflect the persisted layout, + * including offsets and total counts. The method consumes the stream in order and is therefore + * not idempotent; callers should pass a stream positioned at the start of segment metadata. + * + * @param params bundle of storage, offsets, totals, and stream state used to reconstruct + * segments; must not be null and must reference the same splitfile layout as the persisted + * data. + * @return initialized segment and cross-segment storages plus the remaining stream position. + * @throws StorageFormatException when persisted data is inconsistent with expected totals. + * @throws IOException when the underlying stream cannot be read. + */ + static SplitFileFetcherSegmentsInit initSegmentsFromStream( + SplitFileFetcherSegmentsLoadParams params) throws StorageFormatException, IOException { + SplitFileFetcherStorage parent = params.parent(); + int totalDataBlocks = params.totalDataBlocks(); + int totalCheckBlocks = params.totalCheckBlocks(); + int totalCrossCheckBlocks = params.totalCrossCheckBlocks(); + DataInputStream dis = params.dis(); + boolean completeViaTruncation = params.completeViaTruncation(); + KeysFetchingLocally keysFetching = params.keysFetching(); + SplitFileFetcherSegmentStorage[] segments = params.segments(); + int checksumLength = params.checksumLength(); + boolean hasSplitfileSingleCryptoKey = params.hasSplitfileSingleCryptoKey(); + long offsetKeyList = params.offsetKeyList(); + long offsetSegmentStatus = params.offsetSegmentStatus(); + long rafLength = params.rafLength(); + + long dataOffset = 0; + long crossCheckBlocksOffset = + completeViaTruncation ? (long) totalDataBlocks * CHKBlock.DATA_LENGTH : 0; + long segmentKeysOffset = offsetKeyList; + long segmentStatusOffset = offsetSegmentStatus; + int countDataBlocks = 0; + int countCheckBlocks = 0; + int countCrossCheckBlocks = 0; + for (int i = 0; i < segments.length; i++) { + SplitFileFetcherSegmentStorage.LoadParams lp = + new SplitFileFetcherSegmentStorage.LoadParams(); + lp.parent = parent; + lp.dis = dis; + lp.segNo = i; + lp.writeRetries = parent.maxRetries != -1; + lp.segmentDataOffset = dataOffset; + lp.segmentCrossCheckDataOffset = completeViaTruncation ?
crossCheckBlocksOffset : -1; + lp.segmentKeysOffset = segmentKeysOffset; + lp.segmentStatusOffset = segmentStatusOffset; + lp.keysFetching = keysFetching; + segments[i] = new SplitFileFetcherSegmentStorage(lp); + int dataBlocks = segments[i].dataBlocks; + countDataBlocks += dataBlocks; + int checkBlocks = segments[i].checkBlocks; + countCheckBlocks += checkBlocks; + int crossCheckBlocks = segments[i].crossSegmentCheckBlocks; + countCrossCheckBlocks += crossCheckBlocks; + dataOffset += (long) dataBlocks * CHKBlock.DATA_LENGTH; + if (completeViaTruncation) + crossCheckBlocksOffset += (long) crossCheckBlocks * CHKBlock.DATA_LENGTH; + else dataOffset += (long) crossCheckBlocks * CHKBlock.DATA_LENGTH; + segmentKeysOffset += + SplitFileFetcherSegmentStorage.storedKeysLength( + dataBlocks + crossCheckBlocks, + checkBlocks, + hasSplitfileSingleCryptoKey, + checksumLength); + segmentStatusOffset += + SplitFileFetcherSegmentStorage.paddedStoredSegmentStatusLength( + dataBlocks, + checkBlocks, + crossCheckBlocks, + parent.maxRetries != -1, + checksumLength, + true); + validateSegmentOffsets(dataOffset, segments[i], rafLength); + debugSegmentOffsets(parent, i, segments[i]); + } + validateTotals( + countDataBlocks, + totalDataBlocks, + countCheckBlocks, + totalCheckBlocks, + countCrossCheckBlocks, + totalCrossCheckBlocks); + + int crossSegmentsCount = dis.readInt(); + SplitFileFetcherCrossSegmentStorage[] crossSegmentsLocal = + (crossSegmentsCount == 0) + ? null + : new SplitFileFetcherCrossSegmentStorage[crossSegmentsCount]; + for (int i = 0; i < crossSegmentsCount; i++) { + // crossSegmentsLocal is non-null when crossSegmentsCount > 0 + crossSegmentsLocal[i] = new SplitFileFetcherCrossSegmentStorage(parent, i, dis); + } + return new SplitFileFetcherSegmentsInit(segments, crossSegmentsLocal, dis); + } + + /** + * Validates that counted blocks match the expected totals from persisted metadata. + * + * @param countDataBlocks number of data blocks tallied from loaded segments. + * @param totalDataBlocks expected total data blocks from the settings header. + * @param countCheckBlocks number of check blocks tallied from loaded segments. + * @param totalCheckBlocks expected total check blocks from the settings header. + * @param countCrossCheckBlocks number of cross-check blocks tallied from loaded segments. + * @param totalCrossCheckBlocks expected total cross-check blocks from the settings header. + * @throws StorageFormatException when any of the totals differ from the expected values. + */ + private static void validateTotals( + int countDataBlocks, + int totalDataBlocks, + int countCheckBlocks, + int totalCheckBlocks, + int countCrossCheckBlocks, + int totalCrossCheckBlocks) + throws StorageFormatException { + if (countDataBlocks != totalDataBlocks) + throw new StorageFormatException( + "Total data blocks " + countDataBlocks + " but expected " + totalDataBlocks); + if (countCheckBlocks != totalCheckBlocks) + throw new StorageFormatException( + "Total check blocks " + countCheckBlocks + " but expected " + totalCheckBlocks); + if (countCrossCheckBlocks != totalCrossCheckBlocks) + throw new StorageFormatException( + "Total cross-check blocks " + + countCrossCheckBlocks + + " but expected " + + totalCrossCheckBlocks); + } + + /** + * Logs computed offsets for a segment when debug logging is enabled. + * + * @param parent storage owner used for logging context; must not be null. + * @param index zero-based segment index being reported. 
+ * @param segment segment whose data and cross-check offsets are logged. + */ + private static void debugSegmentOffsets( + SplitFileFetcherStorage parent, int index, SplitFileFetcherSegmentStorage segment) { + if (SplitFileFetcherStorage.LOG.isDebugEnabled()) { + SplitFileFetcherStorage.LOG.debug( + "Segment {}: data blocks offset {} cross-check blocks offset {} for segment {} of {}", + index, + segment.segmentBlockDataOffset, + segment.segmentCrossCheckBlockDataOffset, + index, + parent); + } + } + + /** + * Ensures the computed segment offsets do not extend past the backing file length. + * + * @param dataOffset next data offset after the segment's data blocks, in bytes. + * @param segment segment whose cross-check offset is validated. + * @param rafLength total length of the backing file, in bytes. + * @throws StorageFormatException when computed offsets exceed the file length. + */ + private static void validateSegmentOffsets( + long dataOffset, SplitFileFetcherSegmentStorage segment, long rafLength) + throws StorageFormatException { + if (dataOffset > rafLength) + throw new StorageFormatException( + "Data offset past end of file " + dataOffset + " of " + rafLength); + if (segment.segmentCrossCheckBlockDataOffset > rafLength) + throw new StorageFormatException( + "Cross-check blocks offset past end of file " + + segment.segmentCrossCheckBlockDataOffset + + " of " + + rafLength); + } + + /** + * Validates per-segment data and check block counts against the original fetch policy. + * + * @param origFetchContext original fetch context providing block limit configuration. + * @param blocksPerSegment number of data blocks for the segment being constructed. + * @param checkBlocksPerSegment number of check blocks for the segment being constructed. + * @throws FetchException when either data or check block count exceeds configured limits. + */ + private static void validateBlocksPerSegmentLimit( + FetchContext origFetchContext, int blocksPerSegment, int checkBlocksPerSegment) + throws FetchException { + if ((blocksPerSegment > origFetchContext.getMaxDataBlocksPerSegment()) + || (checkBlocksPerSegment > origFetchContext.getMaxCheckBlocksPerSegment())) { + throw new FetchException( + FetchExceptionMode.TOO_MANY_BLOCKS_PER_SEGMENT, + "Too many blocks per segment: " + + blocksPerSegment + + " data, " + + checkBlocksPerSegment + + " check"); + } + } +} diff --git a/src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsInit.java b/src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsInit.java new file mode 100644 index 0000000000..2a9f7ce5c8 --- /dev/null +++ b/src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsInit.java @@ -0,0 +1,52 @@ +package network.crypta.client.async; + +import java.io.DataInputStream; + +/** + * Holds the results of initializing splitfile segment and cross-segment storage. + * + *
<p>
This package-private container groups the arrays produced during segment setup with the + * remainder of the settings stream, if any. It is returned by {@link + * SplitFileFetcherSegmentsBuilder} to convey the initialized segment storage objects together with + * any cross-segment helpers and the stream position that follows them. Callers typically unpack the + * fields immediately and do not retain the instance beyond construction of {@link + * SplitFileFetcherStorage} state. + * + *
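<p>Typical unpacking, as in the {@link SplitFileFetcherStorage} constructors (sketch): + * <pre>{@code + * this.crossSegments = init.crossSegments; // null when the format has none + * DataInputStream rest = init.remainingStream; // null on the fresh-download path + * }</pre> + *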
<p>
The instance is immutable but references mutable arrays and a stream, so it should be treated + * as a transient handoff object. The arrays are expected to be fully populated by the builder; when + * cross-segment storage is not present, {@code crossSegments} may be {@code null}. + * + *
<p>
+ * + * @see SplitFileFetcherSegmentsBuilder + */ +final class SplitFileFetcherSegmentsInit { + /** Segment storage instances constructed for the splitfile; never {@code null}. */ + final SplitFileFetcherSegmentStorage[] segments; + + /** Cross-segment storage instances or {@code null} when the format omits them. */ + final SplitFileFetcherCrossSegmentStorage[] crossSegments; + + /** Stream positioned after segment data, or {@code null} when none remains. */ + final DataInputStream remainingStream; + + /** + * Creates a new result holder for segment initialization. + * + * @param segments segment storage array populated by the builder; must not be {@code null}. + * @param crossSegments cross-segment storage array or {@code null} when absent. + * @param remainingStream stream positioned after segment parsing; may be {@code null}. + */ + SplitFileFetcherSegmentsInit( + SplitFileFetcherSegmentStorage[] segments, + SplitFileFetcherCrossSegmentStorage[] crossSegments, + DataInputStream remainingStream) { + this.segments = segments; + this.crossSegments = crossSegments; + this.remainingStream = remainingStream; + } +} diff --git a/src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsLoadParams.java b/src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsLoadParams.java new file mode 100644 index 0000000000..6565a464ca --- /dev/null +++ b/src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsLoadParams.java @@ -0,0 +1,173 @@ +package network.crypta.client.async; + +import java.io.DataInputStream; +import java.util.Arrays; +import java.util.Objects; +import network.crypta.node.KeysFetchingLocally; +import org.jetbrains.annotations.NotNull; + +/** + * Bundles parameters needed to load splitfile segments from persisted storage. + * + *
<p>
This record captures the resume-time inputs that {@link SplitFileFetcherSegmentsBuilder} needs + * when rehydrating segment storage from a settings stream. Callers populate the full set of + * metadata, offsets, and sizing information once, then pass the record to the builder, which + * consumes the {@link DataInputStream} in-order and reconstructs per-segment offsets. + * + *
<p>
The record is immutable but refers to mutable collaborators such as the backing buffer and + * segment array. It does not perform validation and assumes the supplied values describe a coherent + * layout consistent with the persisted footer. Construct a fresh instance for each resume attempt + * and avoid reusing it across threads because the stream is stateful. + * + *
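<p>The {@code equals}, {@code hashCode}, and {@code toString} overrides below exist because a + * record's generated members compare array components by reference rather than by content: + * <pre>{@code + * record Box(int[] xs) {} + * // false with the generated equals: the two arrays are distinct objects + * new Box(new int[] {1}).equals(new Box(new int[] {1})); + * }</pre> + *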
<p>
+ * + * @param parent owning storage that supplies layout constants and retry behavior. + * @param totalDataBlocks total number of data blocks across all segments; non-negative count. + * @param totalCheckBlocks total number of check blocks across all segments; non-negative count. + * @param totalCrossCheckBlocks total number of cross-check blocks across all segments; non-negative + * count. + * @param dis input stream positioned at the start of segment metadata; read in-order. + * @param completeViaTruncation whether completion is recorded via file truncation semantics. + * @param keysFetching optional helper for tracking locally fetched keys; may be {@code null}. + * @param segments mutable segment array to populate with reconstructed storage objects. + * @param checksumLength checksum length in bytes used for persisted checksummed blocks. + * @param hasSplitfileSingleCryptoKey whether a single splitfile crypto key is present. + * @param offsetKeyList byte offset of the persisted key list section in the storage layout. + * @param offsetSegmentStatus byte offset of the persisted segment status section. + * @param rafLength total length of the backing storage buffer in bytes. + * @see SplitFileFetcherSegmentsBuilder#initSegmentsFromStream(SplitFileFetcherSegmentsLoadParams) + */ +record SplitFileFetcherSegmentsLoadParams( + SplitFileFetcherStorage parent, + int totalDataBlocks, + int totalCheckBlocks, + int totalCrossCheckBlocks, + DataInputStream dis, + boolean completeViaTruncation, + KeysFetchingLocally keysFetching, + SplitFileFetcherSegmentStorage[] segments, + int checksumLength, + boolean hasSplitfileSingleCryptoKey, + long offsetKeyList, + long offsetSegmentStatus, + long rafLength) { + /** + * Compares this bundle to another, including array contents for segments. + * + *
<p>
The comparison uses reference equality for collaborators such as {@code parent} and {@code + * dis}, because they represent runtime services, while {@code segments} is compared by content to + * reflect the array's elements rather than its identity. + * + * @param other object to compare against; may be {@code null}. + * @return {@code true} when all scalar fields match and segment arrays contain equal entries. + */ + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other + instanceof + SplitFileFetcherSegmentsLoadParams( + var otherParent, + var otherTotalDataBlocks, + var otherTotalCheckBlocks, + var otherTotalCrossCheckBlocks, + var otherDis, + var otherCompleteViaTruncation, + var otherKeysFetching, + var otherSegments, + var otherChecksumLength, + var otherHasSplitfileSingleCryptoKey, + var otherOffsetKeyList, + var otherOffsetSegmentStatus, + var otherRafLength))) { + return false; + } + return completeViaTruncation == otherCompleteViaTruncation + && totalDataBlocks == otherTotalDataBlocks + && totalCheckBlocks == otherTotalCheckBlocks + && totalCrossCheckBlocks == otherTotalCrossCheckBlocks + && checksumLength == otherChecksumLength + && hasSplitfileSingleCryptoKey == otherHasSplitfileSingleCryptoKey + && offsetKeyList == otherOffsetKeyList + && offsetSegmentStatus == otherOffsetSegmentStatus + && rafLength == otherRafLength + && parent == otherParent + && dis == otherDis + && keysFetching == otherKeysFetching + && Arrays.equals(segments, otherSegments); + } + + /** + * Computes a hash code that reflects scalar fields and the segment array contents. + * + *
<p>
The segment array is hashed with {@link Arrays#hashCode(Object[])}, while the remaining + * components use {@link Objects#hash(Object...)} to preserve the record's usual behavior. + * + * @return hash code suitable for hash-based collections. + */ + @Override + public int hashCode() { + int result = + Objects.hash( + parent, + totalDataBlocks, + totalCheckBlocks, + totalCrossCheckBlocks, + dis, + completeViaTruncation, + keysFetching, + checksumLength, + hasSplitfileSingleCryptoKey, + offsetKeyList, + offsetSegmentStatus, + rafLength); + result = 31 * result + Arrays.hashCode(segments); + return result; + } + + /** + * Returns a human-readable string that includes the segment array contents. + * + *
<p>
The output mirrors the record component order, formatting the {@code segments} array with + * {@link Arrays#toString(Object[])} so that its elements are visible in logs and diagnostics. + * + * @return non-null textual representation of the parameter bundle. + */ + @Override + public @NotNull String toString() { + return "SplitFileFetcherSegmentsLoadParams[" + + "parent=" + + parent + + ", totalDataBlocks=" + + totalDataBlocks + + ", totalCheckBlocks=" + + totalCheckBlocks + + ", totalCrossCheckBlocks=" + + totalCrossCheckBlocks + + ", dis=" + + dis + + ", completeViaTruncation=" + + completeViaTruncation + + ", keysFetching=" + + keysFetching + + ", segments=" + + Arrays.toString(segments) + + ", checksumLength=" + + checksumLength + + ", hasSplitfileSingleCryptoKey=" + + hasSplitfileSingleCryptoKey + + ", offsetKeyList=" + + offsetKeyList + + ", offsetSegmentStatus=" + + offsetSegmentStatus + + ", rafLength=" + + rafLength + + "]"; + } +} diff --git a/src/main/java/network/crypta/client/async/SplitFileFetcherStorage.java b/src/main/java/network/crypta/client/async/SplitFileFetcherStorage.java index bac29e7a65..5a8bc5004e 100644 --- a/src/main/java/network/crypta/client/async/SplitFileFetcherStorage.java +++ b/src/main/java/network/crypta/client/async/SplitFileFetcherStorage.java @@ -23,7 +23,6 @@ import network.crypta.keys.CHKBlock; import network.crypta.keys.ClientKey; import network.crypta.keys.Key; -import network.crypta.node.KeysFetchingLocally; import network.crypta.node.SendableRequestItem; import network.crypta.node.SendableRequestItemKey; import network.crypta.support.MemoryLimitedJobRunner; @@ -38,7 +37,7 @@ import org.slf4j.LoggerFactory; /** - * Maintains persistent and in-memory state for a splitfile fetch. + * Maintains the persistent and in-memory state for a splitfile fetch. * *
<p>
This storage binds a {@link SplitFileFetcher} to a single {@link LockableRandomAccessBuffer}, * keeping hot metadata in memory while persisting block data, key lists, and progress so the fetch @@ -49,8 +48,8 @@ * *
<p>
The layout aims to minimize seeks and maximize robustness while tolerating recovery work after * checksum failures. Callbacks into the fetcher run off-thread via job runners, and locking is - * shallow and taken last relative to segment locks. The storage itself is transient and recreated - * on startup; persisted sections carry enough state to rehydrate progress. + * shallow and taken last, relative to segment locks. The storage itself is transient and recreated + * on startup; persisted sections carry enough state to rehydrate progress. * *
<p>
On-disk sections include: * @@ -59,7 +58,7 @@ *
<p>
  • Segment key lists with per-segment checksums. *
  • Segment status records with retry and block flags. *
  • Main and per-segment Bloom filters for key lookup. - *
  • Original metadata/details and basic settings with a checksummed footer. + *
  • Original metadata/details and basic settings with a checksummed footer. * * @see SplitFileFetcher @@ -68,7 +67,7 @@ * @author toad */ public class SplitFileFetcherStorage { - private static final Logger LOG = LoggerFactory.getLogger(SplitFileFetcherStorage.class); + static final Logger LOG = LoggerFactory.getLogger(SplitFileFetcherStorage.class); final SplitFileFetcherStorageCallback fetcher; @@ -79,9 +78,9 @@ public class SplitFileFetcherStorage { private final long rafLength; /** - * If true we will complete the download by truncating the file. The file was passed in at + * If true, we will complete the download by truncating the file. The file was passed in at * construction, and we are not responsible for freeing it. Once all segments have decoded and - * encoded we call onSuccess(), and we don't free the data. Also, if this is true, cross-check + * encoded, we call onSuccess(), and we don't free the data. Also, if this is true, cross-check * blocks will be kept on disk *AFTER* all the main data and check blocks for the whole file. */ final boolean completeViaTruncation; @@ -105,8 +104,8 @@ public class SplitFileFetcherStorage { * Forward error correction (FEC) codec configured for this splitfile. * *
<p>
The codec is selected from the metadata's {@code SplitfileAlgorithm} and is used when - * reconstructing missing blocks during segment decode. It remains constant for the lifetime of a - * {@code SplitFileFetcherStorage} instance and is read-mostly; callers must treat it as + * reconstructing missing blocks during segment decoding. It remains constant for the lifetime of + * a {@code SplitFileFetcherStorage} instance and is read-mostly; callers must treat it as * immutable. Implementations may allocate native buffers or other resources inside the codec, so * it should be reused rather than recreated for every operation. */ @@ -117,7 +116,7 @@ public class SplitFileFetcherStorage { final MemoryLimitedJobRunner memoryLimitedJobRunner; /** - * Final length of the downloaded data. *BEFORE* decompression, filtering, etc. I.e. this is the + * Final length of the downloaded data. *BEFORE* decompression, filtering, etc. I.e., this is the * length of the data on disk, which will be written by the StreamGenerator. */ final long finalLength; @@ -136,11 +135,11 @@ public class SplitFileFetcherStorage { final List decompressors; /** - * False = Transient: We are using the RAF as scratch space, we only need to write the blocks, and + * False = Transient: We are using the RAF as scratch space, we only need to write the blocks and * the keys (if we don't keep them in memory). True = Persistent: It must be possible to resume - * after a node restart. Ideally we'd like to be able to recover the download in its entirety - * without needing any additional information, but at a minimum we want to be able to continue it - * while passing in the usual external arguments (FetchContext, parent, etc.). + * after a node restart. Ideally, we'd like to be able to recover the download in its entirety + * without needing any additional information. However, at a minimum we want to be able to + * continue it while passing in the usual external arguments (FetchContext, parent, etc.). */ final boolean persistent; @@ -149,14 +148,12 @@ public class SplitFileFetcherStorage { private boolean cancelled; private boolean succeeded; - /** Errors. For now, this is not persisted. */ - private FailureCodeTracker errors; + /** Errors. For now, this is not persisted. */ + FailureCodeTracker errors; final int maxRetries; - /** - * Every cooldownTries attempts, a key will enter cooldown, and won't be re-tried for a period. - */ + /** Every cooldownTries attempts, a key will enter cooldown and won't be re-tried for a period. */ final int cooldownTries; /** Cooldown lasts this long for each key. */ @@ -172,11 +169,11 @@ public class SplitFileFetcherStorage { final RandomSource random; - // Metadata for the file i.e. stuff we need to be able to efficiently read/write it. + // Metadata for the file i.e., stuff we need to be able to efficiently read/write it. /** Offset to start of the key lists in bytes */ final long offsetKeyList; - /** Offset to start of the segment status's in bytes */ + /** Offset to start of the segment status in bytes */ final long offsetSegmentStatus; /** Offset to start of the general progress section */ @@ -195,7 +192,7 @@ public class SplitFileFetcherStorage { * Offset to start of the original details in bytes. "Original details" includes the URI to this * download (if available), the original URI for the whole download (if available), whether this * is the final fetch (it might be a metadata or container fetch), and data from the ultimate - * client, e.g.
the Identifier, whether it is on the Global queue, the client name if it isn't + * client, e.g., the Identifier, whether it is on the Global queue, the client name if it isn't, * etc. */ final long offsetOriginalDetails; @@ -209,8 +206,8 @@ public class SplitFileFetcherStorage { /** Checksum implementation */ final ChecksumChecker checksumChecker; - private boolean hasCheckedDatastore; - private boolean dirtyGeneralProgress; + boolean hasCheckedDatastore; + boolean dirtyGeneralProgress; static final long HAS_CHECKED_DATASTORE_FLAG = 1; /** Fixed value posted at the end of the file (if plaintext!) */ @@ -223,7 +220,7 @@ public class SplitFileFetcherStorage { * List of segments we need to tryStartDecode() on because their metadata was corrupted on * startup. */ - private List segmentsToTryDecode; + List segmentsToTryDecode; /** * Create a new storage instance backed by a fresh on-disk layout. @@ -239,7 +236,7 @@ public class SplitFileFetcherStorage { * scheduling. Once all segments finish and postconditions are met, the fetcher calls {@link * #streamGenerator()} to materialize the final byte stream. * - * @param p immutable parameters including metadata, factories, and execution helpers; must be + * @param p immutable parameters, including metadata, factories, and execution helpers, must be * non-null. * @throws FetchException when policy validation fails before any network activity begins. * @throws MetadataParseException when metadata cannot describe a supported splitfile layout. @@ -287,8 +284,8 @@ public SplitFileFetcherStorage(SplitFileFetcherStorageInitParams p) int checkBlocksPerSegment = p.metadata.getCheckBlocksPerSegment(); // Accumulate sizes and counts over segments. - AccumulatedSizes acc = - accumulateSizes( + SplitFileFetcherStorageLayout.AccumulatedSizes acc = + SplitFileFetcherStorageLayout.accumulateSizes( segmentKeys, crossCheckBlocks, splitfileSingleCryptoKey != null, @@ -301,15 +298,15 @@ public SplitFileFetcherStorage(SplitFileFetcherStorageInitParams p) long storedCrossCheckBlocksLength; if (completeViaTruncation) { storedCrossCheckBlocksLength = (long) totalCrossCheckBlocks * CHKBlock.DATA_LENGTH; - storedBlocksLength = (long) acc.splitfileDataBlocks * CHKBlock.DATA_LENGTH; + storedBlocksLength = (long) acc.splitfileDataBlocks() * CHKBlock.DATA_LENGTH; } else { storedCrossCheckBlocksLength = 0; storedBlocksLength = - ((long) acc.splitfileDataBlocks + totalCrossCheckBlocks) * CHKBlock.DATA_LENGTH; + ((long) acc.splitfileDataBlocks() + totalCrossCheckBlocks) * CHKBlock.DATA_LENGTH; } int segmentCount = p.metadata.getSegmentCount(); - validateSegmentCount(segmentCount); + SplitFileFetcherStorageLayout.validateSegmentCount(segmentCount); CompatibilityMode minCompatMode = resolveAndReportCompatibility( @@ -319,7 +316,7 @@ public SplitFileFetcherStorage(SplitFileFetcherStorageInitParams p) p.origFetchContext, blocksPerSegment, checkBlocksPerSegment, - acc.splitfileCheckBlocks); + acc.splitfileCheckBlocks()); if (LOG.isDebugEnabled()) { LOG.debug( @@ -329,15 +326,15 @@ public SplitFileFetcherStorage(SplitFileFetcherStorageInitParams p) blocksPerSegment, checkBlocksPerSegment, segmentCount, - acc.splitfileDataBlocks, - acc.splitfileCheckBlocks); + acc.splitfileDataBlocks(), + acc.splitfileCheckBlocks()); } segments = new SplitFileFetcherSegmentStorage[segmentCount]; randomSegmentIterator = new RandomArrayIterator<>(segments); long checkLength = - (acc.splitfileDataBlocks - (long) segmentCount * crossCheckBlocks) * CHKBlock.DATA_LENGTH; - validateCheckLength(checkLength, 
finalLength); + (acc.splitfileDataBlocks() - (long) segmentCount * crossCheckBlocks) * CHKBlock.DATA_LENGTH; + SplitFileFetcherStorageLayout.validateCheckLength(checkLength, finalLength); byte[] localSalt = new byte[32]; random.nextBytes(localSalt); @@ -348,21 +345,21 @@ public SplitFileFetcherStorage(SplitFileFetcherStorageInitParams p) this, false, localSalt, - acc.splitfileDataBlocks + totalCrossCheckBlocks + acc.splitfileCheckBlocks, + acc.splitfileDataBlocks() + totalCrossCheckBlocks + acc.splitfileCheckBlocks(), blocksPerSegment + checkBlocksPerSegment, segmentCount); finalMinCompatMode = minCompatMode; this.offsetKeyList = storedBlocksLength + storedCrossCheckBlocksLength; - this.offsetSegmentStatus = offsetKeyList + acc.storedKeysLength; + this.offsetSegmentStatus = offsetKeyList + acc.storedKeysLength(); byte[] generalProgress = SplitFileFetcherStoragePersistence.encodeGeneralProgress( checksumChecker, hasCheckedDatastore, errors); if (persistent) { - offsetGeneralProgress = offsetSegmentStatus + acc.storedSegmentStatusLength; + offsetGeneralProgress = offsetSegmentStatus + acc.storedSegmentStatusLength(); this.offsetMainBloomFilter = offsetGeneralProgress + generalProgress.length; this.offsetSegmentBloomFilters = offsetMainBloomFilter + keyListener.paddedMainBloomFilterSize(); @@ -375,7 +372,10 @@ public SplitFileFetcherStorage(SplitFileFetcherStorageInitParams p) offsetSegmentBloomFilters = offsetOriginalMetadata = offsetSegmentStatus; } - SegmentsBuildContext segCtx = new SegmentsBuildContext(); + SplitFileFetcherSegmentsBuilder.SegmentsBuildContext segCtx = + new SplitFileFetcherSegmentsBuilder.SegmentsBuildContext(); + segCtx.parent = this; + segCtx.segments = segments; segCtx.metadata = p.metadata; segCtx.segmentKeys = segmentKeys; segCtx.crossCheckBlocks = crossCheckBlocks; @@ -387,7 +387,13 @@ public SplitFileFetcherStorage(SplitFileFetcherStorageInitParams p) segCtx.acc = acc; segCtx.storedBlocksLength = storedBlocksLength; segCtx.storedCrossCheckBlocksLength = storedCrossCheckBlocksLength; - this.crossSegments = initSegmentsAndKeys(segCtx); + segCtx.completeViaTruncation = completeViaTruncation; + segCtx.persistent = persistent; + segCtx.hasSplitfileSingleCryptoKey = splitfileSingleCryptoKey != null; + segCtx.checksumLength = checksumLength; + SplitFileFetcherSegmentsInit segmentsInit = + SplitFileFetcherSegmentsBuilder.initSegmentsAndKeys(segCtx); + this.crossSegments = segmentsInit.crossSegments; // Prepare metadata buffers and compute final layout lengths/offsets (assign finals here). long totalLength; @@ -411,8 +417,8 @@ public SplitFileFetcherStorage(SplitFileFetcherStorageInitParams p) // Now offsets are final, we can encode the basic settings which embed them. 
encodedBasicSettings = encodeBasicSettings( - acc.splitfileDataBlocks, - acc.splitfileCheckBlocks, + acc.splitfileDataBlocks(), + acc.splitfileCheckBlocks(), crossCheckBlocks * segments.length); totalLength = offsetBasicSettings + encodedBasicSettings.length + 4 + checksumLength + 4 + 4 + 2 + 8; @@ -427,7 +433,8 @@ public SplitFileFetcherStorage(SplitFileFetcherStorageInitParams p) raf = SplitFileFetcherStorageRafFactory.createRafOrThrow( p.storageFile, totalLength, p.rafFactory, p.diskSpaceCheckingRAFFactory, random, LOG); - writeToRAF(segmentKeys, prepared, encodedBasicSettings, totalLength, generalProgress); + SplitFileFetcherStoragePersistenceWriter.writeToRaf( + this, segmentKeys, prepared, encodedBasicSettings, generalProgress, totalLength); if (LOG.isDebugEnabled()) LOG.debug("Fetching {} on {} for {}", p.thisKey, this, fetcher); initAsyncHelpers(); } @@ -437,11 +444,11 @@ public SplitFileFetcherStorage(SplitFileFetcherStorageInitParams p) * *
<p>
This constructor validates footer magic, checksums, and version, locates each logical * section, and rebuilds segment state and Bloom filters. It also reattaches asynchronous helpers - * and prepares any pending decode attempts when segment metadata indicates partial progress. + * and prepares any pending decoding attempts when segment metadata indicates partial progress. * Successful completion means the instance is ready for {@link #start(boolean)} with resume * semantics and will reflect on-disk progress accurately. * - * @param p resume parameters including existing buffer and runtime helpers; must be non-null. + * @param p resume parameters, including existing buffer and runtime helpers; must be non-null. * @throws IOException when the underlying buffer cannot be read or locked. * @throws StorageFormatException when checksums, version, or offsets are invalid. * @throws FetchException when resumed state violates fetch policy or limits. @@ -465,23 +472,10 @@ public SplitFileFetcherStorage(SplitFileFetcherStorageResumeParams p) this.completeViaTruncation = p.completeViaTruncation; this.rafLength = p.raf.size(); - ensureMinLength(rafLength); - validateMagic(rafLength); - byte[] versionBuf = readVersionBytes(rafLength); - int version = parseInt(versionBuf); - if (version != VERSION) throw new StorageFormatException("Wrong version " + version); - byte[] checksumTypeBuf = readChecksumTypeBytes(rafLength); - validateChecksumType(checksumTypeBuf); - byte[] flagsBuf = readFlagsBytes(rafLength); - validateFlags(flagsBuf); - - BasicSettingsInfo basicInfo = - readBasicSettingsLocation(rafLength, flagsBuf, checksumTypeBuf, versionBuf); - byte[] basicSettingsBuffer = readChecksummed(basicInfo.offset, basicInfo.length); - ParsedBasicSettings parsed = - SplitFileFetcherStorageSettingsCodec.parseBasicSettings( - basicSettingsBuffer, basicInfo.offset, p.completeViaTruncation, rafLength); + ParsedBasicSettings parsed = + SplitFileFetcherStorageResumeReader.readParsedSettings( + raf, checksumChecker, checksumLength, rafLength, p.completeViaTruncation); // Assign parsed values to final fields this.splitfileType = parsed.getSplitfileType(); @@ -505,322 +498,37 @@ // Allocate and assign segments array before constructing individual segments.
this.segments = new SplitFileFetcherSegmentStorage[parsed.getSegmentCount()]; this.randomSegmentIterator = new RandomArrayIterator<>(segments); - SegmentsInit segmentsInit = - initSegmentsFromStream( - parsed.getTotalDataBlocks(), - parsed.getTotalCheckBlocks(), - parsed.getTotalCrossCheckBlocks(), - parsed.getSettingsStream(), - p.completeViaTruncation, - p.keysFetching, - this.segments); + SplitFileFetcherSegmentsInit segmentsInit = + SplitFileFetcherSegmentsBuilder.initSegmentsFromStream( + new SplitFileFetcherSegmentsLoadParams( + this, + parsed.getTotalDataBlocks(), + parsed.getTotalCheckBlocks(), + parsed.getTotalCrossCheckBlocks(), + parsed.getSettingsStream(), + p.completeViaTruncation, + p.keysFetching, + this.segments, + checksumLength, + splitfileSingleCryptoKey != null, + offsetKeyList, + offsetSegmentStatus, + rafLength)); this.crossSegments = segmentsInit.crossSegments; this.keyListener = new SplitFileFetcherKeyListener( this, fetcher, segmentsInit.remainingStream, false, p.newSalt); - postInitReadSegmentState(); - readGeneralProgress(); + SplitFileFetcherStorageRecovery recovery = new SplitFileFetcherStorageRecovery(this); + recovery.postInitReadSegmentState(); + recovery.readGeneralProgress(); initAsyncHelpers(); } - // ---------- Small helpers to reduce cognitive complexity in constructor ---------- - - private void ensureMinLength(long length) throws StorageFormatException { - if (length < 8) throw new StorageFormatException("Too short"); - } - - private void validateMagic(long length) throws IOException, StorageFormatException { - byte[] buf = new byte[8]; - raf.pread(length - 8, buf, 0, 8); - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); - if (dis.readLong() != END_MAGIC) throw new StorageFormatException("Wrong magic bytes"); - } - - private static int parseInt(byte[] buf) throws IOException { - return new DataInputStream(new ByteArrayInputStream(buf)).readInt(); - } - - private byte[] readVersionBytes(long length) throws IOException { - byte[] versionBuf = new byte[4]; - raf.pread(length - 12, versionBuf, 0, 4); - return versionBuf; - } - - private byte[] readChecksumTypeBytes(long length) throws IOException { - byte[] checksumTypeBuf = new byte[2]; - raf.pread(length - 14, checksumTypeBuf, 0, 2); - return checksumTypeBuf; - } - - private void validateChecksumType(byte[] checksumTypeBuf) - throws IOException, StorageFormatException { - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(checksumTypeBuf)); - int checksumType = dis.readShort(); - if (checksumType != ChecksumChecker.CHECKSUM_CRC) - throw new StorageFormatException("Unknown checksum type " + checksumType); - } - - private byte[] readFlagsBytes(long length) throws IOException { - byte[] flagsBuf = new byte[4]; - raf.pread(length - 18, flagsBuf, 0, 4); - return flagsBuf; - } - - private void validateFlags(byte[] flagsBuf) throws IOException, StorageFormatException { - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(flagsBuf)); - int flags = dis.readInt(); - if (flags != 0) throw new StorageFormatException("Unknown flags: " + flags); - } - - private BasicSettingsInfo readBasicSettingsLocation( - long length, byte[] flagsBuf, byte[] checksumTypeBuf, byte[] versionBuf) - throws IOException, StorageFormatException { - byte[] buf = new byte[14]; - raf.pread(length - (22 + checksumLength), buf, 0, 4); - byte[] checksum = new byte[checksumLength]; - raf.pread(length - (18 + checksumLength), checksum, 0, checksumLength); - System.arraycopy(flagsBuf, 0, 
buf, 4, 4); - System.arraycopy(checksumTypeBuf, 0, buf, 8, 2); - System.arraycopy(versionBuf, 0, buf, 10, 4); - if (!checksumChecker.checkChecksum(buf, 0, 14, checksum)) - throw new StorageFormatException("Checksum failed on basic settings length and version"); - int basicSettingsLength = parseInt(buf); - if (basicSettingsLength < 0 - || basicSettingsLength + 12 + 4 + checksumLength > raf.size() - || basicSettingsLength > 1024 * 1024) - throw new StorageFormatException("Bad basic settings length"); - long basicSettingsOffset = length - (18 + 4 + checksumLength * 2L + basicSettingsLength); - return new BasicSettingsInfo(basicSettingsOffset, basicSettingsLength); - } - - private byte[] readChecksummed(long offset, int length) - throws StorageFormatException, IOException { - byte[] basicSettingsBuffer = new byte[length]; - try { - preadChecksummed(offset, basicSettingsBuffer, 0, length); - } catch (ChecksumFailedException _) { - throw new StorageFormatException("Basic settings checksum invalid"); - } - return basicSettingsBuffer; - } - - private SegmentsInit initSegmentsFromStream( - int totalDataBlocks, - int totalCheckBlocks, - int totalCrossCheckBlocks, - DataInputStream dis, - boolean completeViaTruncation, - KeysFetchingLocally keysFetching, - SplitFileFetcherSegmentStorage[] segments) - throws StorageFormatException, IOException { - // segments array is provided and already assigned to this.segments - long dataOffset = 0; - long crossCheckBlocksOffset = - completeViaTruncation ? (long) totalDataBlocks * CHKBlock.DATA_LENGTH : 0; - long segmentKeysOffset = offsetKeyList; - long segmentStatusOffset = offsetSegmentStatus; - int countDataBlocks = 0; - int countCheckBlocks = 0; - int countCrossCheckBlocks = 0; - for (int i = 0; i < segments.length; i++) { - SplitFileFetcherSegmentStorage.LoadParams lp = - new SplitFileFetcherSegmentStorage.LoadParams(); - lp.parent = this; - lp.dis = dis; - lp.segNo = i; - lp.writeRetries = maxRetries != -1; - lp.segmentDataOffset = dataOffset; - lp.segmentCrossCheckDataOffset = completeViaTruncation ? crossCheckBlocksOffset : -1; - lp.segmentKeysOffset = segmentKeysOffset; - lp.segmentStatusOffset = segmentStatusOffset; - lp.keysFetching = keysFetching; - segments[i] = new SplitFileFetcherSegmentStorage(lp); - int dataBlocks = segments[i].dataBlocks; - countDataBlocks += dataBlocks; - int checkBlocks = segments[i].checkBlocks; - countCheckBlocks += checkBlocks; - int crossCheckBlocks = segments[i].crossSegmentCheckBlocks; - countCrossCheckBlocks += crossCheckBlocks; - dataOffset += (long) dataBlocks * CHKBlock.DATA_LENGTH; - if (completeViaTruncation) - crossCheckBlocksOffset += (long) crossCheckBlocks * CHKBlock.DATA_LENGTH; - else dataOffset += (long) crossCheckBlocks * CHKBlock.DATA_LENGTH; - segmentKeysOffset += - SplitFileFetcherSegmentStorage.storedKeysLength( - dataBlocks + crossCheckBlocks, - checkBlocks, - splitfileSingleCryptoKey != null, - checksumLength); - segmentStatusOffset += - SplitFileFetcherSegmentStorage.paddedStoredSegmentStatusLength( - dataBlocks, checkBlocks, crossCheckBlocks, maxRetries != -1, checksumLength, true); - validateSegmentOffsets(dataOffset, segments[i]); - debugSegmentOffsets(i, segments[i]); - } - validateTotals( - countDataBlocks, - totalDataBlocks, - countCheckBlocks, - totalCheckBlocks, - countCrossCheckBlocks, - totalCrossCheckBlocks); - - int crossSegmentsCount = dis.readInt(); - SplitFileFetcherCrossSegmentStorage[] crossSegmentsLocal = - (crossSegmentsCount == 0) - ? 
null - : new SplitFileFetcherCrossSegmentStorage[crossSegmentsCount]; - for (int i = 0; i < crossSegmentsCount; i++) { - // crossSegmentsLocal is non-null when crossSegmentsCount > 0 - crossSegmentsLocal[i] = new SplitFileFetcherCrossSegmentStorage(this, i, dis); - } - return new SegmentsInit(segments, crossSegmentsLocal, dis); - } - - private static void validateTotals( - int countDataBlocks, - int totalDataBlocks, - int countCheckBlocks, - int totalCheckBlocks, - int countCrossCheckBlocks, - int totalCrossCheckBlocks) - throws StorageFormatException { - if (countDataBlocks != totalDataBlocks) - throw new StorageFormatException( - "Total data blocks " + countDataBlocks + " but expected " + totalDataBlocks); - if (countCheckBlocks != totalCheckBlocks) - throw new StorageFormatException( - "Total check blocks " + countCheckBlocks + " but expected " + totalCheckBlocks); - if (countCrossCheckBlocks != totalCrossCheckBlocks) - throw new StorageFormatException( - "Total cross-check blocks " - + countCrossCheckBlocks - + " but expected " - + totalCrossCheckBlocks); - } - - private void debugSegmentOffsets(int index, SplitFileFetcherSegmentStorage segment) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Segment {}: data blocks offset {} cross-check blocks offset {} for segment {} of {}", - index, - segment.segmentBlockDataOffset, - segment.segmentCrossCheckBlockDataOffset, - index, - this); - } - } - - private void validateSegmentOffsets(long dataOffset, SplitFileFetcherSegmentStorage segment) - throws StorageFormatException { - if (dataOffset > rafLength) - throw new StorageFormatException( - "Data offset past end of file " + dataOffset + " of " + rafLength); - if (segment.segmentCrossCheckBlockDataOffset > rafLength) - throw new StorageFormatException( - "Cross-check blocks offset past end of file " - + segment.segmentCrossCheckBlockDataOffset - + " of " - + rafLength); - } - - private void postInitReadSegmentState() - throws FetchException, IOException, StorageFormatException { - for (SplitFileFetcherSegmentStorage segment : segments) { - boolean needsDecode = determineIfSegmentNeedsDecode(segment); - if (needsDecode) queueSegmentForDecode(segment); - } - readAllSegmentKeys(); - checkCrossSegmentsIfAny(); - } - - private boolean determineIfSegmentNeedsDecode(SplitFileFetcherSegmentStorage segment) - throws FetchException, IOException, StorageFormatException { - boolean needsDecode = false; - try { - segment.readMetadata(); - if (segment.hasFailed()) { - raf.close(); - raf.free(); // Failed, so free it. - throw new FetchException(FetchExceptionMode.SPLITFILE_ERROR, errors); - } - } catch (ChecksumFailedException _) { - LOG.error("Progress for segment {} on {} corrupted.", segment.segNo, this); - needsDecode = true; - } - if (segment.needsDecode()) needsDecode = true; - return needsDecode; - } - - private void queueSegmentForDecode(SplitFileFetcherSegmentStorage segment) { - if (segmentsToTryDecode == null) segmentsToTryDecode = new ArrayList<>(); - segmentsToTryDecode.add(segment); - } - - private void readAllSegmentKeys() throws StorageFormatException, IOException { - for (SplitFileFetcherSegmentStorage segment : segments) { - try { - segment.readSegmentKeys(); - } catch (ChecksumFailedException _) { - throw new StorageFormatException("Keys corrupted"); - } - } - } - - private void checkCrossSegmentsIfAny() { - if (this.crossSegments == null) return; - for (SplitFileFetcherCrossSegmentStorage crossSegment : this.crossSegments) - // Must be after reading the metadata for the plain segments. 
- crossSegment.checkBlocks(); - } - - private static class BasicSettingsInfo { - final long offset; - final int length; - - BasicSettingsInfo(long offset, int length) { - this.offset = offset; - this.length = length; - } - } - - private static class SegmentsInit { - final SplitFileFetcherSegmentStorage[] segments; - final SplitFileFetcherCrossSegmentStorage[] crossSegments; - final DataInputStream remainingStream; - - SegmentsInit( - SplitFileFetcherSegmentStorage[] segments, - SplitFileFetcherCrossSegmentStorage[] crossSegments, - DataInputStream remainingStream) { - this.segments = segments; - this.crossSegments = crossSegments; - this.remainingStream = remainingStream; - } - } - - private void readGeneralProgress() throws IOException { - try { - byte[] buf = preadChecksummedWithLength(offsetGeneralProgress); - ByteArrayInputStream bais = new ByteArrayInputStream(buf); - DataInputStream dis = new DataInputStream(bais); - long flags = dis.readLong(); - if ((flags & HAS_CHECKED_DATASTORE_FLAG) != 0) hasCheckedDatastore = true; - errors = new FailureCodeTracker(false, dis); - dis.close(); - } catch (ChecksumFailedException | StorageFormatException e) { - LOG.error("Failed to read general progress: {}", String.valueOf(e)); - // Reset general progress - this.hasCheckedDatastore = false; - this.errors = new FailureCodeTracker(false); - } - } - /** * Start the storage layer and enqueue any required recovery work. * - *
<p>
    This method reattaches cross-segment helpers, schedules any deferred decode attempts, and + *
<p>
    This method reattaches cross-segment helpers, schedules any deferred decoding attempts, and * optionally notifies the fetcher of resume statistics. When key Bloom filters are missing, it * triggers asynchronous regeneration and returns {@code false} so the caller does not schedule * requests prematurely. It performs no network I/O but may queue background work that later @@ -832,9 +540,10 @@ private void readGeneralProgress() throws IOException { */ public boolean start(boolean resume) { if (resume) onResumeInit(); - restartCrossSegments(); - scheduleTryDecodeForBrokenSegments(); - if (keyListener.needsKeys()) return regenerateKeysAsync(); + SplitFileFetcherStorageRecovery recovery = new SplitFileFetcherStorageRecovery(this); + recovery.restartCrossSegments(); + recovery.scheduleTryDecodeForBrokenSegments(); + if (keyListener.needsKeys()) return recovery.regenerateKeysAsync(); return true; } @@ -855,80 +564,6 @@ private void onResumeInit() { fetcher.onResume(succeededBlocks, failedBlocks, clientMetadata, decompressedLength); } - private void restartCrossSegments() { - if (crossSegments == null) return; - for (SplitFileFetcherCrossSegmentStorage segment : crossSegments) { - segment.restart(); - } - } - - private void scheduleTryDecodeForBrokenSegments() { - if (segmentsToTryDecode == null) return; - List brokenSegments; - synchronized (SplitFileFetcherStorage.this) { - brokenSegments = segmentsToTryDecode; - segmentsToTryDecode = null; - } - if (brokenSegments == null) return; - for (SplitFileFetcherSegmentStorage segment : brokenSegments) { - segment.tryStartDecode(); - } - } - - private boolean regenerateKeysAsync() { - try { - this.jobRunner.queue( - _ -> { - regenerateKeysJob(); - return false; - }, - REGENERATE_KEYS_PRIORITY); - } catch (PersistenceDisabledException _) { - // Ignore. - } - return false; - } - - private void regenerateKeysJob() { - // Regenerating filters for this storage - LOG.error("Regenerating filters for {}", SplitFileFetcherStorage.this); - KeySalter salt = fetcher.getSalter(); - if (!addAllKeysFromSegments(salt)) return; - keyListener.addedAllKeys(); - writeBloomFiltersSafely(); - fetcher.restartedAfterDataCorruption(); - LOG.warn("Finished regenerating filters for {}", SplitFileFetcherStorage.this); - } - - private boolean addAllKeysFromSegments(KeySalter salt) { - for (int i = 0; i < segments.length; i++) { - SplitFileFetcherSegmentStorage segment = segments[i]; - try { - SplitFileSegmentKeys keys = segment.readSegmentKeys(); - for (int j = 0; j < keys.totalKeys(); j++) { - keyListener.addKey(keys.getKey(j, null, false).getNodeKey(false), i, salt); - } - } catch (IOException | ChecksumFailedException e) { - if (e instanceof IOException io) { - failOnDiskError(io); - } else { - failOnDiskError((ChecksumFailedException) e); - } - return false; - } - } - return true; - } - - private void writeBloomFiltersSafely() { - try { - keyListener.initialWriteSegmentBloomFilters(offsetSegmentBloomFilters); - keyListener.innerWriteMainBloomFilter(offsetMainBloomFilter); - } catch (IOException e) { - if (persistent) failOnDiskError(e); - } - } - OutputStream checksumOutputStream(OutputStream os) { return checksumChecker.checksumWriter(os); } @@ -939,107 +574,11 @@ private byte[] encodeBasicSettings( this, totalDataBlocks, totalCheckBlocks, totalCrossCheckBlocks); } - /** Container for accumulated sizing information computed from segment keys and configuration. 
*/ - private static final class AccumulatedSizes { - final int splitfileDataBlocks; - final int splitfileCheckBlocks; - final long storedKeysLength; - final long storedSegmentStatusLength; - - AccumulatedSizes( - int splitfileDataBlocks, - int splitfileCheckBlocks, - long storedKeysLength, - long storedSegmentStatusLength) { - this.splitfileDataBlocks = splitfileDataBlocks; - this.splitfileCheckBlocks = splitfileCheckBlocks; - this.storedKeysLength = storedKeysLength; - this.storedSegmentStatusLength = storedSegmentStatusLength; - } - } - - /** Context needed to build segments and keys while computing offsets. */ - private static final class SegmentsBuildContext { - Metadata metadata; - SplitFileSegmentKeys[] segmentKeys; - int crossCheckBlocks; - int blocksPerSegment; - int checkBlocksPerSegment; - FetchContext origFetchContext; - KeySalter salt; - KeysFetchingLocally keysFetching; - AccumulatedSizes acc; - long storedBlocksLength; - long storedCrossCheckBlocksLength; - } - - private static AccumulatedSizes accumulateSizes( - SplitFileSegmentKeys[] segmentKeys, - int crossCheckBlocks, - boolean hasSplitfileSingleCryptoKey, - int checksumLength, - int maxRetries, - boolean persistent) { - int splitfileDataBlocks = 0; - int splitfileCheckBlocks = 0; - long storedKeysLength = 0; - long storedSegmentStatusLength = 0; - for (SplitFileSegmentKeys keys : segmentKeys) { - int dataBlocks = keys.getDataBlocks(); - int checkBlocks = keys.getCheckBlocks(); - splitfileDataBlocks += dataBlocks; - splitfileCheckBlocks += checkBlocks; - storedKeysLength += - SplitFileFetcherSegmentStorage.storedKeysLength( - dataBlocks, checkBlocks, hasSplitfileSingleCryptoKey, checksumLength); - storedSegmentStatusLength += - SplitFileFetcherSegmentStorage.paddedStoredSegmentStatusLength( - dataBlocks - crossCheckBlocks, - checkBlocks, - crossCheckBlocks, - maxRetries != -1, - checksumLength, - persistent); - } - // Subtract cross-check blocks from data blocks to get the actual data blocks. 
- splitfileDataBlocks -= segmentKeys.length * crossCheckBlocks; - return new AccumulatedSizes( - splitfileDataBlocks, splitfileCheckBlocks, storedKeysLength, storedSegmentStatusLength); - } - - private void writeToRAF( - SplitFileSegmentKeys[] segmentKeys, - SplitFileFetcherStoragePersistence.PreparedMetadata prepared, - byte[] encodedBasicSettings, - long totalLength, - byte[] generalProgress) - throws IOException { - try (var _ = autoLockOpen()) { - for (int i = 0; i < segments.length; i++) { - SplitFileFetcherSegmentStorage segment = segments[i]; - segment.writeKeysWithChecksum(segmentKeys[i]); - } - if (persistent) { - for (SplitFileFetcherSegmentStorage segment : segments) segment.writeMetadata(); - raf.pwrite(offsetGeneralProgress, generalProgress, 0, generalProgress.length); - keyListener.innerWriteMainBloomFilter(offsetMainBloomFilter); - keyListener.initialWriteSegmentBloomFilters(offsetSegmentBloomFilters); - SplitFileFetcherStoragePersistence.writePersistentMetadata( - raf, - offsetOriginalMetadata, - prepared, - encodedBasicSettings, - checksumChecker, - totalLength); - } - } - } - - private AutoCloseableRafLock autoLockOpen() throws IOException { + AutoCloseableRafLock autoLockOpen() throws IOException { return new AutoCloseableRafLock(raf.lockOpen()); } - private static final class AutoCloseableRafLock implements AutoCloseable { + static final class AutoCloseableRafLock implements AutoCloseable { private final RAFLock lock; AutoCloseableRafLock(RAFLock lock) { @@ -1052,20 +591,6 @@ public void close() { } } - private static void validateCheckLength(long checkLength, long finalLength) - throws FetchException { - if (checkLength > finalLength && checkLength - finalLength > CHKBlock.DATA_LENGTH) - throw new FetchException( - FetchExceptionMode.INVALID_METADATA, - "Splitfile is " + checkLength + " bytes long but length is " + finalLength + " bytes"); - } - - private static void validateSegmentCount(int segmentCount) { - if (segmentCount <= 0) { - throw new AssertionError("A splitfile has to have at least one segment"); - } - } - private CompatibilityMode resolveAndReportCompatibility( Metadata metadata, boolean topDontCompress, @@ -1135,79 +660,6 @@ private static void validateBlocksPerSegmentLimit( } } - private SplitFileFetcherCrossSegmentStorage[] initSegmentsAndKeys(SegmentsBuildContext ctx) - throws FetchException { - long dataOffset = 0; - long crossCheckBlocksOffset = ctx.storedBlocksLength; // Only used if completeViaTruncation - long segmentKeysOffset = offsetKeyList; - long segmentStatusOffset = offsetSegmentStatus; - - for (int i = 0; i < segments.length; i++) { - SplitFileSegmentKeys keys = ctx.segmentKeys[i]; - final int dataBlocks = keys.getDataBlocks() - ctx.crossCheckBlocks; - final int checkBlocks = keys.getCheckBlocks(); - validateBlocksPerSegmentLimit(ctx.origFetchContext, dataBlocks, checkBlocks); - SplitFileFetcherSegmentStorage.InitParams p = new SplitFileFetcherSegmentStorage.InitParams(); - p.parent = this; - p.segNumber = i; - p.dataBlocks = dataBlocks; - p.checkBlocks = checkBlocks; - p.crossCheckBlocks = ctx.crossCheckBlocks; - p.segmentDataOffset = dataOffset; - p.segmentCrossCheckDataOffset = completeViaTruncation ? 
crossCheckBlocksOffset : -1; - p.segmentKeysOffset = segmentKeysOffset; - p.segmentStatusOffset = segmentStatusOffset; - p.writeRetries = maxRetries != -1; - p.keys = keys; - p.keysFetching = ctx.keysFetching; - segments[i] = new SplitFileFetcherSegmentStorage(p); - dataOffset += (long) dataBlocks * CHKBlock.DATA_LENGTH; - if (!completeViaTruncation) { - dataOffset += (long) ctx.crossCheckBlocks * CHKBlock.DATA_LENGTH; - } else { - crossCheckBlocksOffset += (long) ctx.crossCheckBlocks * CHKBlock.DATA_LENGTH; - } - segmentKeysOffset += - SplitFileFetcherSegmentStorage.storedKeysLength( - dataBlocks + ctx.crossCheckBlocks, - checkBlocks, - splitfileSingleCryptoKey != null, - checksumLength); - segmentStatusOffset += - SplitFileFetcherSegmentStorage.paddedStoredSegmentStatusLength( - dataBlocks, - checkBlocks, - ctx.crossCheckBlocks, - maxRetries != -1, - checksumLength, - persistent); - for (int j = 0; j < (dataBlocks + ctx.crossCheckBlocks + checkBlocks); j++) { - keyListener.addKey(keys.getKey(j, null, false).getNodeKey(false), i, ctx.salt); - } - debugSegmentOffsets(i, segments[i]); - } - assert (dataOffset == ctx.storedBlocksLength); - assert !completeViaTruncation - || (crossCheckBlocksOffset == ctx.storedCrossCheckBlocksLength + ctx.storedBlocksLength); - assert (segmentKeysOffset - == ctx.storedBlocksLength + ctx.storedCrossCheckBlocksLength + ctx.acc.storedKeysLength); - assert (segmentStatusOffset - == ctx.storedBlocksLength - + ctx.storedCrossCheckBlocksLength - + ctx.acc.storedKeysLength - + ctx.acc.storedSegmentStatusLength); - - // Lie about the required number of blocks. See original inline comment for rationale. - int totalCrossCheckBlocks = ctx.segmentKeys.length * ctx.crossCheckBlocks; - fetcher.setSplitfileBlocks( - ctx.acc.splitfileDataBlocks + totalCrossCheckBlocks, ctx.acc.splitfileCheckBlocks); - - keyListener.finishedSetup(); - - return SplitFileFetcherCrossSegmentAllocator.createCrossSegments( - this, ctx.metadata, ctx.crossCheckBlocks, ctx.blocksPerSegment, segments, fecCodec); - } - /** * Return the priority class forwarded to the request scheduler. * @@ -1290,7 +742,7 @@ private boolean allSucceeded() { * demand; it is not thread-safe. */ public StreamGenerator streamGenerator() { - // Truncation optimization can be added in future if safe. + // Truncation optimization can be added in the future if safe. return new StreamGenerator() { @Override @@ -1319,7 +771,7 @@ private void writeAllSegmentsToStream(OutputStream os) { } // Matches NativeThread.PriorityLevel.LOW_PRIORITY.value + 1. - private static final int REGENERATE_KEYS_PRIORITY = 4; + static final int REGENERATE_KEYS_PRIORITY = 4; static final long LAZY_WRITE_METADATA_DELAY = 5L * 60 * 1000; private PersistentJob writeMetadataJob; @@ -1367,18 +819,18 @@ private void initAsyncHelpers() { private Runnable wrapLazyWriteMetadata; /** - * Schedule a best-effort metadata write. + * Schedule best-effort metadata writing. + * - *

    When running in persistent mode, metadata changes are checkpointed asynchronously to reduce * contention and I/O overhead. Calls may coalesce when invoked in quick succession, so it is safe * to call this after each state change without creating extra disk churn. In non-persistent mode - * this method is a no-op. The write occurs off-thread and does not guarantee immediate + * this method is a no-op. The writing occurs off-thread and does not guarantee immediate * durability; callers that need a synchronous flush should use higher-level shutdown paths. */ public void lazyWriteMetadata() { if (!persistent) return; if (LAZY_WRITE_METADATA_DELAY > 0 && !isFinishing()) { - // The Runnable must be the same object for de-duplication. + // The Runnable must be the same object for deduplication. ticker.queueTimedJob( wrapLazyWriteMetadata, "Write metadata for splitfile", @@ -1416,7 +868,7 @@ private void closeOffThread() { jobRunner.queueNormalOrDrop( _ -> { // ATOMICITY/DURABILITY: This will run after the checkpoint after completion. - // So after restart, even if the checkpoint failed, we will be in a valid state. + // So after restarting, even if the checkpoint failed, we will be in a valid state. // This is why this is queue() not queueInternal(). close(); return true; @@ -1425,8 +877,8 @@ private void closeOffThread() { private void finishedEncoding() { FinishState state = computeFinishEncodingState(); - if (state.alreadyFinished) return; - if (state.lateCompletion) { + if (state.alreadyFinished()) return; + if (state.lateCompletion()) { if (allFinished() && !allSucceeded()) { fail(new FetchException(FetchExceptionMode.SPLITFILE_ERROR, errors)); } else { @@ -1435,21 +887,12 @@ private void finishedEncoding() { return; } } - if (state.waitingForFetcher) return; + if (state.waitingForFetcher()) return; closeOffThread(); } - private static final class FinishState { - final boolean alreadyFinished; - final boolean lateCompletion; - final boolean waitingForFetcher; - - FinishState(boolean alreadyFinished, boolean lateCompletion, boolean waitingForFetcher) { - this.alreadyFinished = alreadyFinished; - this.lateCompletion = lateCompletion; - this.waitingForFetcher = waitingForFetcher; - } - } + private record FinishState( + boolean alreadyFinished, boolean lateCompletion, boolean waitingForFetcher) {} private FinishState computeFinishEncodingState() { boolean alreadyFinished = false; @@ -1485,8 +928,8 @@ void close() { /** * Called when a segment has finished encoding. It is possible that it has simply restarted; it is - * not guaranteed to have encoded all blocks etc. But we still need the callback in case e.g. we - * are in the process of failing, and can't proceed until all the encode jobs have finished. + * not guaranteed to have encoded all blocks etc. But we still need the callback in case e.g., we + * are in the process of failing, and can't proceed until all the encoding jobs have finished. */ void finishedEncoding(SplitFileFetcherSegmentStorage segment) { if (LOG.isDebugEnabled()) @@ -1545,8 +988,8 @@ public void fail(final FetchException e) { * *

    This helper converts the condition into a consolidated {@link FetchException} and routes it * through {@link #fail(FetchException)} so shutdown happens consistently. It does not mutate the - * segment itself; segment state is expected to already reflect the terminal failure condition. - * Callers typically invoke this after a retry budget check fails. + * segment itself; the segment state is expected to already reflect the terminal failure + * condition. Callers typically invoke this after a retry budget check fails. * * @param segment segment that can no longer make progress, used only for logging. */ @@ -1601,7 +1044,7 @@ public void failOnDiskError(final ChecksumFailedException e) { * Count the number of not-yet-fetched keys across all segments. * *

    The value includes keys that are currently cooling down or temporarily skipped. It is - * computed from in-memory segment state and does not perform disk I/O. Use this for progress + * computed from the in-memory segment state and does not perform disk I/O. Use this for progress * estimation rather than exact completion criteria; failed blocks and delayed retries can cause * the count to oscillate. * @@ -1652,11 +1095,11 @@ public long countSendableKeys() { * Lightweight handle identifying a concrete block within a specific segment. * *

    Instances are stable identifiers used by schedulers and callbacks to correlate request - outcomes with storage state. The handle is immutable and embeds the owning storage, the segment - index, and the block index so it can be used as a map key or queue token without additional - lookups. It is not a cryptographic key; instead it is a positional reference that can be - resolved to a {@link ClientKey} through {@link SplitFileFetcherStorage#getKey}. Equality and - hash semantics incorporate the storage instance to avoid accidental collisions across + outcomes with the storage state. The handle is immutable and embeds the owning storage, the + segment index, and the block index, so it can be used as a map key or queue token without + additional lookups. It is not a cryptographic key; instead it is a positional reference that + can be resolved to a {@link ClientKey} through {@link SplitFileFetcherStorage#getKey}. Equality + and hash semantics incorporate the storage instance to avoid accidental collisions across * concurrent fetches. */ public static final class SplitFileFetcherStorageKey @@ -1669,8 +1112,8 @@ public static final class SplitFileFetcherStorageKey * values obtained from segment state or scheduling helpers. The resulting instance is immutable * and can be safely cached or used as a map key for the lifetime of the storage. * - * @param n zero-based block index within the segment, must be in range. - * @param segNo zero-based segment index for the owning storage, must be valid. + * @param n a zero-based block index within the segment; must be in range. + * @param segNo a zero-based segment index for the owning storage; must be valid. * @param storage owning storage instance used for lookups and equality; must be non-null. */ public SplitFileFetcherStorageKey(int n, int segNo, SplitFileFetcherStorage storage) { @@ -1773,7 +1216,7 @@ public String toString() { * - *

    The selection uses a time‑varying seed to distribute requests and avoids segments currently * decoding or ineligible due to cooldown/retry constraints. The method synchronizes on the - * segment iterator to keep selection state consistent and may scan multiple segments before + * segment iterator to keep the selection state consistent and may scan multiple segments before * finding a candidate. Returns {@code null} when no key is presently sendable. * * @return a randomly selected {@link SplitFileFetcherStorageKey} or {@code null} when none is @@ -1801,7 +1244,7 @@ public SplitFileFetcherStorageKey chooseRandomKey() { return null; } - /** Cancel the download, stop all FEC decodes, and call close() off-thread when done. */ + /** Cancel the download, stop all FEC decoding, and call close() off-thread when done. */ void cancel() { synchronized (this) { cancelled = true; @@ -1815,10 +1258,11 @@ void cancel() { /** * Callback invoked after checking the local datastore for a candidate key. * - *

    When a check produces a definitive result the storage updates segment state and may schedule - * additional work. The method runs off-thread to avoid blocking request selection. It increments - * the appropriate failure counters and notifies each segment that datastore probing has completed - * so retries can proceed. If all segments are already finished, the callback is a no-op. + *

    When a check produces a definitive result, the storage updates segment state and may + * schedule additional work. The method runs off-thread to avoid blocking request selection. It + * increments the appropriate failure counters and notifies each segment that datastore probing + * has completed so retries can proceed. If all segments are already finished, the callback is a + * no-op. */ public void finishedCheckingDatastoreOnLocalRequest() { // At this point, all the blocks will have been processed. @@ -1841,7 +1285,7 @@ synchronized boolean isFinishing() { /** * Record a non-fatal failure for a specific block request. * - *

    The error is accumulated in the failure tracker and the key is made retryable according to + *

    The error is accumulated in the failure tracker, and the key is made retryable according to * the configured policy. Persistent sessions schedule a metadata checkpoint. Callers should only * invoke this for failures that may be retried; terminal failures should go through {@link * #fail(FetchException)} or {@link #failOnSegment(SplitFileFetcherSegmentStorage)}. @@ -1867,7 +1311,7 @@ public void onFailure(SplitFileFetcherStorageKey key, FetchException fe) { * *

    The lookup reads segment key data, so it may perform disk I/O and acquire the RAF lock. On * I/O failure the storage reports a disk error and returns {@code null}. The call does not change - * segment state. Callers should treat the returned {@link ClientKey} as read-only. + * the segment state. Callers should treat the returned {@link ClientKey} as read-only. * * @param key handle identifying a block, usually from {@link #chooseRandomKey()}. * @return client key for the block, or {@code null} on I/O failure. @@ -1899,9 +1343,9 @@ public int maxRetries() { * Notify the fetcher that a block has permanently failed. * *

    The notification is posted asynchronously to keep request-selection threads responsive. It - * does not modify storage state directly; callers should ensure segment bookkeeping has already - * transitioned the block into a terminal state before invoking this callback. Multiple calls may - * be coalesced by the fetcher. + * does not modify the storage state directly; callers should ensure segment bookkeeping has + * already transitioned the block into a terminal state before invoking this callback. The fetcher + * may coalesce multiple calls. */ public void failedBlock() { jobRunner.queueNormalOrDrop( @@ -1976,10 +1420,10 @@ void increaseCooldown(final long cooldownTime) { /** * Clear global cooldown when all segments have exited their individual cooldown windows. * - *

    This is safe to call frequently; it performs cheap checks and posts to the fetcher when the - * global flag changes. The method synchronizes on the cooldown lock and will reset the global - * wakeup time only when every segment reports a cleared cooldown. It does not schedule requests - * directly; that remains the fetcher's responsibility. + *

    This is safe to call frequently; it performs inexpensive checks and posts to the fetcher + * when the global flag changes. The method synchronizes on the cooldown lock and will reset the + * global wakeup time only when every segment reports a cleared cooldown. It does not schedule + * requests directly; that remains the fetcher's responsibility. */ public void maybeClearCooldown() { synchronized (cooldownLock) { diff --git a/src/main/java/network/crypta/client/async/SplitFileFetcherStorageLayout.java b/src/main/java/network/crypta/client/async/SplitFileFetcherStorageLayout.java new file mode 100644 index 0000000000..7dadc6485e --- /dev/null +++ b/src/main/java/network/crypta/client/async/SplitFileFetcherStorageLayout.java @@ -0,0 +1,140 @@ +package network.crypta.client.async; + +import network.crypta.client.FetchException; +import network.crypta.client.FetchException.FetchExceptionMode; +import network.crypta.keys.CHKBlock; + +/** + * Computes size totals and validates splitfile metadata for fetcher storage layouts. + * + *

    This package-private helper concentrates the calculations that derive persisted lengths from + * segment keys and configuration flags. It is intentionally stateless: every method is a pure + * computation or validation based on the provided arguments, making it safe to call repeatedly + * during planning and preflight validation. The layout math assumes that segment arrays already + * encode the final data/check block counts per segment, including any cross-check blocks that must + * be removed before reporting the user-visible data block total. + * + *

    Typical usage is to precompute storage sizes before allocating persistent files and to perform + * lightweight sanity checks on metadata lengths. The methods do not synchronize and do not retain + * references to inputs; callers are responsible for ensuring consistent inputs across segments. + * + *

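+ * <p>Illustrative planning sequence (a sketch; the surrounding variables are assumed to be in
+ * scope, the methods are the ones defined below):
+ *
+ * <pre>{@code
+ * SplitFileFetcherStorageLayout.validateSegmentCount(segmentKeys.length);
+ * SplitFileFetcherStorageLayout.AccumulatedSizes sizes =
+ *     SplitFileFetcherStorageLayout.accumulateSizes(
+ *         segmentKeys, crossCheckBlocks, singleCryptoKey, checksumLength, maxRetries, persistent);
+ * SplitFileFetcherStorageLayout.validateCheckLength(checkLength, finalLength);
+ * }</pre>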
    + */ +final class SplitFileFetcherStorageLayout { + /** Prevents instantiation; this type is a static utility holder for layout computations. */ + private SplitFileFetcherStorageLayout() {} + + /** + * Immutable totals derived from segment keys and storage configuration inputs. + * + *

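+   * <p>Values are read through the generated record accessors, for example (sketch):
+   *
+   * <pre>{@code
+   * long persistedBytes = sizes.storedKeysLength() + sizes.storedSegmentStatusLength();
+   * }</pre>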
    Each component reflects an aggregate across all segments and uses byte counts where noted. + * The record is intentionally small and value-like so that callers can pass it between planning + * and persistence steps without additional bookkeeping. + * + * @param splitfileDataBlocks total data blocks after cross-check blocks are excluded + * @param splitfileCheckBlocks total check blocks across all segments in the splitfile + * @param storedKeysLength total stored key bytes required for all segment key material + * @param storedSegmentStatusLength total stored segment status bytes including padding + */ + record AccumulatedSizes( + int splitfileDataBlocks, + int splitfileCheckBlocks, + long storedKeysLength, + long storedSegmentStatusLength) {} + + /** + * Aggregates per-segment sizes into a single storage summary for the splitfile. + * + *

    The calculation walks each segment entry, summing data and check blocks, stored key lengths, + * and stored segment status lengths. Cross-check blocks are counted while iterating, then removed + * once at the end so that the returned data block count reflects the effective payload size. The + * method is deterministic and has no side effects, so callers can reuse it to recompute totals + * when the configuration changes. + * + *

    {@code
    +   * SplitFileFetcherStorageLayout.AccumulatedSizes sizes =
    +   *     SplitFileFetcherStorageLayout.accumulateSizes(keys, crossChecks, singleKey, checksum, -1,
    +   *         persistent);
    +   * }
    + * + * @param segmentKeys non-null array of segment key descriptors to aggregate + * @param crossCheckBlocks non-negative cross-check blocks per segment to subtract from data + * @param hasSplitfileSingleCryptoKey true when all segments share one crypto key + * @param checksumLength checksum length in bytes used for persisted segment metadata + * @param maxRetries maximum retry count, or {@code -1} when unbounded or unset + * @param persistent true when lengths target persistent storage rather than transient buffers + * @return aggregate sizes for blocks, key storage, and segment status storage + */ + static AccumulatedSizes accumulateSizes( + SplitFileSegmentKeys[] segmentKeys, + int crossCheckBlocks, + boolean hasSplitfileSingleCryptoKey, + int checksumLength, + int maxRetries, + boolean persistent) { + int splitfileDataBlocks = 0; + int splitfileCheckBlocks = 0; + long storedKeysLength = 0; + long storedSegmentStatusLength = 0; + for (SplitFileSegmentKeys keys : segmentKeys) { + int dataBlocks = keys.getDataBlocks(); + int checkBlocks = keys.getCheckBlocks(); + splitfileDataBlocks += dataBlocks; + splitfileCheckBlocks += checkBlocks; + storedKeysLength += + SplitFileFetcherSegmentStorage.storedKeysLength( + dataBlocks, checkBlocks, hasSplitfileSingleCryptoKey, checksumLength); + storedSegmentStatusLength += + SplitFileFetcherSegmentStorage.paddedStoredSegmentStatusLength( + dataBlocks - crossCheckBlocks, + checkBlocks, + crossCheckBlocks, + maxRetries != -1, + checksumLength, + persistent); + } + // Subtract cross-check blocks from data blocks to get the actual data blocks. + splitfileDataBlocks -= segmentKeys.length * crossCheckBlocks; + return new AccumulatedSizes( + splitfileDataBlocks, splitfileCheckBlocks, storedKeysLength, storedSegmentStatusLength); + } + + /** + * Validates that the stored check length is not materially larger than the final length. + * + *

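+   * <p>Concretely (sketch): an excess of exactly one data block passes, one byte more fails.
+   *
+   * <pre>{@code
+   * validateCheckLength(finalLength + CHKBlock.DATA_LENGTH, finalLength);     // tolerated
+   * validateCheckLength(finalLength + CHKBlock.DATA_LENGTH + 1, finalLength); // throws
+   * }</pre>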
    The comparison allows a small difference of up to one data block length to tolerate rounding + * effects during metadata assembly. When the excess exceeds that tolerance, a fetch exception is + * raised to prevent fetchers from trusting malformed metadata. The method performs only the + * length comparison and does not mutate state or record diagnostics. + * + * @param checkLength reported check-length value in bytes from splitfile metadata + * @param finalLength expected final length in bytes derived from segment information + * @throws FetchException when the check length exceeds the final length beyond tolerance + */ + static void validateCheckLength(long checkLength, long finalLength) throws FetchException { + if (checkLength > finalLength && checkLength - finalLength > CHKBlock.DATA_LENGTH) + throw new FetchException( + FetchExceptionMode.INVALID_METADATA, + "Splitfile is " + checkLength + " bytes long but length is " + finalLength + " bytes"); + } + + /** + * Ensures a splitfile describes at least one segment before processing begins. + * + *

    This is a defensive check used in assertion-heavy paths. It treats a non-positive segment + * count as a programmer error rather than user input and therefore throws an {@link + * AssertionError}. The method does not return a value and does not attempt to recover. + * + * @param segmentCount number of segments that the splitfile metadata declares + * @throws AssertionError when {@code segmentCount} is zero or negative + */ + static void validateSegmentCount(int segmentCount) { + if (segmentCount <= 0) { + throw new AssertionError("A splitfile has to have at least one segment"); + } + } +} diff --git a/src/main/java/network/crypta/client/async/SplitFileFetcherStoragePersistenceWriter.java b/src/main/java/network/crypta/client/async/SplitFileFetcherStoragePersistenceWriter.java new file mode 100644 index 0000000000..4f77e814d6 --- /dev/null +++ b/src/main/java/network/crypta/client/async/SplitFileFetcherStoragePersistenceWriter.java @@ -0,0 +1,80 @@ +package network.crypta.client.async; + +import java.io.IOException; + +/** + * Writes the initial persistent layout for splitfile fetcher storage. + * + *

    This helper centralizes creation-time disk writes and footer serialization so the storage + * orchestrator can focus on lifecycle and scheduling. It assumes the caller has already assembled + * segment keys, precomputed metadata, and encoded the settings payload. The implementation performs + * direct random-access writes and metadata serialization, so it must run before any fetch work that + * depends on the persistent structure. + * + *

    The class is a pure utility: it holds no state and is intended for one-time initialization of + * on-disk files. Calls are safe to repeat only when the caller manages file replacement or a clean + * initialization path. Concurrency control is delegated to the caller and the storage lock, so + * callers should avoid invoking it from multiple threads without external coordination. + * + *

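+ * <p>Sketch of a guarded one-time initialization (the {@code initialized} flag is hypothetical
+ * caller-side bookkeeping; the call itself matches {@link #writeToRaf}):
+ *
+ * <pre>{@code
+ * if (!initialized) {
+ *   SplitFileFetcherStoragePersistenceWriter.writeToRaf(
+ *       storage, segmentKeys, prepared, encodedBasicSettings, generalProgress, totalLength);
+ *   initialized = true; // hypothetical flag; repeating the write needs a fresh file
+ * }
+ * }</pre>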
    + */ +final class SplitFileFetcherStoragePersistenceWriter { + /** Prevents instantiation; this type exists only to host static write helpers. */ + private SplitFileFetcherStoragePersistenceWriter() {} + + /** + * Writes the initial persistent records and metadata for a splitfile fetcher. + * + *

    The method acquires the storage lock and then writes per-segment key material to the RAF. If + * the storage is persistent, it proceeds to flush segment metadata, general progress, Bloom + * filters, and the serialized metadata footer. The method assumes the arrays are aligned by + * segment index and does not perform validation beyond what called helpers enforce. + * + *

    {@code
    +   * SplitFileFetcherStoragePersistenceWriter.writeToRaf(
    +   *     storage, keys, prepared, encodedSettings, progress, totalLength);
    +   * }
    + * + * @param storage initialized fetcher storage providing RAF offsets and locking + * @param segmentKeys segment key array aligned to {@code storage.segments} ordering + * @param prepared precomputed persistent metadata to write into the footer + * @param encodedBasicSettings encoded basic settings payload, already checksummed + * @param generalProgress encoded progress bytes for general fetch state + * @param totalLength total splitfile length in bytes, used for metadata integrity + * @throws IOException when random-access writes or serialization to the RAF fail + */ + static void writeToRaf( + SplitFileFetcherStorage storage, + SplitFileSegmentKeys[] segmentKeys, + SplitFileFetcherStoragePersistence.PreparedMetadata prepared, + byte[] encodedBasicSettings, + byte[] generalProgress, + long totalLength) + throws IOException { + try (var _ = storage.autoLockOpen()) { + for (int i = 0; i < storage.segments.length; i++) { + SplitFileFetcherSegmentStorage segment = storage.segments[i]; + segment.writeKeysWithChecksum(segmentKeys[i]); + } + if (storage.persistent) { + for (SplitFileFetcherSegmentStorage segment : storage.segments) segment.writeMetadata(); + storage + .getRAF() + .pwrite(storage.offsetGeneralProgress, generalProgress, 0, generalProgress.length); + storage.keyListener.innerWriteMainBloomFilter(storage.offsetMainBloomFilter); + storage.keyListener.initialWriteSegmentBloomFilters(storage.offsetSegmentBloomFilters); + SplitFileFetcherStoragePersistence.writePersistentMetadata( + storage.getRAF(), + storage.offsetOriginalMetadata, + prepared, + encodedBasicSettings, + storage.checksumChecker, + totalLength); + } + } + } +} diff --git a/src/main/java/network/crypta/client/async/SplitFileFetcherStorageRecovery.java b/src/main/java/network/crypta/client/async/SplitFileFetcherStorageRecovery.java new file mode 100644 index 0000000000..6b76162e4f --- /dev/null +++ b/src/main/java/network/crypta/client/async/SplitFileFetcherStorageRecovery.java @@ -0,0 +1,307 @@ +package network.crypta.client.async; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import network.crypta.client.FailureCodeTracker; +import network.crypta.client.FetchException; +import network.crypta.client.FetchException.FetchExceptionMode; +import network.crypta.crypt.ChecksumFailedException; +import network.crypta.support.io.StorageFormatException; +import org.slf4j.Logger; + +/** + * Coordinates resume-time recovery steps for splitfile fetcher storage. + * + *

    This helper encapsulates the validation and regeneration work needed when reopening a + * persistent splitfile fetcher. It reads per-segment metadata, rebuilds in-memory progress state, + * and schedules background work for decoding or Bloom filter regeneration. The class is tightly + * coupled to {@link SplitFileFetcherStorage} and expects the storage to be initialized with valid + * offsets and segment arrays before any recovery method is invoked. + * + *

    The methods are not inherently thread-safe; callers should ensure serialized access using the + * storage's locking discipline. Recovery is designed to be idempotent as long as the underlying + * persistent data are stable, but the caller is responsible for avoiding concurrent mutation of the + * same storage. + * + *

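+ * <p>Typical resume flow (illustrative; error handling and any ordering beyond what is shown are
+ * the caller's responsibility):
+ *
+ * <pre>{@code
+ * SplitFileFetcherStorageRecovery recovery = new SplitFileFetcherStorageRecovery(storage);
+ * recovery.readGeneralProgress();      // restore flags and failure counters
+ * recovery.postInitReadSegmentState(); // read per-segment metadata, note broken segments
+ * recovery.restartCrossSegments();     // rebuild cross-segment transient state
+ * recovery.scheduleTryDecodeForBrokenSegments();
+ * }</pre>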
    + */ +final class SplitFileFetcherStorageRecovery { + /** + * Backing storage whose metadata and progress state are being recovered. + * + *

    The reference is immutable and shared across recovery steps. Callers must ensure the storage + * remains valid for the duration of recovery because methods access RAF offsets, segment arrays, + * and listener hooks without additional null checks. + */ + private final SplitFileFetcherStorage storage; + + /** + * Creates a recovery helper for the provided storage instance. + * + *

    The storage is assumed to be initialized but not yet recovered; callers should construct the + * helper after the storage structure and offsets are ready and before any fetch work resumes. The + * helper retains the storage reference for all later recovery operations. + * + * @param storage initialized storage instance whose state will be recovered + */ + SplitFileFetcherStorageRecovery(SplitFileFetcherStorage storage) { + this.storage = storage; + } + + /** + * Reads per-segment metadata and enqueues segments that need decode attempts. + * + *

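+   * <p>Sketch of the intended pairing (segments queued here are drained later):
+   *
+   * <pre>{@code
+   * recovery.postInitReadSegmentState();           // may queue broken segments
+   * recovery.scheduleTryDecodeForBrokenSegments(); // drains that queue
+   * }</pre>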
    This method walks every segment, reads its metadata, records decode requirements, then reads + * all segment keys, and verifies cross-segment blocks. It is intended to run once after opening + * persistent storage, before any decoding tasks are started. The method may close and free the + * RAF if a segment is marked failed, propagating a {@link FetchException} in that case. + * + * @throws FetchException when the stored metadata indicates a splitfile failure + * @throws IOException when reading segment metadata or keys fails + * @throws StorageFormatException when persisted segment data is corrupt or inconsistent + */ + void postInitReadSegmentState() throws FetchException, IOException, StorageFormatException { + for (SplitFileFetcherSegmentStorage segment : storage.segments) { + boolean needsDecode = determineIfSegmentNeedsDecode(segment); + if (needsDecode) queueSegmentForDecode(segment); + } + readAllSegmentKeys(); + checkCrossSegmentsIfAny(); + } + + /** + * Reads the persisted general progress block and applies it to the storage state. + * + *

    The method fetches the check-summed progress payload, decodes flags and failure codes, and + * updates the in-memory state of the storage. If the payload is corrupted or fails checksum + * validation, the method logs an error and resets the progress fields to safe defaults so the + * fetcher can proceed without relying on the corrupted state. + * + * @throws IOException when the progress block cannot be read from storage + */ + void readGeneralProgress() throws IOException { + try { + byte[] buf = storage.preadChecksummedWithLength(storage.offsetGeneralProgress); + ByteArrayInputStream bais = new ByteArrayInputStream(buf); + DataInputStream dis = new DataInputStream(bais); + long flags = dis.readLong(); + if ((flags & SplitFileFetcherStorage.HAS_CHECKED_DATASTORE_FLAG) != 0) + storage.hasCheckedDatastore = true; + storage.errors = new FailureCodeTracker(false, dis); + dis.close(); + } catch (ChecksumFailedException | StorageFormatException e) { + SplitFileFetcherStorage.LOG.error("Failed to read general progress: {}", String.valueOf(e)); + // Reset general progress + storage.hasCheckedDatastore = false; + storage.errors = new FailureCodeTracker(false); + } + } + + /** + * Queues an asynchronous Bloom filter regeneration job when persistence is enabled. + * + *

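+   * <p>Call-site sketch (mirrors the {@code SplitFileFetcherStorage.start} path shown above):
+   *
+   * <pre>{@code
+   * if (keyListener.needsKeys()) return recovery.regenerateKeysAsync(); // always false
+   * }</pre>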
    The job rebuilds key indices by scanning segment keys and then writes regenerated Bloom + * filters to disk. If persistence is disabled, the queue operation is ignored and the method + * returns {@code false}. This method does not wait for completion and always returns {@code + * false}, matching the job-runner callback signature used by the caller. + * + * @return always {@code false}; the regeneration work is scheduled asynchronously + */ + boolean regenerateKeysAsync() { + try { + storage.jobRunner.queue( + _ -> { + regenerateKeysJob(); + return false; + }, + SplitFileFetcherStorage.REGENERATE_KEYS_PRIORITY); + } catch (PersistenceDisabledException _) { + // Ignore. + } + return false; + } + + /** + * Restarts any cross-segment storage blocks that are present. + * + *

    The method is a no-op when the storage does not include cross-segments. When present, each + * cross-segment is restarted in sequence to rebuild any transient state needed for recovery. + */ + void restartCrossSegments() { + if (storage.crossSegments == null) return; + for (SplitFileFetcherCrossSegmentStorage segment : storage.crossSegments) { + segment.restart(); + } + } + + /** + * Schedules decode attempts for any segments marked as broken during recovery. + * + *

    The list is captured under the storage lock and then cleared, so each segment is only queued + * once. The method is safe to call even when no broken segments were recorded. + */ + void scheduleTryDecodeForBrokenSegments() { + List<SplitFileFetcherSegmentStorage> brokenSegments; + synchronized (storage) { + brokenSegments = storage.segmentsToTryDecode; + storage.segmentsToTryDecode = null; + } + if (brokenSegments == null) return; + for (SplitFileFetcherSegmentStorage segment : brokenSegments) { + segment.tryStartDecode(); + } + } + + /** + * Determines whether a segment needs to be queued for decoding based on its metadata. + * + *

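+   * <p>Usage sketch, as in {@link #postInitReadSegmentState()}:
+   *
+   * <pre>{@code
+   * if (determineIfSegmentNeedsDecode(segment)) queueSegmentForDecode(segment);
+   * }</pre>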
    The method reads the segment metadata, reports splitfile failure as a {@link + * FetchException}, and treats checksum errors as corruption that requires a decoding attempt. It + * also checks the segment's own decode flag after metadata is read. The method does not mutate + * the decoding queue; callers decide how to record the result. + * + * @param segment segment whose metadata should be inspected for decoding requirements + * @return {@code true} when the segment should be scheduled for decoding attempts + * @throws FetchException when the segment reports a failed splitfile state + * @throws IOException when reading segment metadata fails due to I/O errors + * @throws StorageFormatException when persisted metadata cannot be parsed + */ + private boolean determineIfSegmentNeedsDecode(SplitFileFetcherSegmentStorage segment) + throws FetchException, IOException, StorageFormatException { + boolean needsDecode = false; + try { + segment.readMetadata(); + if (segment.hasFailed()) { + storage.getRAF().close(); + storage.getRAF().free(); // Failed, so free it. + throw new FetchException(FetchExceptionMode.SPLITFILE_ERROR, storage.errors); + } + } catch (ChecksumFailedException _) { + SplitFileFetcherStorage.LOG.error( + "Progress for segment {} on {} corrupted.", segment.segNo, storage); + needsDecode = true; + } + if (segment.needsDecode()) needsDecode = true; + return needsDecode; + } + + /** + * Appends a segment to the decoding queue, initializing the queue if necessary. + * + *

    The queue is stored in {@link SplitFileFetcherStorage#segmentsToTryDecode} and may be null + * until the first decoding candidate is found. This method performs minimal work and does not + * synchronize; callers should coordinate access via the storage lock where required. + * + * @param segment segment that should be decoded when recovery completes + */ + private void queueSegmentForDecode(SplitFileFetcherSegmentStorage segment) { + if (storage.segmentsToTryDecode == null) storage.segmentsToTryDecode = new ArrayList<>(); + storage.segmentsToTryDecode.add(segment); + } + + /** + * Reads and validates all segment keys for the storage. + * + *

    The method iterates through each segment and loads its key list. If a checksum error occurs, + * the method fails fast with a {@link StorageFormatException} so the caller can treat the storage + * as corrupt and avoid using partial key data. + * + * @throws StorageFormatException when any segment keys fail checksum validation + * @throws IOException when key reads fail due to underlying I/O errors + */ + private void readAllSegmentKeys() throws StorageFormatException, IOException { + for (SplitFileFetcherSegmentStorage segment : storage.segments) { + try { + segment.readSegmentKeys(); + } catch (ChecksumFailedException _) { + throw new StorageFormatException("Keys corrupted"); + } + } + } + + /** + * Checks cross-segment blocks after segment metadata has been read. + * + *

    The method is a no-op when the storage has no cross-segment structures. The ordering matters + * because cross-segment validation depends on metadata from the plain segments. + */ + private void checkCrossSegmentsIfAny() { + if (storage.crossSegments == null) return; + for (SplitFileFetcherCrossSegmentStorage crossSegment : storage.crossSegments) + // Must be after reading the metadata for the plain segments. + crossSegment.checkBlocks(); + } + + /** + * Rebuilds Bloom filters by scanning all segment keys and writing them to storage. + * + *

    The job is executed by the storage job runner and logs its progress. It adds every key to + * the key listener, writes regenerated Bloom filters, and informs the fetcher that recovery + * completed after corruption. If key scanning fails, the job terminates early after reporting the + * disk error. + */ + private void regenerateKeysJob() { + Logger log = SplitFileFetcherStorage.LOG; + // Regenerating filters for this storage + log.error("Regenerating filters for {}", storage); + KeySalter salt = storage.fetcher.getSalter(); + if (!addAllKeysFromSegments(salt)) return; + storage.keyListener.addedAllKeys(); + writeBloomFiltersSafely(); + storage.fetcher.restartedAfterDataCorruption(); + log.warn("Finished regenerating filters for {}", storage); + } + + /** + * Adds all segment keys to the key listener, returning {@code false} on failure. + * + *

    Each segment's key list is read and converted to node keys before being forwarded to the key + * listener. I/O or checksum errors cause the storage to be marked failed and stop further + * processing so that callers do not rely on a partial key set. + * + * @param salt salter used to derive node keys during listener updates + * @return {@code true} when all segments are processed without I/O or checksum failures + */ + private boolean addAllKeysFromSegments(KeySalter salt) { + for (int i = 0; i < storage.segments.length; i++) { + SplitFileFetcherSegmentStorage segment = storage.segments[i]; + try { + SplitFileSegmentKeys keys = segment.readSegmentKeys(); + for (int j = 0; j < keys.totalKeys(); j++) { + storage.keyListener.addKey(keys.getKey(j, null, false).getNodeKey(false), i, salt); + } + } catch (IOException | ChecksumFailedException e) { + if (e instanceof IOException io) { + storage.failOnDiskError(io); + } else { + storage.failOnDiskError((ChecksumFailedException) e); + } + return false; + } + } + return true; + } + + /** + * Writes regenerated Bloom filters and handles I/O errors for persistent storage. + * + *

    The method writes segment and main Bloom filters. When persistence is enabled, I/O failures + * are reported to the storage failure handler; otherwise they are ignored because the storage is + * not durable. + */ + private void writeBloomFiltersSafely() { + try { + storage.keyListener.initialWriteSegmentBloomFilters(storage.offsetSegmentBloomFilters); + storage.keyListener.innerWriteMainBloomFilter(storage.offsetMainBloomFilter); + } catch (IOException e) { + if (storage.persistent) storage.failOnDiskError(e); + } + } +} diff --git a/src/main/java/network/crypta/client/async/SplitFileFetcherStorageResumeReader.java b/src/main/java/network/crypta/client/async/SplitFileFetcherStorageResumeReader.java new file mode 100644 index 0000000000..21f656089d --- /dev/null +++ b/src/main/java/network/crypta/client/async/SplitFileFetcherStorageResumeReader.java @@ -0,0 +1,317 @@ +package network.crypta.client.async; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import network.crypta.crypt.ChecksumChecker; +import network.crypta.crypt.ChecksumFailedException; +import network.crypta.support.api.LockableRandomAccessBuffer; +import network.crypta.support.io.StorageFormatException; + +/** + * Parses and validates persisted splitfile storage headers for resume. + * + *

    This reader verifies the footer magic, checksum type, flags, and version before parsing the + * basic settings block via {@link SplitFileFetcherStorageSettingsCodec}. It isolates byte-level + * parsing and checksum validation from the storage orchestrator so that callers can resume only + * from storage that passes structural checks. + * + *

    All methods are static and side-effect free aside from reading from the supplied buffer. The + * reader assumes the caller has already opened the storage and provided the correct checksum + * configuration. It does not synchronize on the buffer, so callers must ensure exclusive access if + * they read concurrently. + * + *

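+ * <p>Footer layout implied by the readers below, front to back at the end of the buffer
+ * ({@code C} is the checksum length, {@code N} the basic settings length):
+ *
+ * <pre>{@code
+ * settings (N) | checksum (C) | N (4) | checksum (C) | flags (4) | type (2) | version (4) | magic (8)
+ * }</pre>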
    + */ +final class SplitFileFetcherStorageResumeReader { + /** Prevents instantiation; all operations are exposed as static helpers. */ + private SplitFileFetcherStorageResumeReader() {} + + /** + * Reads persisted settings from the footer and validates versioning metadata. + * + *

    The method performs a fixed sequence of checks: minimum length, footer magic, version, + * checksum type, flags, and the check-summed basic settings length. It then reads the settings + * payload and parses it into a {@link ParsedBasicSettings} instance. The method is deterministic + * and does not alter the buffer contents. + * + *

    {@code
    +   * ParsedBasicSettings settings =
    +   *     SplitFileFetcherStorageResumeReader.readParsedSettings(
    +   *         raf, checker, checksumLength, raf.size(), completeViaTruncation);
    +   * }
    + * + * @param raf backing buffer containing the persisted splitfile storage + * @param checksumChecker checker used to validate the footer checksums and lengths + * @param checksumLength checksum length in bytes used by the storage format + * @param rafLength total length of the backing buffer in bytes + * @param completeViaTruncation true when truncation represents completion + * @return parsed settings derived from the persisted footer and basic settings + * @throws IOException when the underlying buffer cannot be read + * @throws StorageFormatException when the footer metadata or checksums are invalid + */ + static ParsedBasicSettings readParsedSettings( + LockableRandomAccessBuffer raf, + ChecksumChecker checksumChecker, + int checksumLength, + long rafLength, + boolean completeViaTruncation) + throws IOException, StorageFormatException { + + ensureMinLength(rafLength); + validateMagic(raf, rafLength); + byte[] versionBuf = readVersionBytes(raf, rafLength); + int parsedVersion = parseInt(versionBuf); + if (parsedVersion != SplitFileFetcherStorage.VERSION) + throw new StorageFormatException("Wrong version " + parsedVersion); + byte[] checksumTypeBuf = readChecksumTypeBytes(raf, rafLength); + validateChecksumType(checksumTypeBuf); + byte[] flagsBuf = readFlagsBytes(raf, rafLength); + validateFlags(flagsBuf); + + BasicSettingsInfo basicInfo = + readBasicSettingsLocation( + raf, rafLength, checksumChecker, checksumLength, flagsBuf, checksumTypeBuf, versionBuf); + byte[] basicSettingsBuffer = + readChecksummed(raf, checksumChecker, checksumLength, basicInfo.offset, basicInfo.length); + return SplitFileFetcherStorageSettingsCodec.parseBasicSettings( + basicSettingsBuffer, basicInfo.offset, completeViaTruncation, rafLength); + } + + /** + * Verifies the backing buffer is long enough to contain a footer magic value. + * + *

    The minimum length is eight bytes because the footer magic is a {@code long} stored at the + * end of the file. Shorter buffers cannot contain a valid footer and are rejected early. + * + * @param length total buffer length in bytes + * @throws StorageFormatException when the buffer is shorter than the footer magic size + */ + private static void ensureMinLength(long length) throws StorageFormatException { + if (length < 8) throw new StorageFormatException("Too short"); + } + + /** + * Reads and validates the footer magic from the end of the buffer. + * + *

    The method reads eight bytes from the end of the file and compares them to the expected + * magic constant. A mismatch indicates the storage is not a splitfile resume buffer. + * + * @param raf backing buffer containing the persisted splitfile storage + * @param length total length of the backing buffer in bytes + * @throws IOException when the buffer cannot be read + * @throws StorageFormatException when the footer magic does not match + */ + private static void validateMagic(LockableRandomAccessBuffer raf, long length) + throws IOException, StorageFormatException { + byte[] buf = new byte[8]; + raf.pread(length - 8, buf, 0, 8); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); + if (dis.readLong() != SplitFileFetcherStorage.END_MAGIC) + throw new StorageFormatException("Wrong magic bytes"); + } + + /** + * Parses a four-byte big-endian integer from the provided buffer. + * + * @param buf four-byte array containing the serialized integer value + * @return parsed integer value from the buffer + * @throws IOException when the buffer cannot be read as an integer + */ + private static int parseInt(byte[] buf) throws IOException { + return new DataInputStream(new ByteArrayInputStream(buf)).readInt(); + } + + /** + * Reads the serialized version value located just before the footer magic. + * + * @param raf backing buffer containing the persisted splitfile storage + * @param length total length of the backing buffer in bytes + * @return four-byte buffer containing the serialized version + * @throws IOException when the version bytes cannot be read + */ + private static byte[] readVersionBytes(LockableRandomAccessBuffer raf, long length) + throws IOException { + byte[] versionBuf = new byte[4]; + raf.pread(length - 12, versionBuf, 0, 4); + return versionBuf; + } + + /** + * Reads the checksum type bytes stored immediately before the version bytes. + * + * @param raf backing buffer containing the persisted splitfile storage + * @param length total length of the backing buffer in bytes + * @return two-byte buffer containing the serialized checksum type + * @throws IOException when the checksum type bytes cannot be read + */ + private static byte[] readChecksumTypeBytes(LockableRandomAccessBuffer raf, long length) + throws IOException { + byte[] checksumTypeBuf = new byte[2]; + raf.pread(length - 14, checksumTypeBuf, 0, 2); + return checksumTypeBuf; + } + + /** + * Validates that the checksum type matches the supported checksum implementation. + * + * @param checksumTypeBuf two-byte buffer containing the serialized checksum type + * @throws IOException when the checksum type bytes cannot be read + * @throws StorageFormatException when the checksum type is not recognized + */ + private static void validateChecksumType(byte[] checksumTypeBuf) + throws IOException, StorageFormatException { + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(checksumTypeBuf)); + int checksumType = dis.readShort(); + if (checksumType != ChecksumChecker.CHECKSUM_CRC) + throw new StorageFormatException("Unknown checksum type " + checksumType); + } + + /** + * Reads the flags field stored before the checksum type and version information. 
+ * + * @param raf backing buffer containing the persisted splitfile storage + * @param length total length of the backing buffer in bytes + * @return four-byte buffer containing the serialized flags value + * @throws IOException when the flags bytes cannot be read + */ + private static byte[] readFlagsBytes(LockableRandomAccessBuffer raf, long length) + throws IOException { + byte[] flagsBuf = new byte[4]; + raf.pread(length - 18, flagsBuf, 0, 4); + return flagsBuf; + } + + /** + * Validates the flags field for unknown bits or unsupported settings. + * + * @param flagsBuf four-byte buffer containing the serialized flags value + * @throws IOException when the flags buffer cannot be read + * @throws StorageFormatException when the flags contain unsupported values + */ + private static void validateFlags(byte[] flagsBuf) throws IOException, StorageFormatException { + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(flagsBuf)); + int flags = dis.readInt(); + if (flags != 0) throw new StorageFormatException("Unknown flags: " + flags); + } + + /** + * Locates and validates the basic settings block near the end of the buffer. + * + *

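+   * <p>Worked example (sketch, assuming CRC32 so {@code checksumLength == 4}): the length header
+   * is read at {@code length - 26}, its checksum at {@code length - 22}, and an {@code N}-byte
+   * settings payload starts at {@code length - (30 + N)}.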
    The method reads the check-summed length header, verifies its checksum, and enforces bounds + * to prevent unreasonable settings lengths. It then computes the offset where the settings block + * begins, returning both the offset and the length for later reads. + * + * @param raf backing buffer containing the persisted splitfile storage + * @param length total length of the backing buffer in bytes + * @param checksumChecker checker used to validate the basic settings length checksum + * @param checksumLength checksum length in bytes used by the storage format + * @param flagsBuf buffer containing the serialized flags value + * @param checksumTypeBuf buffer containing the serialized checksum type + * @param versionBuf buffer containing the serialized storage version + * @return offset and length describing the basic settings payload + * @throws IOException when buffer reads fail + * @throws StorageFormatException when the settings length header is invalid + */ + private static BasicSettingsInfo readBasicSettingsLocation( + LockableRandomAccessBuffer raf, + long length, + ChecksumChecker checksumChecker, + int checksumLength, + byte[] flagsBuf, + byte[] checksumTypeBuf, + byte[] versionBuf) + throws IOException, StorageFormatException { + byte[] buf = new byte[14]; + raf.pread(length - (22 + checksumLength), buf, 0, 4); + byte[] checksum = new byte[checksumLength]; + raf.pread(length - (18 + checksumLength), checksum, 0, checksumLength); + System.arraycopy(flagsBuf, 0, buf, 4, 4); + System.arraycopy(checksumTypeBuf, 0, buf, 8, 2); + System.arraycopy(versionBuf, 0, buf, 10, 4); + if (!checksumChecker.checkChecksum(buf, 0, 14, checksum)) + throw new StorageFormatException("Checksum failed on basic settings length and version"); + int basicSettingsLength = parseInt(buf); + if (basicSettingsLength < 0 + || basicSettingsLength + 12 + 4 + checksumLength > raf.size() + || basicSettingsLength > 1024 * 1024) + throw new StorageFormatException("Bad basic settings length"); + long basicSettingsOffset = length - (18 + 4 + checksumLength * 2L + basicSettingsLength); + return new BasicSettingsInfo(basicSettingsOffset, basicSettingsLength); + } + + /** + * Reads a check-summed byte range and converts checksum failures to storage format errors. + * + * @param raf backing buffer containing the persisted splitfile storage + * @param checksumChecker checker used to validate the checksum for the range + * @param checksumLength checksum length in bytes used by the storage format + * @param offset absolute offset where the data payload begins + * @param length number of bytes to read into the buffer + * @return byte array containing the validated payload + * @throws StorageFormatException when checksum validation fails + * @throws IOException when the buffer cannot be read + */ + private static byte[] readChecksummed( + LockableRandomAccessBuffer raf, + ChecksumChecker checksumChecker, + int checksumLength, + long offset, + int length) + throws StorageFormatException, IOException { + byte[] basicSettingsBuffer = new byte[length]; + try { + preadChecksummed(raf, checksumChecker, checksumLength, offset, basicSettingsBuffer, length); + } catch (ChecksumFailedException _) { + throw new StorageFormatException("Basic settings checksum invalid"); + } + return basicSettingsBuffer; + } + + /** + * Reads a range and validates the trailing checksum, zeroing the buffer on failure. + * + *

    The method reads {@code length} bytes at {@code fileOffset} followed by the checksum + * trailer. When the checksum does not match, the buffer is zeroed to prevent reuse of invalid + * data and a {@link ChecksumFailedException} is thrown. + * + * @param raf backing buffer containing the persisted splitfile storage + * @param checksumChecker checker used to validate the checksum for the range + * @param checksumLength checksum length in bytes used by the storage format + * @param fileOffset absolute offset where the data payload begins + * @param buf destination buffer to fill with the payload contents + * @param length number of bytes to read into the buffer + * @throws IOException when the buffer cannot be read + * @throws ChecksumFailedException when the checksum validation fails + */ + private static void preadChecksummed( + LockableRandomAccessBuffer raf, + ChecksumChecker checksumChecker, + int checksumLength, + long fileOffset, + byte[] buf, + int length) + throws IOException, ChecksumFailedException { + byte[] checksumBuf = new byte[checksumLength]; + raf.pread(fileOffset, buf, 0, length); + raf.pread(fileOffset + length, checksumBuf, 0, checksumLength); + if (!checksumChecker.checkChecksum(buf, 0, length, checksumBuf)) { + for (int i = 0; i < length; i++) { + buf[i] = 0; + } + throw new ChecksumFailedException(); + } + } + + /** + * Holds the offset and length for the basic settings payload near the file footer. + * + * @param offset absolute offset where the basic settings block begins + * @param length length in bytes of the basic settings payload + */ + private record BasicSettingsInfo(long offset, int length) {} +}