Subversion Repositories Code-Repo

Rev

Blame | Last modification | View Log | RSS feed

/**************************************************************************
 * Copyright 2011 Jules White
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 ***************************************************************************/
package org.vt.ece4564.latmb;

import java.awt.geom.Point2D;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.vt.ece4564.latmb.LATMBProtocol.DateTime;
import org.vt.ece4564.latmb.LATMBProtocol.Message;
import org.vt.ece4564.latmb.LATMBProtocol.Position;
import org.vt.ece4564.latmb.LATMBProtocol.TrackingMessage;
import org.vt.ece4564.latmb.LATMBProtocol.Message.Builder;

/**
 * Handles a server-side channel.
 * 
 * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
 * @author <a href="http://gleamynode.net/">Trustin Lee</a>
 * 
 * @version $Rev: 2121 $, $Date: 2010-02-02 09:38:07 +0900 (Tue, 02 Feb 2010) $
 */
public class LATMBServerHandler extends SimpleChannelUpstreamHandler {

        private static final Logger logger = Logger
                        .getLogger(LATMBServerHandler.class.getName());

        static final ChannelGroup channels = new DefaultChannelGroup();
        
        // Map of channels connected to chatrooms
        static final Map<String, ChannelGroup> chatrooms = Collections.synchronizedMap(new HashMap<String, ChannelGroup>());
        
        // Map of messages for each chatroom
        static final Map<String, List<LATMBMessage>> messages = Collections.synchronizedMap(new HashMap<String, List<LATMBMessage>>());
        
        // Map of channels and their locations
        static final Map<Channel, Point2D.Double> gpsChannels = Collections.synchronizedMap(new HashMap<Channel, Point2D.Double>());
        
        // List storing messages for GPS lookup
        static final List<LATMBMessage> gpsMessages = new LinkedList<LATMBMessage>();
        
        
        private Thread checkThread_;

        @Override
        public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
                        throws Exception {
                if (e instanceof ChannelStateEvent) {
                        logger.info(e.toString());
                }
                super.handleUpstream(ctx, e);
        }

        public void broadcast(TrackingMessage msg) {
                for (Channel c : channels) {
                        c.write(msg);
                }
        }

        private synchronized void checkRecords() {
                if (checkThread_ == null) {
                        checkThread_ = new Thread(new Runnable() {

                                @Override
                                public void run() {
                                        while (true) {
                                                try {
                                                        Thread.currentThread().sleep(5000);
                                                        
                                                        // Removes empty chatrooms
                                                        for (String str : chatrooms.keySet()) {
                                                                if (chatrooms.get(str).size() == 0) {
                                                                        chatrooms.remove(str);
                                                                }
                                                        }
                                                        
                                                        // Removes expired messages and empty lists from chatrooms
                                                        Calendar c = Calendar.getInstance();
                                                        for (String str : messages.keySet()) {
                                                                if (messages.get(str).size() == 0) {
                                                                        messages.remove(str);
                                                                } else {
                                                                        List<LATMBMessage> toRemove = new ArrayList<LATMBMessage>();
                                                                        for (LATMBMessage message : messages.get(str)) {
                                                                                if (c.after(message.getExpiration())) {
                                                                                        toRemove.add(message);
                                                                                }
                                                                        }
                                                                        for (LATMBMessage message : toRemove) {
                                                                                messages.get(str).remove(message);
                                                                        }
                                                                }
                                                        }
                                                        
                                                        // Remove expired messages from list of messages with GPS locations
                                                        List<LATMBMessage> toRemove = new ArrayList<LATMBMessage>();
                                                        for (LATMBMessage msg : gpsMessages) {
                                                                if (c.after(msg.getExpiration())) {
                                                                        toRemove.add(msg);
                                                                }
                                                        }
                                                        for (LATMBMessage message : toRemove) {
                                                                gpsMessages.remove(message);
                                                        }
                                                        
                                                } catch (Exception e) {
                                                        e.printStackTrace();
                                                }
                                        }
                                }
                        });
                        checkThread_.start();
                }
        }
        
        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
                        throws Exception {

                channels.add(e.getChannel());
                
                // Start the thread to check for expired entries
                checkRecords();
        }

