| /* |
| * Conditions Of Use |
| * |
| * This software was developed by employees of the National Institute of |
| * Standards and Technology (NIST), an agency of the Federal Government. |
| * Pursuant to title 15 United States Code Section 105, works of NIST |
| * employees are not subject to copyright protection in the United States |
| * and are considered to be in the public domain. As a result, a formal |
| * license is not needed to use the software. |
| * |
| * This software is provided by NIST as a service and is expressly |
| * provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED |
| * OR STATUTORY, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF |
| * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT |
| * AND DATA ACCURACY. NIST does not warrant or make any representations |
| * regarding the use of the software or the results thereof, including but |
| * not limited to the correctness, accuracy, reliability or usefulness of |
| * the software. |
| * |
| * Permission to use this software is contingent upon your acceptance |
| * of the terms of this agreement |
| * |
| * . |
| * |
| */ |
| /******************************************************************************* |
| * Product of NIST/ITL Advanced Networking Technologies Division (ANTD). * |
| *******************************************************************************/ |
| package gov.nist.javax.sip.stack; |
| |
| import java.io.IOException; |
| import java.util.LinkedList; |
| import java.net.*; |
| |
| import gov.nist.core.*; |
| |
| /** |
| * Sit in a loop and handle incoming udp datagram messages. For each Datagram |
| * packet, a new UDPMessageChannel is created (up to the max thread pool size). |
| * Each UDP message is processed in its own thread. |
| * |
| * @version 1.2 $Revision: 1.37 $ $Date: 2009/11/14 20:06:16 $ |
| * |
| * @author M. Ranganathan <br/> |
| * |
| * |
| * |
| * <a href="{@docRoot}/../uml/udp-request-processing-sequence-diagram.jpg"> |
| * See the implementation sequence diagram for processing incoming requests. |
| * </a> |
| * |
| * |
| * Acknowledgement: Jeff Keyser contributed ideas on starting and stopping the |
| * stack that were incorporated into this code. Niklas Uhrberg suggested that |
| * thread pooling be added to limit the number of threads and improve |
| * performance. |
| */ |
| public class UDPMessageProcessor extends MessageProcessor { |
| /** |
| * The Mapped port (in case STUN suport is enabled) |
| */ |
| private int port; |
| |
| /** |
| * Incoming messages are queued here. |
| */ |
| protected LinkedList messageQueue; |
| |
| /** |
| * A list of message channels that we have started. |
| */ |
| protected LinkedList messageChannels; |
| |
| /** |
| * Max # of udp message channels |
| */ |
| protected int threadPoolSize; |
| |
| protected DatagramSocket sock; |
| |
| /** |
| * A flag that is set to false to exit the message processor (suggestion by |
| * Jeff Keyser). |
| */ |
| protected boolean isRunning; |
| |
| private static final int HIGHWAT=5000; |
| |
| private static final int LOWAT=2500; |
| |
| /** |
| * Constructor. |
| * |
| * @param sipStack |
| * pointer to the stack. |
| */ |
| protected UDPMessageProcessor(InetAddress ipAddress, |
| SIPTransactionStack sipStack, int port) throws IOException { |
| super(ipAddress, port, "udp",sipStack); |
| |
| this.sipStack = sipStack; |
| |
| this.messageQueue = new LinkedList(); |
| |
| this.port = port; |
| try { |
| this.sock = sipStack.getNetworkLayer().createDatagramSocket(port, |
| ipAddress); |
| // Create a new datagram socket. |
| sock.setReceiveBufferSize(sipStack.getReceiveUdpBufferSize()); |
| sock.setSendBufferSize(sipStack.getSendUdpBufferSize()); |
| |
| /** |
| * If the thread auditor is enabled, define a socket timeout value in order to |
| * prevent sock.receive() from blocking forever |
| */ |
| if (sipStack.getThreadAuditor().isEnabled()) { |
| sock.setSoTimeout((int) sipStack.getThreadAuditor().getPingIntervalInMillisecs()); |
| } |
| if ( ipAddress.getHostAddress().equals(IN_ADDR_ANY) || |
| ipAddress.getHostAddress().equals(IN6_ADDR_ANY)){ |
| // Store the address to which we are actually bound |
| // Note that on WINDOWS this is actually broken. It will |
| // return IN_ADDR_ANY again. On linux it will return the |
| // address to which the socket was actually bound. |
| super.setIpAddress( sock.getLocalAddress() ); |
| |
| } |
| } catch (SocketException ex) { |
| throw new IOException(ex.getMessage()); |
| } |
| } |
| |
| |
| |
| /** |
| * Get port on which to listen for incoming stuff. |
| * |
| * @return port on which I am listening. |
| */ |
| public int getPort() { |
| return this.port; |
| } |
| |
| /** |
| * Start our processor thread. |
| */ |
| public void start() throws IOException { |
| |
| |
| this.isRunning = true; |
| Thread thread = new Thread(this); |
| thread.setDaemon(true); |
| // Issue #32 on java.net |
| thread.setName("UDPMessageProcessorThread"); |
| // Issue #184 |
| thread.setPriority(Thread.MAX_PRIORITY); |
| thread.start(); |
| } |
| |
| /** |
| * Thread main routine. |
| */ |
| public void run() { |
| // Check for running flag. |
| this.messageChannels = new LinkedList(); |
| // start all our messageChannels (unless the thread pool size is |
| // infinity. |
| if (sipStack.threadPoolSize != -1) { |
| for (int i = 0; i < sipStack.threadPoolSize; i++) { |
| UDPMessageChannel channel = new UDPMessageChannel(sipStack, |
| this); |
| this.messageChannels.add(channel); |
| |
| } |
| } |
| |
| // Ask the auditor to monitor this thread |
| ThreadAuditor.ThreadHandle threadHandle = sipStack.getThreadAuditor().addCurrentThread(); |
| |
| // Somebody asked us to exit. if isRunnning is set to false. |
| while (this.isRunning) { |
| |
| try { |
| // Let the thread auditor know we're up and running |
| threadHandle.ping(); |
| |
| int bufsize = sock.getReceiveBufferSize(); |
| byte message[] = new byte[bufsize]; |
| DatagramPacket packet = new DatagramPacket(message, bufsize); |
| sock.receive(packet); |
| |
| |
| |
| // This is a simplistic congestion control algorithm. |
| // It accepts packets if queuesize is < LOWAT. It drops |
| // requests if the queue size exceeds a HIGHWAT and accepts |
| // requests with probability p proportional to the difference |
| // between current queue size and LOWAT in the range |
| // of queue sizes between HIGHWAT and LOWAT. |
| // TODO -- penalize spammers by looking at the source |
| // port and IP address. |
| if ( sipStack.stackDoesCongestionControl ) { |
| if ( this.messageQueue.size() >= HIGHWAT) { |
| if (sipStack.isLoggingEnabled()) { |
| sipStack.getStackLogger().logDebug("Dropping message -- queue length exceeded"); |
| |
| } |
| //System.out.println("HIGHWAT Drop!"); |
| continue; |
| } else if ( this.messageQueue.size() > LOWAT && this .messageQueue.size() < HIGHWAT ) { |
| // Drop the message with a probabilty that is linear in the range 0 to 1 |
| float threshold = ((float)(messageQueue.size() - LOWAT))/ ((float)(HIGHWAT - LOWAT)); |
| boolean decision = Math.random() > 1.0 - threshold; |
| if ( decision ) { |
| if (sipStack.isLoggingEnabled()) { |
| sipStack.getStackLogger().logDebug("Dropping message with probability " + (1.0 - threshold)); |
| |
| } |
| //System.out.println("RED Drop!"); |
| continue; |
| } |
| |
| } |
| } |
| |
| |
| |
| // Count of # of packets in process. |
| // this.useCount++; |
| if (sipStack.threadPoolSize != -1) { |
| // Note: the only condition watched for by threads |
| // synchronizing on the messageQueue member is that it is |
| // not empty. As soon as you introduce some other |
| // condition you will have to call notifyAll instead of |
| // notify below. |
| |
| synchronized (this.messageQueue) { |
| // was addLast |
| this.messageQueue.add(packet); |
| this.messageQueue.notify(); |
| } |
| } else { |
| new UDPMessageChannel(sipStack, this, packet); |
| } |
| } catch (SocketTimeoutException ex) { |
| // This socket timeout alows us to ping the thread auditor periodically |
| } catch (SocketException ex) { |
| if (sipStack.isLoggingEnabled()) |
| getSIPStack().getStackLogger() |
| .logDebug("UDPMessageProcessor: Stopping"); |
| isRunning = false; |
| // The notifyAll should be in a synchronized block. |
| // ( bug report by Niklas Uhrberg ). |
| synchronized (this.messageQueue) { |
| this.messageQueue.notifyAll(); |
| } |
| } catch (IOException ex) { |
| isRunning = false; |
| ex.printStackTrace(); |
| if (sipStack.isLoggingEnabled()) |
| getSIPStack().getStackLogger() |
| .logDebug("UDPMessageProcessor: Got an IO Exception"); |
| } catch (Exception ex) { |
| if (sipStack.isLoggingEnabled()) |
| getSIPStack().getStackLogger() |
| .logDebug("UDPMessageProcessor: Unexpected Exception - quitting"); |
| InternalErrorHandler.handleException(ex); |
| return; |
| } |
| } |
| } |
| |
| /** |
| * Shut down the message processor. Close the socket for recieving incoming |
| * messages. |
| */ |
| public void stop() { |
| synchronized (this.messageQueue) { |
| this.isRunning = false; |
| this.messageQueue.notifyAll(); |
| sock.close(); |
| |
| |
| } |
| } |
| |
| /** |
| * Return the transport string. |
| * |
| * @return the transport string |
| */ |
| public String getTransport() { |
| return "udp"; |
| } |
| |
| /** |
| * Returns the stack. |
| * |
| * @return my sip stack. |
| */ |
| public SIPTransactionStack getSIPStack() { |
| return sipStack; |
| } |
| |
| /** |
| * Create and return new TCPMessageChannel for the given host/port. |
| */ |
| public MessageChannel createMessageChannel(HostPort targetHostPort) |
| throws UnknownHostException { |
| return new UDPMessageChannel(targetHostPort.getInetAddress(), |
| targetHostPort.getPort(), sipStack, this); |
| } |
| |
| public MessageChannel createMessageChannel(InetAddress host, int port) |
| throws IOException { |
| return new UDPMessageChannel(host, port, sipStack, this); |
| } |
| |
| /** |
| * Default target port for UDP |
| */ |
| public int getDefaultTargetPort() { |
| return 5060; |
| } |
| |
| /** |
| * UDP is not a secure protocol. |
| */ |
| public boolean isSecure() { |
| return false; |
| } |
| |
| /** |
| * UDP can handle a message as large as the MAX_DATAGRAM_SIZE. |
| */ |
| public int getMaximumMessageSize() { |
| return 8*1024; |
| } |
| |
| /** |
| * Return true if there are any messages in use. |
| */ |
| public boolean inUse() { |
| synchronized (messageQueue) { |
| return messageQueue.size() != 0; |
| } |
| } |
| |
| } |