Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 0 additions & 60 deletions AIProject/iCo/Core/Remote/WebSocket/SocketEngine.swift

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class AsyncStreamBroadcaster<Element> {
/// 구독할 continuation 값들
private var continuations: [UUID: AsyncStream<Element>.Continuation] = [:]

public init() {}

/// 구독 메서드로 stream을 반환
/// - Returns: stream 반환
public func stream() -> AsyncStream<Element> {
Expand Down
173 changes: 89 additions & 84 deletions AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift
Original file line number Diff line number Diff line change
@@ -1,29 +1,35 @@
import Foundation
import AsyncAlgorithms

public final class WebSocketClient: NSObject {
public class WebSocketClient: NSObject, WebSocketProvider {
/// 소켓 상태 채널
private var stateStream: AsyncStream<WebSocket.State>
/// WebSocket의 상태 변화를 여러 Consumer에게 동시에 전달하는 브로드캐스터
public var stateBroadCaster: AsyncStreamBroadcaster<WebSocket.State> = .init()
public var stateBroadCaster: AsyncStreamBroadcaster<WebSocket.State>
/// 메세지 채널
public var incomingChannel: AsyncChannel<URLSessionWebSocketTask.Message>

private let url: URL
private let session: URLSession
private var task: URLSessionWebSocketTask?
private(set) var url: URL
private(set) var session: URLSessionType
private(set) var task: WebSocketType?

private var stateTask: Task<Void, Error>?
private var receiveTask: Task<Void, Error>?
private(set) var stateTask: Task<Void, Error>?
private(set) var receiveTask: Task<Void, Error>?

/// 핑 전송 task
private var healthCheck: Task<Void, Error>?
private(set) var healthCheck: Task<Void, Error>?
private var pingInterval: Duration = .seconds(30)
private var pingTimeout: Duration = .seconds(10)
private var attempts: Int = 0

public init(url: URL, session: URLSession = .shared) {
public init(
url: URL,
session: URLSessionType = URLSession.shared,
stateBroadCaster: AsyncStreamBroadcaster<WebSocket.State> = .init()
) {
self.url = url
self.session = session
self.stateBroadCaster = stateBroadCaster

stateStream = stateBroadCaster.stream()
incomingChannel = AsyncChannel<URLSessionWebSocketTask.Message>()
Expand All @@ -35,12 +41,9 @@ public final class WebSocketClient: NSObject {
/// 웹소켓 세션을 연결하고 작업을 생성합니다.
public func connect() async {
await stateBroadCaster.send(.connecting)
self.task = session.webSocketTask(with: url)
self.task = session.makeWebSocketTask(with: url)
task?.delegate = self
task?.resume()

// 핑 응답은 연결 후에 오기 때문에 connected 시점을 캐치할 수 있음
try? await performWithTimeout(sendPing, at: pingTimeout)
}

/// 명시적으로 현재 WebSocket 연결을 정상적으로 종료합니다.
Expand All @@ -52,73 +55,33 @@ public final class WebSocketClient: NSObject {

/// 텍스트 형태의 메시지를 WebSocket 서버로 전송합니다.
public func send(text: String) async throws {
try await task?.send(.string(text))
guard let task else { throw URLError(.notConnectedToInternet) }
try await task.send(.string(text))
}

/// 바이너리(Data) 형태의 메시지를 WebSocket 서버로 전송합니다.
public func send(data: Data) async throws {
try await task?.send(.data(data))
}

deinit {
debugPrint(String(describing: Self.self), #function)
task?.cancel()
task = nil
stateBroadCaster.finish()
incomingChannel.finish()
}
}

// MARK: - Test용 메소드
// TODO: Deprecated 예정입니다.
extension WebSocketClient {
public func sendState(with state: WebSocket.State) async {
await stateBroadCaster.send(state)
}

public func cancel(with code: URLSessionWebSocketTask.CloseCode) {
task?.cancel(with: code, reason: nil)
task = nil
}

public func cancel() {
task?.cancel()
task = nil
guard let task else { throw URLError(.notConnectedToInternet) }
try await task.send(.data(data))
}
}

// MARK: - Private
extension WebSocketClient {
/// 서버로 Ping 프레임을 전송하여 연결 상태를 확인합니다.
private func sendPing() async throws {
return try await withCheckedThrowingContinuation { continuation in
task?.sendPing { error in
Task {
if let error {
debugPrint("Ping Failed: \(error)")
continuation.resume(throwing: error)
return
}

continuation.resume()
}
}
}
}

/// WebSocket의 상태 변화를 관찰하고 각 상태에 맞는 동작을 수행합니다.
private func observeState() {
stateTask = Task {
for await state in stateStream {
for await state in stateStream.removeDuplicates() {
switch state {
case .connecting:
debugPrint("Connecting")
continue
case .connected:
debugPrint("Connected")
clearAttempts()
receive()
checkingAlive()
case .failed, .closed:
case .closed:
debugPrint("Closed")
release()
case .reconnecting:
Expand All @@ -129,37 +92,73 @@ extension WebSocketClient {
}
}

// FIXME: 개선이 필요한지 한 번 더 생각해보기
/// 서버로부터 WebSocket 메시지를 지속적으로 수신합니다.
private func receive() {
receiveTask?.cancel()

receiveTask = Task {
do {
guard let task else { return }
while true {
guard let task else { throw CancellationError() }
let message = try await task.receive()
await incomingChannel.send(message)
receive()
} catch {
print("종료되어 더 이상 웹소켓 데이터를 받지 않습니다.")
}
}
}

/// 주기적으로 Ping을 전송하여 WebSocket 연결 상태를 점검합니다.
private func checkingAlive() {
healthCheck?.cancel()

healthCheck = Task {
do {
while true {
try await performWithTimeout(sendPing, at: pingTimeout)
try await Task.sleep(until: .now + pingInterval)
try await performWithTimeout(sendPing, at: .seconds(10))
}
} catch is CancellationError {
debugPrint("작업이 취소되었습니다.")
} catch {
await stateBroadCaster.send(.reconnecting(nextAttempsIn: .seconds(2)))
if handlePingError(error) {
if task?.state == .running {
task?.cancel()
}
}
}
}
}

/// sendPing(:) 으로부터 받은 에러를 핸들링하는 메소드입니다.
/// - Parameter error: 에러를 전달받습니다.
/// - Returns: 재연결해야 한다면 true를 반환합니다.
private func handlePingError(_ error: Error) -> Bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

핑에서 발생하는 에러에 대해 알아봐야겠네요

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

핑에서 발생하는 에러가 엄청 많아서 재연결할만한 에러를 구분하는게 쉽지 않아보이더라구요🥲

if let urlError = error as? URLError {
switch urlError.code { // URLError (네트워크 단절)
case .notConnectedToInternet, .networkConnectionLost:
return true
default:
return false
}
} else if let posixError = error as? POSIXError {
switch posixError.code { // POSIXError (소켓이 죽음)
case .EPIPE, .ECONNRESET:
return true
default:
return false
}
} else { // 소켓이 정상상태가 아님.
return true
}
}

/// 서버로 Ping 프레임을 전송하여 연결 상태를 확인합니다.
private func sendPing() async throws {
return try await withCheckedThrowingContinuation { continuation in
task?.sendPing { error in
Task {
if let error {
debugPrint("Ping Failed: \(error)")
continuation.resume(throwing: error)
return
}

continuation.resume()
}
}
}
}
Expand All @@ -169,17 +168,28 @@ extension WebSocketClient {
if userClose {
await stateBroadCaster.send(.closed)
} else {
await stateBroadCaster.send(.reconnecting(nextAttempsIn: .seconds(2)))
await stateBroadCaster.send(.reconnecting)
}
}

private func clearAttempts() {
attempts = 0
}

/// WebSocket 재연결시에 백오프를 적용합니다.
/// - Returns: 백오프하는 시간을 Int 타입으로 반환합니다.
private func backoff() -> Int {
attempts += 1
let base = min(pow(2.0, Double(attempts)) * 100.0, 10000)
let jitter = Double.random(in: 0.5...1.0)

return Int(base * jitter)
}

/// WebSocket 재연결을 시도합니다.
private func reconnect() async {
guard task?.state != .running else {
return
}

try? await Task.sleep(for: .seconds(2))
if task?.state == .running || attempts > 10 { return }
try? await Task.sleep(for: .milliseconds(backoff()))
await connect()
}

Expand All @@ -189,11 +199,6 @@ extension WebSocketClient {
receiveTask = nil
healthCheck?.cancel()
healthCheck = nil

if task?.state == .running {
task?.cancel(with: .goingAway, reason: nil)
}

task = nil
}
}
Expand All @@ -213,7 +218,7 @@ extension WebSocketClient: URLSessionWebSocketDelegate {
// 1. 네트워크 닫힘, 2. 에러로 종료, 3. 정상적으로 완료
public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)?) {
if let _ = error {
Task { await stateBroadCaster.send(.reconnecting(nextAttempsIn: .seconds(2))) }
Task { await stateBroadCaster.send(.reconnecting) }
}
}
}
Expand Down
Loading