Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ _watcher.EnableRaisingEvents = true;
| No incremental change notification | FileSystemWatcher triggers full scan | Planned: `NotifyLocalChangeAsync()` |
| No virtual file awareness | Can't track placeholder vs downloaded | Planned: `VirtualFileCallback` |
| Single-threaded engine | One sync at a time per instance | By design - create separate instances if needed |
| No bandwidth throttling | Can saturate network | Planned: `SyncOptions.MaxBytesPerSecond` |
| ~~No bandwidth throttling~~ | ~~Can saturate network~~ | `SyncOptions.MaxBytesPerSecond` IMPLEMENTED |
| OCIS TUS not implemented | Falls back to generic upload | Planned for v1.0 |

### Required SharpSync API Additions (v1.0)
Expand All @@ -308,7 +308,7 @@ These APIs are required for v1.0 release to support Nimbus desktop client:
4. OCIS TUS protocol implementation (`WebDavStorage.cs:547` currently falls back)

**Sync Control:**
5. `SyncOptions.MaxBytesPerSecond` - Built-in bandwidth throttling
5. ~~`SyncOptions.MaxBytesPerSecond`~~ ✅ Built-in bandwidth throttling (IMPLEMENTED)
6. `PauseAsync()` / `ResumeAsync()` - Pause and resume long-running syncs
7. `GetPendingOperationsAsync()` - Inspect sync queue for UI display

Expand Down Expand Up @@ -500,7 +500,7 @@ Desktop Client APIs (for Nimbus):
- [ ] `SyncFilesAsync(IEnumerable<string> paths)` - Sync specific files on demand
- [ ] `NotifyLocalChangeAsync(string path, ChangeType type)` - Accept FileSystemWatcher events for incremental sync
- [ ] OCIS TUS protocol implementation (currently falls back to generic upload at `WebDavStorage.cs:547`)
- [ ] `SyncOptions.MaxBytesPerSecond` - Built-in bandwidth throttling
- [x] `SyncOptions.MaxBytesPerSecond` - Built-in bandwidth throttling
- [ ] `PauseAsync()` / `ResumeAsync()` - Pause and resume long-running syncs
- [ ] `GetPendingOperationsAsync()` - Inspect sync queue for UI display
- [ ] Per-file progress events (currently only per-sync-operation)
Expand Down
17 changes: 16 additions & 1 deletion src/SharpSync/Core/SyncOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@ public class SyncOptions {
/// </summary>
public List<string> ExcludePatterns { get; set; } = new List<string>();

/// <summary>
/// Gets or sets the maximum transfer rate in bytes per second.
/// Set to 0 or null for unlimited bandwidth.
/// </summary>
/// <remarks>
/// This setting applies to both upload and download operations.
/// Useful for preventing network saturation on shared connections.
/// Example values:
/// - 1_048_576 (1 MB/s)
/// - 10_485_760 (10 MB/s)
/// - 104_857_600 (100 MB/s)
/// </remarks>
public long? MaxBytesPerSecond { get; set; }

/// <summary>
/// Creates a copy of the sync options
/// </summary>
Expand All @@ -81,7 +95,8 @@ public SyncOptions Clone() {
UpdateExisting = UpdateExisting,
ConflictResolution = ConflictResolution,
TimeoutSeconds = TimeoutSeconds,
ExcludePatterns = new List<string>(ExcludePatterns)
ExcludePatterns = new List<string>(ExcludePatterns),
MaxBytesPerSecond = MaxBytesPerSecond
};
}
}
Expand Down
156 changes: 156 additions & 0 deletions src/SharpSync/Storage/ThrottledStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
using System.Diagnostics;

namespace Oire.SharpSync.Storage;

