Java DatagramChannel 完整实现

下面是一个完整的 DatagramChannel 实现示例,包括 UDP 服务器和客户端:

1. 基础 DatagramChannel 演示

import java.io.IOException;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;

public class DatagramChannelDemo {
    
    public static void main(String[] args) {
        System.out.println("=== Java DatagramChannel 演示 ===\n");
        
        if (args.length > 0 && args[0].equals("server")) {
            runUDPServer();
        } else {
            runUDPClient();
        }
    }
    
    /**
     * UDP 服务器实现
     */
    private static void runUDPServer() {
        System.out.println("启动 UDP 服务器...");
        
        try (DatagramChannel serverChannel = DatagramChannel.open()) {
            // 绑定服务器端口
            serverChannel.bind(new InetSocketAddress(9999));
            serverChannel.configureBlocking(false); // 非阻塞模式
            
            System.out.println("服务器监听端口: 9999");
            System.out.println("等待客户端消息...\n");
            
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            Selector selector = Selector.open();
            serverChannel.register(selector, SelectionKey.OP_READ);
            
            while (true) {
                // 使用选择器等待事件
                int readyChannels = selector.select(5000); // 5秒超时
                
                if (readyChannels == 0) {
                    System.out.println("等待客户端连接...");
                    continue;
                }
                
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isReadable()) {
                        DatagramChannel channel = (DatagramChannel) key.channel();
                        buffer.clear();
                        
                        // 接收数据包
                        SocketAddress clientAddress = channel.receive(buffer);
                        if (clientAddress != null) {
                            buffer.flip();
                            String message = StandardCharsets.UTF_8.decode(buffer).toString();
                            InetSocketAddress client = (InetSocketAddress) clientAddress;
                            
                            System.out.printf("收到来自 %s:%d 的消息: %s\n",
                                    client.getAddress().getHostAddress(),
                                    client.getPort(),
                                    message);
                            
                            // 发送响应
                            if (!message.trim().equalsIgnoreCase("exit")) {
                                String response = "服务器已收到您的消息: " + message;
                                buffer.clear();
                                buffer.put(response.getBytes(StandardCharsets.UTF_8));
                                buffer.flip();
                                channel.send(buffer, clientAddress);
                                System.out.println("已发送响应\n");
                            } else {
                                System.out.println("客户端请求退出,结束通信\n");
                                String response = "服务器关闭连接";
                                buffer.clear();
                                buffer.put(response.getBytes(StandardCharsets.UTF_8));
                                buffer.flip();
                                channel.send(buffer, clientAddress);
                            }
                        }
                    }
                }
                selector.selectedKeys().clear();
            }
            
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * UDP 客户端实现
     */
    private static void runUDPClient() {
        System.out.println("启动 UDP 客户端...");
        
        try (DatagramChannel clientChannel = DatagramChannel.open()) {
            clientChannel.configureBlocking(false);
            
            // 连接到服务器(UDP是"无连接"的,这里只是设置默认目标)
            SocketAddress serverAddress = new InetSocketAddress("localhost", 9999);
            clientChannel.connect(serverAddress);
            
            System.out.println("连接到服务器: localhost:9999");
            System.out.println("输入消息 (输入 'exit' 退出):\n");
            
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            Selector selector = Selector.open();
            clientChannel.register(selector, SelectionKey.OP_READ);
            
            // 读取用户输入
            BufferedReader userInput = new BufferedReader(
                new InputStreamReader(System.in));
            
            while (true) {
                System.out.print("客户端> ");
                String message = userInput.readLine();
                
                if (message == null || message.trim().isEmpty()) {
                    continue;
                }
                
                // 发送消息
                buffer.clear();
                buffer.put(message.getBytes(StandardCharsets.UTF_8));
                buffer.flip();
                
                int bytesSent = clientChannel.send(buffer, serverAddress);
                System.out.println("已发送 " + bytesSent + " 字节");
                
                if (message.trim().equalsIgnoreCase("exit")) {
                    System.out.println("正在退出...");
                    
                    // 等待服务器的响应
                    for (int i = 0; i < 3; i++) {
                        if (selector.select(1000) > 0) {
                            for (SelectionKey key : selector.selectedKeys()) {
                                if (key.isReadable()) {
                                    buffer.clear();
                                    clientChannel.receive(buffer);
                                    buffer.flip();
                                    String response = StandardCharsets.UTF_8.decode(buffer).toString();
                                    System.out.println("服务器响应: " + response);
                                }
                            }
                            break;
                        }
                    }
                    break;
                }
                
                // 等待服务器响应
                System.out.println("等待服务器响应...");
                selector.select(5000);
                
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isReadable()) {
                        buffer.clear();
                        clientChannel.receive(buffer);
                        buffer.flip();
                        String response = StandardCharsets.UTF_8.decode(buffer).toString();
                        System.out.println("服务器响应: " + response);
                    }
                }
                selector.selectedKeys().clear();
            }
            
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2. 高级 DatagramChannel 实现

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.util.*;

/**
 * 高级 UDP 服务器实现
 * 支持多客户端、广播、组播等功能
 */
public class AdvancedUDPServer {
    
