Blame | Last modification | View Log | RSS feed
/**************************************************************************
* Copyright 2011 Jules White
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package org.vt.ece4564.latmb;
import java.awt.geom.Point2D;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.vt.ece4564.latmb.LATMBProtocol.DateTime;
import org.vt.ece4564.latmb.LATMBProtocol.Message;
import org.vt.ece4564.latmb.LATMBProtocol.Position;
import org.vt.ece4564.latmb.LATMBProtocol.TrackingMessage;
import org.vt.ece4564.latmb.LATMBProtocol.Message.Builder;
/**
* Handles a server-side channel.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*
* @version $Rev: 2121 $, $Date: 2010-02-02 09:38:07 +0900 (Tue, 02 Feb 2010) $
*/
public class LATMBServerHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger
.getLogger(LATMBServerHandler.class.getName());
static final ChannelGroup channels = new DefaultChannelGroup();
// Map of channels connected to chatrooms
static final Map<String, ChannelGroup> chatrooms = Collections.synchronizedMap(new HashMap<String, ChannelGroup>());
// Map of messages for each chatroom
static final Map<String, List<LATMBMessage>> messages = Collections.synchronizedMap(new HashMap<String, List<LATMBMessage>>());
// Map of channels and their locations
static final Map<Channel, Point2D.Double> gpsChannels = Collections.synchronizedMap(new HashMap<Channel, Point2D.Double>());
// List storing messages for GPS lookup
static final List<LATMBMessage> gpsMessages = new LinkedList<LATMBMessage>();
private Thread checkThread_;
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
logger.info(e.toString());
}
super.handleUpstream(ctx, e);
}
public void broadcast(TrackingMessage msg) {
for (Channel c : channels) {
c.write(msg);
}
}
private synchronized void checkRecords() {
if (checkThread_ == null) {
checkThread_ = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.currentThread().sleep(5000);
// Removes empty chatrooms
for (String str : chatrooms.keySet()) {
if (chatrooms.get(str).size() == 0) {
chatrooms.remove(str);
}
}
// Removes expired messages and empty lists from chatrooms
Calendar c = Calendar.getInstance();
for (String str : messages.keySet()) {
if (messages.get(str).size() == 0) {
messages.remove(str);
} else {
List<LATMBMessage> toRemove = new ArrayList<LATMBMessage>();
for (LATMBMessage message : messages.get(str)) {
if (c.after(message.getExpiration())) {
toRemove.add(message);
}
}
for (LATMBMessage message : toRemove) {
messages.get(str).remove(message);
}
}
}
// Remove expired messages from list of messages with GPS locations
List<LATMBMessage> toRemove = new ArrayList<LATMBMessage>();
for (LATMBMessage msg : gpsMessages) {
if (c.after(msg.getExpiration())) {
toRemove.add(msg);
}
}
for (LATMBMessage message : toRemove) {
gpsMessages.remove(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
checkThread_.start();
}
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
channels.add(e.getChannel());
// Start the thread to check for expired entries
checkRecords();
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
// Unregister the channel from the global channel list
// so the channel does not receive messages anymore.
channels.remove(e.getChannel());
// Remove channel from list of GPS channels as well
gpsChannels.remove(e.getChannel());
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
TrackingMessage tmsg = (TrackingMessage) e.getMessage();
switch (tmsg.getType()) {
case 0: // Initial connection from a client
System.out.println("Initial Request Message Recieved");
if (tmsg.getMessage(0).hasChatroom()) {
String chatroom = tmsg.getMessage(0).getChatroom();
// If chatroom exists, send back all messages from that chatroom
if (messages.containsKey(chatroom)) {
e.getChannel().write(aggregateMessagesFromChatrooms(chatroom));
}
// Add the current channel to the list of channels for that chatroom
if (chatrooms.containsKey(chatroom)) {
chatrooms.get(chatroom).add(e.getChannel());
} else {
ChannelGroup channels = new DefaultChannelGroup();
channels.add(e.getChannel());
chatrooms.put(chatroom, channels);
}
} else if (tmsg.getMessage(0).hasCoordinates()) {
double lat = tmsg.getMessage(0).getCoordinates().getLatitude();
double lon = tmsg.getMessage(0).getCoordinates().getLongitude();
double radius = tmsg.getMessage(0).getRadius();
// Get messages that are within radius of channel
e.getChannel().write(aggregateMessagesFromGPS(lat, lon, radius));
// Save channel with its location
Point2D.Double pt = new Point2D.Double(lat, lon);
gpsChannels.put(e.getChannel(), pt);
}
break;
case 1: // Saves message and broadcast it
System.out.println("New Update Message Recieved");
LATMBMessage msg = serializeFromProtoBuffer(tmsg);
if (tmsg.getMessage(0).hasChatroom()) {
String chatroom = tmsg.getMessage(0).getChatroom();
// Send message to all channels connected to the chatroom
if (messages.containsKey(chatroom)) {
messages.get(chatroom).add(msg);
for (Channel c : chatrooms.get(chatroom)) {
c.write(tmsg);
}
} else {
List<LATMBMessage> newList = new LinkedList<LATMBMessage>();
newList.add(msg);
messages.put(chatroom, newList);
for (Channel c : chatrooms.get(chatroom)) {
c.write(tmsg);
}
}
} else if (tmsg.getMessage(0).hasCoordinates()) {
// Otherwise search for channels that are within radius
for (Channel ch : gpsChannels.keySet()) {
double lat = gpsChannels.get(ch).getX();
double lon = gpsChannels.get(ch).getY();
double radius = msg.getRadius();
// Send message to each channel within the radius
if (getCoordinateDistance(lat, lon, msg.getLatitude(), msg.getLongitude()) <= radius) {
ch.write(tmsg);
}
}
// Save message to list of GPS associated messages
gpsMessages.add(msg);
}
break;
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
logger.log(Level.WARNING, "Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
// Returns a TrackingMessage of all messages that are within radius of specified point
private TrackingMessage aggregateMessagesFromGPS (double lat, double lon, double radius) {
org.vt.ece4564.latmb.LATMBProtocol.TrackingMessage.Builder msg = TrackingMessage.newBuilder()
.setId(0).setType(1);
for (LATMBMessage entry : gpsMessages) {
if (getCoordinateDistance(lat, lon, entry.getLatitude(), entry.getLongitude()) <= radius) {
Calendar c = entry.getTimestamp();
DateTime timestamp = DateTime.newBuilder()
.setYear(c.get(Calendar.YEAR))
.setMonth(c.get(Calendar.MONTH))
.setDay(c.get(Calendar.DAY_OF_MONTH))
.setHour(c.get(Calendar.HOUR_OF_DAY))
.setMinute(c.get(Calendar.MINUTE))
.setSecond(c.get(Calendar.SECOND))
.build();
c = entry.getExpiration();
DateTime expiration = DateTime.newBuilder()
.setYear(c.get(Calendar.YEAR))
.setMonth(c.get(Calendar.MONTH))
.setDay(c.get(Calendar.DAY_OF_MONTH))
.setHour(c.get(Calendar.HOUR_OF_DAY))
.setMinute(c.get(Calendar.MINUTE))
.setSecond(c.get(Calendar.SECOND))
.build();
Builder message = Message.newBuilder()
.setMessage(entry.getMessage())
.setTimestamp(timestamp)
.setExpiration(expiration);
if (entry.getUsername() != null) {
message.setUsername(entry.getUsername());
}
if (entry.getChatroom() != null) {
message.setChatroom(entry.getChatroom());
} else if (entry.getLatitude() != 0 && entry.getLongitude() != 0){
Position pos = Position.newBuilder()
.setLatitude(entry.getLatitude())
.setLongitude(entry.getLongitude())
.setAccuracy(0)
.build();
message.setCoordinates(pos);
}
if (entry.getRadius() != 0) {
message.setRadius(entry.getRadius());
}
msg.addMessage(message);
}
}
if (msg.isInitialized()) {
return msg.build();
} else {
return null;
}
}
// Returns a TrackingMessage containing all the messages from a chatroom
private TrackingMessage aggregateMessagesFromChatrooms (String chatroom) {
List<LATMBMessage> list = messages.get(chatroom);
org.vt.ece4564.latmb.LATMBProtocol.TrackingMessage.Builder msg = TrackingMessage.newBuilder()
// Type 1 == New Message
.setId(0).setType(1);
for (LATMBMessage entry : list) {
Calendar c = entry.getTimestamp();
DateTime timestamp = DateTime.newBuilder()
.setYear(c.get(Calendar.YEAR))
.setMonth(c.get(Calendar.MONTH))
.setDay(c.get(Calendar.DAY_OF_MONTH))
.setHour(c.get(Calendar.HOUR_OF_DAY))
.setMinute(c.get(Calendar.MINUTE))
.setSecond(c.get(Calendar.SECOND))
.build();
c = entry.getExpiration();
DateTime expiration = DateTime.newBuilder()
.setYear(c.get(Calendar.YEAR))
.setMonth(c.get(Calendar.MONTH))
.setDay(c.get(Calendar.DAY_OF_MONTH))
.setHour(c.get(Calendar.HOUR_OF_DAY))
.setMinute(c.get(Calendar.MINUTE))
.setSecond(c.get(Calendar.SECOND))
.build();
Builder message = Message.newBuilder()
.setMessage(entry.getMessage())
.setTimestamp(timestamp)
.setExpiration(expiration);
if (entry.getUsername() != null) {
message.setUsername(entry.getUsername());
}
if (entry.getChatroom() != null) {
message.setChatroom(entry.getChatroom());
} else if (entry.getLatitude() != 0 && entry.getLongitude() != 0){
Position pos = Position.newBuilder()
.setLatitude(entry.getLatitude())
.setLongitude(entry.getLongitude())
.setAccuracy(0)
.build();
message.setCoordinates(pos);
}
if (entry.getRadius() != 0) {
message.setRadius(entry.getRadius());
}
msg.addMessage(message);
}
if (msg.isInitialized()) {
return msg.build();
} else {
return null;
}
}
// Returns a TrackingMessage created from the passed LATMBMessage
private TrackingMessage serializeToProtoBuffer (LATMBMessage entry) {
Calendar c = entry.getTimestamp();
DateTime timestamp = DateTime.newBuilder()
.setYear(c.get(Calendar.YEAR))
.setMonth(c.get(Calendar.MONTH))
.setDay(c.get(Calendar.DAY_OF_MONTH))
.setHour(c.get(Calendar.HOUR_OF_DAY))
.setMinute(c.get(Calendar.MINUTE))
.setSecond(c.get(Calendar.SECOND))
.build();
c = entry.getExpiration();
DateTime expiration = DateTime.newBuilder()
.setYear(c.get(Calendar.YEAR))
.setMonth(c.get(Calendar.MONTH))
.setDay(c.get(Calendar.DAY_OF_MONTH))
.setHour(c.get(Calendar.HOUR_OF_DAY))
.setMinute(c.get(Calendar.MINUTE))
.setSecond(c.get(Calendar.SECOND))
.build();
Builder message = Message.newBuilder()
.setMessage(entry.getMessage())
.setTimestamp(timestamp)
.setExpiration(expiration);
if (entry.getUsername() != null) {
message.setUsername(entry.getUsername());
}
if (entry.getChatroom() != null) {
message.setChatroom(entry.getChatroom());
} else if (entry.getLatitude() != 0 && entry.getLongitude() != 0){
Position pos = Position.newBuilder()
.setLatitude(entry.getLatitude())
.setLongitude(entry.getLongitude())
.setAccuracy(0)
.build();
message.setCoordinates(pos);
}
if (entry.getRadius() != 0) {
message.setRadius(entry.getRadius());
}
TrackingMessage msg = TrackingMessage.newBuilder()
// Type 1 == New Message
.setId(0).setType(1)
.addMessage(message)
.build();
return msg;
}
// Returns a LATMBMessage from the passed TrackingMessage
private LATMBMessage serializeFromProtoBuffer(TrackingMessage message) {
Message msg = message.getMessage(0);
LATMBMessage newEntry = new LATMBMessage();
newEntry.setMessage(msg.getMessage());
DateTime timestamp = msg.getTimestamp();
DateTime expiration = msg.getExpiration();
Calendar time = new GregorianCalendar();
time.set(timestamp.getYear(), timestamp.getMonth(), timestamp.getDay(),
timestamp.getHour(), timestamp.getMinute(), timestamp.getSecond());
newEntry.setTimestamp(time);
Calendar exp = new GregorianCalendar();
exp.set(expiration.getYear(), expiration.getMonth(), expiration.getDay(),
expiration.getHour(), expiration.getMinute(), expiration.getSecond());
newEntry.setExpiration(exp);
if (msg.hasUsername()) {
newEntry.setUsername(msg.getUsername());
} else {
newEntry.setUsername("Anonymous");
}
if (msg.hasChatroom()) {
newEntry.setChatroom(msg.getChatroom());
}
if (msg.hasCoordinates()) {
Position coord = msg.getCoordinates();
newEntry.setLatitude(coord.getLatitude());
newEntry.setLongitude(coord.getLongitude());
}
if (msg.hasRadius()) {
newEntry.setRadius(msg.getRadius());
}
return newEntry;
}
// Returns the distance (in km) between the two specified points
private double getCoordinateDistance(double startLat, double startLong, double endLat, double endLong) {
double d2r = Math.PI / 180;
double dlong = (endLong - startLong) * d2r;
double dlat = (endLat - startLat) * d2r;
double a =
Math.pow(Math.sin(dlat / 2.0), 2)
+ Math.cos(startLat * d2r)
* Math.cos(endLat * d2r)
* Math.pow(Math.sin(dlong / 2.0), 2);
double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
double d = 6367 * c;
return d;
}
}