data;
while ((data = this.receiveBuffer.poll()) != null) {
- String event = (String)data.get(0);
- super.emit(event, data.toArray());
+ onevent(data);
}
this.receiveBuffer.clear();
@@ -437,6 +478,7 @@ private void ondisconnect() {
if (logger.isLoggable(Level.FINE)) {
logger.fine(String.format("server disconnect (%s)", this.nsp));
}
+ recovered = false;
this.destroy();
this.onclose("io server disconnect");
}
@@ -498,7 +540,7 @@ public boolean connected() {
/**
* A property on the socket instance that is equal to the underlying engine.io socket id.
- *
+ *
* The value is present once the socket has connected, is removed when the socket disconnects and is updated if the socket reconnects.
*
* @return a socket id
@@ -566,5 +608,13 @@ public Socket offAnyOutgoing(Listener fn) {
}
return this;
}
+
+ public String getLastOffset() {
+ return this._lastOffset;
+ }
+
+ public boolean isRecovered() {
+ return recovered;
+ }
}
diff --git a/src/main/java/io/socket/client/SocketOptionBuilder.java b/src/main/java/io/socket/client/SocketOptionBuilder.java
index ef24bf83..7a061241 100644
--- a/src/main/java/io/socket/client/SocketOptionBuilder.java
+++ b/src/main/java/io/socket/client/SocketOptionBuilder.java
@@ -1,5 +1,7 @@
package io.socket.client;
+import org.json.JSONObject;
+
import java.util.List;
import java.util.Map;
@@ -174,7 +176,7 @@ public SocketOptionBuilder setPath(String path) {
return this;
}
- public SocketOptionBuilder setAuth(Map auth) {
+ public SocketOptionBuilder setAuth(JSONObject auth) {
this.options.auth = auth;
return this;
}
diff --git a/src/test/java/io/socket/client/Connection.java b/src/test/java/io/socket/client/Connection.java
index 9f3a533e..5e185dce 100644
--- a/src/test/java/io/socket/client/Connection.java
+++ b/src/test/java/io/socket/client/Connection.java
@@ -16,63 +16,96 @@ public abstract class Connection {
private static final Logger logger = Logger.getLogger(Connection.class.getName());
- final static int TIMEOUT = 7000;
+ final static int TIMEOUT = 7_000;
final static int PORT = 3000;
+ final static int NO_RECOVERY_PORT = 3001;
private Process serverProcess;
+ private Process noRecoveryServerProcess;
private ExecutorService serverService;
- private Future serverOutput;
- private Future serverError;
+ private Future> serverOutput;
+ private Future> serverError;
+ private Future> noRecoveryServerOutput;
+ private Future> noRecoveryServerError;
@Before
public void startServer() throws IOException, InterruptedException {
- logger.fine("Starting server ...");
+ logger.fine("Starting servers...");
+ // Start main server
final CountDownLatch latch = new CountDownLatch(1);
- serverProcess = Runtime.getRuntime().exec(
- String.format("node src/test/resources/server.js %s", nsp()), createEnv());
+ serverProcess = startServerProcess("node src/test/resources/server.js %s", PORT);
serverService = Executors.newCachedThreadPool();
- serverOutput = serverService.submit(new Runnable() {
+ serverOutput = startServerOutput(serverProcess, "MAIN", latch);
+ serverError = startServerError(serverProcess, "MAIN");
+
+ // Start no-recovery server
+ final CountDownLatch noRecoveryLatch = new CountDownLatch(1);
+ noRecoveryServerProcess = startServerProcess("node src/test/resources/server_no_recovery.js %s", NO_RECOVERY_PORT);
+ noRecoveryServerOutput = startServerOutput(noRecoveryServerProcess, "NO_RECOVERY", noRecoveryLatch);
+ noRecoveryServerError = startServerError(noRecoveryServerProcess, "NO_RECOVERY");
+
+ // Wait for both servers to start
+ latch.await(3000, TimeUnit.MILLISECONDS);
+ noRecoveryLatch.await(3000, TimeUnit.MILLISECONDS);
+ }
+
+ private Process startServerProcess(String script, int port) throws IOException {
+ return Runtime.getRuntime().exec(String.format(script, nsp()), createEnv(port));
+ }
+
+ private Future> startServerOutput(final Process process, final String serverName, final CountDownLatch latch) {
+ return serverService.submit(new Runnable() {
@Override
public void run() {
BufferedReader reader = new BufferedReader(
- new InputStreamReader(serverProcess.getInputStream()));
+ new InputStreamReader(process.getInputStream()));
String line;
try {
line = reader.readLine();
latch.countDown();
do {
- logger.fine("SERVER OUT: " + line);
+ logger.fine(serverName + " SERVER OUT: " + line);
} while ((line = reader.readLine()) != null);
} catch (IOException e) {
logger.warning(e.getMessage());
}
}
});
- serverError = serverService.submit(new Runnable() {
+ }
+
+ private Future> startServerError(final Process process, final String serverName) {
+ return serverService.submit(new Runnable() {
@Override
public void run() {
BufferedReader reader = new BufferedReader(
- new InputStreamReader(serverProcess.getErrorStream()));
+ new InputStreamReader(process.getErrorStream()));
String line;
try {
while ((line = reader.readLine()) != null) {
- logger.fine("SERVER ERR: " + line);
+ logger.fine(serverName + " SERVER ERR: " + line);
}
} catch (IOException e) {
logger.warning(e.getMessage());
}
}
});
- latch.await(3000, TimeUnit.MILLISECONDS);
}
@After
public void stopServer() throws InterruptedException {
- logger.fine("Stopping server ...");
+ logger.fine("Stopping servers...");
+
+ // Stop main server
serverProcess.destroy();
serverOutput.cancel(false);
serverError.cancel(false);
+
+ // Stop no-recovery server
+ noRecoveryServerProcess.destroy();
+ noRecoveryServerOutput.cancel(false);
+ noRecoveryServerError.cancel(false);
+
serverService.shutdown();
serverService.awaitTermination(3000, TimeUnit.MILLISECONDS);
}
@@ -90,11 +123,16 @@ Socket client(IO.Options opts) {
}
Socket client(String path, IO.Options opts) {
- return IO.socket(URI.create(uri() + path), opts);
+ int port = opts.port != -1 ? opts.port : PORT;
+ return IO.socket(URI.create(uri(port) + path), opts);
}
URI uri() {
- return URI.create("http://localhost:" + PORT);
+ return uri(PORT);
+ }
+
+ URI uri(int port) {
+ return URI.create("http://localhost:" + port);
}
String nsp() {
@@ -108,9 +146,13 @@ IO.Options createOptions() {
}
String[] createEnv() {
+ return createEnv(PORT);
+ }
+
+ String[] createEnv(int port) {
Map env = new HashMap<>(System.getenv());
env.put("DEBUG", "socket.io:*");
- env.put("PORT", String.valueOf(PORT));
+ env.put("PORT", String.valueOf(port));
String[] _env = new String[env.size()];
int i = 0;
for (String key : env.keySet()) {
@@ -118,6 +160,5 @@ String[] createEnv() {
i++;
}
return _env;
-
}
}
diff --git a/src/test/java/io/socket/client/ConnectionTest.java b/src/test/java/io/socket/client/ConnectionTest.java
index aad9f4c4..fd57b124 100644
--- a/src/test/java/io/socket/client/ConnectionTest.java
+++ b/src/test/java/io/socket/client/ConnectionTest.java
@@ -16,12 +16,13 @@
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.*;
@RunWith(JUnit4.class)
public class ConnectionTest extends Connection {
@@ -76,7 +77,6 @@ public void workWithAcks() throws InterruptedException {
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... objects) {
- socket.emit("callAck");
socket.on("ack", new Emitter.Listener() {
@Override
public void call(Object... args) {
@@ -103,6 +103,7 @@ public void call(Object... args) {
}
}
});
+ socket.emit("callAck");
}
});
socket.connect();
@@ -156,14 +157,14 @@ public void call(Object... args) {
socket.on("ackBack", new Emitter.Listener() {
@Override
public void call(Object... args) {
- byte[] data = (byte[])args[0];
+ byte[] data = (byte[]) args[0];
values.offer(data);
}
});
}
});
socket.connect();
- Assert.assertArrayEquals(buf, (byte[])values.take());
+ Assert.assertArrayEquals(buf, (byte[]) values.take());
socket.close();
}
@@ -180,13 +181,13 @@ public void call(Object... objects) {
@Override
public void call(Object... args) {
- values.offer(args[0]);
+ values.offer(args[0]);
}
});
}
});
socket.connect();
- Assert.assertArrayEquals(buf, (byte[])values.take());
+ Assert.assertArrayEquals(buf, (byte[]) values.take());
socket.close();
}
@@ -207,19 +208,19 @@ public void call(Object... args) {
}
});
socket.connect();
- assertThat((Boolean)values.take(), is(false));
+ assertThat((Boolean) values.take(), is(false));
socket.close();
}
@Test(timeout = TIMEOUT)
public void receiveUTF8MultibyteCharacters() throws InterruptedException {
final BlockingQueue