        @Override
        public void channelDisconnected(ChannelHandlerContext ctx,
                        ChannelStateEvent e) throws Exception {
                // Unregister the channel from the global channel list
                // so the channel does not receive messages anymore.
                channels.remove(e.getChannel());
                
                // Remove channel from list of GPS channels as well
                gpsChannels.remove(e.getChannel());
        }

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
                
                TrackingMessage tmsg = (TrackingMessage) e.getMessage();
                
                switch (tmsg.getType()) {
                case 0: // Initial connection from a client
                        System.out.println("Initial Request Message Recieved");
                        if (tmsg.getMessage(0).hasChatroom()) {
                                String chatroom = tmsg.getMessage(0).getChatroom();
                                // If chatroom exists, send back all messages from that chatroom
                                if (messages.containsKey(chatroom)) {
                                        e.getChannel().write(aggregateMessagesFromChatrooms(chatroom));
                                }
                                // Add the current channel to the list of channels for that chatroom
                                if (chatrooms.containsKey(chatroom)) {
                                        chatrooms.get(chatroom).add(e.getChannel());
                                } else {
                                        ChannelGroup channels = new DefaultChannelGroup();
                                        channels.add(e.getChannel());
                                        chatrooms.put(chatroom, channels);
                                }
                        } else if (tmsg.getMessage(0).hasCoordinates()) {
                                double lat = tmsg.getMessage(0).getCoordinates().getLatitude();
                                double lon = tmsg.getMessage(0).getCoordinates().getLongitude();
                                double radius = tmsg.getMessage(0).getRadius();
                                
                                // Get messages that are within radius of channel
                                e.getChannel().write(aggregateMessagesFromGPS(lat, lon, radius));
                                
                                // Save channel with its location
                                Point2D.Double pt = new Point2D.Double(lat, lon);
                                gpsChannels.put(e.getChannel(), pt);
                        }
                        break;
                case 1: // Saves message and broadcast it
                        System.out.println("New Update Message Recieved");
                        LATMBMessage msg = serializeFromProtoBuffer(tmsg);
                        if (tmsg.getMessage(0).hasChatroom()) {
                                String chatroom = tmsg.getMessage(0).getChatroom();
                                // Send message to all channels connected to the chatroom
                                if (messages.containsKey(chatroom)) {
                                        messages.get(chatroom).add(msg);
                                        for (Channel c : chatrooms.get(chatroom)) {
                                                c.write(tmsg);
                                        }
                                } else {
                                        List<LATMBMessage> newList = new LinkedList<LATMBMessage>();
                                        newList.add(msg);
                                        messages.put(chatroom, newList);
                                        for (Channel c : chatrooms.get(chatroom)) {
                                                c.write(tmsg);
                                        }
                                }
                        } else if (tmsg.getMessage(0).hasCoordinates()) {
                                // Otherwise search for channels that are within radius
                                for (Channel ch : gpsChannels.keySet()) {
                                        double lat = gpsChannels.get(ch).getX();
                                        double lon = gpsChannels.get(ch).getY();
                                        double radius = msg.getRadius();
                                        
                                        // Send message to each channel within the radius
                                        if (getCoordinateDistance(lat, lon, msg.getLatitude(), msg.getLongitude()) <= radius) {
                                                ch.write(tmsg);
                                        }
                                }
                                // Save message to list of GPS associated messages
                                gpsMessages.add(msg);
                        }
                        break;
                }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
                logger.log(Level.WARNING, "Unexpected exception from downstream.",
                                e.getCause());
                e.getChannel().close();
        }
        
        // Returns a TrackingMessage of all messages that are within radius of specified point
        private TrackingMessage aggregateMessagesFromGPS (double lat, double lon, double radius) {
                org.vt.ece4564.latmb.LATMBProtocol.TrackingMessage.Builder msg = TrackingMessage.newBuilder()
                                .setId(0).setType(1);
                
                for (LATMBMessage entry : gpsMessages) {
                        if (getCoordinateDistance(lat, lon, entry.getLatitude(), entry.getLongitude()) <= radius) {
                                Calendar c = entry.getTimestamp();
                                DateTime timestamp = DateTime.newBuilder()
                                                .setYear(c.get(Calendar.YEAR))
                                                .setMonth(c.get(Calendar.MONTH))
                                                .setDay(c.get(Calendar.DAY_OF_MONTH))
                                                .setHour(c.get(Calendar.HOUR_OF_DAY))
                                                .setMinute(c.get(Calendar.MINUTE))
                                                .setSecond(c.get(Calendar.SECOND))
                                                .build();
                                
                                c = entry.getExpiration();
                                DateTime expiration = DateTime.newBuilder()
                                                .setYear(c.get(Calendar.YEAR))
                                                .setMonth(c.get(Calendar.MONTH))
                                                .setDay(c.get(Calendar.DAY_OF_MONTH))
                                                .setHour(c.get(Calendar.HOUR_OF_DAY))
                                                .setMinute(c.get(Calendar.MINUTE))
                                                .setSecond(c.get(Calendar.SECOND))
                                                .build();
                                
                                Builder message = Message.newBuilder()
                                                .setMessage(entry.getMessage())
                                                .setTimestamp(timestamp)
                                                .setExpiration(expiration);
                                
                                if (entry.getUsername() != null) {
                                        message.setUsername(entry.getUsername());
                                }
                                
                                if (entry.getChatroom() != null) {
                                        message.setChatroom(entry.getChatroom());
                                } else if (entry.getLatitude() != 0 && entry.getLongitude() != 0){
                                        Position pos = Position.newBuilder()
                                                        .setLatitude(entry.getLatitude())
                                                        .setLongitude(entry.getLongitude())
                                                        .setAccuracy(0)
                                                        .build();
                                        message.setCoordinates(pos);
                                }
                                
                                if (entry.getRadius() != 0) {
                                        message.setRadius(entry.getRadius());
                                }
                                
                                msg.addMessage(message);
                        }
                }
                if (msg.isInitialized()) {
                        return msg.build();
                } else {
                        return null;
                }
        }
        
        // Returns a TrackingMessage containing all the messages from a chatroom
        private TrackingMessage aggregateMessagesFromChatrooms (String chatroom) {
                List<LATMBMessage> list = messages.get(chatroom);
                
                org.vt.ece4564.latmb.LATMBProtocol.TrackingMessage.Builder msg = TrackingMessage.newBuilder()
                                // Type 1 == New Message
                                .setId(0).setType(1);
                
                for (LATMBMessage entry : list) {
                        Calendar c = entry.getTimestamp();
                        DateTime timestamp = DateTime.newBuilder()
                                        .setYear(c.get(Calendar.YEAR))
                                        .setMonth(c.get(Calendar.MONTH))
                                        .setDay(c.get(Calendar.DAY_OF_MONTH))
                                        .setHour(c.get(Calendar.HOUR_OF_DAY))
                                        .setMinute(c.get(Calendar.MINUTE))
                                        .setSecond(c.get(Calendar.SECOND))
                                        .build();
                        
                        c = entry.getExpiration();
                        DateTime expiration = DateTime.newBuilder()
                                        .setYear(c.get(Calendar.YEAR))
                                        .setMonth(c.get(Calendar.MONTH))
                                        .setDay(c.get(Calendar.DAY_OF_MONTH))
                                        .setHour(c.get(Calendar.HOUR_OF_DAY))
                                        .setMinute(c.get(Calendar.MINUTE))
                                        .setSecond(c.get(Calendar.SECOND))
                                        .build();
                        
                        Builder message = Message.newBuilder()
                                        .setMessage(entry.getMessage())
                                        .setTimestamp(timestamp)
                                        .setExpiration(expiration);
                        
                        if (entry.getUsername() != null) {
                                message.setUsername(entry.getUsername());
                        }
                        
                        if (entry.getChatroom() != null) {
                                message.setChatroom(entry.getChatroom());
                        } else if (entry.getLatitude() != 0 && entry.getLongitude() != 0){
                                Position pos = Position.newBuilder()
                                                .setLatitude(entry.getLatitude())
                                                .setLongitude(entry.getLongitude())
                                                .setAccuracy(0)
                                                .build();
                                message.setCoordinates(pos);
                        }
                        
                        if (entry.getRadius() != 0) {
                                message.setRadius(entry.getRadius());
                        }
                        msg.addMessage(message);
                }
                if (msg.isInitialized()) {
                        return msg.build();
                } else {
                        return null;
                }
        }

        // Returns a TrackingMessage created from the passed LATMBMessage
        private TrackingMessage serializeToProtoBuffer (LATMBMessage entry) {
                
                Calendar c = entry.getTimestamp();
                DateTime timestamp = DateTime.newBuilder()
                                .setYear(c.get(Calendar.YEAR))
                                .setMonth(c.get(Calendar.MONTH))
                                .setDay(c.get(Calendar.DAY_OF_MONTH))
                                .setHour(c.get(Calendar.HOUR_OF_DAY))
                                .setMinute(c.get(Calendar.MINUTE))
                                .setSecond(c.get(Calendar.SECOND))
                                .build();
                
                c = entry.getExpiration();
                DateTime expiration = DateTime.newBuilder()
                                .setYear(c.get(Calendar.YEAR))
                                .setMonth(c.get(Calendar.MONTH))
                                .setDay(c.get(Calendar.DAY_OF_MONTH))
                                .setHour(c.get(Calendar.HOUR_OF_DAY))
                                .setMinute(c.get(Calendar.MINUTE))
                                .setSecond(c.get(Calendar.SECOND))
                                .build();
                
                Builder message = Message.newBuilder()
                                .setMessage(entry.getMessage())
                                .setTimestamp(timestamp)
                                .setExpiration(expiration);
                
                if (entry.getUsername() != null) {
                        message.setUsername(entry.getUsername());
                }
                
                if (entry.getChatroom() != null) {
                        message.setChatroom(entry.getChatroom());
                } else if (entry.getLatitude() != 0 && entry.getLongitude() != 0){
                        Position pos = Position.newBuilder()
                                        .setLatitude(entry.getLatitude())
                                        .setLongitude(entry.getLongitude())
                                        .setAccuracy(0)
                                        .build();
                        message.setCoordinates(pos);
                }
                
                if (entry.getRadius() != 0) {
                        message.setRadius(entry.getRadius());
                }
                
                TrackingMessage msg = TrackingMessage.newBuilder()
                                // Type 1 == New Message
                                .setId(0).setType(1)
                                .addMessage(message)
                                .build();
                
                return msg;
        }
        
        // Returns a LATMBMessage from the passed TrackingMessage
        private LATMBMessage serializeFromProtoBuffer(TrackingMessage message) {
                Message msg = message.getMessage(0);
                LATMBMessage newEntry = new LATMBMessage();
                
                newEntry.setMessage(msg.getMessage());
                
                DateTime timestamp = msg.getTimestamp();
                DateTime expiration = msg.getExpiration();
                
                Calendar time = new GregorianCalendar();
                time.set(timestamp.getYear(), timestamp.getMonth(), timestamp.getDay(), 
                                timestamp.getHour(), timestamp.getMinute(), timestamp.getSecond());
                newEntry.setTimestamp(time);
                
                Calendar exp = new GregorianCalendar();
                exp.set(expiration.getYear(), expiration.getMonth(), expiration.getDay(), 
                                expiration.getHour(), expiration.getMinute(), expiration.getSecond());
                newEntry.setExpiration(exp);
                
                if (msg.hasUsername()) {
                        newEntry.setUsername(msg.getUsername());
                } else {
                        newEntry.setUsername("Anonymous");
                }
                
                if (msg.hasChatroom()) {
                        newEntry.setChatroom(msg.getChatroom());
                }
                
                if (msg.hasCoordinates()) {
                        Position coord = msg.getCoordinates();
                        newEntry.setLatitude(coord.getLatitude());
                        newEntry.setLongitude(coord.getLongitude());
                }
                
                if (msg.hasRadius()) {
                        newEntry.setRadius(msg.getRadius());
                }
                
                return newEntry;
        }
        
        // Returns the distance (in km) between the two specified points
        private double getCoordinateDistance(double startLat, double startLong, double endLat, double endLong) {
                double d2r = Math.PI / 180;

            double dlong = (endLong - startLong) * d2r;
            double dlat = (endLat - startLat) * d2r;
            double a =
                Math.pow(Math.sin(dlat / 2.0), 2)
                    + Math.cos(startLat * d2r)
                    * Math.cos(endLat * d2r)
                    * Math.pow(Math.sin(dlong / 2.0), 2);
            double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
            double d = 6367 * c;

            return d;
        }
}