Blame | Last modification | View Log | Download | RSS feed
/*************************************************************************** Copyright 2011 Jules White** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.***************************************************************************/package org.vt.ece4564.latmb;import java.awt.geom.Point2D;import java.util.ArrayList;import java.util.Calendar;import java.util.Collections;import java.util.GregorianCalendar;import java.util.HashMap;import java.util.LinkedList;import java.util.List;import java.util.Map;import java.util.logging.Level;import java.util.logging.Logger;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelEvent;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.ChannelStateEvent;import org.jboss.netty.channel.ExceptionEvent;import org.jboss.netty.channel.MessageEvent;import org.jboss.netty.channel.SimpleChannelUpstreamHandler;import org.jboss.netty.channel.group.ChannelGroup;import org.jboss.netty.channel.group.DefaultChannelGroup;import org.vt.ece4564.latmb.LATMBProtocol.DateTime;import org.vt.ece4564.latmb.LATMBProtocol.Message;import org.vt.ece4564.latmb.LATMBProtocol.Position;import org.vt.ece4564.latmb.LATMBProtocol.TrackingMessage;import org.vt.ece4564.latmb.LATMBProtocol.Message.Builder;/*** Handles a server-side channel.** @author <a href="http://www.jboss.org/netty/">The Netty Project</a>* @author <a href="http://gleamynode.net/">Trustin Lee</a>** @version $Rev: 2121 $, $Date: 2010-02-02 09:38:07 +0900 (Tue, 02 Feb 2010) $*/public class LATMBServerHandler extends SimpleChannelUpstreamHandler {private static final Logger logger = Logger.getLogger(LATMBServerHandler.class.getName());static final ChannelGroup channels = new DefaultChannelGroup();// Map of channels connected to chatroomsstatic final Map<String, ChannelGroup> chatrooms = Collections.synchronizedMap(new HashMap<String, ChannelGroup>());// Map of messages for each chatroomstatic final Map<String, List<LATMBMessage>> messages = Collections.synchronizedMap(new HashMap<String, List<LATMBMessage>>());// Map of channels and their locationsstatic final Map<Channel, Point2D.Double> gpsChannels = Collections.synchronizedMap(new HashMap<Channel, Point2D.Double>());// List storing messages for GPS lookupstatic final List<LATMBMessage> gpsMessages = new LinkedList<LATMBMessage>();private Thread checkThread_;@Overridepublic void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)throws Exception {if (e instanceof ChannelStateEvent) {logger.info(e.toString());}super.handleUpstream(ctx, e);}public void broadcast(TrackingMessage msg) {for (Channel c : channels) {c.write(msg);}}private synchronized void checkRecords() {if (checkThread_ == null) {checkThread_ = new Thread(new Runnable() {@Overridepublic void run() {while (true) {try {Thread.currentThread().sleep(5000);// Removes empty chatroomsfor (String str : chatrooms.keySet()) {if (chatrooms.get(str).size() == 0) {chatrooms.remove(str);}}// Removes expired messages and empty lists from chatroomsCalendar c = Calendar.getInstance();for (String str : messages.keySet()) {if (messages.get(str).size() == 0) {messages.remove(str);} else {List<LATMBMessage> toRemove = new ArrayList<LATMBMessage>();for (LATMBMessage message : messages.get(str)) {if (c.after(message.getExpiration())) {toRemove.add(message);}}for (LATMBMessage message : toRemove) {messages.get(str).remove(message);}}}// Remove expired messages from list of messages with GPS locationsList<LATMBMessage> toRemove = new ArrayList<LATMBMessage>();for (LATMBMessage msg : gpsMessages) {if (c.after(msg.getExpiration())) {toRemove.add(msg);}}for (LATMBMessage message : toRemove) {gpsMessages.remove(message);}} catch (Exception e) {e.printStackTrace();}}}});checkThread_.start();}}@Overridepublic void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)throws Exception {channels.add(e.getChannel());// Start the thread to check for expired entriescheckRecords();}@Overridepublic void channelDisconnected(ChannelHandlerContext ctx,ChannelStateEvent e) throws Exception {// Unregister the channel from the global channel list// so the channel does not receive messages anymore.channels.remove(e.getChannel());// Remove channel from list of GPS channels as wellgpsChannels.remove(e.getChannel());}@Overridepublic void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {TrackingMessage tmsg = (TrackingMessage) e.getMessage();switch (tmsg.getType()) {case 0: // Initial connection from a clientSystem.out.println("Initial Request Message Recieved");if (tmsg.getMessage(0).hasChatroom()) {String chatroom = tmsg.getMessage(0).getChatroom();// If chatroom exists, send back all messages from that chatroomif (messages.containsKey(chatroom)) {e.getChannel().write(aggregateMessagesFromChatrooms(chatroom));}// Add the current channel to the list of channels for that chatroomif (chatrooms.containsKey(chatroom)) {chatrooms.get(chatroom).add(e.getChannel());} else {ChannelGroup channels = new DefaultChannelGroup();channels.add(e.getChannel());chatrooms.put(chatroom, channels);}} else if (tmsg.getMessage(0).hasCoordinates()) {double lat = tmsg.getMessage(0).getCoordinates().getLatitude();double lon = tmsg.getMessage(0).getCoordinates().getLongitude();double radius = tmsg.getMessage(0).getRadius();// Get messages that are within radius of channele.getChannel().write(aggregateMessagesFromGPS(lat, lon, radius));// Save channel with its locationPoint2D.Double pt = new Point2D.Double(lat, lon);gpsChannels.put(e.getChannel(), pt);}break;case 1: // Saves message and broadcast itSystem.out.println("New Update Message Recieved");LATMBMessage msg = serializeFromProtoBuffer(tmsg);if (tmsg.getMessage(0).hasChatroom()) {String chatroom = tmsg.getMessage(0).getChatroom();// Send message to all channels connected to the chatroomif (messages.containsKey(chatroom)) {messages.get(chatroom).add(msg);for (Channel c : chatrooms.get(chatroom)) {c.write(tmsg);}} else {List<LATMBMessage> newList = new LinkedList<LATMBMessage>();newList.add(msg);messages.put(chatroom, newList);for (Channel c : chatrooms.get(chatroom)) {c.write(tmsg);}}} else if (tmsg.getMessage(0).hasCoordinates()) {// Otherwise search for channels that are within radiusfor (Channel ch : gpsChannels.keySet()) {double lat = gpsChannels.get(ch).getX();double lon = gpsChannels.get(ch).getY();double radius = msg.getRadius();// Send message to each channel within the radiusif (getCoordinateDistance(lat, lon, msg.getLatitude(), msg.getLongitude()) <= radius) {ch.write(tmsg);}}// Save message to list of GPS associated messagesgpsMessages.add(msg);}break;}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {logger.log(Level.WARNING, "Unexpected exception from downstream.",e.getCause());e.getChannel().close();}// Returns a TrackingMessage of all messages that are within radius of specified pointprivate TrackingMessage aggregateMessagesFromGPS (double lat, double lon, double radius) {org.vt.ece4564.latmb.LATMBProtocol.TrackingMessage.Builder msg = TrackingMessage.newBuilder().setId(0).setType(1);for (LATMBMessage entry : gpsMessages) {if (getCoordinateDistance(lat, lon, entry.getLatitude(), entry.getLongitude()) <= radius) {Calendar c = entry.getTimestamp();DateTime timestamp = DateTime.newBuilder().setYear(c.get(Calendar.YEAR)).setMonth(c.get(Calendar.MONTH)).setDay(c.get(Calendar.DAY_OF_MONTH)).setHour(c.get(Calendar.HOUR_OF_DAY)).setMinute(c.get(Calendar.MINUTE)).setSecond(c.get(Calendar.SECOND)).build();c = entry.getExpiration();DateTime expiration = DateTime.newBuilder().setYear(c.get(Calendar.YEAR)).setMonth(c.get(Calendar.MONTH)).setDay(c.get(Calendar.DAY_OF_MONTH)).setHour(c.get(Calendar.HOUR_OF_DAY)).setMinute(c.get(Calendar.MINUTE)).setSecond(c.get(Calendar.SECOND)).build();Builder message = Message.newBuilder().setMessage(entry.getMessage()).setTimestamp(timestamp).setExpiration(expiration);if (entry.getUsername() != null) {message.setUsername(entry.getUsername());}if (entry.getChatroom() != null) {message.setChatroom(entry.getChatroom());} else if (entry.getLatitude() != 0 && entry.getLongitude() != 0){Position pos = Position.newBuilder().setLatitude(entry.getLatitude()).setLongitude(entry.getLongitude()).setAccuracy(0).build();message.setCoordinates(pos);}if (entry.getRadius() != 0) {message.setRadius(entry.getRadius());}msg.addMessage(message);}}if (msg.isInitialized()) {return msg.build();} else {return null;}}// Returns a TrackingMessage containing all the messages from a chatroomprivate TrackingMessage aggregateMessagesFromChatrooms (String chatroom) {List<LATMBMessage> list = messages.get(chatroom);org.vt.ece4564.latmb.LATMBProtocol.TrackingMessage.Builder msg = TrackingMessage.newBuilder()// Type 1 == New Message.setId(0).setType(1);for (LATMBMessage entry : list) {Calendar c = entry.getTimestamp();DateTime timestamp = DateTime.newBuilder().setYear(c.get(Calendar.YEAR)).setMonth(c.get(Calendar.MONTH)).setDay(c.get(Calendar.DAY_OF_MONTH)).setHour(c.get(Calendar.HOUR_OF_DAY)).setMinute(c.get(Calendar.MINUTE)).setSecond(c.get(Calendar.SECOND)).build();c = entry.getExpiration();DateTime expiration = DateTime.newBuilder().setYear(c.get(Calendar.YEAR)).setMonth(c.get(Calendar.MONTH)).setDay(c.get(Calendar.DAY_OF_MONTH)).setHour(c.get(Calendar.HOUR_OF_DAY)).setMinute(c.get(Calendar.MINUTE)).setSecond(c.get(Calendar.SECOND)).build();Builder message = Message.newBuilder().setMessage(entry.getMessage()).setTimestamp(timestamp).setExpiration(expiration);if (entry.getUsername() != null) {message.setUsername(entry.getUsername());}if (entry.getChatroom() != null) {message.setChatroom(entry.getChatroom());} else if (entry.getLatitude() != 0 && entry.getLongitude() != 0){Position pos = Position.newBuilder().setLatitude(entry.getLatitude()).setLongitude(entry.getLongitude()).setAccuracy(0).build();message.setCoordinates(pos);}if (entry.getRadius() != 0) {message.setRadius(entry.getRadius());}msg.addMessage(message);}if (msg.isInitialized()) {return msg.build();} else {return null;}}// Returns a TrackingMessage created from the passed LATMBMessageprivate TrackingMessage serializeToProtoBuffer (LATMBMessage entry) {Calendar c = entry.getTimestamp();DateTime timestamp = DateTime.newBuilder().setYear(c.get(Calendar.YEAR)).setMonth(c.get(Calendar.MONTH)).setDay(c.get(Calendar.DAY_OF_MONTH)).setHour(c.get(Calendar.HOUR_OF_DAY)).setMinute(c.get(Calendar.MINUTE)).setSecond(c.get(Calendar.SECOND)).build();c = entry.getExpiration();DateTime expiration = DateTime.newBuilder().setYear(c.get(Calendar.YEAR)).setMonth(c.get(Calendar.MONTH)).setDay(c.get(Calendar.DAY_OF_MONTH)).setHour(c.get(Calendar.HOUR_OF_DAY)).setMinute(c.get(Calendar.MINUTE)).setSecond(c.get(Calendar.SECOND)).build();Builder message = Message.newBuilder().setMessage(entry.getMessage()).setTimestamp(timestamp).setExpiration(expiration);if (entry.getUsername() != null) {message.setUsername(entry.getUsername());}if (entry.getChatroom() != null) {message.setChatroom(entry.getChatroom());} else if (entry.getLatitude() != 0 && entry.getLongitude() != 0){Position pos = Position.newBuilder().setLatitude(entry.getLatitude()).setLongitude(entry.getLongitude()).setAccuracy(0).build();message.setCoordinates(pos);}if (entry.getRadius() != 0) {message.setRadius(entry.getRadius());}TrackingMessage msg = TrackingMessage.newBuilder()// Type 1 == New Message.setId(0).setType(1).addMessage(message).build();return msg;}// Returns a LATMBMessage from the passed TrackingMessageprivate LATMBMessage serializeFromProtoBuffer(TrackingMessage message) {Message msg = message.getMessage(0);LATMBMessage newEntry = new LATMBMessage();newEntry.setMessage(msg.getMessage());DateTime timestamp = msg.getTimestamp();DateTime expiration = msg.getExpiration();Calendar time = new GregorianCalendar();time.set(timestamp.getYear(), timestamp.getMonth(), timestamp.getDay(),timestamp.getHour(), timestamp.getMinute(), timestamp.getSecond());newEntry.setTimestamp(time);Calendar exp = new GregorianCalendar();exp.set(expiration.getYear(), expiration.getMonth(), expiration.getDay(),expiration.getHour(), expiration.getMinute(), expiration.getSecond());newEntry.setExpiration(exp);if (msg.hasUsername()) {newEntry.setUsername(msg.getUsername());} else {newEntry.setUsername("Anonymous");}if (msg.hasChatroom()) {newEntry.setChatroom(msg.getChatroom());}if (msg.hasCoordinates()) {Position coord = msg.getCoordinates();newEntry.setLatitude(coord.getLatitude());newEntry.setLongitude(coord.getLongitude());}if (msg.hasRadius()) {newEntry.setRadius(msg.getRadius());}return newEntry;}// Returns the distance (in km) between the two specified pointsprivate double getCoordinateDistance(double startLat, double startLong, double endLat, double endLong) {double d2r = Math.PI / 180;double dlong = (endLong - startLong) * d2r;double dlat = (endLat - startLat) * d2r;double a =Math.pow(Math.sin(dlat / 2.0), 2)+ Math.cos(startLat * d2r)* Math.cos(endLat * d2r)* Math.pow(Math.sin(dlong / 2.0), 2);double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));double d = 6367 * c;return d;}}