    private static final int DEFAULT_PORT = 8888;
    private static final int BUFFER_SIZE = 65507; // UDP 最大数据包大小
    private static final Charset CHARSET = StandardCharsets.UTF_8;
    
    private DatagramChannel channel;
    private Selector selector;
    private boolean running = false;
    private Map<SocketAddress, ClientInfo> clients = new ConcurrentHashMap<>();
    
    // 客户端信息类
    private static class ClientInfo {
        SocketAddress address;
        long lastActive;
        int messageCount;
        
        ClientInfo(SocketAddress address) {
            this.address = address;
            this.lastActive = System.currentTimeMillis();
            this.messageCount = 0;
        }
        
        void updateActivity() {
            this.lastActive = System.currentTimeMillis();
            this.messageCount++;
        }
    }
    
    /**
     * 启动服务器
     */
    public void start(int port) throws IOException {
        channel = DatagramChannel.open();
        channel.configureBlocking(false);
        channel.bind(new InetSocketAddress(port));
        
        selector = Selector.open();
        channel.register(selector, SelectionKey.OP_READ);
        
        running = true;
        System.out.printf("UDP 服务器启动在端口 %d\n", port);
        
        // 启动清理线程
        new Thread(this::cleanupInactiveClients).start();
        
        // 启动统计线程
        new Thread(this::printStatistics).start();
        
        // 主事件循环
        runEventLoop();
    }
    
