Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions client/src/main/scala/rabbit/Emit.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import spray.json._
import MessageJsonProtocol._
import scala.language.postfixOps

object Emit {
private val emitter: RabbitmqEmitter = new RabbitmqEmitter

def send(message: String) = emitter.SendMessage(message, "fuckoff")
}

object emit2 extends App {

val message: JsValue = Message("ajeje", "brazorf", 213).toJson

Emit.send(message.toString())
}
65 changes: 65 additions & 0 deletions client/src/main/scala/rabbit/RabbitmqConnection.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import com.rabbitmq.client.{ConnectionFactory, Delivery}

trait RabbitmqConnection{
val factory = new ConnectionFactory
val exchangeName: String = "topicLogs"
val exchangeType: String = "topic"
var routingKey: String = "prova." //potrebbe essere migliorato -> cambiare struttura classi
val charsetName = "UTF-8"
}

class RabbitmqEmitter extends RabbitmqConnection {

def SendMessage(message: String, logName: String = "*") = {
try{
val connection = factory.newConnection
val channel = connection.createChannel

// exchange configuration
routingKey = routingKey + logName
channel.exchangeDeclare(exchangeName, exchangeType)
try{
//send message
channel.basicPublish(exchangeName, routingKey, null, message.getBytes(charsetName))
System.out.println(" [x] Sent to " + routingKey + ": '" + message + "'")

} catch {
case _ => println("Error in sending")
} finally {
if (channel.isOpen) channel.close()
if (connection.isOpen) connection.close()
}
} catch {
case _ => println("Error in connect creation")
}
}

}

class RabbitmqReciver extends RabbitmqConnection {

def work(callback: String => Unit, logName: String = "*"): Unit = {
try{
val connection = factory.newConnection
val channel = connection.createChannel

// queue configuration
channel.exchangeDeclare(exchangeName, exchangeType)
val queueName = channel.queueDeclare.getQueue
routingKey = routingKey + logName
channel.queueBind(queueName, exchangeName, routingKey)

//recive message
val autoAck = true
channel.basicConsume(queueName, autoAck,
(consumerTag: String, delivery: Delivery) => {
val message = new String(delivery.getBody, charsetName)
callback(message)
},
(consumerTag: String) => {})
} catch {
case _ => println("Error in reciver")
}

}
}
24 changes: 24 additions & 0 deletions client/src/main/scala/rabbit/Recv.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import spray.json._
import MessageJsonProtocol._

object Recv {
private val reciver: RabbitmqReciver = new RabbitmqReciver

def work(callback: String => Unit, logName: String = "*") = reciver.work(callback, logName)
}

object r1 extends App {
Recv.work(s => println(s), "fuckoff")
}

object r2 extends App {
Recv.work(s => println(s), "bitches")
}

object r3 extends App {
Recv.work(s => {
println(s)
val msg = s.parseJson.convertTo[Message]
println(msg.payload)
})
}