DeepSeek Java DatagramChannel 完整实现
·
Java DatagramChannel 完整实现
下面是一个完整的 DatagramChannel 实现示例,包括 UDP 服务器和客户端:
1. 基础 DatagramChannel 演示
import java.io.IOException;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
public class DatagramChannelDemo {
/**
 * Entry point of the demo: runs the UDP server when the first argument is {@code server},
 * otherwise runs the interactive UDP client.
 *
 * @param args command-line arguments; only the first one is inspected
 */
public static void main(String[] args) {
    System.out.println("=== Java DatagramChannel 演示 ===\n");
    boolean serverMode = args.length > 0 && args[0].equals("server");
    if (!serverMode) {
        runUDPClient();
        return;
    }
    runUDPServer();
}
/**
 * Runs the demo UDP server on port 9999 until an I/O error occurs.
 *
 * <p>The channel is non-blocking and driven by a {@link Selector}. Each received datagram is
 * decoded as UTF-8 and logged, then answered. A message equal to "exit" (ignoring case and
 * surrounding whitespace) gets a farewell reply; the server itself keeps running.
 * Any {@link IOException} is printed and ends the method.
 */
private static void runUDPServer() {
    System.out.println("启动 UDP 服务器...");
    // The selector shares the try-with-resources so it is released together with the channel.
    try (DatagramChannel serverChannel = DatagramChannel.open();
         Selector selector = Selector.open()) {
        serverChannel.bind(new InetSocketAddress(9999));
        serverChannel.configureBlocking(false);
        System.out.println("服务器监听端口: 9999");
        System.out.println("等待客户端消息...\n");
        // Receive buffer only; datagrams longer than 1024 bytes are truncated by receive().
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        serverChannel.register(selector, SelectionKey.OP_READ);
        while (true) {
            int readyChannels = selector.select(5000);
            if (readyChannels == 0) {
                System.out.println("等待客户端连接...");
                continue;
            }
            for (SelectionKey key : selector.selectedKeys()) {
                if (!key.isReadable()) {
                    continue;
                }
                DatagramChannel channel = (DatagramChannel) key.channel();
                buffer.clear();
                SocketAddress clientAddress = channel.receive(buffer);
                if (clientAddress == null) {
                    // Non-blocking receive found nothing despite the readiness hint.
                    continue;
                }
                buffer.flip();
                String message = StandardCharsets.UTF_8.decode(buffer).toString();
                InetSocketAddress client = (InetSocketAddress) clientAddress;
                System.out.printf("收到来自 %s:%d 的消息: %s\n",
                        client.getAddress().getHostAddress(),
                        client.getPort(),
                        message);
                // Replies get their own exactly-sized buffer: prefix + message can exceed
                // the 1024-byte receive buffer, which used to raise BufferOverflowException.
                if (!message.trim().equalsIgnoreCase("exit")) {
                    String response = "服务器已收到您的消息: " + message;
                    channel.send(StandardCharsets.UTF_8.encode(response), clientAddress);
                    System.out.println("已发送响应\n");
                } else {
                    System.out.println("客户端请求退出,结束通信\n");
                    String response = "服务器关闭连接";
                    channel.send(StandardCharsets.UTF_8.encode(response), clientAddress);
                }
            }
            // The selected-key set is never cleared by the selector itself.
            selector.selectedKeys().clear();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/**
 * Runs the interactive demo UDP client against localhost:9999.
 *
 * <p>Each non-blank line read from standard input is sent as one UTF-8 datagram, after which
 * the client waits up to 5 seconds for a reply. Typing "exit" sends the message, waits up to
 * 3 x 1 second for a final reply and returns. End of input (EOF) also ends the client.
 * Any {@link IOException} is printed and ends the method.
 */
private static void runUDPClient() {
    System.out.println("启动 UDP 客户端...");
    // The selector shares the try-with-resources so it is released together with the channel.
    try (DatagramChannel clientChannel = DatagramChannel.open();
         Selector selector = Selector.open()) {
        clientChannel.configureBlocking(false);
        // UDP has no real connection; connect() only fixes the default peer.
        SocketAddress serverAddress = new InetSocketAddress("localhost", 9999);
        clientChannel.connect(serverAddress);
        System.out.println("连接到服务器: localhost:9999");
        System.out.println("输入消息 (输入 'exit' 退出):\n");
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        clientChannel.register(selector, SelectionKey.OP_READ);
        // Fully qualified because this file only imports java.io.IOException from java.io.
        java.io.BufferedReader userInput = new java.io.BufferedReader(
                new java.io.InputStreamReader(System.in));
        while (true) {
            System.out.print("客户端> ");
            String message = userInput.readLine();
            if (message == null) {
                // EOF: readLine() keeps returning null, so looping again would spin forever.
                break;
            }
            if (message.trim().isEmpty()) {
                continue;
            }
            // Wrap the encoded bytes directly so input longer than the 1024-byte receive
            // buffer cannot raise BufferOverflowException.
            ByteBuffer outgoing = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
            int bytesSent = clientChannel.send(outgoing, serverAddress);
            System.out.println("已发送 " + bytesSent + " 字节");
            if (message.trim().equalsIgnoreCase("exit")) {
                System.out.println("正在退出...");
                // Give the server up to three one-second windows for its farewell.
                for (int i = 0; i < 3; i++) {
                    if (selector.select(1000) > 0) {
                        printPendingResponses(selector, clientChannel, buffer);
                        break;
                    }
                }
                break;
            }
            System.out.println("等待服务器响应...");
            selector.select(5000);
            printPendingResponses(selector, clientChannel, buffer);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

/**
 * Prints one datagram for every readable key currently selected, then clears the selected set.
 *
 * @param selector selector whose selected keys are inspected and cleared
 * @param channel  channel to receive from
 * @param buffer   scratch buffer reused for each datagram
 * @throws IOException if receiving fails
 */
private static void printPendingResponses(Selector selector, DatagramChannel channel,
        ByteBuffer buffer) throws IOException {
    for (SelectionKey key : selector.selectedKeys()) {
        if (!key.isReadable()) {
            continue;
        }
        buffer.clear();
        // A non-blocking receive may still return null; skip instead of printing an empty reply.
        if (channel.receive(buffer) != null) {
            buffer.flip();
            String response = StandardCharsets.UTF_8.decode(buffer).toString();
            System.out.println("服务器响应: " + response);
        }
    }
    selector.selectedKeys().clear();
}
}
2. 高级 DatagramChannel 实现
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.util.*;
/**
* 高级 UDP 服务器实现
* 支持多客户端、命令处理,并把消息转发(广播)给其他已知客户端;不包含 IP 组播功能
*/
public class AdvancedUDPServer {
/** Port used when none is given on the command line. */
private static final int DEFAULT_PORT = 8888;
/** Receive-buffer size: the largest UDP payload that fits in a single IPv4 datagram. */
private static final int BUFFER_SIZE = 65507;
/** Charset used to encode and decode every message. */
private static final Charset CHARSET = StandardCharsets.UTF_8;
private DatagramChannel channel;
private Selector selector;
// volatile: cleared by stop() and polled by the event loop and the housekeeping threads.
private volatile boolean running = false;
// Fully qualified because java.util.* does not cover java.util.concurrent.
private final Map<SocketAddress, ClientInfo> clients =
        new java.util.concurrent.ConcurrentHashMap<>();
/**
 * Bookkeeping for one remote client: its address, when it was last heard from and how many
 * datagrams it has sent.
 */
private static class ClientInfo {
    final SocketAddress address;
    // volatile: in this file the event loop writes these fields while the cleanup and
    // statistics threads read them.
    volatile long lastActive;
    volatile int messageCount;

    ClientInfo(SocketAddress address) {
        this.address = address;
        this.lastActive = System.currentTimeMillis();
        this.messageCount = 0;
    }

    /** Records one more message and refreshes the last-activity timestamp. */
    void updateActivity() {
        this.lastActive = System.currentTimeMillis();
        // Not an atomic increment; that is sufficient only while a single thread calls this.
        this.messageCount++;
    }
}
/**
 * Opens the server channel on the given port, starts the housekeeping threads and then runs
 * the event loop on the calling thread until the server stops.
 *
 * @param port UDP port to bind
 * @throws IOException if the channel or selector cannot be opened, bound or registered
 */
public void start(int port) throws IOException {
    channel = DatagramChannel.open();
    try {
        channel.configureBlocking(false);
        channel.bind(new InetSocketAddress(port));
        selector = Selector.open();
        channel.register(selector, SelectionKey.OP_READ);
    } catch (IOException | RuntimeException e) {
        // Do not leak the socket (or selector) when setup fails, e.g. port already in use.
        try {
            channel.close();
        } catch (IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        if (selector != null && selector.isOpen()) {
            try {
                selector.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
        }
        throw e;
    }
    running = true;
    System.out.printf("UDP 服务器启动在端口 %d\n", port);
    // Daemon threads: both loops sleep 30-60 s between checks of the running flag, so as
    // user threads they would keep the JVM alive long after the server has stopped.
    Thread cleaner = new Thread(this::cleanupInactiveClients, "udp-client-cleanup");
    cleaner.setDaemon(true);
    cleaner.start();
    Thread reporter = new Thread(this::printStatistics, "udp-statistics");
    reporter.setDaemon(true);
    reporter.start();
    // Blocks here until the event loop ends.
    runEventLoop();
}
/**
 * Main selector loop: waits up to one second per round for readable keys and hands each one
 * to {@code handleRead}. Always calls {@code stop()} on the way out.
 */
private void runEventLoop() {
    ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
    try {
        while (running) {
            int readyChannels = selector.select(1000);
            if (readyChannels == 0) {
                continue;
            }
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                // The selector never removes handled keys by itself.
                keyIterator.remove();
                // isReadable() throws CancelledKeyException on a cancelled key.
                if (key.isValid() && key.isReadable()) {
                    handleRead(key, buffer);
                }
            }
        }
    } catch (ClosedSelectorException e) {
        // stop() was called from another thread while we were selecting: normal shutdown.
    } catch (IOException e) {
        // After stop() the channel is closed on purpose; only report unexpected failures.
        if (running) {
            e.printStackTrace();
        }
    } finally {
        stop();
    }
}
/**
 * Receives one datagram from the key's channel into {@code buffer} and, if one was available,
 * passes it on for processing.
 *
 * @param key    readable selection key of a datagram channel
 * @param buffer reusable receive buffer; it is cleared before use
 * @throws IOException if receiving fails
 */
private void handleRead(SelectionKey key, ByteBuffer buffer) throws IOException {
    DatagramChannel source = (DatagramChannel) key.channel();
    buffer.clear();
    SocketAddress sender = source.receive(buffer);
    if (sender == null) {
        return;
    }
    buffer.flip();
    processPacket(sender, buffer);
}
/**
 * Handles one received datagram: updates the sender's activity record, logs the decoded text
 * and either runs it as a command (leading "/") or relays it to the other clients.
 * Failures are reported on stderr and do not propagate.
 *
 * @param clientAddress sender of the datagram
 * @param buffer        payload, already flipped for reading
 */
private void processPacket(SocketAddress clientAddress, ByteBuffer buffer) {
    try {
        clients.computeIfAbsent(clientAddress, ClientInfo::new).updateActivity();
        String text = CHARSET.decode(buffer).toString().trim();
        InetSocketAddress origin = (InetSocketAddress) clientAddress;
        String host = origin.getAddress().getHostAddress();
        int port = origin.getPort();
        System.out.printf("[%tT] 收到来自 %s:%d 的消息: %s\n", new Date(), host, port, text);
        if (!text.startsWith("/")) {
            // Plain text is relayed to every other known client.
            broadcastMessage(clientAddress, text);
        } else {
            handleCommand(clientAddress, text);
        }
    } catch (Exception e) {
        System.err.println("处理数据包时出错: " + e.getMessage());
    }
}
/**
 * Executes a slash command sent by a client and replies to that client.
 *
 * <p>The command name is matched case-insensitively; everything after the first run of
 * whitespace is treated as the argument string.
 *
 * @param sender  client that issued the command
 * @param command full command line, starting with "/"
 * @throws IOException if the reply cannot be sent
 */
private void handleCommand(SocketAddress sender, String command) throws IOException {
    String[] parts = command.split("\\s+", 2);
    // Locale.ROOT keeps matching stable under locales with special casing rules
    // (e.g. Turkish, where "I" lower-cases to a dotless i and "/TIME" would not match).
    String cmd = parts[0].toLowerCase(Locale.ROOT);
    String args = parts.length > 1 ? parts[1] : "";
    switch (cmd) {
        case "/echo":
            sendMessage(sender, "ECHO: " + args);
            break;
        case "/time":
            sendMessage(sender, "服务器时间: " + new Date());
            break;
        case "/clients":
            sendMessage(sender, "在线客户端数: " + clients.size());
            break;
        case "/help":
            String help = "可用命令:\n" +
                    "/echo <message> - 回显消息\n" +
                    "/time - 获取服务器时间\n" +
                    "/clients - 查看在线客户端数\n" +
                    "/help - 显示此帮助";
            sendMessage(sender, help);
            break;
        default:
            sendMessage(sender, "未知命令: " + cmd);
    }
}
/**
 * Relays a client's message to every other known client and then acknowledges the sender.
 *
 * @param sender  client the message came from; it does not receive the relay
 * @param message text to relay
 * @throws IOException if sending fails
 */
private void broadcastMessage(SocketAddress sender, String message) throws IOException {
    String origin = getClientIdentifier(sender);
    ByteBuffer payload = CHARSET.encode(String.format("[广播] %s: %s", origin, message));
    for (SocketAddress recipient : clients.keySet()) {
        if (recipient.equals(sender)) {
            continue;
        }
        // send() consumes the buffer, so reset the position before each reuse.
        payload.rewind();
        channel.send(payload, recipient);
    }
    sendMessage(sender, "消息已广播");
}
/**
 * Encodes {@code message} with the server charset and sends it as one datagram.
 *
 * @param client  destination address
 * @param message text to send
 * @throws IOException if sending fails
 */
private void sendMessage(SocketAddress client, String message) throws IOException {
    channel.send(CHARSET.encode(message), client);
}
/**
 * Builds a printable identifier for a client address.
 *
 * @param address client address
 * @return {@code ip:port} for an {@link InetSocketAddress}, otherwise {@code address.toString()}
 */
private String getClientIdentifier(SocketAddress address) {
    if (!(address instanceof InetSocketAddress)) {
        return address.toString();
    }
    InetSocketAddress endpoint = (InetSocketAddress) address;
    String ip = endpoint.getAddress().getHostAddress();
    return String.format("%s:%d", ip, endpoint.getPort());
}
/**
 * Housekeeping loop: every 30 seconds drops clients that have been silent for more than
 * five minutes. Ends when the server stops or the thread is interrupted.
 */
private void cleanupInactiveClients() {
    final long idleLimitMillis = 300000;
    while (running) {
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        long now = System.currentTimeMillis();
        int evicted = 0;
        for (Iterator<Map.Entry<SocketAddress, ClientInfo>> it = clients.entrySet().iterator();
                it.hasNext();) {
            Map.Entry<SocketAddress, ClientInfo> candidate = it.next();
            if (now - candidate.getValue().lastActive <= idleLimitMillis) {
                continue;
            }
            System.out.printf("移除不活跃客户端: %s\n",
                    getClientIdentifier(candidate.getKey()));
            it.remove();
            evicted++;
        }
        if (evicted > 0) {
            System.out.printf("清理了 %d 个不活跃客户端\n", evicted);
        }
    }
}
/**
 * Reporting loop: once a minute prints the number of known clients and, per client, its
 * message count and last-activity time. Ends when the server stops or the thread is
 * interrupted.
 */
private void printStatistics() {
    while (running) {
        try {
            Thread.sleep(60000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        System.out.println("\n=== 服务器统计 ===");
        System.out.println("在线客户端: " + clients.size());
        if (!clients.isEmpty()) {
            System.out.println("\n客户端详情:");
            for (Map.Entry<SocketAddress, ClientInfo> entry : clients.entrySet()) {
                ClientInfo stats = entry.getValue();
                System.out.printf(" %s: %d 条消息, 最后活动: %tT\n",
                        getClientIdentifier(entry.getKey()),
                        stats.messageCount,
                        new Date(stats.lastActive));
            }
        }
        System.out.println("================\n");
    }
}
/**
 * Stops the server: clears the running flag and closes the selector and the channel.
 * The two resources are closed independently so a failure on one cannot leak the other.
 */
public void stop() {
    running = false;
    try {
        if (selector != null && selector.isOpen()) {
            selector.close();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    try {
        if (channel != null && channel.isOpen()) {
            channel.close();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    System.out.println("服务器已停止");
}
/**
 * Command-line entry point. The optional first argument is the UDP port to listen on;
 * without it the default port is used. An argument that is not a port number in the range
 * 0-65535 is reported on stderr and the server is not started.
 *
 * @param args command-line arguments
 */
public static void main(String[] args) {
    int port = DEFAULT_PORT;
    if (args.length > 0) {
        try {
            port = Integer.parseInt(args[0]);
        } catch (NumberFormatException e) {
            System.err.println("无效的端口号: " + args[0]);
            return;
        }
        // InetSocketAddress rejects ports outside this range with an unchecked exception.
        if (port < 0 || port > 65535) {
            System.err.println("端口号超出范围 (0-65535): " + port);
            return;
        }
    }
    AdvancedUDPServer server = new AdvancedUDPServer();
    try {
        server.start(port);
    } catch (IOException e) {
        System.err.println("启动服务器失败: " + e.getMessage());
    }
}
}
3. UDP 广播和组播示例
import java.io.IOException;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
/**
* UDP 广播和组播示例
*/
public class UDPBroadcastMulticast {
/**
 * Sends UTF-8 text datagrams to the limited broadcast address 255.255.255.255 on a fixed port.
 */
public static class BroadcastSender {
    private final DatagramChannel channel;
    private final InetSocketAddress broadcastAddress;

    /**
     * Opens a non-blocking channel with SO_BROADCAST enabled.
     *
     * @param port destination port of the broadcasts
     * @throws IOException if the channel cannot be opened or configured
     */
    public BroadcastSender(int port) throws IOException {
        this.channel = DatagramChannel.open();
        this.channel.configureBlocking(false);
        // Sending to a broadcast address requires this option.
        this.channel.setOption(StandardSocketOptions.SO_BROADCAST, true);
        this.broadcastAddress = new InetSocketAddress("255.255.255.255", port);
        System.out.println("广播发送器初始化完成,目标端口: " + port);
    }

    /**
     * Broadcasts one message and logs how many bytes were sent.
     *
     * @param message text to broadcast
     * @throws IOException if sending fails
     */
    public void sendBroadcast(String message) throws IOException {
        ByteBuffer payload = StandardCharsets.UTF_8.encode(message);
        int sent = this.channel.send(payload, this.broadcastAddress);
        System.out.printf("发送广播消息: %s (%d 字节)\n", message, sent);
    }

    /**
     * Closes the underlying channel.
     *
     * @throws IOException if closing fails
     */
    public void close() throws IOException {
        this.channel.close();
    }
}
/**
 * Listens on a UDP port and prints every datagram received there, including broadcasts.
 */
public static class BroadcastReceiver {
    private final DatagramChannel channel;
    private final int port;
    private volatile boolean running = false;

    /**
     * Opens a non-blocking channel bound to {@code port}.
     *
     * @param port local port to listen on
     * @throws IOException if the channel cannot be opened or bound
     */
    public BroadcastReceiver(int port) throws IOException {
        this.port = port;
        DatagramChannel opened = DatagramChannel.open();
        try {
            opened.configureBlocking(false);
            opened.bind(new InetSocketAddress(port));
        } catch (IOException | RuntimeException e) {
            // Do not leak the socket when binding fails (e.g. port already in use).
            try {
                opened.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.channel = opened;
        System.out.println("广播接收器初始化完成,监听端口: " + port);
    }

    /**
     * Polls the channel until {@link #stop()} is called or the thread is interrupted,
     * printing each datagram. Payloads longer than 1024 bytes are truncated.
     *
     * @throws IOException if receiving fails for a reason other than {@link #stop()}
     */
    public void start() throws IOException {
        running = true;
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        System.out.println("开始监听广播消息...");
        try {
            while (running) {
                buffer.clear();
                SocketAddress sender = channel.receive(buffer);
                if (sender != null) {
                    buffer.flip();
                    String message = StandardCharsets.UTF_8.decode(buffer).toString();
                    InetSocketAddress inetSender = (InetSocketAddress) sender;
                    System.out.printf("收到广播消息 [%s:%d]: %s\n",
                            inetSender.getAddress().getHostAddress(),
                            inetSender.getPort(),
                            message);
                    // Keep draining queued datagrams; sleeping after every packet would cap
                    // throughput at roughly ten packets per second.
                    continue;
                }
                try {
                    Thread.sleep(100); // idle back-off so polling does not burn CPU
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } catch (ClosedChannelException e) {
            // stop() closes the channel from another thread; that is a normal shutdown.
            if (running) {
                throw e;
            }
        }
    }

    /** Ends the receive loop and closes the channel. */
    public void stop() {
        running = false;
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Sends UTF-8 text datagrams to an IPv4 multicast group.
 */
public static class MulticastSender {
    private final DatagramChannel channel;
    private final InetSocketAddress multicastGroup;

    /**
     * Opens a non-blocking IPv4 channel with a multicast TTL of 1.
     *
     * @param multicastAddress group address
     * @param port             destination port
     * @throws IOException if the channel cannot be opened or the address cannot be resolved
     */
    public MulticastSender(String multicastAddress, int port) throws IOException {
        this.channel = DatagramChannel.open(StandardProtocolFamily.INET);
        this.channel.configureBlocking(false);
        // Multicast time-to-live of 1.
        this.channel.setOption(StandardSocketOptions.IP_MULTICAST_TTL, 1);
        InetAddress group = InetAddress.getByName(multicastAddress);
        this.multicastGroup = new InetSocketAddress(group, port);
        System.out.printf("组播发送器初始化完成,目标: %s:%d\n", multicastAddress, port);
    }

    /**
     * Sends one message to the group and logs how many bytes were sent.
     *
     * @param message text to send
     * @throws IOException if sending fails
     */
    public void sendToGroup(String message) throws IOException {
        ByteBuffer payload = StandardCharsets.UTF_8.encode(message);
        int sent = this.channel.send(payload, this.multicastGroup);
        System.out.printf("发送组播消息: %s (%d 字节)\n", message, sent);
    }

    /**
     * Closes the underlying channel.
     *
     * @throws IOException if closing fails
     */
    public void close() throws IOException {
        this.channel.close();
    }
}
/**
* UDP 组播接收器
*/
public static class MulticastReceiver {
private final DatagramChannel channel;
private final InetAddress multicastGroup;
private final NetworkInterface networkInterface;
private volatile boolean running = false;
public MulticastReceiver(String multicastAddress, int port) throws IOException {
// 创建组播通道
channel = DatagramChannel.open(StandardProtocolFamily.INET);
channel.configureBlocking(false);
// 绑定到端口
channel.bind(new InetSocketAddress(port));
// 获取组播地址
multicastGroup = InetAddress.getByName(multicastAddress);
// 获取合适的网络接口
networkInterface = NetworkInterface.getByName("eth0");
if (networkInterface == null) {
networkInterface = NetworkInterface.getByInetAddress(
InetAddress.getLocalHost());
}
// 加入组播组
channel.join(multicastGroup, networkInterface);
System.out.printf("组播接收器初始化完成,加入组: %s,网络接口: %s\n",
multicastAddress, networkInterface.getDisplayName());
}
public void start() throws IOException {
running = true;
ByteBuffer buffer = ByteBuffer.allocate(1024);
System.out.println("开始监听组播消息...");
while (running) {
buffer.clear();
SocketAddress sender = channel.receive(buffer);
if (sender != null) {
buffer.flip();
String message = StandardCharsets.UTF_8.decode(buffer).toString();
InetSocketAddress inetSender = (InetSocketAddress) sender;
System.out.printf("收到组播消息 [%s:%d]: %s\n",
inetSender.getAddress().getHostAddress(),
inetSender.getPort(),
message);
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
public void stop() {
running = false;
try {
// 离开组播组
channel.leave(multicastGroup, networkInterface);
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
 * Command-line entry point: the first argument selects which of the four demos to run.
 *
 * @param args command-line arguments; the first one is the mode
 * @throws Exception if the selected demo fails
 */
public static void main(String[] args) throws Exception {
    System.out.println("=== UDP 广播和组播演示 ===\n");
    if (args.length == 0) {
        System.out.println("请指定运行模式");
        return;
    }
    String mode = args[0];
    switch (mode) {
        case "broadcast-send":
            testBroadcastSend();
            break;
        case "broadcast-receive":
            testBroadcastReceive();
            break;
        case "multicast-send":
            testMulticastSend();
            break;
        case "multicast-receive":
            testMulticastReceive();
            break;
        default:
            System.out.println("用法: java UDPBroadcastMulticast [mode]");
            System.out.println("模式: broadcast-send, broadcast-receive, multicast-send, multicast-receive");
    }
}
/**
 * Sends five timestamped broadcast messages to port 9998, two seconds apart.
 *
 * @throws Exception if sending fails or the thread is interrupted while sleeping
 */
private static void testBroadcastSend() throws Exception {
    BroadcastSender sender = new BroadcastSender(9998);
    try {
        for (int i = 0; i < 5; i++) {
            // Fully qualified: this file has no java.util import.
            sender.sendBroadcast("广播消息 #" + (i + 1) + " - " + new java.util.Date());
            Thread.sleep(2000);
        }
    } finally {
        // Release the socket even when a send fails or the sleep is interrupted.
        sender.close();
    }
}
/**
 * Listens for broadcast messages on port 9998 until the receive loop ends.
 *
 * @throws Exception if the receiver cannot be created or receiving fails
 */
private static void testBroadcastReceive() throws Exception {
    new BroadcastReceiver(9998).start();
}
/**
 * Sends five timestamped messages to multicast group 230.0.0.1:9999, two seconds apart.
 *
 * @throws Exception if sending fails or the thread is interrupted while sleeping
 */
private static void testMulticastSend() throws Exception {
    // 230.0.0.1 lies in the IPv4 multicast range 224.0.0.0 - 239.255.255.255. That range is
    // all of class D; only 239.0.0.0/8 is administratively scoped for private use.
    MulticastSender sender = new MulticastSender("230.0.0.1", 9999);
    try {
        for (int i = 0; i < 5; i++) {
            // Fully qualified: this file has no java.util import.
            sender.sendToGroup("组播消息 #" + (i + 1) + " - " + new java.util.Date());
            Thread.sleep(2000);
        }
    } finally {
        // Release the socket even when a send fails or the sleep is interrupted.
        sender.close();
    }
}
/**
 * Joins multicast group 230.0.0.1 on port 9999 and prints messages until the loop ends.
 *
 * @throws Exception if the receiver cannot be created or receiving fails
 */
private static void testMulticastReceive() throws Exception {
    new MulticastReceiver("230.0.0.1", 9999).start();
}
}
4. 实用的 UDP 工具类
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.util.concurrent.*;
/**
* 实用的 UDP 工具类
*/
public class UDPUtils {
private static final Charset UTF8 = StandardCharsets.UTF_8;
/**
 * Sends {@code message} as a single UTF-8 datagram over a short-lived channel.
 *
 * @param host    destination host name or address
 * @param port    destination port
 * @param message text to send
 * @throws IOException if the channel cannot be opened or sending fails
 */
public static void sendPacket(String host, int port, String message) throws IOException {
    try (DatagramChannel oneShot = DatagramChannel.open()) {
        ByteBuffer payload = UTF8.encode(message);
        oneShot.send(payload, new InetSocketAddress(host, port));
    }
}
/**
 * Asynchronous variant of {@code sendPacket}, scheduled through
 * {@link CompletableFuture#runAsync(Runnable)}.
 *
 * @param host    destination host name or address
 * @param port    destination port
 * @param message text to send
 * @return a future that completes when the datagram has been sent, or completes
 *         exceptionally if sending throws an {@link IOException}
 */
public static CompletableFuture<Void> sendPacketAsync(
        String host, int port, String message) {
    Runnable sendTask = () -> {
        try {
            sendPacket(host, port, message);
        } catch (IOException failure) {
            // Checked exceptions cannot escape a Runnable; wrap to fail the future.
            throw new CompletionException(failure);
        }
    };
    return CompletableFuture.runAsync(sendTask);
}
/**
 * Waits for a single datagram on {@code port} and returns its payload decoded as UTF-8.
 *
 * @param port          local port to bind
 * @param timeoutMillis maximum time to wait; zero or a negative value times out immediately
 * @return the decoded payload of the first datagram received
 * @throws IOException      if the channel cannot be bound, receiving fails, or the calling
 *                          thread is interrupted while waiting
 * @throws TimeoutException if nothing arrives within {@code timeoutMillis}
 */
public static String receivePacket(int port, int timeoutMillis)
        throws IOException, TimeoutException {
    // The selector shares the try-with-resources so it is released on every exit path.
    try (DatagramChannel channel = DatagramChannel.open();
         Selector selector = Selector.open()) {
        channel.configureBlocking(false);
        channel.bind(new InetSocketAddress(port));
        channel.register(selector, SelectionKey.OP_READ);
        ByteBuffer buffer = ByteBuffer.allocate(65507);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            // Must be checked with <= 0: select(0) means "block indefinitely", so passing a
            // remaining time of exactly zero would hang instead of timing out.
            if (remainingMillis <= 0) {
                throw new TimeoutException("接收超时");
            }
            if (Thread.currentThread().isInterrupted()) {
                // select() returns at once on an interrupted thread; fail instead of spinning.
                throw new InterruptedIOException("接收被中断");
            }
            selector.select(remainingMillis);
            selector.selectedKeys().clear();
            buffer.clear();
            // A readiness hint does not guarantee a datagram; null means keep waiting.
            if (channel.receive(buffer) != null) {
                buffer.flip();
                return UTF8.decode(buffer).toString();
            }
        }
    }
}
/**
 * Broadcasts a discovery message and collects the senders of every reply that contains
 * "DISCOVERY_RESPONSE".
 *
 * <p>Replies are expected on {@code broadcastPort} of this host. The listening channel is
 * bound before the broadcast goes out, so a fast reply cannot be missed. The wait is bounded
 * by a selector; a socket SO_TIMEOUT would not bound {@code DatagramChannel.receive}.
 *
 * @param broadcastPort    port the discovery message is broadcast to and replies are read from
 * @param discoveryMessage text to broadcast
 * @param timeoutMillis    how long to collect replies
 * @return addresses of the responders, in arrival order (duplicates possible)
 * @throws IOException if a channel cannot be opened or bound, or sending/receiving fails
 */
public static java.util.List<InetSocketAddress> discoverServices(
        int broadcastPort, String discoveryMessage, int timeoutMillis)
        throws IOException {
    // Fully qualified: this file imports java.util.concurrent.* but not java.util.*.
    java.util.List<InetSocketAddress> discovered = new java.util.ArrayList<>();
    try (DatagramChannel receiverChannel = DatagramChannel.open();
         DatagramChannel senderChannel = DatagramChannel.open();
         Selector selector = Selector.open()) {
        receiverChannel.configureBlocking(false);
        receiverChannel.bind(new InetSocketAddress(broadcastPort));
        receiverChannel.register(selector, SelectionKey.OP_READ);

        senderChannel.setOption(StandardSocketOptions.SO_BROADCAST, true);
        senderChannel.send(UTF8.encode(discoveryMessage),
                new InetSocketAddress("255.255.255.255", broadcastPort));

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!Thread.currentThread().isInterrupted()) {
            long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMillis <= 0) {
                break;
            }
            selector.select(remainingMillis);
            selector.selectedKeys().clear();
            // Drain everything that is queued before selecting again.
            buffer.clear();
            SocketAddress sender;
            while ((sender = receiverChannel.receive(buffer)) != null) {
                buffer.flip();
                String response = UTF8.decode(buffer).toString();
                if (response.contains("DISCOVERY_RESPONSE")) {
                    discovered.add((InetSocketAddress) sender);
                }
                buffer.clear();
            }
        }
    }
    return discovered;
}
/**
 * Sends a "PING" datagram to every port in {@code [startPort, endPort]} using 50 worker
 * threads and returns the ports for which the send did not fail.
 *
 * <p>A successful send only means the datagram left this host; it does not show that
 * anything is listening on the port.
 *
 * @param host      target host
 * @param startPort first port, inclusive
 * @param endPort   last port, inclusive
 * @return snapshot of the ports whose send succeeded within the 10-second limit,
 *         in completion order
 * @throws IOException declared for interface compatibility; per-port failures are swallowed
 */
public static java.util.List<Integer> scanPorts(String host, int startPort, int endPort)
        throws IOException {
    // Workers append concurrently, so the list must be thread-safe.
    // Fully qualified: this file imports java.util.concurrent.* but not java.util.*.
    java.util.List<Integer> openPorts =
            java.util.Collections.synchronizedList(new java.util.ArrayList<>());
    ExecutorService executor = Executors.newFixedThreadPool(50);
    for (int port = startPort; port <= endPort; port++) {
        final int currentPort = port;
        executor.submit(() -> {
            try {
                sendPacket(host, currentPort, "PING");
                System.out.printf("端口 %d 可能开放\n", currentPort);
                openPorts.add(currentPort);
            } catch (IOException ignored) {
                // A failed send is taken to mean the port is not usable; nothing to report.
            }
        });
    }
    executor.shutdown();
    try {
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            // Cancel stragglers so the pool does not outlive this call.
            executor.shutdownNow();
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
    // Return a snapshot so late workers cannot mutate the caller's list.
    return new java.util.ArrayList<>(openPorts);
}
/**
* UDP 代理服务器
*/
public static class UDPProxy {
private final int listenPort;
private final String targetHost;
private final int targetPort;
private volatile boolean running = false;
public UDPProxy(int listenPort, String targetHost, int targetPort) {
this.listenPort = listenPort;
this.targetHost = targetHost;
this.targetPort = targetPort;
}
public void start() throws IOException {
running = true;
try (DatagramChannel clientChannel = DatagramChannel.open();
DatagramChannel serverChannel = DatagramChannel.open()) {
clientChannel.bind(new InetSocketAddress(listenPort));
clientChannel.configureBlocking(false);
InetSocketAddress targetAddress =
new InetSocketAddress(targetHost, targetPort);
System.out.printf("UDP 代理启动: %d -> %s:%d\n",
listenPort, targetHost, targetPort);
ByteBuffer buffer = ByteBuffer.allocate(65507);
Map<SocketAddress, SocketAddress> routeTable = new ConcurrentHashMap<>();
while (running) {
buffer.clear();
SocketAddress clientAddress = clientChannel.receive(buffer);
if (clientAddress != null) {
buffer.flip();
// 记录路由
routeTable.put(clientAddress, targetAddress);
// 转发到目标服务器
serverChannel.send(buffer, targetAddress);
System.out.printf("转发: %s -> %s:%d (%d 字节)\n",
clientAddress, targetHost, targetPort, buffer.limit());
}
// 接收服务器响应并转发回客户端
buffer.clear();
SocketAddress serverResponseAddress = serverChannel.receive(buffer);
if (serverResponseAddress != null &&
serverResponseAddress.equals(targetAddress)) {
buffer.flip();
// 查找对应的客户端
for (Map.Entry<SocketAddress, SocketAddress> entry :
routeTable.entrySet()) {
if (entry.getValue().equals(serverResponseAddress)) {
clientChannel.send(buffer, entry.getKey());
System.out.printf("响应: %s:%d -> %s (%d 字节)\n",
targetHost, targetPort, entry.getKey(), buffer.limit());
break;
}
}
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
public void stop() {
running = false;
}
}
}
使用说明
1. 基本使用
// 启动服务器
java DatagramChannelDemo server
// 启动客户端
java DatagramChannelDemo
2. 高级功能
// 启动高级服务器
java AdvancedUDPServer 8888
// 测试广播
java UDPBroadcastMulticast broadcast-send
java UDPBroadcastMulticast broadcast-receive
// 测试组播
java UDPBroadcastMulticast multicast-send
java UDPBroadcastMulticast multicast-receive
3. 主要特性
- 非阻塞IO:使用 Selector 实现高效的多客户端处理
- 广播功能:支持向整个子网发送消息
- 组播功能:支持向特定的组播组发送消息
- 连接管理:自动清理不活跃的客户端
- 命令处理:支持服务器端命令
- 统计功能:实时监控服务器状态
- 错误处理:完善的异常处理机制
注意事项
- UDP 是无连接的,不保证数据包的顺序和可靠性
- 单次 UDP 数据包的最大负载为 65507 字节(IPv4 下:65535 − 20 字节 IP 头 − 8 字节 UDP 头);超过链路 MTU 的数据包会被 IP 分片,更容易丢失
- 广播和组播需要适当的网络配置
- 在生产环境中需要考虑安全性问题
这个实现展示了 DatagramChannel 的核心功能和高级用法,可以根据实际需求进行调整和扩展。
更多推荐



所有评论(0)