    /**
     * 事件循环处理
     */
    private void runEventLoop() {
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        
        try {
            while (running) {
                int readyChannels = selector.select(1000); // 1秒超时
                
                if (readyChannels == 0) {
                    continue;
                }
                
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    
                    if (key.isReadable()) {
                        handleRead(key, buffer);
                    }
                    
                    keyIterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            stop();
        }
    }
    
    /**
     * 处理读取事件
     */
    private void handleRead(SelectionKey key, ByteBuffer buffer) throws IOException {
        DatagramChannel channel = (DatagramChannel) key.channel();
        buffer.clear();
        
        SocketAddress clientAddress = channel.receive(buffer);
        
        if (clientAddress != null) {
            buffer.flip();
            
            // 处理数据包
            processPacket(clientAddress, buffer);
        }
    }
    
    /**
     * 处理接收到的数据包
     */
    private void processPacket(SocketAddress clientAddress, ByteBuffer buffer) {
        try {
            // 记录客户端活动
            ClientInfo clientInfo = clients.computeIfAbsent(
                clientAddress, ClientInfo::new);
            clientInfo.updateActivity();
            
            // 解析消息
            String message = CHARSET.decode(buffer).toString().trim();
            InetSocketAddress client = (InetSocketAddress) clientAddress;
            
            System.out.printf("[%tT] 收到来自 %s:%d 的消息: %s\n",
                    new Date(),
                    client.getAddress().getHostAddress(),
                    client.getPort(),
                    message);
            
            // 处理命令
            if (message.startsWith("/")) {
                handleCommand(clientAddress, message);
            } else {
                // 广播消息给所有客户端
                broadcastMessage(clientAddress, message);
            }
            
        } catch (Exception e) {
            System.err.println("处理数据包时出错: " + e.getMessage());
        }
    }
    
    /**
     * 处理客户端命令
     */
    private void handleCommand(SocketAddress sender, String command) throws IOException {
        String[] parts = command.split("\\s+", 2);
        String cmd = parts[0].toLowerCase();
        String args = parts.length > 1 ? parts[1] : "";
        
        switch (cmd) {
            case "/echo":
                sendMessage(sender, "ECHO: " + args);
                break;
                
            case "/time":
                sendMessage(sender, "服务器时间: " + new Date());
                break;
                
            case "/clients":
                sendMessage(sender, "在线客户端数: " + clients.size());
                break;
                
            case "/help":
                String help = "可用命令:\n" +
                            "/echo <message> - 回显消息\n" +
                            "/time - 获取服务器时间\n" +
                            "/clients - 查看在线客户端数\n" +
                            "/help - 显示此帮助";
                sendMessage(sender, help);
                break;
                
            default:
                sendMessage(sender, "未知命令: " + cmd);
        }
    }
    
    /**
     * 广播消息给所有客户端
     */
    private void broadcastMessage(SocketAddress sender, String message) throws IOException {
        String broadcastMsg = String.format("[广播] %s: %s",
                getClientIdentifier(sender),
                message);
        
        ByteBuffer buffer = CHARSET.encode(broadcastMsg);
        
        for (SocketAddress client : clients.keySet()) {
            if (!client.equals(sender)) {
                buffer.rewind();
                channel.send(buffer, client);
            }
        }
        
        // 给发送者确认
        sendMessage(sender, "消息已广播");
    }
    
    /**
     * 发送消息到指定客户端
     */
    private void sendMessage(SocketAddress client, String message) throws IOException {
        ByteBuffer buffer = CHARSET.encode(message);
        channel.send(buffer, client);
    }
    
    /**
     * 获取客户端标识
     */
    private String getClientIdentifier(SocketAddress address) {
        if (address instanceof InetSocketAddress) {
            InetSocketAddress inetAddr = (InetSocketAddress) address;
            return String.format("%s:%d",
                    inetAddr.getAddress().getHostAddress(),
                    inetAddr.getPort());
        }
        return address.toString();
    }
    
    /**
     * 清理不活跃的客户端
     */
    private void cleanupInactiveClients() {
        while (running) {
            try {
                Thread.sleep(30000); // 30秒检查一次
                
                long now = System.currentTimeMillis();
                long inactiveThreshold = 300000; // 5分钟不活跃
                
                Iterator<Map.Entry<SocketAddress, ClientInfo>> iterator = 
                    clients.entrySet().iterator();
                
                int removed = 0;
                while (iterator.hasNext()) {
                    Map.Entry<SocketAddress, ClientInfo> entry = iterator.next();
                    if (now - entry.getValue().lastActive > inactiveThreshold) {
                        System.out.printf("移除不活跃客户端: %s\n", 
                                getClientIdentifier(entry.getKey()));
                        iterator.remove();
                        removed++;
                    }
                }
                
                if (removed > 0) {
                    System.out.printf("清理了 %d 个不活跃客户端\n", removed);
                }
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    
    /**
     * 打印统计信息
     */
    private void printStatistics() {
        while (running) {
            try {
                Thread.sleep(60000); // 每分钟打印一次
                
                System.out.println("\n=== 服务器统计 ===");
                System.out.println("在线客户端: " + clients.size());
                
                if (!clients.isEmpty()) {
                    System.out.println("\n客户端详情:");
                    clients.forEach((addr, info) -> {
                        System.out.printf("  %s: %d 条消息, 最后活动: %tT\n",
                                getClientIdentifier(addr),
                                info.messageCount,
                                new Date(info.lastActive));
                    });
                }
                System.out.println("================\n");
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    
    /**
     * 停止服务器
     */
    public void stop() {
        running = false;
        
        try {
            if (selector != null && selector.isOpen()) {
                selector.close();
            }
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        
        System.out.println("服务器已停止");
    }
    
    /**
     * 主方法
     */
    public static void main(String[] args) {
        AdvancedUDPServer server = new AdvancedUDPServer();
        int port = args.length > 0 ? Integer.parseInt(args[0]) : DEFAULT_PORT;
        
        try {
            server.start(port);
        } catch (IOException e) {
            System.err.println("启动服务器失败: " + e.getMessage());
        }
    }
}

3. UDP 广播和组播示例

import java.io.IOException;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;

/**
 * UDP 广播和组播示例
 */
public class UDPBroadcastMulticast {
    
    /**
     * UDP 广播发送器
     */
    public static class BroadcastSender {
        private final DatagramChannel channel;
        private final InetSocketAddress broadcastAddress;
        
        public BroadcastSender(int port) throws IOException {
            channel = DatagramChannel.open();
            channel.configureBlocking(false);
            
            // 允许广播
            channel.setOption(StandardSocketOptions.SO_BROADCAST, true);
            
            // 创建广播地址
            broadcastAddress = new InetSocketAddress("255.255.255.255", port);
            
            System.out.println("广播发送器初始化完成,目标端口: " + port);
        }
        
        public void sendBroadcast(String message) throws IOException {
            ByteBuffer buffer = StandardCharsets.UTF_8.encode(message);
            
            int bytesSent = channel.send(buffer, broadcastAddress);
            System.out.printf("发送广播消息: %s (%d 字节)\n", message, bytesSent);
        }
        
        public void close() throws IOException {
            channel.close();
        }
    }
    
    /**
     * UDP 广播接收器
     */
    public static class BroadcastReceiver {
        private final DatagramChannel channel;
        private final int port;
        private volatile boolean running = false;
        
        public BroadcastReceiver(int port) throws IOException {
            this.port = port;
            channel = DatagramChannel.open();
            channel.configureBlocking(false);
            
            // 绑定到指定端口
            channel.bind(new InetSocketAddress(port));
            
            System.out.println("广播接收器初始化完成,监听端口: " + port);
        }
        
        public void start() throws IOException {
            running = true;
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            
            System.out.println("开始监听广播消息...");
            
            while (running) {
                buffer.clear();
                SocketAddress sender = channel.receive(buffer);
                
                if (sender != null) {
                    buffer.flip();
                    String message = StandardCharsets.UTF_8.decode(buffer).toString();
                    InetSocketAddress inetSender = (InetSocketAddress) sender;
                    
                    System.out.printf("收到广播消息 [%s:%d]: %s\n",
                            inetSender.getAddress().getHostAddress(),
                            inetSender.getPort(),
                            message);
                }
                
                try {
                    Thread.sleep(100); // 避免 CPU 占用过高
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        
        public void stop() {
            running = false;
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * UDP 组播发送器
     */
    public static class MulticastSender {
        private final DatagramChannel channel;
        private final InetSocketAddress multicastGroup;
        
        public MulticastSender(String multicastAddress, int port) throws IOException {
            channel = DatagramChannel.open(StandardProtocolFamily.INET);
            channel.configureBlocking(false);
            
            // 设置 TTL (生存时间)
            channel.setOption(StandardSocketOptions.IP_MULTICAST_TTL, 1);
            
            multicastGroup = new InetSocketAddress(
                InetAddress.getByName(multicastAddress), port);
            
            System.out.printf("组播发送器初始化完成,目标: %s:%d\n", 
                    multicastAddress, port);
        }
        
        public void sendToGroup(String message) throws IOException {
            ByteBuffer buffer = StandardCharsets.UTF_8.encode(message);
            
            int bytesSent = channel.send(buffer, multicastGroup);
            System.out.printf("发送组播消息: %s (%d 字节)\n", message, bytesSent);
        }
        
        public void close() throws IOException {
            channel.close();
        }
    }
    
    /**
     * UDP 组播接收器
     */
    public static class MulticastReceiver {
        private final DatagramChannel channel;
        private final InetAddress multicastGroup;
        private final NetworkInterface networkInterface;
        private volatile boolean running = false;
        
        public MulticastReceiver(String multicastAddress, int port) throws IOException {
            // 创建组播通道
            channel = DatagramChannel.open(StandardProtocolFamily.INET);
            channel.configureBlocking(false);
            
            // 绑定到端口
            channel.bind(new InetSocketAddress(port));
            
            // 获取组播地址
            multicastGroup = InetAddress.getByName(multicastAddress);
            
            // 获取合适的网络接口
            networkInterface = NetworkInterface.getByName("eth0");
            if (networkInterface == null) {
                networkInterface = NetworkInterface.getByInetAddress(
                    InetAddress.getLocalHost());
            }
            
            // 加入组播组
            channel.join(multicastGroup, networkInterface);
            
            System.out.printf("组播接收器初始化完成,加入组: %s,网络接口: %s\n",
                    multicastAddress, networkInterface.getDisplayName());
        }
        
        public void start() throws IOException {
            running = true;
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            
            System.out.println("开始监听组播消息...");
            
            while (running) {
                buffer.clear();
                SocketAddress sender = channel.receive(buffer);
                
                if (sender != null) {
                    buffer.flip();
                    String message = StandardCharsets.UTF_8.decode(buffer).toString();
                    InetSocketAddress inetSender = (InetSocketAddress) sender;
                    
                    System.out.printf("收到组播消息 [%s:%d]: %s\n",
                            inetSender.getAddress().getHostAddress(),
                            inetSender.getPort(),
                            message);
                }
                
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        
        public void stop() {
            running = false;
            try {
                // 离开组播组
                channel.leave(multicastGroup, networkInterface);
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * 测试主方法
     */
    public static void main(String[] args) throws Exception {
        System.out.println("=== UDP 广播和组播演示 ===\n");
        
        if (args.length > 0) {
            switch (args[0]) {
                case "broadcast-send":
                    testBroadcastSend();
                    break;
                case "broadcast-receive":
                    testBroadcastReceive();
                    break;
                case "multicast-send":
                    testMulticastSend();
                    break;
                case "multicast-receive":
                    testMulticastReceive();
                    break;
                default:
                    System.out.println("用法: java UDPBroadcastMulticast [mode]");
                    System.out.println("模式: broadcast-send, broadcast-receive, multicast-send, multicast-receive");
            }
        } else {
            System.out.println("请指定运行模式");
        }
    }
    
    private static void testBroadcastSend() throws Exception {
        BroadcastSender sender = new BroadcastSender(9998);
        
        for (int i = 0; i < 5; i++) {
            sender.sendBroadcast("广播消息 #" + (i + 1) + " - " + new Date());
            Thread.sleep(2000);
        }
        
        sender.close();
    }
    
    private static void testBroadcastReceive() throws Exception {
        BroadcastReceiver receiver = new BroadcastReceiver(9998);
        receiver.start();
    }
    
    private static void testMulticastSend() throws Exception {
        // 使用私有组播地址 224.0.0.0 - 239.255.255.255
        MulticastSender sender = new MulticastSender("230.0.0.1", 9999);
        
        for (int i = 0; i < 5; i++) {
            sender.sendToGroup("组播消息 #" + (i + 1) + " - " + new Date());
            Thread.sleep(2000);
        }
        
        sender.close();
    }
    
    private static void testMulticastReceive() throws Exception {
        MulticastReceiver receiver = new MulticastReceiver("230.0.0.1", 9999);
        receiver.start();
    }
}

4. 实用的 UDP 工具类

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.util.concurrent.*;

/**
 * 实用的 UDP 工具类
 */
public class UDPUtils {
    
    private static final Charset UTF8 = StandardCharsets.UTF_8;
    
    /**
     * 发送 UDP 数据包(同步)
     */
    public static void sendPacket(String host, int port, String message) throws IOException {
        try (DatagramChannel channel = DatagramChannel.open()) {
            ByteBuffer buffer = UTF8.encode(message);
            InetSocketAddress address = new InetSocketAddress(host, port);
            channel.send(buffer, address);
        }
    }
    
    /**
     * 发送 UDP 数据包(异步)
     */
    public static CompletableFuture<Void> sendPacketAsync(
            String host, int port, String message) {
        
        return CompletableFuture.runAsync(() -> {
            try {
                sendPacket(host, port, message);
            } catch (IOException e) {
                throw new CompletionException(e);
            }
        });
    }
    
    /**
     * 接收 UDP 数据包(带超时)
     */
    public static String receivePacket(int port, int timeoutMillis) 
            throws IOException, TimeoutException {
        
        try (DatagramChannel channel = DatagramChannel.open()) {
            channel.configureBlocking(false);
            channel.bind(new InetSocketAddress(port));
            
            ByteBuffer buffer = ByteBuffer.allocate(65507);
            Selector selector = Selector.open();
            channel.register(selector, SelectionKey.OP_READ);
            
            long startTime = System.currentTimeMillis();
            
            while (true) {
                long elapsed = System.currentTimeMillis() - startTime;
                if (elapsed > timeoutMillis) {
                    throw new TimeoutException("接收超时");
                }
                
                int readyChannels = selector.select(timeoutMillis - elapsed);
                
                if (readyChannels > 0) {
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isReadable()) {
                            buffer.clear();
                            channel.receive(buffer);
                            buffer.flip();
                            return UTF8.decode(buffer).toString();
                        }
                    }
                    selector.selectedKeys().clear();
                }
            }
        }
    }
    
    /**
     * 发现网络中的 UDP 服务
     */
    public static List<InetSocketAddress> discoverServices(
            int broadcastPort, String discoveryMessage, int timeoutMillis) 
            throws IOException {
        
        List<InetSocketAddress> discovered = new CopyOnWriteArrayList<>();
        
        // 创建接收器
        Thread receiver = new Thread(() -> {
            try (DatagramChannel receiverChannel = DatagramChannel.open()) {
                receiverChannel.configureBlocking(true);
                receiverChannel.socket().setSoTimeout(timeoutMillis);
                receiverChannel.bind(new InetSocketAddress(broadcastPort));
                
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        SocketAddress sender = receiverChannel.receive(buffer);
                        buffer.flip();
                        String response = UTF8.decode(buffer).toString();
                        
                        if (response.contains("DISCOVERY_RESPONSE")) {
                            discovered.add((InetSocketAddress) sender);
                        }
                        
                        buffer.clear();
                    } catch (SocketTimeoutException e) {
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        
        receiver.start();
        
        // 发送发现广播
        try (DatagramChannel senderChannel = DatagramChannel.open()) {
            senderChannel.configureBlocking(false);
            senderChannel.setOption(StandardSocketOptions.SO_BROADCAST, true);
            
            ByteBuffer buffer = UTF8.encode(discoveryMessage);
            InetSocketAddress broadcastAddress = 
                new InetSocketAddress("255.255.255.255", broadcastPort);
            
            senderChannel.send(buffer, broadcastAddress);
        }
        
        // 等待接收完成
        try {
            receiver.join(timeoutMillis);
            receiver.interrupt();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        return discovered;
    }
    
    /**
     * UDP 端口扫描器
     */
    public static List<Integer> scanPorts(String host, int startPort, int endPort) 
            throws IOException {
        
        List<Integer> openPorts = new ArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(50);
        
        for (int port = startPort; port <= endPort; port++) {
            final int currentPort = port;
            executor.submit(() -> {
                try {
                    sendPacket(host, currentPort, "PING");
                    System.out.printf("端口 %d 可能开放\n", currentPort);
                    openPorts.add(currentPort);
                } catch (IOException e) {
                    // 端口可能关闭
                }
            });
        }
        
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        return openPorts;
    }
    
    /**
     * UDP 代理服务器
     */
    public static class UDPProxy {
        private final int listenPort;
        private final String targetHost;
        private final int targetPort;
        private volatile boolean running = false;
        
        public UDPProxy(int listenPort, String targetHost, int targetPort) {
            this.listenPort = listenPort;
            this.targetHost = targetHost;
            this.targetPort = targetPort;
        }
        
        public void start() throws IOException {
            running = true;
            
            try (DatagramChannel clientChannel = DatagramChannel.open();
                 DatagramChannel serverChannel = DatagramChannel.open()) {
                
                clientChannel.bind(new InetSocketAddress(listenPort));
                clientChannel.configureBlocking(false);
                
                InetSocketAddress targetAddress = 
                    new InetSocketAddress(targetHost, targetPort);
                
                System.out.printf("UDP 代理启动: %d -> %s:%d\n",
                        listenPort, targetHost, targetPort);
                
                ByteBuffer buffer = ByteBuffer.allocate(65507);
                Map<SocketAddress, SocketAddress> routeTable = new ConcurrentHashMap<>();
                
                while (running) {
                    buffer.clear();
                    SocketAddress clientAddress = clientChannel.receive(buffer);
                    
                    if (clientAddress != null) {
                        buffer.flip();
                        
                        // 记录路由
                        routeTable.put(clientAddress, targetAddress);
                        
                        // 转发到目标服务器
                        serverChannel.send(buffer, targetAddress);
                        
                        System.out.printf("转发: %s -> %s:%d (%d 字节)\n",
                                clientAddress, targetHost, targetPort, buffer.limit());
                    }
                    
                    // 接收服务器响应并转发回客户端
                    buffer.clear();
                    SocketAddress serverResponseAddress = serverChannel.receive(buffer);
                    
                    if (serverResponseAddress != null && 
                        serverResponseAddress.equals(targetAddress)) {
                        
                        buffer.flip();
                        
                        // 查找对应的客户端
                        for (Map.Entry<SocketAddress, SocketAddress> entry : 
                                routeTable.entrySet()) {
                            
                            if (entry.getValue().equals(serverResponseAddress)) {
                                clientChannel.send(buffer, entry.getKey());
                                
                                System.out.printf("响应: %s:%d -> %s (%d 字节)\n",
                                        targetHost, targetPort, entry.getKey(), buffer.limit());
                                break;
                            }
                        }
                    }
                    
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
        
        public void stop() {
            running = false;
        }
    }
}

使用说明

1. 基本使用

// 启动服务器
java DatagramChannelDemo server

// 启动客户端
java DatagramChannelDemo

2. 高级功能

// 启动高级服务器
java AdvancedUDPServer 8888

// 测试广播
java UDPBroadcastMulticast broadcast-send
java UDPBroadcastMulticast broadcast-receive

// 测试组播
java UDPBroadcastMulticast multicast-send
java UDPBroadcastMulticast multicast-receive

3. 主要特性

  1. 非阻塞IO:使用 Selector 实现高效的多客户端处理
  2. 广播功能:支持向整个子网发送消息
  3. 组播功能:支持向特定的组播组发送消息
  4. 连接管理:自动清理不活跃的客户端
  5. 命令处理:支持服务器端命令
  6. 统计功能:实时监控服务器状态
  7. 错误处理:完善的异常处理机制

注意事项

  1. UDP 是无连接的,不保证数据包的顺序和可靠性
  2. 单次 UDP 数据包最大为 65507 字节
  3. 广播和组播需要适当的网络配置
  4. 在生产环境中需要考虑安全性问题

这个实现展示了 DatagramChannel 的核心功能和高级用法,可以根据实际需求进行调整和扩展。

Logo

这里是“一人公司”的成长家园。我们提供从产品曝光、技术变现到法律财税的全栈内容,并连接云服务、办公空间等稀缺资源,助你专注创造,无忧运营。

更多推荐