From 7587c8caf1c3299efe887f9e267e1711c48d34d9 Mon Sep 17 00:00:00 2001 From: Leumor <116955025+leumor@users.noreply.github.com> Date: Sat, 17 Jan 2026 19:19:43 +0000 Subject: [PATCH 1/2] refactor(storage): extract splitfile fetcher helpers Isolate layout math, segment init, resume parsing, and persistence writes into helper classes to reduce constructor complexity and make resume flow easier to follow. --- .../SplitFileFetcherSegmentsBuilder.java | 410 +++++++++ .../async/SplitFileFetcherSegmentsInit.java | 52 ++ .../SplitFileFetcherSegmentsLoadParams.java | 173 ++++ .../client/async/SplitFileFetcherStorage.java | 798 +++--------------- .../async/SplitFileFetcherStorageLayout.java | 140 +++ ...itFileFetcherStoragePersistenceWriter.java | 80 ++ .../SplitFileFetcherStorageRecovery.java | 307 +++++++ .../SplitFileFetcherStorageResumeReader.java | 317 +++++++ 8 files changed, 1600 insertions(+), 677 deletions(-) create mode 100644 src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsBuilder.java create mode 100644 src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsInit.java create mode 100644 src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsLoadParams.java create mode 100644 src/main/java/network/crypta/client/async/SplitFileFetcherStorageLayout.java create mode 100644 src/main/java/network/crypta/client/async/SplitFileFetcherStoragePersistenceWriter.java create mode 100644 src/main/java/network/crypta/client/async/SplitFileFetcherStorageRecovery.java create mode 100644 src/main/java/network/crypta/client/async/SplitFileFetcherStorageResumeReader.java diff --git a/src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsBuilder.java b/src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsBuilder.java new file mode 100644 index 0000000000..337e1fb16f --- /dev/null +++ b/src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsBuilder.java @@ -0,0 +1,410 @@ +package network.crypta.client.async; + +import java.io.DataInputStream; +import java.io.IOException; +import network.crypta.client.FetchContext; +import network.crypta.client.FetchException; +import network.crypta.client.FetchException.FetchExceptionMode; +import network.crypta.client.Metadata; +import network.crypta.keys.CHKBlock; +import network.crypta.node.KeysFetchingLocally; +import network.crypta.support.io.StorageFormatException; + +/** + * Builds splitfile segment storage objects for new and resumed fetches. + * + *
This stateless helper centralizes the offset calculations and segment instantiation performed + * during splitfile storage setup. Callers assemble either a {@link SegmentsBuildContext} for the + * fresh-download path or a {@link SplitFileFetcherSegmentsLoadParams} bundle for the resume path + * and then invoke the corresponding initializer. Each initializer walks the segment list, computes + * per-segment offsets, validates counts against expected totals, and wires cross-segment helpers so + * that the surrounding {@link SplitFileFetcherStorage} can schedule work without duplicating layout + * math. + * + *
The builder does not retain a mutable state between calls; thread safety therefore depends on + * the provided storage, arrays, and key listeners. Inputs are expected to be consistent with the + * splitfile metadata and persisted layout, and the methods enforce this with validation and + * assertions rather than defensive copying. + * + *
Callers populate the fields before invoking {@link + * #initSegmentsAndKeys(SegmentsBuildContext)}. The instance is consumed as-is, so all arrays, + * offsets, and sizing metadata must already describe a coherent splitfile layout. + * + *
The default constructor performs no validation and leaves all fields unset. Callers are + * responsible for filling each field before handing the instance to the builder. + * + * @see #initSegmentsAndKeys(SegmentsBuildContext) + */ + static final class SegmentsBuildContext { + /** Creates an empty context to be populated before initialization. */ + SegmentsBuildContext() {} + + /** Owning storage used for offsets, logging, and key registration. */ + SplitFileFetcherStorage parent; + + /** Target array that receives constructed segment storage instances. */ + SplitFileFetcherSegmentStorage[] segments; + + /** Segment key metadata aligned with the segment array. */ + SplitFileSegmentKeys[] segmentKeys; + + /** Splitfile metadata used to allocate cross-segment helpers. */ + Metadata metadata; + + /** Number of cross-check blocks per segment; used in sizing and offsets. */ + int crossCheckBlocks; + + /** Expected data blocks per segment as reported by metadata. */ + int blocksPerSegment; + + /** Expected check blocks per segment as reported by metadata. */ + int checkBlocksPerSegment; + + /** Original fetch context supplying policy limits for segment sizing. */ + FetchContext origFetchContext; + + /** Salting helper used when registering segment keys with the listener. */ + KeySalter salt; + + /** Optional helper for marking keys as being fetched locally. */ + KeysFetchingLocally keysFetching; + + /** Accumulated sizes for keys and status persisted in the storage layout. */ + SplitFileFetcherStorageLayout.AccumulatedSizes acc; + + /** Total stored data length in bytes for data blocks (excluding truncation). */ + long storedBlocksLength; + + /** Total stored cross-check length in bytes when truncation completes. */ + long storedCrossCheckBlocksLength; + + /** Whether the persisted layout uses truncation to mark completion. */ + boolean completeViaTruncation; + + /** Whether the storage is persistent and should write resume metadata. */ + boolean persistent; + + /** Whether a single splitfile crypto key is present for stored key lengths. */ + boolean hasSplitfileSingleCryptoKey; + + /** Length in bytes of the checksum appended to stored metadata. */ + int checksumLength; + } + + /** + * Initializes segment storage and registers keys for a new splitfile fetch. + * + *
The method walks each {@link SplitFileSegmentKeys} entry, calculates offsets for stored + * segment data and metadata, validates per-segment limits from the original fetch context, and + * then creates {@link SplitFileFetcherSegmentStorage} instances with the computed values. It also + * registers each segment key with the parent key listener so that later scheduling can rely on + * fully populated Bloom filters. + * + *
The caller must provide a fully-populated {@link SegmentsBuildContext}. The method assumes + * the segment arrays and metadata are consistent; violations surface through assertions or a + * {@link FetchException}. This initializer is not idempotent because it allocates storage helpers + * and registers keys each time it runs. + * + * @param ctx container holding metadata, arrays, offsets, and policy settings; must describe a + * consistent splitfile layout and remain non-null during the call. + * @return newly built segment and cross-segment storage helpers for the fetcher to use. + * @throws FetchException when segment sizes exceed fetch policy limits or metadata is invalid. + */ + static SplitFileFetcherSegmentsInit initSegmentsAndKeys(SegmentsBuildContext ctx) + throws FetchException { + long dataOffset = 0; + long crossCheckBlocksOffset = ctx.storedBlocksLength; // Only used if completeViaTruncation + long segmentKeysOffset = ctx.parent.offsetKeyList; + long segmentStatusOffset = ctx.parent.offsetSegmentStatus; + + for (int i = 0; i < ctx.segments.length; i++) { + SplitFileSegmentKeys keys = ctx.segmentKeys[i]; + final int dataBlocks = keys.getDataBlocks() - ctx.crossCheckBlocks; + final int checkBlocks = keys.getCheckBlocks(); + validateBlocksPerSegmentLimit(ctx.origFetchContext, dataBlocks, checkBlocks); + SplitFileFetcherSegmentStorage.InitParams p = new SplitFileFetcherSegmentStorage.InitParams(); + p.parent = ctx.parent; + p.segNumber = i; + p.dataBlocks = dataBlocks; + p.checkBlocks = checkBlocks; + p.crossCheckBlocks = ctx.crossCheckBlocks; + p.segmentDataOffset = dataOffset; + p.segmentCrossCheckDataOffset = ctx.completeViaTruncation ? crossCheckBlocksOffset : -1; + p.segmentKeysOffset = segmentKeysOffset; + p.segmentStatusOffset = segmentStatusOffset; + p.writeRetries = ctx.parent.maxRetries != -1; + p.keys = keys; + p.keysFetching = ctx.keysFetching; + ctx.segments[i] = new SplitFileFetcherSegmentStorage(p); + dataOffset += (long) dataBlocks * CHKBlock.DATA_LENGTH; + if (!ctx.completeViaTruncation) { + dataOffset += (long) ctx.crossCheckBlocks * CHKBlock.DATA_LENGTH; + } else { + crossCheckBlocksOffset += (long) ctx.crossCheckBlocks * CHKBlock.DATA_LENGTH; + } + segmentKeysOffset += + SplitFileFetcherSegmentStorage.storedKeysLength( + dataBlocks + ctx.crossCheckBlocks, + checkBlocks, + ctx.hasSplitfileSingleCryptoKey, + ctx.checksumLength); + segmentStatusOffset += + SplitFileFetcherSegmentStorage.paddedStoredSegmentStatusLength( + dataBlocks, + checkBlocks, + ctx.crossCheckBlocks, + ctx.parent.maxRetries != -1, + ctx.checksumLength, + ctx.persistent); + for (int j = 0; j < (dataBlocks + ctx.crossCheckBlocks + checkBlocks); j++) { + ctx.parent.keyListener.addKey(keys.getKey(j, null, false).getNodeKey(false), i, ctx.salt); + } + debugSegmentOffsets(ctx.parent, i, ctx.segments[i]); + } + assert (dataOffset == ctx.storedBlocksLength); + assert !ctx.completeViaTruncation + || (crossCheckBlocksOffset == ctx.storedCrossCheckBlocksLength + ctx.storedBlocksLength); + assert (segmentKeysOffset + == ctx.storedBlocksLength + ctx.storedCrossCheckBlocksLength + ctx.acc.storedKeysLength()); + assert (segmentStatusOffset + == ctx.storedBlocksLength + + ctx.storedCrossCheckBlocksLength + + ctx.acc.storedKeysLength() + + ctx.acc.storedSegmentStatusLength()); + + // Lie about the required number of blocks. See the original inline comment for rationale. + int totalCrossCheckBlocks = ctx.segmentKeys.length * ctx.crossCheckBlocks; + ctx.parent.fetcher.setSplitfileBlocks( + ctx.acc.splitfileDataBlocks() + totalCrossCheckBlocks, ctx.acc.splitfileCheckBlocks()); + + ctx.parent.keyListener.finishedSetup(); + + SplitFileFetcherCrossSegmentStorage[] crossSegments = + SplitFileFetcherCrossSegmentAllocator.createCrossSegments( + ctx.parent, + ctx.metadata, + ctx.crossCheckBlocks, + ctx.blocksPerSegment, + ctx.segments, + ctx.parent.fecCodec); + return new SplitFileFetcherSegmentsInit(ctx.segments, crossSegments, null); + } + + /** + * Reconstructs segment storage from a persisted settings stream. + * + *
This method reads per-segment metadata from the supplied {@link DataInputStream}, computes + * the layout offsets expected by the persisted storage format, and instantiates each {@link + * SplitFileFetcherSegmentStorage} in the provided array. After all segments are loaded, it + * validates the accumulated block totals and reads any cross-segment metadata that follows in the + * stream, returning the resulting storage bundle. + * + *
The {@link SplitFileFetcherSegmentsLoadParams} input must reflect the persisted layout, + * including offsets and total counts. The method consumes the stream in order and is therefore + * not idempotent; callers should pass a stream positioned at the start of segment metadata. + * + * @param params bundle of storage, offsets, totals, and stream state used to reconstruct + * segments; must not be null and must reference the same splitfile layout as the persisted + * data. + * @return initialized segment and cross-segment storages plus the remaining stream position. + * @throws StorageFormatException when persisted, data is inconsistent with expected totals. + * @throws IOException when the underlying stream cannot be read. + */ + static SplitFileFetcherSegmentsInit initSegmentsFromStream( + SplitFileFetcherSegmentsLoadParams params) throws StorageFormatException, IOException { + SplitFileFetcherStorage parent = params.parent(); + int totalDataBlocks = params.totalDataBlocks(); + int totalCheckBlocks = params.totalCheckBlocks(); + int totalCrossCheckBlocks = params.totalCrossCheckBlocks(); + DataInputStream dis = params.dis(); + boolean completeViaTruncation = params.completeViaTruncation(); + KeysFetchingLocally keysFetching = params.keysFetching(); + SplitFileFetcherSegmentStorage[] segments = params.segments(); + int checksumLength = params.checksumLength(); + boolean hasSplitfileSingleCryptoKey = params.hasSplitfileSingleCryptoKey(); + long offsetKeyList = params.offsetKeyList(); + long offsetSegmentStatus = params.offsetSegmentStatus(); + long rafLength = params.rafLength(); + + long dataOffset = 0; + long crossCheckBlocksOffset = + completeViaTruncation ? (long) totalDataBlocks * CHKBlock.DATA_LENGTH : 0; + long segmentKeysOffset = offsetKeyList; + long segmentStatusOffset = offsetSegmentStatus; + int countDataBlocks = 0; + int countCheckBlocks = 0; + int countCrossCheckBlocks = 0; + for (int i = 0; i < segments.length; i++) { + SplitFileFetcherSegmentStorage.LoadParams lp = + new SplitFileFetcherSegmentStorage.LoadParams(); + lp.parent = parent; + lp.dis = dis; + lp.segNo = i; + lp.writeRetries = parent.maxRetries != -1; + lp.segmentDataOffset = dataOffset; + lp.segmentCrossCheckDataOffset = completeViaTruncation ? crossCheckBlocksOffset : -1; + lp.segmentKeysOffset = segmentKeysOffset; + lp.segmentStatusOffset = segmentStatusOffset; + lp.keysFetching = keysFetching; + segments[i] = new SplitFileFetcherSegmentStorage(lp); + int dataBlocks = segments[i].dataBlocks; + countDataBlocks += dataBlocks; + int checkBlocks = segments[i].checkBlocks; + countCheckBlocks += checkBlocks; + int crossCheckBlocks = segments[i].crossSegmentCheckBlocks; + countCrossCheckBlocks += crossCheckBlocks; + dataOffset += (long) dataBlocks * CHKBlock.DATA_LENGTH; + if (completeViaTruncation) + crossCheckBlocksOffset += (long) crossCheckBlocks * CHKBlock.DATA_LENGTH; + else dataOffset += (long) crossCheckBlocks * CHKBlock.DATA_LENGTH; + segmentKeysOffset += + SplitFileFetcherSegmentStorage.storedKeysLength( + dataBlocks + crossCheckBlocks, + checkBlocks, + hasSplitfileSingleCryptoKey, + checksumLength); + segmentStatusOffset += + SplitFileFetcherSegmentStorage.paddedStoredSegmentStatusLength( + dataBlocks, + checkBlocks, + crossCheckBlocks, + parent.maxRetries != -1, + checksumLength, + true); + validateSegmentOffsets(dataOffset, segments[i], rafLength); + debugSegmentOffsets(parent, i, segments[i]); + } + validateTotals( + countDataBlocks, + totalDataBlocks, + countCheckBlocks, + totalCheckBlocks, + countCrossCheckBlocks, + totalCrossCheckBlocks); + + int crossSegmentsCount = dis.readInt(); + SplitFileFetcherCrossSegmentStorage[] crossSegmentsLocal = + (crossSegmentsCount == 0) + ? null + : new SplitFileFetcherCrossSegmentStorage[crossSegmentsCount]; + for (int i = 0; i < crossSegmentsCount; i++) { + // crossSegmentsLocal is non-null when crossSegmentsCount > 0 + crossSegmentsLocal[i] = new SplitFileFetcherCrossSegmentStorage(parent, i, dis); + } + return new SplitFileFetcherSegmentsInit(segments, crossSegmentsLocal, dis); + } + + /** + * Validates that counted blocks match the expected totals from persisted metadata. + * + * @param countDataBlocks number of data blocks tallied from loaded segments. + * @param totalDataBlocks expected total data blocks from the settings header. + * @param countCheckBlocks number of check blocks tallied from loaded segments. + * @param totalCheckBlocks expected total check blocks from the settings header. + * @param countCrossCheckBlocks number of cross-check blocks tallied from loaded segments. + * @param totalCrossCheckBlocks expected total cross-check blocks from the settings header. + * @throws StorageFormatException when any of the totals differ from the expected values. + */ + private static void validateTotals( + int countDataBlocks, + int totalDataBlocks, + int countCheckBlocks, + int totalCheckBlocks, + int countCrossCheckBlocks, + int totalCrossCheckBlocks) + throws StorageFormatException { + if (countDataBlocks != totalDataBlocks) + throw new StorageFormatException( + "Total data blocks " + countDataBlocks + " but expected " + totalDataBlocks); + if (countCheckBlocks != totalCheckBlocks) + throw new StorageFormatException( + "Total check blocks " + countCheckBlocks + " but expected " + totalCheckBlocks); + if (countCrossCheckBlocks != totalCrossCheckBlocks) + throw new StorageFormatException( + "Total cross-check blocks " + + countCrossCheckBlocks + + " but expected " + + totalCrossCheckBlocks); + } + + /** + * Logs computed offsets for a segment when debug logging is enabled. + * + * @param parent storage owner used for logging context; must not be null. + * @param index zero-based segment index being reported. + * @param segment segment whose data and cross-check offsets are logged. + */ + private static void debugSegmentOffsets( + SplitFileFetcherStorage parent, int index, SplitFileFetcherSegmentStorage segment) { + if (SplitFileFetcherStorage.LOG.isDebugEnabled()) { + SplitFileFetcherStorage.LOG.debug( + "Segment {}: data blocks offset {} cross-check blocks offset {} for segment {} of {}", + index, + segment.segmentBlockDataOffset, + segment.segmentCrossCheckBlockDataOffset, + index, + parent); + } + } + + /** + * Ensures the computed segment offsets do not extend past the backing file length. + * + * @param dataOffset next data offset after the segment's data blocks, in bytes. + * @param segment segment whose cross-check offset is validated. + * @param rafLength total length of the backing file, in bytes. + * @throws StorageFormatException when computed, offsets exceed the file length. + */ + private static void validateSegmentOffsets( + long dataOffset, SplitFileFetcherSegmentStorage segment, long rafLength) + throws StorageFormatException { + if (dataOffset > rafLength) + throw new StorageFormatException( + "Data offset past end of file " + dataOffset + " of " + rafLength); + if (segment.segmentCrossCheckBlockDataOffset > rafLength) + throw new StorageFormatException( + "Cross-check blocks offset past end of file " + + segment.segmentCrossCheckBlockDataOffset + + " of " + + rafLength); + } + + /** + * Validates per-segment data and check block counts against the original fetch policy. + * + * @param origFetchContext original fetch context providing block limit configuration. + * @param blocksPerSegment number of data blocks for the segment being constructed. + * @param checkBlocksPerSegment number of check blocks for the segment being constructed. + * @throws FetchException when either data or check block count exceeds configured limits. + */ + private static void validateBlocksPerSegmentLimit( + FetchContext origFetchContext, int blocksPerSegment, int checkBlocksPerSegment) + throws FetchException { + if ((blocksPerSegment > origFetchContext.getMaxDataBlocksPerSegment()) + || (checkBlocksPerSegment > origFetchContext.getMaxCheckBlocksPerSegment())) { + throw new FetchException( + FetchExceptionMode.TOO_MANY_BLOCKS_PER_SEGMENT, + "Too many blocks per segment: " + + blocksPerSegment + + " data, " + + checkBlocksPerSegment + + " check"); + } + } +} diff --git a/src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsInit.java b/src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsInit.java new file mode 100644 index 0000000000..2a9f7ce5c8 --- /dev/null +++ b/src/main/java/network/crypta/client/async/SplitFileFetcherSegmentsInit.java @@ -0,0 +1,52 @@ +package network.crypta.client.async; + +import java.io.DataInputStream; + +/** + * Holds the results of initializing splitfile segment and cross-segment storage. + * + *
This package-private container groups the arrays produced during segment setup with the + * remainder of the settings stream, if any. It is returned by {@link + * SplitFileFetcherSegmentsBuilder} to convey the initialized segment storage objects together with + * any cross-segment helpers and the stream position that follows them. Callers typically unpack the + * fields immediately and do not retain the instance beyond construction of {@link + * SplitFileFetcherStorage} state. + * + *
The instance is immutable but references mutable arrays and a stream, so it should be treated + * as a transient handoff object. The arrays are expected to be fully populated by the builder; when + * cross-segment storage is not present, {@code crossSegments} may be {@code null}. + * + *
This record captures the resume-time inputs that {@link SplitFileFetcherSegmentsBuilder} needs + * when rehydrating segment storage from a settings stream. Callers populate the full set of + * metadata, offsets, and sizing information once, then pass the record to the builder, which + * consumes the {@link DataInputStream} in-order and reconstructs per-segment offsets. + * + *
The record is immutable but refers to mutable collaborators such as the backing buffer and + * segment array. It does not perform validation and assumes the supplied values describe a coherent + * layout consistent with the persisted footer. Construct a fresh instance for each resume attempt + * and avoid reusing it across threads because the stream is stateful. + * + *
The comparison uses reference equality for collaborators such as {@code parent} and {@code + * dis}, because they represent runtime services, while {@code segments} is compared by content to + * reflect the array's elements rather than its identity. + * + * @param other object to compare against; may be {@code null}. + * @return {@code true} when all scalar fields match and segment arrays contain equal entries. + */ + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other + instanceof + SplitFileFetcherSegmentsLoadParams( + var otherParent, + var otherTotalDataBlocks, + var otherTotalCheckBlocks, + var otherTotalCrossCheckBlocks, + var otherDis, + var otherCompleteViaTruncation, + var otherKeysFetching, + var otherSegments, + var otherChecksumLength, + var otherHasSplitfileSingleCryptoKey, + var otherOffsetKeyList, + var otherOffsetSegmentStatus, + var otherRafLength))) { + return false; + } + return completeViaTruncation == otherCompleteViaTruncation + && totalDataBlocks == otherTotalDataBlocks + && totalCheckBlocks == otherTotalCheckBlocks + && totalCrossCheckBlocks == otherTotalCrossCheckBlocks + && checksumLength == otherChecksumLength + && hasSplitfileSingleCryptoKey == otherHasSplitfileSingleCryptoKey + && offsetKeyList == otherOffsetKeyList + && offsetSegmentStatus == otherOffsetSegmentStatus + && rafLength == otherRafLength + && parent == otherParent + && dis == otherDis + && keysFetching == otherKeysFetching + && Arrays.equals(segments, otherSegments); + } + + /** + * Computes a hash code that reflects scalar fields and the segment array contents. + * + *
The segment array is hashed with {@link Arrays#hashCode(Object[])}, while the remaining + * components use {@link Objects#hash(Object...)} to preserve the record's usual behavior. + * + * @return hash code suitable for hash-based collections. + */ + @Override + public int hashCode() { + int result = + Objects.hash( + parent, + totalDataBlocks, + totalCheckBlocks, + totalCrossCheckBlocks, + dis, + completeViaTruncation, + keysFetching, + checksumLength, + hasSplitfileSingleCryptoKey, + offsetKeyList, + offsetSegmentStatus, + rafLength); + result = 31 * result + Arrays.hashCode(segments); + return result; + } + + /** + * Returns a human-readable string that includes the segment array contents. + * + *
The output mirrors the record component order, formatting the {@code segments} array with + * {@link Arrays#toString(Object[])} so that its elements are visible in logs and diagnostics. + * + * @return non-null textual representation of the parameter bundle. + */ + @Override + public @NotNull String toString() { + return "SplitFileFetcherSegmentsLoadParams[" + + "parent=" + + parent + + ", totalDataBlocks=" + + totalDataBlocks + + ", totalCheckBlocks=" + + totalCheckBlocks + + ", totalCrossCheckBlocks=" + + totalCrossCheckBlocks + + ", dis=" + + dis + + ", completeViaTruncation=" + + completeViaTruncation + + ", keysFetching=" + + keysFetching + + ", segments=" + + Arrays.toString(segments) + + ", checksumLength=" + + checksumLength + + ", hasSplitfileSingleCryptoKey=" + + hasSplitfileSingleCryptoKey + + ", offsetKeyList=" + + offsetKeyList + + ", offsetSegmentStatus=" + + offsetSegmentStatus + + ", rafLength=" + + rafLength + + "]"; + } +} diff --git a/src/main/java/network/crypta/client/async/SplitFileFetcherStorage.java b/src/main/java/network/crypta/client/async/SplitFileFetcherStorage.java index bac29e7a65..5a8bc5004e 100644 --- a/src/main/java/network/crypta/client/async/SplitFileFetcherStorage.java +++ b/src/main/java/network/crypta/client/async/SplitFileFetcherStorage.java @@ -23,7 +23,6 @@ import network.crypta.keys.CHKBlock; import network.crypta.keys.ClientKey; import network.crypta.keys.Key; -import network.crypta.node.KeysFetchingLocally; import network.crypta.node.SendableRequestItem; import network.crypta.node.SendableRequestItemKey; import network.crypta.support.MemoryLimitedJobRunner; @@ -38,7 +37,7 @@ import org.slf4j.LoggerFactory; /** - * Maintains persistent and in-memory state for a splitfile fetch. + * Maintains a persistent and in-memory state for a splitfile fetch. * *
This storage binds a {@link SplitFileFetcher} to a single {@link LockableRandomAccessBuffer}, * keeping hot metadata in memory while persisting block data, key lists, and progress so the fetch @@ -49,8 +48,8 @@ * *
The layout aims to minimize seeks and maximize robustness while tolerating recovery work after * checksum failures. Callbacks into the fetcher run off-thread via job runners, and locking is - * shallow and taken last relative to segment locks. The storage itself is transient and recreated - * on startup; persisted sections carry enough state to rehydrate progress. + * shallow and taken last relatively to segment locks. The storage itself is transient and recreated + * on startup; persisted sections carry enough states to rehydrate progress. * *
On-disk sections include: * @@ -59,7 +58,7 @@ *
The codec is selected from the metadata's {@code SplitfileAlgorithm} and is used when
- * reconstructing missing blocks during segment decode. It remains constant for the lifetime of a
- * {@code SplitFileFetcherStorage} instance and is read-mostly; callers must treat it as
+ * reconstructing missing blocks during segment decoding. It remains constant for the lifetime of
+ * a {@code SplitFileFetcherStorage} instance and is read-mostly; callers must treat it as
* immutable. Implementations may allocate native buffers or other resources inside the codec, so
* it should be reused rather than recreated for every operation.
*/
@@ -117,7 +116,7 @@ public class SplitFileFetcherStorage {
final MemoryLimitedJobRunner memoryLimitedJobRunner;
/**
- * Final length of the downloaded data. *BEFORE* decompression, filtering, etc. I.e. this is the
+ * Final length of the downloaded data. *BEFORE* decompression, filtering, etc. I.e., this is the
* length of the data on disk, which will be written by the StreamGenerator.
*/
final long finalLength;
@@ -136,11 +135,11 @@ public class SplitFileFetcherStorage {
final List This constructor validates footer magic, checksums, and version, locates each logical
* section, and rebuilds segment state and Bloom filters. It also reattaches asynchronous helpers
- * and prepares any pending decode attempts when segment metadata indicates partial progress.
+ * and prepares any pending decoding attempts when segment metadata indicates partial progress.
* Successful completion means the instance is ready for {@link #start(boolean)} with resume
* semantics and will reflect on-disk progress accurately.
*
- * @param p resume parameters including existing buffer and runtime helpers; must be non-null.
+ * @param p resume parameters, including existing buffer and runtime helpers, must be non-null.
* @throws IOException when the underlying buffer cannot be read or locked.
* @throws StorageFormatException when checksums, version, or offsets are invalid.
* @throws FetchException when resumed state violates fetch policy or limits.
@@ -465,23 +472,9 @@ public SplitFileFetcherStorage(SplitFileFetcherStorageResumeParams p)
this.completeViaTruncation = p.completeViaTruncation;
this.rafLength = p.raf.size();
- ensureMinLength(rafLength);
- validateMagic(rafLength);
- byte[] versionBuf = readVersionBytes(rafLength);
- int version = parseInt(versionBuf);
- if (version != VERSION) throw new StorageFormatException("Wrong version " + version);
- byte[] checksumTypeBuf = readChecksumTypeBytes(rafLength);
- validateChecksumType(checksumTypeBuf);
- byte[] flagsBuf = readFlagsBytes(rafLength);
- validateFlags(flagsBuf);
-
- BasicSettingsInfo basicInfo =
- readBasicSettingsLocation(rafLength, flagsBuf, checksumTypeBuf, versionBuf);
- byte[] basicSettingsBuffer = readChecksummed(basicInfo.offset, basicInfo.length);
-
ParsedBasicSettings parsed =
- SplitFileFetcherStorageSettingsCodec.parseBasicSettings(
- basicSettingsBuffer, basicInfo.offset, p.completeViaTruncation, rafLength);
+ SplitFileFetcherStorageResumeReader.readParsedSettings(
+ raf, checksumChecker, checksumLength, rafLength, p.completeViaTruncation);
// Assign parsed values to final fields
this.splitfileType = parsed.getSplitfileType();
@@ -505,322 +498,37 @@ public SplitFileFetcherStorage(SplitFileFetcherStorageResumeParams p)
// Allocate and assign segments array before constructing individual segments.
this.segments = new SplitFileFetcherSegmentStorage[parsed.getSegmentCount()];
this.randomSegmentIterator = new RandomArrayIterator<>(segments);
- SegmentsInit segmentsInit =
- initSegmentsFromStream(
- parsed.getTotalDataBlocks(),
- parsed.getTotalCheckBlocks(),
- parsed.getTotalCrossCheckBlocks(),
- parsed.getSettingsStream(),
- p.completeViaTruncation,
- p.keysFetching,
- this.segments);
+ SplitFileFetcherSegmentsInit segmentsInit =
+ SplitFileFetcherSegmentsBuilder.initSegmentsFromStream(
+ new SplitFileFetcherSegmentsLoadParams(
+ this,
+ parsed.getTotalDataBlocks(),
+ parsed.getTotalCheckBlocks(),
+ parsed.getTotalCrossCheckBlocks(),
+ parsed.getSettingsStream(),
+ p.completeViaTruncation,
+ p.keysFetching,
+ this.segments,
+ checksumLength,
+ splitfileSingleCryptoKey != null,
+ offsetKeyList,
+ offsetSegmentStatus,
+ rafLength));
this.crossSegments = segmentsInit.crossSegments;
this.keyListener =
new SplitFileFetcherKeyListener(
this, fetcher, segmentsInit.remainingStream, false, p.newSalt);
- postInitReadSegmentState();
- readGeneralProgress();
+ SplitFileFetcherStorageRecovery recovery = new SplitFileFetcherStorageRecovery(this);
+ recovery.postInitReadSegmentState();
+ recovery.readGeneralProgress();
initAsyncHelpers();
}
- // ---------- Small helpers to reduce cognitive complexity in constructor ----------
-
- private void ensureMinLength(long length) throws StorageFormatException {
- if (length < 8) throw new StorageFormatException("Too short");
- }
-
- private void validateMagic(long length) throws IOException, StorageFormatException {
- byte[] buf = new byte[8];
- raf.pread(length - 8, buf, 0, 8);
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
- if (dis.readLong() != END_MAGIC) throw new StorageFormatException("Wrong magic bytes");
- }
-
- private static int parseInt(byte[] buf) throws IOException {
- return new DataInputStream(new ByteArrayInputStream(buf)).readInt();
- }
-
- private byte[] readVersionBytes(long length) throws IOException {
- byte[] versionBuf = new byte[4];
- raf.pread(length - 12, versionBuf, 0, 4);
- return versionBuf;
- }
-
- private byte[] readChecksumTypeBytes(long length) throws IOException {
- byte[] checksumTypeBuf = new byte[2];
- raf.pread(length - 14, checksumTypeBuf, 0, 2);
- return checksumTypeBuf;
- }
-
- private void validateChecksumType(byte[] checksumTypeBuf)
- throws IOException, StorageFormatException {
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(checksumTypeBuf));
- int checksumType = dis.readShort();
- if (checksumType != ChecksumChecker.CHECKSUM_CRC)
- throw new StorageFormatException("Unknown checksum type " + checksumType);
- }
-
- private byte[] readFlagsBytes(long length) throws IOException {
- byte[] flagsBuf = new byte[4];
- raf.pread(length - 18, flagsBuf, 0, 4);
- return flagsBuf;
- }
-
- private void validateFlags(byte[] flagsBuf) throws IOException, StorageFormatException {
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(flagsBuf));
- int flags = dis.readInt();
- if (flags != 0) throw new StorageFormatException("Unknown flags: " + flags);
- }
-
- private BasicSettingsInfo readBasicSettingsLocation(
- long length, byte[] flagsBuf, byte[] checksumTypeBuf, byte[] versionBuf)
- throws IOException, StorageFormatException {
- byte[] buf = new byte[14];
- raf.pread(length - (22 + checksumLength), buf, 0, 4);
- byte[] checksum = new byte[checksumLength];
- raf.pread(length - (18 + checksumLength), checksum, 0, checksumLength);
- System.arraycopy(flagsBuf, 0, buf, 4, 4);
- System.arraycopy(checksumTypeBuf, 0, buf, 8, 2);
- System.arraycopy(versionBuf, 0, buf, 10, 4);
- if (!checksumChecker.checkChecksum(buf, 0, 14, checksum))
- throw new StorageFormatException("Checksum failed on basic settings length and version");
- int basicSettingsLength = parseInt(buf);
- if (basicSettingsLength < 0
- || basicSettingsLength + 12 + 4 + checksumLength > raf.size()
- || basicSettingsLength > 1024 * 1024)
- throw new StorageFormatException("Bad basic settings length");
- long basicSettingsOffset = length - (18 + 4 + checksumLength * 2L + basicSettingsLength);
- return new BasicSettingsInfo(basicSettingsOffset, basicSettingsLength);
- }
-
- private byte[] readChecksummed(long offset, int length)
- throws StorageFormatException, IOException {
- byte[] basicSettingsBuffer = new byte[length];
- try {
- preadChecksummed(offset, basicSettingsBuffer, 0, length);
- } catch (ChecksumFailedException _) {
- throw new StorageFormatException("Basic settings checksum invalid");
- }
- return basicSettingsBuffer;
- }
-
- private SegmentsInit initSegmentsFromStream(
- int totalDataBlocks,
- int totalCheckBlocks,
- int totalCrossCheckBlocks,
- DataInputStream dis,
- boolean completeViaTruncation,
- KeysFetchingLocally keysFetching,
- SplitFileFetcherSegmentStorage[] segments)
- throws StorageFormatException, IOException {
- // segments array is provided and already assigned to this.segments
- long dataOffset = 0;
- long crossCheckBlocksOffset =
- completeViaTruncation ? (long) totalDataBlocks * CHKBlock.DATA_LENGTH : 0;
- long segmentKeysOffset = offsetKeyList;
- long segmentStatusOffset = offsetSegmentStatus;
- int countDataBlocks = 0;
- int countCheckBlocks = 0;
- int countCrossCheckBlocks = 0;
- for (int i = 0; i < segments.length; i++) {
- SplitFileFetcherSegmentStorage.LoadParams lp =
- new SplitFileFetcherSegmentStorage.LoadParams();
- lp.parent = this;
- lp.dis = dis;
- lp.segNo = i;
- lp.writeRetries = maxRetries != -1;
- lp.segmentDataOffset = dataOffset;
- lp.segmentCrossCheckDataOffset = completeViaTruncation ? crossCheckBlocksOffset : -1;
- lp.segmentKeysOffset = segmentKeysOffset;
- lp.segmentStatusOffset = segmentStatusOffset;
- lp.keysFetching = keysFetching;
- segments[i] = new SplitFileFetcherSegmentStorage(lp);
- int dataBlocks = segments[i].dataBlocks;
- countDataBlocks += dataBlocks;
- int checkBlocks = segments[i].checkBlocks;
- countCheckBlocks += checkBlocks;
- int crossCheckBlocks = segments[i].crossSegmentCheckBlocks;
- countCrossCheckBlocks += crossCheckBlocks;
- dataOffset += (long) dataBlocks * CHKBlock.DATA_LENGTH;
- if (completeViaTruncation)
- crossCheckBlocksOffset += (long) crossCheckBlocks * CHKBlock.DATA_LENGTH;
- else dataOffset += (long) crossCheckBlocks * CHKBlock.DATA_LENGTH;
- segmentKeysOffset +=
- SplitFileFetcherSegmentStorage.storedKeysLength(
- dataBlocks + crossCheckBlocks,
- checkBlocks,
- splitfileSingleCryptoKey != null,
- checksumLength);
- segmentStatusOffset +=
- SplitFileFetcherSegmentStorage.paddedStoredSegmentStatusLength(
- dataBlocks, checkBlocks, crossCheckBlocks, maxRetries != -1, checksumLength, true);
- validateSegmentOffsets(dataOffset, segments[i]);
- debugSegmentOffsets(i, segments[i]);
- }
- validateTotals(
- countDataBlocks,
- totalDataBlocks,
- countCheckBlocks,
- totalCheckBlocks,
- countCrossCheckBlocks,
- totalCrossCheckBlocks);
-
- int crossSegmentsCount = dis.readInt();
- SplitFileFetcherCrossSegmentStorage[] crossSegmentsLocal =
- (crossSegmentsCount == 0)
- ? null
- : new SplitFileFetcherCrossSegmentStorage[crossSegmentsCount];
- for (int i = 0; i < crossSegmentsCount; i++) {
- // crossSegmentsLocal is non-null when crossSegmentsCount > 0
- crossSegmentsLocal[i] = new SplitFileFetcherCrossSegmentStorage(this, i, dis);
- }
- return new SegmentsInit(segments, crossSegmentsLocal, dis);
- }
-
- private static void validateTotals(
- int countDataBlocks,
- int totalDataBlocks,
- int countCheckBlocks,
- int totalCheckBlocks,
- int countCrossCheckBlocks,
- int totalCrossCheckBlocks)
- throws StorageFormatException {
- if (countDataBlocks != totalDataBlocks)
- throw new StorageFormatException(
- "Total data blocks " + countDataBlocks + " but expected " + totalDataBlocks);
- if (countCheckBlocks != totalCheckBlocks)
- throw new StorageFormatException(
- "Total check blocks " + countCheckBlocks + " but expected " + totalCheckBlocks);
- if (countCrossCheckBlocks != totalCrossCheckBlocks)
- throw new StorageFormatException(
- "Total cross-check blocks "
- + countCrossCheckBlocks
- + " but expected "
- + totalCrossCheckBlocks);
- }
-
- private void debugSegmentOffsets(int index, SplitFileFetcherSegmentStorage segment) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Segment {}: data blocks offset {} cross-check blocks offset {} for segment {} of {}",
- index,
- segment.segmentBlockDataOffset,
- segment.segmentCrossCheckBlockDataOffset,
- index,
- this);
- }
- }
-
- private void validateSegmentOffsets(long dataOffset, SplitFileFetcherSegmentStorage segment)
- throws StorageFormatException {
- if (dataOffset > rafLength)
- throw new StorageFormatException(
- "Data offset past end of file " + dataOffset + " of " + rafLength);
- if (segment.segmentCrossCheckBlockDataOffset > rafLength)
- throw new StorageFormatException(
- "Cross-check blocks offset past end of file "
- + segment.segmentCrossCheckBlockDataOffset
- + " of "
- + rafLength);
- }
-
- private void postInitReadSegmentState()
- throws FetchException, IOException, StorageFormatException {
- for (SplitFileFetcherSegmentStorage segment : segments) {
- boolean needsDecode = determineIfSegmentNeedsDecode(segment);
- if (needsDecode) queueSegmentForDecode(segment);
- }
- readAllSegmentKeys();
- checkCrossSegmentsIfAny();
- }
-
- private boolean determineIfSegmentNeedsDecode(SplitFileFetcherSegmentStorage segment)
- throws FetchException, IOException, StorageFormatException {
- boolean needsDecode = false;
- try {
- segment.readMetadata();
- if (segment.hasFailed()) {
- raf.close();
- raf.free(); // Failed, so free it.
- throw new FetchException(FetchExceptionMode.SPLITFILE_ERROR, errors);
- }
- } catch (ChecksumFailedException _) {
- LOG.error("Progress for segment {} on {} corrupted.", segment.segNo, this);
- needsDecode = true;
- }
- if (segment.needsDecode()) needsDecode = true;
- return needsDecode;
- }
-
- private void queueSegmentForDecode(SplitFileFetcherSegmentStorage segment) {
- if (segmentsToTryDecode == null) segmentsToTryDecode = new ArrayList<>();
- segmentsToTryDecode.add(segment);
- }
-
- private void readAllSegmentKeys() throws StorageFormatException, IOException {
- for (SplitFileFetcherSegmentStorage segment : segments) {
- try {
- segment.readSegmentKeys();
- } catch (ChecksumFailedException _) {
- throw new StorageFormatException("Keys corrupted");
- }
- }
- }
-
- private void checkCrossSegmentsIfAny() {
- if (this.crossSegments == null) return;
- for (SplitFileFetcherCrossSegmentStorage crossSegment : this.crossSegments)
- // Must be after reading the metadata for the plain segments.
- crossSegment.checkBlocks();
- }
-
- private static class BasicSettingsInfo {
- final long offset;
- final int length;
-
- BasicSettingsInfo(long offset, int length) {
- this.offset = offset;
- this.length = length;
- }
- }
-
- private static class SegmentsInit {
- final SplitFileFetcherSegmentStorage[] segments;
- final SplitFileFetcherCrossSegmentStorage[] crossSegments;
- final DataInputStream remainingStream;
-
- SegmentsInit(
- SplitFileFetcherSegmentStorage[] segments,
- SplitFileFetcherCrossSegmentStorage[] crossSegments,
- DataInputStream remainingStream) {
- this.segments = segments;
- this.crossSegments = crossSegments;
- this.remainingStream = remainingStream;
- }
- }
-
- private void readGeneralProgress() throws IOException {
- try {
- byte[] buf = preadChecksummedWithLength(offsetGeneralProgress);
- ByteArrayInputStream bais = new ByteArrayInputStream(buf);
- DataInputStream dis = new DataInputStream(bais);
- long flags = dis.readLong();
- if ((flags & HAS_CHECKED_DATASTORE_FLAG) != 0) hasCheckedDatastore = true;
- errors = new FailureCodeTracker(false, dis);
- dis.close();
- } catch (ChecksumFailedException | StorageFormatException e) {
- LOG.error("Failed to read general progress: {}", String.valueOf(e));
- // Reset general progress
- this.hasCheckedDatastore = false;
- this.errors = new FailureCodeTracker(false);
- }
- }
-
/**
* Start the storage layer and enqueue any required recovery work.
*
- * This method reattaches cross-segment helpers, schedules any deferred decode attempts, and
+ * This method reattaches cross-segment helpers, schedules any deferred decoding attempts, and
* optionally notifies the fetcher of resume statistics. When key Bloom filters are missing, it
* triggers asynchronous regeneration and returns {@code false} so the caller does not schedule
* requests prematurely. It performs no network I/O but may queue background work that later
@@ -832,9 +540,10 @@ private void readGeneralProgress() throws IOException {
*/
public boolean start(boolean resume) {
if (resume) onResumeInit();
- restartCrossSegments();
- scheduleTryDecodeForBrokenSegments();
- if (keyListener.needsKeys()) return regenerateKeysAsync();
+ SplitFileFetcherStorageRecovery recovery = new SplitFileFetcherStorageRecovery(this);
+ recovery.restartCrossSegments();
+ recovery.scheduleTryDecodeForBrokenSegments();
+ if (keyListener.needsKeys()) return recovery.regenerateKeysAsync();
return true;
}
@@ -855,80 +564,6 @@ private void onResumeInit() {
fetcher.onResume(succeededBlocks, failedBlocks, clientMetadata, decompressedLength);
}
- private void restartCrossSegments() {
- if (crossSegments == null) return;
- for (SplitFileFetcherCrossSegmentStorage segment : crossSegments) {
- segment.restart();
- }
- }
-
- private void scheduleTryDecodeForBrokenSegments() {
- if (segmentsToTryDecode == null) return;
- List When running in persistent mode, metadata changes are checkpointed asynchronously to reduce
* contention and I/O overhead. Calls may coalesce when invoked in quick succession, so it is safe
* to call this after each state change without creating extra disk churn. In non-persistent mode
- * this method is a no-op. The write occurs off-thread and does not guarantee immediate
+ * this method is a no-op. The writing occurs off-thread and does not guarantee immediate
* durability; callers that need a synchronous flush should use higher-level shutdown paths.
*/
public void lazyWriteMetadata() {
if (!persistent) return;
if (LAZY_WRITE_METADATA_DELAY > 0 && !isFinishing()) {
- // The Runnable must be the same object for de-duplication.
+ // The Runnable must be the same object for deduplication.
ticker.queueTimedJob(
wrapLazyWriteMetadata,
"Write metadata for splitfile",
@@ -1416,7 +868,7 @@ private void closeOffThread() {
jobRunner.queueNormalOrDrop(
_ -> {
// ATOMICITY/DURABILITY: This will run after the checkpoint after completion.
- // So after restart, even if the checkpoint failed, we will be in a valid state.
+ // So after restarting, even if the checkpoint failed, we will be in a valid state.
// This is why this is queue() not queueInternal().
close();
return true;
@@ -1425,8 +877,8 @@ private void closeOffThread() {
private void finishedEncoding() {
FinishState state = computeFinishEncodingState();
- if (state.alreadyFinished) return;
- if (state.lateCompletion) {
+ if (state.alreadyFinished()) return;
+ if (state.lateCompletion()) {
if (allFinished() && !allSucceeded()) {
fail(new FetchException(FetchExceptionMode.SPLITFILE_ERROR, errors));
} else {
@@ -1435,21 +887,12 @@ private void finishedEncoding() {
return;
}
}
- if (state.waitingForFetcher) return;
+ if (state.waitingForFetcher()) return;
closeOffThread();
}
- private static final class FinishState {
- final boolean alreadyFinished;
- final boolean lateCompletion;
- final boolean waitingForFetcher;
-
- FinishState(boolean alreadyFinished, boolean lateCompletion, boolean waitingForFetcher) {
- this.alreadyFinished = alreadyFinished;
- this.lateCompletion = lateCompletion;
- this.waitingForFetcher = waitingForFetcher;
- }
- }
+ private record FinishState(
+ boolean alreadyFinished, boolean lateCompletion, boolean waitingForFetcher) {}
private FinishState computeFinishEncodingState() {
boolean alreadyFinished = false;
@@ -1485,8 +928,8 @@ void close() {
/**
* Called when a segment has finished encoding. It is possible that it has simply restarted; it is
- * not guaranteed to have encoded all blocks etc. But we still need the callback in case e.g. we
- * are in the process of failing, and can't proceed until all the encode jobs have finished.
+ * not guaranteed to have encoded all blocks etc. But we still need the callback in case e.g., we
+ * are in the process of failing, and can't proceed until all the encoding jobs have finished.
*/
void finishedEncoding(SplitFileFetcherSegmentStorage segment) {
if (LOG.isDebugEnabled())
@@ -1545,8 +988,8 @@ public void fail(final FetchException e) {
*
* This helper converts the condition into a consolidated {@link FetchException} and routes it
* through {@link #fail(FetchException)} so shutdown happens consistently. It does not mutate the
- * segment itself; segment state is expected to already reflect the terminal failure condition.
- * Callers typically invoke this after a retry budget check fails.
+ * segment itself; the segment state is expected to already reflect the terminal failure
+ * condition. Callers typically invoke this after a retry budget check fails.
*
* @param segment segment that can no longer make progress, used only for logging.
*/
@@ -1601,7 +1044,7 @@ public void failOnDiskError(final ChecksumFailedException e) {
* Count the number of not-yet-fetched keys across all segments.
*
* The value includes keys that are currently cooling down or temporarily skipped. It is
- * computed from in-memory segment state and does not perform disk I/O. Use this for progress
+ * computed from the in-memory segment state and does not perform disk I/O. Use this for progress
* estimation rather than exact completion criteria; failed blocks and delayed retries can cause
* the count to oscillate.
*
@@ -1652,11 +1095,11 @@ public long countSendableKeys() {
* Lightweight handle identifying a concrete block within a specific segment.
*
* Instances are stable identifiers used by schedulers and callbacks to correlate request
- * outcomes with storage state. The handle is immutable and embeds the owning storage, the segment
- * index, and the block index so it can be used as a map key or queue token without additional
- * lookups. It is not a cryptographic key; instead it is a positional reference that can be
- * resolved to a {@link ClientKey} through {@link SplitFileFetcherStorage#getKey}. Equality and
- * hash semantics incorporate the storage instance to avoid accidental collisions across
+ * outcomes with the storage state. The handle is immutable and embeds the owning storage, the
+ * segment index, and the block index, so it can be used as a map key or queue token without
+ * additional lookups. It is not a cryptographic key; instead it is a positional reference that
+ * can be resolved to a {@link ClientKey} through {@link SplitFileFetcherStorage#getKey}. Equality
+ * and hash semantics incorporate the storage instance to avoid accidental collisions across
* concurrent fetches.
*/
public static final class SplitFileFetcherStorageKey
@@ -1669,8 +1112,8 @@ public static final class SplitFileFetcherStorageKey
* values obtained from segment state or scheduling helpers. The resulting instance is immutable
* and can be safely cached or used as a map key for the lifetime of the storage.
*
- * @param n zero-based block index within the segment, must be in range.
- * @param segNo zero-based segment index for the owning storage, must be valid.
+ * @param n a zero-based block index within the segment must be in range.
+ * @param segNo a zero-based segment index for the owning storage must be valid.
* @param storage owning storage instance used for lookups and equality; must be non-null.
*/
public SplitFileFetcherStorageKey(int n, int segNo, SplitFileFetcherStorage storage) {
@@ -1773,7 +1216,7 @@ public String toString() {
*
* The selection uses a time‑varying seed to distribute requests and avoids segments currently
* decoding or ineligible due to cooldown/retry constraints. The method synchronizes on the
- * segment iterator to keep selection state consistent and may scan multiple segments before
+ * segment iterator to keep the selection state consistent and may scan multiple segments before
* finding a candidate. Returns {@code null} when no key is presently sendable.
*
* @return a randomly selected {@link SplitFileFetcherStorageKey} or {@code null} when none is
@@ -1801,7 +1244,7 @@ public SplitFileFetcherStorageKey chooseRandomKey() {
return null;
}
- /** Cancel the download, stop all FEC decodes, and call close() off-thread when done. */
+ /** Cancel the download, stop all FEC decoding, and call close() off-thread when done. */
void cancel() {
synchronized (this) {
cancelled = true;
@@ -1815,10 +1258,11 @@ void cancel() {
/**
* Callback invoked after checking the local datastore for a candidate key.
*
- * When a check produces a definitive result the storage updates segment state and may schedule
- * additional work. The method runs off-thread to avoid blocking request selection. It increments
- * the appropriate failure counters and notifies each segment that datastore probing has completed
- * so retries can proceed. If all segments are already finished, the callback is a no-op.
+ * When a check produces a definitive result, the storage updates segment state and may
+ * schedule additional work. The method runs off-thread to avoid blocking request selection. It
+ * increments the appropriate failure counters and notifies each segment that datastore probing
+ * has completed so retries can proceed. If all segments are already finished, the callback is a
+ * no-op.
*/
public void finishedCheckingDatastoreOnLocalRequest() {
// At this point, all the blocks will have been processed.
@@ -1841,7 +1285,7 @@ synchronized boolean isFinishing() {
/**
* Record a non-fatal failure for a specific block request.
*
- * The error is accumulated in the failure tracker and the key is made retryable according to
+ * The error is accumulated in the failure tracker, and the key is made retryable according to
* the configured policy. Persistent sessions schedule a metadata checkpoint. Callers should only
* invoke this for failures that may be retried; terminal failures should go through {@link
* #fail(FetchException)} or {@link #failOnSegment(SplitFileFetcherSegmentStorage)}.
@@ -1867,7 +1311,7 @@ public void onFailure(SplitFileFetcherStorageKey key, FetchException fe) {
*
* The lookup reads segment key data, so it may perform disk I/O and acquire the RAF lock. On
* I/O failure the storage reports a disk error and returns {@code null}. The call does not change
- * segment state. Callers should treat the returned {@link ClientKey} as read-only.
+ * the segment state. Callers should treat the returned {@link ClientKey} as read-only.
*
* @param key handle identifying a block, usually from {@link #chooseRandomKey()}.
* @return client key for the block, or {@code null} on I/O failure.
@@ -1899,9 +1343,9 @@ public int maxRetries() {
* Notify the fetcher that a block has permanently failed.
*
* The notification is posted asynchronously to keep request-selection threads responsive. It
- * does not modify storage state directly; callers should ensure segment bookkeeping has already
- * transitioned the block into a terminal state before invoking this callback. Multiple calls may
- * be coalesced by the fetcher.
+ * does not modify the storage state directly; callers should ensure segment bookkeeping has
+ * already transitioned the block into a terminal state before invoking this callback. The fetcher
+ * may coalesce multiple calls.
*/
public void failedBlock() {
jobRunner.queueNormalOrDrop(
@@ -1976,10 +1420,10 @@ void increaseCooldown(final long cooldownTime) {
/**
* Clear global cooldown when all segments have exited their individual cooldown windows.
*
- * This is safe to call frequently; it performs cheap checks and posts to the fetcher when the
- * global flag changes. The method synchronizes on the cooldown lock and will reset the global
- * wakeup time only when every segment reports a cleared cooldown. It does not schedule requests
- * directly; that remains the fetcher's responsibility.
+ * This is safe to call frequently; it performs inexpensive checks and posts to the fetcher
+ * when the global flag changes. The method synchronizes on the cooldown lock and will reset the
+ * global wakeup time only when every segment reports a cleared cooldown. It does not schedule
+ * requests directly; that remains the fetcher's responsibility.
*/
public void maybeClearCooldown() {
synchronized (cooldownLock) {
diff --git a/src/main/java/network/crypta/client/async/SplitFileFetcherStorageLayout.java b/src/main/java/network/crypta/client/async/SplitFileFetcherStorageLayout.java
new file mode 100644
index 0000000000..7dadc6485e
--- /dev/null
+++ b/src/main/java/network/crypta/client/async/SplitFileFetcherStorageLayout.java
@@ -0,0 +1,140 @@
+package network.crypta.client.async;
+
+import network.crypta.client.FetchException;
+import network.crypta.client.FetchException.FetchExceptionMode;
+import network.crypta.keys.CHKBlock;
+
+/**
+ * Computes size totals and validates splitfile metadata for fetcher storage layouts.
+ *
+ * This package-private helper concentrates the calculations that derive persisted lengths from
+ * segment keys and configuration flags. It is intentionally stateless: every method is a pure
+ * computation or validation based on the provided arguments, making it safe to call repeatedly
+ * during planning and preflight validation. The layout math assumes that segment arrays already
+ * encode the final data/check block counts per segment, including any cross-check blocks that must
+ * be removed before reporting the user-visible data block total.
+ *
+ * Typical usage is to precompute storage sizes before allocating persistent files and to perform
+ * lightweight sanity checks on metadata lengths. The methods do not synchronize and do not retain
+ * references to inputs; callers are responsible for ensuring consistent inputs across segments.
+ *
+ * Each component reflects an aggregate across all segments and uses byte counts where noted.
+ * The record is intentionally small and value-like so that callers can pass it between planning
+ * and persistence steps without additional bookkeeping.
+ *
+ * @param splitfileDataBlocks total data blocks after cross-check blocks are excluded
+ * @param splitfileCheckBlocks total check blocks across all segments in the splitfile
+ * @param storedKeysLength total stored key bytes required for all segment key material
+ * @param storedSegmentStatusLength total stored segment status bytes including padding
+ */
+ record AccumulatedSizes(
+ int splitfileDataBlocks,
+ int splitfileCheckBlocks,
+ long storedKeysLength,
+ long storedSegmentStatusLength) {}
+
+ /**
+ * Aggregates per-segment sizes into a single storage summary for the splitfile.
+ *
+ * The calculation walks each segment entry, summing data and check blocks, stored key lengths,
+ * and stored segment status lengths. Cross-check blocks are counted while iterating, then removed
+ * once at the end so that the returned data block count reflects the effective payload size. The
+ * method is deterministic and has no side effects, so callers can reuse it to recompute totals
+ * when the configuration changes.
+ *
+ * The comparison allows a small difference of up to one data block length to tolerate rounding
+ * effects during metadata assembly. When the excess exceeds that tolerance, a fetch exception is
+ * raised to prevent fetchers from trusting malformed metadata. The method performs only the
+ * length comparison and does not mutate state or record diagnostics.
+ *
+ * @param checkLength reported check-length value in bytes from splitfile metadata
+ * @param finalLength expected final length in bytes derived from segment information
+ * @throws FetchException when the check length exceeds the final length beyond tolerance
+ */
+ static void validateCheckLength(long checkLength, long finalLength) throws FetchException {
+ if (checkLength > finalLength && checkLength - finalLength > CHKBlock.DATA_LENGTH)
+ throw new FetchException(
+ FetchExceptionMode.INVALID_METADATA,
+ "Splitfile is " + checkLength + " bytes long but length is " + finalLength + " bytes");
+ }
+
+ /**
+ * Ensures a splitfile describes at least one segment before processing begins.
+ *
+ * This is a defensive check used in assertion-heavy paths. It treats a non-positive segment
+ * count as a programmer error rather than user input and therefore throws an {@link
+ * AssertionError}. The method does not return a value and does not attempt to recover.
+ *
+ * @param segmentCount number of segments that the splitfile metadata declares
+ * @throws AssertionError when {@code segmentCount} is zero or negative
+ */
+ static void validateSegmentCount(int segmentCount) {
+ if (segmentCount <= 0) {
+ throw new AssertionError("A splitfile has to have at least one segment");
+ }
+ }
+}
diff --git a/src/main/java/network/crypta/client/async/SplitFileFetcherStoragePersistenceWriter.java b/src/main/java/network/crypta/client/async/SplitFileFetcherStoragePersistenceWriter.java
new file mode 100644
index 0000000000..4f77e814d6
--- /dev/null
+++ b/src/main/java/network/crypta/client/async/SplitFileFetcherStoragePersistenceWriter.java
@@ -0,0 +1,80 @@
+package network.crypta.client.async;
+
+import java.io.IOException;
+
+/**
+ * Writes the initial persistent layout for splitfile fetcher storage.
+ *
+ * This helper centralizes creation-time disk writes and footer serialization so the storage
+ * orchestrator can focus on lifecycle and scheduling. It assumes the caller has already assembled
+ * segment keys, precomputed metadata, and encoded the settings payload. The implementation performs
+ * direct random-access writes and metadata serialization, so it must run before any fetch work that
+ * depends on the persistent structure.
+ *
+ * The class is a pure utility: it holds no state and is intended for one-time initialization of
+ * on-disk files. Calls are safe to repeat only when the caller manages file replacement or a clean
+ * initialization path. Concurrency control is delegated to the caller and the storage lock, so
+ * callers should avoid invoking it from multiple threads without external coordination.
+ *
+ * The method acquires the storage lock and then writes per-segment key material to the RAF. If
+ * the storage is persistent, it proceeds to flush segment metadata, general progress, Bloom
+ * filters, and the serialized metadata footer. The method assumes the arrays are aligned by
+ * segment index and does not perform validation beyond what called helpers enforce.
+ *
+ * This helper encapsulates the validation and regeneration work needed when reopening a
+ * persistent splitfile fetcher. It reads per-segment metadata, rebuilds in-memory progress state,
+ * and schedules background work for decoding or Bloom filter regeneration. The class is tightly
+ * coupled to {@link SplitFileFetcherStorage} and expects the storage to be initialized with valid
+ * offsets and segment arrays before any recovery method is invoked.
+ *
+ * The methods are not inherently thread-safe; callers should ensure serialized access using the
+ * storage's locking discipline. Recovery is designed to be idempotent as long as the underlying
+ * persistent data are stable, but the caller is responsible for avoiding concurrent mutation of the
+ * same storage.
+ *
+ * The reference is immutable and shared across recovery steps. Callers must ensure the storage
+ * remains valid for the duration of recovery because methods access RAF offsets, segment arrays,
+ * and listener hooks without additional null checks.
+ */
+ private final SplitFileFetcherStorage storage;
+
+ /**
+ * Creates a recovery helper for the provided storage instance.
+ *
+ * The storage is assumed to be initialized but not yet recovered; callers should construct the
+ * helper after the storage structure and offsets are ready and before any fetch work resumes. The
+ * helper retains the storage reference for all later recovery operations.
+ *
+ * @param storage initialized storage instance whose state will be recovered
+ */
+ SplitFileFetcherStorageRecovery(SplitFileFetcherStorage storage) {
+ this.storage = storage;
+ }
+
+ /**
+ * Reads per-segment metadata and enqueues segments that need decode attempts.
+ *
+ * This method walks every segment, reads its metadata, records decode requirements, then reads
+ * all segment keys, and verifies cross-segment blocks. It is intended to run once after opening
+ * persistent storage, before any decoding tasks are started. The method may close and free the
+ * RAF if a segment is marked failed, propagating a {@link FetchException} in that case.
+ *
+ * @throws FetchException when the stored metadata indicates a splitfile failure
+ * @throws IOException when reading segment metadata or keys fails
+ * @throws StorageFormatException when persisted, segment data is corrupt or inconsistent
+ */
+ void postInitReadSegmentState() throws FetchException, IOException, StorageFormatException {
+ for (SplitFileFetcherSegmentStorage segment : storage.segments) {
+ boolean needsDecode = determineIfSegmentNeedsDecode(segment);
+ if (needsDecode) queueSegmentForDecode(segment);
+ }
+ readAllSegmentKeys();
+ checkCrossSegmentsIfAny();
+ }
+
+ /**
+ * Reads the persisted general progress block and applies it to the storage state.
+ *
+ * The method fetches the check-summed progress payload, decodes flags and failure codes, and
+ * updates the in-memory state of the storage. If the payload is corrupted or fails checksum
+ * validation, the method logs an error and resets the progress fields to safe defaults so the
+ * fetcher can proceed without relying on the corrupted state.
+ *
+ * @throws IOException when the progress block cannot be read from storage
+ */
+ void readGeneralProgress() throws IOException {
+ try {
+ byte[] buf = storage.preadChecksummedWithLength(storage.offsetGeneralProgress);
+ ByteArrayInputStream bais = new ByteArrayInputStream(buf);
+ DataInputStream dis = new DataInputStream(bais);
+ long flags = dis.readLong();
+ if ((flags & SplitFileFetcherStorage.HAS_CHECKED_DATASTORE_FLAG) != 0)
+ storage.hasCheckedDatastore = true;
+ storage.errors = new FailureCodeTracker(false, dis);
+ dis.close();
+ } catch (ChecksumFailedException | StorageFormatException e) {
+ SplitFileFetcherStorage.LOG.error("Failed to read general progress: {}", String.valueOf(e));
+ // Reset general progress
+ storage.hasCheckedDatastore = false;
+ storage.errors = new FailureCodeTracker(false);
+ }
+ }
+
+ /**
+ * Queues an asynchronous Bloom filter regeneration job when persistence is enabled.
+ *
+ * The job rebuilds key indices by scanning segment keys and then writes regenerated Bloom
+ * filters to disk. If persistence is disabled, the queue operation is ignored and the method
+ * returns {@code false}. This method does not wait for completion and always returns {@code
+ * false}, matching the job-runner callback signature used by the caller.
+ *
+ * @return always {@code false}; the regeneration work is scheduled asynchronously
+ */
+ boolean regenerateKeysAsync() {
+ try {
+ storage.jobRunner.queue(
+ _ -> {
+ regenerateKeysJob();
+ return false;
+ },
+ SplitFileFetcherStorage.REGENERATE_KEYS_PRIORITY);
+ } catch (PersistenceDisabledException _) {
+ // Ignore.
+ }
+ return false;
+ }
+
+ /**
+ * Restarts any cross-segment storage blocks that are present.
+ *
+ * The method is a no-op when the storage does not include cross-segments. When present, each
+ * cross-segment is restarted in sequence to rebuild any transient state needed for recovery.
+ */
+ void restartCrossSegments() {
+ if (storage.crossSegments == null) return;
+ for (SplitFileFetcherCrossSegmentStorage segment : storage.crossSegments) {
+ segment.restart();
+ }
+ }
+
+ /**
+ * Schedules decode attempts for any segments marked as broken during recovery.
+ *
+ * The list is captured under the storage lock and then cleared, so each segment is only queued
+ * once. The method is safe to call even when no broken segments were recorded.
+ */
+ void scheduleTryDecodeForBrokenSegments() {
+ List The method reads the segment metadata, reports splitfile failure as a {@link
+ * FetchException}, and treats checksum errors as corruption that requires a decoding attempt. It
+ * also checks the segment's own decode flag after metadata is read. The method does not mutate
+ * the decoding queue; callers decide how to record the result.
+ *
+ * @param segment segment whose metadata should be inspected for decoding requirements
+ * @return {@code true} when the segment should be scheduled for decoding attempts
+ * @throws FetchException when the segment reports a failed splitfile state
+ * @throws IOException when reading segment metadata fails due to I/O errors
+ * @throws StorageFormatException when persisted, metadata cannot be parsed
+ */
+ private boolean determineIfSegmentNeedsDecode(SplitFileFetcherSegmentStorage segment)
+ throws FetchException, IOException, StorageFormatException {
+ boolean needsDecode = false;
+ try {
+ segment.readMetadata();
+ if (segment.hasFailed()) {
+ storage.getRAF().close();
+ storage.getRAF().free(); // Failed, so free it.
+ throw new FetchException(FetchExceptionMode.SPLITFILE_ERROR, storage.errors);
+ }
+ } catch (ChecksumFailedException _) {
+ SplitFileFetcherStorage.LOG.error(
+ "Progress for segment {} on {} corrupted.", segment.segNo, storage);
+ needsDecode = true;
+ }
+ if (segment.needsDecode()) needsDecode = true;
+ return needsDecode;
+ }
+
+ /**
+ * Appends a segment to the decoding queue, initializing the queue if necessary.
+ *
+ * The queue is stored in {@link SplitFileFetcherStorage#segmentsToTryDecode} and may be null
+ * until the first decoding candidate is found. This method performs minimal work and does not
+ * synchronize; callers should coordinate access via the storage lock where required.
+ *
+ * @param segment segment that should be decoded when recovery completes
+ */
+ private void queueSegmentForDecode(SplitFileFetcherSegmentStorage segment) {
+ if (storage.segmentsToTryDecode == null) storage.segmentsToTryDecode = new ArrayList<>();
+ storage.segmentsToTryDecode.add(segment);
+ }
+
+ /**
+ * Reads and validates all segment keys for the storage.
+ *
+ * The method iterates through each segment and loads its key list. If a checksum error occurs,
+ * the method fails fast with a {@link StorageFormatException} so the caller can treat the storage
+ * as corrupt and avoid using partial key data.
+ *
+ * @throws StorageFormatException when any segment keys fail checksum validation
+ * @throws IOException when key reads fail due to underlying I/O errors
+ */
+ private void readAllSegmentKeys() throws StorageFormatException, IOException {
+ for (SplitFileFetcherSegmentStorage segment : storage.segments) {
+ try {
+ segment.readSegmentKeys();
+ } catch (ChecksumFailedException _) {
+ throw new StorageFormatException("Keys corrupted");
+ }
+ }
+ }
+
+ /**
+ * Checks cross-segment blocks after segment metadata has been read.
+ *
+ * The method is a no-op when the storage has no cross-segment structures. The ordering matters
+ * because cross-segment validation depends on metadata from the plain segments.
+ */
+ private void checkCrossSegmentsIfAny() {
+ if (storage.crossSegments == null) return;
+ for (SplitFileFetcherCrossSegmentStorage crossSegment : storage.crossSegments)
+ // Must be after reading the metadata for the plain segments.
+ crossSegment.checkBlocks();
+ }
+
+ /**
+ * Rebuilds Bloom filters by scanning all segment keys and writing them to storage.
+ *
+ * The job is executed by the storage job runner and logs its progress. It adds every key to
+ * the key listener, writes regenerated Bloom filters, and informs the fetcher that recovery
+ * completed after corruption. If key scanning fails, the job terminates early after reporting the
+ * disk error.
+ */
+ private void regenerateKeysJob() {
+ Logger log = SplitFileFetcherStorage.LOG;
+ // Regenerating filters for this storage
+ log.error("Regenerating filters for {}", storage);
+ KeySalter salt = storage.fetcher.getSalter();
+ if (!addAllKeysFromSegments(salt)) return;
+ storage.keyListener.addedAllKeys();
+ writeBloomFiltersSafely();
+ storage.fetcher.restartedAfterDataCorruption();
+ log.warn("Finished regenerating filters for {}", storage);
+ }
+
+ /**
+ * Adds all segment keys to the key listener, returning {@code false} on failure.
+ *
+ * Each segment's key list is read and converted to node keys before being forwarded to the key
+ * listener. I/O or checksum errors cause the storage to be marked failed and stop further
+ * processing so that callers do not rely on a partial key set.
+ *
+ * @param salt salter used to derive node keys during listener updates
+ * @return {@code true} when all segments are processed without I/O or checksum failures
+ */
+ private boolean addAllKeysFromSegments(KeySalter salt) {
+ for (int i = 0; i < storage.segments.length; i++) {
+ SplitFileFetcherSegmentStorage segment = storage.segments[i];
+ try {
+ SplitFileSegmentKeys keys = segment.readSegmentKeys();
+ for (int j = 0; j < keys.totalKeys(); j++) {
+ storage.keyListener.addKey(keys.getKey(j, null, false).getNodeKey(false), i, salt);
+ }
+ } catch (IOException | ChecksumFailedException e) {
+ if (e instanceof IOException io) {
+ storage.failOnDiskError(io);
+ } else {
+ storage.failOnDiskError((ChecksumFailedException) e);
+ }
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Writes regenerated Bloom filters and handles I/O errors for persistent storage.
+ *
+ * The method writes segment and main Bloom filters. When persistence is enabled, I/O failures
+ * are reported to the storage failure handler; otherwise they are ignored because the storage is
+ * not durable.
+ */
+ private void writeBloomFiltersSafely() {
+ try {
+ storage.keyListener.initialWriteSegmentBloomFilters(storage.offsetSegmentBloomFilters);
+ storage.keyListener.innerWriteMainBloomFilter(storage.offsetMainBloomFilter);
+ } catch (IOException e) {
+ if (storage.persistent) storage.failOnDiskError(e);
+ }
+ }
+}
diff --git a/src/main/java/network/crypta/client/async/SplitFileFetcherStorageResumeReader.java b/src/main/java/network/crypta/client/async/SplitFileFetcherStorageResumeReader.java
new file mode 100644
index 0000000000..21f656089d
--- /dev/null
+++ b/src/main/java/network/crypta/client/async/SplitFileFetcherStorageResumeReader.java
@@ -0,0 +1,317 @@
+package network.crypta.client.async;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import network.crypta.crypt.ChecksumChecker;
+import network.crypta.crypt.ChecksumFailedException;
+import network.crypta.support.api.LockableRandomAccessBuffer;
+import network.crypta.support.io.StorageFormatException;
+
+/**
+ * Parses and validates persisted splitfile storage headers for resume.
+ *
+ * This reader verifies the footer magic, checksum type, flags, and version before parsing the
+ * basic settings block via {@link SplitFileFetcherStorageSettingsCodec}. It isolates byte-level
+ * parsing and checksum validation from the storage orchestrator so that callers can resume only
+ * from storage that passes structural checks.
+ *
+ * All methods are static and side-effect free aside from reading from the supplied buffer. The
+ * reader assumes the caller has already opened the storage and provided the correct checksum
+ * configuration. It does not synchronize on the buffer, so callers must ensure exclusive access if
+ * they read concurrently.
+ *
+ * The method performs a fixed sequence of checks: minimum length, footer magic, version,
+ * checksum type, flags, and the check-summed basic settings length. It then reads the settings
+ * payload and parses it into a {@link ParsedBasicSettings} instance. The method is deterministic
+ * and does not alter the buffer contents.
+ *
+ * The minimum length is eight bytes because the footer magic is a {@code long} stored at the
+ * end of the file. Shorter buffers cannot contain a valid footer and are rejected early.
+ *
+ * @param length total buffer length in bytes
+ * @throws StorageFormatException when the buffer is shorter than the footer magic size
+ */
+ private static void ensureMinLength(long length) throws StorageFormatException {
+ if (length < 8) throw new StorageFormatException("Too short");
+ }
+
+ /**
+ * Reads and validates the footer magic from the end of the buffer.
+ *
+ * The method reads eight bytes from the end of the file and compares them to the expected
+ * magic constant. A mismatch indicates the storage is not a splitfile resume buffer.
+ *
+ * @param raf backing buffer containing the persisted splitfile storage
+ * @param length total length of the backing buffer in bytes
+ * @throws IOException when the buffer cannot be read
+ * @throws StorageFormatException when the footer magic does not match
+ */
+ private static void validateMagic(LockableRandomAccessBuffer raf, long length)
+ throws IOException, StorageFormatException {
+ byte[] buf = new byte[8];
+ raf.pread(length - 8, buf, 0, 8);
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
+ if (dis.readLong() != SplitFileFetcherStorage.END_MAGIC)
+ throw new StorageFormatException("Wrong magic bytes");
+ }
+
+ /**
+ * Parses a four-byte big-endian integer from the provided buffer.
+ *
+ * @param buf four-byte array containing the serialized integer value
+ * @return parsed integer value from the buffer
+ * @throws IOException when the buffer cannot be read as an integer
+ */
+ private static int parseInt(byte[] buf) throws IOException {
+ return new DataInputStream(new ByteArrayInputStream(buf)).readInt();
+ }
+
+ /**
+ * Reads the serialized version value located just before the footer magic.
+ *
+ * @param raf backing buffer containing the persisted splitfile storage
+ * @param length total length of the backing buffer in bytes
+ * @return four-byte buffer containing the serialized version
+ * @throws IOException when the version bytes cannot be read
+ */
+ private static byte[] readVersionBytes(LockableRandomAccessBuffer raf, long length)
+ throws IOException {
+ byte[] versionBuf = new byte[4];
+ raf.pread(length - 12, versionBuf, 0, 4);
+ return versionBuf;
+ }
+
+ /**
+ * Reads the checksum type bytes stored immediately before the version bytes.
+ *
+ * @param raf backing buffer containing the persisted splitfile storage
+ * @param length total length of the backing buffer in bytes
+ * @return two-byte buffer containing the serialized checksum type
+ * @throws IOException when the checksum type bytes cannot be read
+ */
+ private static byte[] readChecksumTypeBytes(LockableRandomAccessBuffer raf, long length)
+ throws IOException {
+ byte[] checksumTypeBuf = new byte[2];
+ raf.pread(length - 14, checksumTypeBuf, 0, 2);
+ return checksumTypeBuf;
+ }
+
+ /**
+ * Validates that the checksum type matches the supported checksum implementation.
+ *
+ * @param checksumTypeBuf two-byte buffer containing the serialized checksum type
+ * @throws IOException when the checksum type bytes cannot be read
+ * @throws StorageFormatException when the checksum type is not recognized
+ */
+ private static void validateChecksumType(byte[] checksumTypeBuf)
+ throws IOException, StorageFormatException {
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(checksumTypeBuf));
+ int checksumType = dis.readShort();
+ if (checksumType != ChecksumChecker.CHECKSUM_CRC)
+ throw new StorageFormatException("Unknown checksum type " + checksumType);
+ }
+
+ /**
+ * Reads the flags field stored before the checksum type and version information.
+ *
+ * @param raf backing buffer containing the persisted splitfile storage
+ * @param length total length of the backing buffer in bytes
+ * @return four-byte buffer containing the serialized flags value
+ * @throws IOException when the flags bytes cannot be read
+ */
+ private static byte[] readFlagsBytes(LockableRandomAccessBuffer raf, long length)
+ throws IOException {
+ byte[] flagsBuf = new byte[4];
+ raf.pread(length - 18, flagsBuf, 0, 4);
+ return flagsBuf;
+ }
+
+ /**
+ * Validates the flags field for unknown bits or unsupported settings.
+ *
+ * @param flagsBuf four-byte buffer containing the serialized flags value
+ * @throws IOException when the flags buffer cannot be read
+ * @throws StorageFormatException when the flags contain unsupported values
+ */
+ private static void validateFlags(byte[] flagsBuf) throws IOException, StorageFormatException {
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(flagsBuf));
+ int flags = dis.readInt();
+ if (flags != 0) throw new StorageFormatException("Unknown flags: " + flags);
+ }
+
+ /**
+ * Locates and validates the basic settings block near the end of the buffer.
+ *
+ * The method reads the check-summed length header, verifies its checksum, and enforces bounds
+ * to prevent unreasonable settings lengths. It then computes the offset where the settings block
+ * begins, returning both the offset and the length for later reads.
+ *
+ * @param raf backing buffer containing the persisted splitfile storage
+ * @param length total length of the backing buffer in bytes
+ * @param checksumChecker checker used to validate the basic settings length checksum
+ * @param checksumLength checksum length in bytes used by the storage format
+ * @param flagsBuf buffer containing the serialized flags value
+ * @param checksumTypeBuf buffer containing the serialized checksum type
+ * @param versionBuf buffer containing the serialized storage version
+ * @return offset and length describing the basic settings payload
+ * @throws IOException when buffer reads fail
+ * @throws StorageFormatException when the settings length header is invalid
+ */
+ private static BasicSettingsInfo readBasicSettingsLocation(
+ LockableRandomAccessBuffer raf,
+ long length,
+ ChecksumChecker checksumChecker,
+ int checksumLength,
+ byte[] flagsBuf,
+ byte[] checksumTypeBuf,
+ byte[] versionBuf)
+ throws IOException, StorageFormatException {
+ byte[] buf = new byte[14];
+ raf.pread(length - (22 + checksumLength), buf, 0, 4);
+ byte[] checksum = new byte[checksumLength];
+ raf.pread(length - (18 + checksumLength), checksum, 0, checksumLength);
+ System.arraycopy(flagsBuf, 0, buf, 4, 4);
+ System.arraycopy(checksumTypeBuf, 0, buf, 8, 2);
+ System.arraycopy(versionBuf, 0, buf, 10, 4);
+ if (!checksumChecker.checkChecksum(buf, 0, 14, checksum))
+ throw new StorageFormatException("Checksum failed on basic settings length and version");
+ int basicSettingsLength = parseInt(buf);
+ if (basicSettingsLength < 0
+ || basicSettingsLength + 12 + 4 + checksumLength > raf.size()
+ || basicSettingsLength > 1024 * 1024)
+ throw new StorageFormatException("Bad basic settings length");
+ long basicSettingsOffset = length - (18 + 4 + checksumLength * 2L + basicSettingsLength);
+ return new BasicSettingsInfo(basicSettingsOffset, basicSettingsLength);
+ }
+
+ /**
+ * Reads a check-summed byte range and converts checksum failures to storage format errors.
+ *
+ * @param raf backing buffer containing the persisted splitfile storage
+ * @param checksumChecker checker used to validate the checksum for the range
+ * @param checksumLength checksum length in bytes used by the storage format
+ * @param offset absolute offset where the data payload begins
+ * @param length number of bytes to read into the buffer
+ * @return byte array containing the validated payload
+ * @throws StorageFormatException when checksum validation fails
+ * @throws IOException when the buffer cannot be read
+ */
+ private static byte[] readChecksummed(
+ LockableRandomAccessBuffer raf,
+ ChecksumChecker checksumChecker,
+ int checksumLength,
+ long offset,
+ int length)
+ throws StorageFormatException, IOException {
+ byte[] basicSettingsBuffer = new byte[length];
+ try {
+ preadChecksummed(raf, checksumChecker, checksumLength, offset, basicSettingsBuffer, length);
+ } catch (ChecksumFailedException _) {
+ throw new StorageFormatException("Basic settings checksum invalid");
+ }
+ return basicSettingsBuffer;
+ }
+
+ /**
+ * Reads a range and validates the trailing checksum, zeroing the buffer on failure.
+ *
+ * The method reads {@code length} bytes at {@code fileOffset} followed by the checksum
+ * trailer. When the checksum does not match, the buffer is zeroed to prevent reuse of invalid
+ * data and a {@link ChecksumFailedException} is thrown.
+ *
+ * @param raf backing buffer containing the persisted splitfile storage
+ * @param checksumChecker checker used to validate the checksum for the range
+ * @param checksumLength checksum length in bytes used by the storage format
+ * @param fileOffset absolute offset where the data payload begins
+ * @param buf destination buffer to fill with the payload contents
+ * @param length number of bytes to read into the buffer
+ * @throws IOException when the buffer cannot be read
+ * @throws ChecksumFailedException when the checksum validation fails
+ */
+ private static void preadChecksummed(
+ LockableRandomAccessBuffer raf,
+ ChecksumChecker checksumChecker,
+ int checksumLength,
+ long fileOffset,
+ byte[] buf,
+ int length)
+ throws IOException, ChecksumFailedException {
+ byte[] checksumBuf = new byte[checksumLength];
+ raf.pread(fileOffset, buf, 0, length);
+ raf.pread(fileOffset + length, checksumBuf, 0, checksumLength);
+ if (!checksumChecker.checkChecksum(buf, 0, length, checksumBuf)) {
+ for (int i = 0; i < length; i++) {
+ buf[i] = 0;
+ }
+ throw new ChecksumFailedException();
+ }
+ }
+
+ /**
+ * Holds the offset and length for the basic settings payload near the file footer.
+ *
+ * @param offset absolute offset where the basic settings block begins
+ * @param length length in bytes of the basic settings payload
+ */
+ private record BasicSettingsInfo(long offset, int length) {}
+}
From 48c036ce7777388f2390e7154ad7e1b37658dcf8 Mon Sep 17 00:00:00 2001
From: Leumor <116955025+leumor@users.noreply.github.com>
Date: Sat, 17 Jan 2026 19:22:20 +0000
Subject: [PATCH 2/2] chore(gitignore): ignore opencode plans
Exclude local opencode plans directory from version control.
---
.gitignore | 1 +
1 file changed, 1 insertion(+)
diff --git a/.gitignore b/.gitignore
index 2ce4545e3c..268aa1b984 100644
--- a/.gitignore
+++ b/.gitignore
@@ -44,6 +44,7 @@ qodana*
/.claude
/.gemini
/.playwright-mcp
+/.opencode/plans
.kotlin
.jreleaser
multipass-client-certificate
+ *
+ */
+final class SplitFileFetcherStorageLayout {
+ /** Prevents instantiation; this type is a static utility holder for layout computations. */
+ private SplitFileFetcherStorageLayout() {}
+
+ /**
+ * Immutable totals derived from segment keys and storage configuration inputs.
+ *
+ * {@code
+ * SplitFileFetcherStorageLayout.AccumulatedSizes sizes =
+ * SplitFileFetcherStorageLayout.accumulateSizes(keys, crossChecks, singleKey, checksum, -1,
+ * persistent);
+ * }
+ *
+ * @param segmentKeys non-null array of segment key descriptors to aggregate
+ * @param crossCheckBlocks non-negative cross-check blocks per segment to subtract from data
+ * @param hasSplitfileSingleCryptoKey true when all segments share one crypto key
+ * @param checksumLength checksum length in bytes used for persisted segment metadata
+ * @param maxRetries maximum retry count, or {@code -1} when unbounded or unset
+ * @param persistent true when lengths target persistent storage rather than transient buffers
+ * @return aggregate sizes for blocks, key storage, and segment status storage
+ */
+ static AccumulatedSizes accumulateSizes(
+ SplitFileSegmentKeys[] segmentKeys,
+ int crossCheckBlocks,
+ boolean hasSplitfileSingleCryptoKey,
+ int checksumLength,
+ int maxRetries,
+ boolean persistent) {
+ int splitfileDataBlocks = 0;
+ int splitfileCheckBlocks = 0;
+ long storedKeysLength = 0;
+ long storedSegmentStatusLength = 0;
+ for (SplitFileSegmentKeys keys : segmentKeys) {
+ int dataBlocks = keys.getDataBlocks();
+ int checkBlocks = keys.getCheckBlocks();
+ splitfileDataBlocks += dataBlocks;
+ splitfileCheckBlocks += checkBlocks;
+ storedKeysLength +=
+ SplitFileFetcherSegmentStorage.storedKeysLength(
+ dataBlocks, checkBlocks, hasSplitfileSingleCryptoKey, checksumLength);
+ storedSegmentStatusLength +=
+ SplitFileFetcherSegmentStorage.paddedStoredSegmentStatusLength(
+ dataBlocks - crossCheckBlocks,
+ checkBlocks,
+ crossCheckBlocks,
+ maxRetries != -1,
+ checksumLength,
+ persistent);
+ }
+ // Subtract cross-check blocks from data blocks to get the actual data blocks.
+ splitfileDataBlocks -= segmentKeys.length * crossCheckBlocks;
+ return new AccumulatedSizes(
+ splitfileDataBlocks, splitfileCheckBlocks, storedKeysLength, storedSegmentStatusLength);
+ }
+
+ /**
+ * Validates that the stored check length is not materially larger than the final length.
+ *
+ *
+ *
+ */
+final class SplitFileFetcherStoragePersistenceWriter {
+ /** Prevents instantiation; this type exists only to host static write helpers. */
+ private SplitFileFetcherStoragePersistenceWriter() {}
+
+ /**
+ * Writes the initial persistent records and metadata for a splitfile fetcher.
+ *
+ * {@code
+ * SplitFileFetcherStoragePersistenceWriter.writeToRaf(
+ * storage, keys, prepared, encodedSettings, progress, totalLength);
+ * }
+ *
+ * @param storage initialized fetcher storage providing RAF offsets and locking
+ * @param segmentKeys segment key array aligned to {@code storage.segments} ordering
+ * @param prepared precomputed persistent metadata to write into the footer
+ * @param encodedBasicSettings encoded basic settings payload, already checksummed
+ * @param generalProgress encoded progress bytes for general fetch state
+ * @param totalLength total splitfile length in bytes, used for metadata integrity
+ * @throws IOException when random-access writes or serialization to the RAF fail
+ */
+ static void writeToRaf(
+ SplitFileFetcherStorage storage,
+ SplitFileSegmentKeys[] segmentKeys,
+ SplitFileFetcherStoragePersistence.PreparedMetadata prepared,
+ byte[] encodedBasicSettings,
+ byte[] generalProgress,
+ long totalLength)
+ throws IOException {
+ try (var _ = storage.autoLockOpen()) {
+ for (int i = 0; i < storage.segments.length; i++) {
+ SplitFileFetcherSegmentStorage segment = storage.segments[i];
+ segment.writeKeysWithChecksum(segmentKeys[i]);
+ }
+ if (storage.persistent) {
+ for (SplitFileFetcherSegmentStorage segment : storage.segments) segment.writeMetadata();
+ storage
+ .getRAF()
+ .pwrite(storage.offsetGeneralProgress, generalProgress, 0, generalProgress.length);
+ storage.keyListener.innerWriteMainBloomFilter(storage.offsetMainBloomFilter);
+ storage.keyListener.initialWriteSegmentBloomFilters(storage.offsetSegmentBloomFilters);
+ SplitFileFetcherStoragePersistence.writePersistentMetadata(
+ storage.getRAF(),
+ storage.offsetOriginalMetadata,
+ prepared,
+ encodedBasicSettings,
+ storage.checksumChecker,
+ totalLength);
+ }
+ }
+ }
+}
diff --git a/src/main/java/network/crypta/client/async/SplitFileFetcherStorageRecovery.java b/src/main/java/network/crypta/client/async/SplitFileFetcherStorageRecovery.java
new file mode 100644
index 0000000000..6b76162e4f
--- /dev/null
+++ b/src/main/java/network/crypta/client/async/SplitFileFetcherStorageRecovery.java
@@ -0,0 +1,307 @@
+package network.crypta.client.async;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import network.crypta.client.FailureCodeTracker;
+import network.crypta.client.FetchException;
+import network.crypta.client.FetchException.FetchExceptionMode;
+import network.crypta.crypt.ChecksumFailedException;
+import network.crypta.support.io.StorageFormatException;
+import org.slf4j.Logger;
+
+/**
+ * Coordinates resume-time recovery steps for splitfile fetcher storage.
+ *
+ *
+ *
+ */
+final class SplitFileFetcherStorageRecovery {
+ /**
+ * Backing storage whose metadata and progress state are being recovered.
+ *
+ *
+ *
+ */
+final class SplitFileFetcherStorageResumeReader {
+ /** Prevents instantiation; all operations are exposed as static helpers. */
+ private SplitFileFetcherStorageResumeReader() {}
+
+ /**
+ * Reads persisted settings from the footer and validates versioning metadata.
+ *
+ * {@code
+ * ParsedBasicSettings settings =
+ * SplitFileFetcherStorageResumeReader.readParsedSettings(
+ * raf, checker, checksumLength, raf.size(), completeViaTruncation);
+ * }
+ *
+ * @param raf backing buffer containing the persisted splitfile storage
+ * @param checksumChecker checker used to validate the footer checksums and lengths
+ * @param checksumLength checksum length in bytes used by the storage format
+ * @param rafLength total length of the backing buffer in bytes
+ * @param completeViaTruncation true when truncation represents completion
+ * @return parsed settings derived from the persisted footer and basic settings
+ * @throws IOException when the underlying buffer cannot be read
+ * @throws StorageFormatException when the footer metadata or checksums are invalid
+ */
+ static ParsedBasicSettings readParsedSettings(
+ LockableRandomAccessBuffer raf,
+ ChecksumChecker checksumChecker,
+ int checksumLength,
+ long rafLength,
+ boolean completeViaTruncation)
+ throws IOException, StorageFormatException {
+
+ ensureMinLength(rafLength);
+ validateMagic(raf, rafLength);
+ byte[] versionBuf = readVersionBytes(raf, rafLength);
+ int parsedVersion = parseInt(versionBuf);
+ if (parsedVersion != SplitFileFetcherStorage.VERSION)
+ throw new StorageFormatException("Wrong version " + parsedVersion);
+ byte[] checksumTypeBuf = readChecksumTypeBytes(raf, rafLength);
+ validateChecksumType(checksumTypeBuf);
+ byte[] flagsBuf = readFlagsBytes(raf, rafLength);
+ validateFlags(flagsBuf);
+
+ BasicSettingsInfo basicInfo =
+ readBasicSettingsLocation(
+ raf, rafLength, checksumChecker, checksumLength, flagsBuf, checksumTypeBuf, versionBuf);
+ byte[] basicSettingsBuffer =
+ readChecksummed(raf, checksumChecker, checksumLength, basicInfo.offset, basicInfo.length);
+ return SplitFileFetcherStorageSettingsCodec.parseBasicSettings(
+ basicSettingsBuffer, basicInfo.offset, completeViaTruncation, rafLength);
+ }
+
+ /**
+ * Verifies the backing buffer is long enough to contain a footer magic value.
+ *
+ *