Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.wisdom.api.concurrent.ManagedExecutorService;
import org.wisdom.api.content.ContentEngine;
import org.wisdom.api.content.ParameterFactories;
import org.wisdom.api.http.Context;
import org.wisdom.api.http.websockets.Publisher;
import org.wisdom.api.http.websockets.WebSocketDispatcher;
import org.wisdom.api.http.websockets.WebSocketListener;
Expand Down Expand Up @@ -202,12 +203,13 @@ public synchronized void unbindController(Controller controller) {
* @param content the received content
*/
@Override
public void received(final String uri, final String from, final byte[] content) {
public void received(final String uri, final String from, final byte[] content, final Context context) {
for (final OnMessageWebSocketCallback listener : listeners) {
if (listener.matches(uri)) {
executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
Context.CONTEXT.set(context);
try {
listener.invoke(uri, from, content);
} catch (InvocationTargetException e) { //NOSONAR
Expand All @@ -218,6 +220,8 @@ public Void call() throws Exception {
} catch (Exception e) {
LOGGER.error("An error occurred in the @OnMessage callback {}#{} : {}",
listener.getController().getClass().getName(), listener.getMethod().getName(), e.getMessage(), e);
} finally {
Context.CONTEXT.remove();
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void foo(@Body String message) {
assertThat(router.listeners.iterator().next().check()).isTrue();
assertThat(router.listeners.iterator().next().getController()).isEqualTo(controller);

router.received("/ws", "client", "hello".getBytes(Charset.defaultCharset()));
router.received("/ws", "client", "hello".getBytes(Charset.defaultCharset()), null);

assertThat(message).isEqualTo("hello");

Expand Down Expand Up @@ -167,7 +167,7 @@ public void foo(@Parameter("name") String name, @Body String message, @Parameter
assertThat(router.listeners.iterator().next().check()).isTrue();
assertThat(router.listeners.iterator().next().getController()).isEqualTo(controller);

router.received("/ws/foo", "client", "hello".getBytes(Charset.defaultCharset()));
router.received("/ws/foo", "client", "hello".getBytes(Charset.defaultCharset()), null);

assertThat(results)
.contains(entry("message", "hello"))
Expand All @@ -176,7 +176,7 @@ public void foo(@Parameter("name") String name, @Body String message, @Parameter

results.clear();

router.received("/ws", "client", "hello".getBytes(Charset.defaultCharset()));
router.received("/ws", "client", "hello".getBytes(Charset.defaultCharset()), null);

// Should not have been received.
assertThat(results).isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
*/
package org.wisdom.api.http.websockets;

import org.wisdom.api.http.Context;

/**
* Classes implementing this interface should register themselves on {@link WebSocketDispatcher} to receive
* notification when client are opening, closing web sockets or sending data.
Expand All @@ -32,7 +34,7 @@ public interface WebSocketListener {
* @param client the client id
* @param content the received content
*/
public void received(String uri, String client, byte[] content);
public void received(String uri, String client, byte[] content, Context context);

/**
* Callback invoked when a new client connects on a web socket identified by its url.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,30 @@ public ContextFromVertx(Vertx vertx, io.vertx.core.Context vertxContext, Service
}
}

/**
* Creates a new context with Http headers only, without HttpRequest
* This is used to partially initialize the context on WebSocket
*
* @param accessor a structure containing the used services.
* @param headers the incoming HTTP Headers.
*/
public ContextFromVertx(Vertx vertx, io.vertx.core.Context vertxContext, ServiceAccessor accessor, MultiMap headers) {
id = ids.getAndIncrement();
services = accessor;
request = new RequestFromVertx(headers);
this.vertx = vertx;
flash = new FlashCookieImpl(accessor.getConfiguration());
session = new SessionCookieImpl(accessor.getCrypto(), accessor.getConfiguration());
flash.init(this);
session.init(this);

if (vertxContext == null) {
throw new IllegalArgumentException("Creating a context from vert.x outside of an event loop");
} else {
this.vertxContext = vertxContext;
}
}


/**
* The context id (unique).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.net.MediaType;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.*;
import io.vertx.core.http.impl.HttpServerRequestImpl;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import org.wisdom.api.cookies.Cookie;
import org.wisdom.api.cookies.Cookies;
Expand All @@ -35,6 +38,8 @@
import org.wisdom.framework.vertx.cookies.CookiesImpl;
import org.wisdom.framework.vertx.file.VertxFileUpload;

import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.cert.X509Certificate;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
Expand Down Expand Up @@ -73,7 +78,19 @@ public class RequestFromVertx extends Request {
*/
public RequestFromVertx(final HttpServerRequest request) {
this.request = request;
this.cookies = new CookiesImpl(request);
this.cookies = new CookiesImpl(request.headers());
this.data = new HashMap<>();
}

/**
* Creates a mock {@link org.wisdom.framework.vertx.RequestFromVertx} object from headers
* This is used to retrieve cookies information from WebSocket connection
*
* @param headers Headers from a WebSocket connection
*/
public RequestFromVertx(final MultiMap headers) {
this.request = this.mock;
this.cookies = new CookiesImpl(headers);
this.data = new HashMap<>();
}

Expand Down Expand Up @@ -594,4 +611,151 @@ public boolean ready() {
protected void setRawBody(Buffer raw) {
this.raw = raw;
}

private HttpServerRequest mock = new HttpServerRequest() {
@Override
public HttpServerRequest exceptionHandler(Handler<Throwable> handler) {
return null;
}

@Override
public HttpServerRequest handler(Handler<Buffer> handler) {
return null;
}

@Override
public HttpServerRequest pause() {
return null;
}

@Override
public HttpServerRequest resume() {
return null;
}

@Override
public HttpServerRequest endHandler(Handler<Void> handler) {
return null;
}

@Override
public HttpVersion version() {
return null;
}

@Override
public HttpMethod method() {
return null;
}

@Override
public String uri() {
return null;
}

@Override
public String path() {
return null;
}

@Override
public String query() {
return null;
}

@Override
public HttpServerResponse response() {
return null;
}

@Override
public MultiMap headers() {
return null;
}

@Override
public String getHeader(String s) {
return null;
}

@Override
public String getHeader(CharSequence charSequence) {
return null;
}

@Override
public MultiMap params() {
return null;
}

@Override
public String getParam(String s) {
return null;
}

@Override
public SocketAddress remoteAddress() {
return null;
}

@Override
public SocketAddress localAddress() {
return null;
}

@Override
public X509Certificate[] peerCertificateChain() throws SSLPeerUnverifiedException {
return new X509Certificate[0];
}

@Override
public String absoluteURI() {
return null;
}

@Override
public HttpServerRequest bodyHandler(Handler<Buffer> handler) {
return null;
}

@Override
public NetSocket netSocket() {
return null;
}

@Override
public HttpServerRequest setExpectMultipart(boolean b) {
return null;
}

@Override
public boolean isExpectMultipart() {
return false;
}

@Override
public HttpServerRequest uploadHandler(Handler<HttpServerFileUpload> handler) {
return null;
}

@Override
public MultiMap formAttributes() {
return null;
}

@Override
public String getFormAttribute(String s) {
return null;
}

@Override
public ServerWebSocket upgrade() {
return null;
}

@Override
public boolean isEnded() {
return false;
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ private void bind(int p, Handler<AsyncResult<Void>> completion) {

http = vertx.createHttpServer(options)
.requestHandler(new HttpHandler(vertx, accessor, this))
.websocketHandler(new WebSocketHandler(accessor, this));
.websocketHandler(new WebSocketHandler(vertx, accessor, this));

http.listen(thePort, host, event -> {
if (event.succeeded()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
package org.wisdom.framework.vertx;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.ServerWebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wisdom.api.http.Context;

/**
* Handles web socket frames.
Expand All @@ -38,6 +40,7 @@ public class WebSocketHandler implements Handler<ServerWebSocket> {
* The structure used to access services.
*/
private final ServiceAccessor accessor;
private final Vertx vertx;

/**
* The server configuration.
Expand All @@ -51,9 +54,10 @@ public class WebSocketHandler implements Handler<ServerWebSocket> {
* @param server the server configuration - used to check whether or not the message should be
* allowed or denied
*/
public WebSocketHandler(ServiceAccessor accessor, Server server) {
public WebSocketHandler(Vertx vertx, ServiceAccessor accessor, Server server) {
this.accessor = accessor;
this.configuration = server;
this.vertx = vertx;
}

/**
Expand All @@ -65,20 +69,31 @@ public WebSocketHandler(ServiceAccessor accessor, Server server) {
public void handle(final ServerWebSocket socket) {
LOGGER.info("New web socket connection {}, {}", socket, socket.uri());

final ContextFromVertx context = new ContextFromVertx(vertx, vertx.getOrCreateContext(), accessor, socket.headers());

if (! configuration.accept(socket.uri())) {
LOGGER.warn("Web Socket connection denied on {} by {}", socket.uri(), configuration.name());
return;
}

final Socket sock = new Socket(socket);
Context.CONTEXT.set(context);
//TODO Propagate context in case listeners use executor on socket opening ?
accessor.getDispatcher().addSocket(socket.path(), sock);
Context.CONTEXT.remove();

socket.closeHandler(event -> {
LOGGER.info("Web Socket closed {}, {}", socket, socket.uri());
Context.CONTEXT.set(context);
//TODO Propagate context in case listeners use executor on socket closing ?
accessor.getDispatcher().removeSocket(socket.path(), sock);
Context.CONTEXT.remove();
});

socket.handler(event -> accessor.getDispatcher().received(socket.path(), event.getBytes(), sock));
socket.handler(event -> {
//Context has to be propagated as WebSocketRouter (default WebSocketListener) execute registered routes in an executor
accessor.getDispatcher().received(socket.path(), event.getBytes(), sock, context);
});

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -439,15 +439,17 @@ public void send(String uri, String client, byte[] message) {
* @param uri the web socket url
* @param content the data
* @param socket the client channel
* @param context the context in which to handle the message
*/
public void received(String uri, byte[] content, Socket socket) {
public void received(String uri, byte[] content, Socket socket, ContextFromVertx context) {
List<WebSocketListener> localListeners;
synchronized (this) {
localListeners = new ArrayList<>(this.listeners);
}

for (WebSocketListener listener : localListeners) {
listener.received(uri, id(socket), content);
//
listener.received(uri, id(socket), content, context);
}
}
}
Loading