/// <summary>
/// Stream wrapper that throttles read and write operations to limit bandwidth usage.
/// Uses a token bucket algorithm for smooth rate limiting.
/// </summary>
internal sealed class ThrottledStream: Stream {
private readonly Stream _innerStream;
private readonly long _maxBytesPerSecond;
private readonly Stopwatch _stopwatch;
private long _totalBytesTransferred;
private readonly object _lock = new();

/// <summary>
/// Creates a new ThrottledStream wrapping the specified stream.
/// </summary>
/// <param name="innerStream">The stream to wrap.</param>
/// <param name="maxBytesPerSecond">Maximum bytes per second (must be positive).</param>
/// <exception cref="ArgumentNullException">Thrown when innerStream is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">Thrown when maxBytesPerSecond is not positive.</exception>
public ThrottledStream(Stream innerStream, long maxBytesPerSecond) {
_innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream));

if (maxBytesPerSecond <= 0) {
throw new ArgumentOutOfRangeException(nameof(maxBytesPerSecond), "Max bytes per second must be positive.");
}

_maxBytesPerSecond = maxBytesPerSecond;
_stopwatch = Stopwatch.StartNew();
_totalBytesTransferred = 0;
}

public override bool CanRead => _innerStream.CanRead;
public override bool CanSeek => _innerStream.CanSeek;
public override bool CanWrite => _innerStream.CanWrite;
public override long Length => _innerStream.Length;

public override long Position {
get => _innerStream.Position;
set => _innerStream.Position = value;
}

public override void Flush() => _innerStream.Flush();

public override Task FlushAsync(CancellationToken cancellationToken) =>
_innerStream.FlushAsync(cancellationToken);

public override int Read(byte[] buffer, int offset, int count) {
ThrottleSync(count);
var bytesRead = _innerStream.Read(buffer, offset, count);
RecordBytesTransferred(bytesRead);
return bytesRead;
}

public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
await ThrottleAsync(count, cancellationToken);
var bytesRead = await _innerStream.ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
RecordBytesTransferred(bytesRead);
return bytesRead;
}

public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) {
await ThrottleAsync(buffer.Length, cancellationToken);
var bytesRead = await _innerStream.ReadAsync(buffer, cancellationToken);
RecordBytesTransferred(bytesRead);
return bytesRead;
}

public override void Write(byte[] buffer, int offset, int count) {
ThrottleSync(count);
_innerStream.Write(buffer, offset, count);
RecordBytesTransferred(count);
}

public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
await ThrottleAsync(count, cancellationToken);
await _innerStream.WriteAsync(buffer.AsMemory(offset, count), cancellationToken);
RecordBytesTransferred(count);
}

public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) {
await ThrottleAsync(buffer.Length, cancellationToken);
await _innerStream.WriteAsync(buffer, cancellationToken);
RecordBytesTransferred(buffer.Length);
}

public override long Seek(long offset, SeekOrigin origin) => _innerStream.Seek(offset, origin);

public override void SetLength(long value) => _innerStream.SetLength(value);

/// <summary>
/// Synchronously waits if the transfer rate would exceed the limit.
/// </summary>
private void ThrottleSync(int requestedBytes) {
var delay = CalculateDelay(requestedBytes);
if (delay > TimeSpan.Zero) {
Thread.Sleep(delay);
}
}

/// <summary>
/// Asynchronously waits if the transfer rate would exceed the limit.
/// </summary>
private async Task ThrottleAsync(int requestedBytes, CancellationToken cancellationToken) {
var delay = CalculateDelay(requestedBytes);
if (delay > TimeSpan.Zero) {
await Task.Delay(delay, cancellationToken);
}
}

/// <summary>
/// Calculates the delay needed to maintain the target transfer rate.
/// </summary>
private TimeSpan CalculateDelay(int requestedBytes) {
lock (_lock) {
var elapsedSeconds = _stopwatch.Elapsed.TotalSeconds;
if (elapsedSeconds <= 0) {
return TimeSpan.Zero;
}

// Calculate the expected time for the bytes already transferred plus the new bytes
var expectedBytes = _totalBytesTransferred + requestedBytes;
var expectedTimeSeconds = (double)expectedBytes / _maxBytesPerSecond;

// If we're ahead of schedule, delay
if (expectedTimeSeconds > elapsedSeconds) {
var delaySeconds = expectedTimeSeconds - elapsedSeconds;
// Cap delay to a reasonable maximum to prevent extremely long waits
delaySeconds = Math.Min(delaySeconds, 5.0);
return TimeSpan.FromSeconds(delaySeconds);
}

return TimeSpan.Zero;
}
}

/// <summary>
/// Records that bytes have been transferred.
/// </summary>
private void RecordBytesTransferred(int bytes) {
if (bytes > 0) {
lock (_lock) {
_totalBytesTransferred += bytes;
}
}
}

protected override void Dispose(bool disposing) {
if (disposing) {
_innerStream?.Dispose();
}
base.Dispose(disposing);
}
}
23 changes: 21 additions & 2 deletions src/SharpSync/Sync/SyncEngine.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using Oire.SharpSync.Core;
using Oire.SharpSync.Storage;

namespace Oire.SharpSync.Sync;

Expand All @@ -24,6 +25,7 @@ public class SyncEngine: ISyncEngine {
private bool _disposed;
private readonly SemaphoreSlim _syncSemaphore;
private CancellationTokenSource? _currentSyncCts;
private long? _currentMaxBytesPerSecond;

/// <summary>
/// Gets whether the engine is currently synchronizing
Expand Down Expand Up @@ -113,6 +115,7 @@ public async Task<SyncResult> SynchronizeAsync(SyncOptions? options = null, Canc
try {
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_currentSyncCts = linkedCts;
_currentMaxBytesPerSecond = options?.MaxBytesPerSecond;
var syncToken = linkedCts.Token;
var result = new SyncResult();
var sw = Stopwatch.StartNew();
Expand Down Expand Up @@ -157,6 +160,7 @@ public async Task<SyncResult> SynchronizeAsync(SyncOptions? options = null, Canc
return result;
} finally {
_currentSyncCts = null;
_currentMaxBytesPerSecond = null;
_syncSemaphore.Release();
}
}
Expand Down Expand Up @@ -830,7 +834,8 @@ private async Task DownloadFileAsync(SyncAction action, ThreadSafeSyncResult res
await _localStorage.CreateDirectoryAsync(action.Path, cancellationToken);
} else {
using var remoteStream = await _remoteStorage.ReadFileAsync(action.Path, cancellationToken);
await _localStorage.WriteFileAsync(action.Path, remoteStream, cancellationToken);
var streamToRead = WrapWithThrottling(remoteStream);
await _localStorage.WriteFileAsync(action.Path, streamToRead, cancellationToken);
}

result.IncrementFilesSynchronized();
Expand All @@ -841,7 +846,8 @@ private async Task UploadFileAsync(SyncAction action, ThreadSafeSyncResult resul
await _remoteStorage.CreateDirectoryAsync(action.Path, cancellationToken);
} else {
using var localStream = await _localStorage.ReadFileAsync(action.Path, cancellationToken);
await _remoteStorage.WriteFileAsync(action.Path, localStream, cancellationToken);
var streamToRead = WrapWithThrottling(localStream);
await _remoteStorage.WriteFileAsync(action.Path, streamToRead, cancellationToken);
}

result.IncrementFilesSynchronized();
Expand Down Expand Up @@ -1091,6 +1097,19 @@ private async Task UpdateDatabaseStateAsync(ChangeSet changes, CancellationToken
_ => SyncOperation.Unknown
};

/// <summary>
/// Wraps the provided stream with a throttled stream if bandwidth limiting is enabled.
/// </summary>
/// <param name="stream">The stream to potentially wrap.</param>
/// <returns>The original stream or a throttled wrapper.</returns>
private Stream WrapWithThrottling(Stream stream) {
if (_currentMaxBytesPerSecond.HasValue && _currentMaxBytesPerSecond.Value > 0) {
return new ThrottledStream(stream, _currentMaxBytesPerSecond.Value);
}

return stream;
}

private void RaiseProgress(SyncProgress progress, SyncOperation operation) {
ProgressChanged?.Invoke(this, new SyncProgressEventArgs(progress, progress.CurrentItem, operation));
}
Expand Down
Loading
Loading